test/framework/business/data_management.py

125 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from framework.config.settings import Config
from framework.core.logger import get_logger
from framework.business.login_page import LoginPage
from framework.business.file_manager_page import FileManagerPage
from framework.business.dev_machine_page import DevMachinePage
from framework.business.cloud_desktop_page import CloudDesktopPage
# 引入抽离出来的业务场景
from framework.scripts.file_system_scenario import run_full_file_lifecycle
from framework.scripts.compute_resource_scenario import run_dev_machine_lifecycle
from framework.scripts.desktop_lifecycle import run_cloud_desktop_lifecycle
from framework.scripts.mirror_assets import run_mirror_assets_lifecycle
from framework.business.mirror_assets_page import MirrorAssetsPage
import time
import os
logger = get_logger("DataManagementRunner")
class DataManagement:
"""符合 PO 模式的场景指挥官"""
def __init__(self, headless=False):
self.ui = LoginPage(headless=headless)
self.page = None
self.fm = None
self.dm = None
self.cd = None
self.ma = None
def start(self):
"""启动浏览器并初始化组件"""
self.page = self.ui.start()
# 初始化页面原子对象
self.fm = FileManagerPage(self.page)
self.dm = DevMachinePage(self.page)
self.cd = CloudDesktopPage(self.page)
self.ma = MirrorAssetsPage(self.page)
def login(self, user, pwd):
"""执行登录流程"""
return self.ui.login(user, pwd)
def _safe_screenshot(self, name):
"""安全截图,防止浏览器已关闭时报错"""
try:
if self.page:
self.page.screenshot(path=name)
except:
logger.warning(f"⚠️ 截图失败(浏览器可能已关闭): {name}")
def run_all_scenarios(self):
"""
场景指挥:依次执行所有的业务流
每个场景独立 try-except一个失败不阻塞后续场景
"""
errors = []
# 1. 执行文件系统场景 (跳过)
try:
run_full_file_lifecycle(self.fm, Config.FOLDER_NAME)
self._safe_screenshot("file_system_final.png")
logger.info("✅ 文件系统场景通过")
except Exception as e:
logger.error(f"❌ 文件系统场景失败: {e}")
self._safe_screenshot("file_system_error.png")
errors.append(f"文件系统: {e}")
# 2. 执行开发机场景 (跳过)
try:
run_dev_machine_lifecycle(self.dm)
self._safe_screenshot("dev_machine_final.png")
logger.info("✅ 开发机场景通过")
except Exception as e:
logger.error(f"❌ 开发机场景失败: {e}")
self._safe_screenshot("dev_machine_error.png")
errors.append(f"开发机: {e}")
# 3. 执行云桌面场景
try:
run_cloud_desktop_lifecycle(self.cd)
self._safe_screenshot("cloud_desktop_final.png")
logger.info("✅ 云桌面场景通过")
except Exception as e:
logger.error(f"❌ 云桌面场景失败: {e}")
self._safe_screenshot("cloud_desktop_error.png")
errors.append(f"云桌面: {e}")
# 4. 执行镜像资产场景
try:
run_mirror_assets_lifecycle(self.ma, self.cd)
self._safe_screenshot("mirror_assets_final.png")
logger.info("✅ 镜像资产场景通过")
except Exception as e:
logger.error(f"❌ 镜像资产场景失败: {e}")
self._safe_screenshot("mirror_assets_error.png")
errors.append(f"镜像资产: {e}")
# 汇总
if errors:
summary = " | ".join(errors)
logger.error(f"{len(errors)} 个场景失败: {summary}")
raise Exception(f"{len(errors)} 个场景失败: {summary}")
logger.info("🎉 所有 UI 模块遍历测试圆满完成!")
def run(self, user, pwd):
"""主入口"""
try:
self.start()
if not self.login(user, pwd):
return
# 开始执行指挥任务
self.run_all_scenarios()
finally:
self.ui.stop()
if __name__ == "__main__":
account = input("账号: ")
password = input("密码: ")
dm = DataManagement(headless=False)
dm.run(account, password)