Celeryのロガーをカスタマイズする

Celery は after_setup_logger シグナルを提供している。このシグナルにカスタムログハンドラーを渡せば、Celery がグローバルロガーを設定した後にトリガーされる。

タスク内でロガーを使うには get_task_logger() を利用する。

from celery import Task
from celery.exceptions import Ignore, WorkerShutdown
from celery.utils.log import get_task_logger
from celery.signals import (
    celeryd_init, worker_shutting_down, after_setup_logger)
from utils import custom_logger

logger = custom_logger(__name__)
after_setup_logger.connect(logger)


@celeryd_init.connect
def configure(sender=None, conf=None, **kwargs):
    from redis.exceptions import (
        AuthenticationError, ConnectionError, ResponseError)
    r = RedisClient(
        host=r_cfg['host'], port=r_cfg['port'],
        password=r_cfg['password'], db=r_cfg['db'])
    try:
        r.db.ping()
    except (AuthenticationError, ConnectionError, ResponseError):
        logger.exception(
            f"Failed to connect to Redis at {r_cfg['host']}:{r_cfg['port']}!")
        raise WorkerShutdown()
    logger.info(f"Connected to Redis at {r_cfg['host']}:{r_cfg['port']}")
    MyTask.r_client = r


class MyTask(Task):
    r_client = None
    _logger = get_task_logger(__name__)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        msg = f'[Task {task_id}] failed: {exc}.\n{einfo}'
        self._logger.exception(msg)
        super().on_failure(exc, task_id, args, kwargs, einfo)

Tags:

Updated: