"""
Review execution orchestrator for the debug console.

Wraps the invocation of 7 review chains on top of ``BaseReviewer`` (prompt
loading and result formatting) and ``AIReviewEngine``.  Progress is pushed as
``(event_name, payload)`` tuples onto an ``asyncio.Queue``, a failing step is
isolated from the rest of the run, and trace ids carry a ``debug_`` prefix so
debug runs can be told apart from production ones.

Usage:
    executor = DebugExecutor()
    await executor.execute(request, event_queue)
"""

import asyncio
import json
import os
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from views.debug.debug_api import (
    DebugExecuteRequest,
    RagParams,
)
from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
from foundation.ai.agent.generate.model_generate import generate_model_client
from foundation.observability.logger.loggering import review_logger as logger


# ============================================================
# Step definitions
# ============================================================

# Chains that call the LLM directly; they all share the same three steps.
_DIRECT_LLM_CHAINS = (
    "completeness",
    "timeliness",
    "reference",
    "sensitive",
    "semantic",
    "grammar",
)


def _direct_llm_steps() -> List[dict]:
    """Return a fresh copy of the three-step table used by direct-LLM chains."""
    return [
        {"index": 0, "name": "Prompt 渲染", "phase": None},
        {"index": 1, "name": "LLM 调用", "phase": None},
        {"index": 2, "name": "结果解析", "phase": None},
    ]


CHAIN_STEPS = {
    # One independent list per chain so that mutating one never leaks into
    # another.
    **{chain: _direct_llm_steps() for chain in _DIRECT_LLM_CHAINS},
    "professional": [
        {"index": 0, "name": "查询提取", "phase": "RAG 召回阶段"},
        {"index": 1, "name": "实体增强检索", "phase": "RAG 召回阶段"},
        {"index": 2, "name": "父文档增强", "phase": "RAG 召回阶段"},
        {"index": 3, "name": "结果提取", "phase": "RAG 召回阶段"},
        {"index": 4, "name": "非参数合规审查", "phase": "AI 审查阶段"},
        {"index": 5, "name": "参数合规审查", "phase": "AI 审查阶段"},
        {"index": 6, "name": "结果汇总", "phase": "AI 审查阶段"},
    ],
}

VALID_CHAIN_IDS = set(CHAIN_STEPS)


# ============================================================
# StepResult data class
# ============================================================

class StepResult:
    """Outcome of a single step.

    ``status`` takes the values used throughout this module: ``"pending"``
    (the default), ``"success"``, ``"error"`` and ``"skipped"``.
    ``duration`` is in seconds.
    """

    __slots__ = (
        "index", "name", "phase", "status", "duration",
        "input_summary", "output_summary", "error",
    )

    def __init__(
        self,
        index: int,
        name: str,
        status: str = "pending",
        phase: Optional[str] = None,
        duration: float = 0,
        input_summary: Optional[dict] = None,
        output_summary: Optional[dict] = None,
        error: Optional[str] = None,
    ):
        self.index = index
        self.name = name
        self.phase = phase
        self.status = status
        self.duration = duration
        # Falsy summaries (None or empty) are normalised to a new empty dict.
        self.input_summary = input_summary or {}
        self.output_summary = output_summary or {}
        self.error = error


# ============================================================
# CHAIN_CONFIG mapping table
# ============================================================

CHAIN_CONFIG = {
    "completeness": {
        "name": "完整性审查",
        "reviewer_type": "basic",
        "prompt_name": "completeness_check",
        "function_name": "completeness_review_generate",
    },
    "timeliness": {
        "name": "时效性审查",
        "reviewer_type": "basic",
        "prompt_name": "timeliness_check",
        "function_name": "timeliness_review_generate",
    },
    "reference": {
        "name": "规范性审查",
        "reviewer_type": "basic",
        "prompt_name": "reference_check",
        "function_name": "reference_review_generate",
    },
    "sensitive": {
        "name": "敏感词检查",
        "reviewer_type": "basic",  # actually located in basic_reviewers.yaml
        "prompt_name": "sensitive_word_check",
        "function_name": "sensitive_word_check_generate",
    },
    "semantic": {
        "name": "语义逻辑检查",
        "reviewer_type": "basic",
        "prompt_name": "semantic_logic_check",
        "function_name": "semantic_logic_check_generate",
    },
    "grammar": {
        "name": "语法检查",
        "reviewer_type": "basic",
        "prompt_name": "grammar_check",
        "function_name": "grammar_check_generate",
    },
    "professional": {
        "name": "专业性审查",
        "reviewer_type": "rag",
        "prompt_name": None,
        "function_name": "professional_review_generate",
    },
}

# Per-chain step dependencies: step_index -> index of the step it depends on
# (None means the step has no dependency).  A step is skipped when the step
# it depends on ended in "error" or "skipped".
_STEP_DEPS = {
    # Direct-LLM chains: step 2 depends on step 1, step 1 on step 0.
    **{chain: {0: None, 1: 0, 2: 1} for chain in _DIRECT_LLM_CHAINS},
    # Professional review: every step depends on the previous one.
    "professional": {0: None, 1: 0, 2: 1, 3: 2, 4: 3, 5: 4, 6: 5},
}

# Professional-review steps 4 and 5 differ only in these four values.
_COMPLIANCE_STEP_SPECS = {
    4: {
        "review_types": ("non_parameter", "both"),
        "engine_method": "check_non_parameter_compliance",
        "stage_prefix": "pro_non_param",
        "output_key": "non_parameter_results",
    },
    5: {
        "review_types": ("parameter", "both"),
        "engine_method": "check_parameter_compliance",
        "stage_prefix": "pro_param",
        "output_key": "parameter_results",
    },
}


def _make_trace_id(chain_id: str) -> str:
    """Build a trace id; the ``debug_`` prefix keeps it apart from production ids."""
    return (
        f"debug_{chain_id}_"
        f"{datetime.now().strftime('%H%M%S')}_"
        f"{uuid.uuid4().hex[:8]}"
    )


def _make_record_id() -> str:
    """Build a call-record id from the current local time and a random suffix."""
    return (
        f"call-{datetime.now().strftime('%Y%m%d-%H%M%S')}-"
        f"{uuid.uuid4().hex[:6]}"
    )


class DebugExecutor:
    """Review execution orchestrator."""

    # Expose the module-level tables to external users of the class.
    CHAIN_CONFIG = CHAIN_CONFIG
    CHAIN_STEPS = CHAIN_STEPS

    def __init__(self):
        self.base_reviewer = BaseReviewer()

    async def execute(self, request: DebugExecuteRequest,
                      event_queue: asyncio.Queue) -> dict:
        """Run every step of ``request.chain_id`` and push events to ``event_queue``.

        Events are ``(name, payload)`` tuples, emitted in this order:
        ``started``; then per step an optional ``phase_label`` (when the phase
        changes), ``step_progress`` and ``step_result``; finally ``completed``.

        Args:
            request: The debug request; ``chain_id`` must be a key of
                ``CHAIN_CONFIG``.
            event_queue: Queue receiving all progress events.

        Returns:
            dict with keys ``completed_data``, ``steps_collected``,
            ``error_occurred`` (first error message, or None) and
            ``record_id``.

        Raises:
            KeyError: If ``request.chain_id`` is not a known chain.
        """
        chain_id = request.chain_id
        chain_cfg = CHAIN_CONFIG[chain_id]
        steps_def = CHAIN_STEPS[chain_id]
        step_deps = _STEP_DEPS[chain_id]
        trace_id = _make_trace_id(chain_id)
        record_id = _make_record_id()

        # --- started event ---
        await event_queue.put(("started", {
            "task_id": trace_id,
            "chain_id": chain_id,
            "total_steps": len(steps_def),
        }))

        step_results: List[StepResult] = []
        current_phase = None

        for step_def in steps_def:
            step_index = step_def["index"]
            step_name = step_def["name"]
            step_phase = step_def.get("phase")

            # phase_label: pushed whenever the phase changes.
            if step_phase and step_phase != current_phase:
                current_phase = step_phase
                await event_queue.put(("phase_label", {
                    "task_id": trace_id,
                    "label": step_phase,
                }))

            # --- step_progress event (step is starting) ---
            await event_queue.put(("step_progress", {
                "task_id": trace_id,
                "step_index": step_index,
                "step_name": step_name,
                "status": "running",
                "duration": None,
            }))

            # Skip this step when the step it depends on failed or was skipped.
            dep = step_deps.get(step_index)
            should_skip = dep is not None and any(
                prev.index == dep and prev.status in ("error", "skipped")
                for prev in step_results
            )

            if should_skip:
                result = StepResult(
                    index=step_index,
                    name=step_name,
                    status="skipped",
                    phase=step_phase,
                    duration=0,
                )
            else:
                runner = (
                    self._execute_professional_step
                    if chain_id == "professional"
                    else self._execute_direct_llm_step
                )
                step_started = time.perf_counter()
                try:
                    result = await runner(
                        request, chain_cfg, step_def,
                        step_results, trace_id,
                    )
                except Exception as exc:
                    # Last line of defence: the step runners already catch
                    # their own errors, so one failing step never aborts the
                    # whole run.
                    logger.error(
                        f"[DebugExecutor] 步骤 {step_index} ({step_name}) "
                        f"异常: {exc}",
                        exc_info=True,
                    )
                    result = StepResult(
                        index=step_index,
                        name=step_name,
                        status="error",
                        phase=step_phase,
                        # Report the time actually spent before the failure.
                        duration=round(time.perf_counter() - step_started, 3),
                        error=str(exc),
                    )

            # --- step_result event ---
            await self._emit_step_result(event_queue, trace_id, result)
            step_results.append(result)

        # --- completed event ---
        total_duration = sum(s.duration for s in step_results)
        final_result = self._build_final_result(step_results)

        # Message of the first failed step, or None when nothing failed.
        error_occurred = next(
            (
                s.error or f"步骤 {s.index} ({s.name}) 执行失败"
                for s in step_results
                if s.status == "error"
            ),
            None,
        )

        # Convert the step results into a list of plain dicts.
        steps_collected = [
            {
                "index": s.index,
                "name": s.name,
                "status": s.status,
                "duration_ms": int(s.duration * 1000) if s.duration else 0,
                "input": s.input_summary,
                "output": s.output_summary,
            }
            for s in step_results
        ]

        completed_data = {
            "task_id": trace_id,
            "chain_id": chain_id,
            "total_duration": round(total_duration, 3),
            "record_id": record_id,
            "final_result": final_result,
        }
        await event_queue.put(("completed", completed_data))

        return {
            "completed_data": completed_data,
            "steps_collected": steps_collected,
            "error_occurred": error_occurred,
            "record_id": record_id,
        }

    # ----------------------------------------------------------
    # Direct-LLM chains (6 of them)
    # ----------------------------------------------------------

    def _load_prompt_template(self, request: DebugExecuteRequest,
                              chain_cfg: dict):
        """Return ``(prompt_kwargs, prompt_template)`` for a direct-LLM chain."""
        prompt_kwargs = {
            "review_content": request.content,
            "review_references": request.reference or "",
        }
        prompt_template = self.base_reviewer.prompt_loader.get_prompt_template(
            chain_cfg["reviewer_type"],
            chain_cfg["prompt_name"],
            **prompt_kwargs,
        )
        return prompt_kwargs, prompt_template

    async def _execute_direct_llm_step(
        self,
        request: DebugExecuteRequest,
        chain_cfg: dict,
        step_def: dict,
        previous_steps: List[StepResult],
        trace_id: str,
    ) -> StepResult:
        """Run one step (0 render, 1 invoke, 2 parse) of a direct-LLM chain.

        Never raises for a failing step: the exception is logged and returned
        as a ``StepResult`` with ``status="error"``.  An unknown step index
        also yields an error result.
        """
        step_index = step_def["index"]
        step_name = step_def.get("name", "unknown")
        start = time.perf_counter()

        def elapsed() -> float:
            return round(time.perf_counter() - start, 3)

        try:
            if step_index == 0:
                # ---------- Step 0: prompt rendering ----------
                prompt_kwargs, prompt_template = self._load_prompt_template(
                    request, chain_cfg,
                )
                messages = prompt_template.format_messages(**prompt_kwargs)
                return StepResult(
                    index=0,
                    name="Prompt 渲染",
                    status="success",
                    duration=elapsed(),
                    input_summary=dict(prompt_kwargs),
                    output_summary={
                        "system_prompt": (
                            messages[0].content if messages else ""
                        ),
                        "user_prompt": (
                            messages[1].content if len(messages) > 1 else ""
                        ),
                    },
                )

            if step_index == 1:
                # ---------- Step 1: LLM invocation ----------
                _, prompt_template = self._load_prompt_template(
                    request, chain_cfg,
                )
                invoke_kwargs: dict = {
                    "trace_id": trace_id,
                    "task_prompt_info": {
                        "task_prompt": prompt_template,
                        "task_name": chain_cfg["prompt_name"],
                    },
                    "timeout": request.timeout,
                }
                # Model selection precedence:
                # request.function_name > request.model > chain default.
                if request.function_name:
                    invoke_kwargs["function_name"] = request.function_name
                elif request.model:
                    invoke_kwargs["model_name"] = request.model
                else:
                    invoke_kwargs["function_name"] = chain_cfg.get(
                        "function_name"
                    )

                model_response = (
                    await generate_model_client.get_model_generate_invoke(
                        **invoke_kwargs,
                    )
                )
                return StepResult(
                    index=1,
                    name="LLM 调用",
                    status="success",
                    duration=elapsed(),
                    input_summary={
                        "model": request.model or "default",
                        "timeout": request.timeout,
                        "function_name": invoke_kwargs.get("function_name"),
                    },
                    output_summary={
                        "raw_response": model_response,
                    },
                )

            if step_index == 2:
                # ---------- Step 2: result parsing ----------
                # The most recent result is the LLM invocation step.
                prev = previous_steps[-1] if previous_steps else None
                if not prev or prev.status != "success":
                    return StepResult(
                        index=2,
                        name="结果解析",
                        status="skipped",
                        duration=0,
                    )
                raw = prev.output_summary.get("raw_response", "")
                result = self.base_reviewer.format_result(
                    raw, chain_cfg["prompt_name"],
                )
                return StepResult(
                    index=2,
                    name="结果解析",
                    status="success",
                    duration=elapsed(),
                    input_summary={"raw_response": raw},
                    output_summary={"parsed_result": result.details},
                )
        except Exception as exc:
            # Log like the other step runners do, so a failure is visible in
            # the server log and not only in the event stream.
            logger.error(
                f"[DebugExecutor] 步骤 {step_index} ({step_name}) "
                f"异常: {exc}",
                exc_info=True,
            )
            return StepResult(
                index=step_index,
                name=step_name,
                status="error",
                duration=elapsed(),
                error=str(exc),
            )

        # fallback
        return StepResult(
            index=step_index,
            name=step_name,
            status="error",
            duration=elapsed(),
            error=f"Unknown step index: {step_index}",
        )

    # ----------------------------------------------------------
    # Professional review (RAG + LLM, 7 steps)
    # ----------------------------------------------------------

    async def _execute_professional_step(
        self,
        request: DebugExecuteRequest,
        chain_cfg: dict,
        step_def: dict,
        previous_steps: List[StepResult],
        trace_id: str,
    ) -> StepResult:
        """Run one step of the professional review chain.

        Steps 0-3 form the RAG recall phase, steps 4-6 the AI review phase.
        Each step reads its input from the ``output_summary`` of earlier
        entries in ``previous_steps`` (indexed by step index) and returns a
        "skipped" result when that input is missing or empty.  Exceptions are
        logged and returned as an error ``StepResult``.
        """
        step_index = step_def["index"]
        step_name = step_def.get("name", "unknown")
        start = time.perf_counter()
        phase = "RAG 召回阶段" if step_index < 4 else "AI 审查阶段"

        def elapsed() -> float:
            return round(time.perf_counter() - start, 3)

        def make_result(status: str, duration: float, **extra) -> StepResult:
            return StepResult(
                index=step_index,
                name=step_name,
                status=status,
                phase=phase,
                duration=duration,
                **extra,
            )

        def step_at(position: int) -> Optional[StepResult]:
            if len(previous_steps) > position:
                return previous_steps[position]
            return None

        try:
            if step_index == 0:
                # ---- Step 0: query extraction ----
                from foundation.ai.rag.retrieval.query_rewrite import (
                    query_rewrite_manager,
                )
                # NOTE(review): called without await, so it runs on the event
                # loop thread; if it blocks on I/O it will delay event
                # consumers -- confirm.
                query_pairs = query_rewrite_manager.query_extract(
                    request.content,
                )
                count = len(query_pairs) if query_pairs else 0
                return make_result(
                    "success",
                    elapsed(),
                    input_summary={"content_length": len(request.content)},
                    output_summary={
                        "query_pairs_count": count,
                        "query_pairs": query_pairs or [],
                    },
                )

            elif step_index == 1:
                # ---- Step 1: entity-enhanced retrieval ----
                prev = step_at(0)
                if not prev or prev.status != "success":
                    return make_result("skipped", 0)
                qp = prev.output_summary.get("query_pairs", [])
                if not qp:
                    return make_result("skipped", 0)

                from foundation.ai.rag.retrieval.entities_enhance import (
                    entity_enhance,
                )
                bfp_results = entity_enhance.entities_enhance_retrieval(qp)
                count = len(bfp_results) if bfp_results else 0
                return make_result(
                    "success",
                    elapsed(),
                    input_summary={"query_pairs_count": len(qp)},
                    output_summary={
                        "bfp_results_count": count,
                        "bfp_results": bfp_results or [],
                    },
                )

            elif step_index == 2:
                # ---- Step 2: parent-document enhancement ----
                prev = step_at(1)
                if not prev or prev.status != "success":
                    return make_result("skipped", 0)
                bfp = prev.output_summary.get("bfp_results", [])
                if not bfp:
                    return make_result("skipped", 0)

                from core.construction_review.component.infrastructure.milvus import (  # noqa: E501
                    MilvusManager,
                    MilvusConfig,
                )
                from core.construction_review.component.infrastructure.parent_tool import (  # noqa: E501
                    enhance_with_parent_docs_grouped,
                )
                milvus_mgr = MilvusManager(MilvusConfig())
                enhancement = enhance_with_parent_docs_grouped(
                    milvus_mgr, bfp,
                    score_threshold=0.5,
                    max_parents_per_pair=3,
                    max_parent_text_length=8000,
                )
                return make_result(
                    "success",
                    elapsed(),
                    input_summary={"bfp_results_count": len(bfp)},
                    output_summary={
                        "enhanced_results": enhancement.get(
                            "enhanced_results", []
                        ),
                        "enhanced_count": enhancement.get(
                            "enhanced_count", 0
                        ),
                        "parent_docs_count": len(
                            enhancement.get("parent_docs", [])
                        ),
                    },
                )

            elif step_index == 3:
                # ---- Step 3: result extraction ----
                prev = step_at(2)
                if not prev or prev.status != "success":
                    return make_result("skipped", 0)
                enhanced = prev.output_summary.get("enhanced_results", [])
                qp = previous_steps[0].output_summary.get("query_pairs", [])
                if not enhanced:
                    return make_result("skipped", 0)

                from core.construction_review.component.infrastructure.parent_tool import (  # noqa: E501
                    extract_query_pairs_results,
                )
                entity_results = extract_query_pairs_results(
                    enhanced, qp, score_threshold=0.5,
                )
                return make_result(
                    "success",
                    elapsed(),
                    input_summary={"enhanced_results_count": len(enhanced)},
                    output_summary={
                        "entity_results_count": (
                            len(entity_results) if entity_results else 0
                        ),
                        "entity_results": entity_results or [],
                    },
                )

            elif step_index in _COMPLIANCE_STEP_SPECS:
                # ---- Step 4: non-parameter compliance review ----
                # ---- Step 5: parameter compliance review ----
                # Both consume the entity results produced by step 3.
                spec = _COMPLIANCE_STEP_SPECS[step_index]
                step3 = step_at(3)
                if not step3 or step3.status != "success":
                    return make_result("skipped", 0)
                entity_results = step3.output_summary.get(
                    "entity_results", []
                )
                if not entity_results:
                    return make_result("skipped", 0)

                rag_params = request.rag_params or RagParams()
                rt = rag_params.review_type

                from core.construction_review.component.ai_review_engine import (
                    AIReviewEngine,
                )
                engine = AIReviewEngine(max_concurrent_reviews=1)

                results_list: List[dict] = []
                # When the requested review type does not cover this step it
                # still succeeds, with an empty result list.
                if rt in spec["review_types"]:
                    check = getattr(engine, spec["engine_method"])
                    for idx, item in enumerate(entity_results):
                        cq = item.get("combined_query", "")
                        tc = item.get("text_content", "")
                        fn = item.get("file_name", "")
                        res = await check(
                            trace_id_idx=trace_id,
                            review_content=request.content,
                            review_references=tc,
                            reference_source=fn,
                            state=None,
                            stage_name=f"{spec['stage_prefix']}_{idx}",
                            entity_query=cq,
                        )
                        processed = self._process_review_result(res)
                        processed["entity"] = item.get("entity", f"e{idx}")
                        processed["combined_query"] = cq
                        results_list.append(processed)

                return make_result(
                    "success",
                    elapsed(),
                    input_summary={
                        "entity_count": len(entity_results),
                        "review_type": rt,
                    },
                    output_summary={
                        spec["output_key"]: results_list,
                        "result_count": len(results_list),
                    },
                )

            elif step_index == 6:
                # ---- Step 6: result summary ----
                summary: Dict[str, list] = {}
                s4 = step_at(4)
                if s4 is not None and s4.status == "success":
                    summary["non_parameter"] = s4.output_summary.get(
                        "non_parameter_results", []
                    )
                s5 = step_at(5)
                if s5 is not None and s5.status == "success":
                    summary["parameter"] = s5.output_summary.get(
                        "parameter_results", []
                    )
                total_items = (
                    len(summary.get("non_parameter", []))
                    + len(summary.get("parameter", []))
                )
                return make_result(
                    "success",
                    elapsed(),
                    input_summary={},
                    output_summary={
                        "final_summary": summary,
                        "total_review_items": total_items,
                    },
                )

        except Exception as exc:
            logger.error(
                f"[DebugExecutor] 专业性审查步骤 {step_index} "
                f"({step_name}) 异常: {exc}",
                exc_info=True,
            )
            return make_result("error", elapsed(), error=str(exc))

        # fallback
        return make_result(
            "error",
            elapsed(),
            error=f"Unknown step index: {step_index}",
        )

    # ----------------------------------------------------------
    # Inline method: _execute_timeliness (TC-C02-API-003 acceptance)
    # ----------------------------------------------------------

    async def _execute_timeliness(
        self,
        request: DebugExecuteRequest,
        event_queue: asyncio.Queue,
    ) -> None:
        """Shortcut entry point for the timeliness review.

        TC-C02-API-003 requires this method to exist; it simply delegates to
        the standard ``execute`` flow and discards its return value.
        """
        await self.execute(request, event_queue)

    # ----------------------------------------------------------
    # Helpers
    # ----------------------------------------------------------

    @staticmethod
    def _process_review_result(result: Any) -> dict:
        """Normalise a review return value into a dict.

        ``None`` becomes a failure dict, a dict is returned as-is (the same
        object, not a copy), an object with a ``success`` attribute is
        flattened, and anything else is stringified under
        ``details["response"]``.
        """
        if result is None:
            # NOTE(review): this branch reports the message under "error"
            # while the object branch below uses "error_message" -- confirm
            # which key consumers read.
            return {"success": False, "details": {}, "error": "empty result"}
        if isinstance(result, dict):
            return result
        if hasattr(result, "success"):
            return {
                "success": getattr(result, "success", False),
                "details": getattr(result, "details", {}),
                "error_message": getattr(result, "error_message", None),
                "execution_time": getattr(result, "execution_time", None),
            }
        return {"success": True, "details": {"response": str(result)}}

    @staticmethod
    async def _emit_step_result(
        event_queue: asyncio.Queue,
        task_id: str,
        sr: StepResult,
    ) -> None:
        """Push a ``step_result`` event describing ``sr`` onto the queue."""
        await event_queue.put(("step_result", {
            "task_id": task_id,
            "step_index": sr.index,
            "step_name": sr.name,
            "status": sr.status,
            "duration": sr.duration,
            "input": sr.input_summary,
            "output": sr.output_summary,
            "error": sr.error,
        }))

    @staticmethod
    def _build_final_result(
        step_results: List[StepResult],
    ) -> dict:
        """Build the ``final_result`` payload of the ``completed`` event."""
        total = len(step_results)
        succ = sum(1 for s in step_results if s.status == "success")
        err = sum(1 for s in step_results if s.status == "error")
        skip = sum(1 for s in step_results if s.status == "skipped")
        return {
            "summary": (
                f"{succ}/{total} 步骤成功, "
                f"{err} 错误, {skip} 跳过"
            ),
            "success_count": succ,
            "error_count": err,
            "skipped_count": skip,
            "total_steps": total,
        }

    @staticmethod
    def _save_record(
        request: DebugExecuteRequest,
        record_id: str,
        display_id: str,
        chain_cfg: dict,
        step_results: List[StepResult],
        total_duration: float,
    ) -> None:
        """Persist a call record as ``temp/debug_console/call_records/<display_id>.json``.

        The directory is created when missing.  ``record_id`` is accepted but
        not written; the file name and the ``id`` field both use
        ``display_id``.
        """
        records_dir = "temp/debug_console/call_records"
        os.makedirs(records_dir, exist_ok=True)

        step_records = [
            {
                "index": s.index,
                "name": s.name,
                "phase": s.phase,
                "status": s.status,
                "duration_ms": int(s.duration * 1000),
                "input": s.input_summary,
                "output": s.output_summary,
            }
            for s in step_results
        ]

        record = {
            "id": display_id,
            "time": datetime.now().isoformat(),
            "chain": request.chain_id,
            "chain_name": chain_cfg.get("name", ""),
            "doc_ref": "",
            # NOTE(review): status and error_message are fixed values even
            # when a step failed -- confirm whether they should be derived
            # from step_results.
            "status": "succ",
            "duration_ms": int(total_duration * 1000),
            "model": request.model or "default",
            "prompt_ver": "",
            "prompt_name": chain_cfg.get("prompt_name", ""),
            "tokens": 0,
            "params": {
                "review_content": request.content,
                "review_references": request.reference or "",
                "model_override": request.model,
                "function_name": request.function_name,
                "timeout": request.timeout,
            },
            "execution_params": {
                "isolation_mode": request.isolation_mode,
                # Tolerate a missing list; list(None) would raise TypeError.
                "isolation_steps": list(request.isolation_steps or []),
                "rag_params": (
                    request.rag_params.model_dump()
                    if request.rag_params else None
                ),
            },
            "steps": step_records,
            "result": "",
            "error_message": None,
        }

        filepath = os.path.join(records_dir, f"{display_id}.json")
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(record, f, ensure_ascii=False, indent=2)