FE/backend/main.py

665 lines
23 KiB
Python

from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List, Optional, Any
from pydantic import BaseModel
import uuid, datetime
import models
from database import engine, get_db
from feishu_client import FeishuClient
models.Base.metadata.create_all(bind=engine)
# Initialize Feishu Client
FEISHU_APP_ID = "cli_a9aeb4fb2c78dcb6"
FEISHU_APP_SECRET = "7nYs724srjEn4jgNPJW9cfuqL4e2OVT6"
feishu = FeishuClient(FEISHU_APP_ID, FEISHU_APP_SECRET)
app = FastAPI(title="QuantumTest Backend")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ─────────────────────────────────────────────
# Pydantic Schemas
# ─────────────────────────────────────────────
class TestCaseSchema(BaseModel):
id: Optional[str] = None
caseId: Optional[str] = None
text: Optional[str] = None
module: Optional[str] = None
type: Optional[str] = None
priority: Optional[str] = None
reviewStatus: Optional[str] = None
executionStatus: Optional[str] = None
maintainer: Optional[str] = None
requirementId: Optional[str] = None
bugId: Optional[str] = None
parentId: Optional[str] = None
steps: Optional[list] = None
tags: Optional[List[str]] = None
reviewers: Optional[List[str]] = None
children: Optional[List[Any]] = []
class TaskSchema(BaseModel):
id: Optional[str] = None
name: str
status: Optional[str] = "PENDING"
planId: Optional[str] = None
assignees: Optional[List[str]] = []
class TaskStatusUpdate(BaseModel):
status: str
class TestPlanSchema(BaseModel):
id: Optional[str] = None
name: str
type: str
caseIds: Optional[List[str]] = []
assignees: Optional[List[str]] = []
createdAt: Optional[str] = None
class BugSchema(BaseModel):
id: Optional[str] = None
title: str
status: Optional[str] = "OPEN"
caseId: Optional[str] = None
class BatchReviewSchema(BaseModel):
caseIds: List[str]
reviewerOpenId: str
moduleName: Optional[str] = None
message: Optional[str] = "请协助评审以下测试用例"
class BatchUpdateSchema(BaseModel):
caseIds: List[str]
maintainer: Optional[str] = None
reviewers: Optional[List[str]] = None
class SpaceSchema(BaseModel):
id: Optional[str] = None
name: str
# ─────────────────────────────────────────────
# Health check
# ─────────────────────────────────────────────
@app.get("/")
def read_root():
return {"message": "QuantumTest API is running", "version": "2.0"}
# ─────────────────────────────────────────────
# Test Cases
# ─────────────────────────────────────────────
def _build_tree(cases):
case_dict = {}
for c in cases:
case_dict[c.id] = {
"id": c.id,
"caseId": c.case_id,
"text": c.text,
"module": c.module,
"type": c.type,
"priority": c.priority,
"reviewStatus": c.review_status,
"executionStatus": c.execution_status,
"maintainer": c.maintainer,
"requirementId": c.requirement_id,
"bugId": c.bug_id,
"steps": c.steps,
"tags": c.tags,
"reviewers": c.reviewers or [],
"parentId": c.parent_id,
"children": [],
}
root_nodes = []
for c in cases:
if c.parent_id and c.parent_id in case_dict:
case_dict[c.parent_id]["children"].append(case_dict[c.id])
else:
root_nodes.append(case_dict[c.id])
return root_nodes
@app.get("/api/cases")
def get_cases(space_id: str, db: Session = Depends(get_db)):
cases = db.query(models.TestCase).filter(models.TestCase.space_id == space_id).all()
return {"data": _build_tree(cases)}
@app.post("/api/cases")
def create_case(case: TestCaseSchema, space_id: str, db: Session = Depends(get_db)):
existing = db.query(models.TestCase).filter(models.TestCase.id == case.id).first()
if existing:
return {"message": "already exists", "id": case.id}
db_case = models.TestCase(
id=case.id,
case_id=case.caseId,
text=case.text or "新建用例",
module=case.module,
type=case.type,
priority=case.priority,
review_status=case.reviewStatus,
execution_status=case.executionStatus,
maintainer=case.maintainer,
requirement_id=case.requirementId,
bug_id=case.bugId,
parent_id=case.parentId,
space_id=space_id,
steps=case.steps,
tags=case.tags,
reviewers=case.reviewers,
)
db.add(db_case)
db.commit()
return {"message": "success", "id": db_case.id}
@app.post("/api/cases/batch")
def batch_create_cases(cases: List[TestCaseSchema], space_id: str, db: Session = Depends(get_db)):
for case in cases:
db_case = models.TestCase(
id=case.id,
case_id=case.caseId,
text=case.text or "新建用例",
module=case.module,
type=case.type,
priority=case.priority,
review_status=case.reviewStatus,
execution_status=case.executionStatus,
maintainer=case.maintainer,
requirement_id=case.requirementId,
bug_id=case.bugId,
parent_id=case.parentId,
space_id=space_id,
steps=case.steps,
tags=case.tags,
reviewers=case.reviewers,
)
db.add(db_case)
db.commit()
return {"message": "success", "count": len(cases)}
@app.post("/api/cases/batch")
async def batch_update_cases(data: BatchUpdateSchema, db: Session = Depends(get_db)):
"""
Batch update multiple cases (maintainer and/or reviewers).
"""
try:
cases = db.query(TestCase).filter(TestCase.id.in_(data.caseIds)).all()
for c in cases:
if data.maintainer is not None:
c.maintainer = data.maintainer
if data.reviewers is not None:
c.reviewers = data.reviewers
db.commit()
return {"message": "success", "count": len(cases)}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@app.put("/api/cases/{case_id}")
def update_case(case_id: str, case: TestCaseSchema, db: Session = Depends(get_db)):
print(f"DEBUG: Updating case {case_id} with data: {case.model_dump(exclude_unset=True)}")
db_case = db.query(models.TestCase).filter(models.TestCase.id == case_id).first()
if not db_case:
raise HTTPException(status_code=404, detail="Case not found")
if case.text is not None: db_case.text = case.text
if case.caseId is not None: db_case.case_id = case.caseId
if case.module is not None: db_case.module = case.module
if case.type is not None: db_case.type = case.type
if case.priority is not None: db_case.priority = case.priority
if case.reviewStatus is not None: db_case.review_status = case.reviewStatus
if case.executionStatus is not None: db_case.execution_status = case.executionStatus
if case.maintainer is not None: db_case.maintainer = case.maintainer
if case.requirementId is not None: db_case.requirement_id = case.requirementId
if case.bugId is not None: db_case.bug_id = case.bugId
if case.steps is not None: db_case.steps = case.steps
if case.tags is not None: db_case.tags = case.tags
if case.reviewers is not None: db_case.reviewers = case.reviewers
db.commit()
return {"message": "success"}
@app.delete("/api/cases/{case_id}")
def delete_case(case_id: str, db: Session = Depends(get_db)):
# Recursively collect all descendant IDs
def collect_ids(cid):
ids = [cid]
children = db.query(models.TestCase).filter(models.TestCase.parent_id == cid).all()
for child in children:
ids.extend(collect_ids(child.id))
return ids
all_ids = collect_ids(case_id)
db.query(models.TestCase).filter(models.TestCase.id.in_(all_ids)).delete(synchronize_session=False)
db.commit()
return {"message": "success", "deleted": len(all_ids)}
# ─────────────────────────────────────────────
# Test Plans
# ─────────────────────────────────────────────
@app.get("/api/plans")
def get_plans(space_id: str, db: Session = Depends(get_db)):
plans = db.query(models.TestPlan).filter(models.TestPlan.space_id == space_id).all()
return {"data": [
{
"id": p.id,
"name": p.name,
"type": p.type,
"caseIds": p.case_ids or [],
"assignees": p.assignees or [],
"createdAt": p.created_at,
} for p in plans
]}
@app.post("/api/plans")
def create_plan(plan: TestPlanSchema, space_id: str, db: Session = Depends(get_db)):
new_id = plan.id or str(uuid.uuid4())
db_plan = models.TestPlan(
id=new_id,
name=plan.name,
type=plan.type,
case_ids=plan.caseIds or [],
assignees=plan.assignees or [],
created_at=plan.createdAt or datetime.datetime.now().isoformat(),
space_id=space_id
)
db.add(db_plan)
db.commit()
# --- Feishu Notification ---
if plan.assignees:
try:
# Use global feishu instance initialized at the top
type_map = {"Requirement": "需求测试", "Regression": "回归测试", "Self-test": "开发自测", "Smoke": "冒烟测试"}
plan_type_cn = type_map.get(plan.type, plan.type)
# Mentions string
mentions = " ".join([f"<at id='{uid}'></at>" for uid in plan.assignees])
card = {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": "🚀 新测试计划已创建"},
"template": "blue"
},
"elements": [
{
"tag": "div",
"text": {"tag": "lark_md", "content": f"**计划名称**: {plan.name}\n**计划类型**: {plan_type_cn}\n**包含用例**: {len(plan.caseIds or [])}"}
},
{
"tag": "div",
"text": {"tag": "lark_md", "content": f"**执行人**: {mentions}"}
},
{
"tag": "hr"
},
{
"tag": "note",
"elements": [{"tag": "plain_text", "content": "请相关同学及时关注并开始执行。"}]
},
{
"tag": "action",
"actions": [
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "查看计划详情"
},
"type": "primary",
"url": f"http://localhost:5173/?view=execution&plan_id={new_id}"
}
]
}
]
}
# Send to each assignee as a private message or we could create a group.
# User said "机器人提醒", usually private message is safer if no group context.
for open_id in plan.assignees:
print(f"Sending Feishu card to {open_id}...")
feishu.send_card(open_id, card)
print(f"Successfully sent card to {open_id}")
except Exception as e:
print(f"❌ Failed to send Feishu notification: {e}")
import traceback
traceback.print_exc()
return {"message": "success", "id": new_id}
@app.put("/api/plans/{plan_id}")
def update_plan(plan_id: str, plan: TestPlanSchema, db: Session = Depends(get_db)):
db_plan = db.query(models.TestPlan).filter(models.TestPlan.id == plan_id).first()
if not db_plan:
raise HTTPException(status_code=404, detail="Plan not found")
if plan.name: db_plan.name = plan.name
if plan.type: db_plan.type = plan.type
if plan.caseIds is not None: db_plan.case_ids = plan.caseIds
if plan.assignees is not None: db_plan.assignees = plan.assignees
db.commit()
return {"message": "success"}
@app.post("/api/plans/{plan_id}/copy")
def copy_plan(plan_id: str, db: Session = Depends(get_db)):
source_plan = db.query(models.TestPlan).filter(models.TestPlan.id == plan_id).first()
if not source_plan:
raise HTTPException(status_code=404, detail="Plan not found")
new_id = f"p-copy-{str(uuid.uuid4())[:8]}"
db_plan = models.TestPlan(
id=new_id,
name=f"{source_plan.name} (副本)",
type=source_plan.type,
case_ids=source_plan.case_ids,
assignees=source_plan.assignees,
created_at=datetime.datetime.now().isoformat(),
space_id=source_plan.space_id
)
db.add(db_plan)
db.commit()
# Create associated task for the copy
new_task_id = str(uuid.uuid4())
db_task = models.TestTask(
id=new_task_id,
name=f"【执行任务】{db_plan.name}",
status="PENDING",
plan_id=new_id,
assignees=db_plan.assignees,
created_at=datetime.datetime.now().isoformat()
)
db.add(db_task)
db.commit()
return {"message": "success", "id": new_id}
@app.delete("/api/plans/{plan_id}")
def delete_plan(plan_id: str, db: Session = Depends(get_db)):
db_plan = db.query(models.TestPlan).filter(models.TestPlan.id == plan_id).first()
if db_plan:
db.delete(db_plan)
db.commit()
return {"message": "success"}
# ─────────────────────────────────────────────
# Test Tasks
# ─────────────────────────────────────────────
@app.get("/api/tasks")
def get_tasks(db: Session = Depends(get_db)):
tasks = db.query(models.TestTask).all()
return {"data": [
{
"id": t.id,
"name": t.name,
"status": t.status,
"planId": t.plan_id,
"assignees": t.assignees or [],
"createdAt": t.created_at,
} for t in tasks
]}
@app.post("/api/tasks")
def create_task(task: TaskSchema, db: Session = Depends(get_db)):
new_id = task.id or str(uuid.uuid4())
db_task = models.TestTask(
id=new_id,
name=task.name,
status=task.status,
plan_id=task.planId,
assignees=task.assignees or [],
created_at=datetime.datetime.now().isoformat(),
)
db.add(db_task)
db.commit()
return {"id": new_id}
@app.put("/api/tasks/{task_id}/status")
def update_task_status(task_id: str, body: TaskStatusUpdate, db: Session = Depends(get_db)):
db_task = db.query(models.TestTask).filter(models.TestTask.id == task_id).first()
if not db_task:
raise HTTPException(status_code=404, detail="Task not found")
db_task.status = body.status
db.commit()
return {"message": "success"}
# ─────────────────────────────────────────────
# Bugs
# ─────────────────────────────────────────────
@app.get("/api/bugs")
def get_bugs(db: Session = Depends(get_db)):
bugs = db.query(models.Bug).all()
return {"data": [
{"id": b.id, "title": b.title, "status": b.status, "caseId": b.case_id}
for b in bugs
]}
@app.post("/api/bugs")
def create_bug(bug: BugSchema, db: Session = Depends(get_db)):
new_id = bug.id or f"BUG-{str(uuid.uuid4())[:8].upper()}"
db_bug = models.Bug(id=new_id, title=bug.title, status=bug.status or "OPEN", case_id=bug.caseId)
db.add(db_bug)
db.commit()
return {"id": new_id}
@app.put("/api/bugs/{bug_id}")
def update_bug(bug_id: str, bug: BugSchema, db: Session = Depends(get_db)):
db_bug = db.query(models.Bug).filter(models.Bug.id == bug_id).first()
if not db_bug:
raise HTTPException(status_code=404, detail="Bug not found")
if bug.status: db_bug.status = bug.status
if bug.status: db_bug.status = bug.status
db.commit()
return {"message": "success"}
@app.delete("/api/bugs/{bug_id}")
def delete_bug(bug_id: str, db: Session = Depends(get_db)):
db_bug = db.query(models.Bug).filter(models.Bug.id == bug_id).first()
if db_bug:
db.delete(db_bug)
db.commit()
return {"message": "success"}
# ─────────────────────────────────────────────
# Spaces
# ─────────────────────────────────────────────
@app.get("/api/spaces")
def get_spaces(db: Session = Depends(get_db)):
spaces = db.query(models.Space).all()
return {"data": [{"id": s.id, "name": s.name} for s in spaces]}
@app.post("/api/spaces")
def create_space(space: SpaceSchema, db: Session = Depends(get_db)):
new_id = space.id or f"space-{str(uuid.uuid4())[:8]}"
db_space = models.Space(id=new_id, name=space.name)
db.add(db_space)
db.commit()
return {"id": new_id, "message": "success"}
@app.delete("/api/spaces/{space_id}")
def delete_space(space_id: str, db: Session = Depends(get_db)):
db_space = db.query(models.Space).filter(models.Space.id == space_id).first()
if db_space:
# 1. 删除该空间下的所有用例
db.query(models.TestCase).filter(models.TestCase.space_id == space_id).delete(synchronize_session=False)
# 2. 删除该空间下的所有测试计划
# 注意:如果有测试任务关联到这些计划,可能也需要处理。
# 这里先简单删除计划,因为任务没有 space_id 只有 plan_id
plans = db.query(models.TestPlan).filter(models.TestPlan.space_id == space_id).all()
plan_ids = [p.id for p in plans]
if plan_ids:
db.query(models.TestTask).filter(models.TestTask.plan_id.in_(plan_ids)).delete(synchronize_session=False)
db.query(models.TestPlan).filter(models.TestPlan.id.in_(plan_ids)).delete(synchronize_session=False)
# 3. 删除空间本身
db.delete(db_space)
db.commit()
return {"message": "success"}
# ─────────────────────────────────────────────
# Feishu Integration & Batch Review
# ─────────────────────────────────────────────
@app.post("/api/reviews/batch")
async def batch_review(data: BatchReviewSchema, db: Session = Depends(get_db)):
# 1. Fetch case details
cases = db.query(models.TestCase).filter(models.TestCase.id.in_(data.caseIds)).all()
if not cases:
raise HTTPException(status_code=404, detail="No cases found")
# 2. Collect participants: maintainer + per-case reviewers + initiator
participants = set()
participants.add(data.reviewerOpenId) # The person who triggered the review
for c in cases:
if c.maintainer and c.maintainer.startswith("ou_"):
participants.add(c.maintainer)
if c.reviewers:
for r in c.reviewers:
if r and r.startswith("ou_"):
participants.add(r)
participants_list = list(participants)
print(f"DEBUG: Batch review participants: {participants_list}")
# 3. Create Feishu Group
module_part = f"-{data.moduleName}" if data.moduleName else ""
group_name = f"【D-Case】用例评审{module_part}-{datetime.datetime.now().strftime('%m%d')}"
group_resp = feishu.create_group(
name=group_name,
description=f"针对 {len(cases)} 条用例的批量评审群",
user_ids=participants_list
)
print(f"DEBUG: Feishu create_group response: {group_resp}")
chat_id = group_resp.get("data", {}).get("chat_id")
if not chat_id:
# Fallback: if group creation fails (e.g. user not in app contact scope),
# try sending to the reviewer directly
chat_id = data.reviewerOpenId
# 4. Prepare & Send Card
card_content = {
"config": {"wide_screen_mode": True},
"header": {
"template": "blue",
"title": {"content": "📋 测试用例评审邀请", "tag": "plain_text"}
},
"elements": [
{
"tag": "div",
"text": {"content": f"**发起人:** <at id={data.reviewerOpenId}></at>\n**评审数量:** {len(cases)}\n**说明:** {data.message}", "tag": "lark_md"}
},
{"tag": "hr"},
{
"tag": "div",
"text": {"content": "**待评审列表:**", "tag": "lark_md"}
}
]
}
# Add first 5 cases to the card
for c in cases[:5]:
card_content["elements"].append({
"tag": "div",
"text": {"content": f"• [{c.priority}] {c.text}", "tag": "plain_text"}
})
if len(cases) > 5:
card_content["elements"].append({
"tag": "div",
"text": {"content": f"... 以及其他 {len(cases)-5} 条用例", "tag": "plain_text"}
})
card_content["elements"].append({
"tag": "action",
"actions": [
{
"tag": "button",
"text": {"content": "立即去评审", "tag": "plain_text"},
"type": "primary",
"url": "http://120.48.157.2:55335/static/login.html" # Should be the real URL in prod
}
]
})
send_resp = feishu.send_card(chat_id, card_content)
return {
"message": "success",
"chat_id": chat_id,
"feishu_response": send_resp
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)