ソースを参照

dev:规范性和时间性添加过滤匹配不到的将不参与审查

ZengChao 1 ヶ月 前
コミット
4c894fc357

+ 164 - 6
core/construction_review/component/reviewers/reference_basis_reviewer.py

@@ -1,19 +1,105 @@
 from __future__ import annotations
 
 import asyncio
+import json
 import time
 import yaml
 from typing import Any, Dict, List, Optional
+from functools import partial
 
+from langchain_milvus import Milvus, BM25BuiltInFunction
+from foundation.infrastructure.config.config import config_handler
+from foundation.ai.models.model_handler import model_handler as mh
 from core.construction_review.component.reviewers.utils.directory_extraction import BasisItem, BasisItems
 from core.construction_review.component.reviewers.utils.inter_tool import InterTool
 from core.construction_review.component.reviewers.utils.prompt_loader import PromptLoader
 from core.construction_review.component.reviewers.utils.punctuation_checker import check_punctuation
 from core.construction_review.component.reviewers.utils.punctuation_result_processor import process_punctuation_results
+from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
 from foundation.observability.logger.loggering import server_logger as logger
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_openai import ChatOpenAI
 
class BasisSearchEngine:
    """Vector search engine for basis-reference (编制依据) lookup.

    Wraps a Milvus hybrid search (dense embedding + BM25 sparse vector) over
    an existing collection. Connection settings are read from
    ``config_handler`` and the dense embedding model comes from
    ``model_handler``.
    """

    def __init__(self):
        # Embedding model handle; stays None when _initialize() fails, in
        # which case hybrid_search() fails gracefully and returns [].
        self.emdmodel = None
        self._initialize()

    def _initialize(self):
        """Load Milvus connection settings and the embedding model.

        Best-effort: on failure only an error is logged so constructing the
        engine never raises; later searches then return an empty list.
        """
        try:
            # Connection configuration (defaults target a local Milvus).
            self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
            self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
            self.user = config_handler.get('milvus', 'MILVUS_USER')
            self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')

            # Initialize the dense embedding model.
            self.emdmodel = mh._get_lq_qwen3_8b_emd()
            logger.info("嵌入模型初始化成功")

        except Exception as e:
            logger.error(f" BasisSearchEngine 初始化失败: {e}")

    def hybrid_search(self, collection_name: str, query_text: str,
                      top_k: int = 3, ranker_type: str = "weighted",
                      dense_weight: float = 0.7, sparse_weight: float = 0.3) -> List[dict]:
        """Run a dense + sparse hybrid search against ``collection_name``.

        Args:
            collection_name: Name of the existing Milvus collection.
            query_text: Query string to search for.
            top_k: Maximum number of results to return.
            ranker_type: "weighted" uses ``dense_weight``/``sparse_weight``;
                any other value falls back to reciprocal-rank fusion ("rrf").
            dense_weight: Weight of the dense vector score (weighted ranker).
            sparse_weight: Weight of the BM25 sparse score (weighted ranker).

        Returns:
            A list of dicts with keys ``id``, ``text_content``, ``metadata``,
            ``distance`` and ``similarity``. Returns an empty list on any
            failure instead of raising.
        """
        try:
            # Connect to the existing collection.
            connection_args = {
                "uri": f"http://{self.host}:{self.port}",
                "user": self.user,
                "db_name": "lq_db"
            }

            if self.password:
                connection_args["password"] = self.password

            vectorstore = Milvus(
                embedding_function=self.emdmodel,
                collection_name=collection_name,
                connection_args=connection_args,
                consistency_level="Strong",
                builtin_function=BM25BuiltInFunction(),
                vector_field=["dense", "sparse"]
            )

            # Execute the hybrid search with the requested ranker.
            if ranker_type == "weighted":
                results = vectorstore.similarity_search(
                    query=query_text,
                    k=top_k,
                    ranker_type="weighted",
                    ranker_params={"weights": [dense_weight, sparse_weight]}
                )
            else:  # rrf
                results = vectorstore.similarity_search(
                    query=query_text,
                    k=top_k,
                    ranker_type="rrf",
                    ranker_params={"k": 60}
                )

            # Format results consistently with the other search methods.
            formatted_results = []
            for doc in results:
                formatted_results.append({
                    'id': doc.metadata.get('pk', 0),
                    'text_content': doc.page_content,
                    'metadata': doc.metadata,
                    'distance': 0.0,
                    'similarity': 1.0
                })

            return formatted_results

        except Exception as e:
            logger.error(f" 搜索失败: {e}")
            # Bug fix: previously this fell through and implicitly returned
            # None, while callers expect a list (they do `retrieved or []`
            # or iterate directly). Return an empty result set instead.
            return []
+
 class StandardizedResponseProcessor:
     """标准化响应处理器 - 统一为outline_reviewer.py格式"""
 
@@ -115,6 +201,7 @@ class BasisReviewService:
     """编制依据审查服务核心类"""
 
     def __init__(self, max_concurrent: int = 4):
+        self.search_engine = BasisSearchEngine()
         self.llm_client = LLMReviewClient()
         self.response_processor = StandardizedResponseProcessor()
         fresh_prompt_loader = PromptLoader()
@@ -147,17 +234,59 @@ class BasisReviewService:
 
         async with self._semaphore:
             try:
-                # 第一步:调用标点符号检查器
+                # 第一步:搜索编制依据并通过match_reference_files过滤
+                search_tasks = []
+                for basis in basis_items:
+                    task = asyncio.create_task(
+                        self._async_search_basis(basis, collection_name, top_k_each)
+                    )
+                    search_tasks.append(task)
+
+                # 等待所有搜索完成
+                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
+
+                grouped_candidates = []
+                for i, result in enumerate(search_results):
+                    if isinstance(result, Exception):
+                        logger.error(f"搜索失败 '{basis_items[i]}': {result}")
+                        grouped_candidates.append([])
+                    else:
+                        # result 是 List[dict],需要遍历
+                        texts = [item["text_content"] for item in result if "text_content" in item]
+                        grouped_candidates.append(texts)
+                
+                # 获取match_reference_files的结果并过滤
+                match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
+                # 解析JSON并过滤:same_name_current和exact_match_info都是""的项过滤掉
+                try:
+                    match_data = json.loads(match_result)
+                    # 提取items字段(match_reference_files返回{items: [...]}格式)
+                    items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
+                    filtered_data = [item for item in items if not (item.get('same_name_current') == "" and item.get('exact_match_info') == "")]
+                    # 从过滤后的数据中提取review_item用于后续检查
+                    filtered_basis_items = [item.get('review_item') for item in filtered_data if item.get('review_item')]
+                    basis_items_to_check = filtered_basis_items if filtered_basis_items else []
+                    logger.info(f"过滤后参与检查的编制依据: {len(basis_items_to_check)}/{len(basis_items)}")
+                except (json.JSONDecodeError, TypeError) as e:
+                    logger.warning(f"过滤match_reference_files结果时出错: {e}")
+                    # 如果解析失败,使用原始结果
+                    basis_items_to_check = []
                 
-                checker_result = await check_punctuation(basis_items)
+                # 如果没有过滤出数据,直接返回空结果
+                if not basis_items_to_check:
+                    logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")
+                    return []
+                
+                # 第二步:调用标点符号检查器
+                checker_result = await check_punctuation(basis_items_to_check)
                 print(checker_result)
                 
-                # 第二步:调用结果处理器,生成详细的问题分析报告
+                # 第三步:调用结果处理器,生成详细的问题分析报告
                 processor_result = await process_punctuation_results(checker_result)
                 print("\n【第二步】问题分析报告输出:")
                 print(processor_result)
                 
-                # 第三步:转换为标准格式
+                # 第四步:转换为标准格式
                 standardized_result = self.response_processor.process_llm_response(
                     processor_result, 
                     "reference_check", 
@@ -167,7 +296,7 @@ class BasisReviewService:
 
                 # 统计问题数量
                 issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
-                logger.info(f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项")
+                logger.info(f"编制依据批次审查完成:总计 {len(basis_items_to_check)} 项,发现问题 {issue_count} 项")
 
                 return standardized_result
 
@@ -182,6 +311,31 @@ class BasisReviewService:
                     "risk_info": {"risk_level": "high"}
                 }]
 
+    async def _async_search_basis(
+        self,
+        basis: str,
+        collection_name: str,
+        top_k_each: int
+    ) -> List[dict]:
+        """异步搜索单个编制依据(Hybrid Search)"""
+        try:
+            loop = asyncio.get_running_loop()
+            func = partial(
+                self.search_engine.hybrid_search,
+                collection_name=collection_name,
+                query_text=basis,
+                top_k=top_k_each,
+                ranker_type="weighted",
+                dense_weight=0.3,
+                sparse_weight=0.7
+            )
+            retrieved = await loop.run_in_executor(None, func)
+            logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
+            return retrieved or []
+        except Exception as e:
+            logger.error(f" 搜索失败 '{basis}': {e}")
+            return []
+
     async def review_all(self, basis_items: BasisItems, collection_name: str = "first_bfp_collection_status",
                         progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
         """异步批量审查所有编制依据(BasisItems 入参)"""
@@ -356,6 +510,10 @@ async def review_all_basis_async(basis_items: BasisItems, max_concurrent: int =
 if __name__ == "__main__":
     # 简单测试
     test_basis_items = BasisItems(items=[
-        BasisItem(title="中华人民共和国特种设备安全法", suffix="2023", raw="《公路桥涵施工技术规范》JTG/T 3650-2020);")
+        BasisItem(title="坠落防护水平生命线装置", suffix="GB 38454", raw="《坠落防护水平生命线装置》GB 38454"),
+        BasisItem(title="电力高处作业防坠器", suffix="DL/T 1147", raw="《电力高处作业防坠器》DL/T 1147"),
+        BasisItem(title="坠落防护挂点装置", suffix="GB 30862", raw="《坠落防护挂点装置》GB 30862"),
+        BasisItem(title="混凝土结构设计规范", suffix="GB 50010-2010", raw="《混凝土结构设计规范》GB 50010-2010"),
+        BasisItem(title="建筑施工组织设计规范", suffix="GB/T 50502-2015", raw="《建筑施工组织设计规范》GB/T 50502-2015"),
     ])
     result = asyncio.run(review_all_basis_async(test_basis_items))

+ 33 - 8
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -198,14 +198,39 @@ class BasisReviewService:
                         texts = [item["text_content"] for item in result if "text_content" in item]
                         grouped_candidates.append(texts)
                 
-                llm_out = await determine_timeliness_issue(await match_reference_files(reference_text=grouped_candidates, review_text=basis_items))
-                
-                standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check", "basis", "basis_timeliness_check")
-                # 统计问题数量
-                issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
-                logger.info(f"编制依据批次审查完成:总计 {len(basis_items)} 项,发现问题 {issue_count} 项")
-
-                return standardized_result
+                # 获取match_reference_files的结果并过滤
+                match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
+                # 解析JSON并过滤:same_name_current和exact_match_info都是""的项过滤掉
+                try:
+                    match_data = json.loads(match_result)
+                    # 提取items字段(match_reference_files返回{items: [...]}格式)
+                    items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
+                    filtered_data = [item for item in items if not (item.get('same_name_current') == "" and item.get('exact_match_info') == "")]
+                    
+                    # 如果没有过滤出数据,直接返回空结果
+                    if not filtered_data:
+                        logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")
+                        standardized_result = []
+                    else:
+                        # 重新构建JSON格式
+                        if isinstance(match_data, dict) and 'items' in match_data:
+                            match_result = json.dumps({"items": filtered_data}, ensure_ascii=False, indent=2)
+                        else:
+                            match_result = json.dumps(filtered_data, ensure_ascii=False, indent=2)
+                        
+                        llm_out = await determine_timeliness_issue(match_result)
+                        
+                        standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check", "basis", "basis_timeliness_check")
+                        # 统计问题数量
+                        issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
+                        logger.info(f"编制依据批次审查完成:总计 {len(filtered_data)} 项,发现问题 {issue_count} 项")
+                    
+                    return standardized_result if standardized_result else []
+                    
+                except (json.JSONDecodeError, TypeError) as e:
+                    logger.warning(f"过滤match_reference_files结果时出错: {e}")
+                    # 如果解析失败,返回空结果
+                    return []
 
             except Exception as e:
                 logger.error(f" 批次处理失败: {e}")