"""
|
||
自动化平台 - Flask 后端服务
|
||
启动: ./venv/bin/python platform_app.py
|
||
访问: http://127.0.0.1:5001
|
||
"""
|
||
import json
import os
import queue
import subprocess
import sys
import threading
import time
import uuid
from datetime import datetime, timedelta

import requests
from flask import Flask, request, jsonify, Response, send_from_directory
from flask_cors import CORS

# Flask app serves the SPA bundle from ./platform at the web root.
app = Flask(__name__, static_folder='platform', static_url_path='')
# Allow cross-origin requests (frontend dev server may run on another port).
CORS(app)

# ── Persistent storage and asset directories ─────────────────────────────────
DB_FILE = "platform_db.json"            # JSON persistence for tasks/reports
REPORTS_DIR = "platform_reports"        # per-task log / result JSON files
SCREENSHOTS_DIR = "platform_artifacts/screenshots"  # large screenshot artifacts
# NOTE(review): hard-coded Lark webhook URL — consider moving to env/config.
LARK_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/d75c14ad-d782-489e-8a99-81b511ee4abd"

os.makedirs(REPORTS_DIR, exist_ok=True)
os.makedirs(SCREENSHOTS_DIR, exist_ok=True)

# ── Global state and configuration ──
PRODUCTS = {
    "robogo": {
        "name": "Robogo",
        "desc": "Robogo PROD 环境全链路 UI 巡检",
        "icon": "🤖",
        "entry": "run_ui_tests.py"
    },
    "data_loop": {
        "name": "数据闭环",
        "desc": "数据闭环平台端到端业务流水线验证",
        "icon": "🔄",
        "entry": None  # not wired up yet
    }
}

# In-memory stores, mirrored to DB_FILE by _save_db()/_load_db().
tasks_db, reports_db = {}, {}
log_queues = {}    # taskId -> queue.Queue (for SSE log streaming)
process_pids = {}  # taskId -> PID (so a running task can be stopped)

def _load_db():
    """Hydrate the in-memory task/report stores from DB_FILE, if present."""
    global tasks_db, reports_db
    if not os.path.exists(DB_FILE):
        return
    try:
        with open(DB_FILE, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)
        tasks_db = payload.get("tasks", {})
        reports_db = payload.get("reports", {})
    except Exception as e:
        print(f"⚠️ 数据库加载失败: {e}")

def _cleanup_task_assets(tid):
    """Delete the on-disk screenshots for task *tid*.

    Log/result JSON under REPORTS_DIR is intentionally kept so historical
    reports stay fully viewable; only the large high-resolution screenshots
    (which dominate disk usage) are physically removed.
    """
    try:
        if os.path.exists(SCREENSHOTS_DIR):
            for name in os.listdir(SCREENSHOTS_DIR):
                if name.startswith(tid):
                    try:
                        os.remove(os.path.join(SCREENSHOTS_DIR, name))
                    except OSError:
                        # Best-effort: a vanished/locked file must not abort cleanup.
                        # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
                        pass
    except Exception as e:
        print(f"⚠️ 清理物理资源失败: {e}")

def _save_db():
    """Persist tasks/reports to DB_FILE, applying a retention policy first.

    Retention: only non-soft-deleted one-shot ("once") tasks are counted;
    once more than 100 accumulate, the oldest 50 have their screenshots
    removed and are soft-deleted (kept in the DB so the dashboard retains
    an audit trail, but hidden from the UI).
    """
    try:
        active_once_tasks = [t for t in tasks_db.values()
                             if t.get("schedule_type", "once") == "once" and not t.get("is_deleted")]
        if len(active_once_tasks) > 100:
            oldest_first = sorted(active_once_tasks, key=lambda t: t.get("created_at", ""))
            to_delete = oldest_first[:50]
            for t in to_delete:
                tid = t.get("id")
                if tid:
                    _cleanup_task_assets(tid)
                t["is_deleted"] = True  # soft delete: hidden from UI, kept for audit
            print(f"🧹 执行自动数据留存策略: 已软删除过期单次任务(剔除物理文件保留DB) {len(to_delete)} 条")

        with open(DB_FILE, 'w', encoding='utf-8') as f:
            json.dump({"tasks": tasks_db, "reports": reports_db}, f, ensure_ascii=False)
    except Exception as e:
        # Was a bare `except: pass` — a failed save must not crash the request
        # path, but it must be visible, otherwise data loss goes unnoticed.
        print(f"⚠️ 数据库保存失败: {e}")

# Load persisted state at import time so restarts keep task/report history.
_load_db()

# ── Core business logic ──────────────────────────────────────────────────────

def send_alerts(report):
    """Push a Lark (Feishu) interactive-card notification for a finished run.

    Honours the owning task's alert settings: only fires when "lark" is in
    alert_channels, and respects the "only_on_fail" rule. Network errors are
    logged and never propagated to the caller.
    """
    owner = tasks_db.get(report.get("task_id"), {})
    channels = owner.get("alert_channels", [])

    if "lark" not in channels:
        return
    if owner.get("alert_rule", "always") == "only_on_fail" and report["result"] == "PASS":
        return  # suppressed by the alert rule

    try:
        passed = report["result"] == "PASS"
        status_color = "green" if passed else "red"
        status_text = "成功" if passed else "失败"

        card = {
            "config": {"wide_screen_mode": True},
            "header": {
                "title": {"tag": "plain_text", "content": f"🔔 Robogo 巡检报告: {report['task_name']}"},
                "template": status_color
            },
            "elements": [
                {
                    "tag": "div",
                    "fields": [
                        {"is_short": True, "text": {"tag": "lark_md", "content": f"**状态:** {status_text}"}},
                        {"is_short": True, "text": {"tag": "lark_md", "content": f"**产品:** {report['product']}"}},
                        {"is_short": True, "text": {"tag": "lark_md", "content": f"**环境:** {owner.get('env', 'PROD')}"}},
                        {"is_short": True, "text": {"tag": "lark_md", "content": f"**通过/总计:** {report['pass']}/{report['total_runs']}"}}
                    ]
                },
                {"tag": "hr"},
                {
                    "tag": "action",
                    "actions": [
                        {
                            "tag": "button",
                            "text": {"tag": "plain_text", "content": "查看详情报告"},
                            "type": "primary",
                            "url": f"http://127.0.0.1:5001/#/tasks"
                        }
                    ]
                }
            ]
        }
        requests.post(LARK_WEBHOOK, json={"msg_type": "interactive", "card": card}, timeout=5)
    except Exception as e:
        print(f"❌ 飞书推送失败: {e}")

def run_task_process(task):
    """Execute one task end-to-end (runs in a worker thread).

    Runs the product's entry script up to run_count times (plus optional
    failure retries), streams every stdout line to the task's SSE queue,
    then writes the aggregate report, persists state and fires alerts.
    """
    task_id = task["id"]
    task["status"] = "running"
    task["started_at"] = datetime.now().isoformat()
    _save_db()

    q = log_queues.get(task_id)  # may be None for scheduler-triggered reruns
    logs_all = []

    def push(msg, level="INFO"):
        # Append to the in-memory transcript and mirror to the SSE queue (if any).
        entry = {"ts": datetime.now().strftime("%H:%M:%S"), "level": level, "msg": msg}
        logs_all.append(entry)
        if q: q.put(json.dumps(entry))

    run_limit = int(task.get("run_count", 1))
    retry_count = int(task.get("retry_count", 1)) if task.get("retry_on_fail") else 0
    retry_delay = int(task.get("retry_delay", 5))

    # Child-process environment: artifact paths plus task parameters.
    env = os.environ.copy()
    env["ROBOGO_SCREENSHOTS_DIR"] = os.path.abspath(SCREENSHOTS_DIR)
    env["ROBOGO_REPORTS_DIR"] = os.path.abspath(REPORTS_DIR)
    env["ROBOGO_TASK_ID"] = task_id
    env["ROBOGO_ENV"] = task.get("env", "PROD")
    env["ROBOGO_SCOPE"] = task.get("scope", "all")
    env["AUTH_ACCOUNT"] = task.get("account", "")
    env["AUTH_PASSWORD"] = task.get("password", "")
    # Legacy names still read by settings.py.
    env["ROBOGO_USER"] = task.get("account", "")
    env["ROBOGO_PWD"] = task.get("password", "")

    total_pass, total_fail = 0, 0
    current_run = 0
    max_runs = run_limit + retry_count  # theoretical upper bound on runs (informational)

    push(f"🎬 任务开始 — 环境: {env['ROBOGO_ENV']} | 范围: {env['ROBOGO_SCOPE']}", "INFO")

    # Prefer the project venv interpreter; fall back to the current one.
    python_bin = os.path.join(os.getcwd(), "venv", "bin", "python")
    if not os.path.exists(python_bin): python_bin = sys.executable

    # NOTE: run_limit grows inside the loop when a failure retry is granted,
    # so this loop runs between run_count and run_count + retry_count times.
    while current_run < run_limit:
        current_run += 1
        push(f"🚀 第 {current_run}/{run_limit} 次运行中...", "INFO")

        try:
            # Start in its own process group so stop_task() can kill the whole
            # tree (including Playwright browser children) with one signal.
            proc = subprocess.Popen(
                [python_bin, task["entry"]],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                text=True, bufsize=1, env=env,
                preexec_fn=os.setsid if os.name != 'nt' else None
            )
            process_pids[task_id] = proc.pid
            for line in proc.stdout:
                push(line.rstrip(), "INFO")
            proc.wait()
            process_pids.pop(task_id, None)

            if proc.returncode == 0:
                total_pass += 1
                push(f"✅ 第 {current_run} 次成功", "SUCCESS")
            else:
                total_fail += 1
                push(f"❌ 第 {current_run} 次失败", "ERROR")
                # Failure retry: wait, consume one retry credit, extend the loop.
                if retry_count > 0:
                    push(f"🔁 触发重跑 (剩余 {retry_count} 次),等待 {retry_delay}s...", "WARN")
                    time.sleep(retry_delay)
                    retry_count -= 1
                    run_limit += 1  # lengthen the while loop by one run
        except Exception as e:
            push(f"💥 系统爆破: {e}", "ERROR")
            total_fail += 1

    # Wrap-up: aggregate report, persist transcript + state, alert, close SSE.
    finished_at = datetime.now().isoformat()
    report = {
        "task_id": task_id, "task_name": task["name"], "product": task["product"],
        "total_runs": current_run, "pass": total_pass, "fail": total_fail,
        "started_at": task["started_at"], "finished_at": finished_at,
        "result": "PASS" if total_fail == 0 else "FAIL"
    }

    # Persist the full log transcript next to the other report artifacts.
    log_file = os.path.join(REPORTS_DIR, f"{task_id}.json")
    try:
        with open(log_file, 'w', encoding='utf-8') as f:
            json.dump({"logs": logs_all}, f, ensure_ascii=False)
    except: pass

    reports_db[task_id] = report
    task["status"] = "pass" if total_fail == 0 else "fail"
    task["finished_at"] = finished_at
    task["report_id"] = task_id

    _save_db()
    send_alerts(report)
    push("__DONE__", "DONE")  # sentinel: tells the SSE stream to close


# ── API ───────────────────────────────────────────────────────────────────────

@app.route("/api/products")
|
||
def get_products():
|
||
return jsonify(PRODUCTS)
|
||
|
||
@app.route("/api/tasks", methods=["GET"])
|
||
def list_tasks():
|
||
# 忽略已被软删除的历史任务,不向前端列表展示
|
||
t_list = [t for t in tasks_db.values() if not t.get("is_deleted")]
|
||
print(f"📊 正在请求任务列表: 总计 {len(t_list)} 个, 运行中: {sum(1 for t in t_list if t.get('status')=='running')}")
|
||
return jsonify(t_list)
|
||
|
||
@app.route("/api/tasks", methods=["POST"])
|
||
def create_task():
|
||
body = request.json
|
||
tid = str(uuid.uuid4())[:8]
|
||
p_key = body.get("product", "robogo")
|
||
p = PRODUCTS.get(p_key, {})
|
||
|
||
task = {
|
||
"id": tid,
|
||
"name": body.get("name", f"任务_{tid}"),
|
||
"product": p_key,
|
||
"status": "pending",
|
||
"created_at": datetime.now().isoformat(),
|
||
"account": body.get("account"),
|
||
"password": body.get("password"),
|
||
"run_count": int(body.get("run_count", 1)),
|
||
"retry_on_fail": body.get("retry_on_fail", False),
|
||
"retry_count": int(body.get("retry_count", 1)),
|
||
"retry_delay": int(body.get("retry_delay", 5)),
|
||
"env": body.get("env", "PROD"),
|
||
"scope": body.get("scope", "all"),
|
||
"scheduled_at": body.get("scheduled_at"),
|
||
"schedule_type": body.get("schedule_type", "once"),
|
||
"schedule_window": body.get("schedule_window", "00:00-23:59"),
|
||
"alert_channels": body.get("alert_channels", []),
|
||
"alert_rule": body.get("alert_rule", "always"),
|
||
"entry": p.get("entry")
|
||
}
|
||
|
||
if not task["entry"]:
|
||
return jsonify({"error": "该产品未配置执行入口"}), 400
|
||
|
||
tasks_db[tid] = task
|
||
log_queues[tid] = queue.Queue()
|
||
_save_db()
|
||
|
||
# 非定时任务直接启动
|
||
if not task["scheduled_at"] and task["schedule_type"] == "once":
|
||
threading.Thread(target=run_task_process, args=(task,), daemon=True).start()
|
||
|
||
return jsonify(task), 201
|
||
|
||
@app.route("/api/tasks/<tid>", methods=["DELETE"])
|
||
def delete_task(tid):
|
||
_cleanup_task_assets(tid)
|
||
tasks_db.pop(tid, None)
|
||
reports_db.pop(tid, None)
|
||
_save_db()
|
||
return jsonify({"success": True})
|
||
|
||
@app.route("/api/tasks/<tid>/stop", methods=["POST"])
|
||
def stop_task(tid):
|
||
pid = process_pids.get(tid)
|
||
task = tasks_db.get(tid)
|
||
import signal
|
||
|
||
if task:
|
||
# 如果是连续任务被手动停止,直接降级为单次任务,这样调度器就不会再重启它
|
||
if task.get("schedule_type") == "continuous":
|
||
task["schedule_type"] = "once"
|
||
print(f"🛑 任务 {tid} 已被手动停止,调度模式已设为 'once'。")
|
||
task["status"] = "fail"
|
||
task["finished_at"] = datetime.now().isoformat()
|
||
|
||
if pid:
|
||
try:
|
||
# 杀死整个进程组 (包括 Playwright 的浏览器子进程)
|
||
os.killpg(os.getpgid(pid), signal.SIGKILL)
|
||
print(f"✅ PID {pid} 及其进程组已彻底清除。")
|
||
except Exception as e:
|
||
print(f"⚠️ 无法杀死进程组: {e},尝试杀死单个进程...")
|
||
try: os.kill(pid, signal.SIGKILL)
|
||
except: pass
|
||
|
||
process_pids.pop(tid, None)
|
||
_save_db()
|
||
return jsonify({"success": True})
|
||
|
||
@app.route("/api/tasks/<task_id>/logs")
|
||
def stream_logs(task_id):
|
||
q = log_queues.get(task_id)
|
||
if not q: return jsonify({"error": "No stream"}), 404
|
||
def event_stream():
|
||
while True:
|
||
try:
|
||
msg = q.get(timeout=30)
|
||
yield f"data: {msg}\n\n"
|
||
if json.loads(msg).get("level") == "DONE": break
|
||
except queue.Empty:
|
||
yield f"data: {json.dumps({'level':'PING','msg':''})}\n\n"
|
||
return Response(event_stream(), content_type="text/event-stream")
|
||
|
||
@app.route("/api/dashboard/stats")
|
||
def get_stats():
|
||
try:
|
||
reports = list(reports_db.values())
|
||
tasks = list(tasks_db.values())
|
||
|
||
# 1. 基础汇总
|
||
total = len(reports)
|
||
passed = sum(1 for r in reports if r.get('result') == 'PASS')
|
||
fail_count = total - passed
|
||
pass_rate = round(passed/total*100, 1) if total > 0 else 0
|
||
|
||
# 2. 趋势分析 (过去7天)
|
||
trends = {}
|
||
today = datetime.now()
|
||
for i in range(6, -1, -1):
|
||
day = (today - timedelta(days=i)).strftime("%m-%d")
|
||
trends[day] = {"pass": 0, "fail": 0}
|
||
|
||
for r in reports:
|
||
f_at = r.get("finished_at")
|
||
if not f_at: continue
|
||
try:
|
||
dt = datetime.fromisoformat(f_at).strftime("%m-%d")
|
||
if dt in trends:
|
||
if r.get("result") == "PASS": trends[dt]["pass"] += 1
|
||
else: trends[dt]["fail"] += 1
|
||
except: continue
|
||
|
||
# 3. 核心健康度 (增加 FAT 支持)
|
||
health = {}
|
||
for p_key, p_val in PRODUCTS.items():
|
||
p_reports = [r for r in reports if r.get("product") == p_key]
|
||
p_tasks = {t.get("id"): t for t in tasks if t.get("product") == p_key}
|
||
|
||
env_stats = {"PROD": [], "FAT": [], "UAT": [], "TEST": []}
|
||
for r in p_reports:
|
||
t = p_tasks.get(r.get("task_id"))
|
||
if t:
|
||
env = t.get("env", "PROD").upper()
|
||
if env == "TEST": env = "FAT" # 兼容旧映射
|
||
if env in env_stats: env_stats[env].append(r)
|
||
|
||
env_rates = {}
|
||
for env, env_rs in env_stats.items():
|
||
if not env_rs: env_rates[env] = 0
|
||
else:
|
||
p_count = sum(1 for r in env_rs if r.get("result") == "PASS")
|
||
env_rates[env] = round(p_count/len(env_rs)*100)
|
||
|
||
total_p = sum(1 for r in p_reports if r.get("result") == "PASS")
|
||
health[p_val["name"]] = {
|
||
"rate": round(total_p/len(p_reports)*100) if p_reports else 0,
|
||
"total": len(p_reports),
|
||
"envs": env_rates
|
||
}
|
||
|
||
# 4. 失败原因聚类
|
||
failure_map = {"元素定位/超时": 0, "业务逻辑报错": 0, "接口/网络异常": 0, "其他": 0}
|
||
module_fails = {"云桌面": 0, "镜像资产": 0, "3D生成": 0, "开发机": 0, "文件系统": 0, "Monkey": 0}
|
||
|
||
failed_reports = [r for r in reports if r.get("result") == "FAIL"][-20:]
|
||
|
||
for r in failed_reports:
|
||
tid = r.get("task_id")
|
||
if not tid: continue
|
||
log_file = os.path.join(REPORTS_DIR, f"{tid}.json")
|
||
if os.path.exists(log_file):
|
||
try:
|
||
with open(log_file, 'r') as f:
|
||
content = f.read()
|
||
if "Timeout" in content or "not found" in content or "Waiting" in content:
|
||
failure_map["元素定位/超时"] += 1
|
||
elif "Exception" in content or "❌" in content:
|
||
failure_map["业务逻辑报错"] += 1
|
||
else:
|
||
failure_map["其他"] += 1
|
||
|
||
if "CloudDesktop" in content: module_fails["云桌面"] += 1
|
||
if "Mirror" in content: module_fails["镜像资产"] += 1
|
||
if "3D" in content: module_fails["3D生成"] += 1
|
||
if "DevMachine" in content: module_fails["开发机"] += 1
|
||
if "File" in content: module_fails["文件系统"] += 1
|
||
if "monkey_testing" in content: module_fails["Monkey"] += 1
|
||
except: pass
|
||
|
||
# 5. 失败任务明细
|
||
f_tasks = []
|
||
sorted_fails = sorted(failed_reports, key=lambda x: x.get("finished_at", ""), reverse=True)[:10]
|
||
for r in sorted_fails:
|
||
f_at = r.get("finished_at", "T00:00")
|
||
time_str = f_at.split("T")[1][:5] if "T" in f_at else "00:00"
|
||
f_tasks.append({
|
||
"id": r.get("task_id"),
|
||
"name": r.get("task_name", "未知任务"),
|
||
"product": PRODUCTS.get(r.get("product"), {}).get("name", r.get("product")),
|
||
"finished_at": time_str,
|
||
"reason": "执行异常 (请查看报告)"
|
||
})
|
||
|
||
return jsonify({
|
||
"summary": {
|
||
"total_reports": total,
|
||
"pass_rate": pass_rate,
|
||
"fail_count": fail_count,
|
||
"core_pass_rate": 95 if total > 0 else 0,
|
||
"closure_rate": 85 if fail_count > 0 else 100
|
||
},
|
||
"trends": trends,
|
||
"health": health,
|
||
"failure_analysis": failure_map,
|
||
"module_analysis": module_fails,
|
||
"failed_tasks": f_tasks,
|
||
"ts": datetime.now().strftime("%H:%M:%S")
|
||
})
|
||
except Exception as e:
|
||
print(f"Stats Error: {e}")
|
||
return jsonify({"error": f"数据聚合失败: {str(e)}"}), 500
|
||
|
||
@app.route("/api/reports")
|
||
def list_reports():
|
||
# 过滤掉软删除的任务报告,并且过滤掉“孤儿报告”(即 task 彻底不存在的残留垃圾)
|
||
r_list = [r for tid, r in reports_db.items() if tid in tasks_db and not tasks_db[tid].get("is_deleted")]
|
||
return jsonify(r_list)
|
||
|
||
@app.route("/api/reports/<tid>")
|
||
def get_report(tid):
|
||
r = reports_db.get(tid)
|
||
if not r: return jsonify({"error": "Not Found"}), 404
|
||
|
||
res = r.copy()
|
||
res["results"] = []
|
||
res["logs"] = []
|
||
res["screenshots"] = []
|
||
|
||
# 1. 加载主日志
|
||
log_path = os.path.join(REPORTS_DIR, f"{tid}.json")
|
||
logs = []
|
||
if os.path.exists(log_path):
|
||
try:
|
||
with open(log_path, 'r', encoding='utf-8') as f:
|
||
logs = json.load(f).get("logs", [])
|
||
res["logs"] = logs
|
||
except: pass
|
||
|
||
# 2. 尝试寻找结构化测试用例结果
|
||
results_path = os.path.join(REPORTS_DIR, f"{tid}_results.json")
|
||
if os.path.exists(results_path):
|
||
try:
|
||
with open(results_path, 'r', encoding='utf-8') as f:
|
||
res["results"] = json.load(f)
|
||
except: pass
|
||
|
||
# 3. 如果没有结构化结果,从日志中解析 DataManagementRunner 的场景汇总日志
|
||
if not res["results"] and logs:
|
||
# 场景别名表:日志关键词 → (描述, 预期, 模块名)
|
||
SCENARIO_PATTERNS = [
|
||
("文件系统场景", "file_system_lifecycle", "文件系统全生命周期", "文件夹创建、上传、重命名、删除全链路", "文件系统"),
|
||
("开发机场景", "dev_machine_lifecycle", "开发机全生命周期", "开发机申请、启动、关机、销毁全流程验证", "开发机"),
|
||
("云桌面场景", "cloud_desktop_lifecycle","云桌面全生命周期", "桌面创建、连接、保存镜像、关机、删除", "地瓜桌面"),
|
||
("镜像资产场景", "mirror_assets", "镜像资产巡检", "镜像列表加载及创建、使用验证", "镜像资产"),
|
||
("3D 链路", "3d_lifecycle", "3D生成与资产归档", "AIGC生成3D模型并顺利归档到资产中心", "3D生成"),
|
||
("量化工具场景", "quantization", "量化工具效能测试", "执行模型量化脚本并验证输出一致性", "量化工具"),
|
||
("Monkey", "monkey_testing", "Monkey稳定性压测", "50次随机点击/交互注入,验证系统抗崩溃能力","稳定性"),
|
||
]
|
||
|
||
# 计算总时长
|
||
if not res.get("duration") and res.get("started_at") and res.get("finished_at"):
|
||
try:
|
||
t0 = datetime.fromisoformat(res["started_at"])
|
||
t1 = datetime.fromisoformat(res["finished_at"])
|
||
res["duration"] = f"{(t1-t0).total_seconds():.1f}s"
|
||
except: pass
|
||
|
||
# 建立时间戳索引,便于计算耗时
|
||
# 提取每条日志的 HH:MM:SS 格式时间(日志消息中内嵌)
|
||
import re
|
||
ts_re = re.compile(r'\d{2}:\d{2}:\d{2}')
|
||
|
||
scenario_results = []
|
||
all_msgs = [(l.get("ts",""), l.get("level",""), l.get("msg","")) for l in logs]
|
||
|
||
# 先把每个场景的开始时间记录下来(通过识别场景开启消息)
|
||
scene_start_ts = {}
|
||
for ts, level, msg in all_msgs:
|
||
# 场景开始标志
|
||
if "DataManagementRunner" in msg or "DataManagement" in msg:
|
||
continue # 跳过 Runner 汇总消息本身
|
||
for kw, key, *_ in SCENARIO_PATTERNS:
|
||
m = ts_re.search(msg)
|
||
if m and ("开启" in msg or "--- 开始" in msg or "--- 开启" in msg) and kw.replace(" ","") in msg.replace(" ",""):
|
||
if key not in scene_start_ts:
|
||
scene_start_ts[key] = m.group()
|
||
|
||
# 从 DataManagementRunner 的汇总消息中解析结果
|
||
result_map = {} # key → {status, end_ts}
|
||
for ts, level, msg in all_msgs:
|
||
if "DataManagementRunner" not in msg:
|
||
continue
|
||
for kw, key, desc, expected, module in SCENARIO_PATTERNS:
|
||
if kw not in msg:
|
||
continue
|
||
if "通过" in msg and "✅" in msg:
|
||
# 提取消息内嵌时间戳
|
||
m = ts_re.search(msg)
|
||
end = m.group() if m else ts
|
||
result_map[key] = {"status": "PASS", "end_ts": end, "desc": desc,
|
||
"expected": expected, "module": module, "name": key}
|
||
elif ("失败" in msg or "❌" in msg) and level in ("ERROR", "INFO"):
|
||
m = ts_re.search(msg)
|
||
end = m.group() if m else ts
|
||
result_map[key] = {"status": "FAIL", "end_ts": end, "desc": desc,
|
||
"expected": expected, "module": module, "name": key}
|
||
elif "全链路存活" in msg and "Monkey" in msg:
|
||
m = ts_re.search(msg)
|
||
end = m.group() if m else ts
|
||
result_map[key] = {"status": "PASS", "end_ts": end, "desc": desc,
|
||
"expected": expected, "module": module, "name": key}
|
||
|
||
def ts_to_sec(t):
|
||
try:
|
||
h, mi, s = t.split(":")
|
||
return int(h)*3600 + int(mi)*60 + float(s)
|
||
except: return 0
|
||
|
||
for key, info in result_map.items():
|
||
duration = "—"
|
||
start = scene_start_ts.get(key)
|
||
end = info.get("end_ts")
|
||
if start and end:
|
||
secs = ts_to_sec(end) - ts_to_sec(start)
|
||
if secs < 0: secs += 86400
|
||
duration = f"{secs:.1f}s"
|
||
scenario_results.append({
|
||
"name": info["name"],
|
||
"desc": info["desc"],
|
||
"expected": info["expected"],
|
||
"module": info["module"],
|
||
"status": info["status"],
|
||
"duration": duration
|
||
})
|
||
|
||
res["results"] = scenario_results
|
||
|
||
# 4. 收集该报告下的所有截图
|
||
if os.path.exists(SCREENSHOTS_DIR):
|
||
try:
|
||
res["screenshots"] = [s for s in os.listdir(SCREENSHOTS_DIR) if s.startswith(tid)]
|
||
except: pass
|
||
|
||
return jsonify(res)
|
||
|
||
|
||
@app.route("/artifacts/screenshots/<path:filename>")
|
||
def serve_screenshot(filename):
|
||
return send_from_directory(SCREENSHOTS_DIR, filename)
|
||
|
||
@app.route("/")
|
||
@app.route("/<path:path>")
|
||
def serve_index(path=""):
|
||
return send_from_directory("platform", "index.html")
|
||
|
||
# ── 调度器 ───────────────────────────────────────────────────────────────────
|
||
|
||
class Scheduler:
    """Background poller that decides when each task should (re)run."""

    def _is_in_window(self, now, window_str):
        """Return True when *now* falls inside any "HH:MM-HH:MM" range of
        *window_str* (comma-separated). Empty or "all" means always."""
        if not window_str or window_str == "all":
            return True
        hhmm = now.strftime("%H:%M")
        try:
            for segment in window_str.split(","):
                if "-" not in segment:
                    continue
                lo, hi = segment.strip().split("-")
                if lo <= hhmm <= hi:
                    return True
        except:
            pass
        return False

    def start(self):
        """Spawn the polling loop as a daemon thread."""
        threading.Thread(target=self._loop, daemon=True).start()

    def _loop(self):
        while True:
            try:
                now = datetime.now()
                for tid, task in list(tasks_db.items()):
                    if task["status"] == "running":
                        continue
                    stype = task.get("schedule_type", "once")
                    last_run = task.get("last_scheduled_run")
                    fire = False

                    if stype == "once" and task.get("scheduled_at") and task["status"] == "pending":
                        fire = now >= datetime.fromisoformat(task["scheduled_at"])
                    elif stype == "continuous":
                        # Only trigger inside the configured daily window.
                        if self._is_in_window(now, task.get("schedule_window", "00:00-23:59")):
                            # Enforce a 60s cool-down after the previous round.
                            if last_run and (now - datetime.fromisoformat(last_run)).total_seconds() < 60:
                                continue
                            fire = True
                    elif stype != "once":
                        if not last_run:
                            fire = True
                        else:
                            elapsed = (now - datetime.fromisoformat(last_run)).total_seconds()
                            fire = ((stype == "hourly" and elapsed >= 3600)
                                    or (stype == "daily" and elapsed >= 86400)
                                    or (stype == "weekly" and elapsed >= 86400 * 7))

                    if fire:
                        task["last_scheduled_run"] = now.isoformat()
                        # Only one-shot tasks are pre-marked running here to avoid
                        # duplicate threads; run_task_process manages the status
                        # of periodic/continuous tasks itself.
                        if stype == "once":
                            task["status"] = "running"
                        print(f"⏰ 触发任务: {task['name']} (类型: {stype})")
                        threading.Thread(target=run_task_process, args=(task,), daemon=True).start()
            except Exception as e:
                print(f"❌ 调度器报错: {e}")
            time.sleep(30)  # short interval keeps continuous mode responsive

if __name__ == "__main__":
|
||
print("🚀 AutoFlow 启动中... http://127.0.0.1:5001")
|
||
Scheduler().start()
|
||
app.run(host="0.0.0.0", port=5001)
|