Browse Source

dev:优化了文件切分模块的的目录分类的性能;

ChenJiSheng 3 days ago
parent
commit
9ffd68a5c7

+ 17 - 21
core/construction_review/component/doc_worker/classification/hierarchy_classifier.py

@@ -7,6 +7,7 @@
 from __future__ import annotations
 
 from collections import Counter
+import asyncio
 from typing import Any, Dict, List, Optional
 
 from ..interfaces import HierarchyClassifier as IHierarchyClassifier
@@ -32,29 +33,11 @@ class HierarchyClassifier(IHierarchyClassifier):
         # 获取标准类别列表(从CSV动态加载)
         self.standard_categories = self.prompt_loader.get_standard_categories()
 
-    def classify(
+    async def classify_async(
         self, toc_items: List[Dict[str, Any]], target_level: int = 1
     ) -> Dict[str, Any]:
         """
-        对目录项进行智能分类(基于LLM API智能识别)
-        
-        新逻辑:
-        1. 只对一级目录进行分类
-        2. 通过异步并发调用LLM API,基于一级目录标题及其下属二级目录来判断分类
-        3. 使用LLM的智能理解能力进行准确分类
-        
-        参数:
-            toc_items: 目录项列表(已经过层级识别)
-            target_level: 要分类的目标层级(默认为1,即一级目录)
-            
-        返回:
-            dict: 分类结果
-            {
-                "items": [...],
-                "total_count": int,
-                "target_level": int,
-                "category_stats": {...}
-            }
+        异步版本的目录分类(推荐在已有事件循环中使用)。
         """
         print(f"\n正在对{target_level}级目录进行智能分类(基于LLM API识别)...")
         
@@ -126,7 +109,7 @@ class HierarchyClassifier(IHierarchyClassifier):
             llm_requests.append(messages)
         
         # 批量异步调用LLM API
-        llm_results = self.llm_client.batch_call(llm_requests)
+        llm_results = await self.llm_client.batch_call_async(llm_requests)
         
         # 处理分类结果
         classified_items = []
@@ -204,3 +187,16 @@ class HierarchyClassifier(IHierarchyClassifier):
             "target_level": target_level,
             "category_stats": dict(category_stats),
         }
+
+    def classify(
+        self, toc_items: List[Dict[str, Any]], target_level: int = 1
+    ) -> Dict[str, Any]:
+        """
+        同步包装,内部调用异步实现。适合无事件循环的同步场景。
+        """
+        try:
+            return asyncio.run(self.classify_async(toc_items, target_level))
+        except RuntimeError as exc:
+            raise RuntimeError(
+                "HierarchyClassifier.classify 不支持在运行中的事件循环内调用,请改用 await classify_async"
+            ) from exc

+ 4 - 0
core/construction_review/component/doc_worker/pdf_worker/classifier.py

@@ -50,6 +50,10 @@ class PdfHierarchyClassifier(HierarchyClassifier):
         # 委托给LLM分类器
         return self._llm_classifier.classify(toc_items, target_level)
 
+    async def classify_async(self, toc_items: List[Dict[str, Any]], target_level: int) -> Dict[str, Any]:
+        """异步分类包装,直接转发给内部 LLM 分类器。"""
+        return await self._llm_classifier.classify_async(toc_items, target_level)
+
 
 
 

+ 11 - 17
core/construction_review/component/doc_worker/utils/llm_client.py

@@ -4,7 +4,7 @@ LLM API客户端工具类
 """
 
 from __future__ import annotations
-
+from foundation.observability.logger.loggering import server_logger as logger
 import asyncio
 import json
 from typing import Any, Dict, List, Optional
@@ -316,26 +316,20 @@ class LLMClient:
         """
         if HAS_AIOHTTP:
             # 使用异步实现
-            # 注释掉异步初始化,直接使用 workflow_manager.py 设置的全局事件循环
-            # loop = asyncio.get_event_loop()
-            # if loop.is_running():
-            #     # 如果事件循环已经在运行,创建新的事件循环
-            #     import nest_asyncio
-            #     try:
-            #         nest_asyncio.apply()
-            #     except ImportError:
-            #         # 如果没有nest_asyncio,回退到同步调用
-            #         return self._batch_call_sync_fallback(requests)
-            # return loop.run_until_complete(self.batch_call_async(requests))
-            
-            # 使用 workflow_manager.py 的全局事件循环(如果已存在)
             try:
-                # 获取 workflow_manager.py 设置的全局事件循环
+                # 获取当前事件循环
                 loop = asyncio.get_event_loop()
-                # 直接使用全局循环,不进行任何初始化
+                # 如果事件循环已在运行,避免 run_until_complete 引发异常,直接回退同步
+                if loop.is_running():
+                    logger.warning("检测到运行中的事件循环,batch_call 请改用 await batch_call_async;本次回退同步调用")
+                    return self._batch_call_sync_fallback(requests)
+
+                # 事件循环存在但未运行,可以直接使用 run_until_complete
+                logger.info("异步调用LLM API进行目录分类处理")
                 return loop.run_until_complete(self.batch_call_async(requests))
             except RuntimeError:
-                # 如果没有事件循环(workflow_manager.py 还未初始化),回退到同步调用
+                # 如果没有事件循环,回退到同步调用
+                logger.info("同步调用LLM API进行目录分类处理(无事件循环)")
                 return self._batch_call_sync_fallback(requests)
         else:
             return self._batch_call_sync_fallback(requests)

+ 2 - 2
core/construction_review/component/document_processor.py

@@ -128,7 +128,7 @@ class DocumentProcessor:
             target_level = int(self.config.get("text_splitting.target_level", 1))
             logger.info(f"步骤2: 对{target_level}级目录进行分类")
             
-            classification_result = self.pdf_classifier.classify(
+            classification_result = await self.pdf_classifier.classify_async(
                 toc_info['toc_items'],
                 target_level=target_level
             )
@@ -268,7 +268,7 @@ class DocumentProcessor:
             target_level = int(self.config.get("text_splitting.target_level", 1))
             logger.info(f"步骤2: 对{target_level}级目录进行分类")
             
-            classification_result = self.docx_classifier.classify(
+            classification_result = await self.docx_classifier.classify_async(
                 toc_info['toc_items'],
                 target_level=target_level
             )

+ 442 - 0
core目录重复实现方法梳理报告.md

@@ -0,0 +1,442 @@
+# Core 目录重复实现方法梳理报告
+
+## 📋 报告概述
+
+**生成时间**: 2026-01-08  
+**分析范围**: `core/` 目录及其子目录  
+**目的**: 识别可复用但被重复实现的方法,提供重构建议
+
+---
+
+## 🔍 一、Redis 连接初始化(重复度:高)
+
+### 重复位置
+
+1. **`core/base/progress_manager.py`** - `ProgressManager._init_redis()`
+2. **`core/base/redis_duplicate_checker.py`** - `RedisDuplicateChecker.__init__()`
+
+### 重复代码特征
+
+```python
+# 两个类中都有相似的 Redis 连接逻辑
+redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
+redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
+redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+redis_db = config_handler.get('redis', 'REDIS_DB', '0')
+
+if redis_password:
+    redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
+else:
+    redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"
+
+self.redis_client = redis.from_url(redis_url, decode_responses=True)
+self.redis_client.ping()
+```
+
+### 重构建议
+
+**方案**: 创建统一的 Redis 连接工厂类
+
+```python
+# 建议位置: foundation/infrastructure/cache/redis_factory.py
+class RedisConnectionFactory:
+    """Redis 连接工厂 - 统一管理 Redis 连接"""
+    
+    @staticmethod
+    def create_connection(decode_responses=True):
+        """创建 Redis 连接"""
+        redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
+        redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
+        redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
+        redis_db = config_handler.get('redis', 'REDIS_DB', '0')
+        
+        if redis_password:
+            redis_url = f"redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}"
+        else:
+            redis_url = f"redis://{redis_host}:{redis_port}/{redis_db}"
+        
+        client = redis.from_url(redis_url, decode_responses=decode_responses)
+        client.ping()
+        return client
+```
+
+**影响范围**: 2 个文件需要修改
+
+---
+
+## 🔍 二、进度更新与 SSE 推送(重复度:中)
+
+### 重复位置
+
+1. **`core/base/progress_manager.py`** - `update_stage_progress()`
+2. **多个 reviewer 文件** - 审查完成后的进度推送逻辑
+
+### 重复代码特征
+
+```python
+# 在多个审查方法中重复出现
+if state and state.get("progress_manager"):
+    asyncio.create_task(
+        state["progress_manager"].update_stage_progress(
+            callback_task_id=state["callback_task_id"],
+            stage_name=stage_name,
+            current=None,
+            status="processing",
+            message=f"{name} 审查完成,耗时: {execution_time:.2f}s",
+            issues=[review_result_data],
+            event_type="processing"
+        )
+    )
+```
+
+### 重构建议
+
+**方案**: 创建审查结果推送装饰器或工具方法
+
+```python
+# 建议位置: core/construction_review/component/reviewers/utils/progress_helper.py
+class ProgressHelper:
+    """进度推送辅助工具"""
+    
+    @staticmethod
+    async def push_review_result(state: dict, stage_name: str, 
+                                 review_name: str, result: ReviewResult):
+        """统一推送审查结果"""
+        if not state or not state.get("progress_manager"):
+            return
+        
+        review_result_data = {
+            'name': review_name,
+            'success': result.success,
+            'details': result.details,
+            'error_message': result.error_message,
+            'execution_time': result.execution_time,
+            'timestamp': time.time()
+        }
+        
+        await state["progress_manager"].update_stage_progress(
+            callback_task_id=state["callback_task_id"],
+            stage_name=stage_name,
+            current=None,
+            status="processing",
+            message=f"{review_name} 审查完成,耗时: {result.execution_time:.2f}s",
+            issues=[review_result_data],
+            event_type="processing"
+        )
+```
+
+**影响范围**: `ai_review_engine.py` 及多个 reviewer 文件
+
+---
+
+## 🔍 三、审查结果处理(重复度:中)
+
+### 重复位置
+
+1. **`core/construction_review/component/ai_review_engine.py`** - `_process_review_result()`
+2. **`core/construction_review/workflows/ai_review_workflow.py`** - 结果处理逻辑
+
+### 重复代码特征
+
+```python
+# 将 ReviewResult 对象转换为字典的逻辑重复
+if isinstance(result, Exception):
+    return {"error": str(result), "success": False}
+elif hasattr(result, '__dict__'):
+    return {
+        "success": result.success if hasattr(result, 'success') else False,
+        "details": result.details if hasattr(result, 'details') else {},
+        "error_message": result.error_message if hasattr(result, 'error_message') else None,
+        "execution_time": result.execution_time if hasattr(result, 'execution_time') else None
+    }
+```
+
+### 重构建议
+
+**方案**: 在 `ReviewResult` 类中添加 `to_dict()` 方法
+
+```python
+# 建议位置: core/construction_review/component/reviewers/base_reviewer.py
+@dataclass
+class ReviewResult:
+    """审查结果"""
+    success: bool
+    details: Dict[str, Any]
+    error_message: Optional[str]
+    execution_time: float
+    
+    def to_dict(self) -> Dict[str, Any]:
+        """转换为字典格式"""
+        return {
+            "success": self.success,
+            "details": self.details,
+            "error_message": self.error_message,
+            "execution_time": self.execution_time
+        }
+    
+    @classmethod
+    def from_exception(cls, e: Exception, execution_time: float = 0.0):
+        """从异常创建失败结果"""
+        return cls(
+            success=False,
+            details={"error": str(e)},
+            error_message=str(e),
+            execution_time=execution_time
+        )
+```
+
+**影响范围**: `ai_review_engine.py`, `ai_review_workflow.py`
+
+---
+
+## 🔍 四、文档处理降级逻辑(重复度:中)
+
+### 重复位置
+
+1. **`core/construction_review/component/document_processor.py`** - `_fallback_pdf_processing()`
+2. **`core/construction_review/component/document_processor.py`** - `_fallback_docx_processing()`
+
+### 重复代码特征
+
+```python
+# PDF 和 DOCX 的降级处理逻辑高度相似
+try:
+    logger.info("使用基础处理模式")
+    # ... 基础处理逻辑
+    return {
+        'document_type': file_type,
+        'total_chunks': len(chunks),
+        'chunks': chunks,
+        # ...
+    }
+except Exception as e:
+    logger.error(f"基础处理失败: {str(e)}")
+    raise
+```
+
+### 重构建议
+
+**方案**: 提取通用的降级处理模板方法
+
+```python
+# 建议位置: core/construction_review/component/document_processor.py
+class DocumentProcessor:
+    
+    async def _fallback_processing(self, file_path: str, file_type: str) -> Dict[str, Any]:
+        """通用降级处理模板"""
+        try:
+            logger.info(f"使用基础{file_type.upper()}处理模式")
+            
+            # 根据文件类型选择加载器
+            if file_type == 'pdf':
+                loader = PyPDFLoader(file_path)
+            elif file_type == 'docx':
+                loader = self._create_docx_loader(file_path)
+            else:
+                raise ValueError(f"不支持的文件类型: {file_type}")
+            
+            documents = loader.load()
+            
+            # 统一的文本分块逻辑
+            splits = self._split_documents(documents)
+            
+            return self._format_fallback_result(file_type, documents, splits)
+            
+        except Exception as e:
+            logger.error(f"基础{file_type.upper()}处理失败: {str(e)}")
+            raise
+```
+
+**影响范围**: `document_processor.py`
+
+---
+
+## 🔍 五、任务状态检查(重复度:低)
+
+### 重复位置
+
+1. **`core/base/redis_duplicate_checker.py`** - `is_valid_task_id()`
+2. **`core/base/redis_duplicate_checker.py`** - `is_task_already_used()`
+
+### 重复代码特征
+
+```python
+# 两个方法都遍历 Redis 键查找任务
+if self.use_redis:
+    keys = self.redis_client.keys("task:*")
+    for key in keys:
+        task_info = self.redis_client.get(key)
+        if task_info:
+            task_data = json.loads(task_info)
+            if task_data.get("callback_task_id") == callback_task_id:
+                # ... 不同的检查逻辑
+```
+
+### 重构建议
+
+**方案**: 提取通用的任务查找方法
+
+```python
+# 建议位置: core/base/redis_duplicate_checker.py
+class RedisDuplicateChecker:
+    
+    def _find_task_by_callback_id(self, callback_task_id: str) -> Optional[Dict]:
+        """根据 callback_task_id 查找任务数据"""
+        if self.use_redis:
+            keys = self.redis_client.keys("task:*")
+            for key in keys:
+                task_info = self.redis_client.get(key)
+                if task_info:
+                    task_data = json.loads(task_info)
+                    if task_data.get("callback_task_id") == callback_task_id:
+                        return task_data
+        else:
+            for file_id, task_info in self.task_cache.items():
+                if task_info.get("callback_task_id") == callback_task_id:
+                    return task_info
+        return None
+    
+    async def is_valid_task_id(self, callback_task_id: str) -> bool:
+        """验证任务ID是否存在且未过期"""
+        task_data = self._find_task_by_callback_id(callback_task_id)
+        if not task_data:
+            return False
+        
+        created_at = datetime.fromisoformat(task_data['created_at'])
+        return datetime.now() - created_at < timedelta(hours=1)
+```
+
+**影响范围**: `redis_duplicate_checker.py`
+
+---
+
+## 🔍 六、审查引擎中的 trace_id 构造(重复度:低)
+
+### 重复位置
+
+多个审查方法中都有类似的 trace_id 构造逻辑:
+
+1. `sensitive_word_check()`
+2. `check_semantic_logic()`
+3. `check_non_parameter_compliance()`
+4. `check_parameter_compliance()`
+
+### 重复代码特征
+
+```python
+# 每个方法都重复构造 trace_id
+reviewer_type = Stage.BASIC.value['reviewer_type']
+prompt_name = Stage.BASIC.value['grammar']
+trace_id = prompt_name + trace_id_idx
+```
+
+### 重构建议
+
+**方案**: 创建 trace_id 构造工具方法
+
+```python
+# 建议位置: core/construction_review/component/ai_review_engine.py
+class AIReviewEngine:
+    
+    def _build_trace_id(self, stage: Stage, check_type: str, trace_id_idx: str) -> str:
+        """构造 trace_id"""
+        reviewer_type = stage.value['reviewer_type']
+        prompt_name = stage.value[check_type]
+        return f"{prompt_name}{trace_id_idx}"
+    
+    async def sensitive_word_check(self, trace_id_idx: str, ...):
+        trace_id = self._build_trace_id(Stage.BASIC, 'grammar', trace_id_idx)
+        # ...
+```
+
+**影响范围**: `ai_review_engine.py`
+
+---
+
+## 📊 七、重复度统计
+
+| 重复类型 | 重复度 | 影响文件数 | 优先级 |
+|---------|--------|-----------|--------|
+| Redis 连接初始化 | 高 | 2 | ⭐⭐⭐ |
+| 进度更新与 SSE 推送 | 中 | 5+ | ⭐⭐⭐ |
+| 审查结果处理 | 中 | 2 | ⭐⭐ |
+| 文档处理降级逻辑 | 中 | 1 | ⭐⭐ |
+| 任务状态检查 | 低 | 1 | ⭐ |
+| trace_id 构造 | 低 | 1 | ⭐ |
+
+---
+
+## 🎯 八、重构优先级建议
+
+### 高优先级(建议立即重构)
+
+1. **Redis 连接初始化** - 影响范围广,重复度高
+2. **进度更新与 SSE 推送** - 代码分散,维护成本高
+
+### 中优先级(建议近期重构)
+
+3. **审查结果处理** - 提升代码可读性
+4. **文档处理降级逻辑** - 减少代码冗余
+
+### 低优先级(可选重构)
+
+5. **任务状态检查** - 局部优化
+6. **trace_id 构造** - 局部优化
+
+---
+
+## 📝 九、重构注意事项
+
+### 1. 向后兼容性
+
+- 保留旧接口,标记为 `@deprecated`
+- 提供迁移指南和示例代码
+
+### 2. 测试覆盖
+
+- 为新的工具类编写单元测试
+- 确保重构后功能不变
+
+### 3. 文档更新
+
+- 更新 API 文档
+- 添加使用示例
+
+### 4. 渐进式重构
+
+- 先重构高优先级项
+- 逐步迁移现有代码
+- 避免一次性大规模改动
+
+---
+
+## 🔧 十、建议的新增工具模块
+
+### 1. `foundation/infrastructure/cache/redis_factory.py`
+- Redis 连接工厂
+- 连接池管理
+
+### 2. `core/construction_review/component/reviewers/utils/progress_helper.py`
+- 进度推送辅助工具
+- SSE 消息格式化
+
+### 3. `core/construction_review/component/reviewers/utils/result_converter.py`
+- 审查结果转换工具
+- 统一的结果格式化
+
+---
+
+## 📌 十一、总结
+
+通过本次梳理,发现 `core/` 目录中存在 **6 类重复实现的可复用方法**,主要集中在:
+
+1. **基础设施层**:Redis 连接、进度管理
+2. **业务逻辑层**:审查结果处理、文档处理
+3. **工具方法层**:trace_id 构造、任务状态检查
+
+建议优先重构 **Redis 连接初始化** 和 **进度更新与 SSE 推送** 两个高频重复模块,可显著提升代码质量和维护效率。
+
+---
+
+**报告生成者**: Kiro AI Assistant  
+**最后更新**: 2026-01-08