Celeryメモ
リモートコントロールコマンド
-
起動されたワーカーの状況を確認:
app.control.inspect().stats()
-
ping を送信:
app.control.ping(destination=['celery@worker1'])
destination を指定することで、対象となるワーカーだけにコマンドを送信することができる。
-
ワーカーをシャットダウン:
app.control.shutdown(destination=['celery@worker1'])
または
app.control.broadcast('shutdown', destination=['celery@worker1'])
-
コマンドをブロードキャスト:
app.control.broadcast()
以下はコントロールコマンドを使用してワーカーにデータを送る例です。
def send_data_to_worker(data, destination):
from celery_worker import celery
worker_dest = []
stats = celery.control.inspect().stats()
# sort based on uptime
workers_online = sorted(stats, key=lambda x: stats[x]['uptime'])
if len(destination) == 0:
# default is to send data to the most recently started worker
worker_dest.append(workers_online[0])
else:
if all(x in workers_online for x in destination):
worker_dest = destination
else:
diff = set(destination) - set(workers_online)
raise ValueError(f'Unknown destination: {diff}')
celery.control.broadcast('send_data',
arguments={'data': data},
destination=worker_dest)
コマンドを利用するには、以下のようにコマンドを登録する必要がある。
参考:https://docs.celeryproject.org/en/stable/reference/celery.app.control.html
タスクへのプリフェッチ
プリフェッチを設定することで、ワーカーは broker と頻繫に通信する必要がなく、予めメッセージをプリフェッチし、メモリにアクセスすることで、パフォーマンスを向上できる。
プリフェッチのメリットは短時間タスクに向いている。
長時間タスクの場合、タスクをプリフェッチすると、ワーカーがアイドル状態になる場合があるので、逆に非効率になる。
以下の設定は、プリフェッチを無効にする。
task_acks_late = True
worker_prefetch_multiplier = 1
task_acks_late
のデフォルト値は False で、タスク実行前に承認されるという意味です。タスク実行中にワーカーが終了した場合、タスクは別のワーカーに割り当てられない。
True にする場合、タスク実行後に承認される。タスク実行中にワーカーが終了した場合、タスクは別のワーカーに割り当てられる。
参考:https://docs.celeryq.dev/en/stable/userguide/optimizing.html#reserve-one-task-at-a-time
ワーカー異常終了した場合のタスク再実行
task_acks_late
を True に設定した上で、task_reject_on_worker_lost
を True にすることで、ワーカーがタスク処理中に SIGTERM/SIGKILL などのシグナルを受信したとき、メッセージは再度キューに入れられ、タスクはワーカーに再実行される。
Redis には visibility timeout
という設定もあり、visibility timeout 内にタスクが承認されない場合、タスクは別のワーカーに割り当てられ、実行される。デフォルト値は 1 時間なので、ワーカーが SIGKILL を受信すると、タスクが再割り当てされるまで 1 時間かかる。
visibility timeout を短く設定したい場合、broker_transport_options
で設定できる。
broker_transport_options = {'visibility_timeout': 600}
また、worker_deduplicate_successful_tasks
を True に設定すれば、backend で同じタスクIDの結果キーがあり、タスク状態が「SUCCESS」の場合、タスクが重複実行されない。
優先度付きメッセージ
RabbitMQ の場合は、task_queue_max_priority
とtask_default_priority
を設定することで、すべてのキュー / タスクにデフォルト値を指定することができる。
Redisの場合は、priority_steps で指定したリストの長さ n に基づいて n 個のリストを作成することで、優先度をサポートすることができる。
例えば、priority_steps を [0, 3, 6] に指定すれば、元のキューが3つのキューに分割される。
優先度を 0~2 の範囲で異なる値を指定しても、実は同じ段階です。
redis-cli
で確認してみてください。
最も優先度の高いキューは redis_mul です。 ほかのキューには、名前にセパレータ(\x06\x16)と優先度番号が付けられる。
127.0.0.1:6379> keys *
1) "_kombu.binding.redis_add"
2) "redis_mul\x06\x166"
3) "redis_mul"
4) "_kombu.binding.redis_mul"
5) "redis_mul\x06\x163"
127.0.0.1:6379> type redis_mul
list
127.0.0.1:6379> llen redis_mul
(integer) 1
参考:https://docs.celeryproject.org/en/stable/userguide/routing.html#special-routing-options