Преглед на файлове

chore: 删除废弃的 llm_content_classifier_v2 模块及关联脚本

移除不再使用的 v2 内容分类器完整模块(12个文件),同步删除 check_imports.py 和 debug_simple.py,清理 inter_tool.py 中的 v2 引用。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing преди 1 седмица
родител
ревизия
ba11d27b0b

+ 0 - 21
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -342,27 +342,6 @@ class InterTool:
                 logger.info(f"🔍 规范性审查结果处理完成,添加 {len(review_lists)} 个问题项")
                 continue
 
-            # 🔧 特殊处理:timeliness_content_reviewer 的返回格式
-            if check_key == 'timeliness_content_reviewer' and isinstance(check_result, dict):
-                content_timeliness_data = check_result.get('timeliness_content_review_results', {})
-                batch_results = content_timeliness_data.get('review_results', [])
-                logger.debug(f"🔍 [DEBUG] 处理内容时效性审查结果,问题数: {len(batch_results)}")
-
-                for item in batch_results:
-                    if isinstance(item, dict):
-                        review_lists.append({
-                            "check_item": item.get('check_item', 'content_timeliness_check'),
-                            "chapter_code": item.get('chapter_code', chapter_code),
-                            "check_item_code": item.get('check_item_code', f"{chapter_code}_content_timeliness_check"),
-                            "check_result": item.get('check_result', item),
-                            "exist_issue": item.get('exist_issue', False),
-                            "risk_info": item.get('risk_info', {"risk_level": "low"}),
-                            "location_info": item.get('location_info', []),
-                            "content_context": item.get('content_context', '')
-                        })
-                logger.info(f"🔍 内容时效性审查结果处理完成,添加 {len(batch_results)} 个问题项")
-                continue
-
             # 🔧 类型安全检查:支持字典和 base_reviewer.ReviewResult 对象
             is_dict = isinstance(check_result, dict)
             is_review_result = hasattr(check_result, 'details') and hasattr(check_result, 'success')

+ 0 - 60
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/__init__.py

@@ -1,60 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-LLM 内容三级分类识别模块 v2
-
-重构后的模块化版本,向后兼容原有接口。
-"""
-
-from .models import CategoryStandard, SecondCategoryStandard, ClassifiedContent, SectionContent, ClassificationResult
-from .config import ClassifierConfig, DEFAULT_CONFIG, MAX_CONCURRENT_REQUESTS, MAX_RETRIES, RETRY_DELAY, EMBEDDING_SIMILARITY_THRESHOLD, CATEGORY_TABLE_PATH, SECOND_CATEGORY_PATH
-from .category_loaders import SECONDARY_CATEGORY_KEYWORDS, CategoryStandardLoader, SecondCategoryStandardLoader
-from .embedding_client import EmbeddingClient
-from .content_classifier import ContentClassifierClient
-from .chunks_converter import ChunksConverter
-from .main_classifier import LLMContentClassifier, classify_chunks, classify_chunks_sync
-from .text_split_utils import split_section_into_chunks
-from .prompt import (
-    CLASSIFY_SYSTEM_PROMPT,
-    SUPPLEMENT_VERIFY_SYSTEM_PROMPT,
-    build_classify_prompt,
-    build_fix_prompt,
-    build_supplement_verify_prompt,
-)
-
-__all__ = [
-    # 提示词
-    "CLASSIFY_SYSTEM_PROMPT",
-    "SUPPLEMENT_VERIFY_SYSTEM_PROMPT",
-    "build_classify_prompt",
-    "build_fix_prompt",
-    "build_supplement_verify_prompt",
-    # 数据模型
-    "CategoryStandard",
-    "SecondCategoryStandard",
-    "ClassifiedContent",
-    "SectionContent",
-    "ClassificationResult",
-    # 配置
-    "ClassifierConfig",
-    "DEFAULT_CONFIG",
-    "MAX_CONCURRENT_REQUESTS",
-    "MAX_RETRIES",
-    "RETRY_DELAY",
-    "EMBEDDING_SIMILARITY_THRESHOLD",
-    "CATEGORY_TABLE_PATH",
-    "SECOND_CATEGORY_PATH",
-    # 加载器
-    "SECONDARY_CATEGORY_KEYWORDS",
-    "CategoryStandardLoader",
-    "SecondCategoryStandardLoader",
-    # 客户端
-    "EmbeddingClient",
-    "ContentClassifierClient",
-    "ChunksConverter",
-    "LLMContentClassifier",
-    # 便捷函数
-    "classify_chunks",
-    "classify_chunks_sync",
-    "split_section_into_chunks",
-]

+ 0 - 228
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/category_loaders.py

@@ -1,228 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-CSV加载器 + 关键词映射
-"""
-
-import csv
-from pathlib import Path
-from typing import Dict, List, Optional
-
-from .models import CategoryStandard, SecondCategoryStandard
-
-
-# ==================== 二级分类关键词映射 ====================
-# 用于将文档中的二级标题名称映射到 StandardCategoryTable.csv 中的标准名称
-# 格式: { CSV标准名称: [可能的文档名称列表] }
-SECONDARY_CATEGORY_KEYWORDS = {
-    # 编制依据 (basis)
-    "法律法规": ["法律法规", "法律", "法规"],
-    "标准规范": ["标准规范", "标准", "规范", "技术标准"],
-    "文件制度": ["文件制度", "制度文件", "管理文件"],
-    "编制原则": ["编制原则", "原则"],
-    "编制范围": ["编制范围", "范围", "工程范围"],
-
-    # 工程概况 (overview)
-    "设计概况": ["设计概况", "工程简介", "工程概况", "概况"],
-    "工程地质与水文气象": ["工程地质与水文气象", "地质", "水文", "气象", "工程地质", "水文气象", "地质与水文"],
-    "周边环境": ["周边环境", "环境", "周围环境"],
-    "施工平面及立面布置": ["施工平面及立面布置", "平面布置", "立面布置", "施工平面", "平面及立面"],
-    "施工要求和技术保证条件": ["施工要求和技术保证条件", "施工要求", "技术保证", "保证条件"],
-    "风险辨识与分级": ["风险辨识与分级", "风险辨识", "风险分级", "风险", "风险等级"],
-    "参建各方责任主体单位": ["参建各方责任主体单位", "参建单位", "责任主体", "参建各方"],
-
-    # 施工计划 (plan)
-    "施工进度计划": ["施工进度计划", "进度计划", "进度", "工期计划"],
-    "施工材料计划": ["施工材料计划", "材料计划", "材料"],
-    "施工设备计划": ["施工设备计划", "设备计划", "机械设备", "设备"],
-    "劳动力计划": ["劳动力计划", "劳动力", "人员计划", "用工计划"],
-    "安全生产费用使用计划": ["安全生产费用使用计划", "安全费用", "安全费", "安全生产费用"],
-
-    # 施工工艺技术 (technology)
-    "主要施工方法概述": ["主要施工方法概述", "施工方法概述", "方法概述", "施工方法"],
-    "技术参数": ["技术参数", "参数", "技术指标"],
-    "工艺流程": ["工艺流程", "流程", "施工流程"],
-    "施工准备": ["施工准备", "准备", "准备工作"],
-    "施工方法及操作要求": ["施工方法及操作要求", "施工方案及操作要求", "操作要求", "施工方案", "施工方法", "方法及操作"],
-    "检查要求": ["检查要求", "检查", "验收要求", "检查验收"],
-
-    # 安全保证措施 (safety)
-    "安全保证体系": ["安全保证体系", "安全体系", "安全管理体系"],
-    "组织保证措施": ["组织保证措施", "组织措施", "组织保证"],
-    "技术保证措施": ["技术保证措施", "技术保障措施", "技术措施", "保障措施", "技术保障", "安全防护措施", "安全防护"],
-    "监测监控措施": ["监测监控措施", "监测措施", "监控措施", "监测监控"],
-    "应急处置措施": ["应急处置措施", "应急预案", "应急措施", "应急处置"],
-
-    # 质量保证措施 (quality)
-    "质量保证体系": ["质量保证体系", "质量体系", "质量管理体系"],
-    "质量目标": ["质量目标", "质量指标"],
-    "工程创优规划": ["工程创优规划", "创优规划", "创优计划", "创优"],
-    "质量控制程序与具体措施": ["质量控制程序与具体措施", "质量控制", "质量措施", "质量控制措施"],
-
-    # 环境保证措施 (environment)
-    "环境保证体系": ["环境保证体系", "环境体系", "环境管理体系"],
-    "环境保护组织机构": ["环境保护组织机构", "环保组织", "环境组织"],
-    "环境保护及文明施工措施": ["环境保护及文明施工措施", "环保措施", "文明施工", "环境保护", "环境措施"],
-
-    # 施工管理及作业人员配备与分工 (management)
-    "施工管理人员": ["施工管理人员", "管理人员", "管理人员配备"],
-    "专职安全生产管理人员": ["专职安全生产管理人员", "专职安全员", "安全管理人员", "安全员", "特种作业人员", "特种工"],
-    "其他作业人员": ["其他作业人员", "其他人员", "作业人员"],
-
-    # 验收要求 (acceptance)
-    "验收标准": ["验收标准", "验收规范", "标准"],
-    "验收程序": ["验收程序", "验收流程", "程序"],
-    "验收内容": ["验收内容", "验收项目"],
-    "验收时间": ["验收时间", "验收日期"],
-    "验收人员": ["验收人员", "验收参与人员"],
-
-    # 其他资料 (other)
-    "计算书": ["计算书", "计算", "验算"],
-    "相关施工图纸": ["相关施工图纸", "施工图纸", "图纸"],
-    "附图附表": ["附图附表", "附图", "附表"],
-    "编制及审核人员情况": ["编制及审核人员情况", "编制人员", "审核人员"],
-}
-
-
-# ==================== 标准分类加载器 ====================
-
-class CategoryStandardLoader:
-    """加载 StandardCategoryTable.csv"""
-
-    def __init__(self, csv_path: Path):
-        self.csv_path = csv_path
-        self.standards: List[CategoryStandard] = []
-        self._load()
-
-    def _load(self):
-        """加载CSV文件"""
-        with open(self.csv_path, 'r', encoding='utf-8-sig') as f:  # utf-8-sig处理BOM
-            reader = csv.DictReader(f)
-            for row in reader:
-                self.standards.append(CategoryStandard(
-                    first_code=row.get('first_code', ''),
-                    first_name=row.get('first_name', ''),
-                    first_seq=int(row.get('first_seq', '0') or 0),
-                    second_code=row.get('second_code', ''),
-                    second_name=row.get('second_name', ''),
-                    second_seq=int(row.get('second_seq', '0') or 0),
-                    second_focus=row.get('second_focus', ''),
-                    third_code=row.get('third_code', ''),
-                    third_name=row.get('third_name', ''),
-                    third_seq=int(row.get('third_seq', '0') or 0),
-                    third_focus=row.get('third_focus', ''),
-                    keywords=row.get('keywords', ''),
-                    extra_prompt=row.get('extra_prompt', '')
-                ))
-
-    def get_standards_by_second_code(self, second_code: str) -> List[CategoryStandard]:
-        """根据二级分类代码获取对应的三级分类标准"""
-        return [s for s in self.standards if s.second_code == second_code]
-
-    def _find_standard_name_by_keyword(self, second_name: str) -> Optional[str]:
-        """
-        通过关键词映射查找标准二级分类名称
-
-        Args:
-            second_name: 文档中的二级标题名称
-
-        Returns:
-            匹配到的标准名称,未匹配返回None
-        """
-        cleaned_name = second_name.strip().lower()
-
-        # 遍历映射表进行匹配
-        for standard_name, keywords in SECONDARY_CATEGORY_KEYWORDS.items():
-            for keyword in keywords:
-                # 宽容匹配:关键词在标题中,或标题在关键词中
-                if keyword.lower() in cleaned_name or cleaned_name in keyword.lower():
-                    return standard_name
-
-        return None
-
-    def get_standards_by_second_name(self, second_name: str) -> List[CategoryStandard]:
-        """
-        根据二级分类名称获取对应的三级分类标准(支持模糊匹配)
-
-        匹配优先级:
-        1. 完全匹配 CSV 中的标准名称
-        2. 包含关系匹配(标准名包含标题名,或标题名包含标准名)
-        3. 关键词映射匹配(通过 SECONDARY_CATEGORY_KEYWORDS)
-
-        Args:
-            second_name: 二级标题名称
-
-        Returns:
-            匹配到的三级分类标准列表
-        """
-        cleaned_name = second_name.strip()
-
-        # 1. 先尝试完全匹配
-        exact = [s for s in self.standards if s.second_name == cleaned_name]
-        if exact:
-            return exact
-
-        # 2. 包含关系匹配(取第一个命中的 second_name,再返回同名的全部行)
-        for s in self.standards:
-            if s.second_name in cleaned_name or cleaned_name in s.second_name:
-                matched_name = s.second_name
-                return [st for st in self.standards if st.second_name == matched_name]
-
-        # 3. 使用关键词映射进行模糊匹配
-        matched_standard_name = self._find_standard_name_by_keyword(cleaned_name)
-        if matched_standard_name:
-            return [s for s in self.standards if s.second_name == matched_standard_name]
-
-        return []
-
-
-class SecondCategoryStandardLoader:
-    """加载 construction_plan_standards.csv(二级分类标准)"""
-
-    def __init__(self, csv_path: Path):
-        self.csv_path = csv_path
-        self.standards: List[SecondCategoryStandard] = []
-        self._load()
-
-    def _load(self):
-        """加载CSV文件"""
-        with open(self.csv_path, 'r', encoding='utf-8-sig') as f:  # utf-8-sig处理BOM
-            reader = csv.DictReader(f)
-            for row in reader:
-                self.standards.append(SecondCategoryStandard(
-                    first_name=row.get('first_name', '').strip(),
-                    second_name=row.get('second_name', '').strip(),
-                    second_raw_content=row.get('second_raw_content', '').strip()
-                ))
-
-    def get_standard_by_second_name(self, second_name: str) -> Optional[SecondCategoryStandard]:
-        """根据二级分类名称获取标准定义(支持模糊匹配)"""
-        # 清理待匹配的名称
-        cleaned_name = second_name.strip().lower()
-
-        # 1. 先尝试完全匹配或包含关系匹配
-        for std in self.standards:
-            # 完全匹配
-            if std.second_name.lower() == cleaned_name:
-                return std
-            # 包含关系匹配
-            if std.second_name.lower() in cleaned_name or cleaned_name in std.second_name.lower():
-                return std
-
-        # 2. 使用关键词映射进行模糊匹配
-        matched_standard_name = None
-        for standard_name, keywords in SECONDARY_CATEGORY_KEYWORDS.items():
-            for keyword in keywords:
-                if keyword.lower() in cleaned_name or cleaned_name in keyword.lower():
-                    matched_standard_name = standard_name
-                    break
-            if matched_standard_name:
-                break
-
-        if matched_standard_name:
-            # 在standards中查找匹配的标准
-            for std in self.standards:
-                if std.second_name == matched_standard_name:
-                    return std
-
-        return None

+ 0 - 207
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/chunks_converter.py

@@ -1,207 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-ChunksConverter:chunks 格式与 SectionContent 格式的转换器
-"""
-
-import re
-from typing import Any, Dict, List, Tuple
-
-from .models import ClassificationResult, SectionContent
-from .category_loaders import CategoryStandardLoader
-
-
-class ChunksConverter:
-    """chunks 格式与 SectionContent 格式的转换器"""
-
-    def __init__(self, category_loader: CategoryStandardLoader):
-        self.category_loader = category_loader
-
-    def chunks_to_sections(self, chunks: List[Dict[str, Any]]) -> List[SectionContent]:
-        """
-        将 chunks 列表转换为 SectionContent 列表
-
-        分组策略:
-        1. 优先按 section_label 分组(更精确的文档结构)
-        2. 如果 section_label 相同,再按一级分类分组
-        3. 从 section_label 提取二级分类名称用于匹配三级标准
-
-        Args:
-            chunks: 文档分块列表,每个 chunk 需包含:
-                - chapter_classification: 一级分类代码
-                - secondary_category_code: 二级分类代码(可能为 none)
-                - secondary_category_cn: 二级分类中文名
-                - review_chunk_content 或 content: 内容文本
-                - section_label: 章节标签(如 "第一章编制依据->一、法律法规")
-
-        Returns:
-            List[SectionContent]: 二级标题段落列表
-        """
-        # 按 section_label 分组(更精确)
-        # section_label 格式: "第一章编制依据->一、法律法规"
-        section_groups: Dict[str, List[Dict]] = {}
-
-        for chunk in chunks:
-            # 获取分类信息
-            section_label = chunk.get("section_label", "") or chunk.get("chapter", "")
-            first_code = chunk.get("chapter_classification", "") or chunk.get("first_code", "")
-            second_code = chunk.get("secondary_category_code", "") or chunk.get("second_code", "")
-            second_cn = chunk.get("secondary_category_cn", "") or chunk.get("second_name", "")
-
-            # 分组策略:每个二级分类独立分组,禁止合并不同二级分类
-            # 优先使用 section_label,其次使用 secondary_category_code
-            if section_label and "->" in section_label:
-                # 有明确的章节标签,使用它作为分组键
-                group_key = section_label
-            elif second_code and second_code not in ("none", "None", ""):
-                # 有二级分类代码,按二级分类独立分组(关键:不再合并到一级分类下)
-                group_key = f"{first_code}->{second_code}"
-            elif section_label:
-                group_key = section_label
-            else:
-                # 完全没有分类信息,使用唯一键避免合并
-                group_key = f"unknown_{first_code}_{id(chunk)}"
-
-            if group_key not in section_groups:
-                section_groups[group_key] = []
-            section_groups[group_key].append(chunk)
-
-        # 为每个分组创建 SectionContent
-        section_contents = []
-        all_lines = []  # 全局行号追踪
-
-        for group_key, group_chunks in section_groups.items():
-            if not group_chunks:
-                continue
-
-            # 合并该分组的所有内容,同时记录每个原始 chunk 的行范围
-            section_lines = []
-            chunk_line_counts: List[Tuple[str, int]] = []  # (chunk_id, line_count)
-            for chunk in group_chunks:
-                content = chunk.get("review_chunk_content", "") or chunk.get("content", "") or chunk.get("original_content", "")
-                if content:
-                    lines = content.split('\n')
-                    n = len(lines)
-                    chunk_id = chunk.get("chunk_id") or chunk.get("id") or str(id(chunk))
-                    chunk_line_counts.append((chunk_id, n))
-                    section_lines.extend(lines)
-                    all_lines.extend(lines)
-                else:
-                    chunk_id = chunk.get("chunk_id") or chunk.get("id") or str(id(chunk))
-                    chunk_line_counts.append((chunk_id, 0))
-
-            if not section_lines:
-                continue
-
-            # 获取一级分类代码
-            first_code = group_chunks[0].get("chapter_classification", "") or group_chunks[0].get("first_code", "")
-
-            # 获取二级分类名称和代码
-            second_code = group_chunks[0].get("secondary_category_code", "") or group_chunks[0].get("second_code", "")
-            second_cn = group_chunks[0].get("secondary_category_cn", "") or group_chunks[0].get("second_name", "")
-
-            # 从 section_label 提取二级分类名称(优先)
-            section_label = group_chunks[0].get("section_label", "") or group_chunks[0].get("chapter", "")
-            if "->" in section_label:
-                parts = section_label.split("->")
-                if len(parts) >= 2:
-                    extracted = parts[1].strip()
-                    # 去除序号前缀(如 "一、" "二、")
-                    cleaned = re.sub(r'^[一二三四五六七八九十]+[、)\s]+', '', extracted).strip()
-                    if cleaned:
-                        second_cn = cleaned
-                        # 尝试根据提取的名称匹配二级分类代码
-                        matched_standards = self.category_loader.get_standards_by_second_name(cleaned)
-                        if matched_standards:
-                            second_code = matched_standards[0].second_code
-
-            # 构建带行号的内容
-            start_line = len(all_lines) - len(section_lines) + 1
-            line_number_map = list(range(start_line, len(all_lines) + 1))
-            numbered_lines = []
-            for i, line in enumerate(section_lines):
-                numbered_lines.append(f"<{line_number_map[i]}> {line}")
-            numbered_content = '\n'.join(numbered_lines)
-
-            # 计算每个原始 chunk 在全局行号中的范围
-            chunk_ranges: List[Tuple[str, int, int]] = []
-            current_global = start_line
-            for chunk_id, n_lines in chunk_line_counts:
-                if n_lines > 0:
-                    chunk_ranges.append((chunk_id, current_global, current_global + n_lines - 1))
-                    current_global += n_lines
-
-            # 获取三级分类标准
-            category_standards = self.category_loader.get_standards_by_second_code(second_code)
-            if not category_standards:
-                category_standards = self.category_loader.get_standards_by_second_name(second_cn)
-
-            # 构建 section_key(使用 group_key 保留 section_label 信息,粒度更细)
-            section_key = group_key
-
-            section_contents.append(SectionContent(
-                section_key=section_key,
-                section_name=second_cn or second_code,
-                lines=section_lines,
-                numbered_content=numbered_content,
-                category_standards=category_standards,
-                line_number_map=line_number_map,
-                chunk_ranges=chunk_ranges
-            ))
-
-        return section_contents
-
-    def classification_result_to_chunks(
-        self,
-        result: ClassificationResult,
-        original_chunks: List[Dict[str, Any]],
-        first_code: str,
-        second_code: str
-    ) -> List[Dict[str, Any]]:
-        """
-        将 ClassificationResult 转换回 chunks 格式
-
-        将行级分类结果展开,为每个三级分类创建对应的 chunk 条目
-
-        Args:
-            result: 分类结果
-            original_chunks: 原始 chunks(用于保留其他字段)
-            first_code: 一级分类代码
-            second_code: 二级分类代码
-
-        Returns:
-            List[Dict]: 更新后的 chunks 列表
-        """
-        updated_chunks = []
-
-        # 收集所有三级分类信息,过滤掉非标准项(no_standard)
-        tertiary_classifications = []
-        for content in result.classified_contents:
-            # 跳过非标准项,不纳入三级分类统计
-            if content.third_category_code == "no_standard":
-                continue
-            tertiary_classifications.append({
-                "third_category_name": content.third_category_name,
-                "third_category_code": content.third_category_code,
-                "start_line": content.start_line,
-                "end_line": content.end_line,
-                "content": content.content
-            })
-
-        # 更新原始 chunks
-        for chunk in original_chunks:
-            updated_chunk = dict(chunk)
-            updated_chunk["first_code"] = first_code
-            updated_chunk["second_code"] = second_code
-
-            # 添加三级分类详情列表
-            updated_chunk["tertiary_classification_details"] = tertiary_classifications
-
-            # 如果有三级分类结果,设置第一个作为主要分类(向后兼容)
-            if tertiary_classifications:
-                updated_chunk["tertiary_category_code"] = tertiary_classifications[0]["third_category_code"]
-                updated_chunk["tertiary_category_cn"] = tertiary_classifications[0]["third_category_name"]
-
-            updated_chunks.append(updated_chunk)
-
-        return updated_chunks

+ 0 - 143
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/config.py

@@ -1,143 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-配置类与全局变量
-
-注意:
-1. LLM 调用已统一通过 foundation.ai.agent.generate.model_generate 处理
-2. Embedding 模型已统一通过 foundation.ai.models.model_handler 处理
-本配置类仅保留并发控制和路径配置
-"""
-
-from pathlib import Path
-from dataclasses import dataclass, field
-from typing import Optional
-
-from foundation.observability.logger.loggering import review_logger as logger
-
-
-def _load_model_from_yaml() -> str:
-    """从 model_setting.yaml 加载默认模型配置
-
-    优先读取 doc_classification_tertiary 配置,其次使用 default 配置
-    """
-    try:
-        import yaml
-        yaml_path = Path(__file__).parent.parent.parent.parent.parent.parent.parent / "config" / "model_setting.yaml"
-        logger.debug(f"[ClassifierConfig] 尝试加载模型配置: {yaml_path}")
-        if yaml_path.exists():
-            with open(yaml_path, 'r', encoding='utf-8') as f:
-                settings = yaml.safe_load(f)
-                model_settings = settings.get('model_settings', {})
-                # 优先使用三级分类配置
-                tertiary_config = model_settings.get('doc_classification_tertiary', {})
-                if tertiary_config and 'model' in tertiary_config:
-                    model = tertiary_config['model']
-                    logger.info(f"[ClassifierConfig] 从 model_setting.yaml 加载三级分类模型: {model}")
-                    return model
-                # 其次使用默认配置
-                default_config = settings.get('default', {})
-                if default_config and 'model' in default_config:
-                    model = default_config['model']
-                    logger.info(f"[ClassifierConfig] 从 model_setting.yaml 加载默认模型: {model}")
-                    return model
-        else:
-            logger.warning(f"[ClassifierConfig] model_setting.yaml 不存在: {yaml_path}")
-    except Exception as e:
-        logger.warning(f"[ClassifierConfig] 加载 model_setting.yaml 失败: {e}")
-    logger.info("[ClassifierConfig] 使用兜底默认模型: qwen3_5_35b_a3b")
-    return "qwen3_5_35b_a3b"  # 兜底默认值
-
-
-def _load_thinking_mode_from_yaml() -> bool:
-    """从 model_setting.yaml 加载 thinking 模式配置"""
-    try:
-        import yaml
-        yaml_path = Path(__file__).parent.parent.parent.parent.parent.parent.parent / "config" / "model_setting.yaml"
-        if yaml_path.exists():
-            with open(yaml_path, 'r', encoding='utf-8') as f:
-                settings = yaml.safe_load(f)
-                model_settings = settings.get('model_settings', {})
-                tertiary_config = model_settings.get('doc_classification_tertiary', {})
-                if tertiary_config and 'enable_thinking' in tertiary_config:
-                    thinking = tertiary_config['enable_thinking']
-                    logger.debug(f"[ClassifierConfig] 从 model_setting.yaml 加载 thinking 模式: {thinking}")
-                    return thinking
-    except Exception as e:
-        logger.debug(f"[ClassifierConfig] 加载 thinking 模式失败: {e}")
-    return False  # 默认禁用 thinking 模式
-
-
-@dataclass
-class ClassifierConfig:
-    """分类器配置
-
-    注意:
-    - LLM 调用统一通过 generate_model_client 处理
-    - Embedding 模型统一通过 model_handler.get_embedding_model() 处理
-    本配置仅用于控制并发和路径
-    """
-
-    # LLM 模型名称(从 model_setting.yaml 自动加载,可覆盖)
-    model: str = field(default_factory=_load_model_from_yaml)
-
-    # 是否启用 thinking 模式(从 model_setting.yaml 自动加载)
-    enable_thinking: bool = field(default_factory=_load_thinking_mode_from_yaml)
-
-    # 并发控制
-    max_concurrent_requests: int = 20
-    max_retries: int = 3
-    retry_delay: int = 1
-
-    # Embedding 相似度阈值(仅阈值配置保留在此处)
-    embedding_similarity_threshold: float = 0.9
-
-    # 路径配置(通过 __post_init__ 自动初始化)
-    category_table_path: str = ""
-    second_category_path: str = ""
-    output_path: str = ""
-
-    def __post_init__(self):
-        """初始化路径配置"""
-        # 初始化默认路径
-        # 注意:本文件位于 reviewers/utils/llm_content_classifier_v2/config.py
-        # parent.parent.parent.parent = component/
-        if not self.category_table_path:
-            self.category_table_path = str(
-                Path(__file__).parent.parent.parent.parent / "doc_worker" / "config" / "StandardCategoryTable.csv"
-            )
-        if not self.second_category_path:
-            self.second_category_path = str(
-                Path(__file__).parent.parent.parent.parent / "doc_worker" / "config" / "construction_plan_standards.csv"
-            )
-        if not self.output_path:
-            # 项目根目录下的 temp/construction_review/llm_content_classifier_v2
-            # 从 reviewers/utils/llm_content_classifier_v2/ 向上 7 层到项目根目录
-            project_root = Path(__file__).parent.parent.parent.parent.parent.parent.parent
-            self.output_path = str(project_root / "temp" / "construction_review" / "llm_content_classifier_v2")
-
-    def get_embedding_model(self):
-        """
-        获取 Embedding 模型实例
-
-        统一通过 model_handler 获取,配置从 config.ini 读取
-
-        Returns:
-            OpenAIEmbeddings: 配置好的 Embedding 模型实例
-        """
-        from foundation.ai.models.model_handler import model_handler
-        return model_handler.get_embedding_model()
-
-
-# 默认配置实例
-DEFAULT_CONFIG = ClassifierConfig()
-
-# 向后兼容的全局变量(供独立运行测试使用)
-# 注意:api_key 和 base_url 已从 ClassifierConfig 移除,LLM 配置由 model_generate 统一处理
-MAX_CONCURRENT_REQUESTS = DEFAULT_CONFIG.max_concurrent_requests
-MAX_RETRIES = DEFAULT_CONFIG.max_retries
-RETRY_DELAY = DEFAULT_CONFIG.retry_delay
-MODEL = DEFAULT_CONFIG.model
-EMBEDDING_SIMILARITY_THRESHOLD = DEFAULT_CONFIG.embedding_similarity_threshold
-CATEGORY_TABLE_PATH = Path(DEFAULT_CONFIG.category_table_path)
-SECOND_CATEGORY_PATH = Path(DEFAULT_CONFIG.second_category_path)

+ 0 - 848
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/content_classifier.py

@@ -1,848 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-ContentClassifierClient 核心分类逻辑
-"""
-
-import asyncio
-import json
-import re
-import time
-from typing import Dict, List, Optional, Tuple
-
-from .models import CategoryStandard, ClassifiedContent, ClassificationResult, SectionContent
-from .embedding_client import EmbeddingClient
-from foundation.ai.agent.generate.model_generate import generate_model_client
-from .category_loaders import SecondCategoryStandardLoader
-from .json_utils import _fix_json, _aggressive_json_fix
-from .prompt import (
-    CLASSIFY_SYSTEM_PROMPT,
-    SUPPLEMENT_VERIFY_SYSTEM_PROMPT,
-    build_classify_prompt,
-    build_fix_prompt,
-    build_supplement_verify_prompt,
-)
-from foundation.observability.logger.loggering import review_logger as logger
-
-
-class ContentClassifierClient:
-    """LLM 内容分类客户端"""
-
-    def __init__(self, model: str, semaphore: asyncio.Semaphore, embedding_client: Optional[EmbeddingClient] = None, second_category_loader: Optional[SecondCategoryStandardLoader] = None, enable_thinking: bool = False):
-        self.model = model
-        self.semaphore = semaphore
-        self.embedding_client = embedding_client
-        self.second_category_loader = second_category_loader
-        self.enable_thinking = enable_thinking
-
-    async def classify_content(self, section: SectionContent) -> ClassificationResult:
-        """对内容进行三级分类识别(带并发控制和自动修复,支持长内容分块处理)"""
-        start_time = time.time()
-
-        # 步骤1: 使用Embedding模型检查二级分类与内容的相似度
-        if self.embedding_client and self.second_category_loader and section.category_standards:
-            # 从construction_plan_standards.csv中查找对应的标准二级分类
-            # 使用section_name进行匹配
-            std_second_category = self.second_category_loader.get_standard_by_second_name(section.section_name)
-
-            if std_second_category:
-                # 找到了对应的标准二级分类,进行相似度检查
-                # 检查section内容与标准的second_raw_content的一致性
-                section_text = '\n'.join(section.lines)
-                is_similar, similarity = await self.embedding_client.check_similarity(
-                    section_name=section.section_name,
-                    section_content=section_text,
-                    second_category_name=std_second_category.second_name,
-                    second_category_raw_content=std_second_category.second_raw_content
-                )
-
-                if is_similar:
-                    from .config import EMBEDDING_SIMILARITY_THRESHOLD
-                    logger.debug(f"[{section.section_name}] 相似度检查通过 ({similarity:.3f} >= {EMBEDDING_SIMILARITY_THRESHOLD}),跳过LLM分类,默认包含所有三级分类")
-                    # 生成默认分类结果:包含所有三级分类
-                    all_contents = self._generate_default_classification(section)
-                    total_lines, classified_lines, coverage_rate = self._calculate_coverage_rate(section, all_contents)
-                    latency = time.time() - start_time
-                    return ClassificationResult(
-                        model=self.model,
-                        section_key=section.section_key,
-                        section_name=section.section_name,
-                        classified_contents=all_contents,
-                        latency=latency,
-                        raw_response=f"[Embedding相似度跳过] similarity={similarity:.3f}",
-                        error=None,
-                        total_lines=total_lines,
-                        classified_lines=classified_lines,
-                        coverage_rate=coverage_rate
-                    )
-                else:
-                    logger.debug(f"[{section.section_name}] 相似度检查未通过 ({similarity:.3f} < ?),继续LLM分类")
-            else:
-                logger.debug(f"[{section.section_name}] 未在construction_plan_standards.csv中找到对应标准,继续LLM分类")
-
-        # 如果内容过长,分块处理
-        MAX_LINES_PER_CHUNK = 150   # 每个块最多150行
-        MAX_CHARS_PER_CHUNK = 3000  # 每个块最多3000字符
-        OVERLAP_CHARS = 100         # 相邻块之间重叠约100字符
-        total_lines = len(section.lines)
-        total_chars = sum(len(line) for line in section.lines)
-
-        if total_lines <= MAX_LINES_PER_CHUNK and total_chars <= MAX_CHARS_PER_CHUNK:
-            # 内容不长,直接处理
-            result = await self._classify_single_chunk(section, start_time)
-            # 补充验证:关键字扫描 + LLM二次确认,补充遗漏的分类
-            if not result.error and result.classified_contents is not None:
-                supplement = await self._detect_and_supplement(section, result.classified_contents)
-                if supplement:
-                    merged = self._merge_classified_contents(result.classified_contents + supplement, section)
-                    total_l, classified_l, coverage_r = self._calculate_coverage_rate(section, merged)
-                    return ClassificationResult(
-                        model=result.model,
-                        section_key=result.section_key,
-                        section_name=result.section_name,
-                        classified_contents=merged,
-                        latency=result.latency,
-                        raw_response=result.raw_response,
-                        error=result.error,
-                        total_lines=total_l,
-                        classified_lines=classified_l,
-                        coverage_rate=coverage_r
-                    )
-            return result
-
-        # 内容过长,按字符数+行数双限制分块处理(带重叠)
-        logger.debug(
-            f"[{section.section_name}] 内容较长({total_lines}行, {total_chars}字符),"
-            f"按 max_lines={MAX_LINES_PER_CHUNK}, max_chars={MAX_CHARS_PER_CHUNK}, overlap={OVERLAP_CHARS} 分块处理..."
-        )
-        chunk_ranges = self._split_section_into_chunks(
-            section, MAX_LINES_PER_CHUNK, MAX_CHARS_PER_CHUNK, OVERLAP_CHARS
-        )
-        all_contents = []
-
-        for chunk_start, chunk_end in chunk_ranges:
-            chunk_section = self._create_chunk_section(section, chunk_start, chunk_end)
-            chunk_result = await self._classify_single_chunk(chunk_section, 0, is_chunk=True)
-
-            if chunk_result.error:
-                logger.error(f"[{section.section_name}] 块 {chunk_start+1}-{chunk_end} 处理失败: {chunk_result.error[:50]}")
-            else:
-                logger.debug(f"[{section.section_name}] 块 {chunk_start+1}-{chunk_end} 成功: {len(chunk_result.classified_contents)} 个分类")
-                all_contents.extend(chunk_result.classified_contents)
-
-        # 所有块处理完成后,再次聚合所有内容(解决分块导致的同一分类分散问题)
-        if all_contents:
-            all_contents = self._merge_classified_contents(all_contents, section)
-
-        # 补充验证:关键字扫描 + LLM二次确认,补充遗漏的分类
-        supplement = await self._detect_and_supplement(section, all_contents)
-        if supplement:
-            all_contents = self._merge_classified_contents(all_contents + supplement, section)
-
-        # 计算分类率
-        total_lines, classified_lines, coverage_rate = self._calculate_coverage_rate(section, all_contents)
-
-        latency = time.time() - start_time
-
-        return ClassificationResult(
-            model=self.model,
-            section_key=section.section_key,
-            section_name=section.section_name,
-            classified_contents=all_contents,
-            latency=latency,
-            raw_response="",
-            error=None if all_contents else "所有块处理失败",
-            total_lines=total_lines,
-            classified_lines=classified_lines,
-            coverage_rate=coverage_rate
-        )
-
-    def _split_section_into_chunks(
-        self,
-        section: SectionContent,
-        max_lines: int = 150,
-        max_chars: int = 3000,
-        overlap_chars: int = 100
-    ) -> List[Tuple[int, int]]:
-        """将 section 切分成多个子块,满足行数和字符数上限,并带字符重叠。"""
-        lines = section.lines
-        total = len(lines)
-        if total == 0:
-            return [(0, 0)]
-
-        chunks = []
-        start = 0
-        while start < total:
-            end = start
-            chars = 0
-            # 同时满足行数和字符数两个限制
-            while end < total and (end - start) < max_lines and chars + len(lines[end]) <= max_chars:
-                chars += len(lines[end])
-                end += 1
-
-            # 至少保证一行
-            if end == start:
-                end = start + 1
-
-            chunks.append((start, end))
-
-            if end >= total:
-                break
-
-            # 计算下一次 start,保留约 overlap_chars 的字符重叠
-            next_start = end - 1
-            overlap_acc = 0
-            while next_start > start and overlap_acc < overlap_chars:
-                overlap_acc += len(lines[next_start])
-                next_start -= 1
-            start = next_start + 1
-
-        return chunks
-
-    def _calculate_coverage_rate(self, section: SectionContent, contents: List[ClassifiedContent]) -> tuple:
-        """计算分类率(已分类行数/总行数)"""
-        total_lines = len(section.lines)
-        if total_lines == 0 or not contents:
-            return total_lines, 0, 0.0
-
-        # 使用集合记录已分类的行号(避免重复计数)
-        classified_line_set = set()
-
-        for content in contents:
-            if section.line_number_map:
-                # 如果有全局行号映射,找出起止行号对应的索引
-                start_idx = -1
-                end_idx = -1
-                for idx, global_line in enumerate(section.line_number_map):
-                    if global_line == content.start_line:
-                        start_idx = idx
-                    if global_line == content.end_line:
-                        end_idx = idx
-                        break
-
-                if start_idx != -1 and end_idx != -1:
-                    for i in range(start_idx, end_idx + 1):
-                        if i < len(section.line_number_map):
-                            classified_line_set.add(section.line_number_map[i])
-            else:
-                # 没有全局行号,直接使用起止行号
-                for line_num in range(content.start_line, content.end_line + 1):
-                    classified_line_set.add(line_num)
-
-        classified_lines = len(classified_line_set)
-        coverage_rate = (classified_lines / total_lines) * 100 if total_lines > 0 else 0.0
-
-        return total_lines, classified_lines, coverage_rate
-
-    def _generate_default_classification(self, section: SectionContent) -> List[ClassifiedContent]:
-        """
-        生成默认的分类结果(当embedding相似度检查通过时使用)
-        默认包含所有三级分类,覆盖整个section内容
-        """
-        if not section.category_standards:
-            return []
-
-        # 获取全局行号范围
-        if section.line_number_map:
-            start_line = section.line_number_map[0]
-            end_line = section.line_number_map[-1]
-        else:
-            start_line = 1
-            end_line = len(section.lines)
-
-        # 为每个三级分类创建一个条目,覆盖全部内容
-        default_contents = []
-        for std in section.category_standards:
-            # 提取该分类对应的内容
-            content = self._extract_content_by_line_numbers(section, start_line, end_line)
-            default_contents.append(ClassifiedContent(
-                third_category_name=std.third_name,
-                third_category_code=std.third_code,
-                third_seq=std.third_seq,
-                start_line=start_line,
-                end_line=end_line,
-                content=content
-            ))
-
-        return default_contents
-
-    def _create_chunk_section(self, section: SectionContent, start_idx: int, end_idx: int) -> SectionContent:
-        """从section创建子块"""
-        chunk_lines = section.lines[start_idx:end_idx]
-        chunk_line_map = section.line_number_map[start_idx:end_idx] if section.line_number_map else list(range(start_idx + 1, end_idx + 1))
-
-        # 生成带行号的内容
-        numbered_content = '\n'.join([f"<{chunk_line_map[i]}> {line}" for i, line in enumerate(chunk_lines)])
-
-        return SectionContent(
-            section_key=f"{section.section_key}_chunk_{start_idx}_{end_idx}",
-            section_name=section.section_name,
-            lines=chunk_lines,
-            numbered_content=numbered_content,
-            category_standards=section.category_standards,
-            line_number_map=chunk_line_map
-        )
-
-    async def _classify_single_chunk(self, section: SectionContent, start_time: float, is_chunk: bool = False) -> ClassificationResult:
-        """处理单个块"""
-        prompt = self._build_prompt(section, is_chunk=is_chunk)
-
-        try:
-            async with self.semaphore:
-                response = await self._call_api(prompt)
-
-            classified_contents, parse_error = await self._parse_with_fix(response, section, prompt)
-
-            if not is_chunk:
-                latency = time.time() - start_time
-                # 计算分类率
-                total_lines, classified_lines, coverage_rate = self._calculate_coverage_rate(section, classified_contents)
-                return ClassificationResult(
-                    model=self.model,
-                    section_key=section.section_key,
-                    section_name=section.section_name,
-                    classified_contents=classified_contents,
-                    latency=latency,
-                    raw_response=response[:1000],
-                    error=parse_error,
-                    total_lines=total_lines,
-                    classified_lines=classified_lines,
-                    coverage_rate=coverage_rate
-                )
-            else:
-                return ClassificationResult(
-                    model=self.model,
-                    section_key=section.section_key,
-                    section_name=section.section_name,
-                    classified_contents=classified_contents,
-                    latency=0,
-                    raw_response="",
-                    error=parse_error
-                )
-        except Exception as e:
-            if not is_chunk:
-                latency = time.time() - start_time
-                return ClassificationResult(
-                    model=self.model,
-                    section_key=section.section_key,
-                    section_name=section.section_name,
-                    classified_contents=[],
-                    latency=latency,
-                    error=str(e)
-                )
-            else:
-                return ClassificationResult(
-                    model=self.model,
-                    section_key=section.section_key,
-                    section_name=section.section_name,
-                    classified_contents=[],
-                    latency=0,
-                    error=str(e)
-                )
-
-    async def _parse_with_fix(self, response: str, section: SectionContent, original_prompt: str = "") -> tuple:
-        """解析响应,失败时让模型修复(最多3次重试)
-
-        返回: (contents, error_msg)
-        - contents: 分类结果列表(可能为空,表示模型判定无匹配内容)
-        - error_msg: 错误信息,None表示成功(包括空结果),非None表示解析失败
-        """
-        # 第一次尝试解析
-        contents, parse_success = self._parse_response(response, section)
-
-        # 解析成功(包括空结果,表示模型判定内容不符合任何分类标准)
-        if parse_success:
-            if not contents:
-                logger.debug(f"[{section.section_name}] 模型判定无匹配内容,记录为未分类")
-            return contents, None
-
-        # 解析失败(JSON格式错误),尝试让模型修复(最多3次)
-        logger.warning(f"[{section.section_name}] JSON解析失败,请求模型修复...")
-        logger.debug(f"[{section.section_name}] 原始响应前200字符: {response[:200]}...")
-
-        original_response = response
-
-        for attempt in range(3):
-            fix_prompt = self._build_fix_prompt(original_response)
-
-            try:
-                async with self.semaphore:
-                    fixed_response = await self._call_api(fix_prompt)
-
-                # 尝试解析修复后的输出
-                contents, parse_success = self._parse_response(fixed_response, section)
-                if parse_success:
-                    logger.debug(f"[{section.section_name}] 模型修复成功(第{attempt+1}次)")
-                    if not contents:
-                        logger.debug(f"[{section.section_name}] 修复后模型判定无匹配内容,记录为未分类")
-                    return contents, None
-                else:
-                    logger.debug(f"[{section.section_name}] 第{attempt+1}次修复失败,继续重试...")
-                    original_response = fixed_response
-            except Exception as e:
-                return [], f"请求模型修复失败: {str(e)}"
-
-        logger.error(f"[{section.section_name}] 模型修复3次后仍无法解析JSON")
-        return [], "模型修复3次后仍无法解析JSON"
-
-    def _build_fix_prompt(self, original_response: str) -> str:
-        """构建JSON修复提示词(委托给 prompt.py 中的 build_fix_prompt)"""
-        return build_fix_prompt(original_response)
-
-    def _build_prompt(self, section: SectionContent, is_chunk: bool = False) -> str:
-        """构建分类提示词(委托给 prompt.py 中的 build_classify_prompt)"""
-        return build_classify_prompt(section, is_chunk)
-
-    async def _call_api(self, prompt: str) -> str:
-        """调用API(使用统一的 GenerateModelClient,带指数退避重试)"""
-        max_retries = 5
-        base_delay = 2  # 基础延迟2秒
-
-        for attempt in range(max_retries):
-            try:
-                # 使用统一的模型调用客户端
-                # 该客户端已内置重试机制和 thinking 模式控制
-                # 从配置获取 enable_thinking,默认禁用
-                enable_thinking = getattr(self, 'enable_thinking', False)
-                response = await generate_model_client.get_model_generate_invoke(
-                    trace_id="content_classifier",
-                    system_prompt=CLASSIFY_SYSTEM_PROMPT,
-                    user_prompt=prompt,
-                    model_name=self.model,
-                    enable_thinking=enable_thinking
-                )
-                return response
-            except Exception as e:
-                error_str = str(e)
-                # 检查是否是429限流错误
-                if "429" in error_str or "rate limit" in error_str.lower():
-                    if attempt < max_retries - 1:
-                        # 指数退避: 2^attempt * (1 + random)
-                        delay = base_delay * (2 ** attempt) + (hash(prompt) % 1000) / 1000
-                        logger.warning(f"API限流(429),等待 {delay:.1f}s 后重试 ({attempt + 1}/{max_retries})...")
-                        await asyncio.sleep(delay)
-                        continue
-                # 其他错误或重试次数用完,抛出异常
-                raise
-
-        return ""
-
-    def _parse_response(self, response: str, section: SectionContent) -> tuple:
-        """解析响应(增强版,处理各种JSON格式问题)
-
-        返回: (contents, parse_success)
-        - contents: 分类结果列表
-        - parse_success: True表示JSON解析成功(包括空结果),False表示解析失败
-        """
-        if not response or not response.strip():
-            return [], False  # 空响应视为解析失败
-
-        response = response.strip()
-
-        # 尝试多种方式提取JSON
-        json_str = None
-
-        # 方法1: 从代码块中提取
-        code_block_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', response)
-        if code_block_match:
-            json_str = code_block_match.group(1).strip()
-
-        # 方法2: 优先查找JSON数组(模型经常直接输出数组格式)
-        if not json_str:
-            # 使用非贪婪匹配找到第一个完整的数组
-            array_match = re.search(r'\[[\s\S]*?\]', response)
-            if array_match:
-                potential_array = array_match.group(0)
-                # 验证是否是有效的JSON数组
-                try:
-                    parsed = json.loads(potential_array)
-                    if isinstance(parsed, list):
-                        json_str = potential_array
-                except Exception:
-                    pass
-
-        # 方法3: 查找JSON对象
-        if not json_str:
-            json_match = re.search(r'\{[\s\S]*\}', response)
-            if json_match:
-                json_str = json_match.group(0)
-
-        if not json_str:
-            return [], False  # 未找到JSON结构,解析失败
-
-        # 处理模型直接输出数组的情况(包装成对象格式)
-        if json_str.strip().startswith('['):
-            try:
-                # 验证是有效的JSON数组
-                array_data = json.loads(json_str)
-                if isinstance(array_data, list):
-                    # 包装成期望的格式
-                    json_str = json.dumps({"classified_contents": array_data})
-            except Exception:
-                pass  # 不是有效数组,继续后续处理
-
-        # 先尝试直接解析,如果成功则不需要修复
-        try:
-            json.loads(json_str)
-            # JSON 有效,直接使用
-        except json.JSONDecodeError:
-            # JSON 无效,尝试修复
-            json_str = self._fix_json(json_str)
-
-        try:
-            data = json.loads(json_str)
-            # 处理数组格式
-            if isinstance(data, list):
-                data = {"classified_contents": data}
-            contents = []
-            # 支持两种键名: classified_contents 或 classified_contents_list
-            items = data.get("classified_contents", []) or data.get("classified_contents_list", [])
-
-            # 构建索引映射表:索引 -> (third_name, third_code, third_seq)
-            index_mapping = {0: ("非标准项", "no_standard", 0)}
-            if section.category_standards:
-                for i, std in enumerate(section.category_standards, 1):
-                    index_mapping[i] = (std.third_name, std.third_code, std.third_seq)
-
-            for item in items:
-                start_line = item.get("start_line", 0)
-                end_line = item.get("end_line", 0)
-
-                # 优先使用 category_index 进行映射
-                category_index = item.get("category_index")
-                if category_index is not None:
-                    # 通过索引映射获取标准名称、代码和序号
-                    idx = int(category_index) if isinstance(category_index, (int, float, str)) else 0
-                    category_name, category_code, category_seq = index_mapping.get(idx, ("非标准项", "no_standard", 0))
-                else:
-                    # 兼容旧格式:直接读取 third_category_code 和 third_category_name
-                    category_code = item.get("third_category_code", "")
-                    category_name = item.get("third_category_name", "")
-
-                    # 清理分类名称格式:移除末尾的代码部分
-                    if category_name and " (" in category_name and category_name.endswith(")"):
-                        category_name = re.sub(r'\s*\([^)]+\)\s*$', '', category_name).strip()
-
-                    # 验证分类代码是否在有效列表中
-                    valid_codes = set(v[1] for v in index_mapping.values())
-                    if category_code not in valid_codes:
-                        logger.warning(f"发现非标准分类 '{category_name}' ({category_code}),强制归为非标准项")
-                        category_code = "no_standard"
-                        category_name = "非标准项"
-
-                # 根据行号从section中提取原文
-                content = self._extract_content_by_line_numbers(section, start_line, end_line)
-                contents.append(ClassifiedContent(
-                    third_category_name=category_name,
-                    third_category_code=category_code,
-                    third_seq=category_seq,
-                    start_line=start_line,
-                    end_line=end_line,
-                    content=content
-                ))
-            # 聚合同一分类下相邻的内容
-            contents = self._merge_classified_contents(contents, section)
-            return contents, True  # 解析成功(可能为空结果)
-        except Exception as e:
-            # 尝试更激进的修复
-            try:
-                fixed = self._aggressive_json_fix(json_str)
-                data = json.loads(fixed)
-                # 处理数组格式
-                if isinstance(data, list):
-                    data = {"classified_contents": data}
-                contents = []
-                # 支持两种键名: classified_contents 或 classified_contents_list
-                items = data.get("classified_contents", []) or data.get("classified_contents_list", [])
-
-                # 构建索引映射表:索引 -> (third_name, third_code)
-                index_mapping = {0: ("非标准项", "no_standard")}
-                if section.category_standards:
-                    for i, std in enumerate(section.category_standards, 1):
-                        index_mapping[i] = (std.third_name, std.third_code)
-
-                for item in items:
-                    start_line = item.get("start_line", 0)
-                    end_line = item.get("end_line", 0)
-
-                    # 优先使用 category_index 进行映射
-                    category_index = item.get("category_index")
-                    if category_index is not None:
-                        idx = int(category_index) if isinstance(category_index, (int, float, str)) else 0
-                        category_name, category_code = index_mapping.get(idx, ("非标准项", "no_standard"))
-                    else:
-                        # 兼容旧格式
-                        category_code = item.get("third_category_code", "")
-                        category_name = item.get("third_category_name", "")
-                        valid_codes = set(v[1] for v in index_mapping.values())
-                        if category_code not in valid_codes:
-                            logger.warning(f"发现非标准分类 '{category_name}' ({category_code}),强制归为非标准项")
-                            category_code = "no_standard"
-                            category_name = "非标准项"
-
-                    # 根据行号从section中提取原文
-                    content = self._extract_content_by_line_numbers(section, start_line, end_line)
-                    contents.append(ClassifiedContent(
-                        third_category_name=category_name,
-                        third_category_code=category_code,
-                        third_seq=0,
-                        start_line=start_line,
-                        end_line=end_line,
-                        content=content
-                    ))
-                # 聚合同一分类下相邻的内容
-                contents = self._merge_classified_contents(contents, section)
-                return contents, True  # 解析成功(可能为空结果)
-            except Exception as e2:
-                logger.error(f"解析JSON失败: {e}, 二次修复也失败: {e2}")
-                logger.debug(f"原始响应前500字符: {response[:500]}...")
-                logger.debug(f"提取的JSON前300字符: {json_str[:300]}...")
-                return [], False  # 解析失败
-
-    def _merge_classified_contents(self, contents: List[ClassifiedContent], section: SectionContent) -> List[ClassifiedContent]:
-        """将同一分类下的内容按区间合并(只有连续或重叠的区间才合并)"""
-        if not contents:
-            return contents
-
-        # 按分类代码分组
-        groups: Dict[str, List[ClassifiedContent]] = {}
-        for content in contents:
-            key = content.third_category_code
-            if key not in groups:
-                groups[key] = []
-            groups[key].append(content)
-
-        merged_contents = []
-
-        for category_code, group_contents in groups.items():
-            # 按起始行号排序
-            group_contents.sort(key=lambda x: x.start_line)
-
-            # 合并连续或重叠的区间
-            merged_ranges = []
-            for content in group_contents:
-                if not merged_ranges:
-                    # 第一个区间
-                    merged_ranges.append({
-                        'start': content.start_line,
-                        'end': content.end_line
-                    })
-                else:
-                    last_range = merged_ranges[-1]
-                    # 检查是否连续或重叠(允许3行的间隔也算连续)
-                    if content.start_line <= last_range['end'] + 3:
-                        # 扩展当前区间
-                        last_range['end'] = max(last_range['end'], content.end_line)
-                    else:
-                        # 不连续,新建区间
-                        merged_ranges.append({
-                            'start': content.start_line,
-                            'end': content.end_line
-                        })
-
-            # 为每个合并后的区间创建条目
-            for range_info in merged_ranges:
-                merged_content = self._extract_content_by_line_numbers(
-                    section, range_info['start'], range_info['end']
-                )
-                merged_contents.append(ClassifiedContent(
-                    third_category_name=group_contents[0].third_category_name,
-                    third_category_code=category_code,
-                    third_seq=group_contents[0].third_seq,
-                    start_line=range_info['start'],
-                    end_line=range_info['end'],
-                    content=merged_content
-                ))
-
-        # 按起始行号排序最终结果
-        merged_contents.sort(key=lambda x: x.start_line)
-        return merged_contents
-
-    def _extract_content_by_line_numbers(self, section: SectionContent, start_line: int, end_line: int) -> str:
-        """根据全局行号从section中提取原文内容"""
-        if not section.line_number_map:
-            # 如果没有行号映射,使用相对索引
-            start_idx = max(0, start_line - 1)
-            end_idx = min(len(section.lines), end_line)
-            return '\n'.join(section.lines[start_idx:end_idx])
-
-        # 找到全局行号对应的索引
-        start_idx = -1
-        end_idx = -1
-
-        for idx, global_line_num in enumerate(section.line_number_map):
-            if global_line_num == start_line:
-                start_idx = idx
-            if global_line_num == end_line:
-                end_idx = idx
-                break
-
-        # 如果没找到精确匹配,使用近似值
-        if start_idx == -1:
-            for idx, global_line_num in enumerate(section.line_number_map):
-                if global_line_num >= start_line:
-                    start_idx = idx
-                    break
-        if end_idx == -1:
-            for idx in range(len(section.line_number_map) - 1, -1, -1):
-                if section.line_number_map[idx] <= end_line:
-                    end_idx = idx
-                    break
-
-        if start_idx == -1:
-            start_idx = 0
-        if end_idx == -1:
-            end_idx = len(section.lines) - 1
-
-        # 确保索引有效
-        start_idx = max(0, min(start_idx, len(section.lines) - 1))
-        end_idx = max(0, min(end_idx, len(section.lines) - 1))
-
-        if start_idx > end_idx:
-            start_idx, end_idx = end_idx, start_idx
-
-        # 添加行号标记返回
-        lines_with_numbers = []
-        for i in range(start_idx, end_idx + 1):
-            global_line = section.line_number_map[i] if i < len(section.line_number_map) else (i + 1)
-            lines_with_numbers.append(f"<{global_line}> {section.lines[i]}")
-
-        return '\n'.join(lines_with_numbers)
-
-    async def _call_supplement_verification(
-        self,
-        section: SectionContent,
-        std: CategoryStandard,
-        hit_lines: List[int],
-        matched_kws: List[str],
-        is_table: bool = False
-    ) -> bool:
-        """针对单个候选遗漏分类发起补充验证LLM调用,返回是否存在。"""
-        start = min(hit_lines)
-        end = max(hit_lines)
-        chunk_text = self._extract_content_by_line_numbers(section, start, end)
-
-        prompt = build_supplement_verify_prompt(std, chunk_text, start, end, hit_lines, matched_kws, is_table)
-
-        try:
-            # 使用统一的模型调用客户端
-            resp = await generate_model_client.get_model_generate_invoke(
-                trace_id="content_classifier_supplement",
-                system_prompt=SUPPLEMENT_VERIFY_SYSTEM_PROMPT,
-                user_prompt=prompt,
-                model_name=self.model,
-                enable_thinking=False,
-                timeout=30  # 补充验证较短超时
-            )
-            if "不存在" in resp:
-                return False
-            if "存在" in resp:
-                return True
-            # 格式异常,保守返回 True
-            logger.warning(f"supplement_verify 格式异常: {resp[:50]}")
-            return True
-        except Exception as e:
-            logger.warning(f"supplement_verify 调用失败: {e}")
-            return True
-
-    async def _detect_and_supplement(
-        self,
-        section: SectionContent,
-        llm_results: List[ClassifiedContent]
-    ) -> List[ClassifiedContent]:
-        """扫描整个 section,补充 LLM 遗漏的三级分类(并发优化版)。
-
-        扫描范围:当前二级分类下的所有行(不跨二级分类,由 section.category_standards 保证)。
-        触发条件:该二级分类下某个三级标准未出现在 LLM 结果中。
-        注意:同一行内容可同时属于多个三级分类,不限制"已覆盖行"。
-        """
-        if not section.category_standards or not section.lines:
-            return []
-
-        # 已命中的有效分类(排除 no_standard)
-        found_codes = {c.third_category_code for c in llm_results if c.third_category_code != 'no_standard'}
-
-        # 判断整个 section 是否含表格特征
-        full_text = ' '.join(section.lines)
-        is_table = (
-            any(kw in full_text for kw in ['序号', '作业活动', '风险源', '防范措施'])
-            or full_text.count('|') > 5
-        )
-
-        # 准备需要验证的任务列表
-        verification_tasks = []
-        verification_info = []  # 保存对应的 std 和 hit_lines 信息
-
-        for std in section.category_standards:
-            if std.third_code in found_codes:
-                continue
-            if not std.keywords and not std.extra_prompt:
-                continue
-
-            keywords = [k.strip() for k in std.keywords.split(';') if k.strip()]
-            # 同时从 extra_prompt 的引号内容中提取补充信号词,用于触发验证
-            extra_signals = []
-            if std.extra_prompt:
-                import re
-                quoted = re.findall(r'[""""]([^""""]+)[""""]', std.extra_prompt)
-                extra_signals.extend([q.strip() for q in quoted if len(q.strip()) >= 2])
-            scan_signals = keywords + extra_signals
-
-            if is_table:
-                # 表格路径:整个 section 行范围提交 LLM 验证
-                if not section.line_number_map:
-                    continue
-                hit_lines = [section.line_number_map[0], section.line_number_map[-1]]
-                verification_tasks.append(
-                    self._call_supplement_verification(section, std, hit_lines, [], is_table=True)
-                )
-                verification_info.append((std, hit_lines))
-            else:
-                # 普通路径:扫描整个 section 所有行的关键字
-                hit_lines, matched_kws = [], []
-                for i, line_text in enumerate(section.lines):
-                    line_num = section.line_number_map[i] if section.line_number_map else (i + 1)
-                    for kw in scan_signals:
-                        if kw in line_text and line_num not in hit_lines:
-                            hit_lines.append(line_num)
-                            if kw not in matched_kws:
-                                matched_kws.append(kw)
-                if not hit_lines:
-                    continue
-                verification_tasks.append(
-                    self._call_supplement_verification(section, std, hit_lines, matched_kws)
-                )
-                verification_info.append((std, hit_lines))
-
-        if not verification_tasks:
-            return []
-
-        # 并发执行所有验证任务
-        results = await asyncio.gather(*verification_tasks, return_exceptions=True)
-
-        # 收集验证通过的结果
-        supplemented = []
-        for (std, hit_lines), confirmed in zip(verification_info, results):
-            if isinstance(confirmed, Exception):
-                logger.warning(f"[{section.section_name}] 补充验证异常: {confirmed}")
-                continue
-            if confirmed:
-                start, end = min(hit_lines), max(hit_lines)
-                content = self._extract_content_by_line_numbers(section, start, end)
-                supplemented.append(ClassifiedContent(
-                    third_category_name=std.third_name,
-                    third_category_code=std.third_code,
-                    third_seq=std.third_seq,
-                    start_line=start,
-                    end_line=end,
-                    content=content
-                ))
-
-        return supplemented
-
-
-    def _fix_json(self, json_str: str) -> str:
-        return _fix_json(json_str)
-
-    def _aggressive_json_fix(self, json_str: str) -> str:
-        return _aggressive_json_fix(json_str)

+ 0 - 186
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/embedding_client.py

@@ -1,186 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-Embedding 客户端
-
-统一通过 model_handler 获取 Embedding 模型,配置从 config.ini 读取
-"""
-
-import asyncio
-import math
-import re
-from typing import List, Optional, Tuple
-
-from .config import EMBEDDING_SIMILARITY_THRESHOLD
-from foundation.ai.models.model_handler import model_handler
-from foundation.observability.logger.loggering import review_logger as logger
-
-
-class EmbeddingClient:
-    """Embedding模型客户端,用于计算文本相似度"""
-
-    # 连续失败次数阈值,超过后清除缓存触发降级
-    _FAILURE_THRESHOLD = 3
-    # 重试次数
-    _MAX_RETRIES = 2
-
-    def __init__(self):
-        """初始化 Embedding 客户端,通过 model_handler 获取模型"""
-        self._embedding_model = None
-        self._consecutive_failures = 0
-
-    @property
-    def embedding_model(self):
-        """懒加载获取 Embedding 模型实例"""
-        if self._embedding_model is None:
-            self._embedding_model = model_handler.get_embedding_model()
-        return self._embedding_model
-
-    def _invalidate_cache(self):
-        """清除本地和 model_handler 的 embedding 缓存,触发降级重新初始化"""
-        self._embedding_model = None
-        self._consecutive_failures = 0
-        # 清除 model_handler 中的 embedding 缓存,使下次 get_embedding_model 重新走初始化+降级逻辑
-        for key in list(model_handler._model_cache.keys()):
-            if "embed" in key.lower():
-                del model_handler._model_cache[key]
-                logger.info(f"已清除 model_handler embedding 缓存: {key}")
-
-    async def get_embedding(self, text: str) -> Optional[List[float]]:
-        """获取文本的embedding向量,带重试和缓存失效机制"""
-        for attempt in range(self._MAX_RETRIES + 1):
-            try:
-                embedding = self.embedding_model.embed_query(text)
-                self._consecutive_failures = 0
-                return embedding
-            except Exception as e:
-                if attempt < self._MAX_RETRIES:
-                    await asyncio.sleep(1 * (attempt + 1))
-                    continue
-                self._consecutive_failures += 1
-                logger.error(f"Embedding API调用失败 (连续第{self._consecutive_failures}次): {e}")
-                if self._consecutive_failures >= self._FAILURE_THRESHOLD:
-                    logger.warning("Embedding连续失败超过阈值,清除缓存触发降级")
-                    self._invalidate_cache()
-                return None
-
-    async def get_embeddings_batch(self, texts: List[str]) -> List[Optional[List[float]]]:
-        """批量获取文本的embedding向量,带重试和缓存失效机制"""
-        for attempt in range(self._MAX_RETRIES + 1):
-            try:
-                embeddings = self.embedding_model.embed_documents(texts)
-                self._consecutive_failures = 0
-                return embeddings
-            except Exception as e:
-                if attempt < self._MAX_RETRIES:
-                    await asyncio.sleep(1 * (attempt + 1))
-                    continue
-                self._consecutive_failures += 1
-                logger.error(f"Embedding API批量调用失败 (连续第{self._consecutive_failures}次): {e}")
-                if self._consecutive_failures >= self._FAILURE_THRESHOLD:
-                    logger.warning("Embedding连续失败超过阈值,清除缓存触发降级")
-                    self._invalidate_cache()
-                return [None] * len(texts)
-
-    def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
-        """计算两个向量的余弦相似度"""
-        if not vec1 or not vec2 or len(vec1) != len(vec2):
-            return 0.0
-
-        dot_product = sum(a * b for a, b in zip(vec1, vec2))
-        norm1 = math.sqrt(sum(a * a for a in vec1))
-        norm2 = math.sqrt(sum(b * b for b in vec2))
-
-        if norm1 == 0 or norm2 == 0:
-            return 0.0
-
-        return dot_product / (norm1 * norm2)
-
-    def _clean_section_name(self, section_name: str) -> str:
-        """清理section名称,去除序号等前缀
-
-        例如:
-        - "一)编制依据" -> "编制依据"
-        - "二) 技术保证措施" -> "技术保证措施"
-        - "1. 施工计划" -> "施工计划"
-        - "(1) 工艺流程" -> "工艺流程"
-        """
-        cleaned = section_name.strip()
-
-        # 去除开头的序号模式:
-        # 1. 中文数字+)或中文数字+、 如 "一)"、"二、"
-        # 2. 阿拉伯数字+. 或阿拉伯数字+)如 "1.", "2)"
-        # 3. 括号数字如 "(1)", "(一)"
-        patterns = [
-            r'^[一二三四五六七八九十百千]+[)\\)、\\.\\s]+',  # 中文数字+标点
-            r'^\\d+[\\.\\)\\)、\\s]+',  # 阿拉伯数字+标点
-            r'^[((]\\d+[))][\\s\\.]*',  # 括号数字
-            r'^[((][一二三四五六七八九十][))][\\s\\.]*',  # 括号中文数字
-        ]
-
-        for pattern in patterns:
-            cleaned = re.sub(pattern, '', cleaned)
-
-        return cleaned.strip()
-
-    async def check_similarity(
-        self,
-        section_name: str,
-        section_content: str,
-        second_category_name: str,
-        second_category_raw_content: str = ""
-    ) -> Tuple[bool, float]:
-        """
-        检查待审查内容与二级分类标准的相似度
-
-        比较:
-        - 左侧: section的实际内容(待审查的施工方案内容)
-        - 右侧: second_raw_content(来自construction_plan_standards.csv的标准定义)
-
-        返回: (is_similar, similarity_score)
-        - is_similar: 是否相似(相似度 > 阈值 或标题完全匹配)
-        - similarity_score: 相似度分数 (0-1)
-        """
-        # 步骤1: 先判断标题是否匹配
-        # 清理文本进行比较(去除序号等前缀)
-        cleaned_section_name = self._clean_section_name(section_name).lower()
-        cleaned_second_name = second_category_name.strip().lower()
-
-        # 标题直接相等检查(清理后的)
-        if cleaned_section_name == cleaned_second_name:
-            # 标题匹配,继续用embedding比较内容相似度
-            pass
-        else:
-            # 标题不匹配,检查是否包含关系
-            if cleaned_second_name in cleaned_section_name or cleaned_section_name in cleaned_second_name:
-                # 要求包含的部分至少4个字符,避免短词误判
-                if len(cleaned_second_name) >= 4 or len(cleaned_section_name) >= 4:
-                    # 标题部分匹配,继续用embedding比较内容
-                    pass
-                else:
-                    # 标题不匹配且太短,直接返回不相似
-                    return False, 0.0
-            else:
-                # 标题完全不匹配,直接返回不相似
-                return False, 0.0
-
-        # 步骤2: 使用embedding计算内容相似度
-        # 左侧: section的实际内容(待审查的施工方案实际内容)
-        # 右侧: second_raw_content(该second_name的标准定义)
-        section_text = section_content[:800]  # 取前800字符的实际内容
-        category_text = second_category_raw_content[:800] if second_category_raw_content else second_category_name
-
-        # 获取embedding
-        embeddings = await self.get_embeddings_batch([section_text, category_text])
-
-        if embeddings[0] is None or embeddings[1] is None:
-            # embedding获取失败,保守起见返回不相似
-            return False, 0.0
-
-        # 计算相似度
-        similarity = self.cosine_similarity(embeddings[0], embeddings[1])
-
-        # 判断结果
-        is_similar = similarity >= EMBEDDING_SIMILARITY_THRESHOLD
-
-        return is_similar, similarity

+ 0 - 146
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/json_utils.py

@@ -1,146 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-JSON 修复工具函数
-"""
-
-import json
-import re
-
-
-def _fix_json(json_str: str) -> str:
-    """修复常见的JSON格式问题"""
-    # 去除尾部多余的逗号
-    json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
-
-    # 确保 JSON 结构闭合
-    json_str = _ensure_json_closed(json_str)
-
-    # 替换单引号为双引号(但要小心内容中的单引号)
-    # 使用更精确的方法:先尝试解析,失败再替换
-    try:
-        json.loads(json_str)
-        return json_str
-    except Exception:
-        # 尝试替换单引号
-        json_str = json_str.replace("'", '"')
-
-    return json_str
-
-
-def _truncate_to_valid_json(json_str: str) -> str:
-    """将截断的JSON截断到最后一个完整对象的位置,并保留数组结构"""
-    # 找到 "classified_contents" 数组的开始
-    array_start = json_str.find('"classified_contents"')
-    if array_start == -1:
-        return json_str
-
-    # 找到数组的 '['
-    bracket_start = json_str.find('[', array_start)
-    if bracket_start == -1:
-        return json_str
-
-    # 遍历数组,找到最后一个完整的对象
-    brace_count = 0
-    bracket_count = 1  # 已经进入数组,所以是1
-    in_string = False
-    escape_next = False
-    last_valid_obj_end = 0
-    i = bracket_start + 1
-
-    while i < len(json_str):
-        char = json_str[i]
-
-        if escape_next:
-            escape_next = False
-            i += 1
-            continue
-
-        if char == '\\':
-            escape_next = True
-            i += 1
-            continue
-
-        if char == '"' and not escape_next:
-            in_string = not in_string
-            i += 1
-            continue
-
-        if not in_string:
-            if char == '{':
-                brace_count += 1
-            elif char == '}':
-                brace_count -= 1
-                if brace_count == 0:
-                    # 找到一个完整的对象
-                    last_valid_obj_end = i
-            elif char == '[':
-                bracket_count += 1
-            elif char == ']':
-                bracket_count -= 1
-                if bracket_count == 0:
-                    # 数组正常闭合,不需要截断
-                    return json_str
-
-        i += 1
-
-    if last_valid_obj_end > 0:
-        # 截断到最后一个完整对象的位置,并关闭数组
-        return json_str[:last_valid_obj_end + 1] + ']'
-
-    return json_str
-
-
-def _ensure_json_closed(json_str: str) -> str:
-    """确保JSON结构闭合"""
-    # 计算未闭合的括号
-    brace_count = 0
-    bracket_count = 0
-    in_string = False
-    escape_next = False
-
-    for char in json_str:
-        if escape_next:
-            escape_next = False
-            continue
-        if char == '\\':
-            escape_next = True
-            continue
-        if char == '"' and not escape_next:
-            in_string = not in_string
-            continue
-        if not in_string:
-            if char == '{':
-                brace_count += 1
-            elif char == '}':
-                brace_count -= 1
-            elif char == '[':
-                bracket_count += 1
-            elif char == ']':
-                bracket_count -= 1
-
-    # 添加闭合括号
-    result = json_str
-    # 先去掉尾部可能的逗号
-    result = result.rstrip().rstrip(',').rstrip()
-
-    # 关闭对象
-    while brace_count > 0:
-        result += '}'
-        brace_count -= 1
-
-    # 关闭数组
-    while bracket_count > 0:
-        result += ']'
-        bracket_count -= 1
-
-    return result
-
-
-def _aggressive_json_fix(json_str: str) -> str:
-    """激进的JSON修复,用于处理复杂情况"""
-    # 首先尝试截断到最后一个完整对象
-    json_str = _truncate_to_valid_json(json_str)
-    # 然后确保结构闭合
-    json_str = _ensure_json_closed(json_str)
-    return json_str

+ 0 - 368
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/main_classifier.py

@@ -1,368 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-LLMContentClassifier 主入口类 + 便捷函数
-"""
-
-import asyncio
-import json
-from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple
-
-from .models import ClassificationResult, SectionContent
-from .config import ClassifierConfig
-from .category_loaders import CategoryStandardLoader, SecondCategoryStandardLoader
-from .embedding_client import EmbeddingClient
-from .content_classifier import ContentClassifierClient
-from .chunks_converter import ChunksConverter
-from foundation.observability.logger.loggering import review_logger as logger
-
-
-class LLMContentClassifier:
-    """
-    LLM 内容三级分类器(主入口类)
-
-    封装完整的分类流程,提供简洁的接口供外部调用
-    """
-
-    def __init__(self, config: Optional[ClassifierConfig] = None):
-        """
-        初始化分类器
-
-        Args:
-            config: 配置对象,如果为 None 则使用默认配置
-        """
-        self.config = config or ClassifierConfig()
-        logger.info(f"[LLMContentClassifier] 初始化完成,使用模型: {self.config.model}, thinking: {self.config.enable_thinking}")
-
-        # 加载标准分类
-        self.category_loader = CategoryStandardLoader(Path(self.config.category_table_path))
-
-        # 加载二级分类标准(如果存在)
-        self.second_category_loader = None
-        if Path(self.config.second_category_path).exists():
-            self.second_category_loader = SecondCategoryStandardLoader(Path(self.config.second_category_path))
-
-        # 创建转换器
-        self.converter = ChunksConverter(self.category_loader)
-
-        # 并发控制信号量
-        self.semaphore = asyncio.Semaphore(self.config.max_concurrent_requests)
-
-        # Embedding 客户端(可选,通过 model_handler 统一管理)
-        self.embedding_client = None
-        if self.config.embedding_similarity_threshold > 0:
-            try:
-                self.embedding_client = EmbeddingClient()
-                logger.debug("Embedding 客户端初始化成功(通过 model_handler)")
-            except Exception as e:
-                logger.warning(f"Embedding 客户端初始化失败: {e},将继续不使用 Embedding 功能")
-
-    def _create_embedding_client(self) -> EmbeddingClient:
-        """创建 Embedding 客户端(统一通过 model_handler 获取配置)"""
-        return EmbeddingClient()
-
-    async def classify_chunks(
-        self,
-        chunks: List[Dict[str, Any]],
-        progress_callback: Optional[callable] = None
-    ) -> List[Dict[str, Any]]:
-        """
-        对 chunks 进行三级分类
-
-        Args:
-            chunks: 文档分块列表,每个 chunk 需包含:
-                - chapter_classification: 一级分类代码
-                - secondary_category_code: 二级分类代码
-                - secondary_category_cn: 二级分类中文名
-                - review_chunk_content 或 content: 内容文本
-            progress_callback: 进度回调函数 (completed, total, section_name, success) -> None,支持 async
-
-        Returns:
-            List[Dict]: 更新后的 chunks 列表,每个 chunk 新增字段:
-                - tertiary_category_code: 三级分类代码
-                - tertiary_category_cn: 三级分类名称
-                - tertiary_classification_details: 行级分类详情列表
-        """
-        logger.info(f"【三级分类】输入 {len(chunks)} 个内容块")
-
-        # 步骤1: 将 chunks 转换为 SectionContent 列表
-        sections = self.converter.chunks_to_sections(chunks)
-        total_lines = sum(len(s.lines) for s in sections)
-        total_standards = sum(len(s.category_standards) if s.category_standards else 0 for s in sections)
-
-        logger.info(f"【三级分类】按二级标题分组后: {len(sections)} 个段落, 总计 {total_lines} 行, {total_standards} 个三级标准待匹配")
-        logger.info(f"【三级分类】并发度: {self.config.max_concurrent_requests}, 模型: {self.config.model}, thinking: {self.config.enable_thinking}")
-
-        # 计算总 LLM 调用次数(考虑分块)
-        MAX_LINES_PER_CHUNK = 150
-        total_llm_calls = sum((len(s.lines) + MAX_LINES_PER_CHUNK - 1) // MAX_LINES_PER_CHUNK for s in sections)
-        logger.info(f"【三级分类】预计 LLM 调用次数: {total_llm_calls} 次 (每150行分一块)")
-
-        # 打印每个段落的详情(前10个)
-        for i, section in enumerate(sections[:10]):
-            std_count = len(section.category_standards) if section.category_standards else 0
-            chunks_needed = (len(section.lines) + MAX_LINES_PER_CHUNK - 1) // MAX_LINES_PER_CHUNK
-            chunk_info = f"分{chunks_needed}块" if chunks_needed > 1 else "1块"
-            logger.info(f"【三级分类】段落 {i+1}/{len(sections)}: '{section.section_name}' - {len(section.lines)} 行, {std_count} 个标准, {chunk_info}")
-        if len(sections) > 10:
-            remaining_calls = sum((len(s.lines) + MAX_LINES_PER_CHUNK - 1) // MAX_LINES_PER_CHUNK for s in sections[10:])
-            logger.info(f"【三级分类】... 还有 {len(sections) - 10} 个段落, 预计 {remaining_calls} 次调用")
-
-        if not sections:
-            logger.info("没有有效的段落需要分类")
-            return chunks
-
-        # 步骤2: 创建分类客户端
-        classifier = ContentClassifierClient(
-            model=self.config.model,
-            semaphore=self.semaphore,
-            embedding_client=self.embedding_client,
-            second_category_loader=self.second_category_loader,
-            enable_thinking=self.config.enable_thinking
-        )
-
-        # 步骤3: 并发分类所有段落
-        results_map: Dict[str, ClassificationResult] = {}
-
-        async def classify_with_progress(section: SectionContent, idx: int, total: int):
-            result = await classifier.classify_content(section)
-            results_map[section.section_key] = result
-
-            if progress_callback:
-                ret = progress_callback(idx + 1, total, section.section_name, not result.error)
-                if asyncio.iscoroutine(ret):
-                    await ret
-            else:
-                status = "成功" if not result.error else f"失败: {result.error[:30]}"
-                logger.debug(f"[{idx + 1}/{total}] {section.section_name}: {status}")
-
-            return result
-
-        tasks = [
-            classify_with_progress(section, idx, len(sections))
-            for idx, section in enumerate(sections)
-        ]
-        await asyncio.gather(*tasks)
-
-        # 步骤4: 将分类结果转换回 chunks 格式,按 chunk_ranges 过滤确保每个 chunk 只拿自己行范围内的详情
-        updated_chunks = []
-
-        # 建立 chunk_id -> (section_key, g_start, g_end) 映射,来自 sections 的 chunk_ranges
-        chunk_range_map: Dict[str, Tuple[str, int, int]] = {}
-        for section in sections:
-            for (cid, g_start, g_end) in section.chunk_ranges:
-                chunk_range_map[cid] = (section.section_key, g_start, g_end)
-
-        # 为每个原始 chunk 单独分配其行范围内的分类详情
-        for chunk in chunks:
-            updated_chunk = dict(chunk)
-            first_code = chunk.get("chapter_classification", "") or chunk.get("first_code", "")
-            second_code = chunk.get("secondary_category_code", "") or chunk.get("second_code", "")
-
-            # 从 chunk_range_map 获取该 chunk 的行范围(同时拿到正确的 section_key)
-            chunk_id = chunk.get("chunk_id") or chunk.get("id") or str(id(chunk))
-            range_info = chunk_range_map.get(chunk_id)
-
-            if range_info:
-                # 优先使用 chunk_range_map 中记录的 section_key(经过名称匹配的正确 key)
-                section_key = range_info[0]
-            else:
-                # 降级:从 chunk 字段重建(可能在 second_code="none" 时查不到)
-                section_key = f"{first_code}->{second_code}"
-
-            result = results_map.get(section_key)
-
-            if result:
-                updated_chunk["first_code"] = first_code
-                updated_chunk["second_code"] = second_code
-
-                # 收集全部有效三级分类(非 no_standard)
-                all_tertiary = [
-                    {
-                        "third_category_name": c.third_category_name,
-                        "third_category_code": c.third_category_code,
-                        "third_seq": c.third_seq,
-                        "start_line": c.start_line,
-                        "end_line": c.end_line,
-                        "content": c.content
-                    }
-                    for c in result.classified_contents
-                    if c.third_category_code != "no_standard"
-                ]
-
-                if range_info:
-                    # 过滤:只保留与该 chunk 行范围有交集的详情
-                    _, g_start, g_end = range_info
-                    filtered = [
-                        t for t in all_tertiary
-                        if t["start_line"] <= g_end and t["end_line"] >= g_start
-                    ]
-                else:
-                    # 无法定位行范围(可能是单 chunk 分组),保留全部
-                    filtered = all_tertiary
-
-                # 去重:按 (third_category_code, start_line, end_line) 三元组去重
-                seen = set()
-                deduped = []
-                for t in filtered:
-                    key = (t["third_category_code"], t["start_line"], t["end_line"])
-                    if key not in seen:
-                        seen.add(key)
-                        deduped.append(t)
-                updated_chunk["tertiary_classification_details"] = deduped
-
-                # 向后兼容:设置第一个三级分类为主分类
-                tertiary_details = updated_chunk["tertiary_classification_details"]
-                if tertiary_details:
-                    updated_chunk["tertiary_category_code"] = tertiary_details[0]["third_category_code"]
-                    updated_chunk["tertiary_category_cn"] = tertiary_details[0]["third_category_name"]
-
-            updated_chunks.append(updated_chunk)
-
-        logger.info(f"三级分类完成!共处理 {len(updated_chunks)} 个 chunks")
-        return updated_chunks
-
-
-# ==================== 便捷函数 ====================
-
-async def classify_chunks(
-    chunks: List[Dict[str, Any]],
-    config: Optional[ClassifierConfig] = None,
-    progress_callback: Optional[callable] = None
-) -> List[Dict[str, Any]]:
-    """
-    对 chunks 进行三级分类的便捷函数
-
-    Args:
-        chunks: 文档分块列表
-        config: 配置对象(可选)
-        progress_callback: 进度回调函数
-
-    Returns:
-        List[Dict]: 更新后的 chunks 列表
-
-    使用示例:
-        from llm_content_classifier_v2 import classify_chunks
-
-        # 使用默认配置
-        updated_chunks = await classify_chunks(chunks)
-
-        # 使用自定义配置
-        config = ClassifierConfig(
-            model="qwen3.5-122b-a10b",
-            embedding_similarity_threshold=0.85
-        )
-        updated_chunks = await classify_chunks(chunks, config=config)
-    """
-    classifier = LLMContentClassifier(config)
-    return await classifier.classify_chunks(chunks, progress_callback)
-
-
-def classify_chunks_sync(
-    chunks: List[Dict[str, Any]],
-    config: Optional[ClassifierConfig] = None
-) -> List[Dict[str, Any]]:
-    """
-    同步版本的分类函数(阻塞调用)
-
-    Args:
-        chunks: 文档分块列表
-        config: 配置对象(可选)
-
-    Returns:
-        List[Dict]: 更新后的 chunks 列表
-    """
-    try:
-        loop = asyncio.get_running_loop()
-    except RuntimeError:
-        # 没有运行中的事件循环
-        return asyncio.run(classify_chunks(chunks, config))
-
-    # 已有事件循环,创建任务
-    import concurrent.futures
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        future = executor.submit(
-            asyncio.run,
-            classify_chunks(chunks, config)
-        )
-        return future.result()
-
-
-# ==================== 快速测试入口 ====================
-
-if __name__ == "__main__":
-    import io
-    import sys
-    from datetime import datetime
-
-    # 修复 Windows 终端 UTF-8 输出
-    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
-
-    TEST_JSON_PATH = Path(r"temp\construction_review\final_result\4148f6019f89e061b15679666f646893-1773993108.json")
-    OUTPUT_DIR = Path(r"temp\construction_review\llm_content_classifier_v2")
-    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
-
-    def _sep(title: str = "", width: int = 70):
-        print(f"\n{'=' * width}\n  {title}\n{'=' * width}" if title else "─" * width)
-
-    def _load_chunks_from_json(json_path: Path) -> List[Dict[str, Any]]:
-        with open(json_path, encoding="utf-8") as f:
-            data = json.load(f)
-        if "document_result" in data:
-            return data["document_result"]["structured_content"]["chunks"]
-        return data["data"]["document_result"]["structured_content"]["chunks"]
-
-    # ── 加载数据 ──────────────────────────────────────────────
-    _sep("加载测试数据")
-    if not TEST_JSON_PATH.exists():
-        print(f"[ERROR] 文件不存在: {TEST_JSON_PATH}")
-        sys.exit(1)
-
-    raw_chunks = _load_chunks_from_json(TEST_JSON_PATH)
-    print(f"原始 chunks 数: {len(raw_chunks)}")
-
-    # ── 运行完整分类流程 ───────────────────────────────────────
-    _sep("运行三级分类(LLMContentClassifier)")
-    config = ClassifierConfig()
-    print(f"模型: {config.model}")
-    print(f"Embedding 模型: {config.embedding_model}")
-    print(f"相似度阈值: {config.embedding_similarity_threshold}")
-
-    classifier = LLMContentClassifier(config)
-    updated_chunks = asyncio.run(classifier.classify_chunks(raw_chunks))
-
-    # ── 保存结果 ──────────────────────────────────────────────
-    _sep("保存结果")
-    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
-    result_file = OUTPUT_DIR / f"result_{ts}.json"
-    with open(result_file, "w", encoding="utf-8") as f:
-        json.dump(updated_chunks, f, ensure_ascii=False, indent=2)
-    print(f"完整结果已保存: {result_file}")
-
-    # ── 控制台汇总展示 ────────────────────────────────────────
-    _sep("分类结果汇总")
-
-    # 按 section_label 聚合三级分类详情
-    section_map: Dict[str, List[Dict]] = {}
-    for chunk in updated_chunks:
-        label = chunk.get("section_label") or chunk.get("chunk_id", "unknown")
-        details = chunk.get("tertiary_classification_details", [])
-        if label not in section_map:
-            section_map[label] = []
-        for d in details:
-            key = d["third_category_code"]
-            if not any(x["third_category_code"] == key for x in section_map[label]):
-                section_map[label].append(d)
-
-    total_third = 0
-    for label, details in section_map.items():
-        print(f"\n[{label}]  三级分类数={len(details)}")
-        for d in details:
-            line_range = f"L{d.get('start_line', '?')}-{d.get('end_line', '?')}"
-            preview = (d.get("content") or "")[:50].replace("\n", " ")
-            print(f"  ├ {d['third_category_name']}({d['third_category_code']})  {line_range}  {preview}...")
-        total_third += len(details)
-
-    _sep()
-    print(f"处理 chunks: {len(updated_chunks)}  |  识别三级分类: {total_third}  |  结果目录: {OUTPUT_DIR}")

+ 0 - 72
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/models.py

@@ -1,72 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-数据模型定义
-"""
-
-from typing import List, Optional, Tuple
-from dataclasses import dataclass, field
-
-
-@dataclass
-class CategoryStandard:
-    """标准分类定义"""
-    first_code: str
-    first_name: str
-    first_seq: int  # 一级序号
-    second_code: str
-    second_name: str
-    second_seq: int  # 二级序号
-    second_focus: str  # 二级分类关注点
-    third_code: str
-    third_name: str
-    third_seq: int  # 三级序号
-    third_focus: str
-    keywords: str = ""
-    extra_prompt: str = ""
-
-
-@dataclass
-class SecondCategoryStandard:
-    """二级分类标准定义(来自construction_plan_standards.csv)"""
-    first_name: str  # 一级分类中文名
-    second_name: str  # 二级分类中文名
-    second_raw_content: str  # 二级分类详细描述
-
-
-@dataclass
-class ClassifiedContent:
-    """分类结果"""
-    third_category_name: str  # 三级分类名称
-    third_category_code: str  # 三级分类代码
-    third_seq: int  # 三级序号
-    start_line: int
-    end_line: int
-    content: str  # 原文内容
-
-
-@dataclass
-class SectionContent:
-    """二级标题内容"""
-    section_key: str  # 如 "第一章->一"
-    section_name: str  # 如 "一)编制依据"
-    lines: List[str]  # 原始行列表
-    numbered_content: str  # 带行号的内容
-    category_standards: List[CategoryStandard] = field(default_factory=list)  # 该二级分类下的三级标准
-    line_number_map: List[int] = field(default_factory=list)  # 每行对应的全局行号(如果有)
-    chunk_ranges: List[Tuple[str, int, int]] = field(default_factory=list)  # [(chunk_id, global_start, global_end), ...]
-
-
-@dataclass
-class ClassificationResult:
-    """分类结果"""
-    model: str
-    section_key: str
-    section_name: str
-    classified_contents: List[ClassifiedContent]
-    latency: float
-    raw_response: str = ""
-    error: Optional[str] = None
-    total_lines: int = 0  # 该section的总行数
-    classified_lines: int = 0  # 已分类的行数
-    coverage_rate: float = 0.0  # 分类率(已分类行数/总行数)

+ 0 - 365
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/prompt.py

@@ -1,365 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-提示词模板集中管理
-
-所有对外 LLM 调用的 system_prompt 和 user_prompt 均在此定义。
-
-================================================================================
-使用场景总览
-================================================================================
-
-1. 主分类调用 (CLASSIFY_SYSTEM_PROMPT + build_classify_prompt)
-   - 调用位置: ContentClassifierClient.classify_content() → _classify_single_chunk() → _call_api()
-   - 触发时机: 对二级分类下的文档内容进行三级分类识别时
-   - 输入: SectionContent(包含二级分类标准、带行号的文档内容)
-   - 输出: JSON 格式的分类结果列表(category_index 为数字索引)
-   - 特点:
-     * 支持长内容分块处理(超过150行自动切分)
-     * 使用数字索引避免模型输出复杂代码字符串
-     * 包含详细的分类指南和示例
-
-2. JSON 修复调用 (build_fix_prompt)
-   - 调用位置: ContentClassifierClient._parse_with_fix()
-   - 触发时机: 主分类返回的 JSON 解析失败时(最多重试3次)
-   - 输入: 原始模型输出(格式错误的 JSON 字符串)
-   - 输出: 修复后的合法 JSON
-   - 特点: 严格保持业务数据完整性,仅修复语法错误
-
-3. 补充验证调用 (SUPPLEMENT_VERIFY_SYSTEM_PROMPT + build_supplement_verify_prompt)
-   - 调用位置: ContentClassifierClient._detect_and_supplement() → _call_supplement_verification()
-   - 触发时机:
-     * 主分类完成后,扫描发现某些三级分类可能遗漏时
-     * 通过 keywords 字段匹配到未覆盖行时
-     * 表格内容需要二次确认时
-   - 输入: 待确认的 CategoryStandard + 相关文本片段
-   - 输出: "存在" 或 "不存在"
-   - 特点: 轻量级验证,max_tokens=10,temperature=0.0
-
-================================================================================
-调用流程图
-================================================================================
-
-classify_content()
-    ├── Embedding 相似度检查(可选跳过)
-    ├── _classify_single_chunk()
-    │       ├── build_classify_prompt() ──→ LLM 主分类
-    │       └── _parse_with_fix()
-    │               └── build_fix_prompt() ──→ LLM 修复 JSON(失败时)
-    ├── _detect_and_supplement()
-    │       └── build_supplement_verify_prompt() ──→ LLM 确认遗漏(逐个标准)
-    └── 合并结果并返回
-"""
-
-from typing import List
-
-from .models import CategoryStandard, SectionContent
-
-
-# ================================================================================
-# 主分类调用
-# ================================================================================
-
# Used as the system message in ContentClassifierClient._call_api().
# Sets the model's role to "construction plan document analysis expert" and
# constrains its output format and behaviour.
CLASSIFY_SYSTEM_PROMPT = """你是专业的施工方案文档分析专家。你的任务是:
1. 仔细阅读文档内容,理解每行的语义
2. 将内容归类到给定的三级分类标准中
3. 【重要】优先使用标准分类,只有完全不符合时才使用索引0(非标准项)
4. 【重要】连续相同分类的多行必须合并为一个条目
5. 【重要】当一行同时提及多个主体或类别(如"勘察、设计和监测单位"),必须为每个主体单独输出一条条目,行号相同
6. 【重要】输出格式:category_index必须是纯数字(0,1,2...),禁止输出文本名称或代码
7. 必须在给定的三级分类标准范围内分类,禁止创造新的分类
8. 只输出JSON格式结果,不要任何解释文字"""
-
-
def build_classify_prompt(section: SectionContent, is_chunk: bool = False) -> str:
    """
    Build the user prompt for the main (third-level) classification call.

    Args:
        section: The section to classify. This function reads its
            ``section_name``, ``category_standards``, ``numbered_content``,
            ``lines`` and ``line_number_map`` attributes.
        is_chunk: When True, a hint is added telling the model that the text
            is only one chunk of a longer document (including the line range
            when ``line_number_map`` is non-empty).

    Returns:
        The complete user prompt, containing: the current document position,
        the third-level standards listed by numeric index (0 is always the
        "non-standard" fallback), the numbered document content, the
        classification guide, the index mapping table, the required JSON
        output format and the hard constraints.

    Notes:
        - ``numbered_content`` is cut to its first 12000 characters; when that
          happens a truncation hint is added to the prompt.
        - Every example in the prompt uses ``category_index`` so that the
          examples agree with the mandatory numeric-index output format.
    """
    standards = section.category_standards

    # First/second-level info is taken from the first standard, if there is one.
    head = standards[0] if standards else None
    first_code = head.first_code if head else ""
    first_name = head.first_name if head else ""
    second_code = head.second_code if head else ""
    second_name = section.section_name

    # Index 0 is the fallback "non-standard" entry; real standards are 1..N.
    standards_desc = [
        "0. 非标准项\n   【识别要点】仅当内容完全不符合以上任何分类标准时使用,如页眉页脚、纯表格分隔线、无关的广告语等"
    ]
    # Mapping table (index -> name and code) shown to the model.
    index_mapping_lines = ["0 -> 非标准项 (no_standard)"]
    for i, std in enumerate(standards, 1):
        focus_content = std.third_focus if std.third_focus else "(无具体关注要点)"
        extra_hint = f"\n   【特别说明】{std.extra_prompt}" if std.extra_prompt else ""
        standards_desc.append(
            f"{i}. {std.third_name}\n"
            f"   【识别要点】{focus_content}{extra_hint}"
        )
        index_mapping_lines.append(f"{i} -> {std.third_name} ({std.third_code})")

    standards_text = '\n\n'.join(standards_desc)
    index_mapping_text = "\n".join(index_mapping_lines)

    # Cap the content length and remember whether anything was cut off.
    max_content_length = 12000
    content_to_use = section.numbered_content[:max_content_length]
    is_truncated = len(section.numbered_content) > max_content_length

    if is_chunk and section.line_number_map:
        chunk_hint = (
            f"\n【注意】这是文档的一个分块(行号 {section.line_number_map[0]}~{section.line_number_map[-1]}),"
            f"请对有实质内容的行进行分类,空行和纯符号行无需单独输出。\n"
        )
    elif is_chunk:
        chunk_hint = "\n【注意】这是文档的一个分块,请对有实质内容的行进行分类。\n"
    else:
        chunk_hint = ""

    truncation_hint = (
        f"\n【提示】内容较长已截断,当前显示前{max_content_length}字符,请对显示的内容进行完整分类。\n"
        if is_truncated else ""
    )

    # Without a line-number map, fall back to 1..len(lines).
    line_start = section.line_number_map[0] if section.line_number_map else 1
    line_end = section.line_number_map[-1] if section.line_number_map else len(section.lines)

    return f"""你是一个专业的施工方案文档分析专家。请根据给定的三级分类标准,识别文档内容中属于各个三级分类的部分。{chunk_hint}{truncation_hint}

## 当前文档位置
- 一级分类: {first_name} ({first_code})
- 二级分类: {second_name} ({second_code})

## 三级分类标准(共{len(section.category_standards)}个,必须在此范围内分类)

{standards_text}

---

## 文档内容(每行以<行号>开头,共{len(section.lines)}行)
```
{content_to_use}
```

---

## 分类任务指南

### 核心原则(按优先级排序)
1. **优先匹配标准分类**:首先判断内容是否符合上述任何一个三级分类标准
2. **关键词匹配**:内容中出现与分类名称相关的关键词时,应归类到该分类
3. **语义相关**:即使没有精确关键词,只要语义相关,也应归类
4. **非标准项谨慎使用**:只有当内容完全不符合任何标准分类时,才使用"非标准项"

### 通用索引说明归类规则(重要)
当文档内容仅包含"详见附表"、"详见另册"、"见附表"、"见另册"、"专详见另册"等通用索引说明,且某三级分类的【特别说明】明确允许此类表述视为符合时,必须将该内容同时归类到这些三级分类中,禁止标记为"非标准项"。

### 分类示例
- 看到"验收内容"、"验收标准"、"验收程序"等内容 → 归类到对应的三级分类
- 看到"检验方法"、"检查内容"等 → 可能属于"检查要求"或"验收内容"
- 看到"材料"、"钢筋"、"混凝土"等 → 关注上下文判断所属三级分类

### 行号处理规则
- **必须合并连续行**:连续多行属于同一分类时,合并为一个条目(start_line为起始,end_line为结束)
- **禁止逐行输出**:不要为每一行单独创建条目
- **允许重复分类**:同一行内容可以同时属于多个三级分类

### 多主体句拆分规则(重要)
- 当一行内容同时提及多个不同主体或类别时,**必须为每个主体单独输出一条分类条目,行号相同**
- 示例:`"3、有关勘察、设计和监测单位项目技术负责人"` 同时涉及设计单位和监测单位,应输出:
  - `{{"category_index": 设计单位对应的索引号, "start_line": N, "end_line": N}}`
  - `{{"category_index": 监测单位对应的索引号, "start_line": N, "end_line": N}}`
- 示例:`"总承包单位和分包单位技术负责人"` 同时涉及施工单位,应归入施工单位对应分类
- 凡是"A、B和C单位"句式,需逐一判断每个主体能否对应某个三级分类

### 自查清单
- [ ] 是否优先使用了标准分类而非"非标准项"?
- [ ] 连续相同分类的行是否已合并?
- [ ] 分类名称是否与标准列表完全一致?
- [ ] 包含多个主体的行是否已拆分为多条输出?

## 索引映射表(用于后处理转换,你只需输出索引号)
{index_mapping_text}

## 输出格式(严格JSON,不要任何其他文字)
```
{{
    "classified_contents_list": [
        {{
            "category_index": 数字索引号,
            "start_line": 起始行号,
            "end_line": 结束行号
        }}
    ]
}}
```

## 强制约束
1. **category_index 必须是数字**(0, 1, 2, 3...),对应上述索引映射表
2. 0 表示非标准项,1-{len(section.category_standards)} 对应各个三级分类
3. **禁止输出文本名称或代码**,只输出数字索引
4. 行号范围: {line_start} - {line_end}
5. 只输出JSON,禁止任何解释文字"""
-
-
-# ================================================================================
-# JSON 修复调用
-# ================================================================================
-
def build_fix_prompt(original_response: str) -> str:
    """
    Build the user prompt that asks the model to repair malformed JSON.

    Args:
        original_response: The raw model output whose JSON could not be
            parsed. Only its first 6000 characters are embedded.

    Returns:
        A prompt that lists the repair requirements (keep the data, fix only
        the syntax, keep ``category_index`` numeric), shows the expected JSON
        structure, and quotes the truncated original output in a fenced block.
    """
    # Keep the quoted output bounded so the repair request stays small.
    snippet = original_response[:6000]

    prompt_lines = [
        "你之前的输出存在JSON格式错误,请修复以下内容为正确的JSON格式。",
        "",
        "## 修复要求",
        "1. 严格保持原始数据的完整性和内容,不要修改任何业务数据",
        "2. 只修复JSON语法错误(如缺少逗号、括号不匹配、引号问题等)",
        "3. 确保输出的是合法的JSON格式",
        "4. 【重要】category_index 必须是数字索引(0, 1, 2...),禁止输出文本名称或代码",
        "5. 输出必须严格符合以下结构:",
        "{",
        '    "classified_contents_list": [',
        "        {",
        '            "category_index": 数字索引号,',
        '            "start_line": 数字,',
        '            "end_line": 数字',
        "        }",
        "    ]",
        "}",
        "",
        "## 原始输出(需要修复的内容)",
        "```",
        snippet,
        "```",
        "",
        "注意:",
        "- 只输出JSON,不要任何解释文字",
        "- 如果原始内容被截断,修复已提供的部分即可",
        "- category_index 只能是数字,如 0(非标准项)、1、2、3...",
    ]
    return "\n".join(prompt_lines)
-
-
-# ================================================================================
-# 补充验证调用
-# ================================================================================
-
# Used as the system message in ContentClassifierClient._call_supplement_verification().
# Sets the model's role to "content review expert" and restricts the answer
# to a binary judgement ("存在" / "不存在").
SUPPLEMENT_VERIFY_SYSTEM_PROMPT = '你是施工方案内容审查专家,请根据提供的内容作出判断,只回答"存在"或"不存在",不要任何其他文字。'
-
-
def build_supplement_verify_prompt(
    std: CategoryStandard,
    chunk_text: str,
    start: int,
    end: int,
    hit_lines: List[int],
    matched_kws: List[str],
    is_table: bool = False
) -> str:
    """
    Build the user prompt for the supplementary "does this category exist?" check.

    Args:
        std: The third-level standard to confirm. Reads ``third_name``,
            ``third_focus`` and ``extra_prompt``.
        chunk_text: The text fragment to be reviewed.
        start: First line number of the fragment (shown in the prompt).
        end: Last line number of the fragment (shown in the prompt).
        hit_lines: Line numbers on which a keyword matched. Only used when
            ``is_table`` is False.
        matched_kws: The keywords that matched. Only used when ``is_table``
            is False.
        is_table: Selects the wording of the "trigger reason" section: the
            table explanation when True, the keyword-hit explanation otherwise.

    Returns:
        A prompt asking the model to answer only "存在" or "不存在".
    """
    if is_table:
        trigger = "该内容块包含表格,表格中多列信息混排,以下分类在主分类阶段未被识别,需确认是否存在于表格中"
    else:
        # Render line numbers as "3、5" rather than the Python list repr
        # "[3, 5]", which would otherwise leak into the prompt text.
        hit_lines_text = '、'.join(str(line_no) for line_no in hit_lines)
        trigger = f"以下关键字在文档中被检测到:{'、'.join(matched_kws)}(出现于第 {hit_lines_text} 行)"

    # Optional extra instruction line; an empty string keeps the layout unchanged.
    extra_line = f'特别说明:{std.extra_prompt}' if std.extra_prompt else ''

    return f"""你是一个施工方案内容分类专家。

【组织层级说明】
本项目的组织层级如下,判断时请严格区分:
- 四川路桥(总公司)= 四川公路桥梁建设集团有限公司,文件通常以"四川公路桥梁"开头或含"SCQJ"
- 路桥集团(子公司)= 四川路桥集团有限公司,文件中出现"四川路桥集团"即属于路桥集团(子公司),而非总公司
- 桥梁公司(子公司)= 四川路桥桥梁公司,文件中出现"四川路桥桥梁公司"或"桥梁公司"即属于桥梁公司(子公司)

【待审查内容】(第 {start}~{end} 行)
{chunk_text}

【待确认的分类】
分类名称:{std.third_name}
识别说明:{std.third_focus}
{extra_line}

【触发原因】
{trigger}

【问题】
上述文档内容中,是否包含"{std.third_name}"相关的实质内容?

请仅回答"存在"或"不存在":"""

+ 0 - 132
core/construction_review/component/reviewers/utils/llm_content_classifier_v2/text_split_utils.py

@@ -1,132 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-文本切块工具函数
-"""
-
-from typing import Any, Dict, List, Tuple
-
-
-def _is_markdown_table_line(line: str) -> bool:
-    """判断一行是否为 Markdown 表格行(以 | 开头且以 | 结尾)"""
-    stripped = line.strip()
-    return stripped.startswith('|') and stripped.endswith('|') and len(stripped) >= 3
-
-
-def _split_text_lines_with_overlap(
-    lines: List[str],
-    max_chars: int,
-    overlap_chars: int
-) -> List[List[str]]:
-    """
-    将文本行列表按字符数切分,相邻 chunk 之间保留重叠。
-
-    - 普通行(<= max_chars):积累到超限时 flush,下一个 chunk 以末尾若干行作重叠头。
-    - 超长行(> max_chars):先 flush 当前积累,再对该行做字符级滑窗切分,
-      每片段 max_chars 字符,步长 max_chars - overlap_chars(即相邻片段重叠 overlap_chars)。
-    """
-    if not lines:
-        return []
-
-    chunks: List[List[str]] = []
-    current_lines: List[str] = []
-    current_chars: int = 0
-
-    def _flush():
-        """保存当前 chunk,并以末尾若干行作为下一个 chunk 的重叠起始。"""
-        nonlocal current_lines, current_chars
-        if not current_lines:
-            return
-        chunks.append(list(current_lines))
-        overlap_lines: List[str] = []
-        overlap_len: int = 0
-        for prev in reversed(current_lines):
-            overlap_lines.insert(0, prev)
-            overlap_len += len(prev)
-            if overlap_len >= overlap_chars:
-                break
-        current_lines = overlap_lines
-        current_chars = overlap_len
-
-    for line in lines:
-        line_chars = len(line)
-
-        if line_chars > max_chars:
-            # 超长行:先 flush,再对该行做字符级滑窗切分
-            _flush()
-            step = max_chars - overlap_chars  # 滑动步长
-            start = 0
-            while start < line_chars:
-                piece = line[start: start + max_chars]
-                chunks.append([piece])
-                start += step
-            # 以最后一片段末尾的 overlap_chars 个字符作重叠起始
-            last_piece = line[max(0, line_chars - overlap_chars):]
-            current_lines = [last_piece]
-            current_chars = len(last_piece)
-        else:
-            # 普通行:加入后超限则先 flush
-            if current_chars + line_chars > max_chars and current_lines:
-                _flush()
-            current_lines.append(line)
-            current_chars += line_chars
-
-    if current_lines:
-        chunks.append(current_lines)
-
-    return chunks
-
-
def split_section_into_chunks(
    lines: List[str],
    max_chars: int = 600,
    overlap_chars: int = 30
) -> List[Dict[str, Any]]:
    """
    Split the lines under one second-level category into chunks.

    Rules:
    - A run of consecutive Markdown table lines (as decided by
      ``_is_markdown_table_line``) becomes one chunk of type ``'table'``; it
      is passed through whole.
    - Every other run of lines is handed to ``_split_text_lines_with_overlap``
      with ``max_chars`` and ``overlap_chars``; each piece it returns becomes
      a chunk of type ``'text'``.

    Args:
        lines:         List of lines (without line-number markers).
        max_chars:     Maximum characters per text chunk, default 600.
        overlap_chars: Overlap between adjacent text chunks, default 30.

    Returns:
        List[Dict]: one dict per chunk with the keys
            - 'type':  'text' or 'table'
            - 'lines': the lines belonging to that chunk
    """
    if not lines:
        return []

    # Pass 1: group consecutive lines of the same kind into runs.
    runs: List[Tuple[str, List[str]]] = []
    for line in lines:
        kind = 'table' if _is_markdown_table_line(line) else 'text'
        if runs and runs[-1][0] == kind:
            runs[-1][1].append(line)
        else:
            runs.append((kind, [line]))

    # Pass 2: tables go through untouched; text runs are split with overlap.
    chunks: List[Dict[str, Any]] = []
    for kind, run_lines in runs:
        if kind == 'table':
            chunks.append({'type': 'table', 'lines': run_lines})
            continue
        chunks.extend(
            {'type': 'text', 'lines': piece}
            for piece in _split_text_lines_with_overlap(run_lines, max_chars, overlap_chars)
        )

    return chunks

+ 0 - 50
scripts/check_imports.py

@@ -1,50 +0,0 @@
"""Cross-layer import check script. Usage: python scripts/check_imports.py"""
import io
import re
import sys
from pathlib import Path

ROOT = Path(__file__).parent.parent

# Each rule names the directories to scan and the modules they must not import.
RULES = [
    {
        "name": "views/ 禁止直接导入 foundation/database/",
        "check_dirs": ["views"],
        "forbidden": ["foundation.database"],
    },
    {
        "name": "reviewers/ 禁止直接导入 foundation/database/",
        "check_dirs": [
            "core/construction_review/component/reviewers",
            "core/construction_review/component/doc_worker",
        ],
        "forbidden": ["foundation.database"],
    },
]


def _forbidden_import_regex(module):
    """Compile a regex matching a line that imports *module* or one of its submodules.

    Both ``from <module> ...`` and ``import <module> ...`` are matched. The
    statement must start the line (after optional indentation), so
    commented-out imports are ignored, and the negative lookahead keeps
    sibling modules such as ``<module>_tools`` from matching.
    """
    return re.compile(
        rf"^[ \t]*(?:from|import)[ \t]+{re.escape(module)}(?!\w)",
        re.MULTILINE,
    )


def find_violations(root, rules):
    """Scan the rule directories under *root* and report forbidden imports.

    Progress and findings are printed. Unreadable files are reported and
    skipped rather than silently ignored.

    Args:
        root: ``Path`` of the project root that ``check_dirs`` are relative to.
        rules: List of rule dicts with the keys ``name``, ``check_dirs`` and
            ``forbidden``.

    Returns:
        List of offending file paths, one entry per (file, forbidden module) hit.
    """
    violations = []
    for rule in rules:
        print(f"\n检查: {rule['name']}")
        patterns = [_forbidden_import_regex(module) for module in rule["forbidden"]]
        for check_dir in rule["check_dirs"]:
            full_dir = root / check_dir
            if not full_dir.exists():
                print(f"  跳过(目录不存在): {check_dir}")
                continue
            # Sorted so the report order is stable between runs.
            for py_file in sorted(full_dir.rglob("*.py")):
                try:
                    content = py_file.read_text(encoding="utf-8")
                except (OSError, UnicodeDecodeError) as exc:
                    print(f"  ⚠️ 无法读取 {py_file.relative_to(root)}: {exc}")
                    continue
                for pattern in patterns:
                    if pattern.search(content):
                        print(f"  ❌ {py_file.relative_to(root)}")
                        violations.append(py_file)
    return violations


def main():
    """Run the check over ROOT with RULES and return the process exit code."""
    # Windows console encoding compatibility
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding='utf-8', errors='replace')
    else:
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')

    violations = find_violations(ROOT, RULES)
    if violations:
        print(f"\n共发现 {len(violations)} 处跨层导入违规")
        return 1
    print("\n跨层导入检查通过")
    return 0


if __name__ == "__main__":
    sys.exit(main())

+ 0 - 103
utils_test/Milvus_Test/debug_simple.py

@@ -1,103 +0,0 @@
-#!/usr/bin/env python3
-"""
-简化调试 - 检查集合字段结构
-"""
-
-import sys
-import os
-
-
print("调试 LangChain Milvus 集合字段结构")
print("=" * 50)

def debug_collection(
    uri="http://192.168.92.61:19530",
    db_name="lq_db",
    collection_name="debug_simple_fields",
):
    """Create a throw-away hybrid-search collection and print its schema and indexes.

    The test collection is dropped in a ``finally`` block, so it is removed
    even when inspection fails part-way through.

    Args:
        uri: Milvus server URI.
        db_name: Milvus database name.
        collection_name: Name of the temporary collection to create and drop.

    Returns:
        True when the whole run completed, False when any step raised
        (the error and traceback are printed).
    """
    try:
        import time

        from langchain_milvus import Milvus, BM25BuiltInFunction
        from langchain_core.documents import Document
        from pymilvus import Collection, utility
        from foundation.ai.models.model_handler import model_handler

        # Connection parameters
        connection_args = {
            "uri": uri,
            "user": None,
            "db_name": db_name
        }

        # Embedding model
        emdmodel = model_handler._get_lq_qwen3_8b_emd()

        # Test documents
        test_docs = [
            Document(page_content="测试文档内容", metadata={"category": "test"})
        ]

        try:
            print("1. 创建 LangChain Milvus 混合搜索集合...")
            Milvus.from_documents(
                documents=test_docs,
                embedding=emdmodel,
                builtin_function=BM25BuiltInFunction(),
                vector_field=["dense", "sparse"],
                connection_args=connection_args,
                collection_name=collection_name,
                consistency_level="Strong",
                drop_old=True,
            )

            print("集合创建成功")

            # Wait for index creation
            time.sleep(3)

            print("\n2. 检查集合结构...")
            if utility.has_collection(collection_name):
                collection = Collection(collection_name)

                # Collection info
                print(f"集合名称: {collection.name}")
                print(f"集合数量: {collection.num_entities}")

                # Field info
                schema = collection.schema
                print(f"\n字段结构:")
                for field in schema.fields:
                    print(f"  - 字段名: {field.name}")
                    print(f"    类型: {field.dtype}")
                    print(f"    是否主键: {field.is_primary}")
                    if hasattr(field, 'dim'):
                        print(f"    维度: {field.dim}")
                    if hasattr(field, 'max_length'):
                        print(f"    最大长度: {field.max_length}")
                    print()

                # Index info
                print("索引信息:")
                try:
                    indexes = collection.indexes
                    for index in indexes:
                        print(f"  - 索引字段: {index.field_name}")
                        print(f"    索引类型: {index.index_type}")
                        print(f"    索引参数: {index.params}")
                        print()
                except Exception as e:
                    print(f"获取索引失败: {e}")
        finally:
            # Cleanup must not mask the original error (or turn a successful
            # run into a crash), so its own failures are only reported.
            try:
                if utility.has_collection(collection_name):
                    utility.drop_collection(collection_name)
                    print(f"清理测试集合: {collection_name}")
            except Exception as cleanup_error:
                print(f"清理测试集合失败: {cleanup_error}")

        return True

    except Exception as e:
        print(f"调试失败: {e}")
        import traceback
        traceback.print_exc()
        return False

if __name__ == "__main__":
    debug_collection()