Celeryのブロードキャストについて

broker は RabbitMQ を使う場合

メッセージブローカーは RabbitMQ を使用する場合は、公式ドキュメントで書いたとおり Broadcast キューを利用しています。

from kombu import Queue, Exchange
from kombu.common import Broadcast

celery = Celery(
    'worker',
    broker='amqp://rabbit:rabbit@localhost:5672/testhost',
    backend='redis://localhost:6379/1'
)
celery.conf.task_queues = (
    Broadcast('btasks_add', routing_key='btasks_add'),
    Queue('mul', Exchange('mul'), routing_key='mul'),
)
celery.conf.task_routes = {
    'add': {
        'queue': 'btasks_add',
        'exchange': 'btasks_add'
    },
    'multiply': {
        'queue': 'mul',
        'exchange': 'mul'
    },
}

Worker を起動します。-Q オプションにキューの名前を指定すれば、Worker が指定されたキューからメッセージを消費することができます。

$ celery -A btask.celery worker --loglevel=info -P solo -n worker1@%h
$ celery -A btask.celery worker --loglevel=info -P solo -n worker2@%h -Q btasks_add

普通のキューの exchange タイプは direct、broadcast キュー(btasks_add)の exchange タイプは fanout となります。

 [config]
 .> app:         worker1:0x7fc8226d4ad0
 .> transport:   amqp://rabbit:**@localhost:5672/testhost
 .> results:     redis://localhost:6379/1
 .> concurrency: 8 (solo)
 .> task events: OFF (enable -E to monitor tasks in this worker)

 [queues]
 .> bcast.e1ebfbd3-eaa3-448d-8cdb-367c68423050 exchange=btasks_add(fanout) key=btasks_add
 .> mul       exchange=mul(direct) key=mul

[tasks]
  . btask.add
  . btask.multiply

  • Exchange タイプについて

    プロデューサーは Exchange にメッセージを送信します。Exchange はメッセージを受け取り、キューにルーティングします。コンシューマーはキューからメッセージを受信します。

    そして、Exchange にはいくつかのタイプを利用することができます。タイプは AMQP によって定義されています。

    • Direct exchange では、キューは routing key によって Exchange にバインドされます。Exchange はこれとメッセージの routing key を比較して、一致すればキューに送信します。
    • Fanout exchange の場合は、routing key が無視され、受け取ったメッセージはこの Exchange にバインドするすべてのキューに送ります。

    他にも Topic と Headers があります。詳しくはこちらを参照ください。

実行してみよう。apply_async() を利用すれば、キューの名前を指定することができます。

>>> from btask import add, multiply
>>> r = add.apply_async([3, 5], queue='btasks_add')
>>> r = multiply.apply_async([3, 5], queue='mul')

broker は Redis を使う場合

Redis を使用する場合は、直接に Queue を使って、Exchange タイプを fanout に指定すればブロードキャストできます。

celery = Celery(
    'worker1',
    broker='redis://localhost:6379/2',
    backend='redis://localhost:6379/1'
)
celery.conf.task_queues = (
    Queue('redis_add', Exchange('redis_add', type='fanout'), routing_key='redis_add'),
    Queue('redis_mul', Exchange('redis_mul'), routing_key='redis_mul'),
)
celery.conf.task_routes = {
    'add': {
        'queue': 'redis_add',
        'exchange': 'redis_add'
    },
    'multiply': {
        'queue': 'redis_mul',
        'exchange': 'redis_mul'
    },
}