"""
Desensitization module API.

Exposes REST endpoints for document desensitization, desensitization
validation, and remapping of cloud review results back to real terms.
Designed according to section 7 of wlast.md.

NOTE: the actual desensitization step is currently disabled in this module;
the ``/document`` and ``/text`` endpoints echo the original content and
report zeroed statistics.
"""

import uuid
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
from fastapi.responses import JSONResponse

from foundation.observability.logger.loggering import review_logger as logger
from core.construction_review.component.desensitize import (
    BlackWhiteListChecker,
    ValidationResult,
    DictManager,
)
from core.construction_review.component.desensitize.remapper import ResultRemapper

desensitize_router = APIRouter(prefix="/desensitize", tags=["数据脱敏"])

# Module-level component instances shared by all endpoints below.
validator = BlackWhiteListChecker()
dict_manager = DictManager()
remapper = ResultRemapper()


# ============ Request / response models ============

class DesensitizeLevel:
    """String constants naming the supported desensitization levels."""

    MINIMAL = "minimal"    # minimal: PII only
    STANDARD = "standard"  # standard: PII + geographic coordinates + business identifiers
    STRICT = "strict"      # strict: all four dimensions


class DesensitizeModelType:
    """String constants naming the supported desensitization back-ends."""

    RULE = "rule"                # rule engine
    QWEN3_5_35B = "qwen3_5_35b"  # Qwen3.5-35B local inference


class ValidateCheckLevel:
    """String constants naming the supported validation levels."""

    STRICT = "strict"
    NORMAL = "normal"


class DesensitizeDocumentRequest(BaseModel):
    """Request model for document desensitization.

    Not referenced by the endpoints in this file: ``/document`` takes the
    same values as multipart form fields instead.
    """

    user_id: str = Field(..., description="用户唯一标识")
    project_id: str = Field(..., description="项目唯一标识")
    desensitize_level: str = Field(default="standard", description="脱敏级别: minimal/standard/strict")
    model_type: str = Field(default="rule", description="脱敏处理模型: rule/qwen3_5_35b")


class DesensitizeDocumentResponse(BaseModel):
    """Response envelope (code / message / data) for document desensitization."""

    code: int = Field(default=200, description="状态码")
    message: str = Field(default="success", description="状态消息")
    data: dict = Field(default_factory=dict, description="响应数据")


class ValidateRequest(BaseModel):
    """Request model for desensitization validation."""

    content: str = Field(..., description="待校验的文本内容")
    check_level: str = Field(default="strict", description="校验级别: strict/normal")


class ValidateResponse(BaseModel):
    """Response envelope (code / message / data) for desensitization validation."""

    code: int = Field(default=200, description="状态码")
    message: str = Field(default="success", description="状态消息")
    data: dict = Field(default_factory=dict, description="响应数据")


class RemapRequest(BaseModel):
    """Request model for remapping a cloud review result."""

    task_id: str = Field(..., description="文档脱敏时返回的任务ID")
    cloud_response: str = Field(..., description="云端审查返回的文本")
    remap_coordinate: bool = Field(default=True, description="是否还原相对桩号")


class RemapResponse(BaseModel):
    """Response envelope (code / message / data) for result remapping."""

    code: int = Field(default=200, description="状态码")
    message: str = Field(default="success", description="状态消息")
    data: dict = Field(default_factory=dict, description="响应数据")


class DictInfoResponse(BaseModel):
    """Response envelope (code / message / data) for dictionary metadata queries."""

    code: int = Field(default=200, description="状态码")
    message: str = Field(default="success", description="状态消息")
    data: dict = Field(default_factory=dict, description="响应数据")


# ============ Internal helpers ============

def _empty_statistics() -> dict:
    """Return a fresh all-zero statistics dict, used while desensitization is disabled."""
    return {
        "pii_count": 0,
        "geo_count": 0,
        "biz_count": 0,
        "financial_count": 0,
    }


def _violation_summary(violation) -> dict:
    """Project one violation record onto the fields returned by ``/validate``.

    Only the first entry of ``positions`` is exposed; a missing or empty
    ``positions`` value yields an empty dict.
    """
    positions = violation.get("positions")
    return {
        "type": violation.get("type"),
        "match": violation.get("match"),
        "severity": violation.get("severity"),
        "suggestion": violation.get("suggestion"),
        "position": positions[0] if positions else {},
    }


# ============ API endpoints ============

@desensitize_router.post("/document", response_model=DesensitizeDocumentResponse)
async def desensitize_document(
    user_id: str = Form(..., description="用户唯一标识"),
    project_id: str = Form(..., description="项目唯一标识"),
    document: UploadFile = File(..., description="PDF/Word格式施工方案"),
    desensitize_level: str = Form(default="standard", description="脱敏级别: minimal/standard/strict"),
    model_type: str = Form(default="rule", description="脱敏处理模型: rule/qwen3_5_35b")
):
    """
    Document desensitization endpoint.

    Intended to apply four-dimension desensitization to a construction-plan
    document and produce a desensitization dictionary. Desensitization is
    currently disabled: the upload is decoded as UTF-8 (undecodable bytes are
    dropped) and a preview of the original content is returned.

    - **desensitize_level**: minimal (PII only) / standard / strict
    - **model_type**: rule (rule engine) / qwen3_5_35b (local LLM)

    Raises:
        HTTPException: 400 when the decoded document is empty; 500 on any
            unexpected failure.
    """
    try:
        # Task id format: des-<YYYYMMDD>-<6 hex chars>
        task_id = f"des-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6]}"

        logger.info(f"[DesensitizeAPI] 文档脱敏请求: task_id={task_id}, "
                    f"user_id={user_id}, level={desensitize_level}, model={model_type}")

        # Read the upload and decode it as text, silently dropping invalid bytes.
        content_bytes = await document.read()
        content = content_bytes.decode('utf-8', errors='ignore')

        if not content:
            raise HTTPException(status_code=400, detail="文档内容为空或无法解析")

        # NOTE: desensitization is temporarily disabled; the original content
        # is returned directly.
        # TODO: to enable desensitization, restore the call below and remove
        # the pass-through response.
        # result: DesensitizedResult = await desensitize_engine.process(content, task_id)
        # if not result.is_valid:
        #     return DesensitizeDocumentResponse(...)

        # Pass-through response: preview is capped at 500 characters, with an
        # ellipsis appended only when the content was actually truncated.
        preview_length = min(500, len(content))
        if len(content) > preview_length:
            content_preview = content[:preview_length] + "..."
        else:
            content_preview = content

        return DesensitizeDocumentResponse(
            code=200,
            message="文档处理成功(脱敏功能已禁用)",
            data={
                "task_id": task_id,
                "status": "completed (desensitization disabled)",
                "desensitize_level": desensitize_level,
                "model_type": model_type,
                "output": {
                    "content_preview": content_preview,
                    "content_length": len(content),
                    "dict_hash": ""
                },
                "statistics": _empty_statistics()
            }
        )

    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 400 above) reach the client
        # unchanged instead of being rewritten as a 500.
        raise
    except Exception as e:
        logger.exception(f"[DesensitizeAPI] 文档脱敏失败: {e}")
        raise HTTPException(status_code=500, detail=f"脱敏处理失败: {str(e)}") from e


@desensitize_router.post("/validate", response_model=ValidateResponse)
async def validate_desensitized(request: ValidateRequest):
    """
    Desensitization validation endpoint.

    Runs the blacklist/whitelist checker over the submitted text and returns
    the violations it reports (at most the first 20 are listed in detail; the
    summary carries the full count).

    - **check_level**: strict / normal

    Raises:
        HTTPException: 500 on any failure during validation.
    """
    try:
        logger.info(f"[DesensitizeAPI] 校验请求: check_level={request.check_level}")

        result: ValidationResult = validator.validate(request.content, request.check_level)

        # Cap the detailed list to keep the response size bounded.
        violations = [_violation_summary(item) for item in result.violations[:20]]

        if result.is_valid:
            message = "校验完成"
        else:
            message = f"发现 {len(result.violations)} 个违规项"

        return ValidateResponse(
            code=200,
            message=message,
            data={
                "is_valid": result.is_valid,
                "check_level": request.check_level,
                "violations": violations,
                "summary": {
                    "total_violations": len(result.violations),
                    "whitelist_matches": result.whitelist_matches,
                    "blacklist_matches": result.blacklist_matches
                }
            }
        )

    except Exception as e:
        logger.exception(f"[DesensitizeAPI] 校验失败: {e}")
        raise HTTPException(status_code=500, detail=f"校验失败: {str(e)}") from e


@desensitize_router.post("/remap", response_model=RemapResponse)
async def remap_result(request: RemapRequest):
    """
    Result remapping endpoint.

    Restores the generalized placeholders in a cloud review response to the
    real engineering terms, using the dictionary stored for ``task_id``.

    Example conversion:
    - input:  "[项目经理A]在[1号特大桥]K0+500处发现安全隐患"
    - output: "张三在映雪特大桥D1K86+779.91处发现安全隐患"

    Raises:
        HTTPException: 404 when no dictionary exists for the task; 500 on any
            unexpected failure.
    """
    try:
        logger.info(f"[DesensitizeAPI] 结果还原请求: task_id={request.task_id}")

        # The dictionary must exist before remapping can be attempted.
        if not await dict_manager.exists(request.task_id):
            raise HTTPException(status_code=404, detail=f"找不到脱敏字典: {request.task_id}")

        # Named `outcome` so the local does not shadow this function's name.
        outcome = await remapper.remap(
            cloud_response=request.cloud_response,
            task_id=request.task_id,
            remap_coordinate=request.remap_coordinate
        )

        # Remapper errors are logged as warnings; the response is still a 200.
        if outcome.errors:
            logger.warning(f"[DesensitizeAPI] 映射警告: {outcome.errors}")

        return RemapResponse(
            code=200,
            message="映射还原成功",
            data={
                "task_id": request.task_id,
                "original_response": outcome.original_response,
                "remapped_response": outcome.remapped_response,
                "mapping_summary": outcome.mapping_summary
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"[DesensitizeAPI] 结果还原失败: {e}")
        raise HTTPException(status_code=500, detail=f"结果还原失败: {str(e)}") from e


@desensitize_router.get("/dict/{task_id}", response_model=DictInfoResponse)
async def get_dict_info(task_id: str):
    """
    Dictionary query endpoint.

    Returns metadata about the desensitization dictionary for ``task_id``
    (file path, file size, modification time) without the mapping content.

    Raises:
        HTTPException: 404 when no metadata is found; 500 on any unexpected
            failure.
    """
    try:
        # NOTE(review): called without `await`, unlike `exists` / `delete` on
        # the same manager - confirm `get_dict_metadata` is synchronous.
        metadata = dict_manager.get_dict_metadata(task_id)

        if not metadata:
            raise HTTPException(status_code=404, detail=f"找不到脱敏字典: {task_id}")

        return DictInfoResponse(
            code=200,
            message="查询成功",
            data={
                "task_id": task_id,
                "metadata": {
                    "file_path": metadata.get("file_path"),
                    "file_size": metadata.get("file_size"),
                    "modified_at": metadata.get("modified_at")
                }
            }
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"[DesensitizeAPI] 字典查询失败: {e}")
        raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}") from e


@desensitize_router.delete("/dict/{task_id}")
async def delete_dict(task_id: str):
    """
    Dictionary deletion endpoint.

    Manually deletes the desensitization dictionary of the given task
    (per the original design notes this is normally handled by an automatic
    cleanup job).

    Raises:
        HTTPException: 500 when the manager reports failure or an unexpected
            error occurs.
    """
    try:
        success = await dict_manager.delete(task_id)

        if not success:
            raise HTTPException(status_code=500, detail="删除失败")

        return JSONResponse(
            status_code=200,
            content={
                "code": 200,
                "message": f"字典 {task_id} 已删除",
                "data": {"task_id": task_id}
            }
        )

    except HTTPException:
        # Propagate the explicit failure above as-is rather than re-wrapping
        # it (which previously nested the status code into the detail text).
        raise
    except Exception as e:
        logger.exception(f"[DesensitizeAPI] 字典删除失败: {e}")
        raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}") from e


@desensitize_router.post("/text")
async def desensitize_text(
    content: str = Form(..., description="待脱敏文本内容"),
    level: str = Form(default="standard", description="脱敏级别")
):
    """
    Text desensitization endpoint (simplified).

    Intended to desensitize the input text directly without storing a
    dictionary. Desensitization is currently disabled, so the original text
    is returned unchanged and ``level`` is not used.

    Raises:
        HTTPException: 500 on any unexpected failure.
    """
    try:
        # NOTE: desensitization is temporarily disabled; the original content
        # is returned directly.
        # TODO: to enable desensitization, restore the lines below.
        # task_id = f"text-{datetime.now().strftime('%Y%m%d')}-{uuid.uuid4().hex[:6]}"
        # result = await desensitize_engine.process(content, task_id)
        # await dict_manager.delete(task_id)

        return JSONResponse(
            status_code=200,
            content={
                "code": 200,
                "message": "文本处理成功(脱敏功能已禁用)",
                "data": {
                    "original_length": len(content),
                    "desensitized_length": len(content),
                    "desensitized_content": content,  # original content, unmodified
                    "statistics": _empty_statistics()
                }
            }
        )

    except Exception as e:
        logger.exception(f"[DesensitizeAPI] 文本脱敏失败: {e}")
        raise HTTPException(status_code=500, detail=f"文本脱敏失败: {str(e)}") from e