Celeryのブロードキャストについて
broker は RabbitMQ を使う場合
メッセージブローカーは RabbitMQ を使用する場合は、公式ドキュメントで書いたとおり Broadcast キューを利用しています。
Worker を起動します。-Q
オプションにキューの名前を指定すれば、Worker が指定されたキューからメッセージを消費することができます。
$ celery -A btask.celery worker --loglevel=info -P solo -n worker1@%h
$ celery -A btask.celery worker --loglevel=info -P solo -n worker2@%h -Q btasks_add
普通のキューの exchange タイプは direct
、broadcast キュー(btasks_add)の exchange タイプは fanout
となります。
[config]
.> app: worker1:0x7fc8226d4ad0
.> transport: amqp://rabbit:**@localhost:5672/testhost
.> results: redis://localhost:6379/1
.> concurrency: 8 (solo)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> bcast.e1ebfbd3-eaa3-448d-8cdb-367c68423050 exchange=btasks_add(fanout) key=btasks_add
.> mul exchange=mul(direct) key=mul
[tasks]
. btask.add
. btask.multiply
-
Exchange タイプについて
プロデューサーは Exchange にメッセージを送信します。Exchange はメッセージを受け取り、キューにルーティングします。コンシューマーはキューからメッセージを受信します。
そして、Exchange にはいくつかのタイプを利用することができます。タイプは AMQP によって定義されています。
- Direct exchange では、キューは routing key によって Exchange にバインドされます。Exchange はこれとメッセージの routing key を比較して、一致すればキューに送信します。
- Fanout exchange の場合は、routing key が無視され、受け取ったメッセージはこの Exchange にバインドするすべてのキューに送ります。
他にも Topic と Headers があります。詳しくはこちらを参照ください。
実行してみよう。apply_async() を利用すれば、キューの名前を指定することができます。
broker は Redis を使う場合
Redis を使用する場合は、直接に Queue を使って、Exchange タイプを fanout に指定すればブロードキャストできます。