Multimodal Q&A Dataset Generation Based on the Dagent Platform

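Goal: use the knowledge-base processing that the Dagent backend already performs to generate a high-quality Q&A set that includes image information.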


1. Analysis of Existing Dagent Platform Capabilities

1.1 Core Capabilities

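| Capability | Implementation | Notes |
|---|---|---|
| HTML → Markdown conversion | `pdf_service.py` (calls the marker service) | Supports PDF/DOCX/RST → MD |
| Image OCR + semantic description | `pic_to_text.py` | Uses GPT-4V to convert images to text and stores the result in the database |
| Markdown paragraph splitting | `split_markdown_filter.py` | Splits paragraphs by heading level |
| Image path handling | `md_service.py` | Relative paths → absolute BOS paths |
| Vector index storage | `store_*_semantic_index.py` | Vectorizes paragraphs, questions, and tables |
| Knowledge-base retrieval | `knowledge_md_retrieve_service.py` | Semantic search |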

1.2 Database Schema (OceanBase, MySQL-compatible)

Connection info:

```
Host:     120.48.66.228
Port:     23306
User:     dagent
Password: Fd1.Ej3.fdIie48
Database: dagent_platform
```

Core tables:

knowledge_file: raw file metadata

id, org_id, file_md5, file_name, file_type, file_bytes, file_url, file_clean_status

knowledge_md_header_split: paragraph split results (the most important table)

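```
id, org_id, file_id, file_name, headers
paragraph_context               -- paragraph text
paragraph_img_num               -- number of images in the paragraph
paragraph_pic_semantics_context -- image OCR + semantic description (already processed by GPT-4V)
paragraph_question              -- paragraph questions already generated by Dagent
paragraph_summary               -- paragraph summary
paragraph_keywords              -- keywords
```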

knowledge_md_paragraph_active_context: paragraph active context (including vectors)

id, file_id, headers, active_context, active_context_vector

1.3 Key Findings

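Dagent has already:

  • Converted 209 HTML files to Markdown
  • Uploaded 1,142 images to BOS and generated semantic descriptions for them with GPT-4V
  • Split the paragraphs by heading level
  • Populated paragraph_question, paragraph_summary, and paragraph_keywords for every paragraph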

Conclusion: there is no need to reprocess the HTML; reading the database directly is sufficient.


2. Solution Design

2.1 Overall Flow

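```
Dagent database (knowledge_md_header_split)
    ↓
Extract paragraph data
  - paragraph_context (text)
  - paragraph_pic_semantics_context (image semantics, already available)
  - paragraph_question (seed questions, already available)
    ↓
┌───────────────────────────────────────────────────────┐
│  Q&A generation (three types)                         │
│  1. Text-only questions (based on paragraph_context)  │
│  2. Text + image questions (text + image semantics)   │
│  3. Expanded seed questions (extending existing ones) │
└───────────────────────────────────────────────────────┘
    ↓
Store in the RAG Eval database (qa_gen_question)
    ↓
Review → export MD → single-hop retrieval test
```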
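A minimal sketch of how a single knowledge_md_header_split row maps onto the three question types, assuming the row arrives as a dict keyed by column name (as the DictCursor queries in section 3.1 return it). `question_types_for` and the type labels are hypothetical; the generator in section 3.1 asks for all applicable types in one prompt rather than routing like this.

```python
def question_types_for(para: dict, include_multimodal: bool = True) -> list[str]:
    """Return which of the three question types apply to one paragraph row (hypothetical helper)."""
    text = (para.get("paragraph_context") or "").strip()
    if not text:
        return []  # nothing to ask about
    types = ["text"]  # 1. text-only questions from paragraph_context
    pic = (para.get("paragraph_pic_semantics_context") or "").strip()
    img_num = para.get("paragraph_img_num") or 0  # the column may be NULL
    if include_multimodal and pic and img_num > 0:
        types.append("image_text")  # 2. text + image semantics
    if (para.get("paragraph_question") or "").strip():
        types.append("seed_expansion")  # 3. expand an existing seed question
    return types


row = {
    "paragraph_context": "Connect the board via USB.",
    "paragraph_img_num": 1,
    "paragraph_pic_semantics_context": "Photo of the USB port location.",
    "paragraph_question": "How do I connect the board?",
}
print(question_types_for(row))  # ['text', 'image_text', 'seed_expansion']
```

Treating a NULL `paragraph_img_num` as 0 matters here: comparing `None > 0` raises a `TypeError` in Python 3.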

2.2 Comparison: Processing from Scratch vs. Reusing Dagent

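| Dimension | Process HTML from scratch | Reuse the Dagent database |
|---|---|---|
| Development effort | 2-3 weeks | 3-5 days |
| Image OCR cost | 1,142 images × $0.008 ≈ $9 | $0 (already done) |
| Q&A generation cost | $4 | $4 |
| Data reliability | Needs verification | Already proven in production |
| Total cost | ≈ $13 + 2-3 weeks | $4 + 3-5 days |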

3. Implementation

3.1 Backend: Add Dagent Data Source Support

New file: server/api/qa_gen_dagent.py

```python
"""
Import knowledge-base data from the Dagent database and generate a multimodal Q&A set.
"""
import asyncio
import json
import logging
import os
import re

import aiohttp
import aiomysql
from fastapi import APIRouter, Form

from ..models.db import get_db, _now, _id

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/qa-gen", tags=["QA Generation - Dagent"])

# Dagent OceanBase (MySQL-compatible) connection settings, read from environment
# variables so the password is not hard-coded. The DAGENT_DB_* names are illustrative.
DAGENT_DB = {
    "host": os.getenv("DAGENT_DB_HOST", "120.48.66.228"),
    "port": int(os.getenv("DAGENT_DB_PORT", "23306")),
    "user": os.getenv("DAGENT_DB_USER", "dagent"),
    "password": os.getenv("DAGENT_DB_PASSWORD", ""),
    "db": os.getenv("DAGENT_DB_NAME", "dagent_platform"),
    "charset": "utf8mb4",
}

# The event loop keeps only weak references to tasks, so hold background
# tasks here until they finish; otherwise they can be garbage-collected mid-run.
_background_tasks: set[asyncio.Task] = set()


async def _dagent_fetchall(sql: str, params) -> list[dict]:
    """Run a read-only query against Dagent; the connection is closed even on error."""
    conn = await aiomysql.connect(**DAGENT_DB)
    try:
        async with conn.cursor(aiomysql.DictCursor) as cursor:
            await cursor.execute(sql, params)
            return list(await cursor.fetchall())
    finally:
        conn.close()


@router.post("/task/from-dagent")
async def create_task_from_dagent(
    org_id: str = Form(...),
    name: str = Form(""),
    judge_config_id: str = Form(...),
    file_ids: str = Form(""),           # comma-separated file_ids; empty = all files
    questions_per_section: int = Form(5),
    quality_threshold: float = Form(0.6),
    include_multimodal: bool = Form(True),
):
    """Create a Q&A generation task from the Dagent database."""
    task_id = _id()
    file_id_list = [f.strip() for f in file_ids.split(",") if f.strip()]

    async with get_db() as db:
        await db.execute(
            """INSERT INTO qa_gen_task
               (id,name,judge_config_id,questions_per_section,quality_threshold,status,created_at)
               VALUES (?,?,?,?,?,?,?)""",
            (task_id, name or f"Dagent import ({org_id[:8]}...)",
             judge_config_id, questions_per_section, quality_threshold, "pending", _now()),
        )
        await db.commit()

    # Run generation in the background and keep a reference until it completes.
    task = asyncio.create_task(_run_dagent_task(
        task_id, org_id, file_id_list, judge_config_id,
        questions_per_section, quality_threshold, include_multimodal,
    ))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return {"status": 0, "data": {"id": task_id}}


@router.get("/dagent/files")
async def list_dagent_files(org_id: str):
    """List an org's files in Dagent with their processing status.

    Only files with file_clean_status = 'CLEAN_FINISH' are used for generation.
    """
    rows = await _dagent_fetchall(
        """SELECT id, file_name, file_type, file_clean_status,
                  file_bytes, create_time
           FROM knowledge_file
           WHERE org_id = %s AND delete_time IS NULL
           ORDER BY create_time DESC""",
        (org_id,),
    )
    return {"status": 0, "data": rows}


@router.get("/dagent/stats")
async def get_dagent_stats(org_id: str):
    """Get Dagent knowledge-base statistics for an org."""
    rows = await _dagent_fetchall(
        """SELECT
               COUNT(DISTINCT f.id) as file_count,
               COUNT(h.id) as paragraph_count,
               SUM(h.paragraph_img_num) as total_images,
               SUM(CASE WHEN h.paragraph_pic_semantics_context IS NOT NULL
                        AND h.paragraph_img_num > 0 THEN 1 ELSE 0 END) as paragraphs_with_pic_text,
               SUM(CASE WHEN h.paragraph_question IS NOT NULL THEN 1 ELSE 0 END) as paragraphs_with_question
           FROM knowledge_file f
           LEFT JOIN knowledge_md_header_split h
               ON f.id = h.file_id AND h.delete_time IS NULL
           WHERE f.org_id = %s AND f.delete_time IS NULL
             AND f.file_clean_status = 'CLEAN_FINISH'""",
        (org_id,),
    )
    return {"status": 0, "data": rows[0] if rows else {}}


# ── Internal: background task ────────────────────────────────────────────────

async def _fetch_paragraphs(org_id: str, file_id_list: list[str]) -> list[dict]:
    """Fetch paragraph rows from the Dagent database."""
    sql = """
        SELECT h.id, h.file_id, h.file_name, h.headers,
               h.paragraph_context, h.paragraph_img_num,
               h.paragraph_pic_semantics_context,
               h.paragraph_question, h.paragraph_summary, h.paragraph_keywords
        FROM knowledge_md_header_split h
        JOIN knowledge_file f ON f.id = h.file_id
        WHERE h.org_id = %s
          AND h.delete_time IS NULL
          AND f.delete_time IS NULL
          AND f.file_clean_status = 'CLEAN_FINISH'
    """
    params: list = [org_id]

    if file_id_list:
        placeholders = ",".join(["%s"] * len(file_id_list))
        sql += f" AND h.file_id IN ({placeholders})"
        params.extend(file_id_list)

    sql += " ORDER BY h.file_name, h.headers"
    return await _dagent_fetchall(sql, params)


async def _generate_questions_for_paragraph(
    para: dict, cfg: dict, n: int, include_multimodal: bool
) -> list[dict]:
    """Generate Q&A pairs for a single paragraph."""
    base_url = (cfg.get("base_url") or "").rstrip("/")
    api_key = cfg.get("api_key") or ""
    model = cfg.get("model") or "gpt-4o-mini"

    text = (para.get("paragraph_context") or "").strip()
    pic_semantics = (para.get("paragraph_pic_semantics_context") or "").strip()
    seed_question = (para.get("paragraph_question") or "").strip()
    headers = (para.get("headers") or "").strip()
    # paragraph_img_num may be NULL; `or 0` avoids comparing None with an int.
    has_image = bool(pic_semantics) and (para.get("paragraph_img_num") or 0) > 0
    use_image = has_image and include_multimodal

    if not text:
        return []

    # Build the prompt
    pic_section = ""
    if use_image:
        pic_section = f"""
**Image descriptions (images already recognized by AI):**
{pic_semantics[:800]}
"""

    seed_section = ""
    if seed_question:
        seed_section = f"\n**Existing seed question (do not repeat it; expand on it from different angles):** {seed_question}"

    prompt = f"""You are an expert at writing Q&A pairs for technical documentation. Generate {n} test questions based on the content below.

**Section path:** {headers}

**Text:**
{text[:2500]}
{pic_section}{seed_section}

**Requirements:**
1. Every question must be answerable directly from this section's content.
2. Cover the key knowledge points; avoid overly simple yes/no questions.
3. If image descriptions are provided, generate at least 1 question that combines text and image (referring to "as shown in the figure", "in the figure", etc.).
4. Answers must be accurate and of moderate length (1-3 sentences).
5. source_chunk is the passage of the original text the answer comes from (50-150 characters).
6. has_image indicates whether the question depends on image information.
7. quality_score is a quality rating (0-1).

Output only a JSON array:
[
  {{
    "question": "question text",
    "answer": "reference answer",
    "source_chunk": "source passage for the answer",
    "has_image": false,
    "quality_score": 0.9
  }}
]"""

    headers_http = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
    }

    try:
        async with aiohttp.ClientSession(headers=headers_http) as session:
            async with session.post(
                f"{base_url}/chat/completions",
                json=payload,
                timeout=aiohttp.ClientTimeout(total=60),
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()

        text_resp = data["choices"][0]["message"]["content"].strip()
        # Greedy match from the first "[" to the last "]" skips any prose around the array.
        m = re.search(r"\[.*\]", text_resp, re.DOTALL)
        questions = json.loads(m.group()) if m else []
    except Exception:
        # Log instead of silently dropping the paragraph.
        logger.warning("QA generation failed for paragraph %s", para.get("id"), exc_info=True)
        return []

    result = []
    for q in (questions if isinstance(questions, list) else []):
        if not (isinstance(q, dict) and q.get("question") and q.get("answer")):
            continue
        try:
            score = float(q.get("quality_score", 0.8))
        except (TypeError, ValueError):
            score = 0.0  # unreadable score: leave the item for manual review
        # A question can only depend on an image if the image text was in the prompt.
        q_has_image = use_image and bool(q.get("has_image", False))
        result.append({
            "question": str(q["question"]).strip(),
            "answer": str(q["answer"]).strip(),
            "source_chunk": str(q.get("source_chunk", "")).strip(),
            "has_image": q_has_image,
            "quality_score": score,
            "source_image_desc": pic_semantics[:300] if q_has_image else "",
        })
    return result


async def _run_dagent_task(
    task_id: str,
    org_id: str,
    file_id_list: list[str],
    judge_config_id: str,
    questions_per_section: int,
    quality_threshold: float,
    include_multimodal: bool,
):
    try:
        # 1. Fetch paragraphs
        paragraphs = await _fetch_paragraphs(org_id, file_id_list)
        total = len(paragraphs)

        async with get_db() as db:
            await db.execute(
                "UPDATE qa_gen_task SET status='running', total=? WHERE id=?",
                (total, task_id),
            )
            await db.commit()

        # 2. Load the LLM config
        async with get_db() as db:
            cfg_rows = await db.execute_fetchall(
                "SELECT * FROM judge_config WHERE id=?", (judge_config_id,)
            )
        if not cfg_rows:
            raise ValueError("judge_config not found")
        cfg = dict(cfg_rows[0])

        # 3. Generate concurrently (at most 5 paragraphs in flight)
        sem = asyncio.Semaphore(5)
        flush_lock = asyncio.Lock()  # serialize flushes so batches never write concurrently
        done = 0
        FLUSH_SIZE = 10
        write_buf: list[tuple[dict, dict]] = []

        async def flush():
            """Write buffered questions, then sync progress and the approved count."""
            async with flush_lock:
                batch = write_buf.copy()
                write_buf.clear()
                async with get_db() as db2:
                    for p, q in batch:
                        status = "approved" if q["quality_score"] >= quality_threshold else "pending"
                        # has_image / source_image_desc are not stored: this INSERT has no
                        # columns for them, so persisting them would need a schema change.
                        await db2.execute(
                            """INSERT INTO qa_gen_question
                               (id,task_id,section_path,question,reference_answer,source_chunk,
                                quality_score,status,created_at)
                               VALUES (?,?,?,?,?,?,?,?,?)""",
                            (_id(), task_id, p["headers"],
                             q["question"], q["answer"], q["source_chunk"],
                             q["quality_score"], status, _now()),
                        )
                    # Keep the task's approved count in sync
                    count_rows = await db2.execute_fetchall(
                        "SELECT COUNT(*) as cnt FROM qa_gen_question WHERE task_id=? AND status='approved'",
                        (task_id,),
                    )
                    approved = dict(count_rows[0])["cnt"] if count_rows else 0
                    await db2.execute(
                        "UPDATE qa_gen_task SET progress=?, approved=? WHERE id=?",
                        (done, approved, task_id),
                    )
                    await db2.commit()

        async def process_one(para: dict):
            nonlocal done
            async with sem:
                questions = await _generate_questions_for_paragraph(
                    para, cfg, questions_per_section, include_multimodal
                )
            done += 1
            write_buf.extend((para, q) for q in questions)
            if len(write_buf) >= FLUSH_SIZE:
                await flush()

        await asyncio.gather(*(process_one(p) for p in paragraphs))
        # Final flush writes any remainder and sets progress to total.
        await flush()

        async with get_db() as db:
            await db.execute(
                "UPDATE qa_gen_task SET status='done', finished_at=? WHERE id=?",
                (_now(), task_id),
            )
            await db.commit()

    except Exception as exc:
        logger.exception("Dagent QA task %s failed", task_id)
        async with get_db() as db:
            await db.execute(
                "UPDATE qa_gen_task SET status='failed', error_message=? WHERE id=?",
                (str(exc), task_id),
            )
            await db.commit()
```
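Because `_generate_questions_for_paragraph` depends on a live model endpoint, its response handling is easiest to check in isolation. The sketch below pulls the JSON extraction and field validation into a standalone function so it can be unit-tested against canned replies; the name `parse_llm_questions` is hypothetical, and the fallback score of 0.0 for an unreadable `quality_score` is an assumption (it routes such items to manual review instead of dropping the whole paragraph).

```python
import json
import re


def parse_llm_questions(text_resp: str, pic_semantics: str = "") -> list[dict]:
    """Extract and validate the JSON array of Q&A items from a raw model reply."""
    # Greedy match: from the first "[" to the last "]", so prose around the array is ignored.
    m = re.search(r"\[.*\]", text_resp, re.DOTALL)
    if not m:
        return []
    try:
        items = json.loads(m.group())
    except json.JSONDecodeError:
        return []
    if not isinstance(items, list):
        return []
    result = []
    for q in items:
        # Skip entries that lack a question or an answer.
        if not (isinstance(q, dict) and q.get("question") and q.get("answer")):
            continue
        try:
            score = float(q.get("quality_score", 0.8))
        except (TypeError, ValueError):
            score = 0.0  # assumption: unreadable scores go to manual review
        uses_image = bool(q.get("has_image", False))
        result.append({
            "question": str(q["question"]).strip(),
            "answer": str(q["answer"]).strip(),
            "source_chunk": str(q.get("source_chunk", "")).strip(),
            "has_image": uses_image,
            "quality_score": score,
            "source_image_desc": pic_semantics[:300] if uses_image else "",
        })
    return result


reply = ('Here you go:\n[{"question": "Q1?", "answer": "A1.", "has_image": true, '
         '"quality_score": "0.7"}, {"question": "", "answer": "x"}]\nHope this helps.')
print(parse_llm_questions(reply, pic_semantics="Diagram of the boot flow"))
```

The second item in the sample reply is dropped because its question is empty, and the string score `"0.7"` is coerced to a float.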

3.2 Register the Router

Add the following to server/main.py:

```python
from .api import config, dataset, task, report, single_jump, qa_gen, qa_gen_dagent

app.include_router(qa_gen_dagent.router)
```
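Once the router is registered, a task can be created by POSTing form fields to `/api/qa-gen/task/from-dagent`. The sketch below only builds the request with the standard library so it can be inspected before sending; the base URL `http://localhost:8000`, the helper name `build_from_dagent_request`, and the example IDs are placeholders, not values from the project.

```python
from typing import Sequence
from urllib.parse import urlencode
from urllib.request import Request


def build_from_dagent_request(
    base_url: str,
    org_id: str,
    judge_config_id: str,
    file_ids: Sequence[str] = (),
    questions_per_section: int = 5,
    quality_threshold: float = 0.6,
    include_multimodal: bool = True,
) -> Request:
    """Build (but do not send) the form POST handled by create_task_from_dagent."""
    form = {
        "org_id": org_id,
        "judge_config_id": judge_config_id,
        "file_ids": ",".join(file_ids),  # empty string = all processed files
        "questions_per_section": str(questions_per_section),
        "quality_threshold": str(quality_threshold),
        "include_multimodal": "true" if include_multimodal else "false",
    }
    return Request(
        f"{base_url.rstrip('/')}/api/qa-gen/task/from-dagent",
        data=urlencode(form).encode(),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


req = build_from_dagent_request("http://localhost:8000", "org-123", "judge-1", file_ids=["f1", "f2"])
print(req.get_method(), req.full_url)  # POST http://localhost:8000/api/qa-gen/task/from-dagent
# urllib.request.urlopen(req) would send it; the endpoint replies {"status": 0, "data": {"id": ...}}
```

Leaving `file_ids` empty corresponds to the "import everything" case; for the Step 3 pilot, pass only the pilot file's ID.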

3.3 Frontend: Add an "Import from Dagent" Entry Point

In the new-task dialog of QaGen/index.tsx, add a data source toggle:

```tsx
// Data source selection
<Form.Item label="Data source">
  <Radio.Group value={dataSource} onChange={e => setDataSource(e.target.value)}>
    <Radio value="file">Upload MD files</Radio>
    <Radio value="dagent">Import from Dagent knowledge base</Radio>
  </Radio.Group>
</Form.Item>

{dataSource === 'dagent' ? (
  <>
    <Form.Item name="org_id" label="Dagent org ID" rules={[{ required: true }]}>
      <Input placeholder="cd6e121594984516..." />
    </Form.Item>
    <Form.Item name="file_ids" label="File IDs (optional)"
      tooltip="Leave empty to import all processed files in this org">
      <Input.TextArea rows={2} placeholder="Separate multiple IDs with commas; leave empty to import all" />
    </Form.Item>
    {/* Form.Item controls `checked`, so set the default with initialValue rather than defaultChecked */}
    <Form.Item name="include_multimodal" label="Generate text + image questions" valuePropName="checked"
      initialValue={true}
      tooltip="Use the image descriptions Dagent has already generated to create questions that combine text and images">
      <Switch />
    </Form.Item>
    {/* Stats panel; dagentStats is expected to hold the data from GET /api/qa-gen/dagent/stats */}
    {dagentStats && (
      <div style={{ background: '#f6ffed', border: '1px solid #b7eb8f', borderRadius: 6, padding: '8px 12px', marginBottom: 16 }}>
        <Space split={<Divider type="vertical" />}>
          <span>Files: <b>{dagentStats.file_count}</b></span>
          <span>Paragraphs: <b>{dagentStats.paragraph_count}</b></span>
          <span>Paragraphs with images: <b>{dagentStats.paragraphs_with_pic_text}</b></span>
          <span>Total images: <b>{dagentStats.total_images}</b></span>
        </Space>
      </div>
    )}
  </>
) : (
  // Existing file-upload UI
  <Form.Item label="Knowledge base MD files" required>
    <Upload ... />
  </Form.Item>
)}
```

4. Validation Steps

Step 1: Query the database to confirm data completeness

```sql
-- List the files in the EVB knowledge base
SELECT id, file_name, file_type, file_clean_status
FROM knowledge_file
WHERE org_id = 'cd6e121594984516bde17ae9aeb0eb45a01e6d28143034608c4985aea369deec'
  AND delete_time IS NULL
ORDER BY file_name;

-- Per-file paragraph statistics (including image processing status)
SELECT
    f.file_name,
    COUNT(h.id) as paragraphs,
    SUM(h.paragraph_img_num) as images,
    SUM(CASE WHEN h.paragraph_pic_semantics_context IS NOT NULL THEN 1 ELSE 0 END) as pic_text_done,
    SUM(CASE WHEN h.paragraph_question IS NOT NULL THEN 1 ELSE 0 END) as has_question
FROM knowledge_file f
JOIN knowledge_md_header_split h ON f.id = h.file_id AND h.delete_time IS NULL
WHERE f.org_id = 'cd6e121594984516bde17ae9aeb0eb45a01e6d28143034608c4985aea369deec'
  AND f.delete_time IS NULL
GROUP BY f.file_name
ORDER BY f.file_name;
```
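The per-file query returns one row per file, and a short post-processing step can flag files whose data looks incomplete before any generation runs. A minimal sketch, assuming each row is a dict keyed by the query's column aliases (`file_name`, `paragraphs`, `images`, `pic_text_done`, `has_question`); the helper name and the two rules (images present but no image text; paragraphs without a seed question) are illustrative choices, and the sample rows are made up.

```python
def flag_incomplete_files(rows: list[dict]) -> dict[str, list[str]]:
    """Map file_name -> list of completeness issues found in the per-file stats rows."""
    issues: dict[str, list[str]] = {}
    for r in rows:
        # SUM() can come back as NULL/None, so default every count to 0.
        paragraphs = r.get("paragraphs") or 0
        images = r.get("images") or 0
        pic_done = r.get("pic_text_done") or 0
        has_q = r.get("has_question") or 0
        found = []
        if images > 0 and pic_done == 0:
            found.append("images present but no image descriptions")
        if has_q < paragraphs:
            found.append(f"{paragraphs - has_q} paragraphs without seed questions")
        if found:
            issues[r["file_name"]] = found
    return issues


# Made-up rows, only to show the shape of the input.
rows = [
    {"file_name": "a.md", "paragraphs": 12, "images": 4, "pic_text_done": 0, "has_question": 12},
    {"file_name": "b.md", "paragraphs": 8, "images": 0, "pic_text_done": 0, "has_question": 5},
    {"file_name": "c.md", "paragraphs": 3, "images": 2, "pic_text_done": 2, "has_question": 3},
]
print(flag_incomplete_files(rows))
```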

Step 2: Spot-check the quality of the image descriptions

```sql
-- Randomly pick 10 paragraphs with images and check the quality of their image descriptions
SELECT headers, LEFT(paragraph_context, 200) as text_preview,
       LEFT(paragraph_pic_semantics_context, 300) as pic_text_preview
FROM knowledge_md_header_split
WHERE org_id = 'cd6e121594984516bde17ae9aeb0eb45a01e6d28143034608c4985aea369deec'
  AND paragraph_img_num > 0
  AND paragraph_pic_semantics_context IS NOT NULL
  AND delete_time IS NULL
ORDER BY RAND()
LIMIT 10;
```

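Step 3: Small-batch pilot test

Start with one file (e.g., common_questions) as a pilot: generate about 50 Q&A pairs, review their quality manually, and only then run the full import.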


5. Expected Output

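| Module | Paragraphs | Paragraphs with images | Expected Q&A pairs |
|---|---|---|---|
| linux_development | ~500 | ~200 | ~2,500 |
| multimedia_development | ~150 | ~80 | ~750 |
| samples | ~100 | ~50 | ~500 |
| toolchain_development | ~80 | ~30 | ~400 |
| quick_start | ~30 | ~15 | ~150 |
| preface + common_questions | ~20 | ~5 | ~100 |
| Total | ~880 | ~380 | ~4,400 |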

Multimodal (text + image) questions are expected to make up 20-30% of these (about 880-1,320).
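Every row of the expected-output table equals its paragraph count times 5, the default `questions_per_section`. A small worked calculation reproducing the totals and the 20-30% multimodal range from the table's approximate paragraph counts:

```python
# Approximate paragraph counts per module, copied from the table above.
PARAGRAPHS = {
    "linux_development": 500,
    "multimedia_development": 150,
    "samples": 100,
    "toolchain_development": 80,
    "quick_start": 30,
    "preface + common_questions": 20,
}
QUESTIONS_PER_SECTION = 5  # default of the questions_per_section form field

expected = {module: n * QUESTIONS_PER_SECTION for module, n in PARAGRAPHS.items()}
total_paragraphs = sum(PARAGRAPHS.values())
total_questions = sum(expected.values())
# Multimodal share estimated at 20-30% of all generated questions.
multimodal_range = (total_questions * 20 // 100, total_questions * 30 // 100)

print(total_paragraphs, total_questions, multimodal_range)  # 880 4400 (880, 1320)
```

These are upper-end figures: paragraphs with empty text are skipped by the generator, and the model may return fewer than the requested number of items for a paragraph.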


6. Install Dependencies

```shell
pip install aiomysql
```

(Other dependencies such as aiohttp and fastapi are already installed.)