| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551 |
- """
- 环节隔离执行器
- 提供 IsolationRunner,支持环节隔离模式下的步骤调度。
- 配合 StepDispatcher 和 DebugExecutor 实现:
- - 仅执行选中的步骤,其余跳过
- - 手动输入注入(manual_inputs)
- - 自动前向传播(auto-forward):当某步有 manual_input 时,后续步骤自动执行
- - 依赖检查:requires_previous 不满足时跳过
- 用法:
- runner = IsolationRunner()
- results = await runner.run_selected_steps(
- chain_id="completeness",
- selected_indices=[1],
- manual_inputs={"1": "请审查以下内容:..."},
- content="施工方案内容...",
- )
- """
- import asyncio
- import time
- import uuid
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- from core.debug.step_dispatcher import (
- CHAIN_STEPS,
- StepDefinition,
- StepDispatcher,
- _STEP_DEPS,
- )
- from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
- from foundation.ai.agent.generate.model_generate import generate_model_client
- from foundation.observability.logger.loggering import review_logger as logger
- # ============================================================
- # StepResult 数据类(轻量版,兼容 executor.StepResult)
- # ============================================================
- class StepResult:
- """单步执行结果"""
- __slots__ = ("index", "name", "phase", "status", "duration",
- "input_summary", "output_summary", "error")
- def __init__(self, index: int, name: str, status: str = "pending",
- phase: str = None, duration: float = 0,
- input_summary: dict = None, output_summary: dict = None,
- error: str = None):
- self.index = index
- self.name = name
- self.phase = phase
- self.status = status
- self.duration = duration
- self.input_summary = input_summary or {}
- self.output_summary = output_summary or {}
- self.error = error
- def to_dict(self) -> dict:
- """转为字典"""
- return {
- "index": self.index,
- "name": self.name,
- "phase": self.phase,
- "status": self.status,
- "duration": self.duration,
- "input": self.input_summary,
- "output": self.output_summary,
- "error": self.error,
- }
- # ============================================================
- # 工具函数
- # ============================================================
- def _make_skipped(step: StepDefinition) -> StepResult:
- """创建一个跳过结果"""
- return StepResult(
- index=step.index,
- name=step.name,
- phase=step.phase,
- status="skipped",
- duration=0,
- )
- def _make_trace_id(chain_id: str) -> str:
- """生成 trace_id(与 executor.py 一致)"""
- return (
- f"debug_{chain_id}_"
- f"{datetime.now().strftime('%H%M%S')}_"
- f"{uuid.uuid4().hex[:8]}"
- )
- def _build_step_dict(step_def: StepDefinition) -> dict:
- """StepDefinition → dict(兼容 executor.py 的 step_def 格式)"""
- d = {"index": step_def.index, "name": step_def.name}
- if step_def.phase:
- d["phase"] = step_def.phase
- return d
- def _find_result(results: List[StepResult], step_index: int) -> Optional[StepResult]:
- """从结果列表中按 step_index 查找"""
- for r in results:
- if r.index == step_index:
- return r
- return None
- # ============================================================
- # IsolationRunner
- # ============================================================
- class IsolationRunner:
- """环节隔离执行器"""
- def __init__(self):
- self._executor = None
- self.base_reviewer = BaseReviewer()
- # ----------------------------------------------------------
- # 公共方法
- # ----------------------------------------------------------
- async def run_isolated_step(
- self,
- chain_id: str,
- step_index: int,
- manual_inputs: Dict[str, str] = None,
- **params,
- ) -> dict:
- """
- 运行单个隔离步骤。
- 如果步骤 requires_previous=False → 直接执行。
- 如果 requires_previous=True 且无 manual_inputs → 返回错误。
- Args:
- chain_id: 链路标识。
- step_index: 步骤索引。
- manual_inputs: 手动输入,key=str(step_index), value=输入内容。
- **params: 执行参数(content, reference, model 等)。
- Returns:
- dict: 步骤执行结果(兼容 StepResult.to_dict)。
- """
- manual_inputs = manual_inputs or {}
- step_defs = StepDispatcher.get_steps(chain_id)
- if step_index < 0 or step_index >= len(step_defs):
- return StepResult(
- index=step_index,
- name=f"未知步骤 #{step_index}",
- status="error",
- duration=0,
- error=f"step_index {step_index} 超出范围({chain_id} 共 {len(step_defs)} 步)",
- ).to_dict()
- step_def = step_defs[step_index]
- has_manual = str(step_index) in manual_inputs
- # 检查依赖
- if step_def.requires_previous and not has_manual:
- return StepResult(
- index=step_index,
- name=step_def.name,
- phase=step_def.phase,
- status="error",
- duration=0,
- error="此步骤需要前一步的输出,请提供 manual_inputs",
- ).to_dict()
- # 执行步骤
- if has_manual:
- result = await self._execute_step_with_manual(
- chain_id, step_def, manual_inputs, **params,
- )
- else:
- result = await self._do_execute_step(
- chain_id, step_def, [], **params,
- )
- return result.to_dict()
- async def run_selected_steps(
- self,
- chain_id: str,
- selected_indices: List[int],
- manual_inputs: Dict[str, str] = None,
- **params,
- ) -> List[dict]:
- """
- 在环节隔离模式下执行选中的步骤。
- 核心逻辑:
- 1. 仅 `selected_indices` 中的步骤被允许执行
- 2. 若某选中步骤有 `manual_inputs` → 触发自动前向传播,
- 后续所有步骤(无论是否在选中列表中)也一并执行
- 3. 步骤检查 requires_previous,若前一步非 success → 跳过
- 4. 超出范围的索引被自动过滤(记录 warning)
- Args:
- chain_id: 链路标识。
- selected_indices: 选中的步骤索引列表。
- manual_inputs: 手动输入映射 {str(index): str(value)}。
- **params: 执行参数(content, reference, model 等)。
- Returns:
- List[dict]: 步骤执行结果列表(全量步骤,含 skipped)。
- """
- manual_inputs = manual_inputs or {}
- step_defs = StepDispatcher.get_steps(chain_id)
- deps = StepDispatcher.get_step_deps(chain_id)
- # ---- Step 1: 构建有效执行集 ----
- effective_run = self._build_effective_set(
- step_defs, selected_indices, manual_inputs,
- )
- # ---- Step 2: 顺序执行步骤 ----
- executor = await self._get_executor()
- chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {})
- trace_id = _make_trace_id(chain_id)
- previous_results: List[StepResult] = []
- results: List[StepResult] = []
- last_success_step = -1 # 最近成功执行的步骤索引
- for step_def in step_defs:
- si = step_def.index
- if si not in effective_run:
- # 不在有效执行集中 → 跳过
- sr = _make_skipped(step_def)
- results.append(sr)
- previous_results.append(sr)
- continue
- has_manual = str(si) in manual_inputs
- # 检查依赖:依赖的步骤是否成功?
- dep_idx = deps.get(si)
- dep_satisfied = True
- if dep_idx is not None and not has_manual:
- dep_result = _find_result(results, dep_idx)
- if dep_result and dep_result.status not in ("success",):
- dep_satisfied = False
- if not dep_satisfied:
- sr = _make_skipped(step_def)
- results.append(sr)
- previous_results.append(sr)
- continue
- # 执行步骤
- try:
- if has_manual:
- sr = await self._execute_step_with_manual(
- chain_id, step_def, manual_inputs, **params,
- )
- else:
- # 构造兼容 executor 的 step_def dict
- step_dict = _build_step_dict(step_def)
- sr = await self._do_execute_step(
- chain_id, step_def, previous_results, **params,
- )
- except Exception as exc:
- logger.error(
- f"[IsolationRunner] 步骤 {si} ({step_def.name}) 异常: {exc}",
- exc_info=True,
- )
- sr = StepResult(
- index=si, name=step_def.name,
- phase=step_def.phase,
- status="error", duration=0,
- error=str(exc),
- )
- results.append(sr)
- previous_results.append(sr)
- if sr.status == "success":
- last_success_step = si
- return [r.to_dict() for r in results]
- # ----------------------------------------------------------
- # 执行集构建
- # ----------------------------------------------------------
- @staticmethod
- def _build_effective_set(
- step_defs: List[StepDefinition],
- selected_indices: List[int],
- manual_inputs: Dict[str, str],
- ) -> set:
- """
- 构建有效执行步骤索引集合。
- 规则:
- - 仅包含 selected_indices 中的合法索引
- - 若某选中步骤有 manual_inputs,自动加入该步骤之后的所有步骤
- - 过滤超出范围的索引(记录 warning)
- """
- manual_inputs = manual_inputs or {}
- total = len(step_defs)
- effective = set()
- for idx in selected_indices:
- if idx < 0 or idx >= total:
- logger.warning(
- f"[IsolationRunner] selected_indices 包含超出范围的索引 {idx}"
- f"(共 {total} 步),已自动过滤。"
- )
- continue
- effective.add(idx)
- # 手动输入触发前向传播:加入该步骤之后的所有步骤
- for idx_str in manual_inputs:
- try:
- idx = int(idx_str)
- except (ValueError, TypeError):
- continue
- if idx < 0 or idx >= total:
- continue
- # 该步骤本身已在 effective 中或不在
- # 前向传播:加入后续所有步骤
- if idx in effective:
- for later in range(idx + 1, total):
- effective.add(later)
- return effective
- # ----------------------------------------------------------
- # 步骤执行
- # ----------------------------------------------------------
- async def _do_execute_step(
- self,
- chain_id: str,
- step_def: StepDefinition,
- previous_results: List[StepResult],
- **params,
- ) -> StepResult:
- """
- 执行步骤(委托给 DebugExecutor 的对应方法)。
- """
- executor = await self._get_executor()
- chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {})
- step_dict = _build_step_dict(step_def)
- trace_id = _make_trace_id(chain_id)
- # 构造兼容 DebugExecuteRequest 的参数字典
- request = _RequestProxy(chain_id=chain_id, **params)
- if chain_id == "professional":
- return await executor._execute_professional_step(
- request, chain_cfg, step_dict, previous_results, trace_id,
- )
- else:
- return await executor._execute_direct_llm_step(
- request, chain_cfg, step_dict, previous_results, trace_id,
- )
- async def _execute_step_with_manual(
- self,
- chain_id: str,
- step_def: StepDefinition,
- manual_inputs: Dict[str, str],
- **params,
- ) -> StepResult:
- """
- 使用 manual_inputs 执行步骤。
- 根据步骤索引分别处理:
- - step 0: manual_input 作为已渲染的提示词内容
- - step 1: manual_input 作为 LLM 调用的 prompt(调用 LLM)
- - step 2: manual_input 作为原始 JSON 响应(直接解析)
- - professional 各步暂以 manual_input 直接作为输出
- """
- si = step_def.index
- manual_text = manual_inputs.get(str(si), "")
- start = time.time()
- trace_id = _make_trace_id(chain_id)
- is_professional = chain_id == "professional"
- try:
- if is_professional:
- # Professional 步骤:manual_input 作为输出内容注入
- return StepResult(
- index=si, name=step_def.name,
- phase=step_def.phase, status="success",
- duration=round(time.time() - start, 3),
- input_summary={"manual_input": True, "text_length": len(manual_text)},
- output_summary={"manual_result": manual_text, "note": "手动输入注入"},
- )
- # ---- 直调 LLM 步骤 ----
- if si == 0:
- # Step 0: Prompt 渲染 — manual_input 作为已渲染的提示词
- return StepResult(
- index=0, name="Prompt 渲染", status="success",
- duration=0,
- input_summary={"manual_input": True},
- output_summary={
- "system_prompt": "(手动输入 - 无 system prompt)",
- "user_prompt": manual_text,
- "note": "手动输入注入,跳过模板渲染",
- },
- )
- elif si == 1:
- # Step 1: LLM 调用 — manual_input 作为 prompt,调用 LLM
- content = params.get("content", "")
- reference = params.get("reference", "")
- model = params.get("model")
- function_name = params.get("function_name")
- timeout = params.get("timeout", 60)
- # 用 manual_input 作为 user_prompt 构造调用
- from langchain.prompts import ChatPromptTemplate
- prompt_template = ChatPromptTemplate.from_messages([
- ("system", "你是一个专业的施工方案审查专家。"),
- ("human", "{manual_prompt}"),
- ])
- invoke_kwargs: dict = {
- "trace_id": trace_id,
- "task_prompt_info": {
- "task_prompt": prompt_template,
- "task_name": f"{chain_id}_manual",
- },
- "timeout": timeout,
- }
- if function_name:
- invoke_kwargs["function_name"] = function_name
- elif model:
- invoke_kwargs["model_name"] = model
- else:
- invoke_kwargs["function_name"] = f"{chain_id}_review_generate"
- # 渲染 manual prompt
- messages = prompt_template.format_messages(manual_prompt=manual_text)
- model_response = await generate_model_client.get_model_generate_invoke(
- **invoke_kwargs,
- )
- return StepResult(
- index=1, name="LLM 调用", status="success",
- duration=round(time.time() - start, 3),
- input_summary={
- "manual_prompt": manual_text,
- "model": model or "default",
- "timeout": timeout,
- },
- output_summary={
- "raw_response": model_response,
- "note": "使用手动输入的 prompt 调用 LLM",
- },
- )
- elif si == 2:
- # Step 2: 结果解析 — manual_input 作为原始 JSON 响应
- executor = await self._get_executor()
- chain_cfg = executor.CHAIN_CONFIG.get(chain_id, {})
- prompt_name = chain_cfg.get("prompt_name", chain_id)
- try:
- result = self.base_reviewer.format_result(manual_text, prompt_name)
- return StepResult(
- index=2, name="结果解析", status="success",
- duration=round(time.time() - start, 3),
- input_summary={
- "raw_response": manual_text,
- "manual_input": True,
- },
- output_summary={
- "parsed_result": result.details,
- "note": "手动输入的 JSON 被解析为结构化审查结果",
- },
- )
- except Exception as parse_err:
- return StepResult(
- index=2, name="结果解析", status="error",
- duration=round(time.time() - start, 3),
- input_summary={"raw_response": manual_text},
- error=f"手动输入解析失败: {parse_err}",
- )
- except Exception as exc:
- logger.error(
- f"[IsolationRunner] manual step {si} 异常: {exc}", exc_info=True,
- )
- return StepResult(
- index=si, name=step_def.name,
- phase=step_def.phase, status="error",
- duration=round(time.time() - start, 3),
- error=str(exc),
- )
- # fallback
- return StepResult(
- index=si, name=step_def.name, status="error",
- duration=round(time.time() - start, 3),
- error=f"未知步骤索引: {si}",
- )
- # ----------------------------------------------------------
- # 辅助方法
- # ----------------------------------------------------------
- async def _get_executor(self):
- """懒加载 DebugExecutor 实例"""
- if self._executor is None:
- from core.debug.executor import DebugExecutor
- self._executor = DebugExecutor()
- return self._executor
- # ============================================================
- # _RequestProxy — 兼容 DebugExecuteRequest 的轻量代理
- # ============================================================
- class _RequestProxy:
- """
- 轻量属性代理,模拟 DebugExecuteRequest 对象。
- 使 IsolationRunner 可以传递 `**params` 给 DebugExecutor 的内部方法,
- 而无需构造完整的 Pydantic 模型。
- """
- def __init__(self, chain_id: str = "", **kwargs):
- self.chain_id = chain_id
- self.content = kwargs.get("content", "")
- self.reference = kwargs.get("reference", "")
- self.model = kwargs.get("model")
- self.function_name = kwargs.get("function_name")
- self.timeout = kwargs.get("timeout", 60)
- self.rag_params = kwargs.get("rag_params")
- self.isolation_mode = kwargs.get("isolation_mode", False)
- self.isolation_steps = kwargs.get("isolation_steps", [])
- self.manual_inputs = kwargs.get("manual_inputs", {})
- self.prompt_version = kwargs.get("prompt_version")
|