107 lines
4.4 KiB
Python
107 lines
4.4 KiB
Python
"""
|
|
run_data_loop.py
|
|
────────────────
|
|
数据闭环巡检入口 — 指挥官模式
|
|
|
|
不再通过 pytest 调度 410 个分散用例,
|
|
而是直接实例化 DataLoopCommander:
|
|
1 个浏览器 → 1 次登录 → 9 个场景串行跑完 → 关闭
|
|
|
|
由 AutoFlow 平台 (platform_app.py) 调用:
|
|
python run_data_loop.py
|
|
"""
|
|
import os
|
|
import sys
|
|
import json
|
|
from pathlib import Path
|
|
|
|
|
|
def main():
|
|
print("🎬 数据闭环巡检 — 指挥官模式启动")
|
|
|
|
# ── 环境变量 (由 platform_app.py 注入) ────────────────────────────────────
|
|
task_id = os.environ.get("ROBOGO_TASK_ID", "default_task")
|
|
scope = os.environ.get("ROBOGO_SCOPE", "all")
|
|
|
|
# 凭证传递
|
|
if os.environ.get("AUTH_ACCOUNT"):
|
|
os.environ["TEST_USERNAME"] = os.environ["AUTH_ACCOUNT"]
|
|
if os.environ.get("AUTH_PASSWORD"):
|
|
os.environ["TEST_PASSWORD"] = os.environ["AUTH_PASSWORD"]
|
|
|
|
# 环境 URL 传递
|
|
robo_env = os.environ.get("ROBOGO_ENV", "FAT").upper()
|
|
if robo_env == "PROD":
|
|
os.environ.setdefault("BASE_URL", "https://cloud.d-robotics.cc/d-cloud/welcome")
|
|
os.environ.setdefault("LOGIN_URL", "https://sso.d-robotics.cc/")
|
|
else:
|
|
os.environ.setdefault("BASE_URL", "https://cloud-fat.d-robotics.cc/d-cloud/welcome")
|
|
os.environ.setdefault("LOGIN_URL", "https://sso-fat.d-robotics.cc/")
|
|
|
|
# ── 关键:在 chdir 之前,把所有路径统一转成绝对路径写回环境变量 ────────────
|
|
# 平台注入的已经是绝对路径, 但本地调试 fallback 是相对路径, 必须在此处锁定
|
|
platform_root = Path.cwd().absolute()
|
|
reports_dir = os.environ.get("ROBOGO_REPORTS_DIR", str(platform_root / "platform_reports"))
|
|
screenshots_dir = os.environ.get("ROBOGO_SCREENSHOTS_DIR", str(platform_root / "platform_artifacts" / "screenshots"))
|
|
|
|
# 确保路径是绝对路径 (如果平台传的是相对路径也兜住)
|
|
reports_dir = str(Path(reports_dir).absolute())
|
|
screenshots_dir = str(Path(screenshots_dir).absolute())
|
|
|
|
# 写回环境变量 — Commander 内部直接读这些
|
|
os.environ["ROBOGO_REPORTS_DIR"] = reports_dir
|
|
os.environ["ROBOGO_SCREENSHOTS_DIR"] = screenshots_dir
|
|
|
|
# ── 切换工作目录到 dataloop 项目根 ────────────────────────────────────────
|
|
dataloop_root = Path("dataloop/data-loop-automation").absolute()
|
|
os.chdir(dataloop_root)
|
|
sys.path.insert(0, str(dataloop_root))
|
|
|
|
# ── 导入并执行指挥官 ─────────────────────────────────────────────────────
|
|
from business.data_loop_commander import DataLoopCommander
|
|
|
|
headless_str = os.environ.get("HEADLESS", "false").lower()
|
|
headless = headless_str == "true"
|
|
|
|
print(f"🚀 执行参数: 环境={robo_env} | 范围={scope} | 无头={headless}")
|
|
print(f" 任务ID={task_id}")
|
|
print(f" 报告目录={reports_dir}")
|
|
print(f" 截图目录={screenshots_dir}")
|
|
|
|
commander = DataLoopCommander(headless=headless)
|
|
|
|
try:
|
|
commander.start()
|
|
commander.login()
|
|
commander.run_all_scenarios(scope=scope)
|
|
print("✅ 巡检任务执行成功")
|
|
exit_code = 0
|
|
except Exception as e:
|
|
print(f"❌ 巡检任务执行失败: {e}")
|
|
exit_code = 1
|
|
finally:
|
|
# ── 保存结构化报告 ─────────────────────────────────────────────────────
|
|
commander.save_results()
|
|
|
|
# 确认报告文件存在
|
|
result_file = Path(reports_dir) / f"{task_id}_results.json"
|
|
if result_file.exists():
|
|
print(f"📊 结构化报告已生成: {result_file}")
|
|
else:
|
|
# 兜底: 直接写入
|
|
try:
|
|
Path(reports_dir).mkdir(exist_ok=True, parents=True)
|
|
with open(result_file, "w", encoding="utf-8") as f:
|
|
json.dump(commander.results, f, ensure_ascii=False, indent=2)
|
|
print(f"📊 结构化报告已生成 (兜底): {result_file}")
|
|
except Exception as e:
|
|
print(f"⚠️ 报告写入失败: {e}")
|
|
|
|
commander.stop()
|
|
|
|
sys.exit(exit_code)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|