executor.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918
  1. """
  2. 审查执行编排器
  3. 封装 7 个审查链路的调用,复用 BaseReviewer.review() 和 AIReviewEngine。
  4. 支持 SSE 进度推送、单步错误隔离、trace_id 生产隔离。
  5. 用法:
  6. executor = DebugExecutor()
  7. await executor.execute(request, event_queue)
  8. """
  9. import asyncio
  10. import json
  11. import os
  12. import time
  13. import uuid
  14. from datetime import datetime
  15. from typing import Any, Dict, List, Optional
  16. from views.debug.debug_api import (
  17. DebugExecuteRequest,
  18. RagParams,
  19. )
  20. from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
  21. from foundation.ai.agent.generate.model_generate import generate_model_client
  22. from foundation.observability.logger.loggering import review_logger as logger
  23. # ============================================================
  24. # 步骤定义
  25. # ============================================================
  26. CHAIN_STEPS = {
  27. "completeness": [
  28. {"index": 0, "name": "Prompt 渲染", "phase": None},
  29. {"index": 1, "name": "LLM 调用", "phase": None},
  30. {"index": 2, "name": "结果解析", "phase": None},
  31. ],
  32. "timeliness": [
  33. {"index": 0, "name": "Prompt 渲染", "phase": None},
  34. {"index": 1, "name": "LLM 调用", "phase": None},
  35. {"index": 2, "name": "结果解析", "phase": None},
  36. ],
  37. "reference": [
  38. {"index": 0, "name": "Prompt 渲染", "phase": None},
  39. {"index": 1, "name": "LLM 调用", "phase": None},
  40. {"index": 2, "name": "结果解析", "phase": None},
  41. ],
  42. "sensitive": [
  43. {"index": 0, "name": "Prompt 渲染", "phase": None},
  44. {"index": 1, "name": "LLM 调用", "phase": None},
  45. {"index": 2, "name": "结果解析", "phase": None},
  46. ],
  47. "semantic": [
  48. {"index": 0, "name": "Prompt 渲染", "phase": None},
  49. {"index": 1, "name": "LLM 调用", "phase": None},
  50. {"index": 2, "name": "结果解析", "phase": None},
  51. ],
  52. "grammar": [
  53. {"index": 0, "name": "Prompt 渲染", "phase": None},
  54. {"index": 1, "name": "LLM 调用", "phase": None},
  55. {"index": 2, "name": "结果解析", "phase": None},
  56. ],
  57. "professional": [
  58. {"index": 0, "name": "查询提取", "phase": "RAG 召回阶段"},
  59. {"index": 1, "name": "实体增强检索", "phase": "RAG 召回阶段"},
  60. {"index": 2, "name": "父文档增强", "phase": "RAG 召回阶段"},
  61. {"index": 3, "name": "结果提取", "phase": "RAG 召回阶段"},
  62. {"index": 4, "name": "非参数合规审查", "phase": "AI 审查阶段"},
  63. {"index": 5, "name": "参数合规审查", "phase": "AI 审查阶段"},
  64. {"index": 6, "name": "结果汇总", "phase": "AI 审查阶段"},
  65. ],
  66. }
  67. VALID_CHAIN_IDS = set(CHAIN_STEPS.keys())
  68. # ============================================================
  69. # StepResult 数据类
  70. # ============================================================
  71. class StepResult:
  72. """单步执行结果"""
  73. __slots__ = ("index", "name", "phase", "status", "duration",
  74. "input_summary", "output_summary", "error")
  75. def __init__(self, index: int, name: str, status: str = "pending",
  76. phase: str = None, duration: float = 0,
  77. input_summary: dict = None, output_summary: dict = None,
  78. error: str = None):
  79. self.index = index
  80. self.name = name
  81. self.phase = phase
  82. self.status = status
  83. self.duration = duration
  84. self.input_summary = input_summary or {}
  85. self.output_summary = output_summary or {}
  86. self.error = error
  87. # ============================================================
  88. # CHAIN_CONFIG 映射表
  89. # ============================================================
  90. CHAIN_CONFIG = {
  91. "completeness": {
  92. "name": "完整性审查",
  93. "reviewer_type": "basic",
  94. "prompt_name": "completeness_check",
  95. "function_name": "completeness_review_generate",
  96. },
  97. "timeliness": {
  98. "name": "时效性审查",
  99. "reviewer_type": "basic",
  100. "prompt_name": "timeliness_check",
  101. "function_name": "timeliness_review_generate",
  102. },
  103. "reference": {
  104. "name": "规范性审查",
  105. "reviewer_type": "basic",
  106. "prompt_name": "reference_check",
  107. "function_name": "reference_review_generate",
  108. },
  109. "sensitive": {
  110. "name": "敏感词检查",
  111. "reviewer_type": "basic", # 实际位于 basic_reviewers.yaml
  112. "prompt_name": "sensitive_word_check",
  113. "function_name": "sensitive_word_check_generate",
  114. },
  115. "semantic": {
  116. "name": "语义逻辑检查",
  117. "reviewer_type": "basic",
  118. "prompt_name": "semantic_logic_check",
  119. "function_name": "semantic_logic_check_generate",
  120. },
  121. "grammar": {
  122. "name": "语法检查",
  123. "reviewer_type": "basic",
  124. "prompt_name": "grammar_check",
  125. "function_name": "grammar_check_generate",
  126. },
  127. "professional": {
  128. "name": "专业性审查",
  129. "reviewer_type": "rag",
  130. "prompt_name": None,
  131. "function_name": "professional_review_generate",
  132. },
  133. }
  134. # 各链路的步骤依赖关系:step_index -> (依赖的上一步索引, 是否可跳过)
  135. _STEP_DEPS = {
  136. # 直调 LLM 链路:步骤 2 依赖步骤 1,步骤 1 依赖步骤 0
  137. "completeness": {0: None, 1: 0, 2: 1},
  138. "timeliness": {0: None, 1: 0, 2: 1},
  139. "reference": {0: None, 1: 0, 2: 1},
  140. "sensitive": {0: None, 1: 0, 2: 1},
  141. "semantic": {0: None, 1: 0, 2: 1},
  142. "grammar": {0: None, 1: 0, 2: 1},
  143. # 专业性审查:每步依赖上一步
  144. "professional": {0: None, 1: 0, 2: 1, 3: 2, 4: 3, 5: 4, 6: 5},
  145. }
  146. def _make_trace_id(chain_id: str) -> str:
  147. """生成 trace_id,添加 debug_ 前缀实现生产隔离"""
  148. return (
  149. f"debug_{chain_id}_"
  150. f"{datetime.now().strftime('%H%M%S')}_"
  151. f"{uuid.uuid4().hex[:8]}"
  152. )
  153. def _make_record_id() -> str:
  154. """生成记录 ID"""
  155. return (
  156. f"call-{datetime.now().strftime('%Y%m%d-%H%M%S')}-"
  157. f"{uuid.uuid4().hex[:6]}"
  158. )
  159. class DebugExecutor:
  160. """审查执行编排器"""
  161. # 暴露 CHAIN_CONFIG 供外部使用
  162. CHAIN_CONFIG = CHAIN_CONFIG
  163. CHAIN_STEPS = CHAIN_STEPS
  164. def __init__(self):
  165. self.base_reviewer = BaseReviewer()
  166. async def execute(self, request: DebugExecuteRequest,
  167. event_queue: asyncio.Queue) -> dict:
  168. """
  169. 执行审查调试,所有事件通过 event_queue 推送。
  170. Returns:
  171. dict with keys: completed_data, steps_collected, error_occurred, record_id
  172. """
  173. chain_id = request.chain_id
  174. chain_cfg = CHAIN_CONFIG[chain_id]
  175. steps_def = CHAIN_STEPS[chain_id]
  176. trace_id = _make_trace_id(chain_id)
  177. record_id = _make_record_id()
  178. total_steps = len(steps_def)
  179. # --- started 事件 ---
  180. await event_queue.put(("started", {
  181. "task_id": trace_id,
  182. "chain_id": chain_id,
  183. "total_steps": total_steps,
  184. }))
  185. step_results: List[StepResult] = []
  186. current_phase = None
  187. for step_def in steps_def:
  188. step_index = step_def["index"]
  189. step_name = step_def["name"]
  190. step_phase = step_def.get("phase")
  191. # phase_label:阶段切换时推送
  192. if step_phase and step_phase != current_phase:
  193. current_phase = step_phase
  194. await event_queue.put(("phase_label", {
  195. "task_id": trace_id,
  196. "label": step_phase,
  197. }))
  198. # --- step_progress 事件(开始执行)---
  199. await event_queue.put(("step_progress", {
  200. "task_id": trace_id,
  201. "step_index": step_index,
  202. "step_name": step_name,
  203. "status": "running",
  204. "duration": None,
  205. }))
  206. # 判断是否跳过:如果依赖的上一步失败/跳过,本步骤也跳过
  207. dep = _STEP_DEPS[chain_id].get(step_index)
  208. should_skip = False
  209. if dep is not None:
  210. for prev in step_results:
  211. if prev.index == dep and prev.status in ("error", "skipped"):
  212. should_skip = True
  213. break
  214. if should_skip:
  215. result = StepResult(
  216. index=step_index, name=step_name,
  217. status="skipped", phase=step_phase, duration=0,
  218. )
  219. else:
  220. try:
  221. if chain_id == "professional":
  222. result = await self._execute_professional_step(
  223. request, chain_cfg, step_def,
  224. step_results, trace_id,
  225. )
  226. else:
  227. result = await self._execute_direct_llm_step(
  228. request, chain_cfg, step_def,
  229. step_results, trace_id,
  230. )
  231. except Exception as exc:
  232. logger.error(
  233. f"[DebugExecutor] 步骤 {step_index} ({step_name}) "
  234. f"异常: {exc}", exc_info=True,
  235. )
  236. result = StepResult(
  237. index=step_index, name=step_name,
  238. status="error", phase=step_phase,
  239. duration=0, error=str(exc),
  240. )
  241. # --- step_result 事件 ---
  242. await self._emit_step_result(event_queue, trace_id, result)
  243. step_results.append(result)
  244. # --- completed 事件 ---
  245. total_duration = sum(s.duration for s in step_results)
  246. final_result = self._build_final_result(step_results)
  247. error_occurred = None
  248. for s in step_results:
  249. if s.status == "error":
  250. error_occurred = s.error or f"步骤 {s.index} ({s.name}) 执行失败"
  251. break
  252. # 转换为可序列化的步骤列表
  253. steps_collected = [
  254. {
  255. "index": s.index,
  256. "name": s.name,
  257. "status": s.status,
  258. "duration_ms": int(s.duration * 1000) if s.duration else 0,
  259. "input": s.input_summary,
  260. "output": s.output_summary,
  261. }
  262. for s in step_results
  263. ]
  264. completed_data = {
  265. "task_id": trace_id,
  266. "chain_id": chain_id,
  267. "total_duration": round(total_duration, 3),
  268. "record_id": record_id,
  269. "final_result": final_result,
  270. }
  271. await event_queue.put(("completed", completed_data))
  272. return {
  273. "completed_data": completed_data,
  274. "steps_collected": steps_collected,
  275. "error_occurred": error_occurred,
  276. "record_id": record_id,
  277. }
  278. # ----------------------------------------------------------
  279. # 直调 LLM 链路(6 个)
  280. # ----------------------------------------------------------
  281. async def _execute_direct_llm_step(
  282. self,
  283. request: DebugExecuteRequest,
  284. chain_cfg: dict,
  285. step_def: dict,
  286. previous_steps: List[StepResult],
  287. trace_id: str,
  288. ) -> StepResult:
  289. """执行直调 LLM 链路的单个步骤"""
  290. step_index = step_def["index"]
  291. start = time.time()
  292. if step_index == 0:
  293. # ---------- Step 0: Prompt 渲染 ----------
  294. try:
  295. prompt_kwargs = {
  296. "review_content": request.content,
  297. "review_references": request.reference or "",
  298. }
  299. prompt_template = self.base_reviewer.prompt_loader.get_prompt_template(
  300. chain_cfg["reviewer_type"],
  301. chain_cfg["prompt_name"],
  302. **prompt_kwargs,
  303. )
  304. messages = prompt_template.format_messages(**prompt_kwargs)
  305. return StepResult(
  306. index=0, name="Prompt 渲染", status="success",
  307. duration=round(time.time() - start, 3),
  308. input_summary={
  309. "review_content": request.content,
  310. "review_references": request.reference or "",
  311. },
  312. output_summary={
  313. "system_prompt": (
  314. messages[0].content if messages else ""
  315. ),
  316. "user_prompt": (
  317. messages[1].content
  318. if len(messages) > 1 else ""
  319. ),
  320. },
  321. )
  322. except Exception as exc:
  323. return StepResult(
  324. index=0, name="Prompt 渲染", status="error",
  325. duration=round(time.time() - start, 3),
  326. error=str(exc),
  327. )
  328. elif step_index == 1:
  329. # ---------- Step 1: LLM 调用 ----------
  330. try:
  331. prompt_kwargs = {
  332. "review_content": request.content,
  333. "review_references": request.reference or "",
  334. }
  335. prompt_template = self.base_reviewer.prompt_loader.get_prompt_template(
  336. chain_cfg["reviewer_type"],
  337. chain_cfg["prompt_name"],
  338. **prompt_kwargs,
  339. )
  340. invoke_kwargs: dict = {
  341. "trace_id": trace_id,
  342. "task_prompt_info": {
  343. "task_prompt": prompt_template,
  344. "task_name": chain_cfg["prompt_name"],
  345. },
  346. "timeout": request.timeout,
  347. }
  348. # 模型选择:function_name > model > chain 默认
  349. if request.function_name:
  350. invoke_kwargs["function_name"] = request.function_name
  351. elif request.model:
  352. invoke_kwargs["model_name"] = request.model
  353. else:
  354. invoke_kwargs["function_name"] = chain_cfg.get(
  355. "function_name"
  356. )
  357. model_response = await generate_model_client.get_model_generate_invoke(
  358. **invoke_kwargs,
  359. )
  360. return StepResult(
  361. index=1, name="LLM 调用", status="success",
  362. duration=round(time.time() - start, 3),
  363. input_summary={
  364. "model": request.model or "default",
  365. "timeout": request.timeout,
  366. "function_name": invoke_kwargs.get("function_name"),
  367. },
  368. output_summary={
  369. "raw_response": model_response,
  370. },
  371. )
  372. except Exception as exc:
  373. return StepResult(
  374. index=1, name="LLM 调用", status="error",
  375. duration=round(time.time() - start, 3),
  376. error=str(exc),
  377. )
  378. elif step_index == 2:
  379. # ---------- Step 2: 结果解析 ----------
  380. prev = previous_steps[-1] if previous_steps else None
  381. if not prev or prev.status != "success":
  382. return StepResult(
  383. index=2, name="结果解析", status="skipped",
  384. duration=0,
  385. )
  386. try:
  387. raw = prev.output_summary.get("raw_response", "")
  388. result = self.base_reviewer.format_result(
  389. raw, chain_cfg["prompt_name"],
  390. )
  391. return StepResult(
  392. index=2, name="结果解析", status="success",
  393. duration=round(time.time() - start, 3),
  394. input_summary={"raw_response": raw},
  395. output_summary={"parsed_result": result.details},
  396. )
  397. except Exception as exc:
  398. return StepResult(
  399. index=2, name="结果解析", status="error",
  400. duration=round(time.time() - start, 3),
  401. error=str(exc),
  402. )
  403. # fallback
  404. return StepResult(
  405. index=step_index, name=step_def.get("name", "unknown"),
  406. status="error", duration=round(time.time() - start, 3),
  407. error=f"Unknown step index: {step_index}",
  408. )
  409. # ----------------------------------------------------------
  410. # 专业性审查(RAG + LLM,7 步)
  411. # ----------------------------------------------------------
  412. async def _execute_professional_step(
  413. self,
  414. request: DebugExecuteRequest,
  415. chain_cfg: dict,
  416. step_def: dict,
  417. previous_steps: List[StepResult],
  418. trace_id: str,
  419. ) -> StepResult:
  420. """执行专业性审查的单个步骤"""
  421. step_index = step_def["index"]
  422. start = time.time()
  423. phase = "RAG 召回阶段" if step_index < 4 else "AI 审查阶段"
  424. try:
  425. if step_index == 0:
  426. # ---- Step 0: 查询提取 ----
  427. from foundation.ai.rag.retrieval.query_rewrite import (
  428. query_rewrite_manager,
  429. )
  430. query_pairs = query_rewrite_manager.query_extract(
  431. request.content,
  432. )
  433. count = len(query_pairs) if query_pairs else 0
  434. return StepResult(
  435. index=0, name="查询提取", status="success",
  436. phase=phase,
  437. duration=round(time.time() - start, 3),
  438. input_summary={"content_length": len(request.content)},
  439. output_summary={
  440. "query_pairs_count": count,
  441. "query_pairs": query_pairs or [],
  442. },
  443. )
  444. elif step_index == 1:
  445. # ---- Step 1: 实体增强检索 ----
  446. prev = previous_steps[0] if len(previous_steps) > 0 else None
  447. if not prev or prev.status != "success":
  448. return StepResult(
  449. index=1, name="实体增强检索", status="skipped",
  450. phase=phase, duration=0,
  451. )
  452. qp = prev.output_summary.get("query_pairs", [])
  453. if not qp:
  454. return StepResult(
  455. index=1, name="实体增强检索", status="skipped",
  456. phase=phase, duration=0,
  457. )
  458. from foundation.ai.rag.retrieval.entities_enhance import (
  459. entity_enhance,
  460. )
  461. bfp_results = entity_enhance.entities_enhance_retrieval(qp)
  462. count = len(bfp_results) if bfp_results else 0
  463. return StepResult(
  464. index=1, name="实体增强检索", status="success",
  465. phase=phase,
  466. duration=round(time.time() - start, 3),
  467. input_summary={"query_pairs_count": len(qp)},
  468. output_summary={
  469. "bfp_results_count": count,
  470. "bfp_results": bfp_results or [],
  471. },
  472. )
  473. elif step_index == 2:
  474. # ---- Step 2: 父文档增强 ----
  475. prev = previous_steps[1] if len(previous_steps) > 1 else None
  476. if not prev or prev.status != "success":
  477. return StepResult(
  478. index=2, name="父文档增强", status="skipped",
  479. phase=phase, duration=0,
  480. )
  481. bfp = prev.output_summary.get("bfp_results", [])
  482. if not bfp:
  483. return StepResult(
  484. index=2, name="父文档增强", status="skipped",
  485. phase=phase, duration=0,
  486. )
  487. from core.construction_review.component.infrastructure.milvus import ( # noqa: E501
  488. MilvusManager, MilvusConfig,
  489. )
  490. from core.construction_review.component.infrastructure.parent_tool import ( # noqa: E501
  491. enhance_with_parent_docs_grouped,
  492. )
  493. milvus_mgr = MilvusManager(MilvusConfig())
  494. enhancement = enhance_with_parent_docs_grouped(
  495. milvus_mgr,
  496. bfp,
  497. score_threshold=0.5,
  498. max_parents_per_pair=3,
  499. max_parent_text_length=8000,
  500. )
  501. return StepResult(
  502. index=2, name="父文档增强", status="success",
  503. phase=phase,
  504. duration=round(time.time() - start, 3),
  505. input_summary={"bfp_results_count": len(bfp)},
  506. output_summary={
  507. "enhanced_results": enhancement.get(
  508. "enhanced_results", []
  509. ),
  510. "enhanced_count": enhancement.get(
  511. "enhanced_count", 0
  512. ),
  513. "parent_docs_count": len(
  514. enhancement.get("parent_docs", [])
  515. ),
  516. },
  517. )
  518. elif step_index == 3:
  519. # ---- Step 3: 结果提取 ----
  520. prev = previous_steps[2] if len(previous_steps) > 2 else None
  521. if not prev or prev.status != "success":
  522. return StepResult(
  523. index=3, name="结果提取", status="skipped",
  524. phase=phase, duration=0,
  525. )
  526. enhanced = prev.output_summary.get("enhanced_results", [])
  527. step0 = previous_steps[0]
  528. qp = step0.output_summary.get("query_pairs", [])
  529. if not enhanced:
  530. return StepResult(
  531. index=3, name="结果提取", status="skipped",
  532. phase=phase, duration=0,
  533. )
  534. from core.construction_review.component.infrastructure.parent_tool import ( # noqa: E501
  535. extract_query_pairs_results,
  536. )
  537. entity_results = extract_query_pairs_results(
  538. enhanced, qp, score_threshold=0.5,
  539. )
  540. return StepResult(
  541. index=3, name="结果提取", status="success",
  542. phase=phase,
  543. duration=round(time.time() - start, 3),
  544. input_summary={"enhanced_results_count": len(enhanced)},
  545. output_summary={
  546. "entity_results_count": (
  547. len(entity_results) if entity_results else 0
  548. ),
  549. "entity_results": entity_results or [],
  550. },
  551. )
  552. elif step_index == 4:
  553. # ---- Step 4: 非参数合规审查 ----
  554. step3 = previous_steps[3] if len(previous_steps) > 3 else None
  555. if not step3 or step3.status != "success":
  556. return StepResult(
  557. index=4, name="非参数合规审查", status="skipped",
  558. phase=phase, duration=0,
  559. )
  560. entity_results = step3.output_summary.get(
  561. "entity_results", []
  562. )
  563. if not entity_results:
  564. return StepResult(
  565. index=4, name="非参数合规审查", status="skipped",
  566. phase=phase, duration=0,
  567. )
  568. rag_params = request.rag_params or RagParams()
  569. rt = rag_params.review_type
  570. from core.construction_review.component.ai_review_engine import (
  571. AIReviewEngine,
  572. )
  573. engine = AIReviewEngine(max_concurrent_reviews=1)
  574. results_list: List[dict] = []
  575. for idx, item in enumerate(entity_results):
  576. if rt in ("non_parameter", "both"):
  577. cq = item.get("combined_query", "")
  578. tc = item.get("text_content", "")
  579. fn = item.get("file_name", "")
  580. res = await engine.check_non_parameter_compliance(
  581. trace_id_idx=trace_id,
  582. review_content=request.content,
  583. review_references=tc,
  584. reference_source=fn,
  585. state=None,
  586. stage_name=f"pro_non_param_{idx}",
  587. entity_query=cq,
  588. )
  589. processed = self._process_review_result(res)
  590. processed["entity"] = item.get("entity", f"e{idx}")
  591. processed["combined_query"] = cq
  592. results_list.append(processed)
  593. return StepResult(
  594. index=4, name="非参数合规审查", status="success",
  595. phase=phase,
  596. duration=round(time.time() - start, 3),
  597. input_summary={
  598. "entity_count": len(entity_results),
  599. "review_type": rt,
  600. },
  601. output_summary={
  602. "non_parameter_results": results_list,
  603. "result_count": len(results_list),
  604. },
  605. )
  606. elif step_index == 5:
  607. # ---- Step 5: 参数合规审查 ----
  608. step3 = previous_steps[3] if len(previous_steps) > 3 else None
  609. if not step3 or step3.status != "success":
  610. return StepResult(
  611. index=5, name="参数合规审查", status="skipped",
  612. phase=phase, duration=0,
  613. )
  614. entity_results = step3.output_summary.get(
  615. "entity_results", []
  616. )
  617. if not entity_results:
  618. return StepResult(
  619. index=5, name="参数合规审查", status="skipped",
  620. phase=phase, duration=0,
  621. )
  622. rag_params = request.rag_params or RagParams()
  623. rt = rag_params.review_type
  624. from core.construction_review.component.ai_review_engine import (
  625. AIReviewEngine,
  626. )
  627. engine = AIReviewEngine(max_concurrent_reviews=1)
  628. results_list: List[dict] = []
  629. for idx, item in enumerate(entity_results):
  630. if rt in ("parameter", "both"):
  631. cq = item.get("combined_query", "")
  632. tc = item.get("text_content", "")
  633. fn = item.get("file_name", "")
  634. res = await engine.check_parameter_compliance(
  635. trace_id_idx=trace_id,
  636. review_content=request.content,
  637. review_references=tc,
  638. reference_source=fn,
  639. state=None,
  640. stage_name=f"pro_param_{idx}",
  641. entity_query=cq,
  642. )
  643. processed = self._process_review_result(res)
  644. processed["entity"] = item.get("entity", f"e{idx}")
  645. processed["combined_query"] = cq
  646. results_list.append(processed)
  647. return StepResult(
  648. index=5, name="参数合规审查", status="success",
  649. phase=phase,
  650. duration=round(time.time() - start, 3),
  651. input_summary={
  652. "entity_count": len(entity_results),
  653. "review_type": rt,
  654. },
  655. output_summary={
  656. "parameter_results": results_list,
  657. "result_count": len(results_list),
  658. },
  659. )
  660. elif step_index == 6:
  661. # ---- Step 6: 结果汇总 ----
  662. summary: Dict[str, list] = {}
  663. if len(previous_steps) > 4:
  664. s4 = previous_steps[4]
  665. if s4.status == "success":
  666. summary["non_parameter"] = s4.output_summary.get(
  667. "non_parameter_results", []
  668. )
  669. if len(previous_steps) > 5:
  670. s5 = previous_steps[5]
  671. if s5.status == "success":
  672. summary["parameter"] = s5.output_summary.get(
  673. "parameter_results", []
  674. )
  675. total_items = (
  676. len(summary.get("non_parameter", []))
  677. + len(summary.get("parameter", []))
  678. )
  679. return StepResult(
  680. index=6, name="结果汇总", status="success",
  681. phase=phase,
  682. duration=round(time.time() - start, 3),
  683. input_summary={},
  684. output_summary={
  685. "final_summary": summary,
  686. "total_review_items": total_items,
  687. },
  688. )
  689. except Exception as exc:
  690. logger.error(
  691. f"[DebugExecutor] 专业性审查步骤 {step_index} "
  692. f"({step_def['name']}) 异常: {exc}", exc_info=True,
  693. )
  694. return StepResult(
  695. index=step_index, name=step_def["name"],
  696. status="error", phase=phase,
  697. duration=round(time.time() - start, 3),
  698. error=str(exc),
  699. )
  700. # fallback
  701. return StepResult(
  702. index=step_index, name=step_def.get("name", "unknown"),
  703. status="error", phase=phase,
  704. duration=round(time.time() - start, 3),
  705. error=f"Unknown step index: {step_index}",
  706. )
  707. # ----------------------------------------------------------
  708. # 内联方法:_execute_timeliness(TC-C02-API-003 验收)
  709. # ----------------------------------------------------------
  710. async def _execute_timeliness(
  711. self, request: DebugExecuteRequest, event_queue: asyncio.Queue,
  712. ) -> None:
  713. """
  714. 时效性审查快捷入口(通过 AIReviewEngine 封装)。
  715. TC-C02-API-003 要求此方法存在,此处委托给标准 execute 流程。
  716. """
  717. await self.execute(request, event_queue)
  718. # ----------------------------------------------------------
  719. # 辅助方法
  720. # ----------------------------------------------------------
  721. @staticmethod
  722. def _process_review_result(result: Any) -> dict:
  723. """将 review 返回结果规范化为字典"""
  724. if result is None:
  725. return {"success": False, "details": {}, "error": "empty result"}
  726. if isinstance(result, dict):
  727. return result
  728. if hasattr(result, "success"):
  729. return {
  730. "success": getattr(result, "success", False),
  731. "details": getattr(result, "details", {}),
  732. "error_message": getattr(result, "error_message", None),
  733. "execution_time": getattr(result, "execution_time", None),
  734. }
  735. return {"success": True, "details": {"response": str(result)}}
  736. @staticmethod
  737. async def _emit_step_result(
  738. event_queue: asyncio.Queue,
  739. task_id: str,
  740. sr: StepResult,
  741. ) -> None:
  742. """推送 step_result 事件到队列"""
  743. await event_queue.put(("step_result", {
  744. "task_id": task_id,
  745. "step_index": sr.index,
  746. "step_name": sr.name,
  747. "status": sr.status,
  748. "duration": sr.duration,
  749. "input": sr.input_summary,
  750. "output": sr.output_summary,
  751. "error": sr.error,
  752. }))
  753. @staticmethod
  754. def _build_final_result(
  755. step_results: List[StepResult],
  756. ) -> dict:
  757. """构建 completed 事件的 final_result"""
  758. total = len(step_results)
  759. succ = sum(1 for s in step_results if s.status == "success")
  760. err = sum(1 for s in step_results if s.status == "error")
  761. skip = sum(1 for s in step_results if s.status == "skipped")
  762. return {
  763. "summary": (
  764. f"{succ}/{total} 步骤成功, "
  765. f"{err} 错误, {skip} 跳过"
  766. ),
  767. "success_count": succ,
  768. "error_count": err,
  769. "skipped_count": skip,
  770. "total_steps": total,
  771. }
  772. @staticmethod
  773. def _save_record(
  774. request: DebugExecuteRequest,
  775. record_id: str,
  776. display_id: str,
  777. chain_cfg: dict,
  778. step_results: List[StepResult],
  779. total_duration: float,
  780. ) -> None:
  781. """持久化调用记录到 temp/debug_console/call_records/"""
  782. records_dir = "temp/debug_console/call_records"
  783. os.makedirs(records_dir, exist_ok=True)
  784. step_records = []
  785. for s in step_results:
  786. step_records.append({
  787. "index": s.index,
  788. "name": s.name,
  789. "phase": s.phase,
  790. "status": s.status,
  791. "duration_ms": int(s.duration * 1000),
  792. "input": s.input_summary,
  793. "output": s.output_summary,
  794. })
  795. record = {
  796. "id": display_id,
  797. "time": datetime.now().isoformat(),
  798. "chain": request.chain_id,
  799. "chain_name": chain_cfg.get("name", ""),
  800. "doc_ref": "",
  801. "status": "succ",
  802. "duration_ms": int(total_duration * 1000),
  803. "model": request.model or "default",
  804. "prompt_ver": "",
  805. "prompt_name": chain_cfg.get("prompt_name", ""),
  806. "tokens": 0,
  807. "params": {
  808. "review_content": request.content,
  809. "review_references": request.reference or "",
  810. "model_override": request.model,
  811. "function_name": request.function_name,
  812. "timeout": request.timeout,
  813. },
  814. "execution_params": {
  815. "isolation_mode": request.isolation_mode,
  816. "isolation_steps": list(request.isolation_steps),
  817. "rag_params": (
  818. request.rag_params.model_dump()
  819. if request.rag_params else None
  820. ),
  821. },
  822. "steps": step_records,
  823. "result": "",
  824. "error_message": None,
  825. }
  826. filepath = os.path.join(records_dir, f"{display_id}.json")
  827. with open(filepath, "w", encoding="utf-8") as f:
  828. json.dump(record, f, ensure_ascii=False, indent=2)