Explorar o código

v0.0.5-功能优化-条文完整性审查
- 实现按章审查逻辑
- 增加完整性审查溯源字段
- 优化完整性审查可读性

WangXuMing hai 1 mes
pai
achega
297d694a34

+ 21 - 5
core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py

@@ -227,11 +227,25 @@ class ResultAnalyzer(IResultAnalyzer):
         """
         """
         all_issues = []
         all_issues = []
         metadata = {}
         metadata = {}
-
+        suorces_eum = {
+            "basis": "编制依据",
+            "overview": "工程概况",
+            "plan": "施工计划",
+            "technology": "施工工艺技术",
+            "safety": "安全保证措施",
+            "quality": "质量保证措施",
+            "environment": "环境保证措施",
+            "management": "施工管理及作业人员配备与分工",
+            "acceptance": "验收要求",
+            "other": "其他资料"
+            }
         for row in summary_rows:
         for row in summary_rows:
             level2 = (row.get("二级目录") or "").strip()
             level2 = (row.get("二级目录") or "").strip()
             requirement = (row.get("内容要求") or "").strip()
             requirement = (row.get("内容要求") or "").strip()
-
+            reference_source = '桥梁公司危险性较大工程管理实施细则(2025版)'
+            reason= f"参照:{reference_source} 中的内容要求,{row.get('section_label', '')}内容属于,专项施工方案内容要求中的 【{suorces_eum[row.get("标签", "")]}】 板块,应包含{requirement}"
+            review_references = (row.get("依据") or "").strip()
+            
             missing_points_raw = row.get("缺失的要点", "")
             missing_points_raw = row.get("缺失的要点", "")
             missing_points = self._parse_list_field(missing_points_raw)
             missing_points = self._parse_list_field(missing_points_raw)
             if not missing_points:
             if not missing_points:
@@ -244,9 +258,9 @@ class ResultAnalyzer(IResultAnalyzer):
             requirement_list = requirement.split(':')[-1].split(';')
             requirement_list = requirement.split(':')[-1].split(';')
             requirement_text = ';'.join([requirement_list[i-1] for i in missing_points])
             requirement_text = ';'.join([requirement_list[i-1] for i in missing_points])
             issue_point = (
             issue_point = (
-                f"[{level2}内容缺失]未包含要点:{requirement_text}"
+                f"{row.get('section_label', '')}下缺失{suorces_eum[row.get("标签", "")]}中的【{level2}】内容"
             )
             )
-            suggestion = f"补充:{requirement_text}" if requirement else "补充缺失要点内容"
+            suggestion = f"建议补充:{requirement_text}" if requirement else "补充缺失要点内容"
             risk_level = self._map_risk_level(len(missing_points))
             risk_level = self._map_risk_level(len(missing_points))
 
 
             # 构建问题项并添加到列表
             # 构建问题项并添加到列表
@@ -254,8 +268,10 @@ class ResultAnalyzer(IResultAnalyzer):
                 "issue_point": issue_point,
                 "issue_point": issue_point,
                 "location": row.get("section_label", ""),
                 "location": row.get("section_label", ""),
                 "suggestion": suggestion,
                 "suggestion": suggestion,
-                "reason": requirement,
+                "reason": reason,
                 "risk_level": risk_level,
                 "risk_level": risk_level,
+                "review_references": review_references,
+                "reference_source": reference_source
             }
             }
             all_issues.append(issue_item)
             all_issues.append(issue_item)
 
 

+ 23 - 4
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -530,13 +530,19 @@ class InterTool:
 
 
             # 3. 如果JSON解析失败,回退到文本解析
             # 3. 如果JSON解析失败,回退到文本解析
             if not review_lists:
             if not review_lists:
+                # 🔧 修复:检查响应是否为空或只包含空白字符
+                response_stripped = response.strip() if isinstance(response, str) else ""
+                is_empty_response = not response_stripped or response_stripped in ["", "null", "None", "undefined"]
+
                 risk_level = self._determine_risk_level(response)
                 risk_level = self._determine_risk_level(response)
+
+                # 如果响应为空,则设置 exist_issue=False
                 review_lists.append({
                 review_lists.append({
                     "check_item": check_name,
                     "check_item": check_name,
                     "chapter_code": chapter_code,
                     "chapter_code": chapter_code,
                     "check_item_code": check_item_code,
                     "check_item_code": check_item_code,
                     "check_result": response,
                     "check_result": response,
-                    "exist_issue": True,
+                    "exist_issue": not is_empty_response,  # 🔧 修复:空响应不存在问题
                     "risk_info": {"risk_level": risk_level}
                     "risk_info": {"risk_level": risk_level}
                 })
                 })
 
 
@@ -632,9 +638,22 @@ class InterTool:
         """创建单个审查问题项"""
         """创建单个审查问题项"""
         risk_level = self._determine_risk_level(issue_data.get("risk_level", ""))
         risk_level = self._determine_risk_level(issue_data.get("risk_level", ""))
 
 
-        # 根据原始风险等级判断是否存在问题
-        original_risk_level = issue_data.get("risk_level", "")
-        exist_issue = original_risk_level not in ["无风险", "无", "通过", "符合要求"]
+        # 🔧 修复:首先检查 issue_data 是否为空
+        is_empty = False
+        if isinstance(issue_data, list):
+            is_empty = len(issue_data) == 0
+        elif isinstance(issue_data, dict):
+            # 检查是否为空字典,或者只有 risk_level 字段但没有其他实质内容
+            is_empty = len(issue_data) == 0 or (len(issue_data) == 1 and "risk_level" in issue_data)
+
+        # 根据原始风险等级和内容判断是否存在问题
+        original_risk_level = issue_data.get("risk_level", "") if isinstance(issue_data, dict) else ""
+        # 只有当内容不为空,且风险等级不是"无风险"类时,才认为存在问题
+        exist_issue = not is_empty and original_risk_level not in ["无风险", "无", "通过", "符合要求"]
+
+        # 记录调试信息
+        if is_empty:
+            logger.debug(f"检查项 {check_name} 的 issue_data 为空,设置 exist_issue=False")
 
 
         return {
         return {
             "check_item": check_name,
             "check_item": check_name,

+ 20 - 8
core/construction_review/component/reviewers/utils/punctuation_checker.py

@@ -36,6 +36,7 @@ SYSTEM = """
 - title_mark_status:书名号需完全包裹规范名称,且不多包/漏包
 - title_mark_status:书名号需完全包裹规范名称,且不多包/漏包
 - bracket_status:括号需完全包裹规范编号,且不多包/漏包;编号可能是各种形式,如果文本中没有编号,设置为null
 - bracket_status:括号需完全包裹规范编号,且不多包/漏包;编号可能是各种形式,如果文本中没有编号,设置为null
 
 
+
 【输出要求】
 【输出要求】
 - 为每个输入文本输出一个检查结果
 - 为每个输入文本输出一个检查结果
 - 确保输出数量与输入一致
 - 确保输出数量与输入一致
@@ -67,6 +68,10 @@ HUMAN = """
 - 书名号包裹了完整的规范名称 → title_mark_status=true
 - 书名号包裹了完整的规范名称 → title_mark_status=true
 - 英文括号包裹了完整的编号 → bracket_status=true(混用不算错)
 - 英文括号包裹了完整的编号 → bracket_status=true(混用不算错)
 
 
+示例4:《起重机械钢丝绳保养维护检验和报废》GB/T5972-2023;
+- 书名号包裹了完整的规范名称 → title_mark_status=true
+- 编号未被包裹 → bracket_status=false
+
 【待检查文本】
 【待检查文本】
 {items}
 {items}
 
 
@@ -225,8 +230,15 @@ async def check_punctuation(items: List[str]) -> str:
         try:
         try:
             raw = await chain.ainvoke(payload)
             raw = await chain.ainvoke(payload)
             data = extract_first_json(raw)
             data = extract_first_json(raw)
-            findings = PunctuationResults.model_validate(data)
-            llm_result = [x.model_dump() for x in findings.items]
+
+            # 兼容两种格式:带 items 字段或不带 items 字段(单个对象)
+            if "items" in data:
+                findings = PunctuationResults.model_validate(data)
+                llm_result = [x.model_dump() for x in findings.items]
+            else:
+                # LLM 返回了单个对象,包装成列表
+                single_result = PunctuationResult.model_validate(data)
+                llm_result = [single_result.model_dump()]
             break
             break
         except (Exception, ValidationError, json.JSONDecodeError) as e:
         except (Exception, ValidationError, json.JSONDecodeError) as e:
             last_err = e
             last_err = e
@@ -259,12 +271,12 @@ if __name__ == "__main__":
 
 
     # 测试用例
     # 测试用例
     test_items = [
     test_items = [
-        "(4)《中华人民共和国突发事件应对法》【主席令〔2007〕第 69 号】;",  # 正确
-        "《混》凝土结构设计规范(GB 50010-2010)",      # 缺少书名号
-        "建筑施工组织设计规范GB/T 50502-2015",  # 缺少括号
-        "《建筑抗震设计规范》(GB 50011)-2001",       # 括号不成对
-        "《城市道路工程设计规范(CJJ 37-2012)",    # 书名号不成对
-        "《公路工程技术标准》(JTG B01-2014)",     # 正确
+        "《起重机械钢丝绳保养维护检验和报废》GB/T5972-2023;"  # title marks OK; standard number is not wrapped in brackets (expected bracket_status=false)
+        # "《混》凝土结构设计规范(GB 50010-2010)",      # 缺少书名号
+        # "建筑施工组织设计规范GB/T 50502-2015",  # 缺少括号
+        # "《建筑抗震设计规范》(GB 50011)-2001",       # 括号不成对
+        # "《城市道路工程设计规范(CJJ 37-2012)",    # 书名号不成对
+        # "《公路工程技术标准》(JTG B01-2014)",     # 正确
     ]
     ]
 
 
     result = asyncio.run(check_punctuation(test_items))
     result = asyncio.run(check_punctuation(test_items))

+ 1 - 16
core/construction_review/component/reviewers/utils/punctuation_result_processor.py

@@ -176,23 +176,8 @@ if __name__ == "__main__":
     # 模拟 punctuation_checker 的返回结果
     # 模拟 punctuation_checker 的返回结果
     check_results = json.dumps([
     check_results = json.dumps([
         {
         {
-            "original_text": "《混凝土结构设计规范》",
+            "original_text": "《起重机械钢丝绳保养、维护、检验和报废》GB/T5972-2023;",
             "title_mark_status": True,
             "title_mark_status": True,
-            "bracket_status": "null"
-        },
-        {
-            "original_text": "《混凝土结构设计规范》【GB 50010-2010】",
-            "title_mark_status": True,
-            "bracket_status": False
-        },
-        {
-            "original_text": "《建筑施工组织设计规范》(GB/T 50502-2015)",
-            "title_mark_status": True,
-            "bracket_status": True
-        },
-        {
-            "original_text": "建筑抗震设计规范 GB 50011-2010",
-            "title_mark_status": False,
             "bracket_status": False
             "bracket_status": False
         }
         }
     ], ensure_ascii=False)
     ], ensure_ascii=False)

+ 14 - 1
core/construction_review/workflows/ai_review_workflow.py

@@ -332,7 +332,20 @@ class AIReviewWorkflow:
             # structured_content["chunks"] = chunks
             # structured_content["chunks"] = chunks
 
 
             total_chapters = len(review_item_dict_sorted)
             total_chapters = len(review_item_dict_sorted)
-            total_chunks = len(filtered_chunks)
+            with open("temp/filtered_chunks/review_item_dict_sorted.json", "w", encoding="utf-8") as f:
+                json.dump(review_item_dict_sorted, f, ensure_ascii=False, indent=4)
+            # 如果review_item_dict_sorted中只包含check_completeness,则total_chunks 仅计算chunk中is_complete_field = true的chunk数量
+            all_check_items = []
+            for check_list in review_item_dict_sorted.values():
+                all_check_items.extend(check_list)  # 把每个分类的检查项加入总列表
+
+            # 判断:所有检查项是否都只有 "check_completeness"(无其他检查项)
+            if all(item == "check_completeness" for item in all_check_items):
+                # 仅统计 is_complete_field = True 的chunk数量(用生成器表达式省内存)
+                total_chunks = sum(1 for chunk in filtered_chunks if chunk.get("is_complete_field", False))
+            else:
+                # 统计所有 filtered_chunks
+                total_chunks = len(filtered_chunks)
 
 
             # 初始化issues列表
             # 初始化issues列表
             all_issues = []
             all_issues = []

+ 0 - 38
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -29,7 +29,6 @@ AI审查核心功能类 - 负责具体的审查逻辑和数据处理
 ├── _execute_technical_review()     # 执行技术性审查(参数/非参数合规性检查)
 ├── _execute_technical_review()     # 执行技术性审查(参数/非参数合规性检查)
 ├── _group_chunks_by_chapter()      # 按章节代码对chunks进行分组
 ├── _group_chunks_by_chapter()      # 按章节代码对chunks进行分组
 ├── _extract_issues_from_result()   # 从审查结果中提取issues列表
 ├── _extract_issues_from_result()   # 从审查结果中提取issues列表
-├── _format_chunk_results_to_issues() # 格式化单个块的审查结果为issues列表
 └── _dummy_review_task()            # 空任务(方法不存在时使用)
 └── _dummy_review_task()            # 空任务(方法不存在时使用)
 '''
 '''
 
 
@@ -206,43 +205,6 @@ class AIReviewCoreFun:
 
 
         return issues
         return issues
 
 
-    def _format_chunk_results_to_issues(
-        self,
-        state: AIReviewState,
-        chunk_index: int,
-        chunk: Dict[str, Any],
-        chapter_code: str,
-        chunk_results: Dict[str, Any]
-    ) -> List[Dict]:
-        """
-        格式化单个块的所有审查结果为issues列表
-
-        Args:
-            state: AI审查状态
-            chunk_index: 块索引
-            chunk: 块内容
-            chapter_code: 章节代码
-            chunk_results: 块审查结果字典 {func_name: result}
-
-        Returns:
-            List[Dict]: issues列表
-        """
-        issues = []
-
-        for func_name, result in chunk_results.items():
-            if result is None:
-                continue
-
-            # 处理错误结果
-            if isinstance(result, dict) and "error" in result:
-                logger.warning(f"审查方法 {func_name} 返回错误: {result['error']}")
-                continue
-
-            # 提取issues
-            extracted = self._extract_issues_from_result(result)
-            issues.extend(extracted)
-
-        return issues
 
 
     def _group_chunks_by_chapter(self, chunks: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
     def _group_chunks_by_chapter(self, chunks: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
         """
         """