"""Migrate all data from the local SQLite database to the cloud MySQL database.

Source: SQLite file ``quantum_test.db``; target: MySQL schema ``case_platform``.

Usage:
    cd backend
    source venv/bin/activate
    python migrate_sqlite_to_mysql.py

Rows are inserted in dependency order (parents before children) to avoid
foreign-key conflicts:
    test_cases -> test_plans -> test_tasks -> bugs
"""

import json
import os

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# -- Source: SQLite ----------------------------------------------------------
SQLITE_URL = "sqlite:///./quantum_test.db"

# -- Target: MySQL -----------------------------------------------------------
# NOTE(review): the fallback URL embeds real credentials in source control.
# Rotate them and supply the URL through MIGRATE_MYSQL_URL instead; the
# fallback is kept only so existing invocations keep working.
MYSQL_URL = os.environ.get(
    "MIGRATE_MYSQL_URL",
    "mysql+pymysql://root_dev:8B1EBC1509cc602b"
    "@mysql1.rdsmbk3ednsgnnt.rds.bj.baidubce.com:3306"
    "/case_platform?charset=utf8mb4",
)

# Columns that hold JSON text in the SQLite source.
_JSON_COLUMNS = ("steps", "tags", "case_ids")


def make_session(url, **engine_kwargs):
    """Create an engine for *url* and open a session bound to it.

    Args:
        url: SQLAlchemy database URL.
        **engine_kwargs: Passed through unchanged to ``create_engine``.

    Returns:
        A ``(engine, session)`` tuple. The caller owns both and is
        responsible for closing the session and disposing of the engine.
    """
    engine = create_engine(url, **engine_kwargs)
    return engine, sessionmaker(bind=engine)()


def _normalize_json_columns(data):
    """Validate the JSON text columns of one row dict, in place.

    The INSERT is a raw ``text()`` statement, so bind values carry no column
    type and nothing serializes Python lists/dicts on the way to the driver.
    The value must therefore stay a JSON *string*: valid JSON text is left
    untouched, text that does not parse as JSON is replaced with ``None``.

    Args:
        data: Mutable mapping of column name to value for a single row.

    Returns:
        The same mapping, for convenient use in a comprehension.
    """
    for col in _JSON_COLUMNS:
        value = data.get(col)
        if not isinstance(value, str):
            continue
        try:
            json.loads(value)
        except json.JSONDecodeError:
            data[col] = None
    return data


def _insert_rows(session, table_name, rows):
    """Insert *rows* into *table_name* on the target session without committing.

    Args:
        session: Target (MySQL) session; the caller commits or rolls back.
        table_name: Destination table name.
        rows: Rows fetched from the source; every row must expose
            ``_mapping`` and all rows share the same columns.
    """
    if not rows:
        print(f" ⚠️ {table_name}: 源表为空,跳过")
        return

    payload = [_normalize_json_columns(dict(row._mapping)) for row in rows]

    # Every row comes from the same SELECT *, so the first row's keys
    # describe the column list for the whole batch.
    columns = list(payload[0])
    # Backtick-quote identifiers so names that collide with MySQL reserved
    # words do not break the statement.
    column_sql = ", ".join(f"`{name}`" for name in columns)
    placeholders = ", ".join(f":{name}" for name in columns)

    # Passing a list of dicts issues a single executemany call instead of
    # one network round-trip per row.
    session.execute(
        text(f"INSERT INTO `{table_name}` ({column_sql}) VALUES ({placeholders})"),
        payload,
    )
    print(f" ✅ {table_name}: 迁移 {len(payload)} 条")


def migrate():
    """Copy test_cases, test_plans, test_tasks and bugs from SQLite to MySQL.

    Steps: create missing tables on the target, read the source tables,
    replace the contents of every target table whose source is non-empty,
    then print the resulting row counts. Target tables whose source table
    is empty or absent are left untouched.

    All deletes and inserts run in one transaction, so a failure part-way
    through rolls the target back instead of leaving tables emptied.

    Raises:
        Any exception raised by SQLAlchemy or the drivers is propagated
        after the target transaction has been rolled back. A missing
        ``test_cases`` table in the source is such an error.
    """
    print("=" * 60)
    print("📦 SQLite → MySQL 数据迁移")
    print("=" * 60)

    # -- Connect both ends ----------------------------------------------------
    sqlite_engine, src = make_session(
        SQLITE_URL, connect_args={"check_same_thread": False}
    )
    mysql_engine, dst = make_session(
        MYSQL_URL,
        pool_pre_ping=True,
        pool_recycle=1800,
    )

    try:
        # -- Create tables on MySQL if they do not exist ----------------------
        print("\n[1/5] 在 MySQL 中初始化表结构...")
        # Imported here rather than at module level, as in the original.
        import models

        models.Base.metadata.create_all(bind=mysql_engine)
        print(" ✅ 表结构就绪")

        # -- Read the SQLite data ---------------------------------------------
        print("\n[2/5] 读取 SQLite 数据...")
        # test_cases is read unconditionally: if it is missing, the source
        # database is not the expected one and the run should fail loudly.
        cases = src.execute(text("SELECT * FROM test_cases")).fetchall()
        plans = (
            src.execute(text("SELECT * FROM test_plans")).fetchall()
            if _table_exists(src, "test_plans")
            else []
        )
        tasks = (
            src.execute(text("SELECT * FROM test_tasks")).fetchall()
            if _table_exists(src, "test_tasks")
            else []
        )
        bugs = (
            src.execute(text("SELECT * FROM bugs")).fetchall()
            if _table_exists(src, "bugs")
            else []
        )
        print(f" test_cases : {len(cases)} 条")
        print(f" test_plans : {len(plans)} 条")
        print(f" test_tasks : {len(tasks)} 条")
        print(f" bugs : {len(bugs)} 条")

        # Parent-first order; used as-is for inserts and reversed for deletes.
        ordered = (
            ("test_cases", cases),
            ("test_plans", plans),
            ("test_tasks", tasks),
            ("bugs", bugs),
        )

        try:
            # Clear children before parents so that any foreign keys on the
            # target do not reject the delete. Only tables that are about
            # to be refilled are cleared.
            for table_name, rows in reversed(ordered):
                if rows:
                    dst.execute(text(f"DELETE FROM `{table_name}`"))

            # -- Insert (parents before children) -----------------------------
            print("\n[3/5] 迁移 test_cases ...")
            _insert_rows(dst, "test_cases", cases)

            print("\n[4/5] 迁移 test_plans / test_tasks / bugs ...")
            _insert_rows(dst, "test_plans", plans)
            _insert_rows(dst, "test_tasks", tasks)
            _insert_rows(dst, "bugs", bugs)

            # Single commit: the per-table messages above only become
            # durable here.
            dst.commit()
        except Exception:
            dst.rollback()
            raise

        print("\n[5/5] 验证 MySQL 行数...")
        for tbl in ("test_cases", "test_plans", "test_tasks", "bugs"):
            count = dst.execute(text(f"SELECT COUNT(*) FROM {tbl}")).scalar()
            print(f" {tbl:<14}: {count} 条")
    finally:
        # Release connections even when a step above raised.
        src.close()
        dst.close()
        sqlite_engine.dispose()
        mysql_engine.dispose()

    print("\n🎉 迁移完成!")


def _table_exists(session, table_name: str) -> bool:
    """Return True if a table named *table_name* exists in the SQLite database.

    Args:
        session: Session bound to the SQLite source.
        table_name: Table name to look up in ``sqlite_master``.
    """
    result = session.execute(
        text("SELECT name FROM sqlite_master WHERE type='table' AND name=:name"),
        {"name": table_name},
    ).fetchone()
    return result is not None


if __name__ == "__main__":
    migrate()