雑記: Python の asyncio の話

太郎と次郎がレストランに来たとします。2人は知り合いとかではなくただ同時にレストランに来ただけです。2人がレストランですることは料理を待つことと食事をすることとします。料理ができてこなければ食事をすることはできませんが、2人は他人なので相手の料理がきたかとか相手が食事を終えたかとかを待つ必要はありません。Python の asyncio のコルーチンをつかうと「この処理の結果を待たずに他の処理をしていいよ」ということができると思います。「食事をするには料理を待つ必要があるが、2人は互いを待たなくてよい」を記述すると以下になると思います。全ての処理に async を付けてコルーチンにしています。料理ができるのに5秒、食事をするのに5秒かかるとすると、2人とも店に来てから10秒後に店を出ることができると思います。

import asyncio
import time

async def cook():
    await asyncio.sleep(5)  # 料理

async def eat():
    await asyncio.sleep(5)  # 食事

async def restaurant(name):
    t0 = time.time()
    print(f'{name}が店に来ました.')
    await cook()
    print(f'{name}の料理ができました({int(time.time() - t0)}秒経過).')
    await eat()
    print(f'{name}は食事を終えました({int(time.time() - t0)}秒経過).')

async def main():
    await asyncio.gather(
        restaurant('太郎'),
        restaurant('次郎'),
    )

asyncio.run(main())
$ python test.py
太郎が店に来ました.
次郎が店に来ました.
太郎の料理ができました(5秒経過).
次郎の料理ができました(5秒経過).
太郎は食事を終えました(10秒経過).
次郎は食事を終えました(10秒経過).


しかし、上のコードは「店側が太郎の料理と次郎の料理を並行して用意できる」場合のものになると思います。実はこの店はワンオペで、並行して複数人の料理を用意するということができないかもしれません。

上のコードをワンオペ版にしたいと思います。つまり、「# 料理」の箇所が並列処理されないようにしたいと思います。そのためには、この箇所を通るのにこの世に1つしかない通行手形を要求するようにすればいいと思います(昔の鉄道で上りと下りの列車が同時に侵入するのを防ぐために用いられていたタブレットを思い出します)。ここでは、「ある特定のファイルへの排他ロックを得ること」を通行手形とします。同一のファイルへの排他ロックが同時に複数取得されることはないからです。

Python では標準パッケージの fcntl で排他ロックを取得できます(が fcntl は Windows には対応していません!)。対象のファイルは何でもいいのでこのスクリプト自身(__file__)にします。そして、「料理をするには排他ロックの取得を待つ必要がある」というように記述します。fcntl.flock はコルーチンでなく通常の関数なので、run_in_executor で包むことによって結果を待つことができるようにしています。これは店員の手が空くのを待っているようなものです(店員は1人だけだが)。手が空いた店員を確保でき次第、料理させます。料理させた後は忘れずに店員を解放(排他ロックを解放)します(が、以下の LOCK_UN の行を明示的にかかなくてもファイルオブジェクトをクローズすれば排他ロックは解放されます)。以下のワンオペ版コードでは、次郎の料理ができるのが10秒後にずれているのがわかります。

import asyncio
import fcntl
import time

async def cook():
    fd = open(__file__, 'r')
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, lambda: fcntl.flock(fd, fcntl.LOCK_EX))
    await asyncio.sleep(5)  # 料理
    fcntl.flock(fd, fcntl.LOCK_UN)
    fd.close()

async def eat():
    await asyncio.sleep(5)  # 食事

async def restaurant(name):
    t0 = time.time()
    print(f'{name}が店に来ました.')
    await cook()
    print(f'{name}の注文ができました({int(time.time() - t0)}秒経過).')
    await eat()
    print(f'{name}は食事を終えました({int(time.time() - t0)}秒経過).')

async def main():
    await asyncio.gather(
        restaurant('太郎'),
        restaurant('次郎'),
    )

asyncio.run(main())
$ python test.py
太郎が店に来ました.
次郎が店に来ました.
太郎の注文ができました(5秒経過).
太郎は食事を終えました(10秒経過).
次郎の注文ができました(10秒経過).
次郎は食事を終えました(15秒経過).



ところで上のコードの cook で fcntl.flock(fd, fcntl.LOCK_EX) を run_in_executor に包まないと、参考文献 1. の質問者がいっているようにデッドロックが発生すると思います(というか私はデッドロックして参考文献 1. にたどり着きました)。つまり、以下のようにすると太郎の注文も次郎の注文も永遠にできあがりません。

async def cook():
    fd = open(__file__, 'r')
    fcntl.flock(fd, fcntl.LOCK_EX)
    await asyncio.sleep(5)  # 料理
    fcntl.flock(fd, fcntl.LOCK_UN)
    fd.close()

デッドロックが起きるのは、asyncio が太郎のレストラン訪問と次郎のレストラン訪問を本当に並列処理しているわけではなく、シングルスレッドで「もし待ち状態になったらいますぐ着手できる処理に切り替えていく」というものであるため、実際にはまず太郎の処理を開始して「# 料理」まで到達したところで次郎の処理に切り替えたものの、次郎のときには排他ロックを取得できず(既に太郎が取得しているため)排他ロックの取得が試行され続け、太郎の処理に戻れないので排他ロックが解放されないためです。fcntl.LOCK_EX を fcntl.LOCK_EX | fcntl.LOCK_NB としてノンブロッキングにすれば最初に排他ロックを取得できなかった時点で直ちにエラー終了します。

他方、1つ上のコードで「# 料理」を awaitable にしないとコードの実行自体はできます。ただこのとき次郎が店に到着するタイミングがずれていることがわかります。cook が最早 awaitable なコルーチンでなくなったことで「2人は互いを待たなくてよい」が「次郎は太郎の料理を待って行動を開始しなければならない」になってしまっているためです。とはいえ、店員が1人しかいない場合は実質的に「2人は互いを待たなくてよい」状態ではなくなっているので、現実のシステムではこちらの方がいいかもしれません(設計によると思います)。

async def cook():
    fd = open(__file__, 'r')
    fcntl.flock(fd, fcntl.LOCK_EX)
    time.sleep(5)  # 料理
    fcntl.flock(fd, fcntl.LOCK_UN)
    fd.close()
$ python test.py
太郎が店に来ました.
太郎の注文ができました(5秒経過).
次郎が店に来ました.
次郎の注文ができました(5秒経過).
太郎は食事を終えました(10秒経過).
次郎は食事を終えました(10秒経過).