PR

【データ衝突なし】データベース、CSVやJSON同期方法の詳細解説 – 既存テーブルの取り扱い

【データ衝突なし】データベース、CSVやJSON同期方法の詳細解説 – 既存テーブルの取り扱い

同期方法の基本戦略

データベースの同期処理では、主に以下の3つのケースを考慮する必要があります:

  1. 新規レコードの追加
  2. 既存レコードの更新
  3. 不要レコードの取り扱い

既存テーブルが存在する場合の処理を詳しく実装してみましょう。

import pandas as pd
from sqlalchemy import create_engine, inspect, MetaData, Table, Column, Integer, String, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import logging
from datetime import datetime

# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Base = declarative_base()
engine = create_engine('postgresql://user:password@localhost:5432/dbname')
Session = sessionmaker(bind=engine)

class Genre(Base):
    __tablename__ = 'genres'

    id = Column(Integer, primary_key=True)
    genre_id = Column(Integer, unique=True, nullable=False)
    genre_name = Column(String(255), nullable=False)
    genre_level = Column(Integer, nullable=False)
    category_id = Column(Integer, nullable=False)
    last_updated = Column(String(255), nullable=False)  # 更新日時追加

    def __repr__(self):
        return f"<Genre(genre_id={self.genre_id}, name={self.genre_name})>"

def check_table_structure():
    """既存テーブルの構造をチェック"""
    inspector = inspect(engine)
    if 'genres' in inspector.get_table_names():
        columns = [col['name'] for col in inspector.get_columns('genres')]
        required_columns = ['id', 'genre_id', 'genre_name', 'genre_level', 'category_id', 'last_updated']

        missing_columns = set(required_columns) - set(columns)
        if missing_columns:
            logger.warning(f"不足しているカラム: {missing_columns}")
            return False
        return True
    return False

def sync_genres(file_path):
    """ジャンルデータの同期処理"""
    try:
        # CSVデータ読み込み
        df = pd.read_csv(file_path)
        session = Session()

        # 現在のタイムスタンプ
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # 既存のgenre_idリスト取得
        existing_genre_ids = set(row[0] for row in session.query(Genre.genre_id).all())
        # CSVのgenre_idリスト
        csv_genre_ids = set(df['genre_id'].tolist())

        # 削除対象の特定(既存にあってCSVにないもの)
        genres_to_delete = existing_genre_ids - csv_genre_ids
        if genres_to_delete:
            logger.info(f"削除対象のgenre_id: {genres_to_delete}")
            # 削除処理
            session.query(Genre).filter(Genre.genre_id.in_(genres_to_delete)).delete(synchronize_session='fetch')

        # データ同期処理
        for _, row in df.iterrows():
            try:
                existing_genre = session.query(Genre).filter_by(
                    genre_id=row['genre_id']
                ).first()

                if existing_genre:
                    # 更新が必要か確認
                    if (existing_genre.genre_name != row['genre_name'] or
                        existing_genre.genre_level != row['genre_level'] or
                        existing_genre.category_id != row['category_id']):

                        # データ更新
                        existing_genre.genre_name = row['genre_name']
                        existing_genre.genre_level = row['genre_level']
                        existing_genre.category_id = row['category_id']
                        existing_genre.last_updated = current_time
                        logger.info(f"ジャンル更新: {row['genre_id']}")

                else:
                    # 新規レコード作成
                    new_genre = Genre(
                        genre_id=row['genre_id'],
                        genre_name=row['genre_name'],
                        genre_level=row['genre_level'],
                        category_id=row['category_id'],
                        last_updated=current_time
                    )
                    session.add(new_genre)
                    logger.info(f"新規ジャンル追加: {row['genre_id']}")

            except Exception as e:
                logger.error(f"レコード処理エラー - genre_id: {row['genre_id']}: {str(e)}")
                session.rollback()
                continue

        # コミット
        session.commit()
        logger.info(f"同期完了 - 追加/更新: {len(df)}, 削除: {len(genres_to_delete)}")

    except Exception as e:
        logger.error(f"同期処理エラー: {str(e)}")
        session.rollback()

    finally:
        session.close()

def main():
    # テーブル構造チェック
    if not check_table_structure():
        logger.error("テーブル構造が不適切です")
        return

    # データ同期実行
    sync_genres('genres.csv')

if __name__ == "__main__":
    main()

既存テーブルの取り扱い方法

1. テーブル構造のチェック

既存テーブルが存在する場合、まず構造をチェックします:

def check_table_structure():
    inspector = inspect(engine)
    if 'genres' in inspector.get_table_names():
        columns = [col['name'] for col in inspector.get_columns('genres')]
        required_columns = ['id', 'genre_id', 'genre_name', 'genre_level', 'category_id', 'last_updated']

        missing_columns = set(required_columns) - set(columns)
        if missing_columns:
            logger.warning(f"不足しているカラム: {missing_columns}")
            return False
        return True
    return False

2. 同期処理の詳細

同期処理では以下の3つのケースを処理します:

a. 新規レコードの追加

  • CSVにあって、DBにないレコードを追加
  • 追加時に現在のタイムスタンプを記録

b. 既存レコードの更新

  • 値の変更がある場合のみ更新
  • 更新時にタイムスタンプを更新

c. 不要レコードの削除

  • DBにあって、CSVにないレコードを特定
  • 一括で削除処理を実行

3. データの整合性確保

# 既存のgenre_idリスト取得
existing_genre_ids = set(row[0] for row in session.query(Genre.genre_id).all())
# CSVのgenre_idリスト
csv_genre_ids = set(df['genre_id'].tolist())

# 削除対象の特定(既存にあってCSVにないもの)
genres_to_delete = existing_genre_ids - csv_genre_ids

4. エラーハンドリング

  • トランザクション管理による一貫性確保
  • 個別レコードの処理失敗時も継続
  • 詳細なログ記録

運用上の注意点

  1. バックアップ
  • 同期処理前にデータのバックアップを取得することを推奨
  1. 実行タイミング
  • システム負荷の少ない時間帯に実行
  • 定期実行の場合はスケジューリングを適切に設定
  1. ログ監視
  • 同期処理の結果を必ず確認
  • エラーログは適切に保管
  1. リカバリ計画
  • 同期処理が失敗した場合の回復手順を用意
  • ロールバック方法を確立

この実装により、既存テーブルの有無に関わらず、安全かつ確実なデータ同期が可能になります。

コメント

タイトルとURLをコピーしました