core目录重复实现方法梳理报告.md 13 KB

Core 目录重复实现方法梳理报告

📋 报告概述

生成时间: 2026-01-08
分析范围: core/ 目录及其子目录
目的: 识别可复用但被重复实现的方法,提供重构建议


🔍 一、Redis 连接初始化(重复度:高)

重复位置

  1. core/base/progress_manager.py - ProgressManager._init_redis()
  2. core/base/redis_duplicate_checker.py - RedisDuplicateChecker.__init__()

重复代码特征

# 两个类中都有相似的 Redis 连接逻辑
redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
redis_db = config_handler.get('redis', 'REDIS_DB', '0')

if redis_password:
    redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
else:
    redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"

self.redis_client = redis.from_url(redis_url, decode_responses=True)
self.redis_client.ping()

重构建议

方案: 创建统一的 Redis 连接工厂类

# 建议位置: foundation/infrastructure/cache/redis_factory.py
class RedisConnectionFactory:
    """Redis 连接工厂 - 统一管理 Redis 连接"""
    
    @staticmethod
    def create_connection(decode_responses=True):
        """创建 Redis 连接"""
        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
        
        if redis_password:
            redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
        else:
            redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"
        
        client = redis.from_url(redis_url, decode_responses=decode_responses)
        client.ping()
        return client

影响范围: 2 个文件需要修改


🔍 二、进度更新与 SSE 推送(重复度:中)

重复位置

  1. core/base/progress_manager.py - update_stage_progress()
  2. 多个 reviewer 文件 - 审查完成后的进度推送逻辑

重复代码特征

# 在多个审查方法中重复出现
if state and state.get("progress_manager"):
    asyncio.create_task(
        state["progress_manager"].update_stage_progress(
            callback_task_id=state["callback_task_id"],
            stage_name=stage_name,
            current=None,
            status="processing",
            message=f"{name} 审查完成,耗时: {execution_time:.2f}s",
            issues=[review_result_data],
            event_type="processing"
        )
    )

重构建议

方案: 创建审查结果推送装饰器或工具方法

# 建议位置: core/construction_review/component/reviewers/utils/progress_helper.py
class ProgressHelper:
    """进度推送辅助工具"""
    
    @staticmethod
    async def push_review_result(state: dict, stage_name: str, 
                                 review_name: str, result: ReviewResult):
        """统一推送审查结果"""
        if not state or not state.get("progress_manager"):
            return
        
        review_result_data = {
            'name': review_name,
            'success': result.success,
            'details': result.details,
            'error_message': result.error_message,
            'execution_time': result.execution_time,
            'timestamp': time.time()
        }
        
        await state["progress_manager"].update_stage_progress(
            callback_task_id=state["callback_task_id"],
            stage_name=stage_name,
            current=None,
            status="processing",
            message=f"{review_name} 审查完成,耗时: {result.execution_time:.2f}s",
            issues=[review_result_data],
            event_type="processing"
        )

影响范围: ai_review_engine.py 及多个 reviewer 文件


🔍 三、审查结果处理(重复度:中)

重复位置

  1. core/construction_review/component/ai_review_engine.py - _process_review_result()
  2. core/construction_review/workflows/ai_review_workflow.py - 结果处理逻辑

重复代码特征

# 将 ReviewResult 对象转换为字典的逻辑重复
if isinstance(result, Exception):
    return {"error": str(result), "success": False}
elif hasattr(result, '__dict__'):
    return {
        "success": result.success if hasattr(result, 'success') else False,
        "details": result.details if hasattr(result, 'details') else {},
        "error_message": result.error_message if hasattr(result, 'error_message') else None,
        "execution_time": result.execution_time if hasattr(result, 'execution_time') else None
    }

重构建议

方案: 在 ReviewResult 类中添加 to_dict() 方法

# 建议位置: core/construction_review/component/reviewers/base_reviewer.py
@dataclass
class ReviewResult:
    """审查结果"""
    success: bool
    details: Dict[str, Any]
    error_message: Optional[str]
    execution_time: float
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典格式"""
        return {
            "success": self.success,
            "details": self.details,
            "error_message": self.error_message,
            "execution_time": self.execution_time
        }
    
    @classmethod
    def from_exception(cls, e: Exception, execution_time: float = 0.0):
        """从异常创建失败结果"""
        return cls(
            success=False,
            details={"error": str(e)},
            error_message=str(e),
            execution_time=execution_time
        )

影响范围: ai_review_engine.py, ai_review_workflow.py


🔍 四、文档处理降级逻辑(重复度:中)

重复位置

  1. core/construction_review/component/document_processor.py - _fallback_pdf_processing()
  2. core/construction_review/component/document_processor.py - _fallback_docx_processing()

重复代码特征

# PDF 和 DOCX 的降级处理逻辑高度相似
try:
    logger.info("使用基础处理模式")
    # ... 基础处理逻辑
    return {
        'document_type': file_type,
        'total_chunks': len(chunks),
        'chunks': chunks,
        # ...
    }
except Exception as e:
    logger.error(f"基础处理失败: {str(e)}")
    raise

重构建议

方案: 提取通用的降级处理模板方法

# 建议位置: core/construction_review/component/document_processor.py
class DocumentProcessor:
    
    async def _fallback_processing(self, file_path: str, file_type: str) -> Dict[str, Any]:
        """通用降级处理模板"""
        try:
            logger.info(f"使用基础{file_type.upper()}处理模式")
            
            # 根据文件类型选择加载器
            if file_type == 'pdf':
                loader = PyPDFLoader(file_path)
            elif file_type == 'docx':
                loader = self._create_docx_loader(file_path)
            else:
                raise ValueError(f"不支持的文件类型: {file_type}")
            
            documents = loader.load()
            
            # 统一的文本分块逻辑
            splits = self._split_documents(documents)
            
            return self._format_fallback_result(file_type, documents, splits)
            
        except Exception as e:
            logger.error(f"基础{file_type.upper()}处理失败: {str(e)}")
            raise

影响范围: document_processor.py


🔍 五、任务状态检查(重复度:低)

重复位置

  1. core/base/redis_duplicate_checker.py - is_valid_task_id()
  2. core/base/redis_duplicate_checker.py - is_task_already_used()

重复代码特征

# 两个方法都遍历 Redis 键查找任务
if self.use_redis:
    keys = self.redis_client.keys("task:*")
    for key in keys:
        task_info = self.redis_client.get(key)
        if task_info:
            task_data = json.loads(task_info)
            if task_data.get("callback_task_id") == callback_task_id:
                # ... 不同的检查逻辑

重构建议

方案: 提取通用的任务查找方法

# 建议位置: core/base/redis_duplicate_checker.py
class RedisDuplicateChecker:
    
    def _find_task_by_callback_id(self, callback_task_id: str) -> Optional[Dict]:
        """根据 callback_task_id 查找任务数据"""
        if self.use_redis:
            keys = self.redis_client.keys("task:*")
            for key in keys:
                task_info = self.redis_client.get(key)
                if task_info:
                    task_data = json.loads(task_info)
                    if task_data.get("callback_task_id") == callback_task_id:
                        return task_data
        else:
            for file_id, task_info in self.task_cache.items():
                if task_info.get("callback_task_id") == callback_task_id:
                    return task_info
        return None
    
    async def is_valid_task_id(self, callback_task_id: str) -> bool:
        """验证任务ID是否存在且未过期"""
        task_data = self._find_task_by_callback_id(callback_task_id)
        if not task_data:
            return False
        
        created_at = datetime.fromisoformat(task_data['created_at'])
        return datetime.now() - created_at < timedelta(hours=1)

影响范围: redis_duplicate_checker.py


🔍 六、审查引擎中的 trace_id 构造(重复度:低)

重复位置

多个审查方法中都有类似的 trace_id 构造逻辑:

  1. sensitive_word_check()
  2. check_semantic_logic()
  3. check_non_parameter_compliance()
  4. check_parameter_compliance()

重复代码特征

# 每个方法都重复构造 trace_id
reviewer_type = Stage.BASIC.value['reviewer_type']
prompt_name = Stage.BASIC.value['grammar']
trace_id = prompt_name + trace_id_idx

重构建议

方案: 创建 trace_id 构造工具方法

# 建议位置: core/construction_review/component/ai_review_engine.py
class AIReviewEngine:
    
    def _build_trace_id(self, stage: Stage, check_type: str, trace_id_idx: str) -> str:
        """构造 trace_id"""
        reviewer_type = stage.value['reviewer_type']
        prompt_name = stage.value[check_type]
        return f"{prompt_name}{trace_id_idx}"
    
    async def sensitive_word_check(self, trace_id_idx: str, ...):
        trace_id = self._build_trace_id(Stage.BASIC, 'grammar', trace_id_idx)
        # ...

影响范围: ai_review_engine.py


📊 七、重复度统计

重复类型 重复度 影响文件数 优先级
Redis 连接初始化 2 ⭐⭐⭐
进度更新与 SSE 推送 5+ ⭐⭐⭐
审查结果处理 2 ⭐⭐
文档处理降级逻辑 1 ⭐⭐
任务状态检查 1
trace_id 构造 1

🎯 八、重构优先级建议

高优先级(建议立即重构)

  1. Redis 连接初始化 - 影响范围广,重复度高
  2. 进度更新与 SSE 推送 - 代码分散,维护成本高

中优先级(建议近期重构)

  1. 审查结果处理 - 提升代码可读性
  2. 文档处理降级逻辑 - 减少代码冗余

低优先级(可选重构)

  1. 任务状态检查 - 局部优化
  2. trace_id 构造 - 局部优化

📝 九、重构注意事项

1. 向后兼容性

  • 保留旧接口,标记为 @deprecated
  • 提供迁移指南和示例代码

2. 测试覆盖

  • 为新的工具类编写单元测试
  • 确保重构后功能不变

3. 文档更新

  • 更新 API 文档
  • 添加使用示例

4. 渐进式重构

  • 先重构高优先级项
  • 逐步迁移现有代码
  • 避免一次性大规模改动

🔧 十、建议的新增工具模块

1. foundation/infrastructure/cache/redis_factory.py

  • Redis 连接工厂
  • 连接池管理

2. core/construction_review/component/reviewers/utils/progress_helper.py

  • 进度推送辅助工具
  • SSE 消息格式化

3. core/construction_review/component/reviewers/utils/result_converter.py

  • 审查结果转换工具
  • 统一的结果格式化

📌 十一、总结

通过本次梳理,发现 core/ 目录中存在 6 类重复实现的可复用方法,主要集中在:

  1. 基础设施层:Redis 连接、进度管理
  2. 业务逻辑层:审查结果处理、文档处理
  3. 工具方法层:trace_id 构造、任务状态检查

建议优先重构 Redis 连接初始化进度更新与 SSE 推送 两个高频重复模块,可显著提升代码质量和维护效率。


报告生成者: Kiro AI Assistant
最后更新: 2026-01-08