boto3でローカルディレクトリとS3バケットを同期

boto3 は download_file() や upload_file() を提供しているが、これらを実行するたびに同じファイルであってもコピーされる。

AWSCLI のs3 syncコマンドみたいに、差分がある場合だけコピーし、無駄なコピーを避けたいです。

以下の例は、list_objects_v2() で取得した S3 のファイル情報(ファイルサイズとタイムスタンプ)とローカルのファイル情報を比較して、異なっている場合のみファイルをアップロード・ダウンロードする。

まずは、ローカルと S3 のファイル情報を取得。

import os
from pathlib import Path
from typing import NamedTuple

import boto3


class FileInfo(NamedTuple):
    """ローカルに格納されたファイルの情報"""
    size: int
    mtime: int
    filepath: Path


class S3FileInfo(NamedTuple):
    """S3に格納されたファイルの情報"""
    size: int
    mtime: int
    key: str


class S3Sync:
    def __init__(self):
        self.s3_scheme = 's3://'
        self.bucket_name = None
        self.s3_client = boto3.client('s3')

    def get_local_files_info(self, local_path: Path):
        """ローカルディレクトリ内のファイル情報を取得"""
        local_files_info = {}
        local_files = list(
            f.resolve() for f in local_path.glob('**/*') if f.is_file())
        for fp in local_files:
            # ファイルの名前を取得
            local_fname = fp.name
            # ファイルの更新時間を取得
            local_mtime = int(fp.stat().st_mtime)
            # ファイルのサイズを取得
            local_fsize = fp.stat().st_size
            local_files_info[local_fname] = FileInfo(
                local_fsize, local_mtime, fp)
        return local_files_info

    def get_s3_object_list(self, bucket_name, key_name):
        """指定したオブジェクトキー名でS3オブジェクトリストを取得"""
        resp = self.s3_client.list_objects_v2(
            Bucket=bucket_name,
            Prefix=key_name)
        s3_object_list = resp.get('Contents')
        return s3_object_list

    def get_s3_files_info(self, bucket_name, key_name):
        """S3フォルダ内のファイル情報を取得"""
        s3_object_list = self.get_s3_object_list(bucket_name, key_name)
        s3_files_info = {}
        if s3_object_list is not None:
            for item in s3_object_list:
                key = item.get('Key')
                # S3に保存されたファイルの名前を取得
                s3_fname = key.rsplit("/", 1)[1]
                if s3_fname != '':
                    # ファイルの更新時間を取得
                    mdt = item.get('LastModified')
                    s3_mtime = int(mdt.timestamp())
                    # ファイルのサイズを取得
                    s3_size = item.get('Size')
                    s3_files_info[s3_fname] = S3FileInfo(s3_size, s3_mtime, key)
        return s3_files_info

そして、コピー対象となるファイルリストを取得。アップロードの場合はほぼ同じなので省略。

def get_download_list(self, bucket_name, key_name, local_path):
    """ダウンロードが必要なS3ファイルリストを取得"""
    local_files_info = self.get_local_files_info(local_path)
    s3_files_info = self.get_s3_files_info(bucket_name, key_name)

    wait_download = []

    if len(s3_files_info) != 0:
        for s3_fname, s3_finfo in s3_files_info.items():
            # ローカルにファイルが存在するかどうかを確認
            local_finfo = local_files_info.get(s3_fname)
            if local_finfo is None:
                # ローカルにファイルが存在しない
                wait_download.append((s3_finfo.key, s3_finfo.mtime))
            else:
                # ローカルのファイル情報とS3のファイル情報が不一致
                if (local_finfo.size != s3_finfo.size
                   or local_finfo.mtime != s3_finfo.mtime):
                    wait_download.append((s3_finfo.key, s3_finfo.mtime))
    return wait_download

対象となるファイルリストを取得したら、download_file() と upload_file() を使ってダウンロード・アップロードを行えばいい。

注意すべきなのは、コピー済みのファイルに対して、S3 の更新時間に基づいてos.utime()でローカルファイルの更新時間を S3 での更新時間と一致させる必要がある。

Tags:

Updated: