Fluentdを利用してS3にPythonログを収集する

Fluentd のインストール

td-agent をインストールするか、または Ruby gem(pip に似ている)で fluentd という gem をインストールします。

td-agent は Fluentd の配布パッケージです。Fluentd gem 以外、fluent-plugin-s3 などのいくつかのプラグインもあらかじめインストールされています。

  • Amazon Linux 2 で td-agent をインストール:
    $ curl -L https://toolbelt.treasuredata.com/sh/install-amazon2-td-agent4.sh | sh
    
  • 起動:
    $ sudo systemctl start td-agent.service
    

    詳細については、公式ドキュメント を参照してください。

Python アプリから Fluentd にログを送信するには、fluent-logger-python ライブラリをインストールします。

$ pip install fluent-logger

Fluentd 側の設定

Fluentd の設定ファイル /etc/td-agent/td-agent.conf を編集します。

  • source ディレクティブ:入力ソースを指定します
  • match ディレクティブ:出力先を指定します

入力ソースは in_forward プラグインを指定し、TCP ソケットをリッスンして Python のログイベントを受信します。

出力先は out_s3 プラグインを利用して、ログをS3に集約します。 tagwebapi.* パターンに一致するタグが出力対象となります。

<source>
  @type forward
  @id input_forward
  bind 127.0.0.1
  port 24224
</source>

<match webapi.*>
  @type s3
  s3_bucket BUCKET_NAME
  s3_region ap-northeast-1
  path logs/%Y/%m/%d/${tag[0]}/${tag[1]}_

  <buffer tag,time>
    @type file
    path /var/log/td-agent/s3/webapi
    timekey 10m  # write in chunks every 10mins
    timekey_wait 3m  # flush delay
    timekey_zone Asia/Tokyo
    chunk_limit_size 256m
  </buffer>

  time_slice_format %H%M
</match>

Pythonアプリ側の設定

fluent-logger-python は FluentHandler というロギングハンドラーを提供しています。

以下は YAML で記述された logging 設定ファイルの例です。

version: 1

formatters:
  standard:
    format: '%(asctime)0.19s %(name)s [%(levelname)s]: %(message)s'
  fluent_fmt:
    '()': fluent.handler.FluentRecordFormatter
    format:
      name: '%(name)s'
      level: '%(levelname)s'
      hostname: '%(hostname)s'

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: standard
    stream: ext://sys.stdout

  webapi_fluentd_handler:
    class: fluent.handler.FluentHandler
    host: 127.0.0.1  # ホストを指定します
    port: 24224  # ポート番号を指定します
    tag: webapi.test  # Fluentd がマッチできるようなタグを付けます
    formatter: fluent_fmt
    level: INFO

root:
  level: INFO
  handlers: [console]
  propagate: no

loggers:
  webapi:
    level: INFO
    handlers: [console, webapi_fluentd_handler]
    propagate: no

以上の設定を読み込んで logging.config.dictConfig() に渡します。

import yaml
import logging
import logging.config


def custom_logger(name, yml_fp):
    with open(yml_fp) as f:
        config = yaml.safe_load(f)
    try:
        logging.config.dictConfig(config)
    except Exception as e:
        raise e
    logger = logging.getLogger(name)
    return logger