feat Refactor business logic by deleting old modules and introducing new 3D asset, 3D generation, and quantization pages and scenarios, generating associated test data and reports.

This commit is contained in:
hangyu.tao 2026-03-27 16:24:30 +08:00
parent 5b7b55ec41
commit f9f7f4fdd8
12 changed files with 4748 additions and 47990 deletions

164
README.md Normal file
View File

@ -0,0 +1,164 @@
# 🤖 Robogo 自动化测试平台 - 零基础上手指南
欢迎来到 Robogo 自动化测试项目!哪怕你之前**完全没有**学过前端、不了解自动化是怎么做的,只要你会写简单的 Python 代码,你就能跟着本文档**“像搭积木一样”**愉快地编写并执行各种复杂的自动化流水线!
## 🎯 我们的核心理念 (面向过程的 POM 分层设计)
我们把所有极其复杂的底层网络交互、浏览器驱动都全部封装和隔离了。整个项目对于业务人员来说,只有两个需要改动的地方:
1. **🔨 零件加工厂 (`framework/business/` 目录)**
- **它的作用**:通俗来说,就是告诉机器人**“这个网页长什么样,上面有哪些按钮”**,以及动作规范。
- **举个例子**:在 `quantization_page.py` 里,定义一个动作 `def click_create_task(self):` ,并在该方法里调用一次点击,这就像给工厂造好一个拉杆。
2. **🎬 写剧本的导演室 (`framework/scripts/` 目录)**
- **它的作用**:调用上面的零件,编排**业务操作顺序**。
- **举个例子**:在 `quantization_scenario.py` 里写:先登录 -> 点击主菜单 -> 填名字 -> 点击确认。纯新手流水账思维!
---
## 🚀 零基础实战:跟我 3 步写一个自动化测试
假设现在上线了一个新功能叫做“工作台 (Workbench)”,老板要求跑自动化测试:点击进入工作台 -> 点击新建 -> 填入内容并保存。
### 前往第一步:教 AI 认界面动作 (Business 层)
进到 `framework/business/` 文件夹下,新建 `workbench_page.py`。你可以直接复制下面的通用模板,只要把名字改了即可:
```python
import time
from framework.core.base_page import BasePage
from framework.core.logger import get_logger
logger = get_logger("WorkbenchPage")
class WorkbenchPage(BasePage):
"""工作台页面能做的所有动作全写在这"""
def navigate_to(self):
"""1. 进入工作台"""
logger.info("👉 点击左侧菜单进入工作台页面...")
self.smart_click("工作台") # ✨极简直觉型绝招:只要页面上有这几个字,它就会自动去点!
time.sleep(2) # 留点时间给页面刷新加载
def create_and_fill(self, my_text):
"""2. 点击新建,并填入内容"""
# 点击新建按钮:寻找页面上包含'新建'的 button 把它点掉
self.page.locator("button:has-text('新建')").click()
time.sleep(1)
# 填写弹窗中的输入框:(这里是个非常重要并且好套用的通用句式)
# 逻辑:找包含“名称”两个字的 label(标签),跳到它的外面直接父层(..),再找它里面的 input 框
input_box = self.page.locator("label").filter(has_text="名称").locator("..").locator("input").first
# 写入内容
input_box.fill(my_text)
# 最后点确定
self.page.locator("button:has-text('确定')").last.click()
time.sleep(1)
```
*(是不是很简单?不用辛苦研究所谓底层复杂的 CSS、XPath选择器直接根据**眼睛看到的界面的中文提示词**找元素)*
---
### 前往第二步:把动作串联成业务流流水账 (Scripts 层)
你在 `business` 层里定义好了这个页面能干嘛,现在来写流水账剧本即可。
`framework/scripts/` 目录下,新建 `workbench_scenario.py`
```python
from framework.core.logger import get_logger
from framework.business.workbench_page import WorkbenchPage
logger = get_logger("WorkbenchScenario")
def run_workbench_lifecycle(wb_page: WorkbenchPage):
"""工作台生命周期全流程"""
logger.info("🌟 准备开始测试工作台...")
# 就像指挥小人一样,你把它写成一条时间线往下排编
wb_page.navigate_to()
# [进阶小窍门] 如果要避免重复重名系统报错,可以用随机模块拼凑唯一任务名
import random
my_test_name = f"Test_{random.randint(100, 99999)}"
wb_page.create_and_fill(my_test_name)
logger.info("✅ 工作台场景测试完美通过!")
```
---
### 前往第三步:向总控制台挂载你的心血!
我们的自动化引擎总指挥官叫 `DataManagementRunner` (统筹类在 `framework/business/data_management.py` 里)。所有你想随主线跑的测试,统统要去它那边排队报备。
打开 `data_management.py` 本体文件:
1. **先在代码开头处注册并实例化你写的模块页面**
```python
# 头部引用你写的剧本文件
from framework.scripts.workbench_scenario import run_workbench_lifecycle
from framework.business.workbench_page import WorkbenchPage
...
# 跳转到 class DataManagement(BasePage) 的 def __init__ 方法里
# 实例化该页面使其归中央管辖
self.wb = WorkbenchPage(self.page)
```
2. **把它排队到司令部的日程计划表 `run_all_scenarios` 里**
```python
# 往下翻,在原本的任务执行步骤 (如 6.量化工具场景) 的底下新增你的区块:
# 7. 工作台测试场景 (记得数字标号)
try:
run_workbench_lifecycle(self.wb)
self._safe_screenshot("workbench_pass.png") # 成功了拍照放进快照库
logger.info("✅ 工作台场景通过")
except Exception as e:
logger.error(f"❌ 工作台场景失败: {e}")
self._safe_screenshot("workbench_error.png") # 失败了保留案发现场快照以供排期追责
errors.append(f"工作台测试: {e}")
```
🎉 **大功告成!终端执行 `python run_ui_tests.py` 就可以看见你写的机器人在飞速跑动接管云业务了!**
---
## 🛠️ 新手常见阻碍排查法 (三大法宝口诀)
零基础新手绝对不要害怕运行报错,这套自动化平台为你提供了超强法宝:
### 法宝一:找不到元素?随时使用强制悬停暂停键!💥
不知道代码跑到哪卡住了?在你怀疑有坑的地方插上一句终极代码:
```python
self.page.pause()
```
系统无论跑得多快运行到这一行浏览器一定会自动暂停并弹出调试器Playwright Inspector 录制器),你可以直接在活体页面上观察 DOM 并模拟点按,搞清后别忘了把这行删去再接着测。
### 法宝二:遇到超时 Timeout网络慢没加载出来💥
如果你的按钮点击太快,网页还在无骨转圈或者正在“排队中”:
```python
# 1. 纯死等(不推荐,但你作为新手急于跑通,应急时极好用)
import time; time.sleep(5)
# 2. 状态值智能等待(进阶推荐,极度安全,最多等 60 秒超时)
# 等待该标签可见:
self.page.locator("span, tag").filter(has_text="运行中").first.wait_for(state="visible", timeout=60000)
```
### 法宝三:解决层级乱跳重影错乱的“局部限制术” 💥
如果你发现网页上有几十个叫”确定“的按钮,机器人变傻点错了怎么办?缩小它的搜查圈子:
```python
# 不要瞎搜,先明确告诉它锁定当前名字叫“下载参数”的弹窗气泡区域
dialog = self.page.locator(".p-dialog").filter(has_text="下载参数")
# 随后!只要在 dialog 这层继续 locator它连外面再有成百上千个确定都不会理极大解决冲突。
dialog.locator("button:has-text('确定')").click()
```
---
> 🎉 **恭喜!** 在完全掌握上面这三步后,整个平台的扩展将任由你发挥。
> 平台最后还内嵌了**全局 Monkey 随机测试机制**(可配置拦截非法按钮)来保障系统的极限容错度。所有的 `Error` 都将成为构建企业级高覆盖率流水线的垫脚石! —— Automated Platform Team.

View File

@ -0,0 +1,278 @@
import time
import os
import random
import string
from framework.core.base_page import BasePage
from framework.core.logger import get_logger
logger = get_logger("QuantizationPage")
class QuantizationPage(BasePage):
"""量化工具页面 POM"""
MENU_TEXT = "量化工具"
def navigate_to(self):
"""进入量化工具页面"""
logger.info("正在切换到量化工具页面...")
self.smart_click(self.MENU_TEXT)
try:
self.page.wait_for_selector(".p-button:has-text('保存'), .p-tabmenu", timeout=10000)
except:
pass
time.sleep(2)
def click_create_task(self):
"""点击创建任务按钮"""
logger.info("正在点击创建任务按钮...")
self.smart_click("创建任务")
time.sleep(2)
def input_task_name(self, task_name="AutoQuantization"):
"""选中任务名称清空,输入随机任务名称"""
logger.info(f"📝 准备输入任务名称: {task_name}")
# 1. 精准定位:找到包含该 label 的直接父容器,从而限定在这个特定输入组内
input_box = self.page.locator("label").filter(has_text="任务名称").locator("..").locator("input").first
input_box.wait_for(state="visible", timeout=15000)
# 2. 强制清空
input_box.click()
# 识别系统
modifier = "Meta" if "mac" in str(self.page.context.browser.browser_type).lower() else "Control"
self.page.keyboard.press(f"{modifier}+A")
self.page.keyboard.press("Backspace")
# 3. 填入新值
input_box.fill(task_name)
time.sleep(1)
def navigate_to_quantization_task_config(self):
"""点击编译模型转换模式"""
logger.info("正在切换到量化任务配置页面...")
self.smart_click("编译模型转换模式")
try:
self.page.wait_for_selector(".p-button:has-text('保存'), .p-tabmenu", timeout=10000)
except:
pass
time.sleep(2)
"""点击工具版本"""
def click_tool_version(self):
logger.info("正在点击工具版本...")
self.smart_click("工具版本")
time.sleep(2)
def select_mode_fast_eval(self):
"""选择快速性能评测模式"""
logger.info("下拉选择: 快速性能评测模式")
# 修正: label 包在外部容器中p-select 内部并没有该文本,需通过容器层级定位
container = self.page.locator("div:has(label:has-text('编译模型转换模式'))").first
dropdown = container.locator(".p-select").first
dropdown.click()
dropdown = container.locator(".p-select").first
dropdown.click()
# 弹出的选项列表中点击
option = self.page.locator("li.p-select-option:has-text('快速性能评测模式')")
option.click()
time.sleep(1)
def select_image(self, image_name="dcloud_ai_toolchain_ubuntu_22_s100_cpu-2"):
"""选择镜像流程"""
logger.info(f"点击选择镜像按钮,拉起弹窗...")
self.page.locator("button.p-button-link:has-text('选择镜像')").click()
# 在弹窗中定位镜像项
logger.info(f"📍 在弹窗中寻找镜像: {image_name}")
image_row = self.page.locator(f"div:has-text('{image_name}')").last
image_row.scroll_into_view_if_needed()
image_row.click()
# 点击弹窗内部的确定 (限制定位器在 p-dialog 容器内)
logger.info("⏳ 点击[镜像弹窗]确认按钮...")
dialog_confirm = self.page.locator(".p-dialog:has-text('群组镜像')").locator("button:has-text('确定')").last
dialog_confirm.click()
time.sleep(2)
def input_march_param(self, value="nash-e"):
"""模型参数 march 输入"""
logger.info(f"📝 模型参数输入 march: {value}")
# 精准定位:找到特定的 label回到直接父容器再找 input防止全局 flex 匹配
march_input = self.page.locator("label").filter(has_text="march").locator("..").locator("input").first
march_input.scroll_into_view_if_needed()
# 强制清空再输入
march_input.click()
modifier = "Meta" if "mac" in str(self.page.context.browser.browser_type).lower() else "Control"
self.page.keyboard.press(f"{modifier}+A")
self.page.keyboard.press("Backspace")
march_input.fill(value)
time.sleep(1)
def click_final_submit(self):
"""点击页面最终确认按钮"""
logger.info("💾 点击量化任务最终[确认]按钮")
# 往往在页面正下方,不在弹窗内
confirm_btn = self.page.locator("button:has-text('确定')").last
confirm_btn.click()
logger.info("🎉 量化任务已成功提交")
time.sleep(2)
def upload_model_file(self, folder_name="model", file_name="result.onnx"):
"""点击模型文件框,选择具体模型文件"""
logger.info("📂 点击[模型文件]区域,拉起路径选择...")
# 定位虚线框
self.page.locator("div.border-dashed:has-text('点击上传模型')").click()
time.sleep(1)
# 在“选择文件”弹窗内操作
file_picker = self.page.locator(".p-dialog:has-text('选择文件')")
# 1. 点击左侧文件夹
logger.info(f"📍 进入文件夹: {folder_name}")
file_picker.locator(f"span.text-sm:has-text('{folder_name}')").click()
time.sleep(1)
# 2. 点击右侧文件列表中的目标项
logger.info(f"📄 选择具体文件: {file_name}")
file_picker.locator(f"span:has-text('{file_name}')").click()
time.sleep(1)
# 3. 点击弹窗底部的确定
logger.info("⏳ 点击[选择文件]弹窗确认按钮...")
file_picker.locator("button:has-text('确定')").last.click()
logger.info("✅ 模型文件选取成功")
time.sleep(1)
def open_and_close_logs(self, task_name):
"""等待任务进入运行中状态后,点击日志打开新窗口,等待加载后关闭"""
import re
logger.info(f"📄 准备查看任务 [{task_name}] 的日志...")
row = self.page.locator("tr").filter(has_text=task_name).first
# 1. 拦截队列:必须等到状态跳变出[排队中]
logger.info(f"⏳ 正在监听排队队列,等待任务步入 [运行中] 状态...")
state_loc = row.locator("span, div").filter(has_text=re.compile(r"运行中|已完成|失败")).first
state_loc.wait_for(state="visible", timeout=180000) # 最多等待3分钟排队
curr_state = state_loc.text_content()
if "运行中" not in curr_state:
logger.warning(f"⚠️ 状态跃迁过快或异常,当前状态已达 [{curr_state}],无需再去查看动态运行日志。")
return
logger.info(f"✅ 状态刷新确认: [运行中]!正在拉起全屏日志流面板...")
# 2. 监听新页面的弹出
with self.page.context.expect_page() as new_page_info:
row.locator("button:has-text('日志')").click()
new_page = new_page_info.value
new_page.wait_for_load_state("domcontentloaded")
logger.info("✅ 已切入日志新窗口,等待底层视图渲染...")
new_page.locator("body").wait_for(state="visible", timeout=10000)
time.sleep(2) # 给足够的流式打印时间预览
new_page.close()
logger.info("✅ 日志窗口已平滑关闭,切回主视图。测试流继续下放。")
def stop_task(self, task_name):
"""停止任务并等待状态变为已停止"""
logger.info(f"⏹ 准备停止任务: {task_name}")
row = self.page.locator("tr").filter(has_text=task_name).first
# 等待停止按钮可点击
stop_btn = row.locator("button:has-text('停止')")
stop_btn.click()
logger.info("⏳ 等待任务状态变为 [已停止]...")
# 等待出现 已停止 的标签
row.locator("span, div").filter(has_text="已停止").first.wait_for(state="visible", timeout=60000)
logger.info("✅ 任务停止成功")
def delete_task(self, task_name):
"""删除已停止的任务"""
logger.info(f"🗑 正在删除任务: {task_name}")
row = self.page.locator("tr").filter(has_text=task_name).first
row.locator("button:has-text('删除')").click()
# 点击气泡确认
logger.info("点击确认删除弹窗...")
confirm_btn = self.page.locator(".p-confirm-popup-accept-button, button:has-text('确定')").last
confirm_btn.wait_for(state="visible", timeout=5000)
confirm_btn.click()
# 校验行是否消失
row.wait_for(state="hidden", timeout=10000)
logger.info(f"✅ 任务 [{task_name}] 已从列表中移除")
def wait_for_task_finished(self, task_name, timeout_mins=5):
"""轮询等待任务达到终态 (已完成 / 失败)"""
import re
logger.info(f"⏳ 正在监听任务 [{task_name}] 执行进度 (上限 {timeout_mins} 分钟)...")
row = self.page.locator("tr").filter(has_text=task_name).first
try:
# 同时匹配"已完成"或"失败"任一状态
end_state = row.locator("span, div").filter(has_text=re.compile(r"已完成|失败")).first
end_state.wait_for(state="visible", timeout=timeout_mins * 60000)
state_text = end_state.text_content()
if "失败" in state_text:
logger.error(f"❌ 任务执行异常: 监控到状态变为 [失败]")
raise Exception("后端量化任务处理失败")
else:
logger.info(f"🎉 业务通过: 任务执行成功结束,当前处于 [{state_text}] 状态")
time.sleep(2)
except Exception as e:
logger.error(f"❌ 监听任务进度时发生中断或超时: {e}")
raise e
def open_and_close_report(self, task_name):
"""点击报告打开新窗口,等待页面完全加载后关闭"""
logger.info(f"📊 准备查看任务 [{task_name}] 的检查报告...")
row = self.page.locator("tr").filter(has_text=task_name).first
# 拦截新标签页弹出
with self.page.context.expect_page() as new_page_info:
row.locator("button:has-text('报告')").click()
new_page = new_page_info.value
new_page.wait_for_load_state("domcontentloaded")
new_page.locator("body").wait_for(state="visible", timeout=15000)
logger.info("✅ 已切入[报告]新窗口基础DOM渲染就绪...")
time.sleep(3) # 提供安全的静态视觉缓冲期保证页面完全加载
new_page.close()
logger.info("✅ 报告页面窗口已关闭,切回主视图。")
def download_all_results(self, task_name):
"""点击下载拉起弹窗,下载全部结果并等待下载任务完成后结束"""
logger.info(f"📥 准备下载任务 [{task_name}] 的全部输出结果...")
row = self.page.locator("tr").filter(has_text=task_name).first
# 1. 点击当前行下载动作
row.locator("button:has-text('下载')").click()
# 2. 接管全局下载弹窗
dialog = self.page.locator(".p-dialog").filter(has_text="下载全部结果")
dialog.wait_for(state="visible", timeout=5000)
# 3. 阻塞拦截底层下载事件
logger.info("⏳ 点击[下载全部结果],安全挂起等待文件流写入完毕...")
dl_btn = dialog.locator("button:has-text('下载全部结果')")
with self.page.expect_download(timeout=60000) as download_info:
dl_btn.click()
download = download_info.value
# 主动调用 path() 即可在当前主线程强阻塞,直至磁盘下载动作完全落盘成功
dl_path = download.path()
logger.info(f"🎉 文件下载成功落盘!临时归档路径: {dl_path}")
# 4. 收尾清理弹窗 UI
close_btn = dialog.locator("button.p-dialog-header-close").first
if close_btn.is_visible():
close_btn.click()
time.sleep(1)
logger.info("✅ 下载流程顺畅完结。")

View File

@ -0,0 +1,163 @@
import time
from framework.core.base_page import BasePage
from framework.core.logger import get_logger
logger = get_logger("ThreeDAssetsPage")
class ThreeDAssetsPage(BasePage):
"""3D资产页面 POM - 包含资产包进入与数据新增逻辑"""
MENU_TEXT = "3D资产"
def __init__(self, page):
self.page = page
def navigate_to(self):
"""进入3D资产页面"""
logger.info("正在切换到3D资产页面...")
self.smart_click(self.MENU_TEXT)
time.sleep(2)
def enter_asset_package(self, package_name="ui_test"):
"""进入指定的资产包"""
logger.info(f"📂 正在寻找并进入资产包: {package_name}")
# 在资产包列表中找到对应名称的项进行点击
selector = f"div.p-card:has-text('{package_name}')"
try:
self.page.wait_for_selector(selector, timeout=10000)
self.page.click(selector)
logger.info(f"✅ 已进入资产包: {package_name}")
# 等待渲染加载
time.sleep(2)
return True
except Exception as e:
logger.error(f"❌ 无法找到资产包 {package_name}: {e}")
return False
def add_asset_data(self, asset_name):
"""执行[新增数据]流程"""
logger.info(f" 准备为资产包新增 3D 数据: {asset_name}")
# 1. 点击[新增数据]按钮
add_btn = self.page.locator("button:has-text('新增数据')")
add_btn.click()
time.sleep(2)
# 2. 点击路径选择区域 (图中带有'请选择包含模型文件的文件夹'占位符的区域)
logger.info("🗺️ 点击路径选择区域...")
path_trigger = self.page.locator("div.cursor-pointer:has-text('请选择包含模型文件的文件夹')")
path_trigger.click()
time.sleep(2)
# 3. 在路径弹窗中操作
logger.info("🗂️ 在弹窗中选择 3D 生成目录...")
# 点击左侧目录: 3d生成(勿删)
folder_root = self.page.locator("span:has-text('3d生成(勿删)')")
folder_root.click()
time.sleep(2) # 等待列表渲染
# 4. 找到刚才生成的资产名称并勾选
logger.info(f"📍 寻找并勾选资产文件夹: {asset_name}")
try:
# 找到包含该名称的一行
asset_row = self.page.locator(f"div.flex:has(span:has-text('{asset_name}'))").last
# 只点击行会高亮但不会勾选,需点击前面的 span 触发
checkbox_trigger = asset_row.locator("span").first
checkbox_trigger.click()
# 检测是否勾选成功 (类名中出现 checked)
time.sleep(1)
is_checked = self.page.locator(f"div.flex:has(span:has-text('{asset_name}'))").last.locator(".p-checkbox-checked").is_visible()
if is_checked:
logger.info(f"✅ 资产 {asset_name} 勾选状态确认成功")
else:
logger.warning(f"⚠️ 无法检测到勾选类名,尝试通过文字补偿点击第 1 个 span")
asset_row.locator(f"span:has-text('{asset_name}')").first.click()
except Exception as e:
logger.error(f"❌ 勾选流程异常: {e}")
# 5. 点击路径弹窗底部确定按钮 (修正:层级定位,解决点不中的问题)
logger.info("⏳ 点击路径选择器[确定]按钮...")
# 分层:在“选择文件”弹窗内寻找确定
file_picker_dialog = self.page.locator(".p-dialog:has-text('选择文件')")
confirm_path_btn = file_picker_dialog.locator("button:has-text('确定')").last
confirm_path_btn.wait_for(state="visible", timeout=5000)
confirm_path_btn.scroll_into_view_if_needed()
confirm_path_btn.click()
logger.info("✅ 路径跳转弹窗已关闭")
time.sleep(2)
# 6. 继续处理新增数据弹窗
logger.info("💾 正在确认[新增3D资产数据]弹窗...")
add_data_dialog = self.page.locator(".p-dialog:has-text('新增3D资产数据')")
# 探测新增数据弹窗内是否已有数据 (校验)
if "已选择1项" in self.page.content() or asset_name in self.page.content():
logger.info("📊 校验成功:新增数据弹窗已加载选中数据")
else:
logger.warning("⚠️ 未能在弹窗中明确校验到选中项,继续尝试提交")
# 7. 点击新增数据弹窗的最终确定按钮
final_confirm_btn = add_data_dialog.locator("button:has-text('确定')").last
final_confirm_btn.scroll_into_view_if_needed()
final_confirm_btn.click()
logger.info("🎉 3D 资产数据新增流程全部完成")
time.sleep(3) # 等待列表回屏加载
return True
def verify_asset_and_preview(self, asset_name):
"""校验资产包列表是否存在刚创建的资产,并查看预览"""
logger.info(f"🔍 正在列表中核实资产: {asset_name}")
# 1. 在列表中寻找该资产卡片并点击
asset_card = self.page.locator(f"div.p-card:has-text('{asset_name}')").first
try:
asset_card.wait_for(state="visible", timeout=10000)
logger.info(f"✅ 找到资产卡片: {asset_name},准备查看预览")
asset_card.click()
except Exception as e:
logger.error(f"❌ 列表中未发现资产 {asset_name}: {e}")
return False
# 2. 等待弹窗渲染加载 (model-viewer 等元素)
logger.info("🖥️ 等待 3D 模型渲染预览渲染...")
try:
# 这里的 model-viewer 或特定的容器是渲染完成的标志
self.page.wait_for_selector("model-viewer, div.userInput.show", timeout=15000)
logger.info("✨ 3D 模型预览已正常渲染")
time.sleep(2)
# 3. 点击右上角的 X 关闭预览
close_btn = self.page.locator("i.pi-times").first
close_btn.click()
logger.info("✅ 预览弹窗已关闭")
return True
except Exception as e:
logger.error(f"❌ 预览渲染失败或无法关闭: {e}")
return False
def check_source_files(self):
"""点击右上角[源文件查看]并校验侧边栏"""
logger.info("📂 点击[源文件查看]按钮...")
source_btn = self.page.locator("button:has-text('源文件查看')")
source_btn.click()
# 等待侧边栏数据加载
logger.info("⏳ 等待侧边栏数据加载...")
try:
# 侧边栏通常是一个 p-sidebar 或包含资产文件结构的容器
self.page.wait_for_selector(".p-sidebar, div:has-text('源文件')", timeout=10000)
time.sleep(2)
# 点击右下角的关闭按钮
logger.info("🚪 点击侧边栏[关闭]按钮...")
close_btn = self.page.locator("button:has-text('关闭')").last
close_btn.click()
logger.info("✅ 源文件侧边栏已关闭")
return True
except Exception as e:
logger.error(f"❌ 源文件查看流程异常: {e}")
return False

View File

@ -0,0 +1,164 @@
import time
import os
import random
import string
from framework.core.base_page import BasePage
from framework.core.logger import get_logger
logger = get_logger("ThreeDGenerationPage")
class ThreeDGenerationPage(BasePage):
"""3D生成页面 POM - 包含生成、等待与保存逻辑"""
MENU_TEXT = "3D生成"
def __init__(self, page):
self.page = page
def navigate_to(self):
"""进入3D生成页面"""
logger.info("正在切换到3D生成页面...")
self.smart_click(self.MENU_TEXT)
try:
self.page.wait_for_selector(".p-button:has-text('保存'), .p-tabmenu", timeout=10000)
except:
pass
time.sleep(2)
def upload_image(self, file_path):
"""通过隐藏 input 上传图片"""
logger.info(f"📤 准备上传 3D 生成素材: {file_path}")
try:
# 直接通过 input 元素设置文件
self.page.set_input_files("input[type='file']", file_path)
except Exception as e:
logger.warning(f"⚠️ 直接注入失败,尝试点击图标触发: {e}")
with self.page.expect_file_chooser() as fc_info:
self.page.click("button.p-button-icon-only:has(.pi-arrow-up)")
fc_info.value.set_files(file_path)
time.sleep(2)
def start_generation(self, retries=3):
"""点击底部生成按钮 (图标按钮) - 增加 3 次重试机制 & 15s 业务异常监控"""
logger.info(f"🚀 准备点击生成按钮 (最多重试 {retries} 次)...")
target_btn = "button.p-button-icon-only:has(.pi-arrow-up)"
for i in range(retries):
try:
# 1. 执行物理点击
self.page.wait_for_selector(target_btn, state="visible", timeout=5000)
self.page.click(target_btn)
logger.info(f"🖱️ 第 {i+1} 次点击动作已发出")
# 2. 持续观测业务状态 (监控 15s)
logger.info(f"👀 正在观测第 {i+1} 次生成任务的业务反馈...")
is_failed = False
for _ in range(15):
# 使用显式文本探测,捕捉 Toast 或 页面文字报错
error_detected = self.page.locator("text=/网络错误|Generation failed|failed/i").first.is_visible()
if error_detected:
logger.warning(f"⚠️ 第 {i+1} 次运行检测到业务异常文字,准备重试")
is_failed = True
break
time.sleep(1)
if not is_failed:
logger.info(f"✅ 第 {i+1} 次任务成功启动且未检出即时报错")
return True
# 业务失败且有次数则间隔后重试
if i < retries - 1:
time.sleep(2)
continue
except Exception as e:
# 捕捉定位或点击失败
logger.warning(f"⚠️ 第 {i+1} 次点击尝试失败 (物理层): {e}")
if i < retries - 1:
time.sleep(2)
continue
# 3. 最终失败处理
error_msg = f"❌ 3D 生成任务在重试 {retries} 次后仍无法启动,程序强行退出"
logger.error(error_msg)
raise Exception(error_msg)
def wait_for_result_and_save(self, timeout=600):
"""等待右上角结果出现并点击保存"""
logger.info("⏳ 等待生成结果...")
# 右上角模型参数卡片里出现内容或“保存”按钮变得可用
start_time = time.time()
while time.time() - start_time < timeout:
content = self.page.content()
# 检查是否有反映生成完成的标志,或者截图中的“保存”按钮
if "保存" in content:
save_btn = self.page.locator("button:has-text('保存')").nth(0)
if save_btn.is_visible() and save_btn.is_enabled():
logger.info("✨ 生成结果已出现,点击[保存]按钮")
save_btn.click()
return True
time.sleep(5)
logger.error("❌ 等待结果超时")
return False
def handle_save_dialog(self):
"""处理保存模型弹窗及其路径选择流程"""
# 1. 生成随机资产名称
random_name = "".join(random.choices(string.ascii_letters + string.digits, k=8))
asset_name = f"Auto3D_{random_name}"
logger.info(f"📝 输入资产名称: {asset_name}")
# 输入资产名称
name_input = self.page.locator("input[placeholder='请输入资产名称']")
name_input.fill(asset_name)
time.sleep(1)
# 2. 点击保存路径触发弹窗
logger.info("📂 点击保存路径选择器...")
path_input = self.page.locator("input[placeholder='请选择要保存的文件夹']")
path_input.click()
time.sleep(2)
# 3. 路径选择弹窗内部操作
logger.info(f"📂 正在开启路径选择弹窗,进行多列导航...")
# 修正核心:弹窗标题实为“选择文件” (根据截图中的渲染结果)
file_picker_dialog = self.page.locator(".p-dialog:has-text('选择文件')")
# 定位左侧 3d生成(勿删) 根目录项
folder_item = file_picker_dialog.locator("span:has-text('3d生成(勿删)')").first
folder_item.click()
time.sleep(2) # 留出中间列刷新的渲染窗口
# 定位刚才生成的随机文件夹名称
logger.info(f"🎯 正在定位生成的文件夹项: {asset_name}")
try:
# 延长等待时间 (20s) 提高环境波动兼容性
target_item = file_picker_dialog.locator(f"//span[contains(text(), '{asset_name}')]").last
target_item.wait_for(state="attached", timeout=20000)
target_item.scroll_into_view_if_needed()
target_item.click()
logger.info(f"✅ 文件夹已在 [选择文件] 弹窗中成功选中")
except Exception as e:
logger.warning(f"⚠️ 在 20s 内未能自动选中项 {asset_name},可能是刷新延迟: {e}")
# 4. 点击路径选择弹窗的确认按钮
logger.info("⏳ 点击路径选择器[确定]按钮...")
# 修正:同步标题匹配为“选择文件”
path_dialog_confirm = file_picker_dialog.locator("button:has-text('确定')").last
path_dialog_confirm.click()
logger.info("✅ 路径跳转弹窗已确认并关闭")
time.sleep(2) # 留出 UI 彻底销毁的时间
# 5. 校验路径是否回填
filled_val = path_input.get_attribute("value")
logger.info(f"🔍 检查保存路径回填状态: {filled_val}")
# 6. 点击保存模型弹窗最终确认按钮 (精准定位保存弹窗确定)
logger.info("💾 点击保存模型弹窗最终确认按钮")
save_dialog_confirm = self.page.locator(".p-dialog:has-text('保存模型')").locator("button:has-text('确定')").last
save_dialog_confirm.wait_for(state="visible", timeout=5000)
save_dialog_confirm.click()
logger.info("🎉 3D 生成场景全流程保存确认成功")
time.sleep(2)
return asset_name

View File

@ -47,6 +47,7 @@ def run_full_file_lifecycle(fm, folder_name):
# 3. 正式上传(等待完成)
fm.upload_files(test_file)
fm.wait_for_success(count=1)
# 4. 重命名与删除
fm.rename_item("Fruits-15.zip", "UI_TEST_RENAMED.zip")
@ -56,5 +57,7 @@ def run_full_file_lifecycle(fm, folder_name):
# 5. 清理
fm.back_to_root()
time.sleep(5)
fm.delete_item(folder_name)
time.sleep(5)
logger.info("✅ 文件系统场景测试完成")

View File

@ -0,0 +1,94 @@
import time
import random
from framework.core.logger import get_logger
logger = get_logger("MonkeyTester")
def run_monkey_testing(page, action_count=1000):
"""
全屏幕 Monkey 稳定性测试 (随机流操作)
利用 JS 提取网页内所有的安全交互元素避开退出注销删除等敏感操作
然后执行随机规模的乱序点击和键盘输入最后伴随鼠标抛掷
"""
logger.info(f"🐒 开始执行全局 Monkey 测试,计划乱序攻击交互 {action_count} 次...")
# 增加捕捉页面底层级 JS 控制台报错的能力
def log_page_error(err):
logger.error(f"💥 [Monkey 探测到前端崩溃]: {err}")
# 监听未捕获异常
page.on("pageerror", log_page_error)
# 危险词汇,避免退出登录或产生破坏性删除导致平台主干链路被污染
forbidden_keywords = ['退出', '注销', 'logout', '删除', 'delete', '清理']
# JS 提取页面上的活动区域
js_get_random_element = """
(forbidden) => {
// 大面积收割所有可能挂载事件的锚点
const interactables = Array.from(document.querySelectorAll("button, a, input, textarea, [role='button'], div.p-dropdown, li.p-menuitem"));
const visible = interactables.filter(el => {
const rect = el.getBoundingClientRect();
const style = window.getComputedStyle(el);
return rect.width > 0 && rect.height > 0 && style.visibility !== 'hidden' && style.opacity !== '0';
});
const safeElements = visible.filter(el => {
const text = (el.innerText || el.value || el.placeholder || '').toLowerCase();
return !forbidden.some(kw => text.includes(kw));
});
if (safeElements.length > 0) {
const randomIndex = Math.floor(Math.random() * safeElements.length);
return safeElements[randomIndex];
}
return null;
}
"""
success_actions = 0
for i in range(action_count):
try:
# 高频发起,给予微小缓冲节奏
time.sleep(0.3)
# 使用 evaluate_handle 将 JS 节点句柄传回到 Python
element_handle = page.evaluate_handle(js_get_random_element, forbidden_keywords)
# null 在 Python 端 json_value 会被转换为 None但在 Handle 里需谨慎评估
is_valid = page.evaluate("(el) => el !== null && typeof el === 'object' && el.tagName", element_handle)
if is_valid:
tag_name = page.evaluate("el => el.tagName", element_handle).lower()
# 开始随机交互分发
if tag_name in ['input', 'textarea']:
# 随机混沌输入
random_txt = str(random.randint(100, 99999))
element_handle.fill(random_txt, force=True, timeout=1000)
logger.debug(f"[Monkey] ⌨️ 乱序输入框写入: {random_txt}")
else:
# 强硬暴力点击 (抛弃边界验证强制触发底层事件)
element_handle.click(force=True, timeout=1000)
logger.debug(f"[Monkey] 🖱️ 盲击了元素节点: <{tag_name}>")
success_actions += 1
else:
# 找不到靶点则强制滚屏扰动页面布局重绘
distance = random.choice([-600, -300, 300, 600])
page.mouse.wheel(0, distance)
logger.debug(f"[Monkey] 📜 随机视差滚屏了 {distance} px")
except Exception as e:
# Monkey 理念:容忍动作失败,但如果抛出的是框架级失去连接则需汇报
if "Target page, context or browser has been closed" in str(e):
logger.error("🚨 浏览器实例被截断Monkey 测试提前终止!")
break
# logger.debug(f"⚠️ Monkey 第 {i+1} 次挥拍落空: {e}")
pass
# 卸载挂载的全局异常监听,避免影响后续的常规测试
page.remove_listener("pageerror", log_page_error)
logger.info(f"✅ Monkey 测试执行完毕!成功施加了 {success_actions}/{action_count} 次有效随机压力。未探测到毁灭性 Crash。")
return True

View File

@ -0,0 +1,70 @@
import time
import random
import string
from framework.core.logger import get_logger
from framework.business.quantization_page import QuantizationPage
logger = get_logger("QuantizationScenario")
def _create_and_submit_task(quant_page: QuantizationPage, task_name: str):
"""内部辅助函数:执行单次完整的任务创建流"""
# 1. 进入任务创建流程
quant_page.navigate_to()
quant_page.click_create_task()
# 2. 输入名称
quant_page.input_task_name(task_name)
# 3. 选择编译模式
quant_page.select_mode_fast_eval()
# 4. 选择镜像
quant_page.select_image("dcloud_ai_toolchain_ubuntu_22_s100_cpu-2")
# 5. 选取模型文件
quant_page.upload_model_file("model", "result.onnx")
# 6. 输入 march 值
quant_page.input_march_param("nash-e")
# 7. 提交表单
quant_page.click_final_submit()
def run_quantization_lifecycle(quant_page: QuantizationPage):
"""量化任务全生命周期流程"""
logger.info("开始执行量化工具自动化流程...")
# ================= 阶段 1手动终止链路 =================
random_str1 = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
task_1 = f"AutoQuant_{random_str1}"
logger.info(f"--- 阶段一: 测试任务手动终止与删除 [{task_1}] ---")
_create_and_submit_task(quant_page, task_1)
# 1. 查日志
quant_page.open_and_close_logs(task_1)
# 2. 停止
quant_page.stop_task(task_1)
# 3. 删除
quant_page.delete_task(task_1)
# ================= 阶段 2完整成功链路 =================
random_str2 = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
task_2 = f"AutoQuant_{random_str2}"
logger.info(f"--- 阶段二: 测试任务完整成功链路 [{task_2}] ---")
_create_and_submit_task(quant_page, task_2)
# 1. 查日志
quant_page.open_and_close_logs(task_2)
# 监控状态流转为终态 (完成或失败)
quant_page.wait_for_task_finished(task_2)
# + 成功后的新增操作: 报告查看与全量下载拦截
quant_page.open_and_close_report(task_2)
quant_page.download_all_results(task_2)
logger.info("✅ 量化工具全生命周期(异常截断+成功收尾) 双链路完整验证通过!")

View File

@ -0,0 +1,35 @@
import time
from framework.core.logger import get_logger
logger = get_logger("ThreeDAssetsScenario")
def run_3d_assets_lifecycle(assets_page, asset_name):
"""
业务逻辑 3D 生成的资产归档到指定的业务包 (ui_test)
"""
logger.info(f"--- 开启 3D 资产归档测试 (资产名: {asset_name}) ---")
# 1. 导航到 3D 资产菜单
assets_page.navigate_to()
# 2. 进入 ui_test 资产包
if assets_page.enter_asset_package("ui_test"):
# 3. 开启新增数据子弹窗流程
if assets_page.add_asset_data(asset_name):
logger.info(f"✅ 资产 {asset_name} 关联至 ui_test 成功,准备深度校验...")
# 4. 列表校验 + 资产预览校验 (点开 X 关闭)
if not assets_page.verify_asset_and_preview(asset_name):
raise Exception("资产列表校验或 3D 预览渲染失败")
# 5. 源文件查看校验 (右上角按钮)
if not assets_page.check_source_files():
raise Exception("源文件侧边栏加载异常")
logger.info(f"✨ 3D 资产归档及渲染巡检圆满完成: {asset_name}")
else:
logger.error(f"❌ 3D 资产归档失败: {asset_name}")
raise Exception("3D 资产归档数据新增失败")
else:
logger.error("❌ 无法进入指定的资产包 'ui_test'")
raise Exception("无法进入资产包,流程中断")

View File

@ -0,0 +1,46 @@
import os
import random
import time
from framework.core.logger import get_logger
from framework.config.settings import Config
logger = get_logger("ThreeDGenerationScenario")
def run_3d_generation_lifecycle(threed_page):
"""
业务逻辑3D 生成 + 右上角保存全流程
"""
logger.info("--- 开启 3D 生成场景测试 (包含生成后保存) ---")
threed_page.navigate_to()
# 1. 随机选取一张 PNG 素材
test_data_dir = Config.TEST_DATA_DIR
png_files = [f for f in os.listdir(test_data_dir) if f.lower().endswith('.png')]
if not png_files:
logger.warning(f"⚠️ 目录 {test_data_dir} 下没发现 PNG, 尝试使用默认素材")
return
random_pic = random.choice(png_files)
pic_path = os.path.join(test_data_dir, random_pic)
# 调试
# threed_page.page.pause()
logger.info(f"🎲 随机选取的素材为: {random_pic}")
# 2. 上传并触发生成
threed_page.upload_image(pic_path)
time.sleep(1)
if threed_page.start_generation():
# 3. 等待生成结果出现并点击保存
if threed_page.wait_for_result_and_save():
# 4. 执行复杂的保存弹窗流程 (含随机命名与路径选择)
asset_name = threed_page.handle_save_dialog()
logger.info(f"✨ 3D 模型巡检完毕,资产 {asset_name} 已保存")
return asset_name
else:
logger.error("❌ 等待结果超时,未进入保存流程")
else:
logger.error("❌ 无法点击生成按钮,场景中断")
return None

View File

@ -863,6 +863,9 @@
<span class="icon">📊</span> 测试报告
<span class="nav-badge" id="badge-reports" style="background:var(--accent2)">0</span>
</div>
<div class="nav-item" data-page="stats" onclick="nav(this,'stats')">
<span class="icon">📈</span> 数据看板
</div>
</div>
<div class="nav-section">
@ -1001,6 +1004,7 @@
const pages = {
tasks: { title: '自动化任务', sub: '管理并执行所有自动化测试任务' },
reports: { title: '测试报告', sub: '查看历史运行报告与结果分析' },
stats: { title: '数据看板', sub: '全量巡检任务健康度与趋势分析' },
products: { title: 'Robogo', sub: 'Robogo PROD 环境全链路巡检' },
dataloop: { title: '数据闭环', sub: '数据闭环平台端到端验证' },
};
@ -1022,6 +1026,7 @@
const c = document.getElementById('content');
if (page === 'tasks') await renderTasks(c);
else if (page === 'reports') await renderReports(c);
else if (page === 'stats') await renderDashboard(c);
else if (page === 'products') renderProducts(c, 'robogo');
else if (page === 'dataloop') renderProducts(c, 'data_loop');
}
@ -1060,6 +1065,8 @@
<td>
<button class="action-btn" onclick="openLog('${t.id}','${t.name}','${t.status}')">日志</button>
${t.report_id ? `<button class="action-btn" style="margin-left:6px" onclick="viewReport('${t.id}')">报告</button>` : ''}
${t.status === 'running' ? `<button class="action-btn btn-danger" style="margin-left:6px" onclick="stopTask('${t.id}')">停止</button>` : ''}
<button class="action-btn" style="margin-left:6px;color:var(--danger)" onclick="deleteTask('${t.id}')">删除</button>
</td>
</tr>
`).join('')}
@ -1074,12 +1081,25 @@
c.innerHTML = emptyState('📊', '暂无报告', '完成一次任务后报告将在此出现');
return;
}
c.innerHTML = reports.sort((a, b) => new Date(b.finished_at) - new Date(a.finished_at)).map(r => {
c.innerHTML = (await Promise.all(reports.sort((a, b) => new Date(b.finished_at) - new Date(a.finished_at)).map(async r => {
const pct = r.total_runs > 0 ? Math.round(r.pass / r.total_runs * 100) : 0;
const dur = r.started_at && r.finished_at ? Math.round((new Date(r.finished_at) - new Date(r.started_at)) / 1000) : '?';
// 尝试获取截图
const detail = await fetch(`${API}/api/reports/${r.task_id}`).then(res => res.json()).catch(() => ({}));
const screenshots = detail.screenshots || [];
const shotHtml = screenshots.length > 0 ? `
<div style="margin-top:16px; border-top:1px dashed var(--border); padding-top:12px">
<div style="font-size:11px;color:var(--text-muted);margin-bottom:8px">🖼️ 报错截图存档:</div>
<div style="display:flex;gap:10px;overflow-x:auto;padding-bottom:10px">
${screenshots.map(s => `<img src="/artifacts/screenshots/${s}" style="height:80px;border-radius:6px;border:1px solid var(--border);cursor:zoom-in" onclick="window.open('/artifacts/screenshots/${s}')"/>`).join('')}
</div>
</div>
` : '';
return `
<div class="report-card">
<div>
<div class="report-card" style="display:block">
<div style="display:flex; justify-content:space-between; align-items:center">
<div style="display:flex;align-items:center;gap:12px">
<div class="report-result ${r.result}">${r.result === 'PASS' ? '✅' : '❌'} ${r.result}</div>
<div>
@ -1094,13 +1114,14 @@
</div>
</div>
</div>
<div class="progress-bar">
<div class="progress-fill" style="width:${pct}%;background:${r.result === 'PASS' ? 'var(--accent2)' : 'var(--danger)'}"></div>
</div>
<button class="btn btn-ghost" style="font-size:12px;padding:8px 14px" onclick="openLog('${r.task_id}','${r.task_name}','pass')">查看日志</button>
</div>
<button class="btn btn-ghost" style="font-size:12px;padding:8px 14px" onclick="openLog('${r.task_id}','${r.task_name}','pass')">查看日志</button>
<div class="progress-bar">
<div class="progress-fill" style="width:${pct}%;background:${r.result === 'PASS' ? 'var(--accent2)' : 'var(--danger)'}"></div>
</div>
${shotHtml}
</div>`;
}).join('');
}))).join('');
}
function renderProducts(c, key) {
@ -1331,6 +1352,48 @@
return String(t).replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;');
}
// ── 治理功能 ──
async function deleteTask(id) {
if (!confirm('确定删除该任务及其所有日志、截图吗?')) return;
await fetch(`${API}/api/tasks/${id}`, { method: 'DELETE' });
renderPage(currentPage);
}
async function stopTask(id) {
if (!confirm('确定强行停止当前运行的任务吗?')) return;
await fetch(`${API}/api/tasks/${id}/stop`, { method: 'POST' });
renderPage(currentPage);
}
// ── 看板渲染 ──
async function renderDashboard(c) {
const stats = await fetch(`${API}/api/dashboard/stats`).then(r => r.json());
const brands = Object.entries(stats.products).map(([name, data]) => {
const total = data.pass + data.fail;
const rate = total > 0 ? Math.round(data.pass / total * 100) : 0;
return `
<div class="stat-card">
<div class="stat-label">${name.toUpperCase()} 健康度</div>
<div class="stat-value" style="color:${rate > 90 ? 'var(--accent2)' : 'var(--danger)'}">${rate}%</div>
<div class="stat-meta">累计巡检 ${total} 次</div>
</div>
`;
}).join('');
c.innerHTML = `
<div class="stats-grid">
${stat('全域平均通过率', stats.pass_rate + '%', `基于 ${stats.total_reports} 份报告`, 'linear-gradient(90deg, #6c72ff, #38e0b0)', stats.pass_rate >= 95 ? 'var(--accent2)' : '')}
${stat('累计失败任务', stats.fail_count, '亟待维护项', 'linear-gradient(90deg, var(--danger), #f97316)', stats.fail_count > 0 ? 'var(--danger)' : '')}
</div>
<div style="display:grid; grid-template-columns: repeat(2,1fr); gap:16px; margin-top:16px">
${brands}
</div>
<div class="panel" style="margin-top:24px; padding:24px; text-align:center; color:var(--text-muted)">
📊 数据已在 ${stats.ts} 完成聚合,系统运行稳定。
</div>
`;
}
// Close modal on overlay click
document.querySelectorAll('.modal-overlay').forEach(el => {
el.addEventListener('click', e => { if (e.target === el) closeModal(el.id); });

View File

@ -19,8 +19,13 @@ app = Flask(__name__, static_folder='platform/static', template_folder='platform
CORS(app)
# ── 持久化存储配置 ─────────────────────────────────────────────────────────────
# ── 持久化存储与资源目录 ─────────────────────────────────────────────────────────────
DB_FILE = "platform_db.json"
REPORTS_DIR = "platform_reports"
SCREENSHOTS_DIR = "platform_artifacts/screenshots"
os.makedirs(REPORTS_DIR, exist_ok=True)
os.makedirs(SCREENSHOTS_DIR, exist_ok=True)
def _load_db():
"""从本地文件加载数据,若不存在则初始化空文件"""
@ -106,6 +111,9 @@ def _stream_run(task_id: str, entry: str, account: str, password: str, run_count
env = os.environ.copy()
env["ROBOGO_USER"] = account
env["ROBOGO_PWD"] = password
# 注入统一截图路径与任务前缀
env["ROBOGO_SCREENSHOTS_DIR"] = os.path.abspath(SCREENSHOTS_DIR)
env["ROBOGO_TASK_ID"] = task_id
proc = subprocess.Popen(
[python_bin, entry],
@ -154,9 +162,10 @@ def _stream_run(task_id: str, entry: str, account: str, password: str, run_count
push(f"💥 执行异常: {e}", "ERROR")
total_fail += 1
# ── 生成报告 ───────────────────────────────────────────────────────────────
# ── 生成报告与日志分流 ───────────────────────────────────────────────────────────
finished_at = datetime.now().isoformat()
report = {
# 1. 报告摘要 (主库存储)
report_summary = {
"task_id": task_id,
"task_name": task["name"],
"product": task["product"],
@ -165,14 +174,46 @@ def _stream_run(task_id: str, entry: str, account: str, password: str, run_count
"fail": total_fail,
"started_at": task.get("started_at"),
"finished_at": finished_at,
"logs": logs_all,
"result": "PASS" if total_fail == 0 else "FAIL"
}
reports_db[task_id] = report
# 2. 完整日志 (物理文件隔离存储,防止主库过大)
log_file = os.path.join(REPORTS_DIR, f"{task_id}.json")
try:
with open(log_file, 'w', encoding='utf-8') as f:
json.dump({"logs": logs_all}, f, ensure_ascii=False)
except Exception as e:
push(f"❌ 物理日志保存失败: {e}", "ERROR")
reports_db[task_id] = report_summary
task["status"] = "pass" if total_fail == 0 else "fail"
task["finished_at"] = finished_at
task["report_id"] = task_id
_save_db() # 2. 报告生成完成,任务状态确立后保存
# ── 自动数据清理 (Retention Policy: 最多保留 100 条历史任务) ──
try:
if len(tasks_db) > 100:
# 按创建时间排序,找出最老的 50 条
oldest_ids = sorted(tasks_db.keys(), key=lambda k: tasks_db[k].get("created_at", ""))[:50]
for oid in oldest_ids:
tasks_db.pop(oid, None)
reports_db.pop(oid, None)
# 清除物理日志文件
old_log = os.path.join(REPORTS_DIR, f"{oid}.json")
if os.path.exists(old_log):
os.remove(old_log)
# 清除关联截图文件
try:
for f in os.listdir(SCREENSHOTS_DIR):
if f.startswith(oid):
os.remove(os.path.join(SCREENSHOTS_DIR, f))
except:
pass
print(f"🧹 已自动清理 50 条过期任务数据(含日志与截图)")
except:
pass
_save_db()
push(f"\n━━━━━━━━━ 测试完成 ━━━━━━━━━", "INFO")
push(f"总计: {run_count} 次 | 通过: {total_pass} | 失败: {total_fail}", "INFO")
@ -295,10 +336,114 @@ def get_report(task_id):
report = reports_db.get(task_id)
if not report:
return jsonify({"error": "Not Found"}), 404
return jsonify(report)
full_report = report.copy()
log_file = os.path.join(REPORTS_DIR, f"{task_id}.json")
if os.path.exists(log_file):
try:
with open(log_file, 'r', encoding='utf-8') as f:
log_data = json.load(f)
full_report["logs"] = log_data.get("logs", [])
except:
full_report["logs"] = []
# 扫描属于该任务的截图 (以 task_id 开头)
try:
shots = [f for f in os.listdir(SCREENSHOTS_DIR) if f.startswith(task_id)]
full_report["screenshots"] = sorted(shots)
except:
full_report["screenshots"] = []
return jsonify(full_report)
# ── 平台治理与数据聚合 路由 ──
@app.route("/api/tasks/<task_id>", methods=["DELETE"])
def delete_task(task_id):
"""原子化删除任务、报告与日志文件"""
try:
tasks_db.pop(task_id, None)
reports_db.pop(task_id, None)
# 清理日志
log_path = os.path.join(REPORTS_DIR, f"{task_id}.json")
if os.path.exists(log_path): os.remove(log_path)
# 清理截图
try:
for f in os.listdir(SCREENSHOTS_DIR):
if f.startswith(task_id): os.remove(os.path.join(SCREENSHOTS_DIR, f))
except: pass
_save_db()
return jsonify({"success": True}), 200
except Exception as e:
print(f"❌ 任务删除异常: {e}")
return jsonify({"error": str(e)}), 500
@app.route("/api/tasks/<task_id>/stop", methods=["POST"])
def stop_task(task_id):
"""强杀测试进程"""
try:
task = tasks_db.get(task_id)
if not task or task["status"] != "running":
return jsonify({"error": "Task not running"}), 400
pid = task.get("pid")
if pid:
try:
import signal
os.kill(pid, signal.SIGTERM)
task["status"] = "fail"
_save_db()
return jsonify({"success": True}), 200
except:
return jsonify({"error": "Failed to kill process"}), 500
return jsonify({"error": "No PID found"}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/dashboard/stats")
def get_stats():
"""看板聚合数据 API"""
try:
reports = list(reports_db.values())
total = len(reports)
passed = sum(1 for r in reports if r.get('result') == 'PASS')
prod_breakdown = {}
for r in reports:
p = r.get("product", "unknown")
if p not in prod_breakdown: prod_breakdown[p] = {"pass":0, "fail":0}
if r.get('result') == 'PASS': prod_breakdown[p]["pass"] += 1
else: prod_breakdown[p]["fail"] += 1
return jsonify({
"total_reports": total,
"pass_rate": round(passed/total*100, 1) if total > 0 else 0,
"fail_count": total - passed,
"products": prod_breakdown,
"ts": datetime.now().strftime("%H:%M:%S")
})
except Exception as e:
print(f"❌ 看板统计异常: {e}")
# 返回空数据而不是报错,防止前端彻底崩溃
return jsonify({
"total_reports": 0, "pass_rate": 0, "fail_count": 0,
"products": {}, "ts": datetime.now().strftime("%H:%M:%S")
})
# ── 静态资源路由 ──
@app.route("/artifacts/screenshots/<path:filename>")
def serve_screenshot(filename):
"""提供截图访问能力"""
return send_from_directory(SCREENSHOTS_DIR, filename)
# ── 前端路由 (SPA 单页) ───────────────────────────────────────────────────────
@app.route("/")
@app.route("/<path:path>")
def serve_index(path=""):
@ -306,5 +451,5 @@ def serve_index(path=""):
if __name__ == "__main__":
print("🚀 自动化平台启动中... http://127.0.0.1:5001")
print("🚀 自动化平台 (架构升级版) 启动中... http://127.0.0.1:5001")
app.run(host="127.0.0.1", port=5001, debug=False, threaded=True)

File diff suppressed because it is too large Load Diff