【初心者向け】プログラミングの「デッドロック」とは?

プログラミングのデッドロック(相互排他状態)を初心者向けに解説。発生原因、具体例、回避方法、対処法を分かりやすく詳しく紹介します

Learning Next 運営
54 分で読めます

みなさん、プログラムが動かなくなって「固まった」状態になった経験はありませんか?

特に複数の処理を同時に行うプログラムで、なぜか処理が進まず、強制終了するしかない状況に遭遇したことがあるかもしれませんね。

その原因の一つが「デッドロック」という現象です。 この記事では、プログラミング初心者の方でも理解できるように、デッドロックの仕組みから対処法まで詳しく解説します。

デッドロックとは?

基本的な概念

デッドロックとは、複数の処理が互いに相手の処理の完了を待ち続けて、全体が停止してしまう状態のことです。

簡単に言うと、「お互いが相手を待っているうちに、誰も動けなくなる」状況です。

日常生活での例

狭い廊下での対面

状況:狭い廊下で二人が向かい合った
Aさん:「Bさんが避けてくれるまで待とう」
Bさん:「Aさんが避けてくれるまで待とう」
結果:二人とも動かず、通路がブロックされる

これがデッドロックの基本的な概念です。

プログラミングでのデッドロック

基本的な仕組み

プログラミングでは、複数のスレッド(処理の流れ)がリソース(データや機能)を取り合う時にデッドロックが発生します。

デッドロックの発生条件

  1. 相互排他: リソースを同時に使えない
  2. 保持と待機: リソースを持ちながら他のリソースを待つ
  3. 非横取り: 他者のリソースを強制的に取れない
  4. 循環待機: 待機が循環状態になる

簡単な例

import threading
import time
# 共有リソース(ロック)
lock1 = threading.Lock()
lock2 = threading.Lock()
def worker1():
"""ワーカー1の処理"""
print("ワーカー1: ロック1を取得しようとしています...")
with lock1:
print("ワーカー1: ロック1を取得しました")
time.sleep(1) # 何らかの処理
print("ワーカー1: ロック2を取得しようとしています...")
with lock2: # ここでワーカー2を待つ
print("ワーカー1: ロック2を取得しました")
print("ワーカー1: 処理完了")
def worker2():
"""ワーカー2の処理"""
print("ワーカー2: ロック2を取得しようとしています...")
with lock2:
print("ワーカー2: ロック2を取得しました")
time.sleep(1) # 何らかの処理
print("ワーカー2: ロック1を取得しようとしています...")
with lock1: # ここでワーカー1を待つ
print("ワーカー2: ロック1を取得しました")
print("ワーカー2: 処理完了")
# デッドロックが発生する実行
thread1 = threading.Thread(target=worker1)
thread2 = threading.Thread(target=worker2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()

この例では、ワーカー1がロック1を持ちながらロック2を待ち、ワーカー2がロック2を持ちながらロック1を待つため、デッドロックが発生します。

デッドロックの具体例

データベースでのデッドロック

典型的なパターン

import sqlite3
import threading
import time
class BankSystem:
def __init__(self):
self.connection = sqlite3.connect('bank.db', check_same_thread=False)
self.setup_database()
def setup_database(self):
"""データベースの初期化"""
cursor = self.connection.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY,
name TEXT,
balance INTEGER
)
''')
cursor.execute('DELETE FROM accounts') # テスト用にクリア
cursor.execute('INSERT INTO accounts (id, name, balance) VALUES (1, "Alice", 1000)')
cursor.execute('INSERT INTO accounts (id, name, balance) VALUES (2, "Bob", 1000)')
self.connection.commit()
def transfer_money(self, from_account, to_account, amount):
"""お金の送金(デッドロックが発生する可能性)"""
cursor = self.connection.cursor()
try:
# トランザクション開始
cursor.execute('BEGIN TRANSACTION')
# 送金元の残高をロック
cursor.execute(
'SELECT balance FROM accounts WHERE id = ? FOR UPDATE',
(from_account,)
)
from_balance = cursor.fetchone()[0]
print(f"スレッド{threading.current_thread().name}: アカウント{from_account}をロック")
time.sleep(0.1) # 他のスレッドがロックを取る時間を作る
# 送金先の残高をロック(ここでデッドロック発生の可能性)
cursor.execute(
'SELECT balance FROM accounts WHERE id = ? FOR UPDATE',
(to_account,)
)
to_balance = cursor.fetchone()[0]
print(f"スレッド{threading.current_thread().name}: アカウント{to_account}をロック")
# 残高チェック
if from_balance >= amount:
# 送金実行
cursor.execute(
'UPDATE accounts SET balance = balance - ? WHERE id = ?',
(amount, from_account)
)
cursor.execute(
'UPDATE accounts SET balance = balance + ? WHERE id = ?',
(amount, to_account)
)
print(f"送金完了: {from_account}{to_account} ({amount}円)")
else:
print(f"送金失敗: 残高不足")
cursor.execute('COMMIT')
except Exception as e:
cursor.execute('ROLLBACK')
print(f"エラー: {e}")
def transfer_alice_to_bob():
"""AliceからBobへの送金"""
bank.transfer_money(1, 2, 100)
def transfer_bob_to_alice():
"""BobからAliceへの送金"""
bank.transfer_money(2, 1, 100)
# デッドロックが発生する実行例
bank = BankSystem()
# 同時に逆方向の送金を実行
thread1 = threading.Thread(target=transfer_alice_to_bob, name="T1")
thread2 = threading.Thread(target=transfer_bob_to_alice, name="T2")
thread1.start()
thread2.start()
thread1.join()
thread2.join()

この例では、スレッド1がアカウント1をロックしてアカウント2のロックを待ち、スレッド2がアカウント2をロックしてアカウント1のロックを待つため、デッドロックが発生します。

ファイルアクセスでのデッドロック

複数ファイルの同時処理

import threading
import time
class FileManager:
def __init__(self):
self.file_locks = {
'file1.txt': threading.Lock(),
'file2.txt': threading.Lock()
}
self.file_contents = {
'file1.txt': "File 1 contents",
'file2.txt': "File 2 contents"
}
def read_file(self, filename):
"""ファイル読み込み"""
with self.file_locks[filename]:
print(f"スレッド{threading.current_thread().name}: {filename}を読み込み中...")
time.sleep(0.1)
return self.file_contents[filename]
def write_file(self, filename, content):
"""ファイル書き込み"""
with self.file_locks[filename]:
print(f"スレッド{threading.current_thread().name}: {filename}に書き込み中...")
time.sleep(0.1)
self.file_contents[filename] = content
def copy_files(self, source, destination):
"""ファイルコピー(デッドロックの可能性)"""
print(f"スレッド{threading.current_thread().name}: {source}{destination} コピー開始")
# ソースファイルをロック
with self.file_locks[source]:
print(f"スレッド{threading.current_thread().name}: {source}をロック")
content = self.file_contents[source]
time.sleep(0.1) # 他のスレッドがロックを取る時間
# デスティネーションファイルをロック(デッドロック発生ポイント)
with self.file_locks[destination]:
print(f"スレッド{threading.current_thread().name}: {destination}をロック")
self.file_contents[destination] = content
print(f"スレッド{threading.current_thread().name}: コピー完了")
def copy_1_to_2():
"""ファイル1からファイル2へのコピー"""
file_manager.copy_files('file1.txt', 'file2.txt')
def copy_2_to_1():
"""ファイル2からファイル1へのコピー"""
file_manager.copy_files('file2.txt', 'file1.txt')
# デッドロック発生例
file_manager = FileManager()
thread1 = threading.Thread(target=copy_1_to_2, name="T1")
thread2 = threading.Thread(target=copy_2_to_1, name="T2")
thread1.start()
thread2.start()
thread1.join()
thread2.join()

リソース競合でのデッドロック

プリンターとスキャナーの例

import threading
import time
class Office:
def __init__(self):
self.printer_lock = threading.Lock()
self.scanner_lock = threading.Lock()
def use_printer(self, document):
"""プリンター使用"""
with self.printer_lock:
print(f"スレッド{threading.current_thread().name}: プリンターで{document}を印刷中...")
time.sleep(1)
print(f"スレッド{threading.current_thread().name}: {document}の印刷完了")
def use_scanner(self, document):
"""スキャナー使用"""
with self.scanner_lock:
print(f"スレッド{threading.current_thread().name}: スキャナーで{document}をスキャン中...")
time.sleep(1)
print(f"スレッド{threading.current_thread().name}: {document}のスキャン完了")
def copy_document(self, document):
"""文書のコピー(スキャン → 印刷)"""
print(f"スレッド{threading.current_thread().name}: {document}のコピー作業開始")
# スキャナーを取得
with self.scanner_lock:
print(f"スレッド{threading.current_thread().name}: スキャナーを取得")
time.sleep(0.5)
# プリンターを取得(デッドロック発生ポイント)
with self.printer_lock:
print(f"スレッド{threading.current_thread().name}: プリンターを取得")
print(f"スレッド{threading.current_thread().name}: {document}のコピー完了")
def print_and_scan(self, doc1, doc2):
"""印刷とスキャンの同時実行"""
print(f"スレッド{threading.current_thread().name}: 印刷とスキャン作業開始")
# プリンターを取得
with self.printer_lock:
print(f"スレッド{threading.current_thread().name}: プリンターを取得")
time.sleep(0.5)
# スキャナーを取得(デッドロック発生ポイント)
with self.scanner_lock:
print(f"スレッド{threading.current_thread().name}: スキャナーを取得")
print(f"スレッド{threading.current_thread().name}: 作業完了")
def worker1():
"""ワーカー1: コピー作業"""
office.copy_document("資料A")
def worker2():
"""ワーカー2: 印刷とスキャン作業"""
office.print_and_scan("資料B", "資料C")
# デッドロック発生例
office = Office()
thread1 = threading.Thread(target=worker1, name="Worker1")
thread2 = threading.Thread(target=worker2, name="Worker2")
thread1.start()
thread2.start()
thread1.join()
thread2.join()

デッドロックの回避方法

ロック順序の統一

一貫したロック取得順序

import threading
import time
class SafeBankSystem:
def __init__(self):
self.account_locks = {
1: threading.Lock(),
2: threading.Lock()
}
self.balances = {1: 1000, 2: 1000}
def transfer_money_safe(self, from_account, to_account, amount):
"""安全な送金(ロック順序を統一)"""
# 常にIDの小さい順にロックを取得
first_lock = min(from_account, to_account)
second_lock = max(from_account, to_account)
with self.account_locks[first_lock]:
print(f"スレッド{threading.current_thread().name}: アカウント{first_lock}をロック")
time.sleep(0.1)
with self.account_locks[second_lock]:
print(f"スレッド{threading.current_thread().name}: アカウント{second_lock}をロック")
# 残高チェック
if self.balances[from_account] >= amount:
self.balances[from_account] -= amount
self.balances[to_account] += amount
print(f"送金完了: {from_account}{to_account} ({amount}円)")
print(f"残高 - アカウント1: {self.balances[1]}, アカウント2: {self.balances[2]}")
else:
print("送金失敗: 残高不足")
def safe_transfer_1_to_2():
bank.transfer_money_safe(1, 2, 100)
def safe_transfer_2_to_1():
bank.transfer_money_safe(2, 1, 150)
# 安全な実行例
bank = SafeBankSystem()
thread1 = threading.Thread(target=safe_transfer_1_to_2, name="T1")
thread2 = threading.Thread(target=safe_transfer_2_to_1, name="T2")
thread1.start()
thread2.start()
thread1.join()
thread2.join()

タイムアウトの使用

一定時間でロック取得を諦める

import threading
import time
class TimeoutFileManager:
def __init__(self):
self.file_locks = {
'file1.txt': threading.Lock(),
'file2.txt': threading.Lock()
}
self.file_contents = {
'file1.txt': "File 1 contents",
'file2.txt': "File 2 contents"
}
def copy_files_with_timeout(self, source, destination, timeout=2):
"""タイムアウト付きファイルコピー"""
print(f"スレッド{threading.current_thread().name}: {source}{destination} コピー開始")
# ソースファイルのロック取得
if self.file_locks[source].acquire(timeout=timeout):
try:
print(f"スレッド{threading.current_thread().name}: {source}をロック")
content = self.file_contents[source]
time.sleep(0.1)
# デスティネーションファイルのロック取得(タイムアウト付き)
if self.file_locks[destination].acquire(timeout=timeout):
try:
print(f"スレッド{threading.current_thread().name}: {destination}をロック")
self.file_contents[destination] = content
print(f"スレッド{threading.current_thread().name}: コピー完了")
finally:
self.file_locks[destination].release()
else:
print(f"スレッド{threading.current_thread().name}: {destination}のロック取得タイムアウト")
finally:
self.file_locks[source].release()
else:
print(f"スレッド{threading.current_thread().name}: {source}のロック取得タイムアウト")
def timeout_copy_1_to_2():
file_manager.copy_files_with_timeout('file1.txt', 'file2.txt')
def timeout_copy_2_to_1():
file_manager.copy_files_with_timeout('file2.txt', 'file1.txt')
# タイムアウト付き実行例
file_manager = TimeoutFileManager()
thread1 = threading.Thread(target=timeout_copy_1_to_2, name="T1")
thread2 = threading.Thread(target=timeout_copy_2_to_1, name="T2")
thread1.start()
thread2.start()
thread1.join()
thread2.join()

単一ロックパターン

一つのロックですべてを管理

import threading
import time
class SingleLockOffice:
def __init__(self):
# すべてのリソースを一つのロックで管理
self.office_lock = threading.Lock()
self.printer_busy = False
self.scanner_busy = False
def copy_document_single_lock(self, document):
"""単一ロックでコピー処理"""
with self.office_lock:
print(f"スレッド{threading.current_thread().name}: オフィスリソースをロック")
# スキャナーの使用可能性チェック
if self.scanner_busy:
print(f"スレッド{threading.current_thread().name}: スキャナーが使用中")
return False
# プリンターの使用可能性チェック
if self.printer_busy:
print(f"スレッド{threading.current_thread().name}: プリンターが使用中")
return False
# 両方のリソースを予約
self.scanner_busy = True
self.printer_busy = True
print(f"スレッド{threading.current_thread().name}: {document}のコピー開始")
time.sleep(1) # コピー作業
# リソースを解放
self.scanner_busy = False
self.printer_busy = False
print(f"スレッド{threading.current_thread().name}: {document}のコピー完了")
return True
def print_document_single_lock(self, document):
"""単一ロックで印刷処理"""
with self.office_lock:
print(f"スレッド{threading.current_thread().name}: オフィスリソースをロック")
if self.printer_busy:
print(f"スレッド{threading.current_thread().name}: プリンターが使用中")
return False
self.printer_busy = True
print(f"スレッド{threading.current_thread().name}: {document}の印刷開始")
time.sleep(0.5)
self.printer_busy = False
print(f"スレッド{threading.current_thread().name}: {document}の印刷完了")
return True
def single_lock_worker1():
office.copy_document_single_lock("資料A")
def single_lock_worker2():
office.print_document_single_lock("資料B")
# 単一ロック実行例
office = SingleLockOffice()
thread1 = threading.Thread(target=single_lock_worker1, name="Worker1")
thread2 = threading.Thread(target=single_lock_worker2, name="Worker2")
thread1.start()
thread2.start()
thread1.join()
thread2.join()

リソース順序付け

リソースに優先順位を設定

import threading
import time
class ResourceManager:
def __init__(self):
# リソースに優先順位を設定
self.resources = {
'database': {'lock': threading.Lock(), 'priority': 1},
'file_system': {'lock': threading.Lock(), 'priority': 2},
'network': {'lock': threading.Lock(), 'priority': 3},
'cache': {'lock': threading.Lock(), 'priority': 4}
}
def acquire_resources(self, resource_names, timeout=5):
"""優先順位順でリソースを取得"""
# 優先順位でソート
sorted_resources = sorted(resource_names,
key=lambda r: self.resources[r]['priority'])
acquired_locks = []
try:
for resource_name in sorted_resources:
resource = self.resources[resource_name]
if resource['lock'].acquire(timeout=timeout):
acquired_locks.append(resource_name)
print(f"スレッド{threading.current_thread().name}: {resource_name}を取得")
else:
print(f"スレッド{threading.current_thread().name}: {resource_name}の取得タイムアウト")
# 失敗時は取得済みのロックを解放
self.release_resources(acquired_locks)
return False
return True
except Exception as e:
print(f"エラー: {e}")
self.release_resources(acquired_locks)
return False
def release_resources(self, resource_names):
"""リソースを解放"""
for resource_name in resource_names:
try:
self.resources[resource_name]['lock'].release()
print(f"スレッド{threading.current_thread().name}: {resource_name}を解放")
except Exception as e:
print(f"解放エラー: {e}")
def process_with_resources(self, resource_list, task_name):
"""リソースを使った処理"""
print(f"スレッド{threading.current_thread().name}: {task_name}開始")
if self.acquire_resources(resource_list):
try:
print(f"スレッド{threading.current_thread().name}: {task_name}実行中...")
time.sleep(1) # 実際の処理
print(f"スレッド{threading.current_thread().name}: {task_name}完了")
finally:
self.release_resources(resource_list)
else:
print(f"スレッド{threading.current_thread().name}: {task_name}失敗")
def task1():
"""タスク1: データベースとファイルシステムを使用"""
resource_manager.process_with_resources(['database', 'file_system'], 'データ処理タスク')
def task2():
"""タスク2: ファイルシステムとネットワークを使用"""
resource_manager.process_with_resources(['file_system', 'network'], 'ファイル送信タスク')
def task3():
"""タスク3: ネットワークとキャッシュを使用"""
resource_manager.process_with_resources(['network', 'cache'], 'キャッシュ更新タスク')
# リソース順序付け実行例
resource_manager = ResourceManager()
thread1 = threading.Thread(target=task1, name="Task1")
thread2 = threading.Thread(target=task2, name="Task2")
thread3 = threading.Thread(target=task3, name="Task3")
thread1.start()
thread2.start()
thread3.start()
thread1.join()
thread2.join()
thread3.join()

デッドロックの検出と対処

デッドロック検出ツール

簡単な検出システム

import threading
import time
from collections import defaultdict, deque
class DeadlockDetector:
def __init__(self):
self.wait_graph = defaultdict(set) # 待機グラフ
self.resource_owners = {} # リソースの所有者
self.lock = threading.Lock()
def add_wait_edge(self, waiting_thread, resource, owning_thread):
"""待機関係を追加"""
with self.lock:
self.wait_graph[waiting_thread].add(owning_thread)
print(f"待機関係追加: {waiting_thread}{owning_thread} (リソース: {resource})")
# デッドロック検出
if self.detect_cycle():
print("⚠️ デッドロックを検出しました!")
self.print_deadlock_info()
def remove_wait_edge(self, waiting_thread, owning_thread):
"""待機関係を削除"""
with self.lock:
if owning_thread in self.wait_graph[waiting_thread]:
self.wait_graph[waiting_thread].remove(owning_thread)
print(f"待機関係削除: {waiting_thread}{owning_thread}")
def detect_cycle(self):
"""循環待機の検出"""
visited = set()
rec_stack = set()
def dfs(node):
if node in rec_stack:
return True # 循環発見
if node in visited:
return False
visited.add(node)
rec_stack.add(node)
for neighbor in self.wait_graph[node]:
if dfs(neighbor):
return True
rec_stack.remove(node)
return False
for node in self.wait_graph:
if node not in visited:
if dfs(node):
return True
return False
def print_deadlock_info(self):
"""デッドロック情報の表示"""
print("=== デッドロック情報 ===")
for thread, waiting_for in self.wait_graph.items():
if waiting_for:
print(f"{thread}{list(waiting_for)} を待機中")
print("========================")
class MonitoredLock:
"""監視機能付きロック"""
def __init__(self, name, detector):
self.name = name
self.lock = threading.Lock()
self.detector = detector
self.owner = None
def acquire(self, timeout=None):
current_thread = threading.current_thread().name
# 現在の所有者がいる場合、待機関係を追加
if self.owner and self.owner != current_thread:
self.detector.add_wait_edge(current_thread, self.name, self.owner)
# ロック取得試行
acquired = self.lock.acquire(timeout=timeout)
if acquired:
self.owner = current_thread
print(f"{current_thread}: {self.name}を取得")
else:
print(f"{current_thread}: {self.name}の取得タイムアウト")
return acquired
def release(self):
current_thread = threading.current_thread().name
if self.owner == current_thread:
self.owner = None
self.lock.release()
print(f"{current_thread}: {self.name}を解放")
# 待機関係を削除
for waiting_thread in list(self.detector.wait_graph.keys()):
self.detector.remove_wait_edge(waiting_thread, current_thread)
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# 使用例
detector = DeadlockDetector()
lock1 = MonitoredLock("Lock1", detector)
lock2 = MonitoredLock("Lock2", detector)
def monitored_worker1():
with lock1:
time.sleep(0.1)
with lock2:
print("Worker1: 処理完了")
def monitored_worker2():
with lock2:
time.sleep(0.1)
with lock1:
print("Worker2: 処理完了")
# 監視付き実行
thread1 = threading.Thread(target=monitored_worker1, name="Worker1")
thread2 = threading.Thread(target=monitored_worker2, name="Worker2")
thread1.start()
thread2.start()
thread1.join(timeout=3)
thread2.join(timeout=3)
if thread1.is_alive() or thread2.is_alive():
print("スレッドがまだ実行中です(デッドロックの可能性)")

自動回復メカニズム

タイムアウトと再試行

import threading
import time
import random
class ResilientTaskManager:
def __init__(self):
self.locks = {
'resource1': threading.Lock(),
'resource2': threading.Lock(),
'resource3': threading.Lock()
}
self.max_retries = 3
self.base_timeout = 1
def execute_task_with_recovery(self, task_func, *args, **kwargs):
"""自動回復機能付きタスク実行"""
for attempt in range(self.max_retries):
try:
print(f"スレッド{threading.current_thread().name}: 試行 {attempt + 1}")
# 指数バックオフでタイムアウトを調整
timeout = self.base_timeout * (2 ** attempt)
result = task_func(timeout=timeout, *args, **kwargs)
if result:
print(f"スレッド{threading.current_thread().name}: タスク成功")
return True
except Exception as e:
print(f"スレッド{threading.current_thread().name}: エラー - {e}")
if attempt < self.max_retries - 1:
# ランダムな待機時間(thundering herd回避)
delay = random.uniform(0.1, 0.5)
print(f"スレッド{threading.current_thread().name}: {delay:.2f}秒待機後に再試行")
time.sleep(delay)
print(f"スレッド{threading.current_thread().name}: タスク失敗(最大試行回数に到達)")
return False
def complex_task(self, resources, timeout=1):
"""複数リソースを使う複雑なタスク"""
acquired_locks = []
try:
# リソースを順番に取得
for resource in resources:
lock = self.locks[resource]
if lock.acquire(timeout=timeout):
acquired_locks.append(resource)
print(f"スレッド{threading.current_thread().name}: {resource}を取得")
else:
print(f"スレッド{threading.current_thread().name}: {resource}取得タイムアウト")
return False
# 実際のタスク処理
print(f"スレッド{threading.current_thread().name}: タスク実行中...")
time.sleep(0.5)
return True
finally:
# 取得したロックを解放
for resource in acquired_locks:
self.locks[resource].release()
print(f"スレッド{threading.current_thread().name}: {resource}を解放")
def resilient_task1():
task_manager.execute_task_with_recovery(
task_manager.complex_task,
['resource1', 'resource2']
)
def resilient_task2():
task_manager.execute_task_with_recovery(
task_manager.complex_task,
['resource2', 'resource3']
)
def resilient_task3():
task_manager.execute_task_with_recovery(
task_manager.complex_task,
['resource3', 'resource1']
)
# 自動回復機能の実行例
task_manager = ResilientTaskManager()
thread1 = threading.Thread(target=resilient_task1, name="Task1")
thread2 = threading.Thread(target=resilient_task2, name="Task2")
thread3 = threading.Thread(target=resilient_task3, name="Task3")
thread1.start()
thread2.start()
thread3.start()
thread1.join()
thread2.join()
thread3.join()

実際の開発での注意点

設計時の考慮事項

デッドロック回避の設計パターン

# 良い設計例:責任の分離とシンプルなロック戦略
class OrderProcessor:
def __init__(self):
self.inventory = Inventory()
self.payment = PaymentService()
self.notification = NotificationService()
def process_order(self, order):
"""注文処理(デッドロック回避設計)"""
try:
# 1. 在庫予約(単独ロック)
if not self.inventory.reserve_items(order.items):
raise Exception("在庫不足")
# 2. 決済処理(単独ロック)
payment_result = self.payment.process_payment(order)
if not payment_result.success:
self.inventory.release_reservation(order.items)
raise Exception("決済失敗")
# 3. 注文確定(単独ロック)
order.status = 'confirmed'
order.save()
# 4. 通知送信(ロックなし・非同期)
self.notification.send_confirmation_async(order)
return True
except Exception as e:
print(f"注文処理エラー: {e}")
return False
class Inventory:
def __init__(self):
self.lock = threading.Lock()
self.items = {'item1': 10, 'item2': 5}
def reserve_items(self, items):
"""在庫予約(単一ロック)"""
with self.lock:
# 全てのアイテムが利用可能かチェック
for item_id, quantity in items.items():
if self.items.get(item_id, 0) < quantity:
return False
# 予約実行
for item_id, quantity in items.items():
self.items[item_id] -= quantity
return True
def release_reservation(self, items):
"""予約解除"""
with self.lock:
for item_id, quantity in items.items():
self.items[item_id] += quantity

コードレビューでのチェックポイント

## デッドロック回避チェックリスト
### ロック設計の確認
- [ ] ロックの取得順序は一貫しているか?
- [ ] 複数ロックを取得する場合の順序は定義されているか?
- [ ] ロックの保持時間は最小限に抑えられているか?
### エラーハンドリング
- [ ] ロック取得失敗時の処理は適切か?
- [ ] タイムアウト機能は実装されているか?
- [ ] 例外発生時にロックが適切に解放されるか?
### 設計の見直し
- [ ] 本当に複数のロックが必要か?
- [ ] より簡単な設計に変更できないか?
- [ ] 非同期処理で解決できないか?
### テスト観点
- [ ] 競合状態のテストが書かれているか?
- [ ] ストレステストで問題が発生しないか?
- [ ] タイムアウト処理のテストがあるか?

デバッグ手法

デッドロックの調査方法

import threading
import traceback
import time
class DeadlockDebugger:
def __init__(self):
self.thread_stacks = {}
self.lock_info = {}
self.monitoring = False
def start_monitoring(self):
"""デッドロック監視開始"""
self.monitoring = True
monitor_thread = threading.Thread(target=self._monitor_threads, daemon=True)
monitor_thread.start()
def _monitor_threads(self):
"""スレッド状態の監視"""
while self.monitoring:
time.sleep(2) # 2秒間隔で監視
current_threads = threading.enumerate()
stuck_threads = []
for thread in current_threads:
if thread.is_alive() and hasattr(thread, '_started'):
thread_id = thread.ident
# スレッドのスタックトレースを取得
frame = None
for thread_id_frame, frame_obj in threading._current_frames().items():
if thread_id_frame == thread_id:
frame = frame_obj
break
if frame:
stack = traceback.format_stack(frame)
# 前回と同じスタックの場合、stuck状態の可能性
if thread_id in self.thread_stacks:
if self.thread_stacks[thread_id] == stack:
stuck_threads.append(thread)
self.thread_stacks[thread_id] = stack
if stuck_threads:
print("🚨 デッドロックの疑いがあるスレッドを検出:")
for thread in stuck_threads:
print(f" - {thread.name} (ID: {thread.ident})")
if thread.ident in self.thread_stacks:
print(" スタックトレース:")
for line in self.thread_stacks[thread.ident][-5:]: # 最後の5行
print(f" {line.strip()}")
def stop_monitoring(self):
"""監視停止"""
self.monitoring = False
def dump_all_stacks(self):
"""全スレッドのスタックダンプ"""
print("=== 全スレッドスタックダンプ ===")
for thread_id, frame in threading._current_frames().items():
print(f"
スレッドID: {thread_id}")
traceback.print_stack(frame)
print("==============================")
# 使用例
debugger = DeadlockDebugger()
debugger.start_monitoring()
# デッドロックが発生する可能性のあるコード実行
# ...
# 問題が発生した場合
debugger.dump_all_stacks()

まとめ

デッドロックは、複数の処理が互いを待ち続けて停止してしまう問題ですが、適切な設計と実装で予防できます。

重要なポイント

  • 発生原因の理解: 4つの条件が揃った時に発生
  • 予防策の実装: ロック順序統一、タイムアウト、単一ロック
  • 検出と対処: 監視システムと自動回復機能
  • 設計の改善: シンプルで理解しやすい設計
  • 継続的な改善: コードレビューと監視の実施

初心者の方は、まず基本的な予防策(ロック順序の統一)から始めて、徐々に高度な手法を身につけていくことをおすすめします。

デッドロックを理解することで、より安全で信頼性の高い並行プログラムを書けるようになります。 ぜひ実際のプロジェクトでこれらの手法を活用してみてください。

関連記事