|
@@ -0,0 +1,268 @@
|
|
|
|
|
+"""
|
|
|
|
|
+黑白名单校验器
|
|
|
|
|
+
|
|
|
|
|
+在文件发往云端前进行最终正则校验,任一黑名单模式命中则拦截上传并告警,确保零漏脱。
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import re
|
|
|
|
|
+import logging
|
|
|
|
|
+from dataclasses import dataclass, field
|
|
|
|
|
+from typing import List, Dict, Any, Optional
|
|
|
|
|
+
|
|
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
@dataclass
class ValidationResult:
    """Outcome of a single black/white-list validation pass.

    Attributes:
        is_valid: Whether the content passed validation.
        violations: List of violation items; empty by default.
        whitelist_matches: Number of whitelist matches.
        blacklist_matches: Number of blacklist matches.
    """

    is_valid: bool
    violations: List[Dict[str, Any]] = field(default_factory=list)
    whitelist_matches: int = 0
    blacklist_matches: int = 0
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
class BlackWhiteListChecker:
    """Black/white-list checker: the final safety gate before upload.

    Core constraints:
    - Blacklist: patterns that must never appear (raw sensitive data).
    - Whitelist: compliant formats allowed after masking; only counted,
      for statistics and as an auxiliary signal.

    Designed per section 6.3.2 of wlast.md.
    """

    # Blacklist: (regex, violation type, severity).
    BLACKLIST_PATTERNS = [
        # Mobile phone number. Digit lookarounds are used instead of ``\b``:
        # in Python 3 str patterns CJK characters count as word characters,
        # so ``\b`` does not match between Chinese text and a digit and
        # numbers such as "电话13812345678" would slip through.
        (r'(?<!\d)1[3-9]\d{9}(?!\d)', 'phone', 'critical'),
        # ID card number (same boundary reasoning as the phone pattern).
        (r'(?<!\d)\d{17}[\dXx](?!\d)', 'id_card', 'critical'),
        # Company / organisation name (ending in 集团, 有限公司, 局, 处).
        (r'[\u4e00-\u9fa5]{2,10}(集团|有限公司|局|处)', 'company', 'high'),
        # Exact absolute stake number (with a decimal fraction).
        (r'[A-Z]?\d?[Kk]\d{2,5}\+\d{3,6}\.\d+', 'coordinate', 'high'),
        # Certificate number.
        (r'[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤川青藏琼宁][A-Z]\d{6,12}', 'cert', 'high'),
        # E-mail address.
        (r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', 'email', 'medium'),
        # Unified social credit code.
        (r'[A-Z0-9]{18}', 'credit_code', 'medium'),
    ]

    # Whitelist: (regex, label) for compliant post-masking formats.
    WHITELIST_PATTERNS = [
        # Masking markers.
        (r'\[手机号脱敏\]', 'masked_phone'),
        (r'\[证件号脱敏\]', 'masked_id'),
        (r'\[邮箱脱敏\]', 'masked_email'),
        (r'\[金额脱敏\]', 'masked_amount'),
        (r'\[单价脱敏\]', 'masked_price'),
        (r'\[数量脱敏\]', 'masked_quantity'),
        (r'\[桩号脱敏\]', 'masked_coordinate'),
        (r'\[合同编号脱敏\]', 'masked_contract'),
        (r'\[信用代码脱敏\]', 'masked_credit'),
        # Role placeholders.
        (r'\[项目经理[ABCDEFGHIJ]\]', 'role_pm'),
        (r'\[安全员[ABCDEFGHIJ]\]', 'role_safety'),
        (r'\[技术负责人[ABCDEFGHIJ]\]', 'role_tech'),
        (r'\[负责人[ABCDEFGHIJ]\]', 'role_leader'),
        (r'\[班组长[ABCDEFGHIJ]\]', 'role_foreman'),
        (r'\[施工员[ABCDEFGHIJ]\]', 'role_worker'),
        (r'\[质检员[ABCDEFGHIJ]\]', 'role_qc'),
        (r'\[人员[ABCDEFGHIJ]\]', 'role_person'),
        # Organisation / role vocabulary.
        (r'\[建设单位\]', 'company_constructor'),
        (r'\[总承包单位\]', 'company_contractor'),
        (r'\[监理单位\]', 'company_supervisor'),
        (r'\[设计单位\]', 'company_designer'),
        (r'\[劳务分包单位\]', 'company_subcontractor'),
        (r'\[数字化实施方\]', 'company_tech'),
        (r'\[某工程项目\]', 'project_masked'),
        # Engineering entity placeholders.
        (r'\[\d+号特大桥\]', 'entity_bridge'),
        (r'\[\d+号隧道\]', 'entity_tunnel'),
        (r'\[\d+号涵洞\]', 'entity_culvert'),
        (r'\[\d+号河流\]', 'entity_river'),
        (r'\[\d+号立交\]', 'entity_interchange'),
        # Relative stake number (normalised form K{digits}+{digits}).
        (r'K\d+\+\d{3}', 'relative_coordinate'),
        # Terrain descriptions.
        (r'\[高原高山区\]', 'terrain_high'),
        (r'\[中山区\]', 'terrain_mid'),
        (r'\[低山丘陵区\]', 'terrain_low'),
        (r'\[平原区\]', 'terrain_plain'),
        (r'\[某地区\]', 'location_masked'),
        # Geological grade.
        (r'\[[IV]+级围岩区\]', 'geology_level'),
        # Approval marker.
        (r'\[已审核\]', 'signature_masked'),
    ]

    # Reporting limits (unchanged from the original inline constants).
    _MAX_REPORTED_MATCHES = 5   # unique matches reported per blacklist pattern
    _MAX_POSITIONS = 3          # positions recorded per reported match
    _MAX_LOGGED = 3             # violations echoed to the log

    # Strict mode: job title, optional separator, then 2-4 Chinese characters.
    # The separator class accepts both the ASCII and the full-width colon so
    # that "项目经理:张三" is caught as well as "项目经理:张三".
    _PERSON_PATTERN = re.compile(
        r'(项目经理|负责人|安全员|技术负责人|班组长|施工员|质检员)[::\s]*'
        r'([\u4e00-\u9fa5]{2,4})'
    )

    # Simple heuristic: captured "names" that are common non-person words.
    _NON_PERSON_WORDS = frozenset([
        '要求', '规定', '标准', '规范', '措施', '方案',
        '计划', '制度', '管理', '控制', '保证', '确保',
    ])

    # Remediation hint per violation type.
    _SUGGESTIONS = {
        'phone': '替换为 [手机号脱敏]',
        'id_card': '替换为 [证件号脱敏]',
        'company': '替换为 [建设单位] 等标准角色词汇',
        'coordinate': '转换为相对桩号格式 K{n}+{m}',
        'cert': '替换为 [证件号脱敏]',
        'email': '替换为 [邮箱脱敏]',
        'credit_code': '替换为 [信用代码脱敏]',
        'person_name': '替换为 [项目经理A] 等角色占位符',
    }

    def __init__(self):
        """Compile the blacklist and whitelist regular expressions."""
        self.blacklist = [
            (re.compile(pattern), pattern_type, severity)
            for pattern, pattern_type, severity in self.BLACKLIST_PATTERNS
        ]
        self.whitelist = [
            (re.compile(pattern), pattern_type)
            for pattern, pattern_type in self.WHITELIST_PATTERNS
        ]

    def validate(self, content: str, check_level: str = "strict") -> "ValidationResult":
        """Run the black/white-list validation on ``content``.

        Args:
            content: Text to validate.
            check_level: ``'strict'`` additionally runs the person-name
                heuristic; any other value (e.g. ``'normal'``) skips it.

        Returns:
            ValidationResult: ``is_valid`` is True only when no violation
            was found; the match counters are filled in either way.
        """
        # 1. Blacklist: look for residual raw sensitive data.
        violations, blacklist_matches = self._scan_blacklist(content)

        # 2. Whitelist: counted only, as a rough masking-coverage signal.
        whitelist_matches = sum(
            len(pattern.findall(content)) for pattern, _ in self.whitelist
        )

        # 3. Extra checks in strict mode.
        if check_level == "strict":
            strict_violations = self._strict_mode_check(content)
            violations.extend(strict_violations)
            blacklist_matches += len(strict_violations)

        is_valid = not violations

        if not is_valid:
            logger.warning(
                "[BlackWhiteListChecker] 校验失败: %d 个违规项, "
                "黑名单匹配: %d, 白名单匹配: %d",
                len(violations), blacklist_matches, whitelist_matches,
            )
            # NOTE(review): this writes up to 50 characters of the matched
            # sensitive text to the log, as the original did — confirm the
            # log sink is allowed to hold such data.
            for v in violations[:self._MAX_LOGGED]:
                logger.warning(" - %s: %s...", v['type'], v['match'][:50])

        return ValidationResult(
            is_valid=is_valid,
            violations=violations,
            whitelist_matches=whitelist_matches,
            blacklist_matches=blacklist_matches
        )

    def _scan_blacklist(self, content: str) -> tuple:
        """Collect blacklist violations; return ``(violations, total_hits)``.

        ``total_hits`` counts every match, while ``violations`` holds at most
        ``_MAX_REPORTED_MATCHES`` distinct matched texts per pattern.
        """
        violations: List[Dict[str, Any]] = []
        total_hits = 0

        for pattern, pattern_type, severity in self.blacklist:
            # finditer + group(0) yields the full matched text. findall would
            # return only the capture group for patterns that contain one
            # (the company pattern), losing the name and its positions.
            hits = [m.group(0) for m in pattern.finditer(content)]
            if not hits:
                continue
            total_hits += len(hits)

            # De-duplicate preserving first-seen order, then cap the report.
            unique_hits = list(dict.fromkeys(hits))[:self._MAX_REPORTED_MATCHES]
            for text in unique_hits:
                violations.append({
                    "type": pattern_type,
                    "pattern": str(pattern.pattern),
                    "match": text,
                    "positions": self._find_positions(content, pattern, text),
                    "severity": severity,
                    "suggestion": self._get_suggestion(pattern_type),
                    "check": "blacklist"
                })

        return violations, total_hits

    @staticmethod
    def _line_and_column(content: str, offset: int) -> tuple:
        """Return the 1-based ``(line, column)`` of ``offset`` in ``content``."""
        line_start = content.rfind('\n', 0, offset) + 1
        return content.count('\n', 0, offset) + 1, offset - line_start + 1

    def _find_positions(self, content: str, pattern: re.Pattern,
                        match_text: str) -> List[Dict[str, Any]]:
        """Locate occurrences of ``match_text`` among ``pattern``'s matches.

        Returns at most ``_MAX_POSITIONS`` dicts with ``start``/``end``
        offsets and 1-based ``line``/``column``.
        """
        positions = []
        for m in pattern.finditer(content):
            if m.group(0) != match_text:
                continue
            line_num, col_num = self._line_and_column(content, m.start())
            positions.append({
                "start": m.start(),
                "end": m.end(),
                "line": line_num,
                "column": col_num
            })
            if len(positions) >= self._MAX_POSITIONS:
                break
        return positions

    def _get_suggestion(self, pattern_type: str) -> str:
        """Return the remediation hint for a violation type.

        Unknown types get a generic "check and mask" hint.
        """
        return self._SUGGESTIONS.get(pattern_type, '检查并脱敏处理')

    def _strict_mode_check(self, content: str) -> List[Dict[str, Any]]:
        """Flag suspected Chinese personal names that follow a job title.

        A hit is skipped only when the captured 2-4 characters are exactly
        one of the known non-person words.
        """
        violations = []

        for match in self._PERSON_PATTERN.finditer(content):
            name = match.group(2)
            if name in self._NON_PERSON_WORDS:
                continue

            # Same position shape as _find_positions: 1-based line-relative
            # column (previously the absolute offset was reported here).
            line_num, col_num = self._line_and_column(content, match.start())
            violations.append({
                "type": "person_name",
                "pattern": str(self._PERSON_PATTERN.pattern),
                "match": match.group(0),
                "positions": [{
                    "start": match.start(),
                    "end": match.end(),
                    "line": line_num,
                    "column": col_num
                }],
                "severity": "high",
                "suggestion": f"将'{name}'替换为[项目经理A]等角色占位符",
                "check": "strict_mode"
            })

        return violations

    def quick_check(self, content: str) -> bool:
        """Return True when no blacklist pattern matches ``content``.

        Only the blacklist is consulted (no strict-mode check, no report),
        which makes this suitable for high-frequency calls.
        """
        return not any(
            pattern.search(content) for pattern, _, _ in self.blacklist
        )

    def get_check_summary(self, content: str) -> Dict[str, Any]:
        """Run ``validate`` and return aggregate statistics as a dict.

        The dict holds the verdict, the violation count, per-severity and
        per-type distributions, the whitelist match count, and a rough
        coverage estimate (5 points per whitelist match, capped at 100).
        """
        result = self.validate(content)

        severity_count = {"critical": 0, "high": 0, "medium": 0, "low": 0}
        type_count = {}

        for v in result.violations:
            severity_count[v["severity"]] = severity_count.get(v["severity"], 0) + 1
            type_count[v["type"]] = type_count.get(v["type"], 0) + 1

        return {
            "is_valid": result.is_valid,
            "total_violations": len(result.violations),
            "severity_distribution": severity_count,
            "type_distribution": type_count,
            "whitelist_matches": result.whitelist_matches,
            "coverage_estimate": min(100, result.whitelist_matches * 5)  # rough estimate
        }
|