# isolation_runner.py
"""
Step-isolation runner.

Provides IsolationRunner, which schedules steps in step-isolation mode.
Together with StepDispatcher and DebugExecutor it implements:
- Running only the selected steps; every other step is skipped
- Manual input injection (manual_inputs)
- Auto-forward: when a step has a manual_input, the steps after it run automatically
- Dependency checking: a step is skipped when requires_previous is not satisfied

Usage:
    runner = IsolationRunner()
    results = await runner.run_selected_steps(
        chain_id="completeness",
        selected_indices=[1],
        manual_inputs={"1": "请审查以下内容:..."},
        content="施工方案内容...",
    )
"""
  18. import asyncio
  19. import time
  20. import uuid
  21. from datetime import datetime
  22. from typing import Any, Dict, List, Optional
  23. from core.debug.step_dispatcher import (
  24. CHAIN_STEPS,
  25. StepDefinition,
  26. StepDispatcher,
  27. _STEP_DEPS,
  28. )
  29. from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
  30. from foundation.ai.agent.generate.model_generate import generate_model_client
  31. from foundation.observability.logger.loggering import review_logger as logger
# ============================================================
# StepResult data class (lightweight; compatible with executor.StepResult)
# ============================================================
  35. class StepResult:
  36. """单步执行结果"""
  37. __slots__ = ("index", "name", "phase", "status", "duration",
  38. "input_summary", "output_summary", "error")
  39. def __init__(self, index: int, name: str, status: str = "pending",
  40. phase: str = None, duration: float = 0,
  41. input_summary: dict = None, output_summary: dict = None,
  42. error: str = None):
  43. self.index = index
  44. self.name = name
  45. self.phase = phase
  46. self.status = status
  47. self.duration = duration
  48. self.input_summary = input_summary or {}
  49. self.output_summary = output_summary or {}
  50. self.error = error
  51. def to_dict(self) -> dict:
  52. """转为字典"""
  53. return {
  54. "index": self.index,
  55. "name": self.name,
  56. "phase": self.phase,
  57. "status": self.status,
  58. "duration": self.duration,
  59. "input": self.input_summary,
  60. "output": self.output_summary,
  61. "error": self.error,
  62. }
# ============================================================
# Helper functions
# ============================================================
  66. def _make_skipped(step: StepDefinition) -> StepResult:
  67. """创建一个跳过结果"""
  68. return StepResult(
  69. index=step.index,
  70. name=step.name,
  71. phase=step.phase,
  72. status="skipped",
  73. duration=0,
  74. )
  75. def _make_trace_id(chain_id: str) -> str:
  76. """生成 trace_id(与 executor.py 一致)"""
  77. return (
  78. f"debug_{chain_id}_"
  79. f"{datetime.now().strftime('%H%M%S')}_"
  80. f"{uuid.uuid4().hex[:8]}"
  81. )
  82. def _build_step_dict(step_def: StepDefinition) -> dict:
  83. """StepDefinition → dict(兼容 executor.py 的 step_def 格式)"""
  84. d = {"index": step_def.index, "name": step_def.name}
  85. if step_def.phase:
  86. d["phase"] = step_def.phase
  87. return d
  88. def _find_result(results: List[StepResult], step_index: int) -> Optional[StepResult]:
  89. """从结果列表中按 step_index 查找"""
  90. for r in results:
  91. if r.index == step_index:
  92. return r
  93. return None
# ============================================================
# IsolationRunner
# ============================================================
  97. class IsolationRunner:
  98. """环节隔离执行器"""
  99. def __init__(self):
  100. self._executor = None
  101. self.base_reviewer = BaseReviewer()
    # ----------------------------------------------------------
    # Public methods
    # ----------------------------------------------------------
  105. async def run_isolated_step(
  106. self,
  107. chain_id: str,
  108. step_index: int,
  109. manual_inputs: Dict[str, str] = None,
  110. **params,
  111. ) -> dict:
  112. """
  113. 运行单个隔离步骤。
  114. 如果步骤 requires_previous=False → 直接执行。
  115. 如果 requires_previous=True 且无 manual_inputs → 返回错误。
  116. Args:
  117. chain_id: 链路标识。
  118. step_index: 步骤索引。
  119. manual_inputs: 手动输入,key=str(step_index), value=输入内容。
  120. **params: 执行参数(content, reference, model 等)。
  121. Returns:
  122. dict: 步骤执行结果(兼容 StepResult.to_dict)。
  123. """
  124. manual_inputs = manual_inputs or {}
  125. step_defs = StepDispatcher.get_steps(chain_id)
  126. if step_index < 0 or step_index >= len(step_defs):
  127. return StepResult(
  128. index=step_index,
  129. name=f"未知步骤 #{step_index}",
  130. status="error",
  131. duration=0,
  132. error=f"step_index {step_index} 超出范围({chain_id} 共 {len(step_defs)} 步)",
  133. ).to_dict()
  134. step_def = step_defs[step_index]
  135. has_manual = str(step_index) in manual_inputs
  136. # 检查依赖
  137. if step_def.requires_previous and not has_manual:
  138. return StepResult(
  139. index=step_index,
  140. name=step_def.name,
  141. phase=step_def.phase,
  142. status="error",
  143. duration=0,
  144. error="此步骤需要前一步的输出,请提供 manual_inputs",
  145. ).to_dict()
  146. # 执行步骤
  147. if has_manual:
  148. result = await self._execute_step_with_manual(
  149. chain_id, step_def, manual_inputs, **params,
  150. )
  151. else:
  152. result = await self._do_execute_step(
  153. chain_id, step_def, [], **params,
  154. )
  155. return result.to_dict()
  156. async def run_selected_steps(
  157. self,
  158. chain_id: str,
  159. selected_indices: List[int],
  160. manual_inputs: Dict[str, str] = None,
  161. **params,
  162. ) -> List[dict]:
  163. """
  164. 在环节隔离模式下执行选中的步骤。
  165. 核心逻辑:
  166. 1. 仅 `selected_indices` 中的步骤被允许执行
  167. 2. 若某选中步骤有 `manual_inputs` → 触发自动前向传播,
  168. 后续所有步骤(无论是否在选中列表中)也一并执行
  169. 3. 步骤检查 requires_previous,若前一步非 success → 跳过
  170. 4. 超出范围的索引被自动过滤(记录 warning)
  171. Args:
  172. chain_id: 链路标识。
  173. selected_indices: 选中的步骤索引列表。
  174. manual_inputs: 手动输入映射 {str(index): str(value)}。
  175. **params: 执行参数(content, reference, model 等)。
  176. Returns:
  177. List[dict]: 步骤执行结果列表(全量步骤,含 skipped)。
  178. """
  179. manual_inputs = manual_inputs or {}
  180. step_defs = StepDispatcher.get_steps(chain_id)
  181. deps = StepDispatcher.get_step_deps(chain_id)
  182. # ---- Step 1: 构建有效执行集 ----
  183. effective_run = self._build_effective_set(
  184. step_defs, selected_indices, manual_inputs,
  185. )
  186. # ---- Step 2: 顺序执行步骤 ----
  187. executor = await self._get_executor()
  188. chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {})
  189. trace_id = _make_trace_id(chain_id)
  190. previous_results: List[StepResult] = []
  191. results: List[StepResult] = []
  192. last_success_step = -1 # 最近成功执行的步骤索引
  193. for step_def in step_defs:
  194. si = step_def.index
  195. if si not in effective_run:
  196. # 不在有效执行集中 → 跳过
  197. sr = _make_skipped(step_def)
  198. results.append(sr)
  199. previous_results.append(sr)
  200. continue
  201. has_manual = str(si) in manual_inputs
  202. # 检查依赖:依赖的步骤是否成功?
  203. dep_idx = deps.get(si)
  204. dep_satisfied = True
  205. if dep_idx is not None and not has_manual:
  206. dep_result = _find_result(results, dep_idx)
  207. if dep_result and dep_result.status not in ("success",):
  208. dep_satisfied = False
  209. if not dep_satisfied:
  210. sr = _make_skipped(step_def)
  211. results.append(sr)
  212. previous_results.append(sr)
  213. continue
  214. # 执行步骤
  215. try:
  216. if has_manual:
  217. sr = await self._execute_step_with_manual(
  218. chain_id, step_def, manual_inputs, **params,
  219. )
  220. else:
  221. # 构造兼容 executor 的 step_def dict
  222. step_dict = _build_step_dict(step_def)
  223. sr = await self._do_execute_step(
  224. chain_id, step_def, previous_results, **params,
  225. )
  226. except Exception as exc:
  227. logger.error(
  228. f"[IsolationRunner] 步骤 {si} ({step_def.name}) 异常: {exc}",
  229. exc_info=True,
  230. )
  231. sr = StepResult(
  232. index=si, name=step_def.name,
  233. phase=step_def.phase,
  234. status="error", duration=0,
  235. error=str(exc),
  236. )
  237. results.append(sr)
  238. previous_results.append(sr)
  239. if sr.status == "success":
  240. last_success_step = si
  241. return [r.to_dict() for r in results]
    # ----------------------------------------------------------
    # Building the execution set
    # ----------------------------------------------------------
  245. @staticmethod
  246. def _build_effective_set(
  247. step_defs: List[StepDefinition],
  248. selected_indices: List[int],
  249. manual_inputs: Dict[str, str],
  250. ) -> set:
  251. """
  252. 构建有效执行步骤索引集合。
  253. 规则:
  254. - 仅包含 selected_indices 中的合法索引
  255. - 若某选中步骤有 manual_inputs,自动加入该步骤之后的所有步骤
  256. - 过滤超出范围的索引(记录 warning)
  257. """
  258. manual_inputs = manual_inputs or {}
  259. total = len(step_defs)
  260. effective = set()
  261. for idx in selected_indices:
  262. if idx < 0 or idx >= total:
  263. logger.warning(
  264. f"[IsolationRunner] selected_indices 包含超出范围的索引 {idx}"
  265. f"(共 {total} 步),已自动过滤。"
  266. )
  267. continue
  268. effective.add(idx)
  269. # 手动输入触发前向传播:加入该步骤之后的所有步骤
  270. for idx_str in manual_inputs:
  271. try:
  272. idx = int(idx_str)
  273. except (ValueError, TypeError):
  274. continue
  275. if idx < 0 or idx >= total:
  276. continue
  277. # 该步骤本身已在 effective 中或不在
  278. # 前向传播:加入后续所有步骤
  279. if idx in effective:
  280. for later in range(idx + 1, total):
  281. effective.add(later)
  282. return effective
    # ----------------------------------------------------------
    # Step execution
    # ----------------------------------------------------------
  286. async def _do_execute_step(
  287. self,
  288. chain_id: str,
  289. step_def: StepDefinition,
  290. previous_results: List[StepResult],
  291. **params,
  292. ) -> StepResult:
  293. """
  294. 执行步骤(委托给 DebugExecutor 的对应方法)。
  295. """
  296. executor = await self._get_executor()
  297. chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {})
  298. step_dict = _build_step_dict(step_def)
  299. trace_id = _make_trace_id(chain_id)
  300. # 构造兼容 DebugExecuteRequest 的参数字典
  301. request = _RequestProxy(chain_id=chain_id, **params)
  302. if chain_id == "professional":
  303. return await executor._execute_professional_step(
  304. request, chain_cfg, step_dict, previous_results, trace_id,
  305. )
  306. else:
  307. return await executor._execute_direct_llm_step(
  308. request, chain_cfg, step_dict, previous_results, trace_id,
  309. )
  310. async def _execute_step_with_manual(
  311. self,
  312. chain_id: str,
  313. step_def: StepDefinition,
  314. manual_inputs: Dict[str, str],
  315. **params,
  316. ) -> StepResult:
  317. """
  318. 使用 manual_inputs 执行步骤。
  319. 根据步骤索引分别处理:
  320. - step 0: manual_input 作为已渲染的提示词内容
  321. - step 1: manual_input 作为 LLM 调用的 prompt(调用 LLM)
  322. - step 2: manual_input 作为原始 JSON 响应(直接解析)
  323. - professional 各步暂以 manual_input 直接作为输出
  324. """
  325. si = step_def.index
  326. manual_text = manual_inputs.get(str(si), "")
  327. start = time.time()
  328. trace_id = _make_trace_id(chain_id)
  329. is_professional = chain_id == "professional"
  330. try:
  331. if is_professional:
  332. # Professional 步骤:manual_input 作为输出内容注入
  333. return StepResult(
  334. index=si, name=step_def.name,
  335. phase=step_def.phase, status="success",
  336. duration=round(time.time() - start, 3),
  337. input_summary={"manual_input": True, "text_length": len(manual_text)},
  338. output_summary={"manual_result": manual_text, "note": "手动输入注入"},
  339. )
  340. # ---- 直调 LLM 步骤 ----
  341. if si == 0:
  342. # Step 0: Prompt 渲染 — manual_input 作为已渲染的提示词
  343. return StepResult(
  344. index=0, name="Prompt 渲染", status="success",
  345. duration=0,
  346. input_summary={"manual_input": True},
  347. output_summary={
  348. "system_prompt": "(手动输入 - 无 system prompt)",
  349. "user_prompt": manual_text,
  350. "note": "手动输入注入,跳过模板渲染",
  351. },
  352. )
  353. elif si == 1:
  354. # Step 1: LLM 调用 — manual_input 作为 prompt,调用 LLM
  355. content = params.get("content", "")
  356. reference = params.get("reference", "")
  357. model = params.get("model")
  358. function_name = params.get("function_name")
  359. timeout = params.get("timeout", 60)
  360. # 用 manual_input 作为 user_prompt 构造调用
  361. from langchain.prompts import ChatPromptTemplate
  362. prompt_template = ChatPromptTemplate.from_messages([
  363. ("system", "你是一个专业的施工方案审查专家。"),
  364. ("human", "{manual_prompt}"),
  365. ])
  366. invoke_kwargs: dict = {
  367. "trace_id": trace_id,
  368. "task_prompt_info": {
  369. "task_prompt": prompt_template,
  370. "task_name": f"{chain_id}_manual",
  371. },
  372. "timeout": timeout,
  373. }
  374. if function_name:
  375. invoke_kwargs["function_name"] = function_name
  376. elif model:
  377. invoke_kwargs["model_name"] = model
  378. else:
  379. invoke_kwargs["function_name"] = f"{chain_id}_review_generate"
  380. # 渲染 manual prompt
  381. messages = prompt_template.format_messages(manual_prompt=manual_text)
  382. model_response = await generate_model_client.get_model_generate_invoke(
  383. **invoke_kwargs,
  384. )
  385. return StepResult(
  386. index=1, name="LLM 调用", status="success",
  387. duration=round(time.time() - start, 3),
  388. input_summary={
  389. "manual_prompt": manual_text,
  390. "model": model or "default",
  391. "timeout": timeout,
  392. },
  393. output_summary={
  394. "raw_response": model_response,
  395. "note": "使用手动输入的 prompt 调用 LLM",
  396. },
  397. )
  398. elif si == 2:
  399. # Step 2: 结果解析 — manual_input 作为原始 JSON 响应
  400. executor = await self._get_executor()
  401. chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {})
  402. prompt_name = chain_cfg.get("prompt_name", chain_id)
  403. try:
  404. result = self.base_reviewer.format_result(manual_text, prompt_name)
  405. return StepResult(
  406. index=2, name="结果解析", status="success",
  407. duration=round(time.time() - start, 3),
  408. input_summary={
  409. "raw_response": manual_text,
  410. "manual_input": True,
  411. },
  412. output_summary={
  413. "parsed_result": result.details,
  414. "note": "手动输入的 JSON 被解析为结构化审查结果",
  415. },
  416. )
  417. except Exception as parse_err:
  418. return StepResult(
  419. index=2, name="结果解析", status="error",
  420. duration=round(time.time() - start, 3),
  421. input_summary={"raw_response": manual_text},
  422. error=f"手动输入解析失败: {parse_err}",
  423. )
  424. except Exception as exc:
  425. logger.error(
  426. f"[IsolationRunner] manual step {si} 异常: {exc}", exc_info=True,
  427. )
  428. return StepResult(
  429. index=si, name=step_def.name,
  430. phase=step_def.phase, status="error",
  431. duration=round(time.time() - start, 3),
  432. error=str(exc),
  433. )
  434. # fallback
  435. return StepResult(
  436. index=si, name=step_def.name, status="error",
  437. duration=round(time.time() - start, 3),
  438. error=f"未知步骤索引: {si}",
  439. )
    # ----------------------------------------------------------
    # Helper methods
    # ----------------------------------------------------------
  443. async def _get_executor(self):
  444. """懒加载 DebugExecutor 实例"""
  445. if self._executor is None:
  446. from core.debug.executor import DebugExecutor
  447. self._executor = DebugExecutor()
  448. return self._executor
# ============================================================
# _RequestProxy — lightweight stand-in compatible with DebugExecuteRequest
# ============================================================
  452. class _RequestProxy:
  453. """
  454. 轻量属性代理,模拟 DebugExecuteRequest 对象。
  455. 使 IsolationRunner 可以传递 `**params` 给 DebugExecutor 的内部方法,
  456. 而无需构造完整的 Pydantic 模型。
  457. """
  458. def __init__(self, chain_id: str = "", **kwargs):
  459. self.chain_id = chain_id
  460. self.content = kwargs.get("content", "")
  461. self.reference = kwargs.get("reference", "")
  462. self.model = kwargs.get("model")
  463. self.function_name = kwargs.get("function_name")
  464. self.timeout = kwargs.get("timeout", 60)
  465. self.rag_params = kwargs.get("rag_params")
  466. self.isolation_mode = kwargs.get("isolation_mode", False)
  467. self.isolation_steps = kwargs.get("isolation_steps", [])
  468. self.manual_inputs = kwargs.get("manual_inputs", {})
  469. self.prompt_version = kwargs.get("prompt_version")