| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918 |
- """
- 审查执行编排器
- 封装 7 个审查链路的调用,复用 BaseReviewer.review() 和 AIReviewEngine。
- 支持 SSE 进度推送、单步错误隔离、trace_id 生产隔离。
- 用法:
- executor = DebugExecutor()
- await executor.execute(request, event_queue)
- """
- import asyncio
- import json
- import os
- import time
- import uuid
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- from views.debug.debug_api import (
- DebugExecuteRequest,
- RagParams,
- )
- from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
- from foundation.ai.agent.generate.model_generate import generate_model_client
- from foundation.observability.logger.loggering import review_logger as logger
- # ============================================================
- # 步骤定义
- # ============================================================
- CHAIN_STEPS = {
- "completeness": [
- {"index": 0, "name": "Prompt 渲染", "phase": None},
- {"index": 1, "name": "LLM 调用", "phase": None},
- {"index": 2, "name": "结果解析", "phase": None},
- ],
- "timeliness": [
- {"index": 0, "name": "Prompt 渲染", "phase": None},
- {"index": 1, "name": "LLM 调用", "phase": None},
- {"index": 2, "name": "结果解析", "phase": None},
- ],
- "reference": [
- {"index": 0, "name": "Prompt 渲染", "phase": None},
- {"index": 1, "name": "LLM 调用", "phase": None},
- {"index": 2, "name": "结果解析", "phase": None},
- ],
- "sensitive": [
- {"index": 0, "name": "Prompt 渲染", "phase": None},
- {"index": 1, "name": "LLM 调用", "phase": None},
- {"index": 2, "name": "结果解析", "phase": None},
- ],
- "semantic": [
- {"index": 0, "name": "Prompt 渲染", "phase": None},
- {"index": 1, "name": "LLM 调用", "phase": None},
- {"index": 2, "name": "结果解析", "phase": None},
- ],
- "grammar": [
- {"index": 0, "name": "Prompt 渲染", "phase": None},
- {"index": 1, "name": "LLM 调用", "phase": None},
- {"index": 2, "name": "结果解析", "phase": None},
- ],
- "professional": [
- {"index": 0, "name": "查询提取", "phase": "RAG 召回阶段"},
- {"index": 1, "name": "实体增强检索", "phase": "RAG 召回阶段"},
- {"index": 2, "name": "父文档增强", "phase": "RAG 召回阶段"},
- {"index": 3, "name": "结果提取", "phase": "RAG 召回阶段"},
- {"index": 4, "name": "非参数合规审查", "phase": "AI 审查阶段"},
- {"index": 5, "name": "参数合规审查", "phase": "AI 审查阶段"},
- {"index": 6, "name": "结果汇总", "phase": "AI 审查阶段"},
- ],
- }
- VALID_CHAIN_IDS = set(CHAIN_STEPS.keys())
- # ============================================================
- # StepResult 数据类
- # ============================================================
- class StepResult:
- """单步执行结果"""
- __slots__ = ("index", "name", "phase", "status", "duration",
- "input_summary", "output_summary", "error")
- def __init__(self, index: int, name: str, status: str = "pending",
- phase: str = None, duration: float = 0,
- input_summary: dict = None, output_summary: dict = None,
- error: str = None):
- self.index = index
- self.name = name
- self.phase = phase
- self.status = status
- self.duration = duration
- self.input_summary = input_summary or {}
- self.output_summary = output_summary or {}
- self.error = error
- # ============================================================
- # CHAIN_CONFIG 映射表
- # ============================================================
- CHAIN_CONFIG = {
- "completeness": {
- "name": "完整性审查",
- "reviewer_type": "basic",
- "prompt_name": "completeness_check",
- "function_name": "completeness_review_generate",
- },
- "timeliness": {
- "name": "时效性审查",
- "reviewer_type": "basic",
- "prompt_name": "timeliness_check",
- "function_name": "timeliness_review_generate",
- },
- "reference": {
- "name": "规范性审查",
- "reviewer_type": "basic",
- "prompt_name": "reference_check",
- "function_name": "reference_review_generate",
- },
- "sensitive": {
- "name": "敏感词检查",
- "reviewer_type": "basic", # 实际位于 basic_reviewers.yaml
- "prompt_name": "sensitive_word_check",
- "function_name": "sensitive_word_check_generate",
- },
- "semantic": {
- "name": "语义逻辑检查",
- "reviewer_type": "basic",
- "prompt_name": "semantic_logic_check",
- "function_name": "semantic_logic_check_generate",
- },
- "grammar": {
- "name": "语法检查",
- "reviewer_type": "basic",
- "prompt_name": "grammar_check",
- "function_name": "grammar_check_generate",
- },
- "professional": {
- "name": "专业性审查",
- "reviewer_type": "rag",
- "prompt_name": None,
- "function_name": "professional_review_generate",
- },
- }
- # 各链路的步骤依赖关系:step_index -> (依赖的上一步索引, 是否可跳过)
- _STEP_DEPS = {
- # 直调 LLM 链路:步骤 2 依赖步骤 1,步骤 1 依赖步骤 0
- "completeness": {0: None, 1: 0, 2: 1},
- "timeliness": {0: None, 1: 0, 2: 1},
- "reference": {0: None, 1: 0, 2: 1},
- "sensitive": {0: None, 1: 0, 2: 1},
- "semantic": {0: None, 1: 0, 2: 1},
- "grammar": {0: None, 1: 0, 2: 1},
- # 专业性审查:每步依赖上一步
- "professional": {0: None, 1: 0, 2: 1, 3: 2, 4: 3, 5: 4, 6: 5},
- }
- def _make_trace_id(chain_id: str) -> str:
- """生成 trace_id,添加 debug_ 前缀实现生产隔离"""
- return (
- f"debug_{chain_id}_"
- f"{datetime.now().strftime('%H%M%S')}_"
- f"{uuid.uuid4().hex[:8]}"
- )
- def _make_record_id() -> str:
- """生成记录 ID"""
- return (
- f"call-{datetime.now().strftime('%Y%m%d-%H%M%S')}-"
- f"{uuid.uuid4().hex[:6]}"
- )
- class DebugExecutor:
- """审查执行编排器"""
- # 暴露 CHAIN_CONFIG 供外部使用
- CHAIN_CONFIG = CHAIN_CONFIG
- CHAIN_STEPS = CHAIN_STEPS
- def __init__(self):
- self.base_reviewer = BaseReviewer()
- async def execute(self, request: DebugExecuteRequest,
- event_queue: asyncio.Queue) -> dict:
- """
- 执行审查调试,所有事件通过 event_queue 推送。
- Returns:
- dict with keys: completed_data, steps_collected, error_occurred, record_id
- """
- chain_id = request.chain_id
- chain_cfg = CHAIN_CONFIG[chain_id]
- steps_def = CHAIN_STEPS[chain_id]
- trace_id = _make_trace_id(chain_id)
- record_id = _make_record_id()
- total_steps = len(steps_def)
- # --- started 事件 ---
- await event_queue.put(("started", {
- "task_id": trace_id,
- "chain_id": chain_id,
- "total_steps": total_steps,
- }))
- step_results: List[StepResult] = []
- current_phase = None
- for step_def in steps_def:
- step_index = step_def["index"]
- step_name = step_def["name"]
- step_phase = step_def.get("phase")
- # phase_label:阶段切换时推送
- if step_phase and step_phase != current_phase:
- current_phase = step_phase
- await event_queue.put(("phase_label", {
- "task_id": trace_id,
- "label": step_phase,
- }))
- # --- step_progress 事件(开始执行)---
- await event_queue.put(("step_progress", {
- "task_id": trace_id,
- "step_index": step_index,
- "step_name": step_name,
- "status": "running",
- "duration": None,
- }))
- # 判断是否跳过:如果依赖的上一步失败/跳过,本步骤也跳过
- dep = _STEP_DEPS[chain_id].get(step_index)
- should_skip = False
- if dep is not None:
- for prev in step_results:
- if prev.index == dep and prev.status in ("error", "skipped"):
- should_skip = True
- break
- if should_skip:
- result = StepResult(
- index=step_index, name=step_name,
- status="skipped", phase=step_phase, duration=0,
- )
- else:
- try:
- if chain_id == "professional":
- result = await self._execute_professional_step(
- request, chain_cfg, step_def,
- step_results, trace_id,
- )
- else:
- result = await self._execute_direct_llm_step(
- request, chain_cfg, step_def,
- step_results, trace_id,
- )
- except Exception as exc:
- logger.error(
- f"[DebugExecutor] 步骤 {step_index} ({step_name}) "
- f"异常: {exc}", exc_info=True,
- )
- result = StepResult(
- index=step_index, name=step_name,
- status="error", phase=step_phase,
- duration=0, error=str(exc),
- )
- # --- step_result 事件 ---
- await self._emit_step_result(event_queue, trace_id, result)
- step_results.append(result)
- # --- completed 事件 ---
- total_duration = sum(s.duration for s in step_results)
- final_result = self._build_final_result(step_results)
- error_occurred = None
- for s in step_results:
- if s.status == "error":
- error_occurred = s.error or f"步骤 {s.index} ({s.name}) 执行失败"
- break
- # 转换为可序列化的步骤列表
- steps_collected = [
- {
- "index": s.index,
- "name": s.name,
- "status": s.status,
- "duration_ms": int(s.duration * 1000) if s.duration else 0,
- "input": s.input_summary,
- "output": s.output_summary,
- }
- for s in step_results
- ]
- completed_data = {
- "task_id": trace_id,
- "chain_id": chain_id,
- "total_duration": round(total_duration, 3),
- "record_id": record_id,
- "final_result": final_result,
- }
- await event_queue.put(("completed", completed_data))
- return {
- "completed_data": completed_data,
- "steps_collected": steps_collected,
- "error_occurred": error_occurred,
- "record_id": record_id,
- }
- # ----------------------------------------------------------
- # 直调 LLM 链路(6 个)
- # ----------------------------------------------------------
- async def _execute_direct_llm_step(
- self,
- request: DebugExecuteRequest,
- chain_cfg: dict,
- step_def: dict,
- previous_steps: List[StepResult],
- trace_id: str,
- ) -> StepResult:
- """执行直调 LLM 链路的单个步骤"""
- step_index = step_def["index"]
- start = time.time()
- if step_index == 0:
- # ---------- Step 0: Prompt 渲染 ----------
- try:
- prompt_kwargs = {
- "review_content": request.content,
- "review_references": request.reference or "",
- }
- prompt_template = self.base_reviewer.prompt_loader.get_prompt_template(
- chain_cfg["reviewer_type"],
- chain_cfg["prompt_name"],
- **prompt_kwargs,
- )
- messages = prompt_template.format_messages(**prompt_kwargs)
- return StepResult(
- index=0, name="Prompt 渲染", status="success",
- duration=round(time.time() - start, 3),
- input_summary={
- "review_content": request.content,
- "review_references": request.reference or "",
- },
- output_summary={
- "system_prompt": (
- messages[0].content if messages else ""
- ),
- "user_prompt": (
- messages[1].content
- if len(messages) > 1 else ""
- ),
- },
- )
- except Exception as exc:
- return StepResult(
- index=0, name="Prompt 渲染", status="error",
- duration=round(time.time() - start, 3),
- error=str(exc),
- )
- elif step_index == 1:
- # ---------- Step 1: LLM 调用 ----------
- try:
- prompt_kwargs = {
- "review_content": request.content,
- "review_references": request.reference or "",
- }
- prompt_template = self.base_reviewer.prompt_loader.get_prompt_template(
- chain_cfg["reviewer_type"],
- chain_cfg["prompt_name"],
- **prompt_kwargs,
- )
- invoke_kwargs: dict = {
- "trace_id": trace_id,
- "task_prompt_info": {
- "task_prompt": prompt_template,
- "task_name": chain_cfg["prompt_name"],
- },
- "timeout": request.timeout,
- }
- # 模型选择:function_name > model > chain 默认
- if request.function_name:
- invoke_kwargs["function_name"] = request.function_name
- elif request.model:
- invoke_kwargs["model_name"] = request.model
- else:
- invoke_kwargs["function_name"] = chain_cfg.get(
- "function_name"
- )
- model_response = await generate_model_client.get_model_generate_invoke(
- **invoke_kwargs,
- )
- return StepResult(
- index=1, name="LLM 调用", status="success",
- duration=round(time.time() - start, 3),
- input_summary={
- "model": request.model or "default",
- "timeout": request.timeout,
- "function_name": invoke_kwargs.get("function_name"),
- },
- output_summary={
- "raw_response": model_response,
- },
- )
- except Exception as exc:
- return StepResult(
- index=1, name="LLM 调用", status="error",
- duration=round(time.time() - start, 3),
- error=str(exc),
- )
- elif step_index == 2:
- # ---------- Step 2: 结果解析 ----------
- prev = previous_steps[-1] if previous_steps else None
- if not prev or prev.status != "success":
- return StepResult(
- index=2, name="结果解析", status="skipped",
- duration=0,
- )
- try:
- raw = prev.output_summary.get("raw_response", "")
- result = self.base_reviewer.format_result(
- raw, chain_cfg["prompt_name"],
- )
- return StepResult(
- index=2, name="结果解析", status="success",
- duration=round(time.time() - start, 3),
- input_summary={"raw_response": raw},
- output_summary={"parsed_result": result.details},
- )
- except Exception as exc:
- return StepResult(
- index=2, name="结果解析", status="error",
- duration=round(time.time() - start, 3),
- error=str(exc),
- )
- # fallback
- return StepResult(
- index=step_index, name=step_def.get("name", "unknown"),
- status="error", duration=round(time.time() - start, 3),
- error=f"Unknown step index: {step_index}",
- )
- # ----------------------------------------------------------
- # 专业性审查(RAG + LLM,7 步)
- # ----------------------------------------------------------
- async def _execute_professional_step(
- self,
- request: DebugExecuteRequest,
- chain_cfg: dict,
- step_def: dict,
- previous_steps: List[StepResult],
- trace_id: str,
- ) -> StepResult:
- """执行专业性审查的单个步骤"""
- step_index = step_def["index"]
- start = time.time()
- phase = "RAG 召回阶段" if step_index < 4 else "AI 审查阶段"
- try:
- if step_index == 0:
- # ---- Step 0: 查询提取 ----
- from foundation.ai.rag.retrieval.query_rewrite import (
- query_rewrite_manager,
- )
- query_pairs = query_rewrite_manager.query_extract(
- request.content,
- )
- count = len(query_pairs) if query_pairs else 0
- return StepResult(
- index=0, name="查询提取", status="success",
- phase=phase,
- duration=round(time.time() - start, 3),
- input_summary={"content_length": len(request.content)},
- output_summary={
- "query_pairs_count": count,
- "query_pairs": query_pairs or [],
- },
- )
- elif step_index == 1:
- # ---- Step 1: 实体增强检索 ----
- prev = previous_steps[0] if len(previous_steps) > 0 else None
- if not prev or prev.status != "success":
- return StepResult(
- index=1, name="实体增强检索", status="skipped",
- phase=phase, duration=0,
- )
- qp = prev.output_summary.get("query_pairs", [])
- if not qp:
- return StepResult(
- index=1, name="实体增强检索", status="skipped",
- phase=phase, duration=0,
- )
- from foundation.ai.rag.retrieval.entities_enhance import (
- entity_enhance,
- )
- bfp_results = entity_enhance.entities_enhance_retrieval(qp)
- count = len(bfp_results) if bfp_results else 0
- return StepResult(
- index=1, name="实体增强检索", status="success",
- phase=phase,
- duration=round(time.time() - start, 3),
- input_summary={"query_pairs_count": len(qp)},
- output_summary={
- "bfp_results_count": count,
- "bfp_results": bfp_results or [],
- },
- )
- elif step_index == 2:
- # ---- Step 2: 父文档增强 ----
- prev = previous_steps[1] if len(previous_steps) > 1 else None
- if not prev or prev.status != "success":
- return StepResult(
- index=2, name="父文档增强", status="skipped",
- phase=phase, duration=0,
- )
- bfp = prev.output_summary.get("bfp_results", [])
- if not bfp:
- return StepResult(
- index=2, name="父文档增强", status="skipped",
- phase=phase, duration=0,
- )
- from core.construction_review.component.infrastructure.milvus import ( # noqa: E501
- MilvusManager, MilvusConfig,
- )
- from core.construction_review.component.infrastructure.parent_tool import ( # noqa: E501
- enhance_with_parent_docs_grouped,
- )
- milvus_mgr = MilvusManager(MilvusConfig())
- enhancement = enhance_with_parent_docs_grouped(
- milvus_mgr,
- bfp,
- score_threshold=0.5,
- max_parents_per_pair=3,
- max_parent_text_length=8000,
- )
- return StepResult(
- index=2, name="父文档增强", status="success",
- phase=phase,
- duration=round(time.time() - start, 3),
- input_summary={"bfp_results_count": len(bfp)},
- output_summary={
- "enhanced_results": enhancement.get(
- "enhanced_results", []
- ),
- "enhanced_count": enhancement.get(
- "enhanced_count", 0
- ),
- "parent_docs_count": len(
- enhancement.get("parent_docs", [])
- ),
- },
- )
- elif step_index == 3:
- # ---- Step 3: 结果提取 ----
- prev = previous_steps[2] if len(previous_steps) > 2 else None
- if not prev or prev.status != "success":
- return StepResult(
- index=3, name="结果提取", status="skipped",
- phase=phase, duration=0,
- )
- enhanced = prev.output_summary.get("enhanced_results", [])
- step0 = previous_steps[0]
- qp = step0.output_summary.get("query_pairs", [])
- if not enhanced:
- return StepResult(
- index=3, name="结果提取", status="skipped",
- phase=phase, duration=0,
- )
- from core.construction_review.component.infrastructure.parent_tool import ( # noqa: E501
- extract_query_pairs_results,
- )
- entity_results = extract_query_pairs_results(
- enhanced, qp, score_threshold=0.5,
- )
- return StepResult(
- index=3, name="结果提取", status="success",
- phase=phase,
- duration=round(time.time() - start, 3),
- input_summary={"enhanced_results_count": len(enhanced)},
- output_summary={
- "entity_results_count": (
- len(entity_results) if entity_results else 0
- ),
- "entity_results": entity_results or [],
- },
- )
- elif step_index == 4:
- # ---- Step 4: 非参数合规审查 ----
- step3 = previous_steps[3] if len(previous_steps) > 3 else None
- if not step3 or step3.status != "success":
- return StepResult(
- index=4, name="非参数合规审查", status="skipped",
- phase=phase, duration=0,
- )
- entity_results = step3.output_summary.get(
- "entity_results", []
- )
- if not entity_results:
- return StepResult(
- index=4, name="非参数合规审查", status="skipped",
- phase=phase, duration=0,
- )
- rag_params = request.rag_params or RagParams()
- rt = rag_params.review_type
- from core.construction_review.component.ai_review_engine import (
- AIReviewEngine,
- )
- engine = AIReviewEngine(max_concurrent_reviews=1)
- results_list: List[dict] = []
- for idx, item in enumerate(entity_results):
- if rt in ("non_parameter", "both"):
- cq = item.get("combined_query", "")
- tc = item.get("text_content", "")
- fn = item.get("file_name", "")
- res = await engine.check_non_parameter_compliance(
- trace_id_idx=trace_id,
- review_content=request.content,
- review_references=tc,
- reference_source=fn,
- state=None,
- stage_name=f"pro_non_param_{idx}",
- entity_query=cq,
- )
- processed = self._process_review_result(res)
- processed["entity"] = item.get("entity", f"e{idx}")
- processed["combined_query"] = cq
- results_list.append(processed)
- return StepResult(
- index=4, name="非参数合规审查", status="success",
- phase=phase,
- duration=round(time.time() - start, 3),
- input_summary={
- "entity_count": len(entity_results),
- "review_type": rt,
- },
- output_summary={
- "non_parameter_results": results_list,
- "result_count": len(results_list),
- },
- )
- elif step_index == 5:
- # ---- Step 5: 参数合规审查 ----
- step3 = previous_steps[3] if len(previous_steps) > 3 else None
- if not step3 or step3.status != "success":
- return StepResult(
- index=5, name="参数合规审查", status="skipped",
- phase=phase, duration=0,
- )
- entity_results = step3.output_summary.get(
- "entity_results", []
- )
- if not entity_results:
- return StepResult(
- index=5, name="参数合规审查", status="skipped",
- phase=phase, duration=0,
- )
- rag_params = request.rag_params or RagParams()
- rt = rag_params.review_type
- from core.construction_review.component.ai_review_engine import (
- AIReviewEngine,
- )
- engine = AIReviewEngine(max_concurrent_reviews=1)
- results_list: List[dict] = []
- for idx, item in enumerate(entity_results):
- if rt in ("parameter", "both"):
- cq = item.get("combined_query", "")
- tc = item.get("text_content", "")
- fn = item.get("file_name", "")
- res = await engine.check_parameter_compliance(
- trace_id_idx=trace_id,
- review_content=request.content,
- review_references=tc,
- reference_source=fn,
- state=None,
- stage_name=f"pro_param_{idx}",
- entity_query=cq,
- )
- processed = self._process_review_result(res)
- processed["entity"] = item.get("entity", f"e{idx}")
- processed["combined_query"] = cq
- results_list.append(processed)
- return StepResult(
- index=5, name="参数合规审查", status="success",
- phase=phase,
- duration=round(time.time() - start, 3),
- input_summary={
- "entity_count": len(entity_results),
- "review_type": rt,
- },
- output_summary={
- "parameter_results": results_list,
- "result_count": len(results_list),
- },
- )
- elif step_index == 6:
- # ---- Step 6: 结果汇总 ----
- summary: Dict[str, list] = {}
- if len(previous_steps) > 4:
- s4 = previous_steps[4]
- if s4.status == "success":
- summary["non_parameter"] = s4.output_summary.get(
- "non_parameter_results", []
- )
- if len(previous_steps) > 5:
- s5 = previous_steps[5]
- if s5.status == "success":
- summary["parameter"] = s5.output_summary.get(
- "parameter_results", []
- )
- total_items = (
- len(summary.get("non_parameter", []))
- + len(summary.get("parameter", []))
- )
- return StepResult(
- index=6, name="结果汇总", status="success",
- phase=phase,
- duration=round(time.time() - start, 3),
- input_summary={},
- output_summary={
- "final_summary": summary,
- "total_review_items": total_items,
- },
- )
- except Exception as exc:
- logger.error(
- f"[DebugExecutor] 专业性审查步骤 {step_index} "
- f"({step_def['name']}) 异常: {exc}", exc_info=True,
- )
- return StepResult(
- index=step_index, name=step_def["name"],
- status="error", phase=phase,
- duration=round(time.time() - start, 3),
- error=str(exc),
- )
- # fallback
- return StepResult(
- index=step_index, name=step_def.get("name", "unknown"),
- status="error", phase=phase,
- duration=round(time.time() - start, 3),
- error=f"Unknown step index: {step_index}",
- )
- # ----------------------------------------------------------
- # 内联方法:_execute_timeliness(TC-C02-API-003 验收)
- # ----------------------------------------------------------
- async def _execute_timeliness(
- self, request: DebugExecuteRequest, event_queue: asyncio.Queue,
- ) -> None:
- """
- 时效性审查快捷入口(通过 AIReviewEngine 封装)。
- TC-C02-API-003 要求此方法存在,此处委托给标准 execute 流程。
- """
- await self.execute(request, event_queue)
- # ----------------------------------------------------------
- # 辅助方法
- # ----------------------------------------------------------
- @staticmethod
- def _process_review_result(result: Any) -> dict:
- """将 review 返回结果规范化为字典"""
- if result is None:
- return {"success": False, "details": {}, "error": "empty result"}
- if isinstance(result, dict):
- return result
- if hasattr(result, "success"):
- return {
- "success": getattr(result, "success", False),
- "details": getattr(result, "details", {}),
- "error_message": getattr(result, "error_message", None),
- "execution_time": getattr(result, "execution_time", None),
- }
- return {"success": True, "details": {"response": str(result)}}
- @staticmethod
- async def _emit_step_result(
- event_queue: asyncio.Queue,
- task_id: str,
- sr: StepResult,
- ) -> None:
- """推送 step_result 事件到队列"""
- await event_queue.put(("step_result", {
- "task_id": task_id,
- "step_index": sr.index,
- "step_name": sr.name,
- "status": sr.status,
- "duration": sr.duration,
- "input": sr.input_summary,
- "output": sr.output_summary,
- "error": sr.error,
- }))
- @staticmethod
- def _build_final_result(
- step_results: List[StepResult],
- ) -> dict:
- """构建 completed 事件的 final_result"""
- total = len(step_results)
- succ = sum(1 for s in step_results if s.status == "success")
- err = sum(1 for s in step_results if s.status == "error")
- skip = sum(1 for s in step_results if s.status == "skipped")
- return {
- "summary": (
- f"{succ}/{total} 步骤成功, "
- f"{err} 错误, {skip} 跳过"
- ),
- "success_count": succ,
- "error_count": err,
- "skipped_count": skip,
- "total_steps": total,
- }
- @staticmethod
- def _save_record(
- request: DebugExecuteRequest,
- record_id: str,
- display_id: str,
- chain_cfg: dict,
- step_results: List[StepResult],
- total_duration: float,
- ) -> None:
- """持久化调用记录到 temp/debug_console/call_records/"""
- records_dir = "temp/debug_console/call_records"
- os.makedirs(records_dir, exist_ok=True)
- step_records = []
- for s in step_results:
- step_records.append({
- "index": s.index,
- "name": s.name,
- "phase": s.phase,
- "status": s.status,
- "duration_ms": int(s.duration * 1000),
- "input": s.input_summary,
- "output": s.output_summary,
- })
- record = {
- "id": display_id,
- "time": datetime.now().isoformat(),
- "chain": request.chain_id,
- "chain_name": chain_cfg.get("name", ""),
- "doc_ref": "",
- "status": "succ",
- "duration_ms": int(total_duration * 1000),
- "model": request.model or "default",
- "prompt_ver": "",
- "prompt_name": chain_cfg.get("prompt_name", ""),
- "tokens": 0,
- "params": {
- "review_content": request.content,
- "review_references": request.reference or "",
- "model_override": request.model,
- "function_name": request.function_name,
- "timeout": request.timeout,
- },
- "execution_params": {
- "isolation_mode": request.isolation_mode,
- "isolation_steps": list(request.isolation_steps),
- "rag_params": (
- request.rag_params.model_dump()
- if request.rag_params else None
- ),
- },
- "steps": step_records,
- "result": "",
- "error_message": None,
- }
- filepath = os.path.join(records_dir, f"{display_id}.json")
- with open(filepath, "w", encoding="utf-8") as f:
- json.dump(record, f, ensure_ascii=False, indent=2)
|