Celeryのブロードキャストについて
broker は RabbitMQ を使う場合
メッセージブローカーは RabbitMQ を使用する場合は、公式ドキュメントで書いたとおり Broadcast キューを利用しています。
from kombu import Queue, Exchange
from kombu.common import Broadcast
celery = Celery(
'worker',
broker='amqp://rabbit:rabbit@localhost:5672/testhost',
backend='redis://localhost:6379/1'
)
celery.conf.task_queues = (
Broadcast('btasks_add', routing_key='btasks_add'),
Queue('mul', Exchange('mul'), routing_key='mul'),
)
celery.conf.task_routes = {
'add': {
'queue': 'btasks_add',
'exchange': 'btasks_add'
},
'multiply': {
'queue': 'mul',
'exchange': 'mul'
},
}
Worker を起動します。-Q
オプションにキューの名前を指定すれば、Worker が指定されたキューからメッセージを消費することができます。
$ celery -A btask.celery worker --loglevel=info -P solo -n worker1@%h
$ celery -A btask.celery worker --loglevel=info -P solo -n worker2@%h -Q btasks_add
普通のキューの exchange タイプは direct
、broadcast キュー(btasks_add)の exchange タイプは fanout
となります。
[config]
.> app: worker1:0x7fc8226d4ad0
.> transport: amqp://rabbit:**@localhost:5672/testhost
.> results: redis://localhost:6379/1
.> concurrency: 8 (solo)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> bcast.e1ebfbd3-eaa3-448d-8cdb-367c68423050 exchange=btasks_add(fanout) key=btasks_add
.> mul exchange=mul(direct) key=mul
[tasks]
. btask.add
. btask.multiply
-
Exchange タイプについて
プロデューサーは Exchange にメッセージを送信します。Exchange はメッセージを受け取り、キューにルーティングします。コンシューマーはキューからメッセージを受信します。
そして、Exchange にはいくつかのタイプを利用することができます。タイプは AMQP によって定義されています。
- Direct exchange では、キューは routing key によって Exchange にバインドされます。Exchange はこれとメッセージの routing key を比較して、一致すればキューに送信します。
- Fanout exchange の場合は、routing key が無視され、受け取ったメッセージはこの Exchange にバインドするすべてのキューに送ります。
他にも Topic と Headers があります。詳しくはこちらを参照ください。
実行してみよう。apply_async() を利用すれば、キューの名前を指定することができます。
>>> from btask import add, multiply
>>> r = add.apply_async([3, 5], queue='btasks_add')
>>> r = multiply.apply_async([3, 5], queue='mul')
broker は Redis を使う場合
Redis を使用する場合は、直接に Queue を使って、Exchange タイプを fanout に指定すればブロードキャストできます。
celery = Celery(
'worker1',
broker='redis://localhost:6379/2',
backend='redis://localhost:6379/1'
)
celery.conf.task_queues = (
Queue('redis_add', Exchange('redis_add', type='fanout'), routing_key='redis_add'),
Queue('redis_mul', Exchange('redis_mul'), routing_key='redis_mul'),
)
celery.conf.task_routes = {
'add': {
'queue': 'redis_add',
'exchange': 'redis_add'
},
'multiply': {
'queue': 'redis_mul',
'exchange': 'redis_mul'
},
}