127 lines
5.5 KiB
Python
127 lines
5.5 KiB
Python
"""
|
||
migrate_sqlite_to_mysql.py
|
||
─────────────────────────────────────────────────────────────────────────────
|
||
将本地 SQLite (quantum_test.db) 的全量数据迁移至百度云 MySQL (case_platform)。
|
||
|
||
用法:
|
||
cd backend
|
||
source venv/bin/activate
|
||
python migrate_sqlite_to_mysql.py
|
||
|
||
迁移顺序(严格按依赖关系,避免外键冲突):
|
||
test_cases → test_plans → test_tasks → bugs
|
||
"""
|
||
|
||
import json
|
||
from sqlalchemy import create_engine, text
|
||
from sqlalchemy.orm import sessionmaker
|
||
|
||
# ── 源:SQLite ─────────────────────────────────────────────────────────────
|
||
SQLITE_URL = "sqlite:///./quantum_test.db"
|
||
|
||
# ── 目标:MySQL ────────────────────────────────────────────────────────────
|
||
MYSQL_URL = (
|
||
"mysql+pymysql://root_dev:8B1EBC1509cc602b"
|
||
"@mysql1.rdsmbk3ednsgnnt.rds.bj.baidubce.com:3306"
|
||
"/case_platform?charset=utf8mb4"
|
||
)
|
||
|
||
|
||
def make_session(url, **engine_kwargs):
|
||
engine = create_engine(url, **engine_kwargs)
|
||
Session = sessionmaker(bind=engine)
|
||
return engine, Session()
|
||
|
||
|
||
def migrate():
|
||
print("=" * 60)
|
||
print("📦 SQLite → MySQL 数据迁移")
|
||
print("=" * 60)
|
||
|
||
# ── 连接两端 ────────────────────────────────────────────────────────────
|
||
sqlite_engine, src = make_session(
|
||
SQLITE_URL, connect_args={"check_same_thread": False}
|
||
)
|
||
mysql_engine, dst = make_session(
|
||
MYSQL_URL,
|
||
pool_pre_ping=True,
|
||
pool_recycle=1800,
|
||
)
|
||
|
||
# ── 在 MySQL 中建表(如果不存在) ────────────────────────────────────────
|
||
print("\n[1/5] 在 MySQL 中初始化表结构...")
|
||
import models
|
||
models.Base.metadata.create_all(bind=mysql_engine)
|
||
print(" ✅ 表结构就绪")
|
||
|
||
# ── 通用:清空目标表后写入 ───────────────────────────────────────────────
|
||
def migrate_table(table_name: str, rows):
|
||
if not rows:
|
||
print(f" ⚠️ {table_name}: 源表为空,跳过")
|
||
return
|
||
# 清空(先删子表,再删父表,顺序由调用方保证)
|
||
dst.execute(text(f"DELETE FROM {table_name}"))
|
||
dst.commit()
|
||
|
||
inserted = 0
|
||
for row in rows:
|
||
data = dict(row._mapping)
|
||
# JSON 字段在 SQLite 里是字符串,需要反序列化
|
||
for col in ("steps", "tags", "case_ids"):
|
||
if col in data and isinstance(data[col], str):
|
||
try:
|
||
data[col] = json.loads(data[col])
|
||
except (json.JSONDecodeError, TypeError):
|
||
data[col] = None
|
||
cols = ", ".join(data.keys())
|
||
placeholders = ", ".join(f":{k}" for k in data.keys())
|
||
dst.execute(
|
||
text(f"INSERT INTO {table_name} ({cols}) VALUES ({placeholders})"),
|
||
data,
|
||
)
|
||
inserted += 1
|
||
dst.commit()
|
||
print(f" ✅ {table_name}: 迁移 {inserted} 条")
|
||
|
||
# ── 读取 SQLite 数据 ─────────────────────────────────────────────────────
|
||
print("\n[2/5] 读取 SQLite 数据...")
|
||
cases = src.execute(text("SELECT * FROM test_cases")).fetchall()
|
||
plans = src.execute(text("SELECT * FROM test_plans")).fetchall() if _table_exists(src, "test_plans") else []
|
||
tasks = src.execute(text("SELECT * FROM test_tasks")).fetchall() if _table_exists(src, "test_tasks") else []
|
||
bugs = src.execute(text("SELECT * FROM bugs")).fetchall() if _table_exists(src, "bugs") else []
|
||
|
||
print(f" test_cases : {len(cases)} 条")
|
||
print(f" test_plans : {len(plans)} 条")
|
||
print(f" test_tasks : {len(tasks)} 条")
|
||
print(f" bugs : {len(bugs)} 条")
|
||
|
||
# ── 迁移(父表先于子表) ─────────────────────────────────────────────────
|
||
print("\n[3/5] 迁移 test_cases ...")
|
||
migrate_table("test_cases", cases)
|
||
|
||
print("\n[4/5] 迁移 test_plans / test_tasks / bugs ...")
|
||
migrate_table("test_plans", plans)
|
||
migrate_table("test_tasks", tasks)
|
||
migrate_table("bugs", bugs)
|
||
|
||
print("\n[5/5] 验证 MySQL 行数...")
|
||
for tbl in ("test_cases", "test_plans", "test_tasks", "bugs"):
|
||
count = dst.execute(text(f"SELECT COUNT(*) FROM {tbl}")).scalar()
|
||
print(f" {tbl:<14}: {count} 条")
|
||
|
||
src.close()
|
||
dst.close()
|
||
print("\n🎉 迁移完成!")
|
||
|
||
|
||
def _table_exists(session, table_name: str) -> bool:
|
||
"""检查 SQLite 中该表是否存在"""
|
||
result = session.execute(
|
||
text(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'")
|
||
).fetchone()
|
||
return result is not None
|
||
|
||
|
||
if __name__ == "__main__":
|
||
migrate()
|