Pythonで処理を待機させる方法|time.sleep()からasyncioまで実践例で解説

python icon
Python

こんにちは、とまだです。

Pythonでプログラムを書いていると、一時的に処理を待機させたい場面があります。

たとえば、APIを連続で叩きすぎて怒られたり。 処理が速すぎて結果が追いつかなかったり。

実は、プログラムに「待つ」という動作を教えることは、とても大切なんです。

今回は、Pythonでの待機処理について解説します。 基本的なtime.sleep()から、非同期処理まで。 実際のコード例を見ながら、一緒に学んでいきましょう。

Pythonで待機が必要になる場面とは?

プログラムに「待つ」を教える前に。 まず、どんな時に待機が必要になるのか見ていきましょう。

APIを叩く時の待機

外部のAPIサービスを使う時。 多くのサービスには「レート制限」があります。

たとえば「1分間に60回まで」という制限。 これを守らないと、アクセス拒否されてしまいます。

ちょうど、電話で話す時と同じですね。 相手が話し終わるまで待つ。 そうしないと、会話が成立しません。

ファイルの生成を待つ

別のプログラムがファイルを作成中。 そのファイルを読み込みたい場合。

電子レンジで温めている料理を待つようなものです。 チンと鳴るまで、扉を開けても意味がありません。

定期的な処理の実行

1時間ごとにデータをチェックする。 5分おきにログを確認する。

これも待機が必要な典型例です。

time.sleep()で基本の待機を理解する

Pythonで最も簡単な待機方法。 それがtime.sleep()です。

基本的な使い方

まずは、5秒待つプログラムを書いてみましょう。

import time

print("処理を開始します")
time.sleep(5)  # 5秒待機
print("5秒経過しました")

このコードのポイントはtime.sleep(5)の部分。 引数に秒数を指定するだけで、処理が止まります。

小数点も使える

1秒未満の待機も可能です。

import time

for i in range(5):
    print(f"カウント: {i}")
    time.sleep(0.5)  # 0.5秒待機

0.5秒ごとにカウントが進みます。 ゲームのカウントダウンみたいですね。

time.sleep()の注意点

ただし、time.sleep()には弱点があります。 待機中は、プログラム全体が止まってしまうんです。

信号待ちで車が完全停止するイメージ。 他の処理も一緒に止まってしまいます。

そこで、もっと賢い待機方法を見ていきましょう。

マルチスレッドで並行処理しながら待機

複数の処理を同時に進めたい。 そんな時は、マルチスレッドが便利です。

threadingモジュールの基本

レストランの厨房を想像してください。 料理人が複数いれば、同時に違う料理を作れます。

Pythonでも同じことができます。

import threading
import time

def cook_pasta():
    print("パスタを茹で始めます")
    time.sleep(3)
    print("パスタが茹で上がりました")

def make_sauce():
    print("ソースを作り始めます")
    time.sleep(2)
    print("ソースが完成しました")

# スレッドを作成して実行
t1 = threading.Thread(target=cook_pasta)
t2 = threading.Thread(target=make_sauce)

t1.start()
t2.start()

t1.join()
t2.join()

print("料理が完成しました!")

パスタとソースを同時に作る。 待ち時間を有効活用できますね。

イベントを使った待機

もう一つ便利な仕組みがあります。 それが「イベント」です。

import threading
import time

event = threading.Event()

def waiter():
    print("注文を待っています...")
    event.wait()  # イベントがセットされるまで待機
    print("注文を受けました!")

# ウェイタースレッドを開始
t = threading.Thread(target=waiter)
t.start()

# 2秒後に注文(イベントをセット)
time.sleep(2)
print("お客様が注文します")
event.set()

t.join()

レストランで注文を待つウェイターのように。 合図があるまで待機します。

asyncioで非同期処理を使った待機

Python 3で追加された強力な機能。 それがasyncioです。

非同期処理の基本

複数のWebサイトからデータを取得する場面を考えてみましょう。

import asyncio

async def fetch_data(site_name, seconds):
    print(f"{site_name}からデータ取得開始")
    await asyncio.sleep(seconds)
    print(f"{site_name}からデータ取得完了")
    return f"{site_name}のデータ"

async def main():
    # 3つのサイトから同時にデータ取得
    results = await asyncio.gather(
        fetch_data("サイトA", 2),
        fetch_data("サイトB", 3),
        fetch_data("サイトC", 1)
    )
    print(f"取得したデータ: {results}")

asyncio.run(main())

3つのサイトに同時アクセス。 最も時間がかかるサイトBの3秒で全て完了します。

通常なら2+3+1=6秒かかるところを、3秒で終わらせる。 これが非同期処理の威力です。

asyncio.sleep()の特徴

time.sleep()との違いは何でしょうか。

import asyncio
import time

async def async_task(name):
    print(f"{name} 開始")
    await asyncio.sleep(1)  # 他のタスクに処理を譲る
    print(f"{name} 完了")

async def main():
    start = time.time()
    await asyncio.gather(
        async_task("タスク1"),
        async_task("タスク2"),
        async_task("タスク3")
    )
    end = time.time()
    print(f"実行時間: {end - start:.2f}秒")

asyncio.run(main())

3つのタスクが1秒で完了します。 待機中も他のタスクが動けるからです。

実践的な待機処理の活用例

ここまで学んだ待機処理。 実際の開発ではどう使うのでしょうか。

APIのレート制限に対応する

よくある実装例を見てみましょう。

import time
import requests

def call_api_with_limit(urls, rate_limit=1):
    results = []

    for url in urls:
        # API呼び出し(ここでは例として表示のみ)
        print(f"APIを呼び出し: {url}")
        results.append(f"{url}の結果")

        # レート制限のため待機
        time.sleep(rate_limit)

    return results

# 使用例
urls = ["api.example.com/1", "api.example.com/2", "api.example.com/3"]
results = call_api_with_limit(urls, rate_limit=0.5)

0.5秒ごとにAPIを呼び出します。 これでレート制限を守れます。

タイムアウトを設定する

待機には上限を設けることも大切です。

import asyncio

async def task_with_timeout():
    try:
        # 5秒かかる処理を3秒でタイムアウト
        await asyncio.wait_for(
            asyncio.sleep(5),
            timeout=3
        )
        print("処理完了")
    except asyncio.TimeoutError:
        print("タイムアウトしました")

asyncio.run(task_with_timeout())

相手からの返事を永遠に待つわけにはいきません。 適切なタイムアウトを設定しましょう。

リトライ処理と組み合わせる

失敗した時の再試行も重要です。

import time

def retry_operation(func, max_retries=3, wait_time=1):
    for i in range(max_retries):
        try:
            return func()
        except Exception as e:
            if i < max_retries - 1:
                print(f"失敗しました。{wait_time}秒後に再試行...")
                time.sleep(wait_time)
                wait_time *= 2  # 待機時間を倍にする
            else:
                print("最大試行回数に達しました")
                raise e

# 使用例(50%の確率で失敗する関数)
import random

def unstable_function():
    if random.random() < 0.5:
        raise Exception("処理に失敗")
    return "成功!"

try:
    result = retry_operation(unstable_function)
    print(result)
except:
    print("全ての試行が失敗しました")

失敗するたびに待機時間を増やす。 サーバーへの負荷も考慮した実装です。

待機処理を使う時の注意点

最後に、気をつけるべきポイントをまとめます。

過剰な待機を避ける

待機時間が長すぎると、ユーザーは不満を感じます。 逆に短すぎると、システムに負荷がかかります。

適切なバランスを見つけることが大切です。

テストでの待機処理

テストコードでは、実際の待機は避けたいもの。 モックを使って時間を操作しましょう。

from unittest.mock import patch
import time

def slow_function():
    time.sleep(5)
    return "完了"

# テストでは待機をスキップ
with patch('time.sleep'):
    result = slow_function()
    print(result)  # すぐに"完了"が表示される

開発効率を上げるテクニックです。

適切な方法を選ぶ

  • 単純な待機ならtime.sleep()
  • 並行処理が必要ならthreading
  • I/O待機が多いならasyncio

状況に応じて使い分けましょう。

まとめ

Pythonでの待機処理について学びました。

基本のtime.sleep()から始まり、マルチスレッド、そして非同期処理まで。 それぞれに特徴があり、適した場面があります。

大切なのは、なぜ待機が必要なのかを理解すること。 そして、最適な方法を選ぶことです。

今回紹介したコード例を参考に、実際に手を動かしてみてください。 きっと、より効率的なプログラムが書けるようになるはずです。

共有:

著者について

とまだ

とまだ

フルスタックエンジニア

Learning Next の創設者。Ruby on Rails と React を中心に、プログラミング教育に情熱を注いでいます。初心者が楽しく学べる環境作りを目指しています。

著者の詳細を見る →