162 lines
6.8 KiB
Python
162 lines
6.8 KiB
Python
"""
|
||
UI 自动化基类占位 (推荐使用 Playwright)
|
||
"""
|
||
from playwright.sync_api import sync_playwright
|
||
from framework.core.logger import get_logger
|
||
import time
|
||
|
||
logger = get_logger("BaseUI")
|
||
|
||
class BaseUI:
|
||
def __init__(self, headless=False):
|
||
self.playwright = None
|
||
self.browser = None
|
||
self.context = None
|
||
self.page = None
|
||
self.headless = headless
|
||
|
||
def start(self):
|
||
"""启动浏览器并伪装"""
|
||
self.playwright = sync_playwright().start()
|
||
# 增加伪装配置
|
||
self.browser = self.playwright.chromium.launch(
|
||
headless=self.headless,
|
||
args=["--disable-blink-features=AutomationControlled"]
|
||
)
|
||
# 设置真实的 User-Agent
|
||
user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
|
||
self.context = self.browser.new_context(user_agent=user_agent)
|
||
self.page = self.context.new_page()
|
||
|
||
# 实时打印浏览器控制台日志,方便排查白屏
|
||
self.page.on("console", lambda msg: logger.info(f"[BROWSER LOG] {msg.text}"))
|
||
self.page.on("pageerror", lambda exc: logger.error(f"[BROWSER ERROR] {exc}"))
|
||
|
||
logger.info("Browser started with spoofing and logging")
|
||
return self.page
|
||
|
||
def stop(self):
|
||
"""关闭浏览器"""
|
||
if self.browser:
|
||
self.browser.close()
|
||
if self.playwright:
|
||
self.playwright.stop()
|
||
logger.info("Browser stopped")
|
||
|
||
def navigate(self, url):
|
||
logger.info(f"Navigate to {url}")
|
||
# 使用更稳健的加载策略:DOM 加载完即继续,后续靠 wait_for_selector
|
||
self.page.goto(url, wait_until="domcontentloaded", timeout=30000)
|
||
# 给 2 秒缓冲时间让脚本执行
|
||
time.sleep(2)
|
||
|
||
def click(self, selector, timeout=5000):
|
||
try:
|
||
logger.info(f"Clicking: {selector}")
|
||
self.page.click(selector, timeout=timeout)
|
||
except Exception as e:
|
||
self.page.screenshot(path="click_failed.png")
|
||
logger.error(f"Click failed on {selector}, saved screenshot to click_failed.png")
|
||
raise e
|
||
|
||
def fill(self, selector, value):
|
||
logger.info(f"Filling {selector} with value")
|
||
self.page.fill(selector, value)
|
||
|
||
def wait_for_selector(self, selector, timeout=10000):
|
||
self.page.wait_for_selector(selector, timeout=timeout)
|
||
|
||
def smart_find(self, description, role=None, timeout=5000):
|
||
"""核心智能定位器:按优先级尝试语义定位"""
|
||
strategies = [
|
||
lambda: self.page.get_by_role(role, name=description, exact=False) if role else None,
|
||
lambda: self.page.get_by_text(description, exact=False),
|
||
lambda: self.page.get_by_placeholder(description, exact=False),
|
||
lambda: self.page.get_by_label(description, exact=False),
|
||
lambda: self.page.get_by_title(description, exact=False)
|
||
]
|
||
|
||
for get_loc in strategies:
|
||
try:
|
||
loc = get_loc()
|
||
if loc and loc.count() > 0:
|
||
# 优先返回第一个匹配的可交互元素
|
||
return loc.first
|
||
except:
|
||
continue
|
||
|
||
# 最后的兜底尝试:简单的 CSS 选择器尝试 (如果 description looks like one)
|
||
if "." in description or "#" in description or "[" in description:
|
||
try:
|
||
return self.page.locator(description).first
|
||
except:
|
||
pass
|
||
|
||
raise Exception(f"❌ 无法识别页面元素: '{description}'")
|
||
|
||
def smart_click(self, text, role=None, timeout=5000):
|
||
"""语义化点击"""
|
||
logger.info(f"👉 [SmartClick] 正在尝试点击: {text}")
|
||
try:
|
||
el = self.smart_find(text, role=role, timeout=timeout)
|
||
el.scroll_into_view_if_needed()
|
||
el.click(timeout=timeout)
|
||
except Exception as e:
|
||
logger.warning(f"⚠️ [SmartClick] 常规语义匹配失败: {e}. 正在尝试暴力文本搜索...")
|
||
# 暴力 JS 点击兜底
|
||
success = self.page.evaluate(f"""(txt) => {{
|
||
const items = Array.from(document.querySelectorAll('button, a, span, .p-button-label'));
|
||
const target = items.find(el => el.innerText.trim().includes(txt));
|
||
if (target) {{ target.click(); return true; }}
|
||
return false;
|
||
}}""", text)
|
||
if not success:
|
||
self.page.screenshot(path=f"smart_click_failed_{int(time.time())}.png")
|
||
raise e
|
||
|
||
def smart_fill(self, label, value, timeout=5000):
|
||
"""语义化输入"""
|
||
logger.info(f"⌨️ [SmartFill] 正在尝试在 [{label}] 填写: {value}")
|
||
try:
|
||
# 1. 尝试直接通过 Label/Placeholder 寻找
|
||
el = self.smart_find(label, role="textbox", timeout=timeout)
|
||
el.fill(value)
|
||
except:
|
||
# 2. 如果失败,尝试找到标签文字,然后填充它附近的 input
|
||
logger.info(f"🔍 [SmartFill] 尝试寻找临近 [{label}] 的输入框...")
|
||
self.page.evaluate(f"""([lbl, val]) => {{
|
||
const allTexts = Array.from(document.querySelectorAll('label, span, div'))
|
||
.filter(el => el.innerText.trim() === lbl);
|
||
for (const textItem of allTexts) {{
|
||
const parent = textItem.parentElement;
|
||
const input = parent.querySelector('input, textarea');
|
||
if (input) {{
|
||
input.value = val;
|
||
input.dispatchEvent(new Event('input', {{ bubbles: true }}));
|
||
input.dispatchEvent(new Event('change', {{ bubbles: true }}));
|
||
return true;
|
||
}}
|
||
}}
|
||
return false;
|
||
}}""", [label, value])
|
||
|
||
def get_session_info(self):
|
||
"""动态从浏览器提取 Session 信息"""
|
||
logger.info("正在从浏览器会话提取认证信息...")
|
||
# 提取 Cookie
|
||
cookies = self.context.cookies()
|
||
cookie_str = "; ".join([f"{c['name']}={c['value']}" for c in cookies])
|
||
|
||
# 尝试从 localStorage 提取 Token (通常 key 可能是 token, access_token 或 bearer)
|
||
token = self.page.evaluate("""() => {
|
||
return localStorage.getItem('token') ||
|
||
localStorage.getItem('access_token') ||
|
||
localStorage.getItem('bearer') ||
|
||
JSON.parse(localStorage.getItem('user_info') || '{}').token;
|
||
}""")
|
||
|
||
if token and not token.startswith("Bearer "):
|
||
token = f"Bearer {token}"
|
||
|
||
return {"token": token, "cookie": cookie_str}
|