executor.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899
  1. """
  2. 审查执行编排器
  3. 封装 7 个审查链路的调用,复用 BaseReviewer.review() 和 AIReviewEngine。
  4. 支持 SSE 进度推送、单步错误隔离、trace_id 生产隔离。
  5. 用法:
  6. executor = DebugExecutor()
  7. await executor.execute(request, event_queue)
  8. """
  9. import asyncio
  10. import json
  11. import os
  12. import time
  13. import uuid
  14. from datetime import datetime
  15. from typing import Any, Dict, List, Optional
  16. from views.debug.debug_api import (
  17. DebugExecuteRequest,
  18. RagParams,
  19. )
  20. from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
  21. from foundation.ai.agent.generate.model_generate import generate_model_client
  22. from foundation.observability.logger.loggering import review_logger as logger
  23. # ============================================================
  24. # 步骤定义
  25. # ============================================================
  26. CHAIN_STEPS = {
  27. "completeness": [
  28. {"index": 0, "name": "Prompt 渲染", "phase": None},
  29. {"index": 1, "name": "LLM 调用", "phase": None},
  30. {"index": 2, "name": "结果解析", "phase": None},
  31. ],
  32. "timeliness": [
  33. {"index": 0, "name": "Prompt 渲染", "phase": None},
  34. {"index": 1, "name": "LLM 调用", "phase": None},
  35. {"index": 2, "name": "结果解析", "phase": None},
  36. ],
  37. "reference": [
  38. {"index": 0, "name": "Prompt 渲染", "phase": None},
  39. {"index": 1, "name": "LLM 调用", "phase": None},
  40. {"index": 2, "name": "结果解析", "phase": None},
  41. ],
  42. "sensitive": [
  43. {"index": 0, "name": "Prompt 渲染", "phase": None},
  44. {"index": 1, "name": "LLM 调用", "phase": None},
  45. {"index": 2, "name": "结果解析", "phase": None},
  46. ],
  47. "semantic": [
  48. {"index": 0, "name": "Prompt 渲染", "phase": None},
  49. {"index": 1, "name": "LLM 调用", "phase": None},
  50. {"index": 2, "name": "结果解析", "phase": None},
  51. ],
  52. "grammar": [
  53. {"index": 0, "name": "Prompt 渲染", "phase": None},
  54. {"index": 1, "name": "LLM 调用", "phase": None},
  55. {"index": 2, "name": "结果解析", "phase": None},
  56. ],
  57. "professional": [
  58. {"index": 0, "name": "查询提取", "phase": "RAG 召回阶段"},
  59. {"index": 1, "name": "实体增强检索", "phase": "RAG 召回阶段"},
  60. {"index": 2, "name": "父文档增强", "phase": "RAG 召回阶段"},
  61. {"index": 3, "name": "结果提取", "phase": "RAG 召回阶段"},
  62. {"index": 4, "name": "非参数合规审查", "phase": "AI 审查阶段"},
  63. {"index": 5, "name": "参数合规审查", "phase": "AI 审查阶段"},
  64. {"index": 6, "name": "结果汇总", "phase": "AI 审查阶段"},
  65. ],
  66. }
  67. VALID_CHAIN_IDS = set(CHAIN_STEPS.keys())
  68. # ============================================================
  69. # StepResult 数据类
  70. # ============================================================
  71. class StepResult:
  72. """单步执行结果"""
  73. __slots__ = ("index", "name", "phase", "status", "duration",
  74. "input_summary", "output_summary", "error")
  75. def __init__(self, index: int, name: str, status: str = "pending",
  76. phase: str = None, duration: float = 0,
  77. input_summary: dict = None, output_summary: dict = None,
  78. error: str = None):
  79. self.index = index
  80. self.name = name
  81. self.phase = phase
  82. self.status = status
  83. self.duration = duration
  84. self.input_summary = input_summary or {}
  85. self.output_summary = output_summary or {}
  86. self.error = error
  87. # ============================================================
  88. # CHAIN_CONFIG 映射表
  89. # ============================================================
  90. CHAIN_CONFIG = {
  91. "completeness": {
  92. "name": "完整性审查",
  93. "reviewer_type": "basic",
  94. "prompt_name": "completeness_check",
  95. "function_name": "completeness_review_generate",
  96. },
  97. "timeliness": {
  98. "name": "时效性审查",
  99. "reviewer_type": "basic",
  100. "prompt_name": "timeliness_check",
  101. "function_name": "timeliness_review_generate",
  102. },
  103. "reference": {
  104. "name": "规范性审查",
  105. "reviewer_type": "basic",
  106. "prompt_name": "reference_check",
  107. "function_name": "reference_review_generate",
  108. },
  109. "sensitive": {
  110. "name": "敏感词检查",
  111. "reviewer_type": "basic", # 实际位于 basic_reviewers.yaml
  112. "prompt_name": "sensitive_word_check",
  113. "function_name": "sensitive_word_check_generate",
  114. },
  115. "semantic": {
  116. "name": "语义逻辑检查",
  117. "reviewer_type": "basic",
  118. "prompt_name": "semantic_logic_check",
  119. "function_name": "semantic_logic_check_generate",
  120. },
  121. "grammar": {
  122. "name": "语法检查",
  123. "reviewer_type": "basic",
  124. "prompt_name": "grammar_check",
  125. "function_name": "grammar_check_generate",
  126. },
  127. "professional": {
  128. "name": "专业性审查",
  129. "reviewer_type": "rag",
  130. "prompt_name": None,
  131. "function_name": "professional_review_generate",
  132. },
  133. }
  134. # 各链路的步骤依赖关系:step_index -> (依赖的上一步索引, 是否可跳过)
  135. _STEP_DEPS = {
  136. # 直调 LLM 链路:步骤 2 依赖步骤 1,步骤 1 依赖步骤 0
  137. "completeness": {0: None, 1: 0, 2: 1},
  138. "timeliness": {0: None, 1: 0, 2: 1},
  139. "reference": {0: None, 1: 0, 2: 1},
  140. "sensitive": {0: None, 1: 0, 2: 1},
  141. "semantic": {0: None, 1: 0, 2: 1},
  142. "grammar": {0: None, 1: 0, 2: 1},
  143. # 专业性审查:每步依赖上一步
  144. "professional": {0: None, 1: 0, 2: 1, 3: 2, 4: 3, 5: 4, 6: 5},
  145. }
  146. def _make_trace_id(chain_id: str) -> str:
  147. """生成 trace_id,添加 debug_ 前缀实现生产隔离"""
  148. return (
  149. f"debug_{chain_id}_"
  150. f"{datetime.now().strftime('%H%M%S')}_"
  151. f"{uuid.uuid4().hex[:8]}"
  152. )
  153. def _make_record_id() -> str:
  154. """生成记录 ID"""
  155. return (
  156. f"call-{datetime.now().strftime('%Y%m%d-%H%M%S')}-"
  157. f"{uuid.uuid4().hex[:6]}"
  158. )
  159. class DebugExecutor:
  160. """审查执行编排器"""
  161. # 暴露 CHAIN_CONFIG 供外部使用
  162. CHAIN_CONFIG = CHAIN_CONFIG
  163. CHAIN_STEPS = CHAIN_STEPS
  164. def __init__(self):
  165. self.base_reviewer = BaseReviewer()
  166. async def execute(self, request: DebugExecuteRequest,
  167. event_queue: asyncio.Queue) -> None:
  168. """
  169. 执行审查调试,所有事件通过 event_queue 推送。
  170. Args:
  171. request: 调试执行请求
  172. event_queue: asyncio.Queue,消费者消费后格式化为 SSE
  173. """
  174. chain_id = request.chain_id
  175. chain_cfg = CHAIN_CONFIG[chain_id]
  176. steps_def = CHAIN_STEPS[chain_id]
  177. trace_id = _make_trace_id(chain_id)
  178. record_id = _make_record_id()
  179. total_steps = len(steps_def)
  180. # --- started 事件 ---
  181. await event_queue.put(("started", {
  182. "task_id": trace_id,
  183. "chain_id": chain_id,
  184. "total_steps": total_steps,
  185. }))
  186. step_results: List[StepResult] = []
  187. current_phase = None
  188. for step_def in steps_def:
  189. step_index = step_def["index"]
  190. step_name = step_def["name"]
  191. step_phase = step_def.get("phase")
  192. # phase_label:阶段切换时推送
  193. if step_phase and step_phase != current_phase:
  194. current_phase = step_phase
  195. await event_queue.put(("phase_label", {
  196. "task_id": trace_id,
  197. "label": step_phase,
  198. }))
  199. # --- step_progress 事件(开始执行)---
  200. await event_queue.put(("step_progress", {
  201. "task_id": trace_id,
  202. "step_index": step_index,
  203. "step_name": step_name,
  204. "status": "running",
  205. "duration": None,
  206. }))
  207. # 判断是否跳过:如果依赖的上一步失败/跳过,本步骤也跳过
  208. dep = _STEP_DEPS[chain_id].get(step_index)
  209. should_skip = False
  210. if dep is not None:
  211. for prev in step_results:
  212. if prev.index == dep and prev.status in ("error", "skipped"):
  213. should_skip = True
  214. break
  215. if should_skip:
  216. result = StepResult(
  217. index=step_index, name=step_name,
  218. status="skipped", phase=step_phase, duration=0,
  219. )
  220. else:
  221. try:
  222. if chain_id == "professional":
  223. result = await self._execute_professional_step(
  224. request, chain_cfg, step_def,
  225. step_results, trace_id,
  226. )
  227. else:
  228. result = await self._execute_direct_llm_step(
  229. request, chain_cfg, step_def,
  230. step_results, trace_id,
  231. )
  232. except Exception as exc:
  233. logger.error(
  234. f"[DebugExecutor] 步骤 {step_index} ({step_name}) "
  235. f"异常: {exc}", exc_info=True,
  236. )
  237. result = StepResult(
  238. index=step_index, name=step_name,
  239. status="error", phase=step_phase,
  240. duration=0, error=str(exc),
  241. )
  242. # --- step_result 事件 ---
  243. await self._emit_step_result(event_queue, trace_id, result)
  244. step_results.append(result)
  245. # --- completed 事件 ---
  246. total_duration = sum(s.duration for s in step_results)
  247. final_result = self._build_final_result(step_results)
  248. # 持久化调用记录(非关键路径,失败不阻止 normal 流程)
  249. try:
  250. self._save_record(request, trace_id, record_id,
  251. chain_cfg, step_results, total_duration)
  252. except Exception as exc:
  253. logger.warning(f"[DebugExecutor] 保存调用记录失败: {exc}")
  254. await event_queue.put(("completed", {
  255. "task_id": trace_id,
  256. "chain_id": chain_id,
  257. "total_duration": round(total_duration, 3),
  258. "record_id": record_id,
  259. "final_result": final_result,
  260. }))
  261. # ----------------------------------------------------------
  262. # 直调 LLM 链路(6 个)
  263. # ----------------------------------------------------------
  264. async def _execute_direct_llm_step(
  265. self,
  266. request: DebugExecuteRequest,
  267. chain_cfg: dict,
  268. step_def: dict,
  269. previous_steps: List[StepResult],
  270. trace_id: str,
  271. ) -> StepResult:
  272. """执行直调 LLM 链路的单个步骤"""
  273. step_index = step_def["index"]
  274. start = time.time()
  275. if step_index == 0:
  276. # ---------- Step 0: Prompt 渲染 ----------
  277. try:
  278. prompt_kwargs = {
  279. "review_content": request.content,
  280. "review_references": request.reference or "",
  281. }
  282. prompt_template = self.base_reviewer.prompt_loader.get_prompt_template(
  283. chain_cfg["reviewer_type"],
  284. chain_cfg["prompt_name"],
  285. **prompt_kwargs,
  286. )
  287. messages = prompt_template.format_messages(**prompt_kwargs)
  288. return StepResult(
  289. index=0, name="Prompt 渲染", status="success",
  290. duration=round(time.time() - start, 3),
  291. input_summary={
  292. "review_content": request.content,
  293. "review_references": request.reference or "",
  294. },
  295. output_summary={
  296. "system_prompt": (
  297. messages[0].content if messages else ""
  298. ),
  299. "user_prompt": (
  300. messages[1].content
  301. if len(messages) > 1 else ""
  302. ),
  303. },
  304. )
  305. except Exception as exc:
  306. return StepResult(
  307. index=0, name="Prompt 渲染", status="error",
  308. duration=round(time.time() - start, 3),
  309. error=str(exc),
  310. )
  311. elif step_index == 1:
  312. # ---------- Step 1: LLM 调用 ----------
  313. try:
  314. prompt_kwargs = {
  315. "review_content": request.content,
  316. "review_references": request.reference or "",
  317. }
  318. prompt_template = self.base_reviewer.prompt_loader.get_prompt_template(
  319. chain_cfg["reviewer_type"],
  320. chain_cfg["prompt_name"],
  321. **prompt_kwargs,
  322. )
  323. invoke_kwargs: dict = {
  324. "trace_id": trace_id,
  325. "task_prompt_info": {
  326. "task_prompt": prompt_template,
  327. "task_name": chain_cfg["prompt_name"],
  328. },
  329. "timeout": request.timeout,
  330. }
  331. # 模型选择:function_name > model > chain 默认
  332. if request.function_name:
  333. invoke_kwargs["function_name"] = request.function_name
  334. elif request.model:
  335. invoke_kwargs["model_name"] = request.model
  336. else:
  337. invoke_kwargs["function_name"] = chain_cfg.get(
  338. "function_name"
  339. )
  340. model_response = await generate_model_client.get_model_generate_invoke(
  341. **invoke_kwargs,
  342. )
  343. return StepResult(
  344. index=1, name="LLM 调用", status="success",
  345. duration=round(time.time() - start, 3),
  346. input_summary={
  347. "model": request.model or "default",
  348. "timeout": request.timeout,
  349. "function_name": invoke_kwargs.get("function_name"),
  350. },
  351. output_summary={
  352. "raw_response": model_response,
  353. },
  354. )
  355. except Exception as exc:
  356. return StepResult(
  357. index=1, name="LLM 调用", status="error",
  358. duration=round(time.time() - start, 3),
  359. error=str(exc),
  360. )
  361. elif step_index == 2:
  362. # ---------- Step 2: 结果解析 ----------
  363. prev = previous_steps[-1] if previous_steps else None
  364. if not prev or prev.status != "success":
  365. return StepResult(
  366. index=2, name="结果解析", status="skipped",
  367. duration=0,
  368. )
  369. try:
  370. raw = prev.output_summary.get("raw_response", "")
  371. result = self.base_reviewer.format_result(
  372. raw, chain_cfg["prompt_name"],
  373. )
  374. return StepResult(
  375. index=2, name="结果解析", status="success",
  376. duration=round(time.time() - start, 3),
  377. input_summary={"raw_response": raw},
  378. output_summary={"parsed_result": result.details},
  379. )
  380. except Exception as exc:
  381. return StepResult(
  382. index=2, name="结果解析", status="error",
  383. duration=round(time.time() - start, 3),
  384. error=str(exc),
  385. )
  386. # fallback
  387. return StepResult(
  388. index=step_index, name=step_def.get("name", "unknown"),
  389. status="error", duration=round(time.time() - start, 3),
  390. error=f"Unknown step index: {step_index}",
  391. )
  392. # ----------------------------------------------------------
  393. # 专业性审查(RAG + LLM,7 步)
  394. # ----------------------------------------------------------
  395. async def _execute_professional_step(
  396. self,
  397. request: DebugExecuteRequest,
  398. chain_cfg: dict,
  399. step_def: dict,
  400. previous_steps: List[StepResult],
  401. trace_id: str,
  402. ) -> StepResult:
  403. """执行专业性审查的单个步骤"""
  404. step_index = step_def["index"]
  405. start = time.time()
  406. phase = "RAG 召回阶段" if step_index < 4 else "AI 审查阶段"
  407. try:
  408. if step_index == 0:
  409. # ---- Step 0: 查询提取 ----
  410. from foundation.ai.rag.retrieval.query_rewrite import (
  411. query_rewrite_manager,
  412. )
  413. query_pairs = query_rewrite_manager.query_extract(
  414. request.content,
  415. )
  416. count = len(query_pairs) if query_pairs else 0
  417. return StepResult(
  418. index=0, name="查询提取", status="success",
  419. phase=phase,
  420. duration=round(time.time() - start, 3),
  421. input_summary={"content_length": len(request.content)},
  422. output_summary={
  423. "query_pairs_count": count,
  424. "query_pairs": query_pairs or [],
  425. },
  426. )
  427. elif step_index == 1:
  428. # ---- Step 1: 实体增强检索 ----
  429. prev = previous_steps[0] if len(previous_steps) > 0 else None
  430. if not prev or prev.status != "success":
  431. return StepResult(
  432. index=1, name="实体增强检索", status="skipped",
  433. phase=phase, duration=0,
  434. )
  435. qp = prev.output_summary.get("query_pairs", [])
  436. if not qp:
  437. return StepResult(
  438. index=1, name="实体增强检索", status="skipped",
  439. phase=phase, duration=0,
  440. )
  441. from foundation.ai.rag.retrieval.entities_enhance import (
  442. entity_enhance,
  443. )
  444. bfp_results = entity_enhance.entities_enhance_retrieval(qp)
  445. count = len(bfp_results) if bfp_results else 0
  446. return StepResult(
  447. index=1, name="实体增强检索", status="success",
  448. phase=phase,
  449. duration=round(time.time() - start, 3),
  450. input_summary={"query_pairs_count": len(qp)},
  451. output_summary={
  452. "bfp_results_count": count,
  453. "bfp_results": bfp_results or [],
  454. },
  455. )
  456. elif step_index == 2:
  457. # ---- Step 2: 父文档增强 ----
  458. prev = previous_steps[1] if len(previous_steps) > 1 else None
  459. if not prev or prev.status != "success":
  460. return StepResult(
  461. index=2, name="父文档增强", status="skipped",
  462. phase=phase, duration=0,
  463. )
  464. bfp = prev.output_summary.get("bfp_results", [])
  465. if not bfp:
  466. return StepResult(
  467. index=2, name="父文档增强", status="skipped",
  468. phase=phase, duration=0,
  469. )
  470. from core.construction_review.component.infrastructure.milvus import ( # noqa: E501
  471. MilvusManager, MilvusConfig,
  472. )
  473. from core.construction_review.component.infrastructure.parent_tool import ( # noqa: E501
  474. enhance_with_parent_docs_grouped,
  475. )
  476. milvus_mgr = MilvusManager(MilvusConfig())
  477. enhancement = enhance_with_parent_docs_grouped(
  478. milvus_mgr,
  479. bfp,
  480. score_threshold=0.5,
  481. max_parents_per_pair=3,
  482. max_parent_text_length=8000,
  483. )
  484. return StepResult(
  485. index=2, name="父文档增强", status="success",
  486. phase=phase,
  487. duration=round(time.time() - start, 3),
  488. input_summary={"bfp_results_count": len(bfp)},
  489. output_summary={
  490. "enhanced_results": enhancement.get(
  491. "enhanced_results", []
  492. ),
  493. "enhanced_count": enhancement.get(
  494. "enhanced_count", 0
  495. ),
  496. "parent_docs_count": len(
  497. enhancement.get("parent_docs", [])
  498. ),
  499. },
  500. )
  501. elif step_index == 3:
  502. # ---- Step 3: 结果提取 ----
  503. prev = previous_steps[2] if len(previous_steps) > 2 else None
  504. if not prev or prev.status != "success":
  505. return StepResult(
  506. index=3, name="结果提取", status="skipped",
  507. phase=phase, duration=0,
  508. )
  509. enhanced = prev.output_summary.get("enhanced_results", [])
  510. step0 = previous_steps[0]
  511. qp = step0.output_summary.get("query_pairs", [])
  512. if not enhanced:
  513. return StepResult(
  514. index=3, name="结果提取", status="skipped",
  515. phase=phase, duration=0,
  516. )
  517. from core.construction_review.component.infrastructure.parent_tool import ( # noqa: E501
  518. extract_query_pairs_results,
  519. )
  520. entity_results = extract_query_pairs_results(
  521. enhanced, qp, score_threshold=0.5,
  522. )
  523. return StepResult(
  524. index=3, name="结果提取", status="success",
  525. phase=phase,
  526. duration=round(time.time() - start, 3),
  527. input_summary={"enhanced_results_count": len(enhanced)},
  528. output_summary={
  529. "entity_results_count": (
  530. len(entity_results) if entity_results else 0
  531. ),
  532. "entity_results": entity_results or [],
  533. },
  534. )
  535. elif step_index == 4:
  536. # ---- Step 4: 非参数合规审查 ----
  537. step3 = previous_steps[3] if len(previous_steps) > 3 else None
  538. if not step3 or step3.status != "success":
  539. return StepResult(
  540. index=4, name="非参数合规审查", status="skipped",
  541. phase=phase, duration=0,
  542. )
  543. entity_results = step3.output_summary.get(
  544. "entity_results", []
  545. )
  546. if not entity_results:
  547. return StepResult(
  548. index=4, name="非参数合规审查", status="skipped",
  549. phase=phase, duration=0,
  550. )
  551. rag_params = request.rag_params or RagParams()
  552. rt = rag_params.review_type
  553. from core.construction_review.component.ai_review_engine import (
  554. AIReviewEngine,
  555. )
  556. engine = AIReviewEngine(max_concurrent_reviews=1)
  557. results_list: List[dict] = []
  558. for idx, item in enumerate(entity_results):
  559. if rt in ("non_parameter", "both"):
  560. cq = item.get("combined_query", "")
  561. tc = item.get("text_content", "")
  562. fn = item.get("file_name", "")
  563. res = await engine.check_non_parameter_compliance(
  564. trace_id_idx=trace_id,
  565. review_content=request.content,
  566. review_references=tc,
  567. reference_source=fn,
  568. state=None,
  569. stage_name=f"pro_non_param_{idx}",
  570. entity_query=cq,
  571. )
  572. processed = self._process_review_result(res)
  573. processed["entity"] = item.get("entity", f"e{idx}")
  574. processed["combined_query"] = cq
  575. results_list.append(processed)
  576. return StepResult(
  577. index=4, name="非参数合规审查", status="success",
  578. phase=phase,
  579. duration=round(time.time() - start, 3),
  580. input_summary={
  581. "entity_count": len(entity_results),
  582. "review_type": rt,
  583. },
  584. output_summary={
  585. "non_parameter_results": results_list,
  586. "result_count": len(results_list),
  587. },
  588. )
  589. elif step_index == 5:
  590. # ---- Step 5: 参数合规审查 ----
  591. step3 = previous_steps[3] if len(previous_steps) > 3 else None
  592. if not step3 or step3.status != "success":
  593. return StepResult(
  594. index=5, name="参数合规审查", status="skipped",
  595. phase=phase, duration=0,
  596. )
  597. entity_results = step3.output_summary.get(
  598. "entity_results", []
  599. )
  600. if not entity_results:
  601. return StepResult(
  602. index=5, name="参数合规审查", status="skipped",
  603. phase=phase, duration=0,
  604. )
  605. rag_params = request.rag_params or RagParams()
  606. rt = rag_params.review_type
  607. from core.construction_review.component.ai_review_engine import (
  608. AIReviewEngine,
  609. )
  610. engine = AIReviewEngine(max_concurrent_reviews=1)
  611. results_list: List[dict] = []
  612. for idx, item in enumerate(entity_results):
  613. if rt in ("parameter", "both"):
  614. cq = item.get("combined_query", "")
  615. tc = item.get("text_content", "")
  616. fn = item.get("file_name", "")
  617. res = await engine.check_parameter_compliance(
  618. trace_id_idx=trace_id,
  619. review_content=request.content,
  620. review_references=tc,
  621. reference_source=fn,
  622. state=None,
  623. stage_name=f"pro_param_{idx}",
  624. entity_query=cq,
  625. )
  626. processed = self._process_review_result(res)
  627. processed["entity"] = item.get("entity", f"e{idx}")
  628. processed["combined_query"] = cq
  629. results_list.append(processed)
  630. return StepResult(
  631. index=5, name="参数合规审查", status="success",
  632. phase=phase,
  633. duration=round(time.time() - start, 3),
  634. input_summary={
  635. "entity_count": len(entity_results),
  636. "review_type": rt,
  637. },
  638. output_summary={
  639. "parameter_results": results_list,
  640. "result_count": len(results_list),
  641. },
  642. )
  643. elif step_index == 6:
  644. # ---- Step 6: 结果汇总 ----
  645. summary: Dict[str, list] = {}
  646. if len(previous_steps) > 4:
  647. s4 = previous_steps[4]
  648. if s4.status == "success":
  649. summary["non_parameter"] = s4.output_summary.get(
  650. "non_parameter_results", []
  651. )
  652. if len(previous_steps) > 5:
  653. s5 = previous_steps[5]
  654. if s5.status == "success":
  655. summary["parameter"] = s5.output_summary.get(
  656. "parameter_results", []
  657. )
  658. total_items = (
  659. len(summary.get("non_parameter", []))
  660. + len(summary.get("parameter", []))
  661. )
  662. return StepResult(
  663. index=6, name="结果汇总", status="success",
  664. phase=phase,
  665. duration=round(time.time() - start, 3),
  666. input_summary={},
  667. output_summary={
  668. "final_summary": summary,
  669. "total_review_items": total_items,
  670. },
  671. )
  672. except Exception as exc:
  673. logger.error(
  674. f"[DebugExecutor] 专业性审查步骤 {step_index} "
  675. f"({step_def['name']}) 异常: {exc}", exc_info=True,
  676. )
  677. return StepResult(
  678. index=step_index, name=step_def["name"],
  679. status="error", phase=phase,
  680. duration=round(time.time() - start, 3),
  681. error=str(exc),
  682. )
  683. # fallback
  684. return StepResult(
  685. index=step_index, name=step_def.get("name", "unknown"),
  686. status="error", phase=phase,
  687. duration=round(time.time() - start, 3),
  688. error=f"Unknown step index: {step_index}",
  689. )
  690. # ----------------------------------------------------------
  691. # 内联方法:_execute_timeliness(TC-C02-API-003 验收)
  692. # ----------------------------------------------------------
  693. async def _execute_timeliness(
  694. self, request: DebugExecuteRequest, event_queue: asyncio.Queue,
  695. ) -> None:
  696. """
  697. 时效性审查快捷入口(通过 AIReviewEngine 封装)。
  698. TC-C02-API-003 要求此方法存在,此处委托给标准 execute 流程。
  699. """
  700. await self.execute(request, event_queue)
  701. # ----------------------------------------------------------
  702. # 辅助方法
  703. # ----------------------------------------------------------
  704. @staticmethod
  705. def _process_review_result(result: Any) -> dict:
  706. """将 review 返回结果规范化为字典"""
  707. if result is None:
  708. return {"success": False, "details": {}, "error": "empty result"}
  709. if isinstance(result, dict):
  710. return result
  711. if hasattr(result, "success"):
  712. return {
  713. "success": getattr(result, "success", False),
  714. "details": getattr(result, "details", {}),
  715. "error_message": getattr(result, "error_message", None),
  716. "execution_time": getattr(result, "execution_time", None),
  717. }
  718. return {"success": True, "details": {"response": str(result)}}
  719. @staticmethod
  720. async def _emit_step_result(
  721. event_queue: asyncio.Queue,
  722. task_id: str,
  723. sr: StepResult,
  724. ) -> None:
  725. """推送 step_result 事件到队列"""
  726. await event_queue.put(("step_result", {
  727. "task_id": task_id,
  728. "step_index": sr.index,
  729. "step_name": sr.name,
  730. "status": sr.status,
  731. "duration": sr.duration,
  732. "input": sr.input_summary,
  733. "output": sr.output_summary,
  734. "error": sr.error,
  735. }))
  736. @staticmethod
  737. def _build_final_result(
  738. step_results: List[StepResult],
  739. ) -> dict:
  740. """构建 completed 事件的 final_result"""
  741. total = len(step_results)
  742. succ = sum(1 for s in step_results if s.status == "success")
  743. err = sum(1 for s in step_results if s.status == "error")
  744. skip = sum(1 for s in step_results if s.status == "skipped")
  745. return {
  746. "summary": (
  747. f"{succ}/{total} 步骤成功, "
  748. f"{err} 错误, {skip} 跳过"
  749. ),
  750. "success_count": succ,
  751. "error_count": err,
  752. "skipped_count": skip,
  753. "total_steps": total,
  754. }
  755. @staticmethod
  756. def _save_record(
  757. request: DebugExecuteRequest,
  758. record_id: str,
  759. display_id: str,
  760. chain_cfg: dict,
  761. step_results: List[StepResult],
  762. total_duration: float,
  763. ) -> None:
  764. """持久化调用记录到 temp/debug_console/call_records/"""
  765. records_dir = "temp/debug_console/call_records"
  766. os.makedirs(records_dir, exist_ok=True)
  767. step_records = []
  768. for s in step_results:
  769. step_records.append({
  770. "index": s.index,
  771. "name": s.name,
  772. "phase": s.phase,
  773. "status": s.status,
  774. "duration_ms": int(s.duration * 1000),
  775. "input": s.input_summary,
  776. "output": s.output_summary,
  777. })
  778. record = {
  779. "id": display_id,
  780. "time": datetime.now().isoformat(),
  781. "chain": request.chain_id,
  782. "chain_name": chain_cfg.get("name", ""),
  783. "doc_ref": "",
  784. "status": "succ",
  785. "duration_ms": int(total_duration * 1000),
  786. "model": request.model or "default",
  787. "prompt_ver": "",
  788. "prompt_name": chain_cfg.get("prompt_name", ""),
  789. "tokens": 0,
  790. "params": {
  791. "review_content": request.content,
  792. "review_references": request.reference or "",
  793. "model_override": request.model,
  794. "function_name": request.function_name,
  795. "timeout": request.timeout,
  796. },
  797. "execution_params": {
  798. "isolation_mode": request.isolation_mode,
  799. "isolation_steps": list(request.isolation_steps),
  800. "rag_params": (
  801. request.rag_params.model_dump()
  802. if request.rag_params else None
  803. ),
  804. },
  805. "steps": step_records,
  806. "result": "",
  807. "error_message": None,
  808. }
  809. filepath = os.path.join(records_dir, f"{display_id}.json")
  810. with open(filepath, "w", encoding="utf-8") as f:
  811. json.dump(record, f, ensure_ascii=False, indent=2)