Celeryメモ

リモートコントロールコマンド

  • 起動されたワーカーの状況を確認:app.control.inspect().stats()

  • ping を送信:app.control.ping(destination=['celery@worker1'])

    destination を指定することで、対象となるワーカーだけにコマンドを送信することができる。

  • ワーカーをシャットダウン: app.control.shutdown(destination=['celery@worker1'])

    または app.control.broadcast('shutdown', destination=['celery@worker1'])

  • コマンドをブロードキャスト:app.control.broadcast()

以下はコントロールコマンドを使用してワーカーにデータを送る例です。

def send_data_to_worker(data, destination):
    from celery_worker import celery
    worker_dest = []
    stats = celery.control.inspect().stats()
    # sort based on uptime
    workers_online = sorted(stats, key=lambda x: stats[x]['uptime'])
    if len(destination) == 0:
        # default is to send data to the most recently started worker
        worker_dest.append(workers_online[0])
    else:
        if all(x in workers_online for x in destination):
            worker_dest = destination
        else:
            diff = set(destination) - set(workers_online)
            raise ValueError(f'Unknown destination: {diff}')

    celery.control.broadcast('send_data',
                             arguments={'data': data},
                             destination=worker_dest)

コマンドを利用するには、以下のようにコマンドを登録する必要がある。

from celery.worker.control import Panel


@Panel.register
def send_data(panel, data=None):
    print(data)

参考:https://docs.celeryproject.org/en/stable/reference/celery.app.control.html

タスクへのプリフェッチ

プリフェッチを設定することで、ワーカーは broker と頻繫に通信する必要がなく、予めメッセージをプリフェッチし、メモリにアクセスすることで、パフォーマンスを向上できる。

プリフェッチのメリットは短時間タスクに向いている。

長時間タスクの場合、タスクをプリフェッチすると、ワーカーがアイドル状態になる場合があるので、逆に非効率になる。

以下の設定は、プリフェッチを無効にする。

task_acks_late = True
worker_prefetch_multiplier = 1

task_acks_late のデフォルト値は False で、タスク実行前に承認されるという意味です。タスク実行中にワーカーが終了した場合、タスクは別のワーカーに割り当てられない。

True にする場合、タスク実行後に承認される。タスク実行中にワーカーが終了した場合、タスクは別のワーカーに割り当てられる。

参考:https://docs.celeryq.dev/en/stable/userguide/optimizing.html#reserve-one-task-at-a-time

ワーカー異常終了した場合のタスク再実行

task_acks_late を True に設定した上で、task_reject_on_worker_lost を True にすることで、ワーカーがタスク処理中に SIGTERM/SIGKILL などのシグナルを受信したとき、メッセージは再度キューに入れられ、タスクはワーカーに再実行される。

Redis には visibility timeout という設定もあり、visibility timeout 内にタスクが承認されない場合、タスクは別のワーカーに割り当てられ、実行される。デフォルト値は 1 時間なので、ワーカーが SIGKILL を受信すると、タスクが再割り当てされるまで 1 時間かかる。

visibility timeout を短く設定したい場合、broker_transport_options で設定できる。

broker_transport_options = {'visibility_timeout': 600}

また、worker_deduplicate_successful_tasks を True に設定すれば、backend で同じタスクIDの結果キーがあり、タスク状態が「SUCCESS」の場合、タスクが重複実行されない。

優先度付きメッセージ

RabbitMQ の場合は、task_queue_max_prioritytask_default_priorityを設定することで、すべてのキュー / タスクにデフォルト値を指定することができる。

Redisの場合は、priority_steps で指定したリストの長さ n に基づいて n 個のリストを作成することで、優先度をサポートすることができる。

app.conf.broker_transport_options = {
    'priority_steps': [0, 3, 6],
    'queue_order_strategy': 'priority',
}

例えば、priority_steps を [0, 3, 6] に指定すれば、元のキューが3つのキューに分割される。

優先度を 0~2 の範囲で異なる値を指定しても、実は同じ段階です。

>>> r = multiply.apply_async([2, 1], queue='redis_mul', priority=3)
>>> r = multiply.apply_async([2, 2], queue='redis_mul', priority=1)
>>> r = multiply.apply_async([2, 3], queue='redis_mul', priority=6)

redis-cli で確認してみてください。

最も優先度の高いキューは redis_mul です。 ほかのキューには、名前にセパレータ(\x06\x16)と優先度番号が付けられる。

127.0.0.1:6379> keys *
1) "_kombu.binding.redis_add"
2) "redis_mul\x06\x166"
3) "redis_mul"
4) "_kombu.binding.redis_mul"
5) "redis_mul\x06\x163"
127.0.0.1:6379> type redis_mul
list
127.0.0.1:6379> llen redis_mul
(integer) 1

参考:https://docs.celeryproject.org/en/stable/userguide/routing.html#special-routing-options