# Core 目录重复实现方法梳理报告 ## 📋 报告概述 **生成时间**: 2026-01-08 **分析范围**: `core/` 目录及其子目录 **目的**: 识别可复用但被重复实现的方法,提供重构建议 --- ## 🔍 一、Redis 连接初始化(重复度:高) ### 重复位置 1. **`core/base/progress_manager.py`** - `ProgressManager._init_redis()` 2. **`core/base/redis_duplicate_checker.py`** - `RedisDuplicateChecker.__init__()` ### 重复代码特征 ```python # 两个类中都有相似的 Redis 连接逻辑 redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost') redis_port = config_handler.get('redis', 'REDIS_PORT', '6379') redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '') redis_db = config_handler.get('redis', 'REDIS_DB', '0') if redis_password: redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}" else: redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}" self.redis_client = redis.from_url(redis_url, decode_responses=True) self.redis_client.ping() ``` ### 重构建议 **方案**: 创建统一的 Redis 连接工厂类 ```python # 建议位置: foundation/infrastructure/cache/redis_factory.py class RedisConnectionFactory: """Redis 连接工厂 - 统一管理 Redis 连接""" @staticmethod def create_connection(decode_responses=True): """创建 Redis 连接""" redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost') redis_port = config_handler.get('redis', 'REDIS_PORT', '6379') redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '') redis_db = config_handler.get('redis', 'REDIS_DB', '0') if redis_password: redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}" else: redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}" client = redis.from_url(redis_url, decode_responses=decode_responses) client.ping() return client ``` **影响范围**: 2 个文件需要修改 --- ## 🔍 二、进度更新与 SSE 推送(重复度:中) ### 重复位置 1. **`core/base/progress_manager.py`** - `update_stage_progress()` 2. **多个 reviewer 文件** - 审查完成后的进度推送逻辑 ### 重复代码特征 ```python # 在多个审查方法中重复出现 if state and state.get("progress_manager"): asyncio.create_task( state["progress_manager"].update_stage_progress( callback_task_id=state["callback_task_id"], stage_name=stage_name, current=None, status="processing", message=f"{name} 审查完成,耗时: {execution_time:.2f}s", issues=[review_result_data], event_type="processing" ) ) ``` ### 重构建议 **方案**: 创建审查结果推送装饰器或工具方法 ```python # 建议位置: core/construction_review/component/reviewers/utils/progress_helper.py class ProgressHelper: """进度推送辅助工具""" @staticmethod async def push_review_result(state: dict, stage_name: str, review_name: str, result: ReviewResult): """统一推送审查结果""" if not state or not state.get("progress_manager"): return review_result_data = { 'name': review_name, 'success': result.success, 'details': result.details, 'error_message': result.error_message, 'execution_time': result.execution_time, 'timestamp': time.time() } await state["progress_manager"].update_stage_progress( callback_task_id=state["callback_task_id"], stage_name=stage_name, current=None, status="processing", message=f"{review_name} 审查完成,耗时: {result.execution_time:.2f}s", issues=[review_result_data], event_type="processing" ) ``` **影响范围**: `ai_review_engine.py` 及多个 reviewer 文件 --- ## 🔍 三、审查结果处理(重复度:中) ### 重复位置 1. **`core/construction_review/component/ai_review_engine.py`** - `_process_review_result()` 2. **`core/construction_review/workflows/ai_review_workflow.py`** - 结果处理逻辑 ### 重复代码特征 ```python # 将 ReviewResult 对象转换为字典的逻辑重复 if isinstance(result, Exception): return {"error": str(result), "success": False} elif hasattr(result, '__dict__'): return { "success": result.success if hasattr(result, 'success') else False, "details": result.details if hasattr(result, 'details') else {}, "error_message": result.error_message if hasattr(result, 'error_message') else None, "execution_time": result.execution_time if hasattr(result, 'execution_time') else None } ``` ### 重构建议 **方案**: 在 `ReviewResult` 类中添加 `to_dict()` 方法 ```python # 建议位置: core/construction_review/component/reviewers/base_reviewer.py @dataclass class ReviewResult: """审查结果""" success: bool details: Dict[str, Any] error_message: Optional[str] execution_time: float def to_dict(self) -> Dict[str, Any]: """转换为字典格式""" return { "success": self.success, "details": self.details, "error_message": self.error_message, "execution_time": self.execution_time } @classmethod def from_exception(cls, e: Exception, execution_time: float = 0.0): """从异常创建失败结果""" return cls( success=False, details={"error": str(e)}, error_message=str(e), execution_time=execution_time ) ``` **影响范围**: `ai_review_engine.py`, `ai_review_workflow.py` --- ## 🔍 四、文档处理降级逻辑(重复度:中) ### 重复位置 1. **`core/construction_review/component/document_processor.py`** - `_fallback_pdf_processing()` 2. **`core/construction_review/component/document_processor.py`** - `_fallback_docx_processing()` ### 重复代码特征 ```python # PDF 和 DOCX 的降级处理逻辑高度相似 try: logger.info("使用基础处理模式") # ... 基础处理逻辑 return { 'document_type': file_type, 'total_chunks': len(chunks), 'chunks': chunks, # ... } except Exception as e: logger.error(f"基础处理失败: {str(e)}") raise ``` ### 重构建议 **方案**: 提取通用的降级处理模板方法 ```python # 建议位置: core/construction_review/component/document_processor.py class DocumentProcessor: async def _fallback_processing(self, file_path: str, file_type: str) -> Dict[str, Any]: """通用降级处理模板""" try: logger.info(f"使用基础{file_type.upper()}处理模式") # 根据文件类型选择加载器 if file_type == 'pdf': loader = PyPDFLoader(file_path) elif file_type == 'docx': loader = self._create_docx_loader(file_path) else: raise ValueError(f"不支持的文件类型: {file_type}") documents = loader.load() # 统一的文本分块逻辑 splits = self._split_documents(documents) return self._format_fallback_result(file_type, documents, splits) except Exception as e: logger.error(f"基础{file_type.upper()}处理失败: {str(e)}") raise ``` **影响范围**: `document_processor.py` --- ## 🔍 五、任务状态检查(重复度:低) ### 重复位置 1. **`core/base/redis_duplicate_checker.py`** - `is_valid_task_id()` 2. **`core/base/redis_duplicate_checker.py`** - `is_task_already_used()` ### 重复代码特征 ```python # 两个方法都遍历 Redis 键查找任务 if self.use_redis: keys = self.redis_client.keys("task:*") for key in keys: task_info = self.redis_client.get(key) if task_info: task_data = json.loads(task_info) if task_data.get("callback_task_id") == callback_task_id: # ... 不同的检查逻辑 ``` ### 重构建议 **方案**: 提取通用的任务查找方法 ```python # 建议位置: core/base/redis_duplicate_checker.py class RedisDuplicateChecker: def _find_task_by_callback_id(self, callback_task_id: str) -> Optional[Dict]: """根据 callback_task_id 查找任务数据""" if self.use_redis: keys = self.redis_client.keys("task:*") for key in keys: task_info = self.redis_client.get(key) if task_info: task_data = json.loads(task_info) if task_data.get("callback_task_id") == callback_task_id: return task_data else: for file_id, task_info in self.task_cache.items(): if task_info.get("callback_task_id") == callback_task_id: return task_info return None async def is_valid_task_id(self, callback_task_id: str) -> bool: """验证任务ID是否存在且未过期""" task_data = self._find_task_by_callback_id(callback_task_id) if not task_data: return False created_at = datetime.fromisoformat(task_data['created_at']) return datetime.now() - created_at < timedelta(hours=1) ``` **影响范围**: `redis_duplicate_checker.py` --- ## 🔍 六、审查引擎中的 trace_id 构造(重复度:低) ### 重复位置 多个审查方法中都有类似的 trace_id 构造逻辑: 1. `sensitive_word_check()` 2. `check_semantic_logic()` 3. `check_non_parameter_compliance()` 4. `check_parameter_compliance()` ### 重复代码特征 ```python # 每个方法都重复构造 trace_id reviewer_type = Stage.BASIC.value['reviewer_type'] prompt_name = Stage.BASIC.value['grammar'] trace_id = prompt_name + trace_id_idx ``` ### 重构建议 **方案**: 创建 trace_id 构造工具方法 ```python # 建议位置: core/construction_review/component/ai_review_engine.py class AIReviewEngine: def _build_trace_id(self, stage: Stage, check_type: str, trace_id_idx: str) -> str: """构造 trace_id""" reviewer_type = stage.value['reviewer_type'] prompt_name = stage.value[check_type] return f"{prompt_name}{trace_id_idx}" async def sensitive_word_check(self, trace_id_idx: str, ...): trace_id = self._build_trace_id(Stage.BASIC, 'grammar', trace_id_idx) # ... ``` **影响范围**: `ai_review_engine.py` --- ## 📊 七、重复度统计 | 重复类型 | 重复度 | 影响文件数 | 优先级 | |---------|--------|-----------|--------| | Redis 连接初始化 | 高 | 2 | ⭐⭐⭐ | | 进度更新与 SSE 推送 | 中 | 5+ | ⭐⭐⭐ | | 审查结果处理 | 中 | 2 | ⭐⭐ | | 文档处理降级逻辑 | 中 | 1 | ⭐⭐ | | 任务状态检查 | 低 | 1 | ⭐ | | trace_id 构造 | 低 | 1 | ⭐ | --- ## 🎯 八、重构优先级建议 ### 高优先级(建议立即重构) 1. **Redis 连接初始化** - 影响范围广,重复度高 2. **进度更新与 SSE 推送** - 代码分散,维护成本高 ### 中优先级(建议近期重构) 3. **审查结果处理** - 提升代码可读性 4. **文档处理降级逻辑** - 减少代码冗余 ### 低优先级(可选重构) 5. **任务状态检查** - 局部优化 6. **trace_id 构造** - 局部优化 --- ## 📝 九、重构注意事项 ### 1. 向后兼容性 - 保留旧接口,标记为 `@deprecated` - 提供迁移指南和示例代码 ### 2. 测试覆盖 - 为新的工具类编写单元测试 - 确保重构后功能不变 ### 3. 文档更新 - 更新 API 文档 - 添加使用示例 ### 4. 渐进式重构 - 先重构高优先级项 - 逐步迁移现有代码 - 避免一次性大规模改动 --- ## 🔧 十、建议的新增工具模块 ### 1. `foundation/infrastructure/cache/redis_factory.py` - Redis 连接工厂 - 连接池管理 ### 2. `core/construction_review/component/reviewers/utils/progress_helper.py` - 进度推送辅助工具 - SSE 消息格式化 ### 3. `core/construction_review/component/reviewers/utils/result_converter.py` - 审查结果转换工具 - 统一的结果格式化 --- ## 📌 十一、总结 通过本次梳理,发现 `core/` 目录中存在 **6 类重复实现的可复用方法**,主要集中在: 1. **基础设施层**:Redis 连接、进度管理 2. **业务逻辑层**:审查结果处理、文档处理 3. **工具方法层**:trace_id 构造、任务状态检查 建议优先重构 **Redis 连接初始化** 和 **进度更新与 SSE 推送** 两个高频重复模块,可显著提升代码质量和维护效率。 --- **报告生成者**: Kiro AI Assistant **最后更新**: 2026-01-08