Jelajahi Sumber

Merge branch 'dev' into dev_sgsc_wxm

WangXuMing 3 minggu lalu
induk
melakukan
3613b7d091
29 mengubah file dengan 3429 tambahan dan 17 penghapusan
  1. 101 0
      core/construction_review/component/ai_review_engine.py
  2. 94 0
      core/construction_review/component/desensitize/README.md
  3. 24 0
      core/construction_review/component/desensitize/__init__.py
  4. 292 0
      core/construction_review/component/desensitize/dict_manager.py
  5. 293 0
      core/construction_review/component/desensitize/engine.py
  6. 267 0
      core/construction_review/component/desensitize/model_client.py
  7. 39 0
      core/construction_review/component/desensitize/processors/base_processor.py
  8. 155 0
      core/construction_review/component/desensitize/processors/biz_processor.py
  9. 138 0
      core/construction_review/component/desensitize/processors/financial_processor.py
  10. 169 0
      core/construction_review/component/desensitize/processors/geo_processor.py
  11. 117 0
      core/construction_review/component/desensitize/processors/pii_processor.py
  12. 236 0
      core/construction_review/component/desensitize/remapper.py
  13. 268 0
      core/construction_review/component/desensitize/validator.py
  14. 34 2
      core/construction_review/component/reviewers/timeliness_basis_reviewer.py
  15. 487 0
      core/construction_review/component/reviewers/timeliness_content_reviewer.py
  16. 22 1
      core/construction_review/component/reviewers/utils/inter_tool.py
  17. 1 0
      core/construction_review/component/reviewers/utils/reference_matcher.py
  18. 1 0
      core/construction_review/workflows/ai_review_workflow.py
  19. 25 0
      core/construction_review/workflows/core_functions/ai_review_core_fun.py
  20. 16 0
      foundation/infrastructure/config/config.py
  21. 145 0
      problem.json
  22. 3 0
      server/app.py
  23. 141 0
      test_content_timeliness.py
  24. 358 0
      views/construction_review/desensitize_api.py
  25. 2 4
      views/construction_review/file_upload.py
  26. 1 6
      views/construction_review/review_results.py
  27. 0 1
      views/construction_review/task_control.py
  28. 0 1
      views/construction_write/content_completion.py
  29. 0 2
      views/construction_write/outline_views.py

+ 101 - 0
core/construction_review/component/ai_review_engine.py

@@ -1129,6 +1129,107 @@ class AIReviewEngine(BaseReviewer):
                 }
                 }
             }
             }
         
         
+    async def timeliness_content_reviewer(self, review_data: Dict[str, Any], trace_id: str,
+                                state: dict = None, stage_name: str = None) -> Dict[str, Any]:
+        """
+        执行三级分类内容时效性审查:检查tertiary_classification_details中引用的规范是否过时
+
+        Args:
+            review_data: 待审查数据,包含tertiary_classification_details
+            trace_id: 追踪ID
+            state: 状态字典
+            stage_name: 阶段名称
+
+        Returns:
+            审查结果字典,包含内容时效性审查结果
+        """
+        start_time = time.time()
+        try:
+            logger.info(f"开始三级分类内容时效性审查,trace_id: {trace_id}")
+
+            # 提取三级分类详情
+            tertiary_details = review_data.get('tertiary_classification_details', [])
+            max_concurrent = review_data.get('max_concurrent', 4)
+
+            if not tertiary_details:
+                logger.warning("三级分类详情为空,将跳过内容时效性审查")
+                return {
+                    "timeliness_content_review_results": {
+                        "review_results": [],
+                        "total_items": 0,
+                        "issue_items": 0,
+                        "execution_time": time.time() - start_time,
+                        "error_message": None,
+                        "message": "未找到三级分类详情,跳过内容时效性审查"
+                    }
+                }
+
+            logger.info(f"提取到 {len(tertiary_details)} 个三级分类详情")
+
+            # 调用内容时效性审查
+            try:
+                # 使用信号量控制并发
+                async with self.semaphore:
+                    # 从state中获取progress_manager和callback_task_id
+                    progress_manager = state.get('progress_manager') if state else None
+                    callback_task_id = state.get('callback_task_id') if state else None
+
+                    # 调用内容时效性审查器
+                    from core.construction_review.component.reviewers.timeliness_content_reviewer import ContentTimelinessReviewer
+                    async with ContentTimelinessReviewer(max_concurrent=max_concurrent) as reviewer:
+                        timeliness_content_results = await reviewer.review_tertiary_content(
+                            tertiary_details=tertiary_details,
+                            collection_name="first_bfp_collection_status",
+                            progress_manager=progress_manager,
+                            callback_task_id=callback_task_id
+                        )
+
+                    logger.info(f"内容时效性审查完成,发现问题数量: {len(timeliness_content_results)}")
+
+                    # 统计审查结果
+                    total_items = len(timeliness_content_results)
+                    issue_items = sum(1 for item in timeliness_content_results if item.get('exist_issue', False))
+
+                    logger.info(f"审查统计 - 总规范引用: {total_items}, 问题项: {issue_items}")
+
+            except Exception as e:
+                logger.error(f"内容时效性审查失败: {str(e)}")
+                return {
+                    "timeliness_content_review_results": {
+                        "review_results": [],
+                        "total_items": 0,
+                        "issue_items": 0,
+                        "execution_time": time.time() - start_time,
+                        "error_message": f"内容时效性审查失败: {str(e)}"
+                    }
+                }
+
+            # 返回完整结果
+            return {
+                "timeliness_content_review_results": {
+                    "review_results": timeliness_content_results,
+                    "total_items": total_items,
+                    "issue_items": issue_items,
+                    "execution_time": time.time() - start_time,
+                    "error_message": None
+                }
+            }
+
+        except Exception as e:
+            execution_time = time.time() - start_time
+            error_msg = f"内容时效性审查失败: {str(e)}"
+            logger.error(error_msg, exc_info=True)
+
+            return {
+                "timeliness_content_review_results": {
+                    "review_results": [],
+                    "total_items": 0,
+                    "issue_items": 0,
+                    "execution_time": execution_time,
+                    "error_message": error_msg
+                }
+            }
+
     async def timeliness_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
     async def timeliness_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
                                 state: dict = None, stage_name: str = None) -> Dict[str, Any]:
                                 state: dict = None, stage_name: str = None) -> Dict[str, Any]:
         """
         """

+ 94 - 0
core/construction_review/component/desensitize/README.md

@@ -0,0 +1,94 @@
+# 施工方案数据脱敏模块
+
+基于 `wlast.md` (v2.3) 文档实现的数据脱敏系统。
+
+## 目录结构
+
+```
+desensitize/
+├── __init__.py              # 模块导出
+├── engine.py                # 脱敏引擎核心
+├── validator.py             # 黑白名单校验器
+├── dict_manager.py          # 脱敏字典管理器
+├── model_client.py          # 本地大模型客户端
+├── remapper.py              # 审查结果逆向映射器
+├── processors/              # 四维度处理器
+│   ├── base_processor.py    # 处理器基类
+│   ├── pii_processor.py     # PII脱敏(姓名/手机/身份证)
+│   ├── geo_processor.py     # 地理坐标脱敏(桩号/地名/高程)
+│   ├── biz_processor.py     # 商业标识脱敏(企业名/项目名)
+│   └── financial_processor.py # 财务数据脱敏(金额/单价)
+└── README.md                # 本文件
+```
+
+## 四维度脱敏规则
+
+| 维度 | 脱敏对象 | 脱敏策略 | 保留逻辑 |
+|------|---------|---------|---------|
+| PII | 姓名、电话、身份证、证书编号 | 占位符/掩码替换 | 角色关系与排班逻辑 |
+| 地理坐标 | 桩号、地名、高程、桥隧名称 | 相对化处理/序列化 | 相对长度与工程实体序列 |
+| 商业标识 | 项目名、企业名、合同编号 | 泛化替换 | 组织角色架构 |
+| 财务数据 | 金额、单价、工程量 | 全量掩码 | 表格科目结构 |
+
+## 两阶段部署策略
+
+### 阶段一(测试)
+- 配置:`DEPLOY_PHASE = test`
+- 使用外部大模型 API 测试验证
+- 仅用于测试数据集,严禁生产文档
+
+### 阶段二(生产)
+- 配置:`DEPLOY_PHASE = local`
+- 方案A:`DESENSITIZE_SCHEME = scheme_a`(规则引擎主导 + 轻量本地模型兜底)
+- 方案B:`DESENSITIZE_SCHEME = scheme_b`(本地大模型全量语义驱动)
+
+## API 接口
+
+| 接口 | 路径 | 功能 |
+|------|------|------|
+| 文档脱敏 | `POST /desensitize/document` | 四维度脱敏处理 |
+| 脱敏校验 | `POST /desensitize/validate` | 黑白名单校验 |
+| 结果还原 | `POST /desensitize/remap` | 审查结果逆向映射 |
+| 字典查询 | `GET /desensitize/dict/{task_id}` | 查询字典元信息 |
+| 文本脱敏 | `POST /desensitize/text` | 简化版文本脱敏 |
+
+## 配置说明
+
+在 `config/config.ini` 中添加以下配置:
+
+```ini
+[desensitize]
+# 部署阶段: test/local
+DEPLOY_PHASE = test
+# 本地方案: scheme_a/scheme_b
+DESENSITIZE_SCHEME = scheme_a
+# 默认脱敏级别
+DEFAULT_LEVEL = standard
+DICT_STORAGE_PATH = ./data/desensitize_dicts
+ENABLE_VALIDATION = true
+VALIDATION_FAILURE_ACTION = block
+```
+
+## 安全约束
+
+1. **外部调用前置脱敏原则**:脱敏是调用任何外部大模型 API 的强制前提
+2. **本地化优先原则**:所有脱敏动作在企业内部本地服务器完成
+3. **可逆映射原则**:本地保留脱敏字典,支持结果还原
+4. **纵深防御原则**:规则引擎 + 本地 AI 模型 + 黑白名单校验三层防护
+
+## 示例
+
+**原始文本**:
+```
+映雪特大桥(D1K86+279.91~D1K87+279.91)跨越云雾河,全长1000m。
+项目经理张三(电话:13812345678)负责现场安全管理,安全费用预算 ¥1,250,000.00 元。
+```
+
+**脱敏后**:
+```
+[1号特大桥](K0+000~K1+000)跨越[1号河流],全长1000m。
+[项目经理A](电话:[手机号脱敏])负责现场安全管理,安全费用预算 [金额脱敏] 元。
+```
+
+## 运行测试
+
+```bash
+python test_desensitize.py
+```

+ 24 - 0
core/construction_review/component/desensitize/__init__.py

@@ -0,0 +1,24 @@
+"""
+施工方案数据脱敏模块
+
+提供四维度脱敏能力:PII、地理坐标、商业标识、财务数据
+支持外部API测试和本地部署两阶段切换
+"""
+
+from .engine import LocalDesensitizationEngine, DesensitizedResult, EntityRegistry
+from .dict_manager import DictManager
+from .validator import BlackWhiteListChecker, ValidationResult
+from .remapper import ResultRemapper, RemapResult
+from .model_client import DesensitizeModelClient
+
+__all__ = [
+    "LocalDesensitizationEngine",
+    "DesensitizedResult",
+    "EntityRegistry",
+    "DictManager",
+    "BlackWhiteListChecker",
+    "ValidationResult",
+    "ResultRemapper",
+    "RemapResult",
+    "DesensitizeModelClient",
+]

+ 292 - 0
core/construction_review/component/desensitize/dict_manager.py

@@ -0,0 +1,292 @@
+"""
+脱敏字典管理器
+
+负责脱敏字典的加密存储、读取和生命周期管理。
+本地保留脱敏字典,支持将云端审查结果还原为真实工程术语。
+
+根据 wlast.md 文档第6.3.1节和附录A设计
+"""
+
+import json
+import hashlib
+import logging
+from typing import Dict, Any, Optional
+from datetime import datetime
+from pathlib import Path
+
+try:
+    from cryptography.fernet import Fernet
+    from cryptography.hazmat.primitives import hashes
+    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+    import base64
+    CRYPTO_AVAILABLE = True
+except ImportError:
+    CRYPTO_AVAILABLE = False
+
+from foundation.infrastructure.config.config import config_handler
+from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
+
+logger = logging.getLogger(__name__)
+
+
class DictManager:
    """Persist, load and expire desensitization dictionaries.

    A dictionary maps original values to their placeholders for one task. It is
    written to a local file (``<storage_path>/<task_id>.dict``), encrypted with
    Fernet when the ``cryptography`` package is available, and mirrored into
    Redis as a best-effort cache.
    """

    # Redis key prefix and cache lifetime (7 days, in seconds).
    REDIS_KEY_PREFIX = "desensitize:dict:"
    REDIS_TTL = 7 * 24 * 3600

    def __init__(self):
        """Read the ``desensitize`` config section and prepare storage and cipher."""
        self.cfg = config_handler.get_section("desensitize")
        self.storage_path = Path(self.cfg.get("DICT_STORAGE_PATH", "./data/desensitize_dicts"))
        self.storage_path.mkdir(parents=True, exist_ok=True)

        self.encryption_key = self._get_encryption_key()
        self.cipher = None
        if not CRYPTO_AVAILABLE:
            # _get_encryption_key() returns None in this case, so the check must
            # not depend on the key; otherwise plaintext storage goes unnoticed.
            logger.warning("[DictManager] cryptography 库未安装,加密功能不可用")
        elif self.encryption_key:
            self.cipher = Fernet(self.encryption_key)

    def _get_encryption_key(self) -> Optional[bytes]:
        """Derive the Fernet key from configuration.

        ``DICT_ENCRYPTION_KEY`` may be a literal passphrase or a ``${ENV_VAR}``
        reference resolved from the environment.

        Returns:
            A urlsafe-base64 Fernet key, or ``None`` when ``cryptography`` is
            not installed.
        """
        key_str = self.cfg.get("DICT_ENCRYPTION_KEY", "")

        if key_str.startswith("${") and key_str.endswith("}"):
            import os
            key_str = os.environ.get(key_str[2:-1], "")

        if not key_str:
            # SECURITY: hard-coded fallback passphrase; acceptable for
            # development only. Production must configure a real key.
            logger.warning("[DictManager] 使用默认加密密钥,生产环境请配置安全密钥")
            key_str = "LQAgentPlatform-Desensitize-Default-Key-32bytes"

        if not CRYPTO_AVAILABLE:
            return None

        import base64
        # SECURITY: the salt is a fixed constant, so equal passphrases always
        # derive equal keys. Changing it would make existing files unreadable.
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=b"lqagentplatform_desensitize_salt",
            iterations=100000,
        )
        return base64.urlsafe_b64encode(kdf.derive(key_str.encode()))

    def _dict_path(self, task_id: str) -> Path:
        """Return the dictionary file path for ``task_id``.

        Raises:
            ValueError: if ``task_id`` contains a path separator or NUL byte and
                could therefore point outside ``storage_path``.
        """
        name = str(task_id)
        if any(ch in name for ch in ("/", "\\", "\x00")):
            raise ValueError(f"invalid task_id for dictionary storage: {task_id!r}")
        return self.storage_path / f"{name}.dict"

    async def save(self, task_id: str, mapping_dict: Dict[str, Any]) -> str:
        """Persist a dictionary to disk and cache it in Redis.

        Args:
            task_id: Unique task identifier; used as the file name stem.
            mapping_dict: Mapping data to store under the ``mappings`` key.

        Returns:
            The first 16 hex characters of the SHA-256 of the serialized
            dictionary (computed before ``dict_hash`` itself is added).

        Raises:
            ValueError: if ``task_id`` is not a safe file name.
            Exception: any error raised while writing the local file. Redis
                failures are only logged.
        """
        # Local import: the module-level one sits inside the optional
        # cryptography try-block and is missing when that package is absent.
        import base64

        file_path = self._dict_path(task_id)

        full_dict = {
            "version": "1.0",
            "created_at": datetime.now().isoformat(),
            "task_id": task_id,
            "mappings": mapping_dict,
        }

        unsigned = json.dumps(full_dict, sort_keys=True, ensure_ascii=False)
        dict_hash = hashlib.sha256(unsigned.encode()).hexdigest()[:16]
        full_dict["dict_hash"] = dict_hash
        # Serialize again so the stored payload actually carries the hash.
        payload = json.dumps(full_dict, sort_keys=True, ensure_ascii=False).encode()

        if self.cipher:
            storage_content = self.cipher.encrypt(payload)
            storage_format = "encrypted"
        else:
            storage_content = payload
            storage_format = "plain"

        try:
            file_path.write_bytes(storage_content)
            logger.info(f"[DictManager] 脱敏字典已保存: {file_path}")
        except Exception as e:
            logger.error(f"[DictManager] 本地存储失败: {e}")
            raise

        # Redis is a convenience cache; the local file stays authoritative.
        try:
            redis_conn = await RedisConnectionFactory.get_connection()
            await redis_conn.setex(
                f"{self.REDIS_KEY_PREFIX}{task_id}",
                self.REDIS_TTL,
                json.dumps({
                    "format": storage_format,
                    "hash": dict_hash,
                    "content": base64.b64encode(storage_content).decode(),
                })
            )
        except Exception as e:
            logger.warning(f"[DictManager] Redis 缓存失败: {e}")

        return dict_hash

    async def load(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Load a dictionary, trying Redis first and the local file second.

        Args:
            task_id: Unique task identifier.

        Returns:
            The stored dictionary, or ``None`` when it does not exist, cannot be
            read, or ``task_id`` is not a safe file name.
        """
        import base64  # see save() for why this is local

        try:
            file_path = self._dict_path(task_id)
        except ValueError as e:
            logger.error(f"[DictManager] {e}")
            return None

        try:
            redis_conn = await RedisConnectionFactory.get_connection()
            cached = await redis_conn.get(f"{self.REDIS_KEY_PREFIX}{task_id}")
            if cached:
                data = json.loads(cached)
                storage_content = base64.b64decode(data["content"])
                if data["format"] == "encrypted":
                    if not self.cipher:
                        # Fall back to the file instead of parsing ciphertext as JSON.
                        raise RuntimeError("cached dictionary is encrypted but no cipher is configured")
                    storage_content = self.cipher.decrypt(storage_content)
                return json.loads(storage_content.decode())
        except Exception as e:
            logger.warning(f"[DictManager] Redis 加载失败: {e}")

        if not file_path.exists():
            logger.error(f"[DictManager] 字典文件不存在: {file_path}")
            return None

        try:
            storage_content = file_path.read_bytes()
            if self.cipher:
                try:
                    return json.loads(self.cipher.decrypt(storage_content).decode())
                except Exception as e:
                    # The file may have been written without encryption; try
                    # it as plaintext below.
                    logger.error(f"[DictManager] 解密失败: {e}")
            return json.loads(storage_content.decode())
        except Exception as e:
            logger.error(f"[DictManager] 文件加载失败: {e}")
            return None

    async def get_mapping(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return only the ``mappings`` part of a stored dictionary, or ``None``."""
        full_dict = await self.load(task_id)
        if full_dict:
            return full_dict.get("mappings", {})
        return None

    async def delete(self, task_id: str) -> bool:
        """Delete a dictionary from disk and from Redis.

        Returns:
            ``False`` when the local file could not be removed or ``task_id`` is
            not a safe file name; ``True`` otherwise. Redis failures are logged
            but do not affect the result.
        """
        try:
            file_path = self._dict_path(task_id)
        except ValueError as e:
            logger.error(f"[DictManager] {e}")
            return False

        success = True
        try:
            if file_path.exists():
                file_path.unlink()
                logger.info(f"[DictManager] 本地字典已删除: {file_path}")
        except Exception as e:
            logger.error(f"[DictManager] 本地删除失败: {e}")
            success = False

        try:
            redis_conn = await RedisConnectionFactory.get_connection()
            await redis_conn.delete(f"{self.REDIS_KEY_PREFIX}{task_id}")
        except Exception as e:
            logger.warning(f"[DictManager] Redis 删除失败: {e}")

        return success

    async def exists(self, task_id: str) -> bool:
        """Return whether a local dictionary file exists for ``task_id``."""
        try:
            return self._dict_path(task_id).exists()
        except ValueError:
            return False

    def get_dict_metadata(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return file-level metadata without reading the dictionary content.

        Returns:
            A dict with ``task_id``, ``file_path``, ``file_size``,
            ``modified_at`` and ``exists``, or ``None`` when the file is missing,
            cannot be inspected, or ``task_id`` is not a safe file name.
        """
        try:
            file_path = self._dict_path(task_id)
        except ValueError as e:
            logger.warning(f"[DictManager] {e}")
            return None

        if not file_path.exists():
            return None

        try:
            stat = file_path.stat()
            return {
                "task_id": task_id,
                "file_path": str(file_path),
                "file_size": stat.st_size,
                "modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                "exists": True
            }
        except Exception as e:
            logger.error(f"[DictManager] 获取元信息失败: {e}")
            return None

    async def cleanup_expired(self, days: int = 7) -> int:
        """Delete local ``*.dict`` files not modified within the last ``days`` days.

        Args:
            days: Age threshold in days.

        Returns:
            Number of files removed.
        """
        import time
        cutoff_time = time.time() - (days * 24 * 3600)
        cleaned = 0

        for dict_file in self.storage_path.glob("*.dict"):
            try:
                if dict_file.stat().st_mtime < cutoff_time:
                    dict_file.unlink()
                    cleaned += 1
                    logger.info(f"[DictManager] 清理过期字典: {dict_file.name}")
            except Exception as e:
                logger.warning(f"[DictManager] 清理失败 {dict_file}: {e}")

        return cleaned

    async def create_backup(self, task_id: str) -> Optional[str]:
        """Copy the dictionary file to ``<task_id>.dict.backup``.

        Returns:
            The backup path as a string, or ``None`` when the source file is
            missing, the copy fails, or ``task_id`` is not a safe file name.
        """
        try:
            file_path = self._dict_path(task_id)
        except ValueError as e:
            logger.error(f"[DictManager] {e}")
            return None

        if not file_path.exists():
            return None

        backup_path = file_path.with_name(file_path.name + ".backup")
        try:
            import shutil
            shutil.copy2(file_path, backup_path)
            return str(backup_path)
        except Exception as e:
            logger.error(f"[DictManager] 备份失败: {e}")
            return None

+ 293 - 0
core/construction_review/component/desensitize/engine.py

@@ -0,0 +1,293 @@
+"""
+脱敏引擎核心
+
+LocalDesensitizationEngine: 本地脱敏引擎主入口
+协调四维度处理器、模型客户端和字典管理器
+"""
+
+import re
+import logging
+from dataclasses import dataclass, field
+from typing import Optional, Dict, Any, List
+from pathlib import Path
+
+from .processors.pii_processor import PIIDesensitizer
+from .processors.geo_processor import GeoDesensitizer
+from .processors.biz_processor import BizDesensitizer
+from .processors.financial_processor import FinancialDesensitizer
+from .dict_manager import DictManager
+from .validator import BlackWhiteListChecker
+from .model_client import DesensitizeModelClient
+from foundation.infrastructure.config.config import config_handler
+
+logger = logging.getLogger(__name__)
+
+
@dataclass
class DesensitizedResult:
    """Outcome of one desensitization run."""

    # Text after desensitization.
    content: str
    # Identifier of the task that produced this result.
    task_id: str
    # Hash of the stored desensitization dictionary.
    dict_hash: str
    # Per-category counters.
    statistics: Dict[str, int] = field(default_factory=dict)
    # Whether the black/white-list validation passed.
    is_valid: bool = True
    # Violations reported by the validator.
    violations: List[Dict] = field(default_factory=list)
+
+
class EntityRegistry:
    """Track original-to-placeholder mappings across all desensitization dimensions.

    One registry is shared by the processors of a single run, so the same
    original value always receives the same placeholder.
    """

    # Placeholder suffix per known entity type; other types use "号<type>".
    _ENTITY_SUFFIXES = {
        "桥": "号特大桥",
        "隧道": "号隧道",
        "河": "号河流",
        "涵洞": "号涵洞",
        "立交": "号立交"
    }

    # Standard role name per known company type; other types are used as-is.
    _COMPANY_ROLES = {
        "建设": "建设单位",
        "总包": "总承包单位",
        "监理": "监理单位",
        "设计": "设计单位",
        "劳务": "劳务分包单位",
        "数字化": "数字化实施方"
    }

    def __init__(self):
        self.persons: Dict[str, str] = {}    # name -> role placeholder
        self.entities: Dict[str, str] = {}   # entity name -> numbered placeholder
        self.companies: Dict[str, str] = {}  # company name -> standard role
        self.coordinates: Dict[str, Any] = {
            "base_point": None,  # reference point for relative coordinates
            "mappings": {}       # original -> relative
        }
        self.amounts: List[str] = []         # recorded amounts

        self._person_counter = 0
        self._entity_counter = 0             # total entities registered
        # Role supplied at registration time, so export does not have to guess
        # it back from the placeholder text.
        self._person_roles: Dict[str, str] = {}
        # One running number per placeholder suffix, so the first bridge and
        # the first river are both numbered 1.
        self._entity_counters: Dict[str, int] = {}

    def register_person(self, name: str, role: str = "人员") -> str:
        """Register a person and return the placeholder, e.g. ``[项目经理A]``.

        Registering the same name again returns the placeholder assigned the
        first time.
        """
        if name in self.persons:
            return self.persons[name]

        self._person_counter += 1
        placeholder = f"[{role}{self._get_suffix(self._person_counter)}]"
        self.persons[name] = placeholder
        self._person_roles[name] = role
        return placeholder

    def register_entity(self, name: str, entity_type: str = "实体") -> str:
        """Register an engineering entity and return a numbered placeholder.

        Numbers run separately for each placeholder suffix, e.g.
        ``[1号特大桥]``, ``[1号河流]``, ``[2号特大桥]``.
        """
        if name in self.entities:
            return self.entities[name]

        suffix = self._ENTITY_SUFFIXES.get(entity_type, f"号{entity_type}")
        # Key the counter by suffix, not by type, so two types that share a
        # suffix can never produce the same placeholder.
        index = self._entity_counters.get(suffix, 0) + 1
        self._entity_counters[suffix] = index
        self._entity_counter += 1

        placeholder = f"[{index}{suffix}]"
        self.entities[name] = placeholder
        return placeholder

    def register_company(self, name: str, company_type: str = "单位") -> str:
        """Register a company and return its standard role placeholder.

        Companies of the same type share one placeholder (e.g. ``[监理单位]``).
        """
        if name in self.companies:
            return self.companies[name]

        placeholder = f"[{self._COMPANY_ROLES.get(company_type, company_type)}]"
        self.companies[name] = placeholder
        return placeholder

    def set_coordinate_base(self, base_point: str):
        """Record the reference point that maps to ``K0+000``."""
        self.coordinates["base_point"] = base_point
        self.coordinates["relative_zero"] = "K0+000"

    def register_coordinate(self, original: str, relative: str):
        """Record one original-to-relative coordinate mapping."""
        self.coordinates["mappings"][original] = relative

    def export_mapping(self) -> Dict[str, Any]:
        """Return the complete mapping dictionary for persistence."""
        return {
            "version": "1.0",
            "persons": [
                {
                    "original": name,
                    "masked": masked,
                    # Prefer the role recorded at registration; fall back to
                    # parsing for entries written into ``persons`` directly.
                    "role": self._person_roles.get(name) or self._role_from_placeholder(masked),
                }
                for name, masked in self.persons.items()
            ],
            "entities": [
                {"original": name, "masked": masked}
                for name, masked in self.entities.items()
            ],
            "companies": [
                {"original": name, "masked": masked}
                for name, masked in self.companies.items()
            ],
            "coordinates": self.coordinates,
            "amount_count": len(self.amounts)
        }

    def get_statistics(self) -> Dict[str, int]:
        """Return per-dimension counters."""
        return {
            # NOTE(review): amounts are counted here and again under
            # financial_count; kept as-is, confirm whether this is intended.
            "pii_count": len(self.persons) + len(self.amounts),
            "geo_count": len(self.entities) + len(self.coordinates.get("mappings", {})),
            "biz_count": len(self.companies),
            "financial_count": len(self.amounts)
        }

    @staticmethod
    def _role_from_placeholder(placeholder: str) -> str:
        """Best-effort role extraction from ``[<role><suffix>]``.

        The suffix is either trailing digits or one trailing capital letter,
        matching what ``_get_suffix`` produces.
        """
        inner = placeholder[1:-1]
        without_digits = inner.rstrip("0123456789")
        if without_digits != inner:
            return without_digits
        if inner and "A" <= inner[-1] <= "Z":
            return inner[:-1]
        return inner

    @staticmethod
    def _get_suffix(n: int) -> str:
        """Return ``A``..``Z`` for 1..26 and the decimal number beyond that."""
        return chr(ord('A') + n - 1) if n <= 26 else f"{n}"
+
+
class LocalDesensitizationEngine:
    """Entry point of the local desensitization pipeline.

    Deployment phase (``DEPLOY_PHASE``):
    - ``test``: the model client is always created.
    - ``local``: the model client is created only for ``scheme_b``.

    Scheme (``DESENSITIZE_SCHEME``):
    - ``scheme_a``: rule processors first, optional model supplement.
    - ``scheme_b``: model first, then PII and geo rule processors in strict mode.
      Any value other than ``scheme_a`` is processed as ``scheme_b``.
    """

    # Role labels followed by a colon (ASCII or full-width) and a 2-4 character
    # Chinese name. Compiled once instead of on every call.
    _NAME_PATTERNS = tuple(
        re.compile(label + r'[::]\s*([\u4e00-\u9fa5]{2,4})')
        for label in ('负责人', '项目经理', '安全员')
    )

    def __init__(self):
        self.cfg = config_handler.get_section("desensitize")
        self.deploy_phase = self.cfg.get("DEPLOY_PHASE", "test")
        self.scheme = self.cfg.get("DESENSITIZE_SCHEME", "scheme_a")
        self.level = self.cfg.get("DEFAULT_LEVEL", "standard")

        if self.scheme not in ("scheme_a", "scheme_b"):
            # process() routes every non-scheme_a value to scheme_b; make that
            # visible instead of silent.
            logger.warning(f"[DesensitizeEngine] Unknown scheme {self.scheme!r}, "
                           f"it will be processed as scheme_b")

        # Rule-based processors, one per dimension.
        self.pii_processor = PIIDesensitizer()
        self.geo_processor = GeoDesensitizer()
        self.biz_processor = BizDesensitizer()
        self.financial_processor = FinancialDesensitizer()

        # The model client is optional; see _need_model().
        self.model_client: Optional[DesensitizeModelClient] = None
        if self._need_model():
            self.model_client = DesensitizeModelClient()

        self.validator = BlackWhiteListChecker()
        self.dict_manager = DictManager()

        logger.info(f"[DesensitizeEngine] Initialized: phase={self.deploy_phase}, "
                   f"scheme={self.scheme}, level={self.level}")

    def _need_model(self) -> bool:
        """Return whether the current configuration requires a model client."""
        if self.deploy_phase == "test":
            return True
        if self.scheme == "scheme_b":
            return True
        return False

    async def process(self, content: str, task_id: str) -> "DesensitizedResult":
        """Run the full desensitization flow for one document.

        Args:
            content: Original document text.
            task_id: Unique task identifier; also keys the stored dictionary.

        Returns:
            The desensitized result. When validation fails and
            ``VALIDATION_FAILURE_ACTION`` is ``block`` (the default), the result
            has ``is_valid=False``, an empty ``dict_hash`` and no dictionary is
            saved. With any other action the dictionary is saved and
            ``is_valid`` stays ``True``, but the detected violations are still
            returned in ``violations``.

        Raises:
            Exception: any error from the processors, validator or dictionary
                storage is logged and re-raised.
        """
        logger.info(f"[DesensitizeEngine] Processing task {task_id}, "
                   f"content_length={len(content)}")

        # Shared across all dimensions for this run.
        registry = EntityRegistry()

        try:
            if self.scheme == "scheme_a":
                result_content = await self._process_scheme_a(content, registry)
            else:
                result_content = await self._process_scheme_b(content, registry)

            # Final black/white-list check; always executed.
            validation = self.validator.validate(result_content)

            violations = []
            if not validation.is_valid:
                violations = validation.violations
                logger.error(f"[DesensitizeEngine] Validation failed for {task_id}: "
                           f"{len(validation.violations)} violations")
                if self.cfg.get("VALIDATION_FAILURE_ACTION", "block") == "block":
                    return DesensitizedResult(
                        content=result_content,
                        task_id=task_id,
                        dict_hash="",
                        statistics=registry.get_statistics(),
                        is_valid=False,
                        violations=violations
                    )

            mapping_dict = registry.export_mapping()
            dict_hash = await self.dict_manager.save(task_id, mapping_dict)

            return DesensitizedResult(
                content=result_content,
                task_id=task_id,
                dict_hash=dict_hash,
                statistics=registry.get_statistics(),
                is_valid=True,
                # Warn-only mode used to drop these; surface them to the caller.
                violations=violations
            )

        except Exception:
            logger.exception(f"[DesensitizeEngine] Processing failed for {task_id}")
            raise

    async def _process_scheme_a(self, content: str, registry: "EntityRegistry") -> str:
        """Scheme A: run the four rule processors, then an optional model pass."""
        content = self.pii_processor.process(content, registry)
        content = self.geo_processor.process(content, registry)
        content = self.biz_processor.process(content, registry)
        content = self.financial_processor.process(content, registry)

        if self.model_client and self.model_client.selective:
            low_confidence_segments = self._extract_uncertain_segments(content)
            if low_confidence_segments:
                logger.debug(f"[SchemeA] Model supplement for {len(low_confidence_segments)} segments")
                # TODO: the model supplement is not wired in yet; the segments
                # are only counted. A later step may call
                # self.model_client.desensitize_text(content, self.level) here.

        return content

    async def _process_scheme_b(self, content: str, registry: "EntityRegistry") -> str:
        """Scheme B: model-driven desensitization followed by strict rule passes.

        Raises:
            RuntimeError: if no model client was created.
        """
        if not self.model_client:
            raise RuntimeError("Scheme B requires model client but none available")

        content = await self.model_client.desensitize_text(content, self.level)

        # Rule passes catch exactly formatted fields the model may have missed.
        content = self.pii_processor.process(content, registry, strict_mode=True)
        content = self.geo_processor.process(content, registry, strict_mode=True)

        return content

    def _extract_uncertain_segments(self, content: str) -> List[str]:
        """Return names that still follow a role label after rule processing.

        Looks for ``负责人``, ``项目经理`` or ``安全员`` followed by a colon
        (ASCII or full-width) and 2-4 Chinese characters. Results are grouped
        by label in that order.
        """
        uncertain: List[str] = []
        for pattern in self._NAME_PATTERNS:
            uncertain.extend(pattern.findall(content))
        return uncertain

+ 267 - 0
core/construction_review/component/desensitize/model_client.py

@@ -0,0 +1,267 @@
+"""
+本地大模型脱敏客户端
+
+支持两阶段配置:
+- 阶段一(测试):使用外部API测试候选模型
+- 阶段二(生产):使用本地部署模型(Qwen3-8B/30B)
+
+根据 wlast.md 文档第3.2节和8.3节设计
+"""
+
+import re
+import logging
+from typing import Optional, Dict, Any, List
+from openai import AsyncOpenAI
+
+from foundation.infrastructure.config.config import config_handler
+
+logger = logging.getLogger(__name__)
+
+
+class DesensitizeModelClient:
+    """Client for LLM-based desensitization (intranet Qwen3-8B / Qwen3-30B only).
+
+    Security constraints:
+      - In the production phase SERVER_URL must be a private-network address
+        (10.0.0.0/8, 172.16.0.0/12 or 192.168.0.0/16); ``_load_local_config``
+        enforces this.
+      - Public LLM API endpoints must not be configured for production.
+      - Desensitization must finish before any external review API is called.
+
+    Deployment phases (``DEPLOY_PHASE`` in the ``desensitize`` config section):
+      - ``test``: external API, for test data sets only.
+      - any other value (e.g. ``local``): locally deployed model (production).
+    """
+
+    # Desensitization prompt template (per appendix B of wlast.md)
+    DESENSITIZE_PROMPT_TEMPLATE = """你是专业的施工方案数据脱敏专家,请对以下内容进行{level}级别脱敏:
+
+【脱敏规则】
+1. PII(个人身份信息):
+   - 姓名 → [项目经理A]、[安全员B] 等角色占位符
+   - 手机号 → [手机号脱敏]
+   - 身份证号 → [证件号脱敏]
+   - 证书编号 → [证件号脱敏]
+   - 邮箱 → [邮箱脱敏]
+
+2. 地理坐标(工程相对化):
+   - 绝对桩号(如D1K86+279.91)→ 转换为相对桩号(K0+000为起点)
+   - 桥隧名称(如映雪特大桥)→ [1号特大桥] 等序列化命名
+   - 高程数据(海拔1850.5m)→ [中山区] 等地形描述
+   - 行政地名(四川省成都市)→ [某地区]
+
+3. 企业标识:
+   - 公司名称 → [建设单位]、[总承包单位]、[监理单位] 等标准角色
+   - 项目名称 → [某工程项目]
+   - 合同编号 → [合同编号脱敏]
+
+4. 财务数据:
+   - 金额(¥1,250,000.00)→ [金额脱敏]元
+   - 设备租赁单价 → [单价脱敏]/单位时间
+   - 材料单价 → [单价脱敏]/单位
+   - 工程量数值 → [数量脱敏]+单位
+
+【要求】
+- 保持原文结构、段落格式和工程逻辑
+- 保留技术参数(如混凝土标号、钢筋规格)
+- 保留工艺工序描述
+- 保留安全规范条款
+- 保留组织架构关系(仅替换名称为占位符)
+
+请直接返回脱敏后的完整文本:"""
+
+    def __init__(self):
+        """Load the configuration and create the async OpenAI-compatible client."""
+        self._load_config()
+        self.client = AsyncOpenAI(
+            base_url=self.server_url,
+            api_key=self.api_key
+        )
+
+        logger.info(f"[DesensitizeModelClient] 初始化完成: "
+                   f"phase={self.deploy_phase}, model={self.model}, "
+                   f"scheme={self.scheme if self.deploy_phase == 'local' else 'N/A'}")
+
+    def _load_config(self):
+        """Read DEPLOY_PHASE and delegate to the matching config loader."""
+        base_cfg = config_handler.get_section("desensitize")
+        self.deploy_phase = base_cfg.get("DEPLOY_PHASE", "test")
+
+        if self.deploy_phase == "test":
+            # Phase 1: validation against an external API
+            self._load_test_config()
+        else:
+            # Phase 2: locally deployed production model
+            self._load_local_config()
+
+    def _load_test_config(self):
+        """Load the test-phase configuration for the selected candidate model."""
+        test_cfg = config_handler.get_section("desensitize_model_test")
+        test_model = test_cfg.get("TEST_MODEL", "gemma3_12b")
+        cfg = config_handler.get_section(f"desensitize_model_test_{test_model}")
+
+        self.server_url = cfg.get("SERVER_URL", "")
+        self.api_key = cfg.get("API_KEY", "dummy")
+        self.model = cfg.get("MODEL_NAME", "")
+        self.max_tokens = int(cfg.get("MAX_TOKENS", "4096"))
+        self.temperature = float(cfg.get("TEMPERATURE", "0.1"))
+        # desensitize_text() reads max_input_length in every phase, so it must
+        # be set here too (previously only the local loader set it, which made
+        # the test phase fail with AttributeError).
+        self.max_input_length = int(cfg.get("MAX_INPUT_LENGTH", "8192"))
+
+        # Test-phase specific settings
+        self.selective = False
+        self.confidence_threshold = 0.0
+        self.scheme = "test"
+
+        # Warn loudly: an external model is in use
+        logger.warning(
+            f"[脱敏] 当前为测试阶段(DEPLOY_PHASE=test),"
+            f"使用外部模型 {self.model},请勿传入真实生产文档"
+        )
+
+        # Resolve an API key written as an environment reference: ${VAR_NAME}
+        if self.api_key.startswith("${") and self.api_key.endswith("}"):
+            env_var = self.api_key[2:-1]
+            import os
+            self.api_key = os.environ.get(env_var, "")
+
+    def _load_local_config(self):
+        """Load the production configuration for the locally deployed model.
+
+        Raises:
+            ValueError: If SERVER_URL is not a private-network address.
+        """
+        base_cfg = config_handler.get_section("desensitize")
+        self.scheme = base_cfg.get("DESENSITIZE_SCHEME", "scheme_a")
+
+        cfg = config_handler.get_section(f"desensitize_model_{self.scheme}")
+        self.server_url = cfg.get("SERVER_URL", "")
+        self.api_key = cfg.get("API_KEY", "dummy")
+        self.model = cfg.get("MODEL_NAME", "")
+        self.max_tokens = int(cfg.get("MAX_TOKENS", "2048"))
+        self.temperature = float(cfg.get("TEMPERATURE", "0.1"))
+
+        # Local-only settings
+        self.selective = cfg.get("SELECTIVE_INFERENCE", "false").lower() == "true"
+        self.confidence_threshold = float(cfg.get("CONFIDENCE_THRESHOLD", "0.7"))
+        self.max_input_length = int(cfg.get("MAX_INPUT_LENGTH", "8192"))
+
+        # Production: the endpoint must live on a private network
+        if not self._is_internal_ip(self.server_url):
+            raise ValueError(
+                f"[脱敏] 生产阶段只允许内网地址,拒绝外网地址: {self.server_url}\n"
+                f"请将 DEPLOY_PHASE 改为 test,或将 SERVER_URL 改为内网地址"
+            )
+
+    def _is_internal_ip(self, url: str) -> bool:
+        """Return True when *url* is http(s) and its host is a private IPv4 literal.
+
+        The host is parsed and compared against 10.0.0.0/8, 172.16.0.0/12 and
+        192.168.0.0/16. Plain prefix matching on the URL string is not safe:
+        a public host name such as ``http://10.evil.example.com`` would pass.
+
+        Args:
+            url: Server URL taken from the configuration.
+
+        Returns:
+            True for an internal address, False for anything else (including
+            host names, unparsable URLs and the empty string).
+        """
+        from ipaddress import ip_address, ip_network
+        from urllib.parse import urlparse
+
+        try:
+            parsed = urlparse(url)
+            host = parsed.hostname
+        except ValueError:
+            return False
+        if parsed.scheme not in ("http", "https") or not host:
+            return False
+
+        try:
+            address = ip_address(host)
+        except ValueError:
+            # Not an IP literal (e.g. a DNS name): cannot be verified as internal.
+            return False
+
+        private_ranges = ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
+        return any(address in ip_network(cidr) for cidr in private_ranges)
+
+    async def desensitize_text(self, text: str, level: str = "standard") -> str:
+        """Desensitize *text* with the configured model.
+
+        Input longer than ``max_input_length`` characters is truncated before
+        it is sent, so the returned text only covers the truncated part.
+
+        Args:
+            text: Text to desensitize.
+            level: Desensitization level (minimal/standard/strict), inserted
+                into the prompt template.
+
+        Returns:
+            The model output with surrounding markdown code fences removed.
+
+        Raises:
+            Exception: Any error from the model call is logged and re-raised.
+        """
+        # Truncate over-long input
+        if len(text) > self.max_input_length:
+            logger.warning(f"[DesensitizeModelClient] 输入文本过长({len(text)}), 截断至{self.max_input_length}")
+            text = text[:self.max_input_length]
+
+        system_prompt = self.DESENSITIZE_PROMPT_TEMPLATE.format(level=level)
+
+        try:
+            response = await self.client.chat.completions.create(
+                model=self.model,
+                messages=[
+                    {"role": "system", "content": system_prompt},
+                    {"role": "user", "content": text}
+                ],
+                temperature=self.temperature,
+                max_tokens=self.max_tokens
+            )
+
+            result = response.choices[0].message.content
+
+            # Strip markdown code fences the model may have added
+            result = self._clean_code_blocks(result)
+
+            logger.debug(f"[DesensitizeModelClient] 脱敏完成: {len(text)} -> {len(result)} chars")
+            return result
+
+        except Exception as e:
+            logger.error(f"[DesensitizeModelClient] 模型调用失败: {e}")
+            raise
+
+    async def desensitize_chunks(self, chunks: List[str], level: str = "standard") -> List[str]:
+        """Desensitize several text chunks one after another.
+
+        Args:
+            chunks: Text chunks to process.
+            level: Desensitization level passed to ``desensitize_text``.
+
+        Returns:
+            One result per input chunk, in the same order.
+        """
+        results = []
+        for i, chunk in enumerate(chunks):
+            try:
+                desensitized = await self.desensitize_text(chunk, level)
+                results.append(desensitized)
+                logger.debug(f"[DesensitizeModelClient] 块{i+1}/{len(chunks)}脱敏完成")
+            except Exception as e:
+                logger.error(f"[DesensitizeModelClient] 块{i+1}脱敏失败: {e}")
+                # Best effort: keep the original chunk on failure.
+                # NOTE(review): the kept chunk is NOT desensitized, so raw text
+                # can reach downstream consumers - confirm this policy or block.
+                results.append(chunk)
+        return results
+
+    def _clean_code_blocks(self, text: str) -> str:
+        """Remove a leading/trailing markdown code fence and surrounding whitespace."""
+        text = re.sub(r'^```\w*\n?', '', text)
+        text = re.sub(r'\n?```$', '', text)
+        return text.strip()
+
+    async def check_confidence(self, text: str) -> Dict[str, Any]:
+        """Heuristically score how well *text* has been desensitized.
+
+        Each risk pattern still found in the text (phone number, ID card
+        number, company name) lowers the score by 0.3.
+
+        Args:
+            text: Desensitized text to inspect.
+
+        Returns:
+            A dict with ``confidence`` (0.0-1.0), ``risks`` (names of the
+            patterns found) and ``needs_review`` (confidence below
+            ``confidence_threshold``).
+        """
+        risk_patterns = [
+            (r'\b1[3-9]\d{9}\b', 'phone'),
+            (r'\b\d{17}[\dXx]\b', 'id_card'),
+            (r'[\u4e00-\u9fa5]{2,4}(?:集团|有限公司|局)', 'company'),
+        ]
+
+        risks = []
+        for pattern, risk_type in risk_patterns:
+            # re.ASCII keeps \b ASCII-only. With the default Unicode rules a
+            # Chinese character counts as a word character, so a number glued
+            # to Chinese text ("电话13812345678") would have no word boundary.
+            if re.search(pattern, text, re.ASCII):
+                risks.append(risk_type)
+
+        confidence = 1.0 - (len(risks) * 0.3)  # simple linear penalty
+        confidence = max(0.0, min(1.0, confidence))
+
+        return {
+            "confidence": confidence,
+            "risks": risks,
+            "needs_review": confidence < self.confidence_threshold
+        }

+ 39 - 0
core/construction_review/component/desensitize/processors/base_processor.py

@@ -0,0 +1,39 @@
+"""
+脱敏处理器基类
+"""
+
+import re
+from abc import ABC, abstractmethod
+from typing import Any, Dict
+
+
+class BaseDesensitizer(ABC):
+    """Abstract base class shared by all desensitization processors.
+
+    Subclasses fill ``self.patterns`` in ``_compile_patterns`` and implement
+    ``process``.
+    """
+
+    def __init__(self):
+        # name -> compiled pattern; populated by the subclass hook below
+        self.patterns: Dict[str, re.Pattern] = dict()
+        self._compile_patterns()
+
+    @abstractmethod
+    def _compile_patterns(self):
+        """Compile the regular expressions the processor needs."""
+
+    @abstractmethod
+    def process(self, content: str, registry: Any, strict_mode: bool = False) -> str:
+        """Desensitize *content* and return the resulting text.
+
+        Args:
+            content: Input text.
+            registry: Entity registry.
+            strict_mode: Strict mode (used by the scheme B post-check stage).
+
+        Returns:
+            The desensitized text.
+        """
+
+    def _replace_with_placeholder(self, content: str, pattern: re.Pattern,
+                                   placeholder: str, registry: Any = None) -> str:
+        """Replace every match of *pattern* in *content* with *placeholder*.
+
+        ``registry`` is accepted but not used by this default implementation.
+        """
+        replaced = re.sub(pattern, placeholder, content)
+        return replaced

+ 155 - 0
core/construction_review/component/desensitize/processors/biz_processor.py

@@ -0,0 +1,155 @@
+"""
+商业标识脱敏处理器
+
+处理对象:项目名称、建设单位、总承包单位、监理单位、劳务分包单位等
+执行策略:彻底泛化,替换为标准工程角色词汇
+"""
+
+import re
+from typing import Any
+from .base_processor import BaseDesensitizer
+
+
+class BizDesensitizer(BaseDesensitizer):
+    """Desensitizer for business identifiers (projects, companies, contract numbers)."""
+
+    def _compile_patterns(self):
+        """Compile the regular expressions for business identifiers."""
+        sources = {
+            # Project identifiers (e.g. "都四山地轨道交通项目DSZH标")
+            "project": r'([\u4e00-\u9fa5A-Z0-9\-]{5,30}(?:项目|标段|工程))',
+            # Owner / construction units (group, bureau, limited company ...)
+            "constructor": r'([\u4e00-\u9fa5]{2,6}(?:集团|局|有限公司))',
+            # General contractor
+            "contractor": r'(总承包单位[::]?\s*[\u4e00-\u9fa5]{2,10}(?:集团|局|公司)?)',
+            # Supervision units
+            "supervisor": r'([\u4e00-\u9fa5]{2,8}(?:监理|咨询)(?:公司|单位))',
+            # Design units
+            "designer": r'([\u4e00-\u9fa5]{2,8}(?:设计|勘察)(?:院|所|公司))',
+            # Labour subcontractors
+            "subcontractor": r'([\u4e00-\u9fa5]{2,10}(?:劳务|分包)(?:公司|单位))',
+            # Digital / technology companies
+            "tech_company": r'([\u4e00-\u9fa5]{2,8}(?:数字|科技|信息|软件)(?:公司|科技))',
+            # Contract numbers
+            "contract_no": r'(合同编号?[::]?\s*[A-Z0-9\-]{5,20})',
+            # Unified social credit code / organisation code (18 characters)
+            "credit_code": r'([A-Z0-9]{18})',
+        }
+        for key, source in sources.items():
+            self.patterns[key] = re.compile(source)
+
+    def process(self, content: str, registry: Any, strict_mode: bool = False) -> str:
+        """Desensitize business identifiers in *content*.
+
+        Args:
+            content: Input text.
+            registry: Entity registry; ``register_company(name, role)`` is
+                called for projects and companies and its return value is used
+                as the replacement text for companies.
+            strict_mode: Accepted for interface compatibility; not used here.
+
+        Returns:
+            The text with business identifiers replaced.
+        """
+        rules = self.patterns
+
+        # 1. General contractor first, so the project rule cannot break up a
+        #    name that contains "工程".
+        content = rules["contractor"].sub("[总承包单位]", content)
+
+        # Extra pass: "总承包单位" at the start of the text or right after
+        # punctuation / a newline, without requiring a colon.
+        content = re.sub(
+            r'([,。;\n]|^)(总承包单位)[::\s]*([\u4e00-\u9fa5]{2,10}(?:集团|局|公司))',
+            r'\1[\2]',
+            content
+        )
+
+        # 2. Projects: register the original name, always emit one fixed placeholder.
+        def mask_project(found):
+            registry.register_company(found.group(1), "项目")
+            return "[某工程项目]"
+
+        content = rules["project"].sub(mask_project, content)
+
+        # 3-7. Companies by role: the registry supplies the placeholder.
+        def register_as(role_label):
+            return lambda found: registry.register_company(found.group(1), role_label)
+
+        role_rules = (
+            ("constructor", "建设"),
+            ("supervisor", "监理"),
+            ("designer", "设计"),
+            ("subcontractor", "劳务"),
+            ("tech_company", "数字化"),
+        )
+        for key, role_label in role_rules:
+            content = rules[key].sub(register_as(role_label), content)
+
+        # 8. Contract numbers, then 9. credit codes.
+        content = rules["contract_no"].sub("[合同编号脱敏]", content)
+        return rules["credit_code"].sub("[信用代码脱敏]", content)
+
+    def extract_companies(self, content: str) -> list:
+        """List every project/company name found in *content*.
+
+        Returns:
+            Dicts with ``name``, ``type`` (pattern key) and ``position``
+            (match span), grouped by pattern type.
+        """
+        kinds = ("project", "constructor", "supervisor", "designer", "subcontractor")
+        return [
+            {
+                "name": found.group(1),
+                "type": kind,
+                "position": found.span()
+            }
+            for kind in kinds
+            for found in self.patterns[kind].finditer(content)
+        ]

+ 138 - 0
core/construction_review/component/desensitize/processors/financial_processor.py

@@ -0,0 +1,138 @@
+"""
+财务与成本敏感数据脱敏处理器
+
+处理对象:金额、设备租赁单价、材料单价、工程量数值等
+执行策略:全量数值掩码,保留表格科目结构
+"""
+
+import re
+from typing import Any
+from .base_processor import BaseDesensitizer
+
+
+class FinancialDesensitizer(BaseDesensitizer):
+    """Desensitizer for financial data: amounts, unit prices and quantities.
+
+    Numeric values are masked while units and table/subject structure are kept.
+    """
+
+    def _compile_patterns(self):
+        """Compile the regular expressions for financial data."""
+        # One monetary number: thousands-grouped ("1,250,000") or a plain digit
+        # run of any length ("50000"), optionally with 1-2 decimals. Accepting
+        # only 1-3 leading digits would half-mask "¥ 50000" (leaking "00") and
+        # miss "50000元" entirely.
+        number = r'(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d{1,2})?'
+
+        # Equipment rental unit prices (applied before the generic amount rules)
+        self.patterns["equipment_price"] = re.compile(
+            r'(塔吊|起重机|挖掘机|装载机|压路机|混凝土搅拌站|钢筋加工设备)[\s\u4e00-\u9fa5]*'
+            r'\d{1,6}(?:\.\d{1,2})?\s*(?:元|万元)?\s*/\s*(?:月|天|台班|小时)'
+        )
+
+        # Material unit prices
+        self.patterns["material_price"] = re.compile(
+            r'(钢筋|混凝土|水泥|砂石|模板|脚手架|防水材料|保温材料)[\s\u4e00-\u9fa5]*'
+            r'\d{1,5}(?:\.\d{1,2})?\s*(?:元|万元)?\s*/\s*(?:吨|立方米|平米|米|套)'
+        )
+
+        # Currency amounts with the RMB sign: ¥1,250,000.00, ¥ 50000
+        self.patterns["amount_rmb"] = re.compile(r'¥\s*' + number)
+        # Amounts followed by a unit: 50000元, 50,000.00元, 1.5万元.
+        # (?<!\d) prevents a match from starting in the middle of a digit run.
+        self.patterns["amount_yuan"] = re.compile(
+            r'(?<!\d)' + number + r'\s*(?:元|万元|亿元)'
+        )
+
+        # Quantities (value masked, unit kept)
+        self.patterns["quantity"] = re.compile(
+            r'(\d{1,6}(?:\.\d{1,2})?)\s*(立方米|m³|平米|㎡|米|m|吨|t|个|套|根)'
+        )
+
+        # Budget / cost totals
+        self.patterns["total"] = re.compile(
+            r'(总计|合计|总价|预算金额)[::\s]*'
+            r'[¥]?\s*' + number
+        )
+
+        # Bare large numbers (5+ digits, possibly amounts). re.ASCII keeps \b
+        # ASCII-only: under Unicode rules Chinese characters are word
+        # characters, so "共计50000" would have no boundary before the digits.
+        self.patterns["large_number"] = re.compile(
+            r'\b(\d{5,}(?:,\d{3})*(?:\.\d{1,2})?)\b', re.ASCII
+        )
+
+        # Percentages (compiled but currently not masked by process())
+        self.patterns["percentage"] = re.compile(
+            r'(\d{1,3}(?:\.\d{1,2})?)\s*%'
+        )
+
+    def process(self, content: str, registry: Any, strict_mode: bool = False) -> str:
+        """Mask financial values in *content*.
+
+        Every masked value is also appended to ``registry.amounts``.
+
+        Args:
+            content: Input text.
+            registry: Entity registry providing an ``amounts`` list.
+            strict_mode: When True, bare numbers with 5+ digits are masked too.
+
+        Returns:
+            The text with numeric financial values replaced by placeholders.
+        """
+        def record(match, replacement, group=0):
+            """Remember the matched text and hand back its replacement."""
+            registry.amounts.append(match.group(group))
+            return replacement
+
+        # 1. Equipment rental prices (keep the equipment type) - before generic amounts
+        content = self.patterns["equipment_price"].sub(
+            lambda m: record(m, f"{m.group(1)} [单价脱敏]/单位时间"), content
+        )
+
+        # 2. Material prices (keep the material name) - before generic amounts
+        content = self.patterns["material_price"].sub(
+            lambda m: record(m, f"{m.group(1)} [单价脱敏]/单位"), content
+        )
+
+        # 3. Amounts with the RMB sign
+        content = self.patterns["amount_rmb"].sub(
+            lambda m: record(m, "¥[金额脱敏]"), content
+        )
+
+        # 4. Amounts with a unit; the unit (元/万元/亿元) is kept
+        def replace_amount_yuan(match):
+            text = match.group(0)
+            if "万元" in text:
+                return record(match, "[金额脱敏]万元")
+            if "亿元" in text:
+                return record(match, "[金额脱敏]亿元")
+            return record(match, "[金额脱敏]元")
+
+        content = self.patterns["amount_yuan"].sub(replace_amount_yuan, content)
+
+        # 5. Quantities: only the numeric part is recorded, the unit is kept
+        content = self.patterns["quantity"].sub(
+            lambda m: record(m, f"[数量脱敏]{m.group(2)}", group=1), content
+        )
+
+        # 6. Totals: keep the label, mask the value
+        content = self.patterns["total"].sub(
+            lambda m: record(m, f"{m.group(1)}:¥[金额脱敏]"), content
+        )
+
+        # 7. Strict mode: bare large numbers are masked as well
+        if strict_mode:
+            content = self.patterns["large_number"].sub(
+                lambda m: record(m, "[数值脱敏]"), content
+            )
+
+        # 8. Percentages are kept (usually rates, not concrete amounts)
+
+        return content
+
+    def extract_amounts(self, content: str) -> list:
+        """List all currency amounts in *content* (for statistics).
+
+        Returns:
+            Dicts with ``value``, ``type`` (pattern name) and ``position``
+            (match span) for the ``amount_rmb`` and ``amount_yuan`` patterns.
+        """
+        return [
+            {
+                "value": match.group(0),
+                "type": pattern_name,
+                "position": match.span()
+            }
+            for pattern_name in ("amount_rmb", "amount_yuan")
+            for match in self.patterns[pattern_name].finditer(content)
+        ]

+ 169 - 0
core/construction_review/component/desensitize/processors/geo_processor.py

@@ -0,0 +1,169 @@
+"""
+地理坐标与基建数据脱敏处理器
+
+处理对象:桩号、高程、桥隧名称、地质数据、行政地名等
+执行策略:相对化处理(隐去绝对位置,保留物理逻辑)
+"""
+
+import re
+from typing import Any, Tuple, Optional
+from .base_processor import BaseDesensitizer
+
+
+class GeoDesensitizer(BaseDesensitizer):
+    """Desensitizer for geographic data: stake numbers, named structures,
+    elevations, geology grades and administrative place names.
+    """
+
+    def _compile_patterns(self):
+        """Compile the regular expressions for geographic data."""
+        # Absolute stake numbers such as D1K86+279.91 or DK15+230.5.
+        # NOTE(review): the kilometre part needs at least two digits, so
+        # single-digit forms like "K0+500" are NOT matched - confirm intent.
+        self.patterns["coordinate"] = re.compile(
+            r'([A-Z]?\d?[Kk])(\d{2,5})\+(\d{2,6}(?:\.\d+)?)'
+        )
+
+        # Named structures (bridges, tunnels, culverts, rivers)
+        self.patterns["bridge"] = re.compile(
+            r'([\u4e00-\u9fa5]{2,8}(?:特大桥|大桥))'
+        )
+        self.patterns["tunnel"] = re.compile(
+            r'([\u4e00-\u9fa5]{2,8}隧道)'
+        )
+        self.patterns["culvert"] = re.compile(
+            r'([\u4e00-\u9fa5]{2,6}涵洞)'
+        )
+        self.patterns["river"] = re.compile(
+            r'([\u4e00-\u9fa5]{2,6}(?:河|江))'
+        )
+
+        # Elevation values
+        self.patterns["elevation"] = re.compile(
+            r'海拔\s*(\d{3,4}(?:\.\d+)?)\s*[m米]'
+        )
+
+        # Surrounding-rock grades
+        self.patterns["geology"] = re.compile(
+            r'(V|IV|III|II|I)[级阶]围岩'
+        )
+
+        # Administrative place names (province / city / district / county)
+        self.patterns["location"] = re.compile(
+            r'(四川省|成都市|北京市|上海市|重庆市|天津市|[\u4e00-\u9fa5]{2,6}(?:省|市|区|县))'
+        )
+
+    def process(self, content: str, registry: Any, strict_mode: bool = False) -> str:
+        """Desensitize geographic data in *content*.
+
+        Args:
+            content: Input text.
+            registry: Entity registry; receives the coordinate base, the
+                coordinate mappings and the named structures.
+            strict_mode: When True, surrounding-rock grades are wrapped into a
+                placeholder as well.
+
+        Returns:
+            The desensitized text.
+        """
+        # 1. Base point for relative stake numbers (smallest stake in the text)
+        base_point = self._find_base_point(content)
+        if base_point:
+            registry.set_coordinate_base(self._format_coordinate(base_point))
+
+        # 2. Convert stake numbers to relative ones
+        def replace_coordinate(match):
+            prefix = match.group(1)  # e.g. D1K, K
+            km = int(match.group(2))  # kilometres
+            meter = match.group(3)  # metres (text)
+
+            original = match.group(0)
+
+            if base_point:
+                relative_km, relative_meter = self._calculate_relative(
+                    (prefix, km, float(meter) if '.' in meter else int(meter)),
+                    base_point
+                )
+                relative = f"K{relative_km}+{int(relative_meter):03d}"
+                registry.register_coordinate(original, relative)
+                return relative
+            else:
+                # No base available: generic placeholder
+                registry.register_coordinate(original, f"[桩号脱敏]")
+                return "[桩号脱敏]"
+
+        content = self.patterns["coordinate"].sub(replace_coordinate, content)
+
+        # 3. Named structures: the registry supplies the replacement text
+        content = self.patterns["bridge"].sub(
+            lambda m: registry.register_entity(m.group(1), "桥"), content
+        )
+        content = self.patterns["tunnel"].sub(
+            lambda m: registry.register_entity(m.group(1), "隧道"), content
+        )
+        content = self.patterns["culvert"].sub(
+            lambda m: registry.register_entity(m.group(1), "涵洞"), content
+        )
+        content = self.patterns["river"].sub(
+            lambda m: registry.register_entity(m.group(1), "河"), content
+        )
+
+        # 4. Administrative place names. This must run BEFORE the elevation and
+        #    geology steps: their placeholders end in "区" ("[中山区]",
+        #    "[V级围岩区]") and would otherwise be matched by the location
+        #    pattern and mangled into "[[某地区]]".
+        content = self.patterns["location"].sub("[某地区]", content)
+
+        # 5. Elevation -> coarse terrain description
+        def replace_elevation(match):
+            elevation = float(match.group(1))
+            if elevation > 3000:
+                return "[高原高山区]"
+            elif elevation > 1500:
+                return "[中山区]"
+            elif elevation > 500:
+                return "[低山丘陵区]"
+            else:
+                return "[平原区]"
+
+        content = self.patterns["elevation"].sub(replace_elevation, content)
+
+        # 6. Surrounding-rock grade: kept as a technical parameter, wrapped in
+        #    a placeholder only in strict mode
+        def replace_geology(match):
+            level = match.group(1)
+            return f"[{level}级围岩区]"
+
+        if strict_mode:
+            content = self.patterns["geology"].sub(replace_geology, content)
+
+        return content
+
+    def _find_base_point(self, content: str) -> Optional[Tuple[str, int, float]]:
+        """Return the smallest stake number in *content*, or None if there is none.
+
+        Stakes are compared by (kilometres, metres); the prefix is ignored.
+        """
+        coordinates = []
+
+        for match in self.patterns["coordinate"].finditer(content):
+            prefix = match.group(1)
+            km = int(match.group(2))
+            meter_str = match.group(3)
+            meter = float(meter_str) if '.' in meter_str else int(meter_str)
+            coordinates.append((prefix, km, meter))
+
+        if not coordinates:
+            return None
+
+        coordinates.sort(key=lambda x: (x[1], x[2]))
+        return coordinates[0]
+
+    def _format_coordinate(self, coord: Tuple[str, int, float]) -> str:
+        """Format a (prefix, km, meter) tuple as a stake-number string."""
+        prefix, km, meter = coord
+        if isinstance(meter, float):
+            return f"{prefix}{km}+{meter:.2f}"
+        return f"{prefix}{km}+{meter:03d}"
+
+    def _calculate_relative(self, coord: Tuple[str, int, float],
+                           base: Tuple[str, int, float]) -> Tuple[int, float]:
+        """Compute the stake number of *coord* relative to *base*.
+
+        Args:
+            coord: Current stake (prefix, km, meter).
+            base: Base stake (prefix, km, meter).
+
+        Returns:
+            (relative kilometres, relative metres), both whole numbers; any
+            fractional metres are truncated.
+        """
+        coord_total_meters = coord[1] * 1000 + coord[2]
+        base_total_meters = base[1] * 1000 + base[2]
+
+        # Rounding to 6 decimals removes binary floating-point noise before
+        # truncation; without it an exact 500 m offset can be stored as
+        # 499.99999999998 and come out as 499.
+        relative_meters = round(coord_total_meters - base_total_meters, 6)
+        relative_km = int(relative_meters // 1000)
+        relative_meter = int(relative_meters % 1000)
+
+        return relative_km, relative_meter

+ 117 - 0
core/construction_review/component/desensitize/processors/pii_processor.py

@@ -0,0 +1,117 @@
+"""
+PII (Personally Identifiable Information) 脱敏处理器
+
+处理对象:姓名、手机号、身份证号、证书编号、邮箱等个人身份信息
+"""
+
+import re
+from typing import Any, Optional
+from .base_processor import BaseDesensitizer
+
+
+class PIIDesensitizer(BaseDesensitizer):
+    """Desensitizer for personal data: names, phone numbers, ID card numbers,
+    certificate numbers and e-mail addresses.
+    """
+
+    def _compile_patterns(self):
+        """Compile the regular expressions for PII."""
+        # re.ASCII keeps \b ASCII-only. With the default Unicode rules Chinese
+        # characters count as word characters, so a number glued to Chinese
+        # text ("联系电话13812345678") has no word boundary and would be missed.
+
+        # Mobile numbers: 11 digits starting with 1[3-9]
+        self.patterns["phone"] = re.compile(r'\b1[3-9]\d{9}\b', re.ASCII)
+
+        # ID card numbers: 18 characters (the 15-digit form is not handled)
+        self.patterns["id_card"] = re.compile(r'\b\d{17}[\dXx]\b', re.ASCII)
+
+        # E-mail addresses
+        self.patterns["email"] = re.compile(
+            r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
+        )
+
+        # Certificate numbers: province abbreviation + letter + digits,
+        # e.g. 京A202401001
+        self.patterns["cert"] = re.compile(
+            r'[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤川青藏琼宁][A-Z]\d{6,12}'
+        )
+
+        # Names after a role label (2-4 Chinese characters),
+        # e.g. "项目经理张三", "负责人:李四"
+        self.patterns["person_with_role"] = re.compile(
+            r'(项目经理|负责人|安全员|技术负责人|班组长|施工员|质检员)[::\s]*([\u4e00-\u9fa5]{2,4})'
+        )
+
+        # Stand-alone names: at the start of the text or after punctuation /
+        # a newline, directly followed by an action verb
+        self.patterns["person_standalone"] = re.compile(
+            r'(?:^|[,。;\n])([\u4e00-\u9fa5]{2,3})(?:负责|主持|编制|审核|审批)'
+        )
+
+        # Approval signature lines: name + date
+        self.patterns["signature"] = re.compile(
+            r'([\u4e00-\u9fa5]{2,4})\s*20\d{2}[年./]\d{1,2}[月./]\d{1,2}[日]?'
+        )
+
+    def process(self, content: str, registry: Any, strict_mode: bool = False) -> str:
+        """Desensitize PII in *content*.
+
+        Args:
+            content: Input text.
+            registry: Entity registry; ``register_person(name, role)`` supplies
+                the placeholder used for each name.
+            strict_mode: When True, stand-alone names are replaced as well.
+
+        Returns:
+            The desensitized text.
+        """
+        # 1-4. Fixed-format identifiers
+        content = self.patterns["phone"].sub('[手机号脱敏]', content)
+        content = self.patterns["id_card"].sub('[证件号脱敏]', content)
+        content = self.patterns["cert"].sub('[证件号脱敏]', content)
+        content = self.patterns["email"].sub('[邮箱脱敏]', content)
+
+        # 5. Names after a role label (e.g. "项目经理张三")
+        def replace_person_with_role(match):
+            role = match.group(1)
+            name = match.group(2)
+            placeholder = registry.register_person(name, role)
+            # Keep the matched separator (full-width/ASCII colon, whitespace)
+            # verbatim so that "负责人:李四" does not lose its colon.
+            name_offset = match.start(2) - match.start(0)
+            separator = match.group(0)[len(role):name_offset]
+            return f"{role}{separator}{placeholder}"
+
+        content = self.patterns["person_with_role"].sub(replace_person_with_role, content)
+
+        # 6. Approval signatures (name + date) collapse into one marker
+        content = self.patterns["signature"].sub('[已审核]', content)
+
+        # 7. Stand-alone names (strict mode only)
+        if strict_mode:
+            def replace_standalone_person(match):
+                name = match.group(1)
+                placeholder = registry.register_person(name, "人员")
+                return match.group(0).replace(name, placeholder)
+            content = self.patterns["person_standalone"].sub(replace_standalone_person, content)
+
+        return content
+
+    def extract_persons(self, content: str) -> list:
+        """List the names that follow a role label in *content* (for pre-analysis).
+
+        Returns:
+            Dicts with ``name``, ``role`` and ``position`` (match span).
+        """
+        return [
+            {
+                "name": match.group(2),
+                "role": match.group(1),
+                "position": match.span()
+            }
+            for match in self.patterns["person_with_role"].finditer(content)
+        ]

+ 236 - 0
core/construction_review/component/desensitize/remapper.py

@@ -0,0 +1,236 @@
+"""
+审查结果逆向映射器
+
+将云端审查意见中的泛化占位符还原为真实工程术语,生成最终审查报告。
+
+根据 wlast.md 文档第6.3.3节设计
+"""
+
+import re
+import logging
+from typing import Dict, Any, Optional, Tuple
+from dataclasses import dataclass
+
+from .dict_manager import DictManager
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class RemapResult:
+    """Outcome of one reverse-mapping pass."""
+    original_response: str  # response as received (still desensitized)
+    remapped_response: str  # response after restoration (contains the real terms)
+    mapping_summary: Dict[str, int]  # replacement statistics
+    errors: list  # error messages
+
+
+class ResultRemapper:
+    """审查结果逆向映射器
+
+    功能:
+    1. 实体逆向替换(人名、企业名、工程实体)
+    2. 坐标逆向转换(相对桩号 -> 绝对桩号)
+    3. 保持审查意见语义不变
+
+    示例:
+      输入: "[项目经理A] 在 [1号特大桥] K0+500 处发现安全隐患"
+      输出: "张三 在 映雪特大桥 D1K86+779.91 处发现安全隐患"
+    """
+
+    def __init__(self):
+        """初始化映射器"""
+        self.dict_manager = DictManager()
+
+    async def remap(self, cloud_response: str, task_id: str,
+                    remap_coordinate: bool = True) -> RemapResult:
+        """将云端审查意见中的泛化词汇映射回真实工程术语
+
+        Args:
+            cloud_response: 云端审查返回的文本
+            task_id: 文档脱敏时返回的任务ID
+            remap_coordinate: 是否还原相对桩号
+
+        Returns:
+            RemapResult: 映射结果
+        """
+        errors = []
+
+        # 1. 加载脱敏字典
+        full_dict = await self.dict_manager.load(task_id)
+        if not full_dict:
+            error_msg = f"找不到脱敏字典: {task_id}"
+            logger.error(f"[ResultRemapper] {error_msg}")
+            return RemapResult(
+                original_response=cloud_response,
+                remapped_response=cloud_response,
+                mapping_summary={},
+                errors=[error_msg]
+            )
+
+        mapping_dict = full_dict.get("mappings", {})
+        result = cloud_response
+
+        # 2. 统计信息
+        summary = {
+            "person_mapped": 0,
+            "entity_mapped": 0,
+            "company_mapped": 0,
+            "coordinate_mapped": 0,
+            "total_replacements": 0
+        }
+
+        # 3. 人员逆向替换
+        persons = mapping_dict.get("persons", [])
+        for person in persons:
+            original = person.get("masked", "")
+            real_name = person.get("original", "")
+            if original and original in result:
+                count = result.count(original)
+                result = result.replace(original, real_name)
+                summary["person_mapped"] += count
+                summary["total_replacements"] += count
+                logger.debug(f"[ResultRemapper] 人员映射: {original} -> {real_name}")
+
+        # 4. 工程实体逆向替换
+        entities = mapping_dict.get("entities", [])
+        for entity in entities:
+            original = entity.get("masked", "")
+            real_name = entity.get("original", "")
+            if original and original in result:
+                count = result.count(original)
+                result = result.replace(original, real_name)
+                summary["entity_mapped"] += count
+                summary["total_replacements"] += count
+                logger.debug(f"[ResultRemapper] 实体映射: {original} -> {real_name}")
+
+        # 5. 企业/单位逆向替换
+        companies = mapping_dict.get("companies", [])
+        for company in companies:
+            original = company.get("masked", "")
+            real_name = company.get("original", "")
+            if original and original in result:
+                count = result.count(original)
+                result = result.replace(original, real_name)
+                summary["company_mapped"] += count
+                summary["total_replacements"] += count
+                logger.debug(f"[ResultRemapper] 企业映射: {original} -> {real_name}")
+
+        # 6. 坐标逆向转换
+        if remap_coordinate:
+            coord_info = mapping_dict.get("coordinates", {})
+            base_point = coord_info.get("base_point")
+            mappings = coord_info.get("mappings", {})
+
+            if base_point and mappings:
+                result, coord_count = self._remap_coordinates(result, mappings, base_point)
+                summary["coordinate_mapped"] = coord_count
+                summary["total_replacements"] += coord_count
+
+        # 7. 项目名称还原(如果有)
+        project_info = mapping_dict.get("project_name", {})
+        if project_info:
+            masked = project_info.get("masked", "")
+            original = project_info.get("original", "")
+            if masked and masked in result:
+                count = result.count(masked)
+                result = result.replace(masked, original)
+                summary["total_replacements"] += count
+
+        logger.info(f"[ResultRemapper] 映射完成: {summary}")
+
+        return RemapResult(
+            original_response=cloud_response,
+            remapped_response=result,
+            mapping_summary=summary,
+            errors=errors
+        )
+
+    def _remap_coordinates(self, text: str, mappings: Dict[str, str],
+                           base_point: str) -> Tuple[str, int]:
+        """将相对桩号还原为绝对桩号
+
+        Args:
+            text: 待处理的文本
+            mappings: 原始->相对的映射字典
+            base_point: 基准点
+
+        Returns:
+            (处理后的文本, 替换次数)
+        """
+        count = 0
+        result = text
+
+        # 匹配相对桩号格式 K{n}+{m}
+        relative_pattern = re.compile(r'K(\d+)\+(\d{3})')
+
+        def replace_coordinate(match):
+            nonlocal count
+            km = int(match.group(1))
+            meter = int(match.group(2))
+
+            # 查找对应的绝对坐标
+            for original, relative in mappings.items():
+                if relative == match.group(0):
+                    count += 1
+                    return original
+
+            # 如果没有找到精确匹配,尝试计算
+            # 这里简化处理,实际可能需要更复杂的坐标解析
+            return match.group(0)
+
+        result = relative_pattern.sub(replace_coordinate, result)
+        return result, count
+
+    async def remap_batch(self, responses: list, task_id: str,
+                          remap_coordinate: bool = True) -> list:
+        """批量映射多个响应
+
+        Args:
+            responses: 响应列表,每项为 (response_id, response_text)
+            task_id: 任务ID
+            remap_coordinate: 是否还原坐标
+
+        Returns:
+            映射后的结果列表
+        """
+        results = []
+        for response_id, response_text in responses:
+            try:
+                remap_result = await self.remap(response_text, task_id, remap_coordinate)
+                results.append({
+                    "response_id": response_id,
+                    "original": response_text,
+                    "remapped": remap_result.remapped_response,
+                    "summary": remap_result.mapping_summary
+                })
+            except Exception as e:
+                logger.error(f"[ResultRemapper] 批量映射失败 {response_id}: {e}")
+                results.append({
+                    "response_id": response_id,
+                    "original": response_text,
+                    "remapped": response_text,
+                    "error": str(e)
+                })
+        return results
+
+    def extract_masked_entities(self, text: str) -> Dict[str, list]:
+        """从文本中提取所有占位符实体
+
+        用于检查是否还有未映射的内容
+        """
+        patterns = {
+            "person": r'\[(项目经理|安全员|技术负责人|负责人|班组长|施工员|质检员|人员)[ABCDEFGHIJ]\]',
+            "entity": r'\[(\d+号)(特大桥|隧道|涵洞|河流|立交)\]',
+            "company": r'\[(建设单位|总承包单位|监理单位|设计单位|劳务分包单位|数字化实施方|某工程项目)\]',
+            "coordinate": r'K\d+\+\d{3}',
+            "masked": r'\[(手机号|证件号|邮箱|金额|单价|数量|桩号|合同编号|信用代码)脱敏\]',
+        }
+
+        result = {}
+        for entity_type, pattern in patterns.items():
+            matches = re.findall(pattern, text)
+            if matches:
+                result[entity_type] = matches
+
+        return result

+ 268 - 0
core/construction_review/component/desensitize/validator.py

@@ -0,0 +1,268 @@
+"""
+黑白名单校验器
+
+在文件发往云端前进行最终正则校验,任一黑名单模式命中则拦截上传并告警,确保零漏脱。
+"""
+
+import re
+import logging
+from dataclasses import dataclass, field
+from typing import List, Dict, Any, Optional
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ValidationResult:
+    """Result of a black/white list validation."""
+    is_valid: bool  # whether the content passed validation
+    violations: List[Dict[str, Any]] = field(default_factory=list)  # list of violations
+    whitelist_matches: int = 0  # number of whitelist matches
+    blacklist_matches: int = 0  # number of blacklist matches
+
+
+class BlackWhiteListChecker:
+    """黑白名单校验器 - 上传前最终安全门
+
+    核心约束:
+    - 黑名单:绝对不允许出现的模式
+    - 白名单:脱敏后允许的合规格式(用于统计和辅助判断)
+
+    根据 wlast.md 文档第6.3.2节设计
+    """
+
+    # 黑名单:绝对不允许出现的模式(原始敏感数据格式)
+    BLACKLIST_PATTERNS = [
+        # 手机号
+        (r'\b1[3-9]\d{9}\b', 'phone', 'critical'),
+        # 身份证号
+        (r'\b\d{17}[\dXx]\b', 'id_card', 'critical'),
+        # 企业名称(含集团、局、有限公司等)
+        (r'[\u4e00-\u9fa5]{2,10}(集团|有限公司|局|处)', 'company', 'high'),
+        # 精确绝对桩号(含小数点精确值)
+        (r'[A-Z]?\d?[Kk]\d{2,5}\+\d{3,6}\.\d+', 'coordinate', 'high'),
+        # 证书编号
+        (r'[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤川青藏琼宁][A-Z]\d{6,12}', 'cert', 'high'),
+        # 邮箱
+        (r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', 'email', 'medium'),
+        # 统一社会信用代码
+        (r'[A-Z0-9]{18}', 'credit_code', 'medium'),
+    ]
+
+    # 白名单:脱敏后允许的合规格式
+    WHITELIST_PATTERNS = [
+        # 脱敏标记
+        (r'\[手机号脱敏\]', 'masked_phone'),
+        (r'\[证件号脱敏\]', 'masked_id'),
+        (r'\[邮箱脱敏\]', 'masked_email'),
+        (r'\[金额脱敏\]', 'masked_amount'),
+        (r'\[单价脱敏\]', 'masked_price'),
+        (r'\[数量脱敏\]', 'masked_quantity'),
+        (r'\[桩号脱敏\]', 'masked_coordinate'),
+        (r'\[合同编号脱敏\]', 'masked_contract'),
+        (r'\[信用代码脱敏\]', 'masked_credit'),
+        # 角色占位符
+        (r'\[项目经理[ABCDEFGHIJ]\]', 'role_pm'),
+        (r'\[安全员[ABCDEFGHIJ]\]', 'role_safety'),
+        (r'\[技术负责人[ABCDEFGHIJ]\]', 'role_tech'),
+        (r'\[负责人[ABCDEFGHIJ]\]', 'role_leader'),
+        (r'\[班组长[ABCDEFGHIJ]\]', 'role_foreman'),
+        (r'\[施工员[ABCDEFGHIJ]\]', 'role_worker'),
+        (r'\[质检员[ABCDEFGHIJ]\]', 'role_qc'),
+        (r'\[人员[ABCDEFGHIJ]\]', 'role_person'),
+        # 单位/角色词汇
+        (r'\[建设单位\]', 'company_constructor'),
+        (r'\[总承包单位\]', 'company_contractor'),
+        (r'\[监理单位\]', 'company_supervisor'),
+        (r'\[设计单位\]', 'company_designer'),
+        (r'\[劳务分包单位\]', 'company_subcontractor'),
+        (r'\[数字化实施方\]', 'company_tech'),
+        (r'\[某工程项目\]', 'project_masked'),
+        # 工程实体占位符
+        (r'\[\d+号特大桥\]', 'entity_bridge'),
+        (r'\[\d+号隧道\]', 'entity_tunnel'),
+        (r'\[\d+号涵洞\]', 'entity_culvert'),
+        (r'\[\d+号河流\]', 'entity_river'),
+        (r'\[\d+号立交\]', 'entity_interchange'),
+        # 相对桩号(标准化格式 K{数字}+{数字})
+        (r'K\d+\+\d{3}', 'relative_coordinate'),
+        # 地形描述
+        (r'\[高原高山区\]', 'terrain_high'),
+        (r'\[中山区\]', 'terrain_mid'),
+        (r'\[低山丘陵区\]', 'terrain_low'),
+        (r'\[平原区\]', 'terrain_plain'),
+        (r'\[某地区\]', 'location_masked'),
+        # 地质等级
+        (r'\[[IV]+级围岩区\]', 'geology_level'),
+        # 审批标记
+        (r'\[已审核\]', 'signature_masked'),
+    ]
+
+    def __init__(self):
+        """初始化校验器,编译正则表达式"""
+        self.blacklist = [
+            (re.compile(pattern), pattern_type, severity)
+            for pattern, pattern_type, severity in self.BLACKLIST_PATTERNS
+        ]
+        self.whitelist = [
+            (re.compile(pattern), pattern_type)
+            for pattern, pattern_type in self.WHITELIST_PATTERNS
+        ]
+
+    def validate(self, content: str, check_level: str = "strict") -> ValidationResult:
+        """执行黑白名单校验
+
+        Args:
+            content: 待校验的文本内容
+            check_level: 校验级别 ('strict' 严格 / 'normal' 普通)
+
+        Returns:
+            ValidationResult: 校验结果
+        """
+        violations = []
+        whitelist_matches = 0
+        blacklist_matches = 0
+
+        # 1. 黑名单校验(查找原始敏感数据残留)
+        for pattern, pattern_type, severity in self.blacklist:
+            matches = pattern.findall(content)
+            if matches:
+                blacklist_matches += len(matches)
+                # 去重,限制报告数量
+                unique_matches = list(dict.fromkeys(matches))[:5]
+
+                for match in unique_matches:
+                    # 查找位置
+                    positions = self._find_positions(content, pattern, match)
+
+                    violations.append({
+                        "type": pattern_type,
+                        "pattern": str(pattern.pattern),
+                        "match": match,
+                        "positions": positions,
+                        "severity": severity,
+                        "suggestion": self._get_suggestion(pattern_type),
+                        "check": "blacklist"
+                    })
+
+        # 2. 白名单统计(用于辅助判断脱敏覆盖率)
+        for pattern, pattern_type in self.whitelist:
+            matches = pattern.findall(content)
+            whitelist_matches += len(matches)
+
+        # 3. 严格模式额外检查
+        if check_level == "strict":
+            # 检查是否有疑似中文姓名残留(2-4字人名,前面有职务词)
+            strict_violations = self._strict_mode_check(content)
+            violations.extend(strict_violations)
+            blacklist_matches += len(strict_violations)
+
+        is_valid = len(violations) == 0
+
+        if not is_valid:
+            logger.warning(
+                f"[BlackWhiteListChecker] 校验失败: {len(violations)} 个违规项, "
+                f"黑名单匹配: {blacklist_matches}, 白名单匹配: {whitelist_matches}"
+            )
+            for v in violations[:3]:  # 只记录前3个
+                logger.warning(f"  - {v['type']}: {v['match'][:50]}...")
+
+        return ValidationResult(
+            is_valid=is_valid,
+            violations=violations,
+            whitelist_matches=whitelist_matches,
+            blacklist_matches=blacklist_matches
+        )
+
+    def _find_positions(self, content: str, pattern: re.Pattern,
+                        match_text: str) -> List[Dict[str, Any]]:
+        """查找匹配文本在内容中的位置"""
+        positions = []
+        for m in pattern.finditer(content):
+            if m.group(0) == match_text:
+                # 计算行号和列号
+                line_num = content[:m.start()].count('\n') + 1
+                line_start = content.rfind('\n', 0, m.start()) + 1
+                col_num = m.start() - line_start + 1
+
+                positions.append({
+                    "start": m.start(),
+                    "end": m.end(),
+                    "line": line_num,
+                    "column": col_num
+                })
+                if len(positions) >= 3:  # 限制位置数量
+                    break
+        return positions
+
+    def _get_suggestion(self, pattern_type: str) -> str:
+        """根据违规类型返回建议"""
+        suggestions = {
+            'phone': '替换为 [手机号脱敏]',
+            'id_card': '替换为 [证件号脱敏]',
+            'company': '替换为 [建设单位] 等标准角色词汇',
+            'coordinate': '转换为相对桩号格式 K{n}+{m}',
+            'cert': '替换为 [证件号脱敏]',
+            'email': '替换为 [邮箱脱敏]',
+            'credit_code': '替换为 [信用代码脱敏]',
+            'person_name': '替换为 [项目经理A] 等角色占位符',
+        }
+        return suggestions.get(pattern_type, '检查并脱敏处理')
+
+    def _strict_mode_check(self, content: str) -> List[Dict[str, Any]]:
+        """严格模式额外检查"""
+        violations = []
+
+        # 检查疑似中文姓名(带职务前缀)
+        person_pattern = re.compile(
+            r'(项目经理|负责人|安全员|技术负责人|班组长|施工员|质检员)[::\s]*'
+            r'([\u4e00-\u9fa5]{2,4})'
+        )
+
+        for match in person_pattern.finditer(content):
+            name = match.group(2)
+            # 简单启发式:排除常见非人名词
+            non_person_words = ['要求', '规定', '标准', '规范', '措施', '方案',
+                               '计划', '制度', '管理', '控制', '保证', '确保']
+            if name not in non_person_words:
+                line_num = content[:match.start()].count('\n') + 1
+
+                violations.append({
+                    "type": "person_name",
+                    "pattern": str(person_pattern.pattern),
+                    "match": match.group(0),
+                    "positions": [{"line": line_num, "column": match.start()}],
+                    "severity": "high",
+                    "suggestion": f"将'{name}'替换为[项目经理A]等角色占位符",
+                    "check": "strict_mode"
+                })
+
+        return violations
+
+    def quick_check(self, content: str) -> bool:
+        """快速检查,仅返回是否通过(用于高频场景)"""
+        for pattern, _, _ in self.blacklist:
+            if pattern.search(content):
+                return False
+        return True
+
+    def get_check_summary(self, content: str) -> Dict[str, Any]:
+        """获取校验摘要统计"""
+        result = self.validate(content)
+
+        # 按类型统计
+        severity_count = {"critical": 0, "high": 0, "medium": 0, "low": 0}
+        type_count = {}
+
+        for v in result.violations:
+            severity_count[v["severity"]] = severity_count.get(v["severity"], 0) + 1
+            type_count[v["type"]] = type_count.get(v["type"], 0) + 1
+
+        return {
+            "is_valid": result.is_valid,
+            "total_violations": len(result.violations),
+            "severity_distribution": severity_count,
+            "type_distribution": type_count,
+            "whitelist_matches": result.whitelist_matches,
+            "coverage_estimate": min(100, result.whitelist_matches * 5)  # 粗略估计
+        }

+ 34 - 2
core/construction_review/component/reviewers/timeliness_basis_reviewer.py

@@ -224,12 +224,44 @@ class BasisReviewService:
                 
                 
                 # 获取match_reference_files的结果并过滤
                 # 获取match_reference_files的结果并过滤
                 match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
                 match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
-                # 解析JSON并过滤:same_name_current和exact_match_info都是""的项过滤掉
+
+                # 记录完整的匹配结果用于调试
+                logger.info(f"批次 match_reference_files 原始结果: {match_result[:500]}...")
+
+                # 解析JSON并过滤:保留有相关信息的项
                 try:
                 try:
                     match_data = json.loads(match_result)
                     match_data = json.loads(match_result)
                     # 提取items字段(match_reference_files返回{items: [...]}格式)
                     # 提取items字段(match_reference_files返回{items: [...]}格式)
                     items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
                     items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
-                    filtered_data = [item for item in items if item.get('exact_match_info') != ""]
+
+                    logger.info(f"解析到 {len(items)} 个匹配项")
+                    for idx, item in enumerate(items):
+                        logger.info(f"  项{idx}: review_item={item.get('review_item', 'unknown')}, "
+                                  f"has_related_file={item.get('has_related_file')}, "
+                                  f"exact_match_info={item.get('exact_match_info')}, "
+                                  f"same_name_current={item.get('same_name_current')}")
+
+                    # 放宽过滤条件:只要有相关文件信息就进行审查
+                    filtered_data = [
+                        item for item in items
+                        if item.get('has_related_file') or
+                           item.get('exact_match_info') or
+                           item.get('same_name_current')
+                    ]
+
+                    logger.info(f"过滤后保留 {len(filtered_data)} 个项")
+
+                    # 记录被过滤掉的项目用于调试
+                    skipped_items = [
+                        item for item in items
+                        if not (item.get('has_related_file') or
+                               item.get('exact_match_info') or
+                               item.get('same_name_current'))
+                    ]
+                    if skipped_items:
+                        logger.warning(f"跳过了 {len(skipped_items)} 个无参考信息的编制依据: "
+                                     f"{[item.get('review_item', 'unknown') for item in skipped_items]}")
+
                     # 如果没有过滤出数据,直接返回空结果
                     # 如果没有过滤出数据,直接返回空结果
                     if not filtered_data:
                     if not filtered_data:
                         logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")
                         logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")

+ 487 - 0
core/construction_review/component/reviewers/timeliness_content_reviewer.py

@@ -0,0 +1,487 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+三级分类内容时效性审查模块
+
+功能:从三级分类详情的content字段中提取规范引用,并进行时效性审查。
+主要用于检测文本内容中引用的规范是否过时(如JTG B01-2011应更新为JTG B01-2014)。
+"""
+
+import re
+import json
+import asyncio
+from typing import Any, Dict, List, Optional, Tuple
+from dataclasses import dataclass, field
+from functools import partial
+
+from foundation.observability.logger.loggering import review_logger as logger
+from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
+from core.construction_review.component.reviewers.utils.timeliness_determiner import determine_timeliness_issue
+from core.construction_review.component.reviewers.timeliness_basis_reviewer import BasisSearchEngine, StandardizedResponseProcessor
+
+
+@dataclass
+class StandardReference:
+    """A single reference to a standard/specification found in the text."""
+    original_text: str           # original text, e.g. "《公路工程技术标准》(JTG B01-2011)"
+    name: str                    # standard name, e.g. "公路工程技术标准"
+    number: str                  # standard number, e.g. "JTG B01-2011"
+    context: str                 # surrounding context
+    location_info: Dict[str, Any] = field(default_factory=dict)  # location information
+
+
+@dataclass
+class ContentTimelinessResult:
+    """Result of a content timeliness review for one reference."""
+    reference: StandardReference
+    has_issue: bool
+    issue_type: str              # type of the issue
+    suggestion: str
+    reason: str
+    risk_level: str              # risk level: "无风险" (no risk) / "高风险" (high risk)
+
+
+class StandardExtractor:
+    """规范引用提取器"""
+
+    # 规范编号正则模式(匹配类似 GB 50010-2010、JTG B01-2014、GB/T 50502-2020 等格式)
+    STANDARD_NUMBER_PATTERNS = [
+        # 中国国家标准:GB 50010-2010、GB/T 50502-2020
+        r'GB(?:/T)?\s*\d{4,5}(?:\.\d+)?\s*-\s*\d{4}',
+        # 中国行业标准:JTG B01-2014、JTG D60-2015、JTG/T 3650-2020
+        r'[A-Z]{2,3}(?:/T)?\s*[A-Z]?\s*\d{2,4}(?:\.\d+)?\s*-\s*\d{4}',
+        # 地方标准:DB11/T 1234-2020
+        r'DB\d{2}(?:/T)?\s*\d{4,5}\s*-\s*\d{4}',
+        # 团体标准:T/CECS 123-2020
+        r'T/\w+\s*\d{3,5}\s*-\s*\d{4}',
+    ]
+
+    # 规范名称与编号组合的正则模式
+    STANDARD_FULL_PATTERN = re.compile(
+        r'《([^《》]+)》\s*[((]([^))]+)[))]',
+        re.MULTILINE
+    )
+
+    # 仅规范编号模式
+    STANDARD_NUMBER_ONLY_PATTERN = re.compile(
+        r'(' + '|'.join(STANDARD_NUMBER_PATTERNS) + r')',
+        re.MULTILINE | re.IGNORECASE
+    )
+
+    def __init__(self):
+        self.extracted_cache: Dict[str, List[StandardReference]] = {}
+
+    def extract_from_content(self, content: str, location_info: Optional[Dict] = None) -> List[StandardReference]:
+        """
+        从内容文本中提取规范引用
+
+        Args:
+            content: 内容文本(包含行号标记如 <80>)
+            location_info: 位置信息(如三级分类代码、行号范围等)
+
+        Returns:
+            List[StandardReference]: 提取的规范引用列表
+        """
+        if not content:
+            return []
+
+        # 使用缓存
+        cache_key = hash(content)
+        if cache_key in self.extracted_cache:
+            return self.extracted_cache[cache_key]
+
+        references = []
+
+        # 1. 提取完整格式:《名称》(编号)
+        full_matches = self.STANDARD_FULL_PATTERN.findall(content)
+        for name, number in full_matches:
+            # 验证编号是否符合规范格式
+            if self._is_valid_standard_number(number):
+                original = f"《{name}》({number})"
+                # 查找该引用在原文中的位置
+                context = self._extract_context(content, original)
+                ref = StandardReference(
+                    original_text=original,
+                    name=name.strip(),
+                    number=number.strip(),
+                    context=context,
+                    location_info=location_info or {}
+                )
+                references.append(ref)
+
+        # 2. 提取孤立的规范编号(用于补充)
+        number_matches = self.STANDARD_NUMBER_ONLY_PATTERN.findall(content)
+        for match in number_matches:
+            number = match if isinstance(match, str) else match[0]
+            # 检查是否已包含在完整格式中
+            if not any(number in ref.number for ref in references):
+                # 尝试提取该编号附近的上下文作为名称
+                name = self._infer_name_from_context(content, number)
+                original = f"《{name}》({number})" if name else number
+                ref = StandardReference(
+                    original_text=original,
+                    name=name or "",
+                    number=number.strip(),
+                    context=self._extract_context(content, number),
+                    location_info=location_info or {}
+                )
+                references.append(ref)
+
+        # 去重(基于original_text)
+        seen = set()
+        unique_refs = []
+        for ref in references:
+            if ref.original_text not in seen:
+                seen.add(ref.original_text)
+                unique_refs.append(ref)
+
+        self.extracted_cache[cache_key] = unique_refs
+        return unique_refs
+
+    def _is_valid_standard_number(self, number: str) -> bool:
+        """验证是否为有效的规范编号"""
+        number = number.strip().upper()
+        # 检查是否匹配任一规范编号模式
+        for pattern in self.STANDARD_NUMBER_PATTERNS:
+            if re.match(pattern, number, re.IGNORECASE):
+                return True
+        return False
+
+    def _extract_context(self, content: str, target: str, window: int = 50) -> str:
+        """提取目标文本的上下文"""
+        idx = content.find(target)
+        if idx == -1:
+            return ""
+        start = max(0, idx - window)
+        end = min(len(content), idx + len(target) + window)
+        return content[start:end].strip()
+
+    def _infer_name_from_context(self, content: str, number: str) -> str:
+        """从上下文推断规范名称"""
+        # 查找编号附近的《名称》格式
+        pattern = re.compile(r'《([^《》]{3,50})》[^《》]{0,30}' + re.escape(number))
+        match = pattern.search(content)
+        if match:
+            return match.group(1)
+        return ""
+
+
+class ContentTimelinessReviewer:
+    """三级分类内容时效性审查器"""
+
+    def __init__(self, max_concurrent: int = 4):
+        """Create the extractor, search engine and response processor.
+
+        Args:
+            max_concurrent: Value passed to the asyncio.Semaphore that
+                __aenter__ creates.
+        """
+        self.extractor = StandardExtractor()
+        self.search_engine = BasisSearchEngine()
+        self.response_processor = StandardizedResponseProcessor()
+        self.max_concurrent = max_concurrent
+        # Left as None here; __aenter__ creates the semaphore on first entry.
+        # NOTE(review): review_tertiary_content does `async with self._semaphore`,
+        # so the reviewer appears to require `async with` usage - confirm that no
+        # caller invokes it on a bare instance.
+        self._semaphore = None
+
+    async def __aenter__(self):
+        """Async context-manager entry: create the semaphore if needed and return self."""
+        # Created only when still None; a semaphore from an earlier entry is reused.
+        if self._semaphore is None:
+            self._semaphore = asyncio.Semaphore(self.max_concurrent)
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        """Async context-manager exit: no cleanup; returning False lets exceptions propagate."""
+        return False
+
+    async def review_tertiary_content(
+        self,
+        tertiary_details: List[Dict[str, Any]],
+        collection_name: str = "first_bfp_collection_status",
+        progress_manager=None,
+        callback_task_id: str = None
+    ) -> List[Dict[str, Any]]:
+        """
+        审查三级分类内容中的规范时效性
+
+        Args:
+            tertiary_details: 三级分类详情列表,每项包含content字段
+            collection_name: Milvus集合名称
+            progress_manager: 进度管理器(可选,用于SSE推送)
+            callback_task_id: 回调任务ID(可选)
+
+        Returns:
+            List[Dict]: 标准化的审查结果列表
+        """
+        if not tertiary_details:
+            return []
+
+        # 1. 从所有三级分类内容中提取规范引用
+        all_references = []
+        reference_to_location = {}  # 用于追踪引用来源
+
+        for detail in tertiary_details:
+            content = detail.get("content", "")
+            if not content:
+                continue
+
+            location_info = {
+                "third_category_name": detail.get("third_category_name", ""),
+                "third_category_code": detail.get("third_category_code", ""),
+                "start_line": detail.get("start_line", 0),
+                "end_line": detail.get("end_line", 0),
+            }
+
+            refs = self.extractor.extract_from_content(content, location_info)
+            for ref in refs:
+                all_references.append(ref)
+                # 记录引用来源(用于后续结果关联)
+                if ref.original_text not in reference_to_location:
+                    reference_to_location[ref.original_text] = []
+                reference_to_location[ref.original_text].append(location_info)
+
+        if not all_references:
+            logger.info("未从三级分类内容中提取到规范引用")
+            return []
+
+        logger.info(f"从三级分类内容中提取到 {len(all_references)} 个规范引用")
+
+        # 2. 对提取的规范进行时效性审查
+        all_issues = []
+
+        # 分批处理(每批3个)
+        batch_size = 3
+        ref_texts = [ref.original_text for ref in all_references]
+        total_batches = (len(ref_texts) + batch_size - 1) // batch_size
+
+        for i in range(0, len(ref_texts), batch_size):
+            batch_refs = all_references[i:i + batch_size]
+            batch_texts = [ref.original_text for ref in batch_refs]
+            batch_num = i // batch_size + 1
+
+            try:
+                async with self._semaphore:
+                    # 搜索参考规范
+                    search_tasks = []
+                    for ref in batch_refs:
+                        task = asyncio.create_task(
+                            self._async_search_standard(ref.number, collection_name)
+                        )
+                        search_tasks.append(task)
+
+                    search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
+
+                    # 构建参考文本列表
+                    grouped_candidates = []
+                    for j, result in enumerate(search_results):
+                        if isinstance(result, Exception):
+                            logger.error(f"搜索失败 '{batch_refs[j].original_text}': {result}")
+                            grouped_candidates.append([])
+                        else:
+                            texts = [item.get("text_content", "") for item in result if item]
+                            grouped_candidates.append(texts)
+
+                    # 匹配参考文件
+                    match_result = await match_reference_files(
+                        reference_text=grouped_candidates,
+                        review_text=batch_texts
+                    )
+
+                    # 记录完整的匹配结果用于调试
+                    logger.info(f"批次{batch_num} match_reference_files 原始结果: {match_result[:500]}...")
+
+                    # 过滤:保留有相关信息的项进行审查
+                    # 条件:has_related_file为true 或 exact_match_info不为空 或 same_name_current不为空
+                    try:
+                        match_data = json.loads(match_result)
+                        items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
+
+                        logger.info(f"批次{batch_num} 解析到 {len(items)} 个匹配项")
+                        for idx, item in enumerate(items):
+                            logger.info(f"  项{idx}: review_item={item.get('review_item', 'unknown')}, "
+                                      f"has_related_file={item.get('has_related_file')}, "
+                                      f"exact_match_info={item.get('exact_match_info')}, "
+                                      f"same_name_current={item.get('same_name_current')}")
+
+                        # 放宽过滤条件:只要有相关文件信息就进行审查
+                        filtered_data = [
+                            item for item in items
+                            if item.get('has_related_file') or
+                               item.get('exact_match_info') or
+                               item.get('same_name_current')
+                        ]
+
+                        logger.info(f"批次{batch_num} 过滤后保留 {len(filtered_data)} 个项")
+
+                        # 记录被过滤掉的项目用于调试
+                        skipped_items = [
+                            item for item in items
+                            if not (item.get('has_related_file') or
+                                   item.get('exact_match_info') or
+                                   item.get('same_name_current'))
+                        ]
+                        if skipped_items:
+                            logger.warning(f"批次{batch_num} 跳过了 {len(skipped_items)} 个无参考信息的项: "
+                                         f"{[item.get('review_item', 'unknown') for item in skipped_items]}")
+
+                        if not filtered_data:
+                            logger.info(f"批次{batch_num}: 没有符合审查条件的规范引用")
+                            continue
+
+                        # 重新构建JSON
+                        if isinstance(match_data, dict) and 'items' in match_data:
+                            match_result = json.dumps({"items": filtered_data}, ensure_ascii=False)
+                        else:
+                            match_result = json.dumps(filtered_data, ensure_ascii=False)
+
+                        # 判定时效性问题
+                        llm_out = await determine_timeliness_issue(match_result)
+
+                        # 处理响应
+                        standardized_result = self.response_processor.process_llm_response(
+                            llm_out,
+                            "content_timeliness_check",
+                            "content",
+                            "content_timeliness_check"
+                        )
+
+                        # 3. 增强结果:添加位置信息
+                        for item in standardized_result:
+                            review_item = item.get("check_result", {}).get("location", "")
+                            if review_item in reference_to_location:
+                                locations = reference_to_location[review_item]
+                                # 添加位置信息到结果
+                                item["location_info"] = locations
+                                # 添加三级分类上下文
+                                contexts = []
+                                for loc in locations:
+                                    ctx = f"[{loc.get('third_category_name', '')}] 第{loc.get('start_line', 0)}-{loc.get('end_line', 0)}行"
+                                    contexts.append(ctx)
+                                item["content_context"] = "; ".join(contexts)
+
+                                # 更新location字段为更详细的描述
+                                if contexts:
+                                    item["check_result"]["location"] = f"{review_item}(出现在:{item['content_context']})"
+
+                        all_issues.extend(standardized_result)
+
+                        # SSE推送(如果提供了progress_manager)
+                        if progress_manager and callback_task_id:
+                            try:
+                                await progress_manager.update_stage_progress(
+                                    callback_task_id=callback_task_id,
+                                    stage_name=f"内容时效性审查-批次{batch_num}",
+                                    status="processing",
+                                    message=f"完成第{batch_num}/{total_batches}批次内容时效性审查,{len(batch_refs)}项",
+                                    overall_task_status="processing",
+                                    event_type="processing",
+                                    issues=standardized_result
+                                )
+                            except Exception as e:
+                                logger.error(f"SSE推送失败: {e}")
+
+                    except (json.JSONDecodeError, TypeError) as e:
+                        logger.warning(f"处理匹配结果时出错: {e}")
+                        continue
+
+            except Exception as e:
+                logger.error(f"批次 {batch_num} 处理失败: {e}")
+                error_result = {
+                    "check_item": "content_timeliness_check",
+                    "chapter_code": "content",
+                    "check_item_code": "content_timeliness_check",
+                    "check_result": {"error": str(e), "batch_num": batch_num},
+                    "exist_issue": True,
+                    "risk_info": {"risk_level": "medium"}
+                }
+                all_issues.append(error_result)
+
+        # 统计结果
+        issue_count = sum(1 for item in all_issues if item.get("exist_issue", False))
+        logger.info(f"内容时效性审查完成:总计 {len(all_references)} 项引用,发现问题 {issue_count} 项")
+
+        return all_issues
+
+    async def _async_search_standard(
+        self,
+        standard_number: str,
+        collection_name: str,
+        top_k: int = 3
+    ) -> List[dict]:
+        """Search for a single standard without blocking the event loop.
+
+        The synchronous ``self.search_engine.hybrid_search`` call is pushed to
+        the running loop's default executor, using the weighted ranker with
+        dense weight 0.3 and sparse weight 0.7.
+
+        Args:
+            standard_number: Standard number used as the query text.
+            collection_name: Collection to search in.
+            top_k: Maximum number of hits requested from the search engine.
+
+        Returns:
+            The hits returned by the search engine, or an empty list when the
+            engine returned nothing or the search raised.
+        """
+        search_kwargs = {
+            "collection_name": collection_name,
+            "query_text": standard_number,
+            "top_k": top_k,
+            "ranker_type": "weighted",
+            "dense_weight": 0.3,
+            "sparse_weight": 0.7,
+        }
+        try:
+            blocking_search = partial(self.search_engine.hybrid_search, **search_kwargs)
+            hits = await asyncio.get_running_loop().run_in_executor(None, blocking_search)
+            # Normalise a falsy result (e.g. None) to an empty list before logging/returning
+            hits = hits or []
+            logger.debug(f"搜索 '{standard_number}' -> 找到 {len(hits)} 个结果")
+            return hits
+        except Exception as e:
+            # Best-effort search: a failed lookup is reported and treated as "no hits"
+            logger.error(f"搜索失败 '{standard_number}': {e}")
+            return []
+
+
+# ===== 便捷函数 =====
+
+async def review_tertiary_content_timeliness(
+    tertiary_details: List[Dict[str, Any]],
+    collection_name: str = "first_bfp_collection_status",
+    max_concurrent: int = 4,
+    progress_manager=None,
+    callback_task_id: str = None
+) -> List[Dict[str, Any]]:
+    """Convenience wrapper that reviews the timeliness of tertiary-classification content.
+
+    Opens a ``ContentTimelinessReviewer`` as an async context manager and
+    delegates to its ``review_tertiary_content`` method.
+
+    Args:
+        tertiary_details: List of tertiary-classification detail records.
+        collection_name: Name of the Milvus collection to search.
+        max_concurrent: Maximum concurrency passed to the reviewer.
+        progress_manager: Optional progress manager forwarded to the reviewer.
+        callback_task_id: Optional callback task id forwarded to the reviewer.
+
+    Returns:
+        The list of review results produced by the reviewer.
+    """
+    reviewer_cm = ContentTimelinessReviewer(max_concurrent=max_concurrent)
+    async with reviewer_cm as reviewer:
+        review_results = await reviewer.review_tertiary_content(
+            tertiary_details=tertiary_details,
+            collection_name=collection_name,
+            progress_manager=progress_manager,
+            callback_task_id=callback_task_id
+        )
+    return review_results
+
+
+# ===== 测试代码 =====
+if __name__ == "__main__":
+    # Sample tertiary-classification records for a manual smoke run
+    test_tertiary_details = [
+        {
+            "third_category_name": "国家方针、政策、标准和设计文件",
+            "third_category_code": "NationalPoliciesStandardsAndDesignDocument",
+            "start_line": 80,
+            "end_line": 82,
+            "content": "<80> 国家方针、政策、标准和设计文件\n<81> 《公路工程技术标准》(JTG B01-2011)\n<82> 《公路桥涵设计通用规范》(JTG D60-2015)"
+        },
+        {
+            "third_category_name": "施工技术标准",
+            "third_category_code": "ConstructionTechnicalStandards",
+            "start_line": 100,
+            "end_line": 102,
+            "content": "<100> 施工技术标准\n<101> 《公路桥涵施工技术规范》(JTG/T 3650-2020)\n<102> 《混凝土结构设计规范》(GB 50010-2010)"
+        }
+    ]
+
+    print(f"测试 {len(test_tertiary_details)} 个三级分类内容...")
+
+    # Exercise the extractor only: print every reference found in each record
+    extractor = StandardExtractor()
+    for detail in test_tertiary_details:
+        refs = extractor.extract_from_content(detail["content"])
+        print(f"\n从 '{detail['third_category_name']}' 提取到 {len(refs)} 个规范引用:")
+        for ref in refs:
+            print(f"  - {ref.original_text}")
+
+    # Full review flow (requires a Milvus connection) - left disabled
+    # result = asyncio.run(review_tertiary_content_timeliness(test_tertiary_details))
+    # print("\n审查结果:")
+    # print(json.dumps(result, ensure_ascii=False, indent=2))

+ 22 - 1
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -298,7 +298,7 @@ class InterTool:
                 reference_data = check_result.get('reference_basis_review_results', {})
                 reference_data = check_result.get('reference_basis_review_results', {})
                 batch_results = reference_data.get('review_results', [])
                 batch_results = reference_data.get('review_results', [])
                 logger.debug(f"🔍 [DEBUG] 处理规范性审查结果,批次数: {len(batch_results)}")
                 logger.debug(f"🔍 [DEBUG] 处理规范性审查结果,批次数: {len(batch_results)}")
-                
+
                 for batch in batch_results:
                 for batch in batch_results:
                     if isinstance(batch, list):
                     if isinstance(batch, list):
                         for item in batch:
                         for item in batch:
@@ -323,6 +323,27 @@ class InterTool:
                 logger.info(f"🔍 规范性审查结果处理完成,添加 {len(review_lists)} 个问题项")
                 logger.info(f"🔍 规范性审查结果处理完成,添加 {len(review_lists)} 个问题项")
                 continue
                 continue
 
 
+            # 🔧 特殊处理:timeliness_content_reviewer 的返回格式
+            if check_key == 'timeliness_content_reviewer' and isinstance(check_result, dict):
+                content_timeliness_data = check_result.get('timeliness_content_review_results', {})
+                batch_results = content_timeliness_data.get('review_results', [])
+                logger.debug(f"🔍 [DEBUG] 处理内容时效性审查结果,问题数: {len(batch_results)}")
+
+                for item in batch_results:
+                    if isinstance(item, dict):
+                        review_lists.append({
+                            "check_item": item.get('check_item', 'content_timeliness_check'),
+                            "chapter_code": item.get('chapter_code', chapter_code),
+                            "check_item_code": item.get('check_item_code', f"{chapter_code}_content_timeliness_check"),
+                            "check_result": item.get('check_result', item),
+                            "exist_issue": item.get('exist_issue', False),
+                            "risk_info": item.get('risk_info', {"risk_level": "low"}),
+                            "location_info": item.get('location_info', []),
+                            "content_context": item.get('content_context', '')
+                        })
+                logger.info(f"🔍 内容时效性审查结果处理完成,添加 {len(batch_results)} 个问题项")
+                continue
+
             # 🔧 类型安全检查:支持字典和 base_reviewer.ReviewResult 对象
             # 🔧 类型安全检查:支持字典和 base_reviewer.ReviewResult 对象
             is_dict = isinstance(check_result, dict)
             is_dict = isinstance(check_result, dict)
             is_review_result = hasattr(check_result, 'details') and hasattr(check_result, 'success')
             is_review_result = hasattr(check_result, 'details') and hasattr(check_result, 'success')

+ 1 - 0
core/construction_review/component/reviewers/utils/reference_matcher.py

@@ -8,6 +8,7 @@ import re
 from typing import List, Optional, Tuple
 from typing import List, Optional, Tuple
 from dataclasses import dataclass
 from dataclasses import dataclass
 
 
+from core.construction_review.component.reviewers.utils.reference_number_generator import generate_reference_number, validate_reference_number
 from pydantic import BaseModel, Field, ValidationError
 from pydantic import BaseModel, Field, ValidationError
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.prompts import ChatPromptTemplate
 from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
 from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser

+ 1 - 0
core/construction_review/workflows/ai_review_workflow.py

@@ -293,6 +293,7 @@ class AIReviewWorkflow:
                 'semantic_logic_check': 'check_semantic_logic',
                 'semantic_logic_check': 'check_semantic_logic',
                 'completeness_check': 'check_completeness',
                 'completeness_check': 'check_completeness',
                 'timeliness_check': 'timeliness_basis_reviewer',
                 'timeliness_check': 'timeliness_basis_reviewer',
+                'timeliness_content_check': 'timeliness_content_reviewer',
                 'reference_check': 'reference_basis_reviewer',
                 'reference_check': 'reference_basis_reviewer',
                 'sensitive_check': 'check_sensitive',
                 'sensitive_check': 'check_sensitive',
                 'non_parameter_compliance_check': 'check_non_parameter_compliance',
                 'non_parameter_compliance_check': 'check_non_parameter_compliance',

+ 25 - 0
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -546,6 +546,31 @@ class AIReviewCoreFun:
                 is_sse_push=True
                 is_sse_push=True
             )
             )
 
 
+        # timeliness_content_reviewer:三级分类内容时效性审查(逐块处理)
+        elif func_name == "timeliness_content_reviewer" and not is_complete_field:
+            # 从chunk中获取三级分类详情
+            tertiary_details = chunk.get("tertiary_classification_details", [])
+            review_data = {
+                "tertiary_classification_details": tertiary_details,  # 三级分类详情
+                "max_concurrent": 4
+            }
+            raw_result = await method(
+                review_data=review_data,
+                trace_id=trace_id,
+                state=state,
+                stage_name=stage_name
+            )
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
+            )
+
         else:
         else:
             # 处理 check_completeness 但 is_complete_field=False 的情况
             # 处理 check_completeness 但 is_complete_field=False 的情况
             if func_name == "check_completeness" and not is_complete_field:
             if func_name == "check_completeness" and not is_complete_field:

+ 16 - 0
foundation/infrastructure/config/config.py

@@ -24,6 +24,22 @@ class ConfigHandler:
             value = default
             value = default
         return value
         return value
 
 
+    def get_section(self, section):
+        """Return all options of one configuration section as a dict.
+
+        Args:
+            section: Name of the configuration section.
+
+        Returns:
+            A dict built from the section's items; an empty dict when the
+            section does not exist or reading it raises.
+        """
+        options = {}
+        try:
+            if self.config.has_section(section):
+                options = dict(self.config.items(section))
+        except Exception:
+            # Best-effort lookup: any failure is reported as an empty section
+            options = {}
+        return options
+
 
 
 
 
 # 全局配置实例
 # 全局配置实例

File diff ditekan karena terlalu besar
+ 145 - 0
problem.json


+ 3 - 0
server/app.py

@@ -35,6 +35,7 @@ from views.construction_review.file_upload import file_upload_router
 from views.construction_review.review_results import review_results_router
 from views.construction_review.review_results import review_results_router
 from views.construction_review.launch_review import launch_review_router
 from views.construction_review.launch_review import launch_review_router
 from views.construction_review.task_control import task_control_router
 from views.construction_review.task_control import task_control_router
+from views.construction_review.desensitize_api import desensitize_router
 
 
 # 导入施工方案编写路由
 # 导入施工方案编写路由
 from views.construction_write.outline_views import outline_router
 from views.construction_write.outline_views import outline_router
@@ -103,6 +104,7 @@ class RouteManager:
         self.app.include_router(review_results_router)
         self.app.include_router(review_results_router)
         self.app.include_router(launch_review_router)
         self.app.include_router(launch_review_router)
         self.app.include_router(task_control_router)  # 任务控制路由
         self.app.include_router(task_control_router)  # 任务控制路由
+        self.app.include_router(desensitize_router)   # 数据脱敏路由
 
 
         # 施工方案编写路由
         # 施工方案编写路由
         self.app.include_router(outline_router)
         self.app.include_router(outline_router)
@@ -540,6 +542,7 @@ def create_app() -> FastAPI:
     app.include_router(file_upload_router)
     app.include_router(file_upload_router)
     app.include_router(review_results_router)
     app.include_router(review_results_router)
     app.include_router(launch_review_router)
     app.include_router(launch_review_router)
+    app.include_router(desensitize_router)  # 数据脱敏路由
 
 
     # 施工方案编写路由
     # 施工方案编写路由
     app.include_router(outline_router)
     app.include_router(outline_router)

+ 141 - 0
test_content_timeliness.py

@@ -0,0 +1,141 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+测试内容时效性审查是否正确处理 JTG B01-2011 的情况
+"""
+
+import json
+import asyncio
+from core.construction_review.component.reviewers.timeliness_content_reviewer import (
+    StandardExtractor, ContentTimelinessReviewer
+)
+
+# Test data - mirrors the case recorded in problem.json
+test_tertiary_details = [
+    {
+        "third_category_name": "国家方针、政策、标准和设计文件",
+        "third_category_code": "NationalPoliciesStandardsAndDesignDocument",
+        "start_line": 80,
+        "end_line": 82,
+        "content": """<80> 国家方针、政策、标准和设计文件
+<81> 《公路工程技术标准》(JTG B01-2011)
+<82> 《公路桥涵设计通用规范》(JTG D60-2015)"""
+    }
+]
+
+# 测试提取器
+def test_extractor():
+    """Run ``StandardExtractor`` over every fixture record and print what it finds.
+
+    Returns:
+        A list with the references extracted from ALL records in
+        ``test_tertiary_details`` (empty when there are no records). The
+        previous version returned only the last record's references and failed
+        with an unbound local when the fixture list was empty.
+    """
+    print("=" * 60)
+    print("测试规范提取器")
+    print("=" * 60)
+
+    extractor = StandardExtractor()
+    all_refs = []
+
+    for detail in test_tertiary_details:
+        refs = extractor.extract_from_content(detail["content"])
+        all_refs.extend(refs)
+        print(f"\n从 '{detail['third_category_name']}' 提取到 {len(refs)} 个规范引用:")
+        for ref in refs:
+            print(f"  - 原始文本: {ref.original_text}")
+            print(f"    名称: {ref.name}")
+            print(f"    编号: {ref.number}")
+            print(f"    上下文: {ref.context[:100]}...")
+
+    return all_refs
+
+# 测试过滤逻辑
+def test_filter_logic():
+    """Compare the old and the relaxed filter on a mocked match result.
+
+    The old filter keeps only items with a non-empty ``exact_match_info``; the
+    relaxed one keeps items where any of ``has_related_file``,
+    ``exact_match_info`` or ``same_name_current`` is truthy. Everything is
+    printed; items the old filter would have dropped are listed as a warning.
+    """
+    banner = "=" * 60
+    print("\n" + banner)
+    print("测试过滤逻辑")
+    print(banner)
+
+    # Mocked payload shaped like the data returned by match_reference_files
+    mock_match_result = [
+        {
+            "review_item": "《公路工程技术标准》(JTG B01-2011)",
+            "has_related_file": True,
+            "has_exact_match": False,
+            "exact_match_info": "",
+            "same_name_current": "《公路工程技术标准》(JTG B01-2014)状态为现行"
+        },
+        {
+            "review_item": "《公路桥涵设计通用规范》(JTG D60-2015)",
+            "has_related_file": True,
+            "has_exact_match": True,
+            "exact_match_info": "《公路桥涵设计通用规范》(JTG D60-2015)状态为现行",
+            "same_name_current": ""
+        }
+    ]
+
+    print("\n模拟 match_reference_files 返回数据:")
+    shown_fields = (
+        "review_item",
+        "has_related_file",
+        "has_exact_match",
+        "exact_match_info",
+        "same_name_current",
+    )
+    for idx, entry in enumerate(mock_match_result):
+        print(f"\n  项{idx}:")
+        for field in shown_fields:
+            print(f"    {field}: {entry[field]}")
+
+    # Old filter: keep only entries whose exact_match_info is non-empty
+    old_filtered = list(filter(lambda entry: entry.get('exact_match_info'), mock_match_result))
+    print(f"\n旧过滤逻辑(只保留 exact_match_info 不为空的): {len(old_filtered)} 个项")
+    for entry in old_filtered:
+        print(f"  - {entry['review_item']}")
+
+    # Relaxed filter: keep entries carrying any kind of reference information
+    relaxed_keys = ('has_related_file', 'exact_match_info', 'same_name_current')
+    new_filtered = [
+        entry for entry in mock_match_result
+        if any(entry.get(key) for key in relaxed_keys)
+    ]
+    print(f"\n新过滤逻辑(保留有相关信息的): {len(new_filtered)} 个项")
+    for entry in new_filtered:
+        print(f"  - {entry['review_item']}")
+
+    # Report what the old filter would have missed
+    missed = [entry for entry in mock_match_result if entry not in old_filtered]
+    if not missed:
+        return
+    print(f"\n[警告] 旧逻辑漏检的项:")
+    for entry in missed:
+        print(f"  - {entry['review_item']}")
+        print(f"    has_related_file: {entry['has_related_file']}")
+        print(f"    same_name_current: {entry['same_name_current']}")
+
+# 完整测试
+async def test_full_review():
+    """Run the complete review over the fixture records and print each result.
+
+    Needs a reachable Milvus instance. Any exception is caught, reported and
+    printed with its traceback instead of propagating.
+    """
+    banner = "=" * 60
+    print("\n" + banner)
+    print("完整审查测试(需要 Milvus 连接)")
+    print(banner)
+
+    try:
+        async with ContentTimelinessReviewer(max_concurrent=4) as reviewer:
+            results = await reviewer.review_tertiary_content(
+                tertiary_details=test_tertiary_details,
+                collection_name="first_bfp_collection_status"
+            )
+
+            print(f"\n审查完成,共 {len(results)} 个结果:")
+            for idx, outcome in enumerate(results):
+                print(f"\n  结果{idx}:")
+                for key in ("check_item", "exist_issue", "risk_info"):
+                    print(f"    {key}: {outcome.get(key)}")
+                details = outcome.get('check_result', {})
+                for key in ("issue_point", "suggestion", "reason"):
+                    print(f"    {key}: {details.get(key)}")
+
+    except Exception as e:
+        print(f"测试失败: {e}")
+        import traceback
+        traceback.print_exc()
+
+if __name__ == "__main__":
+    # Exercise the extractor
+    refs = test_extractor()
+
+    # Exercise the filter logic
+    test_filter_logic()
+
+    # Full review (optional, needs Milvus) - left disabled
+    # asyncio.run(test_full_review())

+ 358 - 0
views/construction_review/desensitize_api.py

@@ -0,0 +1,358 @@
+"""
+脱敏模块 API 接口
+
+提供文档脱敏、校验、结果还原等功能的 REST API 接口
+
+根据 wlast.md 文档第7节设计
+"""
+
+import uuid
+from datetime import datetime
+from typing import Optional
+from pydantic import BaseModel, Field
+from fastapi import APIRouter, HTTPException, UploadFile, File, Form
+from fastapi.responses import JSONResponse
+
+from foundation.observability.logger.loggering import review_logger as logger
+from core.construction_review.component.desensitize import (
+    BlackWhiteListChecker,
+    ValidationResult,
+    DictManager,
+)
+from core.construction_review.component.desensitize.remapper import ResultRemapper
+
+desensitize_router = APIRouter(prefix="/desensitize", tags=["数据脱敏"])
+
+# Module-level components shared by all endpoints below (created once at import time)
+validator = BlackWhiteListChecker()
+dict_manager = DictManager()
+remapper = ResultRemapper()
+
+
+# ============ 请求/响应模型 ============
+
+class DesensitizeLevel:
+    """Enumeration-style string constants for the desensitization level."""
+    MINIMAL = "minimal"    # Minimal: PII only
+    STANDARD = "standard"  # Standard: PII + geographic coordinates + business identifiers
+    STRICT = "strict"      # Strict: all four dimensions
+
+
+class DesensitizeModelType:
+    """Enumeration-style string constants for the desensitization model type."""
+    RULE = "rule"              # Rule engine
+    QWEN3_5_35B = "qwen3_5_35b"  # Qwen3.5-35B local inference
+
+
+class ValidateCheckLevel:
+    """Enumeration-style string constants for the validation check level."""
+    STRICT = "strict"
+    NORMAL = "normal"
+
+
+class DesensitizeDocumentRequest(BaseModel):
+    """Request model for document desensitization.
+
+    NOTE(review): no endpoint in this file references this model; the
+    ``/document`` endpoint reads the same fields from form data instead.
+    """
+    user_id: str = Field(..., description="用户唯一标识")
+    project_id: str = Field(..., description="项目唯一标识")
+    desensitize_level: str = Field(default="standard", description="脱敏级别: minimal/standard/strict")
+    model_type: str = Field(default="rule", description="脱敏处理模型: rule/qwen3_5_35b")
+
+
+class DesensitizeDocumentResponse(BaseModel):
+    """Response envelope (code / message / data) for document desensitization."""
+    code: int = Field(default=200, description="状态码")
+    message: str = Field(default="success", description="状态消息")
+    data: dict = Field(default_factory=dict, description="响应数据")
+
+
+class ValidateRequest(BaseModel):
+    """Request model for desensitization validation."""
+    content: str = Field(..., description="待校验的文本内容")
+    check_level: str = Field(default="strict", description="校验级别: strict/normal")
+
+
+class ValidateResponse(BaseModel):
+    """Response envelope (code / message / data) for desensitization validation."""
+    code: int = Field(default=200, description="状态码")
+    message: str = Field(default="success", description="状态消息")
+    data: dict = Field(default_factory=dict, description="响应数据")
+
+
+class RemapRequest(BaseModel):
+    """Request model for restoring (remapping) a cloud review result."""
+    task_id: str = Field(..., description="文档脱敏时返回的任务ID")
+    cloud_response: str = Field(..., description="云端审查返回的文本")
+    remap_coordinate: bool = Field(default=True, description="是否还原相对桩号")
+
+
+class RemapResponse(BaseModel):
+    """Response envelope (code / message / data) for result restoration."""
+    code: int = Field(default=200, description="状态码")
+    message: str = Field(default="success", description="状态消息")
+    data: dict = Field(default_factory=dict, description="响应数据")
+
+
+class DictInfoResponse(BaseModel):
+    """Response envelope (code / message / data) for dictionary metadata queries."""
+    code: int = Field(default=200, description="状态码")
+    message: str = Field(default="success", description="状态消息")
+    data: dict = Field(default_factory=dict, description="响应数据")
+
+
+# ============ API 接口 ============
+
+@desensitize_router.post("/document", response_model=DesensitizeDocumentResponse)
+async def desensitize_document(
+    user_id: str = Form(..., description="用户唯一标识"),
+    project_id: str = Form(..., description="项目唯一标识"),
+    document: UploadFile = File(..., description="PDF/Word格式施工方案"),
+    desensitize_level: str = Form(default="standard", description="脱敏级别: minimal/standard/strict"),
+    model_type: str = Form(default="rule", description="脱敏处理模型: rule/qwen3_5_35b")
+):
+    """
+    Document desensitization endpoint.
+
+    Designed to apply four-dimension desensitization to a construction-plan
+    document and store the generated dictionary locally in encrypted form.
+    Desensitization is currently disabled: the decoded upload is returned
+    as-is (preview only) and all statistics are zero.
+
+    - **desensitize_level**: minimal (PII only) / standard / strict
+    - **model_type**: rule (rule engine) / qwen3_5_35b (local LLM)
+
+    Responds with 400 when the upload decodes to empty text and with 500 on
+    any unexpected failure.
+    """
+    preview_limit = 500
+
+    try:
+        # Generate the task id: des-<YYYYMMDD>-<6 hex chars>
+        task_id = f"des-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6]}"
+
+        logger.info(f"[DesensitizeAPI] 文档脱敏请求: task_id={task_id}, "
+                   f"user_id={user_id}, level={desensitize_level}, model={model_type}")
+
+        # Read the uploaded document.
+        # NOTE(review): the bytes are decoded as UTF-8 with undecodable bytes
+        # dropped; binary PDF/Word uploads are not parsed here - confirm this
+        # placeholder behaviour is intended while desensitization is disabled.
+        content_bytes = await document.read()
+        content = content_bytes.decode('utf-8', errors='ignore')
+
+        if not content:
+            raise HTTPException(status_code=400, detail="文档内容为空或无法解析")
+
+        # NOTE: desensitization is temporarily disabled; the original content is returned.
+        # TODO: to enable it, uncomment the lines below and remove the direct return.
+        # result: DesensitizedResult = await desensitize_engine.process(content, task_id)
+        # if not result.is_valid:
+        #     return DesensitizeDocumentResponse(...)
+
+        # Return the original content directly (desensitization disabled);
+        # only texts longer than the preview limit are truncated and marked with "..."
+        if len(content) > preview_limit:
+            content_preview = content[:preview_limit] + "..."
+        else:
+            content_preview = content
+
+        return DesensitizeDocumentResponse(
+            code=200,
+            message="文档处理成功(脱敏功能已禁用)",
+            data={
+                "task_id": task_id,
+                "status": "completed (desensitization disabled)",
+                "desensitize_level": desensitize_level,
+                "model_type": model_type,
+                "output": {
+                    "content_preview": content_preview,
+                    "content_length": len(content),
+                    "dict_hash": ""
+                },
+                "statistics": {
+                    "pii_count": 0,
+                    "geo_count": 0,
+                    "biz_count": 0,
+                    "financial_count": 0
+                }
+            }
+        )
+
+    except HTTPException:
+        # Intentional HTTP errors (the 400 above) must reach the client
+        # unchanged instead of being re-wrapped as a 500 by the handler below.
+        raise
+    except Exception as e:
+        logger.exception(f"[DesensitizeAPI] 文档脱敏失败: {e}")
+        raise HTTPException(status_code=500, detail=f"脱敏处理失败: {str(e)}")
+
+
+@desensitize_router.post("/validate", response_model=ValidateResponse)
+async def validate_desensitized(request: ValidateRequest):
+    """
+    Desensitization validation endpoint.
+
+    Runs the black/white-list check to detect whether desensitization is
+    complete and returns the list of violations (at most 20 are returned).
+
+    - **check_level**: strict / normal
+    """
+    try:
+        logger.info(f"[DesensitizeAPI] 校验请求: check_level={request.check_level}")
+
+        result: ValidationResult = validator.validate(request.content, request.check_level)
+
+        def _violation_payload(violation):
+            # Only the first recorded position is exposed; {} when none exists
+            positions = violation.get("positions")
+            return {
+                "type": violation.get("type"),
+                "match": violation.get("match"),
+                "severity": violation.get("severity"),
+                "suggestion": violation.get("suggestion"),
+                "position": positions[0] if positions else {}
+            }
+
+        # Cap the number of violations sent back to the caller
+        violations = [_violation_payload(v) for v in result.violations[:20]]
+
+        if result.is_valid:
+            message = "校验完成"
+        else:
+            message = f"发现 {len(result.violations)} 个违规项"
+
+        summary = {
+            "total_violations": len(result.violations),
+            "whitelist_matches": result.whitelist_matches,
+            "blacklist_matches": result.blacklist_matches
+        }
+        return ValidateResponse(
+            code=200,
+            message=message,
+            data={
+                "is_valid": result.is_valid,
+                "check_level": request.check_level,
+                "violations": violations,
+                "summary": summary
+            }
+        )
+
+    except Exception as e:
+        logger.exception(f"[DesensitizeAPI] 校验失败: {e}")
+        raise HTTPException(status_code=500, detail=f"校验失败: {str(e)}")
+
+
+@desensitize_router.post("/remap", response_model=RemapResponse)
+async def remap_result(request: RemapRequest):
+    """
+    Result restoration endpoint.
+
+    Replaces the generalized placeholders in the cloud review text with the
+    real engineering terms to produce the final review report.
+
+    Example conversion:
+    - "[项目经理A]在[1号特大桥]K0+500处发现安全隐患"
+    - "张三在映雪特大桥D1K86+779.91处发现安全隐患"
+
+    Responds with 404 when no dictionary exists for the task id.
+    """
+    task_id = request.task_id
+    try:
+        logger.info(f"[DesensitizeAPI] 结果还原请求: task_id={request.task_id}")
+
+        # The dictionary created during desensitization must still exist
+        dictionary_found = await dict_manager.exists(task_id)
+        if not dictionary_found:
+            raise HTTPException(status_code=404, detail=f"找不到脱敏字典: {request.task_id}")
+
+        # Perform the mapping
+        outcome = await remapper.remap(
+            cloud_response=request.cloud_response,
+            task_id=task_id,
+            remap_coordinate=request.remap_coordinate
+        )
+
+        # Mapping errors are logged as warnings; the response is still a success
+        if outcome.errors:
+            logger.warning(f"[DesensitizeAPI] 映射警告: {outcome.errors}")
+
+        payload = {
+            "task_id": task_id,
+            "original_response": outcome.original_response,
+            "remapped_response": outcome.remapped_response,
+            "mapping_summary": outcome.mapping_summary
+        }
+        return RemapResponse(code=200, message="映射还原成功", data=payload)
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.exception(f"[DesensitizeAPI] 结果还原失败: {e}")
+        raise HTTPException(status_code=500, detail=f"结果还原失败: {str(e)}")
+
+
+@desensitize_router.get("/dict/{task_id}", response_model=DictInfoResponse)
+async def get_dict_info(task_id: str):
+    """
+    Dictionary query endpoint.
+
+    Returns metadata of a desensitization dictionary (file path, file size,
+    modification time) without the sensitive mapping content. Responds with
+    404 when no metadata is found for the task id.
+    """
+    try:
+        # NOTE(review): called synchronously, whereas dict_manager.exists() and
+        # dict_manager.delete() are awaited elsewhere in this file - confirm
+        # get_dict_metadata is not a coroutine.
+        metadata = dict_manager.get_dict_metadata(task_id)
+
+        if not metadata:
+            raise HTTPException(status_code=404, detail=f"找不到脱敏字典: {task_id}")
+
+        # Expose only the whitelisted metadata keys
+        exposed = {
+            key: metadata.get(key)
+            for key in ("file_path", "file_size", "modified_at")
+        }
+        return DictInfoResponse(
+            code=200,
+            message="查询成功",
+            data={"task_id": task_id, "metadata": exposed}
+        )
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.exception(f"[DesensitizeAPI] 字典查询失败: {e}")
+        raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}")
+
+
+@desensitize_router.delete("/dict/{task_id}")
+async def delete_dict(task_id: str):
+    """
+    Dictionary deletion endpoint.
+
+    Manually deletes the desensitization dictionary of the given task
+    (normally handled by the automatic clean-up job). Responds with 500 when
+    the dictionary manager reports failure or raises.
+    """
+    try:
+        success = await dict_manager.delete(task_id)
+
+        if not success:
+            raise HTTPException(status_code=500, detail="删除失败")
+
+        return JSONResponse(
+            status_code=200,
+            content={
+                "code": 200,
+                "message": f"字典 {task_id} 已删除",
+                "data": {"task_id": task_id}
+            }
+        )
+
+    except HTTPException:
+        # Pass the deliberate failure above through untouched, as the other
+        # endpoints in this file do; otherwise it would be logged as an
+        # unexpected exception and re-wrapped with a garbled detail message.
+        raise
+    except Exception as e:
+        logger.exception(f"[DesensitizeAPI] 字典删除失败: {e}")
+        raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")
+
+
+@desensitize_router.post("/text")
+async def desensitize_text(
+    content: str = Form(..., description="待脱敏文本内容"),
+    level: str = Form(default="standard", description="脱敏级别")
+):
+    """
+    Text desensitization endpoint (simplified).
+
+    Meant to desensitize the submitted text directly without storing a
+    dictionary (for simple scenarios). Desensitization is currently disabled:
+    the original text is returned unchanged, ``level`` is not used, and all
+    statistics are zero.
+    """
+    try:
+        # NOTE: desensitization is temporarily disabled; the original content is returned.
+        # TODO: to enable it, uncomment the lines below
+        # task_id = f"text-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6]}"
+        # result = await desensitize_engine.process(content, task_id)
+        # await dict_manager.delete(task_id)
+
+        zero_statistics = {
+            "pii_count": 0,
+            "geo_count": 0,
+            "biz_count": 0,
+            "financial_count": 0
+        }
+        body = {
+            "code": 200,
+            "message": "文本处理成功(脱敏功能已禁用)",
+            "data": {
+                "original_length": len(content),
+                "desensitized_length": len(content),
+                "desensitized_content": content,  # original content, unchanged
+                "statistics": zero_statistics
+            }
+        }
+        return JSONResponse(status_code=200, content=body)
+
+    except Exception as e:
+        logger.exception(f"[DesensitizeAPI] 文本脱敏失败: {e}")
+        raise HTTPException(status_code=500, detail=f"文本脱敏失败: {str(e)}")

+ 2 - 4
views/construction_review/file_upload.py

@@ -4,7 +4,6 @@
 """
 """
 import ast
 import ast
 import traceback
 import traceback
-import uuid
 import time
 import time
 import tempfile
 import tempfile
 import subprocess
 import subprocess
@@ -12,15 +11,14 @@ import os
 from datetime import datetime
 from datetime import datetime
 from pathlib import Path
 from pathlib import Path
 
 
-from pydantic import BaseModel, Field
-from typing import Optional,List
+from pydantic import BaseModel
+from typing import List
 from foundation.utils import md5
 from foundation.utils import md5
 from foundation.infrastructure.config import config_handler
 from foundation.infrastructure.config import config_handler
 from .schemas.error_schemas import FileUploadErrors
 from .schemas.error_schemas import FileUploadErrors
 from core.base.workflow_manager import WorkflowManager
 from core.base.workflow_manager import WorkflowManager
 from foundation.observability.logger.loggering import review_logger as logger
 from foundation.observability.logger.loggering import review_logger as logger
 from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
 from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Request
-from core.base.redis_duplicate_checker import RedisDuplicateChecker
 from foundation.infrastructure.tracing import TraceContext, auto_trace
 from foundation.infrastructure.tracing import TraceContext, auto_trace
 
 
 
 

+ 1 - 6
views/construction_review/review_results.py

@@ -3,14 +3,9 @@
 模拟风险统计、总结报告和问题条文返回
 模拟风险统计、总结报告和问题条文返回
 """
 """
 
 
-import random
-import os
-import json
-from datetime import datetime
 from fastapi import APIRouter, HTTPException, Query
 from fastapi import APIRouter, HTTPException, Query
 from pydantic import BaseModel
 from pydantic import BaseModel
-from typing import Optional, Dict, Any
-from .schemas.error_schemas import ReviewResultsErrors
+from typing import Dict, Any
 from foundation.observability.cachefiles import cache, CacheBaseDir
 from foundation.observability.cachefiles import cache, CacheBaseDir
 
 
 
 

+ 0 - 1
views/construction_review/task_control.py

@@ -3,7 +3,6 @@
 提供任务终止、查询等控制功能
 提供任务终止、查询等控制功能
 """
 """
 
 
-import asyncio
 from typing import List, Optional, Dict, Any
 from typing import List, Optional, Dict, Any
 from pydantic import BaseModel, Field
 from pydantic import BaseModel, Field
 from fastapi import APIRouter, HTTPException, Query
 from fastapi import APIRouter, HTTPException, Query

+ 0 - 1
views/construction_write/content_completion.py

@@ -6,7 +6,6 @@ API URL: https://dashscope.aliyuncs.com/compatible-mode/v1
 模型:qwen3-30b-a3b-instruct-2507
 模型:qwen3-30b-a3b-instruct-2507
 """
 """
 
 
-import os
 import uuid
 import uuid
 import json
 import json
 import time
 import time

+ 0 - 2
views/construction_write/outline_views.py

@@ -10,7 +10,6 @@
 - POST /sgbx/context_generate: SSE 流式上下文生成 (新增)
 - POST /sgbx/context_generate: SSE 流式上下文生成 (新增)
 """
 """
 
 
-import os
 import uuid
 import uuid
 import json
 import json
 import time
 import time
@@ -26,7 +25,6 @@ from foundation.infrastructure.config.config import config_handler
 from core.base.workflow_manager import WorkflowManager
 from core.base.workflow_manager import WorkflowManager
 from core.base.sse_manager import unified_sse_manager
 from core.base.sse_manager import unified_sse_manager
 from core.base.progress_manager import ProgressManager
 from core.base.progress_manager import ProgressManager
-from redis import asyncio as redis_async  # 新增这行
 from redis.asyncio import Redis as AsyncRedis
 from redis.asyncio import Redis as AsyncRedis
 
 
 # 创建路由
 # 创建路由

Beberapa file tidak ditampilkan karena terlalu banyak file yang berubah dalam diff ini