asyncioは強力ですが、すべてのライブラリがasync/awaitに対応しているわけではありません。requests、psycopg2、多くのレガシーコードはブロッキング操作を行います。asyncioとスレッドを組み合わせることで、これらのコードを非同期環境で活用できます。
なぜ組み合わせが必要か
ブロッキング操作の問題
import asyncio
import requests # ブロッキングライブラリ
async def fetch_data():
# これはイベントループをブロックしてしまう!
response = requests.get("https://api.example.com/data")
return response.json()
async def other_task():
for i in range(5):
print(f"Working... {i}")
await asyncio.sleep(0.1)
async def main():
# other_taskはfetch_dataが完了するまで実行されない
await asyncio.gather(
fetch_data(),
other_task()
)
asyncio.run(main())
イベントループがブロックされると、他のすべてのコルーチンが停止します。
run_in_executor
ブロッキング操作をスレッドプールで実行:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import requests
def blocking_fetch(url: str) -> dict:
"""ブロッキングなHTTPリクエスト"""
response = requests.get(url)
return response.json()
async def async_fetch(url: str) -> dict:
"""ブロッキング操作を非同期化"""
loop = asyncio.get_event_loop()
# スレッドプールで実行
result = await loop.run_in_executor(
None, # デフォルトのExecutorを使用
blocking_fetch,
url
)
return result
async def main():
urls = [
"https://api.github.com/users/python",
"https://api.github.com/users/django",
"https://api.github.com/users/flask",
]
# 並行実行
results = await asyncio.gather(
*[async_fetch(url) for url in urls]
)
for result in results:
print(result.get("login"))
asyncio.run(main())
カスタムExecutor
import asyncio
from concurrent.futures import ThreadPoolExecutor
# スレッド数を制御
executor = ThreadPoolExecutor(max_workers=5)
async def fetch_with_custom_executor(url: str):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
executor,
blocking_fetch,
url
)
async def main():
try:
results = await asyncio.gather(
*[fetch_with_custom_executor(f"url{i}") for i in range(10)]
)
finally:
executor.shutdown(wait=True)
asyncio.to_thread(Python 3.9+)
より簡潔な構文:
import asyncio
import requests
def blocking_operation(url: str) -> str:
response = requests.get(url)
return response.text
async def main():
# to_threadで簡潔に
result = await asyncio.to_thread(
blocking_operation,
"https://api.github.com"
)
print(result[:100])
asyncio.run(main())
複数の引数
import asyncio
import time
def process_data(data: str, multiplier: int, prefix: str) -> str:
time.sleep(0.1) # シミュレート
return f"{prefix}: {data * multiplier}"
async def main():
result = await asyncio.to_thread(
process_data,
"hello",
3,
"Result"
)
print(result) # Result: hellohellohello
asyncio.run(main())
実践パターン
データベース操作のラッピング
import asyncio
from concurrent.futures import ThreadPoolExecutor
import psycopg2 # ブロッキングライブラリ
from contextlib import contextmanager
class AsyncDatabase:
def __init__(self, connection_string: str, max_workers: int = 5):
self.connection_string = connection_string
self.executor = ThreadPoolExecutor(max_workers=max_workers)
@contextmanager
def _get_connection(self):
conn = psycopg2.connect(self.connection_string)
try:
yield conn
finally:
conn.close()
def _execute_query(self, query: str, params: tuple = None) -> list:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute(query, params)
return cur.fetchall()
async def execute(self, query: str, params: tuple = None) -> list:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
self._execute_query,
query,
params
)
async def close(self):
self.executor.shutdown(wait=True)
# 使用例
async def main():
db = AsyncDatabase("postgresql://localhost/mydb")
try:
users = await db.execute("SELECT * FROM users WHERE active = %s", (True,))
print(users)
finally:
await db.close()
ファイルI/Oのラッピング
import asyncio
from pathlib import Path
def read_file_sync(path: Path) -> str:
return path.read_text()
def write_file_sync(path: Path, content: str) -> None:
path.write_text(content)
async def read_file(path: Path) -> str:
return await asyncio.to_thread(read_file_sync, path)
async def write_file(path: Path, content: str) -> None:
await asyncio.to_thread(write_file_sync, path, content)
async def process_files():
files = list(Path(".").glob("*.txt"))
# 並行読み込み
contents = await asyncio.gather(
*[read_file(f) for f in files]
)
for file, content in zip(files, contents):
print(f"{file}: {len(content)} chars")
サードパーティライブラリのラッピング
import asyncio
from functools import partial
import boto3 # AWS SDK(ブロッキング)
class AsyncS3Client:
def __init__(self):
self.client = boto3.client('s3')
async def list_buckets(self) -> list:
return await asyncio.to_thread(
self.client.list_buckets
)
async def get_object(self, bucket: str, key: str) -> bytes:
response = await asyncio.to_thread(
self.client.get_object,
Bucket=bucket,
Key=key
)
# 注意: responseのBodyも読み込みが必要
body = await asyncio.to_thread(response['Body'].read)
return body
async def put_object(self, bucket: str, key: str, data: bytes) -> dict:
return await asyncio.to_thread(
self.client.put_object,
Bucket=bucket,
Key=key,
Body=data
)
レガシーコードの段階的移行
Phase 1: ラッパー作成
# legacy_module.py(変更なし)
def fetch_user(user_id: int) -> dict:
# 既存のブロッキングコード
pass
def update_user(user_id: int, data: dict) -> bool:
pass
# async_legacy.py(新規作成)
import asyncio
from . import legacy_module
async def fetch_user(user_id: int) -> dict:
return await asyncio.to_thread(legacy_module.fetch_user, user_id)
async def update_user(user_id: int, data: dict) -> bool:
return await asyncio.to_thread(legacy_module.update_user, user_id, data)
Phase 2: 呼び出し側を移行
# 移行前
from legacy_module import fetch_user
def process_users(user_ids):
for uid in user_ids:
user = fetch_user(uid)
print(user)
# 移行後
from async_legacy import fetch_user
async def process_users(user_ids):
users = await asyncio.gather(
*[fetch_user(uid) for uid in user_ids]
)
for user in users:
print(user)
Phase 3: 徐々にネイティブasyncに
# 最終的にネイティブasyncに書き換え
import aiohttp
async def fetch_user(user_id: int) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(f"/api/users/{user_id}") as response:
return await response.json()
注意点とベストプラクティス
スレッドセーフティ
import asyncio
import threading
# グローバル状態に注意
shared_data = {"count": 0}
lock = threading.Lock()
def increment():
with lock: # スレッドセーフにする
shared_data["count"] += 1
async def main():
await asyncio.gather(
*[asyncio.to_thread(increment) for _ in range(100)]
)
print(shared_data["count"]) # 100
Executorのライフサイクル
import asyncio
from concurrent.futures import ThreadPoolExecutor
# 良い例: コンテキストマネージャを使用
async def good_example():
with ThreadPoolExecutor(max_workers=5) as executor:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, blocking_func)
return result
# 悪い例: シャットダウンを忘れる
executor = ThreadPoolExecutor() # グローバル
# ... プログラム終了時にリソースリーク
過度なスレッド生成を避ける
import asyncio
from concurrent.futures import ThreadPoolExecutor
# 適切なワーカー数を設定
MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4)
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# または、セマフォで制限
semaphore = asyncio.Semaphore(10)
async def limited_blocking_call(func, *args):
async with semaphore:
return await asyncio.to_thread(func, *args)
まとめ
flowchart LR
subgraph Async["asyncioコード"]
A1["async def main()"]
end
subgraph Bridge["ブリッジ"]
B1["run_in_executor"]
B2["to_thread"]
end
subgraph Blocking["ブロッキングコード"]
C1["requests"]
C2["psycopg2"]
C3["レガシーコード"]
end
A1 --> Bridge
Bridge --> Blocking
style Async fill:#8b5cf6,color:#fff
style Bridge fill:#f59e0b,color:#000
style Blocking fill:#6b7280,color:#fff
| メソッド | 用途 | Python |
|---|---|---|
run_in_executor |
カスタムExecutor使用 | 3.4+ |
to_thread |
シンプルなスレッド実行 | 3.9+ |
主要な原則:
- ブロッキング操作をスレッドへ: イベントループをブロックしない
- 適切なワーカー数: 過度なスレッド生成を避ける
- 段階的に移行: まずラッパー、次にネイティブasync
- スレッドセーフティ: 共有状態にはロックを使用
asyncioとスレッドの組み合わせにより、既存のコードベースを活かしながら非同期の恩恵を受けられます。