""" 环节隔离执行器 提供 IsolationRunner,支持环节隔离模式下的步骤调度。 配合 StepDispatcher 和 DebugExecutor 实现: - 仅执行选中的步骤,其余跳过 - 手动输入注入(manual_inputs) - 自动前向传播(auto-forward):当某步有 manual_input 时,后续步骤自动执行 - 依赖检查:requires_previous 不满足时跳过 用法: runner = IsolationRunner() results = await runner.run_selected_steps( chain_id="completeness", selected_indices=[1], manual_inputs={"1": "请审查以下内容:..."}, content="施工方案内容...", ) """ import asyncio import time import uuid from datetime import datetime from typing import Any, Dict, List, Optional from core.debug.step_dispatcher import ( CHAIN_STEPS, StepDefinition, StepDispatcher, _STEP_DEPS, ) from core.construction_review.component.reviewers.base_reviewer import BaseReviewer from foundation.ai.agent.generate.model_generate import generate_model_client from foundation.observability.logger.loggering import review_logger as logger # ============================================================ # StepResult 数据类(轻量版,兼容 executor.StepResult) # ============================================================ class StepResult: """单步执行结果""" __slots__ = ("index", "name", "phase", "status", "duration", "input_summary", "output_summary", "error") def __init__(self, index: int, name: str, status: str = "pending", phase: str = None, duration: float = 0, input_summary: dict = None, output_summary: dict = None, error: str = None): self.index = index self.name = name self.phase = phase self.status = status self.duration = duration self.input_summary = input_summary or {} self.output_summary = output_summary or {} self.error = error def to_dict(self) -> dict: """转为字典""" return { "index": self.index, "name": self.name, "phase": self.phase, "status": self.status, "duration": self.duration, "input": self.input_summary, "output": self.output_summary, "error": self.error, } # ============================================================ # 工具函数 # ============================================================ def _make_skipped(step: StepDefinition) -> StepResult: """创建一个跳过结果""" return StepResult( index=step.index, name=step.name, phase=step.phase, status="skipped", duration=0, ) def _make_trace_id(chain_id: str) -> str: """生成 trace_id(与 executor.py 一致)""" return ( f"debug_{chain_id}_" f"{datetime.now().strftime('%H%M%S')}_" f"{uuid.uuid4().hex[:8]}" ) def _build_step_dict(step_def: StepDefinition) -> dict: """StepDefinition → dict(兼容 executor.py 的 step_def 格式)""" d = {"index": step_def.index, "name": step_def.name} if step_def.phase: d["phase"] = step_def.phase return d def _find_result(results: List[StepResult], step_index: int) -> Optional[StepResult]: """从结果列表中按 step_index 查找""" for r in results: if r.index == step_index: return r return None # ============================================================ # IsolationRunner # ============================================================ class IsolationRunner: """环节隔离执行器""" def __init__(self): self._executor = None self.base_reviewer = BaseReviewer() # ---------------------------------------------------------- # 公共方法 # ---------------------------------------------------------- async def run_isolated_step( self, chain_id: str, step_index: int, manual_inputs: Dict[str, str] = None, **params, ) -> dict: """ 运行单个隔离步骤。 如果步骤 requires_previous=False → 直接执行。 如果 requires_previous=True 且无 manual_inputs → 返回错误。 Args: chain_id: 链路标识。 step_index: 步骤索引。 manual_inputs: 手动输入,key=str(step_index), value=输入内容。 **params: 执行参数(content, reference, model 等)。 Returns: dict: 步骤执行结果(兼容 StepResult.to_dict)。 """ manual_inputs = manual_inputs or {} step_defs = StepDispatcher.get_steps(chain_id) if step_index < 0 or step_index >= len(step_defs): return StepResult( index=step_index, name=f"未知步骤 #{step_index}", status="error", duration=0, error=f"step_index {step_index} 超出范围({chain_id} 共 {len(step_defs)} 步)", ).to_dict() step_def = step_defs[step_index] has_manual = str(step_index) in manual_inputs # 检查依赖 if step_def.requires_previous and not has_manual: return StepResult( index=step_index, name=step_def.name, phase=step_def.phase, status="error", duration=0, error="此步骤需要前一步的输出,请提供 manual_inputs", ).to_dict() # 执行步骤 if has_manual: result = await self._execute_step_with_manual( chain_id, step_def, manual_inputs, **params, ) else: result = await self._do_execute_step( chain_id, step_def, [], **params, ) return result.to_dict() async def run_selected_steps( self, chain_id: str, selected_indices: List[int], manual_inputs: Dict[str, str] = None, **params, ) -> List[dict]: """ 在环节隔离模式下执行选中的步骤。 核心逻辑: 1. 仅 `selected_indices` 中的步骤被允许执行 2. 若某选中步骤有 `manual_inputs` → 触发自动前向传播, 后续所有步骤(无论是否在选中列表中)也一并执行 3. 步骤检查 requires_previous,若前一步非 success → 跳过 4. 超出范围的索引被自动过滤(记录 warning) Args: chain_id: 链路标识。 selected_indices: 选中的步骤索引列表。 manual_inputs: 手动输入映射 {str(index): str(value)}。 **params: 执行参数(content, reference, model 等)。 Returns: List[dict]: 步骤执行结果列表(全量步骤,含 skipped)。 """ manual_inputs = manual_inputs or {} step_defs = StepDispatcher.get_steps(chain_id) deps = StepDispatcher.get_step_deps(chain_id) # ---- Step 1: 构建有效执行集 ---- effective_run = self._build_effective_set( step_defs, selected_indices, manual_inputs, ) # ---- Step 2: 顺序执行步骤 ---- executor = await self._get_executor() chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {}) trace_id = _make_trace_id(chain_id) previous_results: List[StepResult] = [] results: List[StepResult] = [] last_success_step = -1 # 最近成功执行的步骤索引 for step_def in step_defs: si = step_def.index if si not in effective_run: # 不在有效执行集中 → 跳过 sr = _make_skipped(step_def) results.append(sr) previous_results.append(sr) continue has_manual = str(si) in manual_inputs # 检查依赖:依赖的步骤是否成功? dep_idx = deps.get(si) dep_satisfied = True if dep_idx is not None and not has_manual: dep_result = _find_result(results, dep_idx) if dep_result and dep_result.status not in ("success",): dep_satisfied = False if not dep_satisfied: sr = _make_skipped(step_def) results.append(sr) previous_results.append(sr) continue # 执行步骤 try: if has_manual: sr = await self._execute_step_with_manual( chain_id, step_def, manual_inputs, **params, ) else: # 构造兼容 executor 的 step_def dict step_dict = _build_step_dict(step_def) sr = await self._do_execute_step( chain_id, step_def, previous_results, **params, ) except Exception as exc: logger.error( f"[IsolationRunner] 步骤 {si} ({step_def.name}) 异常: {exc}", exc_info=True, ) sr = StepResult( index=si, name=step_def.name, phase=step_def.phase, status="error", duration=0, error=str(exc), ) results.append(sr) previous_results.append(sr) if sr.status == "success": last_success_step = si return [r.to_dict() for r in results] # ---------------------------------------------------------- # 执行集构建 # ---------------------------------------------------------- @staticmethod def _build_effective_set( step_defs: List[StepDefinition], selected_indices: List[int], manual_inputs: Dict[str, str], ) -> set: """ 构建有效执行步骤索引集合。 规则: - 仅包含 selected_indices 中的合法索引 - 若某选中步骤有 manual_inputs,自动加入该步骤之后的所有步骤 - 过滤超出范围的索引(记录 warning) """ manual_inputs = manual_inputs or {} total = len(step_defs) effective = set() for idx in selected_indices: if idx < 0 or idx >= total: logger.warning( f"[IsolationRunner] selected_indices 包含超出范围的索引 {idx}" f"(共 {total} 步),已自动过滤。" ) continue effective.add(idx) # 手动输入触发前向传播:加入该步骤之后的所有步骤 for idx_str in manual_inputs: try: idx = int(idx_str) except (ValueError, TypeError): continue if idx < 0 or idx >= total: continue # 该步骤本身已在 effective 中或不在 # 前向传播:加入后续所有步骤 if idx in effective: for later in range(idx + 1, total): effective.add(later) return effective # ---------------------------------------------------------- # 步骤执行 # ---------------------------------------------------------- async def _do_execute_step( self, chain_id: str, step_def: StepDefinition, previous_results: List[StepResult], **params, ) -> StepResult: """ 执行步骤(委托给 DebugExecutor 的对应方法)。 """ executor = await self._get_executor() chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {}) step_dict = _build_step_dict(step_def) trace_id = _make_trace_id(chain_id) # 构造兼容 DebugExecuteRequest 的参数字典 request = _RequestProxy(chain_id=chain_id, **params) if chain_id == "professional": return await executor._execute_professional_step( request, chain_cfg, step_dict, previous_results, trace_id, ) else: return await executor._execute_direct_llm_step( request, chain_cfg, step_dict, previous_results, trace_id, ) async def _execute_step_with_manual( self, chain_id: str, step_def: StepDefinition, manual_inputs: Dict[str, str], **params, ) -> StepResult: """ 使用 manual_inputs 执行步骤。 根据步骤索引分别处理: - step 0: manual_input 作为已渲染的提示词内容 - step 1: manual_input 作为 LLM 调用的 prompt(调用 LLM) - step 2: manual_input 作为原始 JSON 响应(直接解析) - professional 各步暂以 manual_input 直接作为输出 """ si = step_def.index manual_text = manual_inputs.get(str(si), "") start = time.time() trace_id = _make_trace_id(chain_id) is_professional = chain_id == "professional" try: if is_professional: # Professional 步骤:manual_input 作为输出内容注入 return StepResult( index=si, name=step_def.name, phase=step_def.phase, status="success", duration=round(time.time() - start, 3), input_summary={"manual_input": True, "text_length": len(manual_text)}, output_summary={"manual_result": manual_text, "note": "手动输入注入"}, ) # ---- 直调 LLM 步骤 ---- if si == 0: # Step 0: Prompt 渲染 — manual_input 作为已渲染的提示词 return StepResult( index=0, name="Prompt 渲染", status="success", duration=0, input_summary={"manual_input": True}, output_summary={ "system_prompt": "(手动输入 - 无 system prompt)", "user_prompt": manual_text, "note": "手动输入注入,跳过模板渲染", }, ) elif si == 1: # Step 1: LLM 调用 — manual_input 作为 prompt,调用 LLM content = params.get("content", "") reference = params.get("reference", "") model = params.get("model") function_name = params.get("function_name") timeout = params.get("timeout", 60) # 用 manual_input 作为 user_prompt 构造调用 from langchain.prompts import ChatPromptTemplate prompt_template = ChatPromptTemplate.from_messages([ ("system", "你是一个专业的施工方案审查专家。"), ("human", "{manual_prompt}"), ]) invoke_kwargs: dict = { "trace_id": trace_id, "task_prompt_info": { "task_prompt": prompt_template, "task_name": f"{chain_id}_manual", }, "timeout": timeout, } if function_name: invoke_kwargs["function_name"] = function_name elif model: invoke_kwargs["model_name"] = model else: invoke_kwargs["function_name"] = f"{chain_id}_review_generate" # 渲染 manual prompt messages = prompt_template.format_messages(manual_prompt=manual_text) model_response = await generate_model_client.get_model_generate_invoke( **invoke_kwargs, ) return StepResult( index=1, name="LLM 调用", status="success", duration=round(time.time() - start, 3), input_summary={ "manual_prompt": manual_text, "model": model or "default", "timeout": timeout, }, output_summary={ "raw_response": model_response, "note": "使用手动输入的 prompt 调用 LLM", }, ) elif si == 2: # Step 2: 结果解析 — manual_input 作为原始 JSON 响应 executor = await self._get_executor() chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {}) prompt_name = chain_cfg.get("prompt_name", chain_id) try: result = self.base_reviewer.format_result(manual_text, prompt_name) return StepResult( index=2, name="结果解析", status="success", duration=round(time.time() - start, 3), input_summary={ "raw_response": manual_text, "manual_input": True, }, output_summary={ "parsed_result": result.details, "note": "手动输入的 JSON 被解析为结构化审查结果", }, ) except Exception as parse_err: return StepResult( index=2, name="结果解析", status="error", duration=round(time.time() - start, 3), input_summary={"raw_response": manual_text}, error=f"手动输入解析失败: {parse_err}", ) except Exception as exc: logger.error( f"[IsolationRunner] manual step {si} 异常: {exc}", exc_info=True, ) return StepResult( index=si, name=step_def.name, phase=step_def.phase, status="error", duration=round(time.time() - start, 3), error=str(exc), ) # fallback return StepResult( index=si, name=step_def.name, status="error", duration=round(time.time() - start, 3), error=f"未知步骤索引: {si}", ) # ---------------------------------------------------------- # 辅助方法 # ---------------------------------------------------------- async def _get_executor(self): """懒加载 DebugExecutor 实例""" if self._executor is None: from core.debug.executor import DebugExecutor self._executor = DebugExecutor() return self._executor # ============================================================ # _RequestProxy — 兼容 DebugExecuteRequest 的轻量代理 # ============================================================ class _RequestProxy: """ 轻量属性代理,模拟 DebugExecuteRequest 对象。 使 IsolationRunner 可以传递 `**params` 给 DebugExecutor 的内部方法, 而无需构造完整的 Pydantic 模型。 """ def __init__(self, chain_id: str = "", **kwargs): self.chain_id = chain_id self.content = kwargs.get("content", "") self.reference = kwargs.get("reference", "") self.model = kwargs.get("model") self.function_name = kwargs.get("function_name") self.timeout = kwargs.get("timeout", 60) self.rag_params = kwargs.get("rag_params") self.isolation_mode = kwargs.get("isolation_mode", False) self.isolation_steps = kwargs.get("isolation_steps", []) self.manual_inputs = kwargs.get("manual_inputs", {}) self.prompt_version = kwargs.get("prompt_version")