【データ衝突なし】データベース、CSVやJSON同期方法の詳細解説 – 既存テーブルの取り扱い
同期方法の基本戦略
データベースの同期処理では、主に以下の3つのケースを考慮する必要があります:
- 新規レコードの追加
- 既存レコードの更新
- 不要レコードの取り扱い
既存テーブルが存在する場合の処理を詳しく実装してみましょう。
import pandas as pd
from sqlalchemy import create_engine, inspect, MetaData, Table, Column, Integer, String, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import logging
from datetime import datetime
# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Base = declarative_base()
engine = create_engine('postgresql://user:password@localhost:5432/dbname')
Session = sessionmaker(bind=engine)
class Genre(Base):
__tablename__ = 'genres'
id = Column(Integer, primary_key=True)
genre_id = Column(Integer, unique=True, nullable=False)
genre_name = Column(String(255), nullable=False)
genre_level = Column(Integer, nullable=False)
category_id = Column(Integer, nullable=False)
last_updated = Column(String(255), nullable=False) # 更新日時追加
def __repr__(self):
return f"<Genre(genre_id={self.genre_id}, name={self.genre_name})>"
def check_table_structure():
"""既存テーブルの構造をチェック"""
inspector = inspect(engine)
if 'genres' in inspector.get_table_names():
columns = [col['name'] for col in inspector.get_columns('genres')]
required_columns = ['id', 'genre_id', 'genre_name', 'genre_level', 'category_id', 'last_updated']
missing_columns = set(required_columns) - set(columns)
if missing_columns:
logger.warning(f"不足しているカラム: {missing_columns}")
return False
return True
return False
def sync_genres(file_path):
"""ジャンルデータの同期処理"""
try:
# CSVデータ読み込み
df = pd.read_csv(file_path)
session = Session()
# 現在のタイムスタンプ
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 既存のgenre_idリスト取得
existing_genre_ids = set(row[0] for row in session.query(Genre.genre_id).all())
# CSVのgenre_idリスト
csv_genre_ids = set(df['genre_id'].tolist())
# 削除対象の特定(既存にあってCSVにないもの)
genres_to_delete = existing_genre_ids - csv_genre_ids
if genres_to_delete:
logger.info(f"削除対象のgenre_id: {genres_to_delete}")
# 削除処理
session.query(Genre).filter(Genre.genre_id.in_(genres_to_delete)).delete(synchronize_session='fetch')
# データ同期処理
for _, row in df.iterrows():
try:
existing_genre = session.query(Genre).filter_by(
genre_id=row['genre_id']
).first()
if existing_genre:
# 更新が必要か確認
if (existing_genre.genre_name != row['genre_name'] or
existing_genre.genre_level != row['genre_level'] or
existing_genre.category_id != row['category_id']):
# データ更新
existing_genre.genre_name = row['genre_name']
existing_genre.genre_level = row['genre_level']
existing_genre.category_id = row['category_id']
existing_genre.last_updated = current_time
logger.info(f"ジャンル更新: {row['genre_id']}")
else:
# 新規レコード作成
new_genre = Genre(
genre_id=row['genre_id'],
genre_name=row['genre_name'],
genre_level=row['genre_level'],
category_id=row['category_id'],
last_updated=current_time
)
session.add(new_genre)
logger.info(f"新規ジャンル追加: {row['genre_id']}")
except Exception as e:
logger.error(f"レコード処理エラー - genre_id: {row['genre_id']}: {str(e)}")
session.rollback()
continue
# コミット
session.commit()
logger.info(f"同期完了 - 追加/更新: {len(df)}, 削除: {len(genres_to_delete)}")
except Exception as e:
logger.error(f"同期処理エラー: {str(e)}")
session.rollback()
finally:
session.close()
def main():
# テーブル構造チェック
if not check_table_structure():
logger.error("テーブル構造が不適切です")
return
# データ同期実行
sync_genres('genres.csv')
if __name__ == "__main__":
main()
既存テーブルの取り扱い方法
1. テーブル構造のチェック
既存テーブルが存在する場合、まず構造をチェックします:
def check_table_structure():
inspector = inspect(engine)
if 'genres' in inspector.get_table_names():
columns = [col['name'] for col in inspector.get_columns('genres')]
required_columns = ['id', 'genre_id', 'genre_name', 'genre_level', 'category_id', 'last_updated']
missing_columns = set(required_columns) - set(columns)
if missing_columns:
logger.warning(f"不足しているカラム: {missing_columns}")
return False
return True
return False
2. 同期処理の詳細
同期処理では以下の3つのケースを処理します:
a. 新規レコードの追加
- CSVにあって、DBにないレコードを追加
- 追加時に現在のタイムスタンプを記録
b. 既存レコードの更新
- 値の変更がある場合のみ更新
- 更新時にタイムスタンプを更新
c. 不要レコードの削除
- DBにあって、CSVにないレコードを特定
- 一括で削除処理を実行
3. データの整合性確保
# 既存のgenre_idリスト取得
existing_genre_ids = set(row[0] for row in session.query(Genre.genre_id).all())
# CSVのgenre_idリスト
csv_genre_ids = set(df['genre_id'].tolist())
# 削除対象の特定(既存にあってCSVにないもの)
genres_to_delete = existing_genre_ids - csv_genre_ids
4. エラーハンドリング
- トランザクション管理による一貫性確保
- 個別レコードの処理失敗時も継続
- 詳細なログ記録
運用上の注意点
- バックアップ
- 同期処理前にデータのバックアップを取得することを推奨
- 実行タイミング
- システム負荷の少ない時間帯に実行
- 定期実行の場合はスケジューリングを適切に設定
- ログ監視
- 同期処理の結果を必ず確認
- エラーログは適切に保管
- リカバリ計画
- 同期処理が失敗した場合の回復手順を用意
- ロールバック方法を確立
この実装により、既存テーブルの有無に関わらず、安全かつ確実なデータ同期が可能になります。
コメント