""" UI 自动化基类占位 (推荐使用 Playwright) """ from playwright.sync_api import sync_playwright from framework.core.logger import get_logger import time logger = get_logger("BaseUI") class BaseUI: def __init__(self, headless=False): self.playwright = None self.browser = None self.context = None self.page = None self.headless = headless def start(self): """启动浏览器并伪装""" self.playwright = sync_playwright().start() # 增加伪装配置 self.browser = self.playwright.chromium.launch( headless=self.headless, args=["--disable-blink-features=AutomationControlled"] ) # 设置真实的 User-Agent user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" self.context = self.browser.new_context(user_agent=user_agent) self.page = self.context.new_page() # 实时打印浏览器控制台日志,方便排查白屏 self.page.on("console", lambda msg: logger.info(f"[BROWSER LOG] {msg.text}")) self.page.on("pageerror", lambda exc: logger.error(f"[BROWSER ERROR] {exc}")) logger.info("Browser started with spoofing and logging") return self.page def stop(self): """关闭浏览器""" if self.browser: self.browser.close() if self.playwright: self.playwright.stop() logger.info("Browser stopped") def navigate(self, url): logger.info(f"Navigate to {url}") # 使用更稳健的加载策略:DOM 加载完即继续,后续靠 wait_for_selector self.page.goto(url, wait_until="domcontentloaded", timeout=30000) # 给 2 秒缓冲时间让脚本执行 time.sleep(2) def click(self, selector, timeout=5000): try: logger.info(f"Clicking: {selector}") self.page.click(selector, timeout=timeout) except Exception as e: self.page.screenshot(path="click_failed.png") logger.error(f"Click failed on {selector}, saved screenshot to click_failed.png") raise e def fill(self, selector, value): logger.info(f"Filling {selector} with value") self.page.fill(selector, value) def wait_for_selector(self, selector, timeout=10000): self.page.wait_for_selector(selector, timeout=timeout) def smart_find(self, description, role=None, timeout=5000): """核心智能定位器:按优先级尝试语义定位""" strategies = [ lambda: self.page.get_by_role(role, name=description, exact=False) if role else None, lambda: self.page.get_by_text(description, exact=False), lambda: self.page.get_by_placeholder(description, exact=False), lambda: self.page.get_by_label(description, exact=False), lambda: self.page.get_by_title(description, exact=False) ] for get_loc in strategies: try: loc = get_loc() if loc and loc.count() > 0: # 优先返回第一个匹配的可交互元素 return loc.first except: continue # 最后的兜底尝试:简单的 CSS 选择器尝试 (如果 description looks like one) if "." in description or "#" in description or "[" in description: try: return self.page.locator(description).first except: pass raise Exception(f"❌ 无法识别页面元素: '{description}'") def smart_click(self, text, role=None, timeout=5000): """语义化点击""" logger.info(f"👉 [SmartClick] 正在尝试点击: {text}") try: el = self.smart_find(text, role=role, timeout=timeout) el.scroll_into_view_if_needed() el.click(timeout=timeout) except Exception as e: logger.warning(f"⚠️ [SmartClick] 常规语义匹配失败: {e}. 正在尝试暴力文本搜索...") # 暴力 JS 点击兜底 success = self.page.evaluate(f"""(txt) => {{ const items = Array.from(document.querySelectorAll('button, a, span, .p-button-label')); const target = items.find(el => el.innerText.trim().includes(txt)); if (target) {{ target.click(); return true; }} return false; }}""", text) if not success: self.page.screenshot(path=f"smart_click_failed_{int(time.time())}.png") raise e def smart_fill(self, label, value, timeout=5000): """语义化输入""" logger.info(f"⌨️ [SmartFill] 正在尝试在 [{label}] 填写: {value}") try: # 1. 尝试直接通过 Label/Placeholder 寻找 el = self.smart_find(label, role="textbox", timeout=timeout) el.fill(value) except: # 2. 如果失败,尝试找到标签文字,然后填充它附近的 input logger.info(f"🔍 [SmartFill] 尝试寻找临近 [{label}] 的输入框...") self.page.evaluate(f"""([lbl, val]) => {{ const allTexts = Array.from(document.querySelectorAll('label, span, div')) .filter(el => el.innerText.trim() === lbl); for (const textItem of allTexts) {{ const parent = textItem.parentElement; const input = parent.querySelector('input, textarea'); if (input) {{ input.value = val; input.dispatchEvent(new Event('input', {{ bubbles: true }})); input.dispatchEvent(new Event('change', {{ bubbles: true }})); return true; }} }} return false; }}""", [label, value]) def get_session_info(self): """动态从浏览器提取 Session 信息""" logger.info("正在从浏览器会话提取认证信息...") # 提取 Cookie cookies = self.context.cookies() cookie_str = "; ".join([f"{c['name']}={c['value']}" for c in cookies]) # 尝试从 localStorage 提取 Token (通常 key 可能是 token, access_token 或 bearer) token = self.page.evaluate("""() => { return localStorage.getItem('token') || localStorage.getItem('access_token') || localStorage.getItem('bearer') || JSON.parse(localStorage.getItem('user_info') || '{}').token; }""") if token and not token.startswith("Bearer "): token = f"Bearer {token}" return {"token": token, "cookie": cookie_str}