| 12345678 |
- {
- "success": true,
- "content": "\"\"\"\n审查流水线实现\n\"\"\"\nimport asyncio\nfrom typing import Dict, List, Any\nimport sys\nfrom pathlib import Path\n\n# 添加项目根目录到路径,支持相对导入\n_root = Path(__file__).parent.parent\nif str(_root) not in sys.path:\n sys.path.insert(0, str(_root))\n\nfrom foundation.observability import logger\nfrom interfaces import IReviewPipeline, IPromptBuilder, ILLMClient, IResultProcessor, IKeywordChecker\n\n\nclass ReviewPipeline(IReviewPipeline):\n \"\"\"审查流水线\"\"\"\n \n def __init__(self, prompt_builder: IPromptBuilder,\n llm_client: ILLMClient,\n result_processor: IResultProcessor,\n max_concurrent: int = 20,\n keyword_checker: IKeywordChecker = None):\n \"\"\"\n 初始化审查流水线\n \n Args:\n prompt_builder: 提示词构建器\n llm_client: LLM客户端\n result_processor: 结果处理器\n max_concurrent: 最大并发数\n keyword_checker: 关键词检查器(可选)\n \"\"\"\n self.prompt_builder = prompt_builder\n self.llm_client = llm_client\n self.result_processor = result_processor\n self.max_concurrent = max_concurrent\n self.keyword_checker = keyword_checker\n \n async def review(self, documents: List[Dict[str, Any]], \n specification: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, Any]]:\n \"\"\"\n 执行审查流程\n \n Args:\n documents: 文档块列表\n specification: 规范字典\n \n Returns:\n 审查结果列表,每个结果包含原始文档信息和审查结果\n \n \"\"\"\n\n\n\n # 创建信号量控制并发数\n semaphore = asyncio.Semaphore(self.max_concurrent)\n \n # 创建所有任务\n tasks = []\n for doc in documents:\n task = self._review_single_document(doc, specification, semaphore)\n tasks.append(task)\n\n # 并发执行所有任务\n results = await asyncio.gather(*tasks, return_exceptions=True)\n \n # 处理异常结果\n final_results = []\n for i, result in enumerate(results):\n if isinstance(result, Exception):\n # 如果出现异常,返回错误信息\n doc = documents[i]\n final_results.append({\n **doc,\n 'review_result': {\n 'error': str(result)\n }\n })\n else:\n final_results.append(result)\n \n return final_results\n \n async def _review_single_document(self, doc: Dict[str, Any], \n specification: Dict[str, List[Dict[str, str]]],\n semaphore: asyncio.Semaphore) -> Dict[str, Any]:\n \"\"\"\n 审查单个文档\n \n Args:\n doc: 文档块\n specification: 规范字典\n semaphore: 信号量\n \n Returns:\n 包含审查结果的文档字典\n \"\"\"\n async with semaphore:\n try:\n # 获取文档的分类标签\n chapter_classification = doc.get('chapter_classification', '')\n if not chapter_classification:\n return {\n **doc,\n 'review_result': {\n 'error': '缺少chapter_classification字段'\n }\n }\n \n # 获取对应的规范要求\n requirements = specification.get(chapter_classification, [])\n if not requirements:\n print(\"111111\")\n return {\n **doc,\n 'review_result': {\n 'error': f'未找到标签 {chapter_classification} 对应的规范要求'\n }\n }\n \n # 获取待审查内容\n content = doc.get('content', '')\n if not content:\n return {\n **doc,\n 'review_result': {\n 'error': '缺少content字段'\n }\n }\n \n # 构建提示词\n prompt = self.prompt_builder.build_prompt(content, requirements)\n\n # 调用LLM\n llm_response = await self.llm_client.call_llm(prompt)\n \n # 处理结果\n review_result = self.result_processor.parse_result(llm_response, requirements)\n \n # 关键词二次检查(如果启用)\n if self.keyword_checker:\n content = doc.get('content', '')\n review_result = self.keyword_checker.filter_missing_points(\n review_result, content, specification, chapter_classification\n )\n \n return {\n **doc,\n 'review_result': review_result\n }\n except Exception as e:\n return {\n **doc,\n 'review_result': {\n 'error': str(e)\n }\n }\n\n",
- "start_line": 1,
- "end_line": 161,
- "total_lines": 161,
- "has_more": false
- }
|