生成时间: 2026-01-08
分析范围: core/ 目录及其子目录
目的: 识别可复用但被重复实现的方法,提供重构建议
重复位置:
- core/base/progress_manager.py - ProgressManager._init_redis()
- core/base/redis_duplicate_checker.py - RedisDuplicateChecker.__init__()
重复代码(两个类中都有相似的 Redis 连接逻辑):
# Excerpt shared by both classes: build a redis:// URL from the 'redis' config
# section, create the client and immediately ping the server.
# config_handler.get presumably takes (section, key, default) — verify against its definition.
redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
redis_db = config_handler.get('redis', 'REDIS_DB', '0')
if redis_password:
    # NOTE(review): the password is interpolated unescaped; characters such as
    # '@', ':', '/', '#' or '%' would corrupt the URL — percent-encode it first.
    redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
else:
    # No password configured: URL without a credentials part.
    redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"
self.redis_client = redis.from_url(redis_url, decode_responses=True)
# ping() forces a round-trip now, so a bad host/credential surfaces here instead of on first use.
self.redis_client.ping()
方案: 创建统一的 Redis 连接工厂类
# 建议位置: foundation/infrastructure/cache/redis_factory.py
class RedisConnectionFactory:
    """Redis connection factory: the single place that builds Redis clients.

    Connection settings are read from the ``redis`` section of ``config_handler``
    (REDIS_HOST / REDIS_PORT / REDIS_PASSWORD / REDIS_DB, defaulting to
    'localhost', '6379', '' and '0').
    """

    @staticmethod
    def build_url(host, port, password, db) -> str:
        """Return a ``redis://`` URL for the given connection parameters.

        The password is percent-encoded so characters that are significant in a
        URL ('@', ':', '/', '#', '%', ...) cannot corrupt the host/port/db parts.
        A falsy password (e.g. '' or None) yields a URL without credentials.

        NOTE: relies on ``redis.from_url`` percent-decoding the password, which
        redis-py does unconditionally from 4.0 onwards.
        """
        from urllib.parse import quote

        if password:
            # safe='' makes quote() escape '/' as well, which it keeps by default.
            return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"
        return f"redis://{host}:{port}/{db}"

    @staticmethod
    def create_connection(decode_responses=True):
        """Create a Redis client from configuration and verify it with ``ping()``.

        Args:
            decode_responses: forwarded to ``redis.from_url``; when True, replies
                are returned as ``str`` instead of ``bytes``.

        Returns:
            The client returned by ``redis.from_url``, already pinged once.

        Raises:
            Whatever ``client.ping()`` raises when the server is unreachable or
            rejects the credentials (redis-py raises
            ``redis.exceptions.ConnectionError`` or a subclass).
        """
        redis_url = RedisConnectionFactory.build_url(
            host=config_handler.get('redis', 'REDIS_HOST', 'localhost'),
            port=config_handler.get('redis', 'REDIS_PORT', '6379'),
            password=config_handler.get('redis', 'REDIS_PASSWORD', ''),
            db=config_handler.get('redis', 'REDIS_DB', '0'),
        )
        client = redis.from_url(redis_url, decode_responses=decode_responses)
        # Round-trip now so a bad host/password fails at construction time.
        client.ping()
        return client
影响范围: 2 个文件需要修改
重复位置: 多个审查方法中对 ProgressManager.update_stage_progress()(定义于 core/base/progress_manager.py)的调用
重复代码(在多个审查方法中重复出现):
# Excerpt repeated in several review methods: after a check finishes, push its
# result as a "processing" event when the workflow state carries a progress manager.
if state and state.get("progress_manager"):
    # NOTE(review): the Task returned by asyncio.create_task() is discarded. The asyncio
    # docs say the loop keeps only weak references to tasks, so a strong reference should
    # be kept (e.g. in a set) to stop the task vanishing mid-run; exceptions raised
    # inside it are also never observed here.
    asyncio.create_task(
        state["progress_manager"].update_stage_progress(
            callback_task_id=state["callback_task_id"],
            stage_name=stage_name,
            current=None,
            status="processing",
            message=f"{name} 审查完成,耗时: {execution_time:.2f}s",
            issues=[review_result_data],
            event_type="processing"
        )
    )
方案: 创建审查结果推送装饰器或工具方法
# 建议位置: core/construction_review/component/reviewers/utils/progress_helper.py
class ProgressHelper:
    """Helpers for reporting review progress through a workflow state's progress manager."""

    @staticmethod
    async def push_review_result(state: dict, stage_name: str,
                                 review_name: str, result: ReviewResult):
        """Push one finished review's result as a "processing" progress event.

        Returns immediately when ``state`` is falsy or has no truthy
        ``"progress_manager"`` entry. Otherwise it awaits
        ``update_stage_progress`` on that manager. Any exception the manager
        raises therefore propagates to the caller, unlike a fire-and-forget
        ``asyncio.create_task``.

        Args:
            state: workflow state dict. It must also hold ``"callback_task_id"``
                whenever a progress manager is present.
            stage_name: stage the event is reported under.
            review_name: human-readable name of the review check.
            result: the finished check's outcome. Its ``execution_time`` is
                formatted with ``:.2f`` into the message.
        """
        manager = state.get("progress_manager") if state else None
        if not manager:
            return

        elapsed = result.execution_time
        # Serialized review outcome, sent as the single issue of this event.
        payload = dict(
            name=review_name,
            success=result.success,
            details=result.details,
            error_message=result.error_message,
            execution_time=elapsed,
            timestamp=time.time(),
        )
        await manager.update_stage_progress(
            callback_task_id=state["callback_task_id"],
            stage_name=stage_name,
            current=None,
            status="processing",
            message=f"{review_name} 审查完成,耗时: {elapsed:.2f}s",
            issues=[payload],
            event_type="processing",
        )
影响范围: ai_review_engine.py 及多个 reviewer 文件
重复位置:
- core/construction_review/component/ai_review_engine.py - _process_review_result()
- core/construction_review/workflows/ai_review_workflow.py - 结果处理逻辑
重复代码(将 ReviewResult 对象转换为字典的逻辑重复):
# Excerpt: normalize a review result (or the exception raised in its place) into a dict.
if isinstance(result, Exception):
    return {"error": str(result), "success": False}
elif hasattr(result, '__dict__'):
    # Each field falls back to a default when the attribute is missing.
    return {
        "success": result.success if hasattr(result, 'success') else False,
        "details": result.details if hasattr(result, 'details') else {},
        "error_message": result.error_message if hasattr(result, 'error_message') else None,
        "execution_time": result.execution_time if hasattr(result, 'execution_time') else None
    }
# NOTE(review): values without __dict__ (None, plain dicts, __slots__ objects) match neither
# branch; unless the elided surrounding code handles them, the function returns None implicitly.
方案: 在 ReviewResult 类中添加 to_dict() 方法
# 建议位置: core/construction_review/component/reviewers/base_reviewer.py
@dataclass
class ReviewResult:
    """Outcome of a single review check.

    The field order below is also the positional order of the generated
    constructor, so it is part of the public interface.
    """

    # Whether the check completed successfully.
    success: bool
    # Check-specific payload.
    details: Dict[str, Any]
    # Error text for failed checks, otherwise None.
    error_message: Optional[str]
    # Elapsed time of the check (presumably seconds — confirm with producers).
    execution_time: float

    def to_dict(self) -> Dict[str, Any]:
        """Return the four fields as a plain dict (``details`` is shared, not copied)."""
        return dict(
            success=self.success,
            details=self.details,
            error_message=self.error_message,
            execution_time=self.execution_time,
        )

    @classmethod
    def from_exception(cls, e: Exception, execution_time: float = 0.0):
        """Build a failed result from an exception.

        ``str(e)`` is stored both as ``error_message`` and under ``details["error"]``.
        """
        text = str(e)
        return cls(
            success=False,
            details={"error": text},
            error_message=text,
            execution_time=execution_time,
        )
影响范围: ai_review_engine.py, ai_review_workflow.py
重复位置:
- core/construction_review/component/document_processor.py - _fallback_pdf_processing()
- core/construction_review/component/document_processor.py - _fallback_docx_processing()
重复代码(PDF 和 DOCX 的降级处理逻辑高度相似):
# Excerpt shared by the PDF and DOCX fallbacks: basic-mode processing that logs and
# re-raises any failure. `file_type` and `chunks` come from the elided code.
try:
    logger.info("使用基础处理模式")
    # ... basic processing logic (elided in this report)
    return {
        'document_type': file_type,
        'total_chunks': len(chunks),
        'chunks': chunks,
        # ...
    }
except Exception as e:
    logger.error(f"基础处理失败: {str(e)}")
    # Re-raise unchanged so the caller still sees the original exception.
    raise
方案: 提取通用的降级处理模板方法
# 建议位置: core/construction_review/component/document_processor.py
class DocumentProcessor:
    async def _fallback_processing(self, file_path: str, file_type: str) -> Dict[str, Any]:
        """Shared fallback ("basic mode") pipeline for PDF and DOCX files.

        It picks a loader by ``file_type``: ``PyPDFLoader`` for 'pdf' and
        ``self._create_docx_loader`` for 'docx'. It then loads the documents,
        splits them with ``self._split_documents``, and formats the outcome with
        ``self._format_fallback_result``.

        Args:
            file_path: path of the file to load.
            file_type: 'pdf' or 'docx'. The comparison is case-sensitive.

        Returns:
            The value built by ``self._format_fallback_result``.

        Raises:
            ValueError: for any other ``file_type``.
            Any exception from loading, splitting or formatting, after it is logged.
        """
        import asyncio

        try:
            logger.info(f"使用基础{file_type.upper()}处理模式")
            # Choose the loader for this file type.
            if file_type == 'pdf':
                loader = PyPDFLoader(file_path)
            elif file_type == 'docx':
                loader = self._create_docx_loader(file_path)
            else:
                raise ValueError(f"不支持的文件类型: {file_type}")
            # loader.load() is synchronous and parses the whole file. Running it in
            # the default executor keeps this coroutine from blocking the event loop,
            # which would also stall concurrent progress/SSE pushes.
            loop = asyncio.get_running_loop()
            documents = await loop.run_in_executor(None, loader.load)
            # Shared text-splitting step for both file types.
            splits = self._split_documents(documents)
            return self._format_fallback_result(file_type, documents, splits)
        except Exception as e:
            logger.error(f"基础{file_type.upper()}处理失败: {str(e)}")
            raise
影响范围: document_processor.py
重复位置:
- core/base/redis_duplicate_checker.py - is_valid_task_id()
- core/base/redis_duplicate_checker.py - is_task_already_used()
重复代码(两个方法都遍历 Redis 键查找任务):
# Excerpt shared by is_valid_task_id() and is_task_already_used(): a linear scan of
# every "task:*" key looking for a record with a matching callback_task_id.
if self.use_redis:
    # NOTE(review): KEYS walks the entire keyspace and blocks the Redis server while it
    # runs; SCAN (redis-py scan_iter) is the incremental, non-blocking alternative.
    keys = self.redis_client.keys("task:*")
    for key in keys:
        task_info = self.redis_client.get(key)
        # GET returns None when the key no longer exists (e.g. expired after KEYS ran).
        if task_info:
            task_data = json.loads(task_info)
            if task_data.get("callback_task_id") == callback_task_id:
                # ... method-specific checks follow (elided in this report)
方案: 提取通用的任务查找方法
# 建议位置: core/base/redis_duplicate_checker.py
class RedisDuplicateChecker:
    # Maximum age of a task accepted by is_valid_task_id().
    TASK_ID_TTL = timedelta(hours=1)

    def _find_task_by_callback_id(self, callback_task_id: str) -> Optional[Dict]:
        """Return the stored task record whose ``callback_task_id`` matches, else None.

        When ``self.use_redis`` is set, this iterates the ``task:*`` keys and
        JSON-decodes each value. Otherwise it searches the in-memory
        ``self.task_cache``. The first match wins.

        Raises:
            json.JSONDecodeError: if a stored Redis value is not valid JSON.
        """
        if self.use_redis:
            # SCAN walks the keyspace incrementally. KEYS would block the Redis
            # server for the whole traversal.
            for key in self.redis_client.scan_iter(match="task:*"):
                task_info = self.redis_client.get(key)
                if not task_info:
                    # The key can disappear (expire/delete) between SCAN and GET.
                    continue
                task_data = json.loads(task_info)
                if task_data.get("callback_task_id") == callback_task_id:
                    return task_data
            return None

        for task_info in self.task_cache.values():
            if task_info.get("callback_task_id") == callback_task_id:
                return task_info
        return None

    async def is_valid_task_id(self, callback_task_id: str) -> bool:
        """Return True if the task exists and is younger than ``TASK_ID_TTL``.

        A task whose ``created_at`` is missing or not an ISO-8601 string is
        reported as invalid instead of raising.
        """
        task_data = self._find_task_by_callback_id(callback_task_id)
        if not task_data:
            return False
        try:
            created_at = datetime.fromisoformat(task_data['created_at'])
        except (KeyError, TypeError, ValueError):
            return False
        # now() is taken in the timestamp's own timezone (None -> naive local).
        # Subtracting a naive datetime from an aware one would raise TypeError.
        return datetime.now(created_at.tzinfo) - created_at < self.TASK_ID_TTL
影响范围: redis_duplicate_checker.py
多个审查方法中都有类似的 trace_id 构造逻辑,涉及:
- sensitive_word_check()
- check_semantic_logic()
- check_non_parameter_compliance()
- check_parameter_compliance()
重复代码(每个方法都重复构造 trace_id):
# Excerpt repeated in each check method: derive the trace id from a Stage prompt name.
# NOTE(review): reviewer_type is not used within this excerpt; it may be used by elided code.
reviewer_type = Stage.BASIC.value['reviewer_type']
prompt_name = Stage.BASIC.value['grammar']
# `+` requires trace_id_idx to be a str — presumably a string suffix; confirm with callers.
trace_id = prompt_name + trace_id_idx
方案: 创建 trace_id 构造工具方法
# 建议位置: core/construction_review/component/ai_review_engine.py
class AIReviewEngine:
    def _build_trace_id(self, stage: "Stage", check_type: str, trace_id_idx: str) -> str:
        """Build a trace id as ``<prompt name><trace_id_idx>``.

        Args:
            stage: a ``Stage`` member whose ``.value`` mapping contains ``check_type``.
            check_type: key into ``stage.value`` that selects the prompt name,
                e.g. ``'grammar'``.
            trace_id_idx: suffix appended verbatim.

        Raises:
            KeyError: if ``stage.value`` has no ``check_type`` entry.
        """
        # The old inline code also read stage.value['reviewer_type'], but the
        # value was never used in building the id, so the lookup is dropped here.
        return f"{stage.value[check_type]}{trace_id_idx}"

    async def sensitive_word_check(self, trace_id_idx: str, *args, **kwargs):
        """Usage example only; the remaining parameters and body are elided here."""
        trace_id = self._build_trace_id(Stage.BASIC, 'grammar', trace_id_idx)
        # ...
影响范围: ai_review_engine.py
| 重复类型 | 重复度 | 影响文件数 | 优先级 |
|---|---|---|---|
| Redis 连接初始化 | 高 | 2 | ⭐⭐⭐ |
| 进度更新与 SSE 推送 | 中 | 5+ | ⭐⭐⭐ |
| 审查结果处理 | 中 | 2 | ⭐⭐ |
| 文档处理降级逻辑 | 中 | 1 | ⭐⭐ |
| 任务状态检查 | 低 | 1 | ⭐ |
| trace_id 构造 | 低 | 1 | ⭐ |
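建议新增的文件(迁移期间可用 @deprecated 标记被替代的旧实现):
- foundation/infrastructure/cache/redis_factory.py
- core/construction_review/component/reviewers/utils/progress_helper.py
- core/construction_review/component/reviewers/utils/result_converter.py

总结: 通过本次梳理,发现 core/ 目录中存在 6 类重复实现的可复用方法,主要集中在:
- Redis 访问:连接初始化与按 callback_task_id 查找任务
- 审查流程:进度推送、审查结果转换与 trace_id 构造
- 文档处理:PDF/DOCX 降级处理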
建议优先重构 Redis 连接初始化 和 进度更新与 SSE 推送 两个高频重复模块,可显著提升代码质量和维护效率。
报告生成者: Kiro AI Assistant
最后更新: 2026-01-08