test/framework/business/quantization_page.py

279 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import os
import random
import string
from framework.core.base_page import BasePage
from framework.core.logger import get_logger
logger = get_logger("QuantizationPage")
class QuantizationPage(BasePage):
"""量化工具页面 POM"""
MENU_TEXT = "量化工具"
def navigate_to(self):
"""进入量化工具页面"""
logger.info("正在切换到量化工具页面...")
self.smart_click(self.MENU_TEXT)
try:
self.page.wait_for_selector(".p-button:has-text('保存'), .p-tabmenu", timeout=10000)
except:
pass
time.sleep(2)
def click_create_task(self):
"""点击创建任务按钮"""
logger.info("正在点击创建任务按钮...")
self.smart_click("创建任务")
time.sleep(2)
def input_task_name(self, task_name="AutoQuantization"):
"""选中任务名称清空,输入随机任务名称"""
logger.info(f"📝 准备输入任务名称: {task_name}")
# 1. 精准定位:找到包含该 label 的直接父容器,从而限定在这个特定输入组内
input_box = self.page.locator("label").filter(has_text="任务名称").locator("..").locator("input").first
input_box.wait_for(state="visible", timeout=15000)
# 2. 强制清空
input_box.click()
# 识别系统
modifier = "Meta" if "mac" in str(self.page.context.browser.browser_type).lower() else "Control"
self.page.keyboard.press(f"{modifier}+A")
self.page.keyboard.press("Backspace")
# 3. 填入新值
input_box.fill(task_name)
time.sleep(1)
def navigate_to_quantization_task_config(self):
"""点击编译模型转换模式"""
logger.info("正在切换到量化任务配置页面...")
self.smart_click("编译模型转换模式")
try:
self.page.wait_for_selector(".p-button:has-text('保存'), .p-tabmenu", timeout=10000)
except:
pass
time.sleep(2)
"""点击工具版本"""
def click_tool_version(self):
logger.info("正在点击工具版本...")
self.smart_click("工具版本")
time.sleep(2)
def select_mode_fast_eval(self):
"""选择快速性能评测模式"""
logger.info("下拉选择: 快速性能评测模式")
# 修正: label 包在外部容器中p-select 内部并没有该文本,需通过容器层级定位
container = self.page.locator("div:has(label:has-text('编译模型转换模式'))").first
dropdown = container.locator(".p-select").first
dropdown.click()
dropdown = container.locator(".p-select").first
dropdown.click()
# 弹出的选项列表中点击
option = self.page.locator("li.p-select-option:has-text('快速性能评测模式')")
option.click()
time.sleep(1)
def select_image(self, image_name="dcloud_ai_toolchain_ubuntu_22_s100_cpu-2"):
"""选择镜像流程"""
logger.info(f"点击选择镜像按钮,拉起弹窗...")
self.page.locator("button.p-button-link:has-text('选择镜像')").click()
# 在弹窗中定位镜像项
logger.info(f"📍 在弹窗中寻找镜像: {image_name}")
image_row = self.page.locator(f"div:has-text('{image_name}')").last
image_row.scroll_into_view_if_needed()
image_row.click()
# 点击弹窗内部的确定 (限制定位器在 p-dialog 容器内)
logger.info("⏳ 点击[镜像弹窗]确认按钮...")
dialog_confirm = self.page.locator(".p-dialog:has-text('群组镜像')").locator("button:has-text('确定')").last
dialog_confirm.click()
time.sleep(2)
def input_march_param(self, value="nash-e"):
"""模型参数 march 输入"""
logger.info(f"📝 模型参数输入 march: {value}")
# 精准定位:找到特定的 label回到直接父容器再找 input防止全局 flex 匹配
march_input = self.page.locator("label").filter(has_text="march").locator("..").locator("input").first
march_input.scroll_into_view_if_needed()
# 强制清空再输入
march_input.click()
modifier = "Meta" if "mac" in str(self.page.context.browser.browser_type).lower() else "Control"
self.page.keyboard.press(f"{modifier}+A")
self.page.keyboard.press("Backspace")
march_input.fill(value)
time.sleep(1)
def click_final_submit(self):
"""点击页面最终确认按钮"""
logger.info("💾 点击量化任务最终[确认]按钮")
# 往往在页面正下方,不在弹窗内
confirm_btn = self.page.locator("button:has-text('确定')").last
confirm_btn.click()
logger.info("🎉 量化任务已成功提交")
time.sleep(2)
def upload_model_file(self, folder_name="model", file_name="result.onnx"):
"""点击模型文件框,选择具体模型文件"""
logger.info("📂 点击[模型文件]区域,拉起路径选择...")
# 定位虚线框
self.page.locator("div.border-dashed:has-text('点击上传模型')").click()
time.sleep(1)
# 在“选择文件”弹窗内操作
file_picker = self.page.locator(".p-dialog:has-text('选择文件')")
# 1. 点击左侧文件夹
logger.info(f"📍 进入文件夹: {folder_name}")
file_picker.locator(f"span.text-sm:has-text('{folder_name}')").click()
time.sleep(1)
# 2. 点击右侧文件列表中的目标项
logger.info(f"📄 选择具体文件: {file_name}")
file_picker.locator(f"span:has-text('{file_name}')").click()
time.sleep(1)
# 3. 点击弹窗底部的确定
logger.info("⏳ 点击[选择文件]弹窗确认按钮...")
file_picker.locator("button:has-text('确定')").last.click()
logger.info("✅ 模型文件选取成功")
time.sleep(1)
def open_and_close_logs(self, task_name):
"""等待任务进入运行中状态后,点击日志打开新窗口,等待加载后关闭"""
import re
logger.info(f"📄 准备查看任务 [{task_name}] 的日志...")
row = self.page.locator("tr").filter(has_text=task_name).first
# 1. 拦截队列:必须等到状态跳变出[排队中]
logger.info(f"⏳ 正在监听排队队列,等待任务步入 [运行中] 状态...")
state_loc = row.locator("span, div").filter(has_text=re.compile(r"运行中|已完成|失败")).first
state_loc.wait_for(state="visible", timeout=180000) # 最多等待3分钟排队
curr_state = state_loc.text_content()
if "运行中" not in curr_state:
logger.warning(f"⚠️ 状态跃迁过快或异常,当前状态已达 [{curr_state}],无需再去查看动态运行日志。")
return
logger.info(f"✅ 状态刷新确认: [运行中]!正在拉起全屏日志流面板...")
# 2. 监听新页面的弹出
with self.page.context.expect_page() as new_page_info:
row.locator("button:has-text('日志')").click()
new_page = new_page_info.value
new_page.wait_for_load_state("domcontentloaded")
logger.info("✅ 已切入日志新窗口,等待底层视图渲染...")
new_page.locator("body").wait_for(state="visible", timeout=10000)
time.sleep(2) # 给足够的流式打印时间预览
new_page.close()
logger.info("✅ 日志窗口已平滑关闭,切回主视图。测试流继续下放。")
def stop_task(self, task_name):
"""停止任务并等待状态变为已停止"""
logger.info(f"⏹ 准备停止任务: {task_name}")
row = self.page.locator("tr").filter(has_text=task_name).first
# 等待停止按钮可点击
stop_btn = row.locator("button:has-text('停止')")
stop_btn.click()
logger.info("⏳ 等待任务状态变为 [已停止]...")
# 等待出现 已停止 的标签
row.locator("span, div").filter(has_text="已停止").first.wait_for(state="visible", timeout=60000)
logger.info("✅ 任务停止成功")
def delete_task(self, task_name):
"""删除已停止的任务"""
logger.info(f"🗑 正在删除任务: {task_name}")
row = self.page.locator("tr").filter(has_text=task_name).first
row.locator("button:has-text('删除')").click()
# 点击气泡确认
logger.info("点击确认删除弹窗...")
confirm_btn = self.page.locator(".p-confirm-popup-accept-button, button:has-text('确定')").last
confirm_btn.wait_for(state="visible", timeout=5000)
confirm_btn.click()
# 校验行是否消失
row.wait_for(state="hidden", timeout=10000)
logger.info(f"✅ 任务 [{task_name}] 已从列表中移除")
def wait_for_task_finished(self, task_name, timeout_mins=5):
"""轮询等待任务达到终态 (已完成 / 失败)"""
import re
logger.info(f"⏳ 正在监听任务 [{task_name}] 执行进度 (上限 {timeout_mins} 分钟)...")
row = self.page.locator("tr").filter(has_text=task_name).first
try:
# 同时匹配"已完成"或"失败"任一状态
end_state = row.locator("span, div").filter(has_text=re.compile(r"已完成|失败")).first
end_state.wait_for(state="visible", timeout=timeout_mins * 60000)
state_text = end_state.text_content()
if "失败" in state_text:
logger.error(f"❌ 任务执行异常: 监控到状态变为 [失败]")
raise Exception("后端量化任务处理失败")
else:
logger.info(f"🎉 业务通过: 任务执行成功结束,当前处于 [{state_text}] 状态")
time.sleep(2)
except Exception as e:
logger.error(f"❌ 监听任务进度时发生中断或超时: {e}")
raise e
def open_and_close_report(self, task_name):
"""点击报告打开新窗口,等待页面完全加载后关闭"""
logger.info(f"📊 准备查看任务 [{task_name}] 的检查报告...")
row = self.page.locator("tr").filter(has_text=task_name).first
# 拦截新标签页弹出
with self.page.context.expect_page() as new_page_info:
row.locator("button:has-text('报告')").click()
new_page = new_page_info.value
new_page.wait_for_load_state("domcontentloaded")
new_page.locator("body").wait_for(state="visible", timeout=15000)
logger.info("✅ 已切入[报告]新窗口基础DOM渲染就绪...")
time.sleep(3) # 提供安全的静态视觉缓冲期保证页面完全加载
new_page.close()
logger.info("✅ 报告页面窗口已关闭,切回主视图。")
def download_all_results(self, task_name):
"""点击下载拉起弹窗,下载全部结果并等待下载任务完成后结束"""
logger.info(f"📥 准备下载任务 [{task_name}] 的全部输出结果...")
row = self.page.locator("tr").filter(has_text=task_name).first
# 1. 点击当前行下载动作
row.locator("button:has-text('下载')").click()
# 2. 接管全局下载弹窗
dialog = self.page.locator(".p-dialog").filter(has_text="下载全部结果")
dialog.wait_for(state="visible", timeout=5000)
# 3. 阻塞拦截底层下载事件
logger.info("⏳ 点击[下载全部结果],安全挂起等待文件流写入完毕...")
dl_btn = dialog.locator("button:has-text('下载全部结果')")
with self.page.expect_download(timeout=60000) as download_info:
dl_btn.click()
download = download_info.value
# 主动调用 path() 即可在当前主线程强阻塞,直至磁盘下载动作完全落盘成功
dl_path = download.path()
logger.info(f"🎉 文件下载成功落盘!临时归档路径: {dl_path}")
time.sleep(2)
# 4. 收尾清理弹窗 UI
close_btn = dialog.locator("button.p-dialog-header-close").first
if close_btn.is_visible():
close_btn.click()
time.sleep(1)
logger.info("✅ 下载流程顺畅完结。")