Pythonの並列処理入門!スレッド・プロセス・非同期処理の違いと使い分けを解説
こんにちは、とまだです。
プログラムの実行が遅くて、もっと速くできないかと悩んだことはありませんか?
実は、Pythonには複数の処理を同時に実行する仕組みがいくつか用意されています。
今回は現役のエンジニア、そして元プログラミングスクール講師としての経験から、並列処理の基本と実践的な使い分けについて解説します。
並列処理って何?まずは基本から理解しよう
並列処理を理解するには、レストランの厨房を想像してみてください。
料理人が一人だけだと、前菜を作ってからメインを作り、最後にデザートを作ります。 これが通常の処理の流れです。
でも、料理人が複数いたらどうでしょう? 前菜係、メイン係、デザート係が同時に働けますよね。
これが並列処理の基本的な考え方です。
並列処理と並行処理の違い
ここで大事な区別があります。
並列処理は、複数の料理人が本当に同時に働いている状態です。 物理的に複数のCPUコアを使って、まさに同時進行で処理します。
並行処理は、一人の料理人が素早く持ち場を切り替えている状態です。 見た目は同時に進んでいるように見えますが、実際は高速で切り替えているだけです。
Pythonではこの両方が使えますが、場面によって使い分ける必要があります。
PythonのGIL(グローバルインタプリタロック)を知ろう
PythonにはGILという特殊な仕組みがあります。
これは「一度に一つのPythonコードしか実行できない」という制限です。 まるで、厨房に鍵が一つしかなくて、料理人は交代で中に入るようなものです。
この制限があるため、CPUをガンガン使う処理では、スレッドを増やしても速くなりません。
でも、待ち時間が多い処理なら話は別です。 ファイル読み込みやネットワーク通信の待ち時間中は、他のスレッドが動けるからです。
スレッドを使った並列処理
スレッドの基本的な使い方
まずは最もシンプルな並列処理、スレッドから見ていきましょう。
以下のコードは、複数のタスクを同時に実行する基本的な例です。
import threading
import time
def worker(task_id):
print(f"タスク {task_id} を開始しました")
time.sleep(2) # 何か時間のかかる処理
print(f"タスク {task_id} が終了しました")
def main():
threads = []
# 5つのスレッドを作成
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 全スレッドの終了を待つ
for t in threads:
t.join()
print("全てのタスクが完了しました")
if __name__ == "__main__":
main()
このコードのポイントは、5つのタスクがほぼ同時に開始される点です。 通常なら10秒(2秒×5回)かかる処理が、約2秒で終わります。
スレッドが向いている場面
スレッドは以下のような場面で威力を発揮します。
- ファイルの読み書きが多い処理
- ネットワーク通信を含む処理
- データベースへのアクセスが多い処理
これらは全て「待ち時間」が発生する処理です。 待っている間に他のスレッドが動けるので、効率が上がります。
スレッドを使う時の注意点
複数のスレッドが同じデータを触ると問題が起きます。
銀行口座の残高を想像してください。 二人が同時に引き出そうとしたら、残高がおかしくなりますよね。
これを防ぐには、Lock
という仕組みを使います。
import threading
# 共有する変数
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # ロックを取得
counter += 1 # この部分は同時に一つのスレッドしか実行できない
# 複数スレッドで実行
threads = []
for _ in range(5):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最終的なカウンター: {counter}")
with lock:
の部分で、一度に一つのスレッドしか入れないようにしています。
これで、データの整合性が保たれます。
マルチプロセスを使った並列処理
プロセスの特徴と使い方
スレッドがGILに縛られるなら、プロセスを使えばいい。
それがmultiprocessing
モジュールです。
プロセスは完全に独立した実行環境を持ちます。 別々の厨房で料理人が働くイメージです。
以下は、CPU負荷の高い計算を並列化する例です。
import multiprocessing
import time
def heavy_computation(start, end):
"""大量の計算を行う関数"""
total = 0
for i in range(start, end):
total += i ** 2
return total
def main():
# CPUコア数を取得
num_cores = multiprocessing.cpu_count()
print(f"利用可能なCPUコア数: {num_cores}")
# 計算範囲を分割
chunk_size = 10_000_000 // num_cores
# プロセスプールを作成
with multiprocessing.Pool(processes=num_cores) as pool:
# 各プロセスに仕事を割り当て
jobs = []
for i in range(num_cores):
start = i * chunk_size
end = (i + 1) * chunk_size
job = pool.apply_async(heavy_computation, (start, end))
jobs.append(job)
# 結果を集計
total = sum(job.get() for job in jobs)
print(f"計算結果: {total}")
if __name__ == "__main__":
start_time = time.time()
main()
print(f"処理時間: {time.time() - start_time:.2f}秒")
このコードでは、大きな計算を複数のプロセスに分割しています。 各プロセスが独立して計算するので、CPUコアをフル活用できます。
プロセス間でのデータ共有
プロセス同士でデータを共有したい場合は、特別な仕組みが必要です。
import multiprocessing
def worker(shared_list, index):
"""共有リストに値を追加する"""
shared_list[index] = index ** 2
def main():
# マネージャーを使って共有リストを作成
manager = multiprocessing.Manager()
shared_list = manager.list([0] * 5)
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(shared_list, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"結果: {list(shared_list)}")
if __name__ == "__main__":
main()
Manager()
を使うと、プロセス間で安全にデータを共有できます。
ただし、データのやり取りにはコストがかかるので、必要最小限にしましょう。
非同期処理(asyncio)で効率的に
非同期処理の基本
asyncio
は、待ち時間を効率的に活用する仕組みです。
レストランのウェイターを想像してください。 注文を取った後、料理ができるまで待つのは非効率ですよね。 その間に他のテーブルの注文を取りに行くはずです。
これが非同期処理の考え方です。
import asyncio
import aiohttp
async def fetch_url(session, url):
"""URLからデータを取得する"""
print(f"取得開始: {url}")
async with session.get(url) as response:
data = await response.text()
print(f"取得完了: {url} (サイズ: {len(data)})")
return len(data)
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]
async with aiohttp.ClientSession() as session:
# 全てのURLを並行して取得
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"合計サイズ: {sum(results)}")
if __name__ == "__main__":
asyncio.run(main())
このコードでは、3つのURLに同時にアクセスしています。 一つずつアクセスすると4秒かかりますが、並行処理なら2秒で済みます。
asyncioが得意な処理
非同期処理は以下のような場面で真価を発揮します。
- 大量のAPI呼び出し
- データベースへの複数クエリ
- ファイルの並行読み書き
- WebSocketを使ったリアルタイム通信
待ち時間が多く、CPU負荷が低い処理に最適です。
どの手法を選ぶべきか?
処理の種類で使い分ける
並列処理の手法を選ぶ際は、処理の性質を見極めることが大切です。
I/O待機が多い処理の場合:
- ファイル読み書き → スレッドまたは非同期処理
- ネットワーク通信 → 非同期処理が最適
- データベースアクセス → 非同期処理
CPU負荷が高い処理の場合:
- 数値計算 → マルチプロセス
- 画像処理 → マルチプロセス
- データ分析 → マルチプロセス
実装の簡単さも考慮する
それぞれの手法には、実装の難易度に違いがあります。
最も簡単なのはconcurrent.futures
を使う方法です。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def task(n):
"""何か時間のかかる処理"""
time.sleep(1)
return n ** 2
# スレッドプールの例
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(task, range(10)))
print(f"スレッドプールの結果: {results}")
# プロセスプールの例
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(task, range(10)))
print(f"プロセスプールの結果: {results}")
map()
を使えば、リストの各要素に対して並列処理を簡単に適用できます。
実践的な活用例
複数のWebページをスクレイピング
以下は、複数のWebページから情報を取得する実践的な例です。
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def scrape_page(session, url):
"""ページからタイトルを取得"""
try:
async with session.get(url) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
title = soup.find('title').text if soup.find('title') else "タイトルなし"
return url, title
except Exception as e:
return url, f"エラー: {e}"
async def main():
urls = [
"https://www.python.org",
"https://docs.python.org",
"https://pypi.org",
]
async with aiohttp.ClientSession() as session:
tasks = [scrape_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, title in results:
print(f"{url}: {title}")
if __name__ == "__main__":
asyncio.run(main())
複数のページを同時に取得することで、処理時間を大幅に短縮できます。
大量の画像をリサイズ
CPU負荷の高い画像処理は、マルチプロセスが適しています。
import multiprocessing
from PIL import Image
import os
def resize_image(args):
"""画像をリサイズする"""
input_path, output_path, size = args
try:
with Image.open(input_path) as img:
resized = img.resize(size, Image.Resampling.LANCZOS)
resized.save(output_path)
return f"成功: {input_path}"
except Exception as e:
return f"失敗: {input_path} - {e}"
def main():
# 処理する画像のリスト
images = [
("input1.jpg", "output1.jpg", (800, 600)),
("input2.jpg", "output2.jpg", (800, 600)),
# ... 他の画像
]
# プロセスプールで並列処理
with multiprocessing.Pool() as pool:
results = pool.map(resize_image, images)
for result in results:
print(result)
if __name__ == "__main__":
main()
各画像の処理が独立しているので、プロセスを分けて効率的に処理できます。
パフォーマンスを測定しよう
シンプルな計測方法
並列化の効果を確認するには、処理時間を測定することが大切です。
import time
from functools import wraps
def measure_time(func):
"""処理時間を測定するデコレータ"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
print(f"{func.__name__}の処理時間: {end - start:.2f}秒")
return result
return wrapper
@measure_time
def sequential_processing():
"""逐次処理の例"""
total = 0
for i in range(5):
time.sleep(1) # 重い処理をシミュレート
total += i
return total
@measure_time
def parallel_processing():
"""並列処理の例"""
import concurrent.futures
def task(n):
time.sleep(1)
return n
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(task, range(5)))
return sum(results)
# 実行して比較
sequential_result = sequential_processing()
parallel_result = parallel_processing()
このように測定することで、並列化の効果を数値で確認できます。
よくある失敗と対策
デッドロックを避ける
複数のロックを使うときは、取得順序を統一しましょう。
import threading
# 悪い例:デッドロックの可能性あり
lock1 = threading.Lock()
lock2 = threading.Lock()
def task1():
with lock1:
# 何か処理
with lock2:
# 何か処理
pass
def task2():
with lock2: # 順序が逆!
# 何か処理
with lock1:
# 何か処理
pass
# 良い例:順序を統一
def better_task1():
with lock1:
with lock2:
# 何か処理
pass
def better_task2():
with lock1: # 同じ順序
with lock2:
# 何か処理
pass
ロックの取得順序を常に同じにすることで、デッドロックを防げます。
適切なワーカー数の設定
ワーカー(スレッドやプロセス)の数は、多ければ良いわけではありません。
import multiprocessing
import concurrent.futures
import time
def find_optimal_workers(task_func, data, max_workers=None):
"""最適なワーカー数を見つける"""
if max_workers is None:
max_workers = multiprocessing.cpu_count() * 2
results = {}
for num_workers in range(1, max_workers + 1):
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
list(executor.map(task_func, data))
elapsed = time.perf_counter() - start
results[num_workers] = elapsed
print(f"ワーカー数 {num_workers}: {elapsed:.2f}秒")
optimal = min(results, key=results.get)
print(f"最適なワーカー数: {optimal}")
return optimal
実際のタスクで測定して、最適な数を見つけましょう。
まとめ
Pythonの並列処理には、大きく分けて3つの方法があります。
それぞれの特徴を理解して、適切に使い分けることが大切です。
スレッドは、I/O待機が多い処理に最適です。 実装も比較的簡単で、データ共有もしやすいです。
マルチプロセスは、CPU負荷の高い処理で威力を発揮します。 GILの制限を受けないので、本当の並列処理が可能です。
非同期処理は、大量のI/O処理を効率的に扱えます。 特にネットワーク通信が多い場合に有効です。
まずは簡単な例から始めて、徐々に複雑な処理に挑戦してみてください。
並列処理を使いこなせるようになると、プログラムの実行時間を大幅に短縮できます。
ぜひ、実際のプロジェクトで活用してみてください。
著者について

とまだ
フルスタックエンジニア
Learning Next の創設者。Ruby on Rails と React を中心に、プログラミング教育に情熱を注いでいます。初心者が楽しく学べる環境作りを目指しています。
著者の詳細を見る →