# test/platform_app.py
#
# 311 lines
# 12 KiB
# Python

"""
自动化平台 - Flask 后端服务
启动: ./venv/bin/python platform_app.py
访问: http://127.0.0.1:5001
"""
import os
import sys
import json
import uuid
import time
import queue
import threading
import subprocess
from datetime import datetime
from flask import Flask, request, jsonify, Response, send_from_directory
from flask_cors import CORS
# Flask application object; static files and templates are configured to live
# under platform/static and platform/templates respectively.
app = Flask(__name__, static_folder='platform/static', template_folder='platform/templates')
# Enable CORS on the whole app using flask_cors defaults.
# NOTE(review): with no arguments this presumably allows every origin — confirm
# that is intended for an API that accepts account credentials.
CORS(app)
# ── Persistent storage configuration ──────────────────────────────────────────
# Relative path: the file is resolved against the process's working directory.
DB_FILE = "platform_db.json"
def _load_db():
    """Load persisted tasks and reports from ``DB_FILE``.

    Returns:
        A ``(tasks, reports)`` tuple of dicts. Both are empty when the file is
        missing, cannot be read or parsed, or does not contain a JSON object.
        A ``tasks``/``reports`` entry that is missing or not a dict is replaced
        by an empty dict so callers can always treat the result as a mapping.

    On first run (file absent) an empty database file is written so the user
    can see where data will be stored. Failing to create it is not fatal; the
    problem is reported and empty data is returned.
    """
    if os.path.exists(DB_FILE):
        try:
            with open(DB_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            print(f"⚠️ 数据库加载失败: {e}")
            return {}, {}

        if not isinstance(data, dict):
            print(f"⚠️ 数据库加载失败: 顶层结构应为 JSON 对象, 实际为 {type(data).__name__}")
            return {}, {}

        tasks = data.get("tasks")
        reports = data.get("reports")
        # Guard against `"tasks": null` or other non-mapping values on disk.
        return (
            tasks if isinstance(tasks, dict) else {},
            reports if isinstance(reports, dict) else {},
        )

    # First run: create an empty file so its location is visible to the user.
    try:
        with open(DB_FILE, 'w', encoding='utf-8') as f:
            json.dump({"tasks": {}, "reports": {}}, f)
    except OSError as e:
        # Best effort only: the service can still run with in-memory state.
        print(f"⚠️ 数据库初始化失败: {e}")
    return {}, {}
# Serializes writers: task threads and request handlers all call _save_db()
# and share one temp-file path, so unsynchronized calls could interleave their
# output in that file before it is renamed over the database.
_DB_WRITE_LOCK = threading.Lock()


def _save_db():
    """Persist ``tasks_db`` and ``reports_db`` to ``DB_FILE``.

    The data is written to ``DB_FILE + ".tmp"`` first and then moved over the
    real file with ``os.replace``, so a failed write never truncates the
    existing database. Errors are reported and swallowed (best effort); a
    leftover temp file is removed on failure.

    The lock only serializes file writes. It does not stop other threads from
    mutating the dicts while they are being serialized; such a failure is
    caught and reported like any other save error.
    """
    tmp_file = DB_FILE + ".tmp"
    with _DB_WRITE_LOCK:
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump({
                    "tasks": tasks_db,
                    "reports": reports_db
                }, f, indent=2, ensure_ascii=False)
            os.replace(tmp_file, DB_FILE)
        except Exception as e:
            # Deliberately broad: saving must never take down a task thread.
            print(f"❌ 数据库保存失败: {e}")
            try:
                os.remove(tmp_file)
            except OSError:
                # Temp file was never created or is already gone.
                pass
# ── 全局状态加载 ─────────────────────────────────────────────────────────────
# tasks_db / reports_db: dicts keyed by task id (see create_task / _stream_run).
tasks_db, reports_db = _load_db() # restore data persisted by a previous run at startup
log_queues = {} # live log queues are not persisted; they only serve the current session's streaming
# Product catalogue served by /api/products.
# "entry" is the script that _stream_run passes to the Python interpreter;
# create_task answers 400 when it is None.
PRODUCTS = {
"robogo": {
"name": "Robogo",
"desc": "Robogo PROD环境全链路 UI 巡检 (文件管理/开发机/云桌面)",
"icon": "🤖",
"entry": "run_ui_tests.py"
},
"data_loop": {
"name": "数据闭环",
"desc": "数据闭环平台端到端验证",
"icon": "🔄",
"entry": None # not integrated yet
}
}
# ── 任务运行核心 ───────────────────────────────────────────────────────────────
def _log_level_for(line: str) -> str:
    """Map one line of script output to a log level based on the markers it contains."""
    # The original checks used an empty string where a marker belonged, which
    # matches every line; ❌ / ✅ are the markers this file itself emits for
    # failed / successful runs.
    if "[ERROR]" in line or "❌" in line:
        return "ERROR"
    if "[WARNING]" in line or "⚠️" in line:
        return "WARN"
    if "✅" in line or "🎉" in line or "🎊" in line:
        return "SUCCESS"
    return "INFO"


def _stream_run(task_id: str, entry: str, account: str, password: str, run_count: int):
    """Run the automation script in the calling (background) thread and stream its logs.

    Args:
        task_id: Key of the task in ``tasks_db``; also used for the log queue and report.
        entry: Script path handed to the Python interpreter.
        account: Exported to the child process as ``ROBOGO_USER``.
        password: Exported to the child process as ``ROBOGO_PWD``.
        run_count: Number of planned runs.

    Side effects:
        Updates the task record (status, pid, timestamps, report id), stores a
        report in ``reports_db``, persists both via ``_save_db`` and pushes
        JSON-encoded log messages to the task's queue. The final message has
        level ``"DONE"`` so stream consumers know when to stop.

    A run counts as successful only when the child exits with code 0 and no
    output line was classified as ERROR. When the task has ``retry_on_fail``
    set and the last planned run fails, exactly one extra run is executed.
    Every executed run is counted, so a failed run keeps the overall result
    ``FAIL`` even if the retry passes.
    """
    log_q = log_queues.get(task_id) or queue.Queue()
    log_queues[task_id] = log_q

    task = tasks_db[task_id]
    task["status"] = "running"
    task["started_at"] = datetime.now().isoformat()
    _save_db()  # persist the transition to "running"

    total_pass = total_fail = 0
    logs_all = []

    def push(line: str, level: str = "INFO"):
        msg = {"ts": datetime.now().strftime("%H:%M:%S"), "level": level, "msg": line}
        log_q.put(json.dumps(msg))
        logs_all.append(msg)

    push(f"🚀 任务启动 [{task['name']}] | 产品: {task['product']} | 计划运行次数: {run_count}", "INFO")

    # Prefer the "python" executable next to the current interpreter and fall
    # back to the interpreter itself when it does not exist.
    python_bin = os.path.join(os.path.dirname(sys.executable), "python")
    if not os.path.exists(python_bin):
        python_bin = sys.executable

    # Loop-invariant child-process settings.
    env = os.environ.copy()
    env["ROBOGO_USER"] = account
    env["ROBOGO_PWD"] = password
    work_dir = os.path.dirname(os.path.abspath(__file__))

    retry_used = False
    run_idx = 0
    # A while loop (not range) so that bumping run_count really adds a run.
    while run_idx < run_count:
        run_idx += 1
        push(f"─────── 第 {run_idx}/{run_count} 次运行 ───────", "INFO")
        run_has_error = False
        try:
            # The context manager closes the stdout pipe and reaps the child.
            with subprocess.Popen(
                [python_bin, entry],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                cwd=work_dir,
                env=env,
                bufsize=1,
            ) as proc:
                task["pid"] = proc.pid
                for raw_line in proc.stdout:
                    line = raw_line.rstrip()
                    if not line:
                        continue
                    level = _log_level_for(line)
                    if level == "ERROR":
                        run_has_error = True
                    push(line, level)
                proc.wait()

            # Combined verdict: exit code plus absence of ERROR lines.
            success = (proc.returncode == 0) and not run_has_error
            if success:
                total_pass += 1
                push(f"✅ 第 {run_idx} 次运行结束 — 成功", "SUCCESS")
            else:
                total_fail += 1
                push(f"❌ 第 {run_idx} 次运行结束 — 失败", "ERROR")
                # Retry once when the last planned run failed.
                if task.get("retry_on_fail") and run_idx == run_count and not retry_used:
                    push("🔁 触发失败重跑 (额外第 1 次)", "WARN")
                    retry_used = True
                    run_count += 1
        except Exception as e:
            # Failing to launch or read the child counts as a failed run.
            push(f"💥 执行异常: {e}", "ERROR")
            total_fail += 1

    # ── Build the report ──────────────────────────────────────────────────────
    finished_at = datetime.now().isoformat()
    overall_pass = total_fail == 0

    # Log the summary before saving so the persisted report contains it too.
    push("\n━━━━━━━━━ 测试完成 ━━━━━━━━━", "INFO")
    push(f"总计: {run_count} 次 | 通过: {total_pass} | 失败: {total_fail}", "INFO")
    push(f"整体结论: {'✅ PASS' if overall_pass else '❌ FAIL'}", "SUCCESS" if overall_pass else "ERROR")

    report = {
        "task_id": task_id,
        "task_name": task["name"],
        "product": task["product"],
        "total_runs": run_count,
        "pass": total_pass,
        "fail": total_fail,
        "started_at": task.get("started_at"),
        "finished_at": finished_at,
        "logs": logs_all,
        "result": "PASS" if overall_pass else "FAIL"
    }
    reports_db[task_id] = report
    task["status"] = "pass" if overall_pass else "fail"
    task["finished_at"] = finished_at
    task["report_id"] = task_id
    _save_db()  # report and final task status are now durable

    # Sent last so consumers only see DONE after the report is available.
    push("__DONE__", "DONE")
# ── API 路由 ──────────────────────────────────────────────────────────────────
@app.route("/api/products")
def get_products():
    """Return the whole ``PRODUCTS`` catalogue as a JSON response."""
    catalogue = PRODUCTS
    return jsonify(catalogue)
@app.route("/api/tasks", methods=["GET"])
def list_tasks():
    """Return every task record in ``tasks_db`` as a JSON array."""
    all_tasks = [*tasks_db.values()]
    return jsonify(all_tasks)
@app.route("/api/tasks", methods=["POST"])
def create_task():
    """Create a task from the JSON request body and start it in a background thread.

    Body fields read: ``product`` (default ``"robogo"``), ``name``,
    ``run_count`` (default 1), ``retry_on_fail``, ``scheduled_at`` (ISO-8601
    string, optional), ``account`` and ``password``. The credentials are only
    forwarded to the run; they are not stored in the task record.

    Returns:
        ``201`` with the new task record, or ``400`` with an ``error`` message
        when the body is not a JSON object, the product has no run entry, or
        ``run_count`` is not an integer >= 1.
    """
    # silent=True: a missing or malformed body yields None instead of raising.
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify({"error": "请求体必须是 JSON 对象"}), 400

    task_id = str(uuid.uuid4())[:8]
    product_key = body.get("product", "robogo")
    product = PRODUCTS.get(product_key, {})
    # Unknown products keep falling back to the default entry script.
    entry = product.get("entry", "run_ui_tests.py")
    if entry is None:
        return jsonify({"error": "该产品暂未接入运行入口"}), 400

    try:
        run_count = int(body.get("run_count", 1))
    except (TypeError, ValueError):
        return jsonify({"error": "run_count 必须是整数"}), 400
    if run_count < 1:
        # Zero runs would otherwise be reported as a passing task.
        return jsonify({"error": "run_count 必须大于等于 1"}), 400

    task = {
        "id": task_id,
        "name": body.get("name", f"任务_{task_id}"),
        "product": product_key,
        "product_name": product.get("name", product_key),
        "run_count": run_count,
        "retry_on_fail": body.get("retry_on_fail", False),
        "scheduled_at": body.get("scheduled_at"),
        "created_at": datetime.now().isoformat(),
        "status": "pending",
        "pid": None,
        "started_at": None,
        "finished_at": None,
        "report_id": None
    }
    tasks_db[task_id] = task
    log_queues[task_id] = queue.Queue()
    _save_db()  # persist the initial "pending" state

    account = body.get("account", "")
    password = body.get("password", "")
    scheduled_at = task.get("scheduled_at")

    def _run_task():
        """Wait for the scheduled time (if any), then execute the task."""
        log_q = log_queues[task_id]

        def emit(level, text):
            stamp = datetime.now().strftime("%H:%M:%S")
            log_q.put(json.dumps({"ts": stamp, "level": level, "msg": text}))

        if scheduled_at:
            try:
                sched_time = datetime.fromisoformat(scheduled_at)
                # Compare in the schedule's own timezone; tzinfo is None for
                # naive values, which keeps the local-time comparison.
                tz = sched_time.tzinfo
                task["status"] = "pending"
                wait_secs = (sched_time - datetime.now(tz)).total_seconds()
                if wait_secs > 0:
                    emit("INFO", f"⏰ 任务已定时,将在 {scheduled_at} 执行(等待 {int(wait_secs)}秒)")
                    # Heartbeat at most every 30 s so the SSE stream stays alive.
                    left = wait_secs
                    while left > 0:
                        remaining = int(left)
                        emit("INFO", f"⏳ 距离定时执行还有 {remaining} 秒...")
                        time.sleep(min(30, max(remaining, 1)))
                        left = (sched_time - datetime.now(tz)).total_seconds()
                    emit("SUCCESS", "🚀 定时时间已到,开始执行任务!")
            except Exception as e:
                # Thread boundary: an unusable schedule degrades to "run now".
                emit("WARN", f"⚠️ 定时解析异常,立即执行: {e}")
        _stream_run(task_id, entry, account, password, task["run_count"])

    t = threading.Thread(target=_run_task, daemon=True)
    t.start()
    return jsonify(task), 201
@app.route("/api/tasks/<task_id>")
def get_task(task_id):
    """Return one task record as JSON, or a 404 error payload when it is absent."""
    record = tasks_db.get(task_id)
    if record:
        return jsonify(record)
    return jsonify({"error": "Not Found"}), 404
@app.route("/api/tasks/<task_id>/logs")
def stream_logs(task_id):
    """Stream a task's queued log messages to the client as Server-Sent Events.

    Responds 404 when the task has no log queue in this process. Otherwise
    each queued message is forwarded as one ``data:`` event; the stream ends
    after a message whose level is ``"DONE"``. When nothing arrives for 30
    seconds a ``PING`` event is sent instead.
    """
    task_queue = log_queues.get(task_id)
    if not task_queue:
        return jsonify({"error": "No log stream"}), 404

    def event_stream():
        finished = False
        while not finished:
            try:
                raw = task_queue.get(timeout=30)
            except queue.Empty:
                # Nothing to forward: emit a keep-alive event and wait again.
                yield f"data: {json.dumps({'level':'PING','msg':''})}\n\n"
                continue
            yield f"data: {raw}\n\n"
            finished = json.loads(raw).get("level") == "DONE"

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return Response(event_stream(), content_type="text/event-stream", headers=sse_headers)
@app.route("/api/reports")
def list_reports():
    """Return every report in ``reports_db`` as a JSON array."""
    all_reports = [*reports_db.values()]
    return jsonify(all_reports)
@app.route("/api/reports/<task_id>")
def get_report(task_id):
    """Return the report stored under ``task_id``, or a 404 error payload when it is absent."""
    found = reports_db.get(task_id)
    if found:
        return jsonify(found)
    return jsonify({"error": "Not Found"}), 404
# ── 前端路由 (SPA 单页) ───────────────────────────────────────────────────────
@app.route("/")
@app.route("/<path:path>")
def serve_index(path=""):
    """Serve the single-page front end: every matched path returns ``platform/index.html``."""
    spa_dir, spa_entry = "platform", "index.html"
    return send_from_directory(spa_dir, spa_entry)
if __name__ == "__main__":
    # Script entry point: announce the URL, then start the threaded server.
    server_options = {
        "host": "127.0.0.1",
        "port": 5001,
        "debug": False,
        "threaded": True,
    }
    print("🚀 自动化平台启动中... http://127.0.0.1:5001")
    app.run(**server_options)