소스 검색

v0.0.4-功能优化
- 判断是否存在RAG召回内容,有则进行专业性审查

WangXuMing 1 개월 전
부모
커밋
e5dc27343e

+ 20 - 41
core/construction_review/component/ai_review_engine.py

@@ -95,7 +95,7 @@ class Stage(Enum):
     """工作流状态"""
     BASIC = {
         'reviewer_type':'basic',
-        'grammar': 'grammar_check',
+        'grammar': 'sensitive_word_check',
         'sensitive':'sensitive_check',
         'semantic': 'semantic_logic_check',
         'completeness': 'completeness_check',
@@ -222,7 +222,7 @@ class AIReviewEngine(BaseReviewer):
             basic_tasks.append(
                 asyncio.create_task(
                     asyncio.wait_for(
-                        check_with_semaphore(self.check_grammar, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
+                        check_with_semaphore(self.sensitive_word_check, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
                         timeout=TASK_TIMEOUT
                     )
                 )
@@ -258,7 +258,7 @@ class AIReviewEngine(BaseReviewer):
         # 一次性执行所有任务,避免重复协程调用
         if not basic_tasks:
             return {
-                "grammar_check": self._process_review_result(None),
+                "sensitive_word_check": self._process_review_result(None),
                 "semantic_logic_check": self._process_review_result(None),
                 "sensitive_check": self._process_review_result(None),
             }
@@ -298,7 +298,7 @@ class AIReviewEngine(BaseReviewer):
             if result_index < len(results):
                 grammar_result = self._process_review_result(results[result_index])
             result_index += 1
-            with open('temp/grammar_check_result.json','w',encoding='utf-8') as f:
+            with open('temp/sensitive_word_check_result.json','w',encoding='utf-8') as f:
                 json.dump(grammar_result,f,ensure_ascii=False,indent=4)
         if 'semantic_logic_check' in self.task_info.get_review_config_list():
             if result_index < len(results):
@@ -316,7 +316,7 @@ class AIReviewEngine(BaseReviewer):
         # with open('temp/completeness_check_result.json','w',encoding='utf-8') as f:
         #     json.dump(completeness_result,f,ensure_ascii=False,indent=4)
         return {
-            'grammar_check': grammar_result,
+            'sensitive_word_check': grammar_result,
             'semantic_logic_check': semantic_result,
             'sensitive_check': sensitive_result,
             #'completeness_check': completeness_result,
@@ -428,36 +428,14 @@ class AIReviewEngine(BaseReviewer):
             logger.info(f"[技术审查] 总共创建了 {len(technical_tasks)} 个动态审查任务")
 
         else:
-            # 没有entity_results或未配置专业性审查,使用原有逻辑
-            logger.info("[技术审查] 使用通用审查模式(未使用 entity_results)")
+            # 没有entity_results或未配置专业性审查,直接跳过该条审查
+            logger.warning("[技术审查] 未提供 entity_results 或未配置专业性审查,跳过该条审查")
 
-            if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
-                task_mapping.append('non_parameter_compliance')
-                technical_tasks.append(
-                    asyncio.create_task(
-                        asyncio.wait_for(
-                            check_with_semaphore(self.check_non_parameter_compliance, trace_id_idx=trace_id_idx,
-                                               review_content=review_content, review_references=None,
-                                               reference_source=None, review_location_label=review_location_label,
-                                               state=state, stage_name=stage_name),
-                            timeout=TASK_TIMEOUT
-                        )
-                    )
-                )
-
-            if 'parameter_compliance_check' in self.task_info.get_review_config_list():
-                task_mapping.append('parameter_compliance')
-                technical_tasks.append(
-                    asyncio.create_task(
-                        asyncio.wait_for(
-                            check_with_semaphore(self.check_parameter_compliance, trace_id_idx=trace_id_idx,
-                                               review_content=review_content, review_references=None,
-                                               reference_source=None, review_location_label=review_location_label,
-                                               state=state, stage_name=stage_name),
-                            timeout=TASK_TIMEOUT
-                        )
-                    )
-                )
+            # 直接返回空结果,不执行任何审查任务
+            return {
+                "non_parameter_compliance": self._process_review_result(None),
+                "parameter_compliance": self._process_review_result(None),
+            }
 
         # 一次性执行所有任务,避免重复协程调用
         if not technical_tasks:
@@ -618,7 +596,8 @@ class AIReviewEngine(BaseReviewer):
                 self.milvus,
                 bfp_result_lists,
                 score_threshold=0.5,  # bfp_rerank_score 阈值
-                max_parents_per_pair=3  # 每个查询对最多3个父文档
+                max_parents_per_pair=3,  # 每个查询对最多3个父文档
+                max_parent_text_length=8000  # 单个父文档最大8000字符(约5300 tokens)
             )
             enhanced_results = enhancement_result['enhanced_results']
             enhanced_count = enhancement_result['enhanced_count']
@@ -638,8 +617,8 @@ class AIReviewEngine(BaseReviewer):
             # 失败时使用原始结果
             enhanced_results = bfp_result_lists
 
-        # Step 5: 提取查询对结果(只保留得分>0.8的结果)
-        entity_results = extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.8)
+        # Step 5: 提取查询对结果(只保留得分>0.5的结果)
+        entity_results = extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.5)
 
         # 保存最终结果用于调试
         # with open(rf"temp\ai_review_engine\extract_query_pairs_results.json", "w", encoding='utf-8') as f:
@@ -647,7 +626,7 @@ class AIReviewEngine(BaseReviewer):
 
         # 如果没有结果通过阈值过滤,返回空结果
         if not entity_results:
-            logger.warning("[RAG增强] 没有结果通过阈值过滤(得分>0.8),返回空结果")
+            logger.warning("[RAG增强] 没有结果通过阈值过滤(得分>0.5),返回空结果")
             return {
                 'vector_search': [],
                 'retrieval_status': 'no_results',
@@ -666,7 +645,7 @@ class AIReviewEngine(BaseReviewer):
         }
 
 
-    async def check_grammar(self, trace_id_idx: str, review_content: str, review_references: str,
+    async def sensitive_word_check(self, trace_id_idx: str, review_content: str, review_references: str,
                           review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
         """
         词句语法检查
@@ -682,7 +661,7 @@ class AIReviewEngine(BaseReviewer):
         Returns:
             ReviewResult: 语法检查结果
         """
-        from core.construction_review.component.reviewers.grammar_check import grammar_check_reviewer
+        from core.construction_review.component.reviewers.sensitive_word_check import sensitive_word_check_reviewer
         
         # 构造trace_id
         reviewer_type = Stage.BASIC.value['reviewer_type']
@@ -690,7 +669,7 @@ class AIReviewEngine(BaseReviewer):
         trace_id = prompt_name + trace_id_idx
         
         # 调用语法检查审查模块
-        result = await grammar_check_reviewer.check_grammar(
+        result = await sensitive_word_check_reviewer.check_grammar(
             trace_id=trace_id,
             review_content=review_content,
             review_references=review_references,

+ 40 - 11
core/construction_review/component/infrastructure/parent_tool.py

@@ -123,7 +123,7 @@ def enhance_with_parent_docs_grouped(
     bfp_result_lists: List,
     score_threshold: float = 0.5,
     max_parents_per_pair: int = 3,
-    # max_parent_text_length: Optional[int] = None
+    max_parent_text_length: int = 8000
 ) -> Dict[str, Any]:
     """
     分组增强 + 按分数筛选 (每个查询对独立处理)
@@ -132,14 +132,14 @@ def enhance_with_parent_docs_grouped(
     1. 每个查询对独立处理,按 bfp_rerank_score 筛选
     2. 只保留并增强高分结果,低分查询对直接跳过
     3. 用父ID从Milvus召回父文档内容
-    4. 将父文档内容拼接到高分结果后
+    4. 将父文档内容拼接到高分结果后(长度受限)
 
     Args:
         milvus_manager: MilvusManager 实例
         bfp_result_lists: 检索结果列表 (二维,每个子列表对应一个查询对)
-        score_threshold: bfp_rerank_score 最低阈值,低于此分数直接跳过 (默认0.7)
-        max_parents_per_pair: 每个查询对最多选取的父文档数量 (默认2个)
-        max_parent_text_length: 单个父文档最大长度限制 (None=不限制)
+        score_threshold: bfp_rerank_score 最低阈值,低于此分数直接跳过 (默认0.5)
+        max_parents_per_pair: 每个查询对最多选取的父文档数量 (默认3个)
+        max_parent_text_length: 单个父文档最大长度限制,单位字符 (默认8000,约5300 tokens)
 
     Returns:
         Dict: 增强结果,包含:
@@ -200,9 +200,19 @@ def enhance_with_parent_docs_grouped(
                 header = f"【文件】{file_name}\n【标题】{title}\n" if file_name or title else ""
 
                 # 拼接所有片段的 text_content
-                combined_text = header + "\n".join([c.get('text', '') for c in sorted_chunks])
+                full_text = "\n".join([c.get('text', '') for c in sorted_chunks])
+
+                # 限制父文档长度(从后往前截断,保留尾部内容)
+                if len(full_text) > max_parent_text_length:
+                    # 添加截断提示
+                    truncated_text = f"...[内容过长,已截断,保留尾部{max_parent_text_length}字符]...\n{full_text[-max_parent_text_length:]}"
+                    logger.warning(f"[分组增强] parent_id={str(pid)[:8]}... 文本过长 ({len(full_text)}字符),截断至 {max_parent_text_length} 字符")
+                    combined_text = header + truncated_text
+                else:
+                    combined_text = header + full_text
+
                 parent_id_to_doc[str(pid)] = combined_text
-                logger.debug(f"[分组增强] parent_id={str(pid)[:8]}... 召回并拼接了 {len(sorted_chunks)} 个片段,文件={file_name}")
+                logger.debug(f"[分组增强] parent_id={str(pid)[:8]}... 召回并拼接了 {len(sorted_chunks)} 个片段,文件={file_name},最终长度={len(combined_text)}字符")
 
         if not parent_id_to_doc:
             logger.warning(f"[分组增强] 查询对 {pair_idx}: 所有父文档召回失败,跳过")
@@ -210,20 +220,39 @@ def enhance_with_parent_docs_grouped(
 
         # 4. 只保留并增强高分结果(每个结果只用其对应的父文档增强)
         enhanced_list = []
+        max_total_length = 12000  # 总长度限制(约8000 tokens),为prompt等留出空间
+
         for result in high_score_results:
             parent_id = str(result.get('metadata', {}).get('parent_id'))
             if parent_id in parent_id_to_doc:
                 # 用该结果对应的父文档增强
                 parent_text = parent_id_to_doc[parent_id]
+                original_text = result.get('text_content', '')
+
+                # 检查总长度
+                total_length = len(original_text) + len(parent_text)
+                if total_length > max_total_length:
+                    # 进一步截断父文档
+                    available_length = max_total_length - len(original_text) - 50  # 留50字符缓冲
+                    if available_length > 500:  # 至少保留500字符
+                        parent_text = parent_text[:available_length] + f"\n...[总长度超限,截断至{available_length}字符]..."
+                        logger.warning(f"[分组增强] 查询对 {pair_idx}: 总长度过长 ({total_length}字符),进一步截断父文档至 {available_length} 字符")
+                    else:
+                        # 父文档太短,不添加父文档
+                        logger.warning(f"[分组增强] 查询对 {pair_idx}: 父文档无法截断(可用空间仅{available_length}字符),跳过父文档增强")
+                        parent_text = ""
+
+                enhanced_content = original_text + f"\n【参考文档】\n{parent_text}\n" if parent_text else original_text
+
                 enhanced_list.append({
-                    'text_content': result.get('text_content', '') + f"\n【参考文档】\n{parent_text}\n",
+                    'text_content': enhanced_content,
                     'metadata': result.get('metadata', {}),
                     'hybrid_similarity': result.get('hybrid_similarity'),
                     'rerank_score': result.get('rerank_score'),
                     'bfp_rerank_score': result.get('bfp_rerank_score'),
                     'bfp_rerank_parent_id': result.get('bfp_rerank_parent_id', ''),
                     'source_entity': result.get('source_entity', ''),
-                    'enhanced': True
+                    'enhanced': True if parent_text else False
                 })
 
         if enhanced_list:
@@ -247,7 +276,7 @@ def enhance_with_parent_docs_grouped(
 
 
 def extract_query_pairs_results(bfp_result_lists: List, query_pairs: List[Dict] = None,
-                                score_threshold: float = 0.8) -> List[Dict[str, Any]]:
+                                score_threshold: float = 0.5) -> List[Dict[str, Any]]:
     """
     从检索结果中提取每个查询对的最高得分结果(得分必须大于阈值),返回列表格式
 
@@ -258,7 +287,7 @@ def extract_query_pairs_results(bfp_result_lists: List, query_pairs: List[Dict]
     Args:
         bfp_result_lists: 检索结果列表(二维列表,每个子列表对应一个查询对)
         query_pairs: 查询对列表(用于映射实体名称、背景、参数)
-        score_threshold: 得分阈值,只返回得分大于该阈值的结果(默认0.8
+        score_threshold: 得分阈值,只返回得分大于该阈值的结果(默认0.5
 
     Returns:
         List[Dict]: 结果列表,每个元素包含:

+ 4 - 2
core/construction_review/component/reviewers/prompt/technical_reviewers.yaml

@@ -24,7 +24,8 @@ non_parameter_compliance_check:
     3. 对于术语概念不得曲解
     4. 没有明显安全合规问题的内容不予检查,输出无明显问题
     5. 已检查出的问题项仅输出一次检查结果,禁止对同一内容重复检查
-    6. 若审查参考与审查内容相关性过低,可不予参考该条文,直接实施审查
+    6. 若审查参考与审查内容相关性过低,不予检查,输出无明显问题
+    7. 务必注意,只有在审查参考与审查内容相关时才能依据审查参考的内容进行问题检查,否则输出无明显问题
 
   user_prompt_template: |
     请审查以下内容的安全合规性和强制性标准符合性:
@@ -70,7 +71,8 @@ parameter_compliance_check:
     3. 对于术语概念不得曲解
     4. 没有明显参数合规问题的内容不予检查,输出无明显问题
     5. 已检查出的问题项仅输出一次检查结果,禁止对同一内容重复检查
-    6. 若审查参考与审查内容相关性过低,可不予参考该条文,直接实施审查
+    6. 若审查参考与审查内容相关性过低,不予检查,输出无明显问题
+    7. 务必注意,只有在审查参考与审查内容相关时才能依据审查参考的内容进行问题检查,否则输出无明显问题
 
   user_prompt_template: |
     请审查以下内容的技术参数精确性、实体概念和工程术语的正确性:

+ 15 - 15
core/construction_review/component/reviewers/grammar_check.py → core/construction_review/component/reviewers/sensitive_word_check.py

@@ -14,7 +14,7 @@ from foundation.observability.logger.loggering import server_logger as logger
 
 # 模型配置信息
 # 暂时写死,未来从配置文件读取
-GRAMMAR_CHECK_MODEL_CONFIG = {
+sensitive_word_check_MODEL_CONFIG = {
     "base_url": "http://192.168.91.253:8003/v1",
     "api_key": "sk-123456",
     "model": "qwen3-30b",
@@ -29,12 +29,12 @@ class GrammarCheckReviewer:
     def __init__(self):
         """初始化语法检查审查器"""
         self.client = AsyncOpenAI(
-            base_url=GRAMMAR_CHECK_MODEL_CONFIG["base_url"],
-            api_key=GRAMMAR_CHECK_MODEL_CONFIG["api_key"]
+            base_url=sensitive_word_check_MODEL_CONFIG["base_url"],
+            api_key=sensitive_word_check_MODEL_CONFIG["api_key"]
         )
-        self.model = GRAMMAR_CHECK_MODEL_CONFIG["model"]
-        self.temperature = GRAMMAR_CHECK_MODEL_CONFIG["temperature"]
-        self.max_tokens = GRAMMAR_CHECK_MODEL_CONFIG["max_tokens"]
+        self.model = sensitive_word_check_MODEL_CONFIG["model"]
+        self.temperature = sensitive_word_check_MODEL_CONFIG["temperature"]
+        self.max_tokens = sensitive_word_check_MODEL_CONFIG["max_tokens"]
         
     async def check_grammar(
         self, 
@@ -72,7 +72,7 @@ class GrammarCheckReviewer:
             # 获取提示词模板
             prompt_template = prompt_loader.get_prompt_template(
                 "basic", 
-                "grammar_check", 
+                "sensitive_word_check", 
                 **prompt_kwargs
             )
             
@@ -113,7 +113,7 @@ class GrammarCheckReviewer:
             result = ReviewResult(
                 success=True,
                 details={
-                    "name": "grammar_check",
+                    "name": "sensitive_word_check",
                     "response": model_response
                 },
                 error_message=None,
@@ -123,7 +123,7 @@ class GrammarCheckReviewer:
             # 推送审查完成信息
             if state and state.get("progress_manager"):
                 review_result_data = {
-                    'name': 'grammar_check',
+                    'name': 'sensitive_word_check',
                     'success': result.success,
                     'details': result.details,
                     'error_message': result.error_message,
@@ -137,13 +137,13 @@ class GrammarCheckReviewer:
                         stage_name=stage_name,
                         current=None,
                         status="processing",
-                        message=f"grammar_check 审查完成,耗时: {result.execution_time:.2f}s",
+                        message=f"sensitive_word_check 审查完成,耗时: {result.execution_time:.2f}s",
                         issues=[review_result_data],
                         event_type="processing"
                     )
                 )
                 
-            logger.info(f"grammar_check 审查完成,耗时: {result.execution_time:.2f}s")
+            logger.info(f"sensitive_word_check 审查完成,耗时: {result.execution_time:.2f}s")
             
             return result
             
@@ -155,7 +155,7 @@ class GrammarCheckReviewer:
             # 返回失败结果
             result = ReviewResult(
                 success=False,
-                details={"name": "grammar_check"},
+                details={"name": "sensitive_word_check"},
                 error_message=error_msg,
                 execution_time=execution_time
             )
@@ -163,7 +163,7 @@ class GrammarCheckReviewer:
             # 推送失败信息
             if state and state.get("progress_manager"):
                 review_result_data = {
-                    'name': 'grammar_check',
+                    'name': 'sensitive_word_check',
                     'success': False,
                     'details': result.details,
                     'error_message': error_msg,
@@ -177,7 +177,7 @@ class GrammarCheckReviewer:
                         stage_name=stage_name,
                         current=None,
                         status="processing",
-                        message=f"grammar_check 审查失败: {error_msg}",
+                        message=f"sensitive_word_check 审查失败: {error_msg}",
                         issues=[review_result_data],
                         event_type="processing"
                     )
@@ -187,5 +187,5 @@ class GrammarCheckReviewer:
 
 
 # 全局单例实例
-grammar_check_reviewer = GrammarCheckReviewer()
+sensitive_word_check_reviewer = GrammarCheckReviewer()
 

+ 3 - 3
utils_test/RAG_Test/rag_pipeline_web/rag_pipeline_server.py

@@ -181,9 +181,9 @@ def rag_enhanced_check(query_content: str) -> dict:
             "output": {"error": str(e), "enhanced_results": enhanced_results}
         }
 
-    # Step 4: 提取结果(按查询对区分,只保留得分>0.8的结果)
+    # Step 4: 提取结果(按查询对区分,只保留得分>0.5的结果)
     step4_start = time.time()
-    entity_results = extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.8) if enhanced_results else []
+    entity_results = extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.5) if enhanced_results else []
 
     pipeline_data["steps"]["4_extract_query_pairs_results"] = {
         "name": "按查询对提取结果",
@@ -191,7 +191,7 @@ def rag_enhanced_check(query_content: str) -> dict:
         "input": {
             "enhanced_results_count": len(enhanced_results) if enhanced_results else 0,
             "query_pairs_count": len(query_pairs),
-            "score_threshold": 0.8
+            "score_threshold": 0.5
         },
         "output": {
             "entity_results": entity_results,