"""
Standard-library matching rule service (in-memory version).

Implements the matching logic used by the timeliness check of the
construction-scheme review.

Architecture:
- StandardRepository: in-memory data storage and indexes
- StandardMatcher: the matching rules
- StandardMatchingService: the public service facade
"""

import sys
import os
import logging

# Add the project root directory to the Python path
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(current_dir))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from typing import List, Dict, Optional, Set
from dataclasses import dataclass, field
from enum import Enum

logger = logging.getLogger(__name__)


class ValidityStatus(Enum):
    """Validity (timeliness) codes stored on a standard record."""

    CURRENT = "XH"  # in force
    TRIAL = "SX"  # trial
    ABOLISHED = "FZ"  # abolished


class MatchResultCode(Enum):
    """Status codes reported in a match result."""

    OK = "OK"  # normal
    SUBSTITUTED = "SUBSTITUTED"  # replaced by another standard
    ABOLISHED = "ABOLISHED"  # abolished, no in-force replacement
    MISMATCH = "MISMATCH"  # name / number do not agree with the library
    NOT_FOUND = "NOT_FOUND"  # not present in the library


# Validity codes that count as "in force" throughout the matching rules.
_ACTIVE_VALIDITIES = (ValidityStatus.CURRENT.value, ValidityStatus.TRIAL.value)

# Characters removed by the repository's space-insensitive name comparison:
# book-title marks, the ASCII space and the ideographic (full-width) space.
_NAME_NOISE_TABLE = str.maketrans("", "", "《》 \u3000")

# Wrapper characters stripped from both ends of raw user input: book-title
# marks plus ASCII and full-width parentheses.
_WRAPPER_CHARS = "《》()（）"

# Opening / closing parentheses recognised around a standard number.
_OPEN_PARENS = ("(", "（")
_CLOSE_PARENS = (")", "）")


@dataclass
class StandardMatchResult:
    """Outcome of matching one (name, number) pair against the library."""

    seq_no: int = 0  # sequence number
    original_name: str = ""  # name as supplied (after cleaning)
    original_number: str = ""  # number as supplied (after cleaning)
    substitute_number: Optional[str] = None  # number of the substitute / actual record, if any
    substitute_name: Optional[str] = None  # name of the substitute / actual record, if any
    process_result: str = ""  # processing result label
    status_code: str = ""  # one of the MatchResultCode values
    final_result: str = ""  # final human-readable message


@dataclass
class StandardRecord:
    """One standard as stored in the library."""

    id: int
    standard_name: str
    standard_number: str
    validity: str


class StandardRepository:
    """
    In-memory repository of standards.

    Loads the raw rows once and keeps indexes by number and by name so the
    matcher can query them without touching the database.
    """

    def __init__(self):
        # All loaded records, in load order
        self._records: List[StandardRecord] = []
        # Indexes that speed up lookups
        self._number_index: Dict[str, StandardRecord] = {}  # number -> record
        self._name_index: Dict[str, List[StandardRecord]] = {}  # name -> records
        self._current_records: List[StandardRecord] = []  # in-force / trial records

    def load_data(self, raw_data: List[Dict]):
        """
        Load raw rows into memory and rebuild every index.

        Rows whose ``standard_number`` or ``standard_name`` is missing or
        empty are skipped. When two rows share a number, the later row wins
        in the number index.

        Args:
            raw_data: raw standard rows (dicts with ``id``, ``standard_name``,
                ``standard_number`` and ``validity`` keys).
        """
        self._records = []
        self._number_index = {}
        self._name_index = {}
        self._current_records = []

        for item in raw_data:
            standard_number = item.get("standard_number")
            standard_name = item.get("standard_name")
            # Skip invalid rows
            if not standard_number or not standard_name:
                continue

            record = StandardRecord(
                id=item.get("id", 0),
                standard_name=standard_name,
                standard_number=standard_number,
                validity=item.get("validity", ""),
            )
            self._records.append(record)
            self._number_index[record.standard_number] = record
            # One name may map to several numbers (successive editions)
            self._name_index.setdefault(record.standard_name, []).append(record)
            if record.validity in _ACTIVE_VALIDITIES:
                self._current_records.append(record)

        # Descending by number string, so that among same-name editions the
        # first hit is the one with the greatest number (used as "latest").
        self._current_records.sort(
            key=lambda r: r.standard_number or "",
            reverse=True,
        )
        logger.info("self._records=%d", len(self._records))

    def find_by_number_exact(self, standard_number: str) -> Optional[StandardRecord]:
        """Return the record whose number equals *standard_number*, or None."""
        return self._number_index.get(standard_number)

    def find_by_name_exact(self, standard_name: str) -> Optional[StandardRecord]:
        """
        Return a record whose name equals *standard_name*, or None.

        When several records share the name, an in-force / trial record is
        preferred so that an abolished edition loaded earlier cannot shadow
        the current one; otherwise the first loaded record is returned.
        """
        records = self._name_index.get(standard_name, [])
        for record in records:
            if record.validity in _ACTIVE_VALIDITIES:
                return record
        return records[0] if records else None

    def find_by_name_fuzzy(self, standard_name: str) -> List[StandardRecord]:
        """Return records whose name contains, or is contained in, *standard_name*."""
        results: List[StandardRecord] = []
        for name, records in self._name_index.items():
            if standard_name in name or name in standard_name:
                results.extend(records)
        return results

    def find_by_number_fuzzy(self, standard_number: str) -> List[StandardRecord]:
        """
        Return records whose number starts with the prefix of *standard_number*.

        The prefix is the text before the first ``-`` (e.g. ``GB/T 5972``).
        NOTE(review): an empty prefix matches every stored number, because
        ``str.startswith("")`` is always true.
        """
        prefix = standard_number.split("-")[0]
        return [
            record
            for number, record in self._number_index.items()
            if number.startswith(prefix)
        ]

    def find_current_by_name(self, standard_name: str) -> List[StandardRecord]:
        """
        Return in-force / trial records named *standard_name*.

        A record qualifies on exact equality, or on equality after removing
        book-title marks and spaces. Results keep the descending-number order
        established by ``load_data``.
        """
        return [
            record
            for record in self._current_records
            if record.standard_name == standard_name
            or self._is_name_fuzzy_match_for_repo(record.standard_name, standard_name)
        ]

    def _is_name_fuzzy_match_for_repo(self, name1: str, name2: str) -> bool:
        """Compare two names ignoring book-title marks and (full-width) spaces."""
        return name1.translate(_NAME_NOISE_TABLE) == name2.translate(_NAME_NOISE_TABLE)

    def get_all_records(self) -> List[StandardRecord]:
        """Return a shallow copy of every loaded record."""
        return self._records.copy()


class StandardMatcher:
    """
    Standard matcher.

    Implements the core decision rules that classify an input
    (name, number) pair against the repository.
    """

    def __init__(self, repository: StandardRepository):
        self.repo = repository

    def match(
        self,
        seq_no: int,
        input_name: Optional[str],
        input_number: Optional[str],
    ) -> StandardMatchResult:
        """
        Match one standard.

        Flow:
        1. exact match on the number;
        2. branch on whether that match succeeded.

        ``None`` for either input is treated as an empty string, so a row
        with a missing field yields a result instead of raising.

        Args:
            seq_no: sequence number copied into the result.
            input_name: raw standard name.
            input_number: raw standard number.

        Returns:
            The populated StandardMatchResult.
        """
        # Trim surrounding whitespace, then surrounding book-title marks / brackets
        input_name = self._clean_brackets_and_booknames((input_name or "").strip())
        input_number = self._clean_brackets_and_booknames((input_number or "").strip())

        result = StandardMatchResult(
            seq_no=seq_no,
            original_name=input_name,
            original_number=input_number,
        )

        # Step 1: exact number match
        match_by_number = self.repo.find_by_number_exact(input_number)
        if match_by_number is not None:
            # Branch A: the number exists in the library
            return self._handle_number_matched(result, match_by_number, input_name)
        # Branch B: the number is unknown
        return self._handle_number_not_matched(result, input_name, input_number)

    def _handle_number_matched(
        self,
        result: StandardMatchResult,
        db_record: StandardRecord,
        input_name: str,
    ) -> StandardMatchResult:
        """Handle the case where the number matched exactly."""
        if db_record.standard_name == input_name:
            return self._handle_full_match(result, db_record)
        return self._handle_name_mismatch(result, db_record, input_name)

    def _handle_full_match(
        self,
        result: StandardMatchResult,
        db_record: StandardRecord,
    ) -> StandardMatchResult:
        """Handle the case where both name and number matched."""
        if db_record.validity in _ACTIVE_VALIDITIES:
            # In force or trial: nothing to report
            return self._set_ok_result(result)
        # Any other validity is treated as abolished: look for a substitute
        return self._handle_abolished(result, db_record)

    def _handle_name_mismatch(
        self,
        result: StandardMatchResult,
        db_record: StandardRecord,
        input_name: str,
    ) -> StandardMatchResult:
        """Handle the case where the number matched but the name differs."""
        # Names equal once book-title marks are ignored: treat as a full match
        if self._is_name_fuzzy_match(db_record.standard_name, input_name):
            return self._handle_full_match(result, db_record)

        # Look the input name up elsewhere in the library
        name_matches = self.repo.find_by_name_fuzzy(input_name)

        exact_match = self._find_exact_name_match(name_matches, input_name)
        if exact_match is not None:
            return self._handle_fuzzy_name_match(result, exact_match)

        for candidate in name_matches:
            if self._is_name_fuzzy_match(candidate.standard_name, input_name):
                return self._handle_fuzzy_name_match(result, candidate)

        # The name matches nothing, but the number does exist in the library,
        # so report against the record found by number rather than "not found".
        outcome = self._resolve_by_validity(result, db_record)
        if outcome is not None:
            return outcome
        return self._set_not_found_result(result)

    def _handle_number_not_matched(
        self,
        result: StandardMatchResult,
        input_name: str,
        input_number: str,
    ) -> StandardMatchResult:
        """Handle the case where the number has no exact match."""
        fuzzy_number_matches = self.repo.find_by_number_fuzzy(input_number)
        if fuzzy_number_matches:
            # Same number prefix exists: check the name among those records
            return self._check_name_in_records(result, fuzzy_number_matches, input_name)
        # No related number at all: fall back to a name-only search
        return self._search_by_name_only(result, input_name)

    def _check_name_in_records(
        self,
        result: StandardMatchResult,
        records: List[StandardRecord],
        input_name: str,
    ) -> StandardMatchResult:
        """Look for a name match within *records* (exact pass, then fuzzy pass)."""
        # Pass 1: exact name equality across all records
        for record in records:
            if record.standard_name == input_name:
                outcome = self._resolve_by_validity(result, record)
                if outcome is not None:
                    return outcome

        # Pass 2: equality ignoring book-title marks
        for record in records:
            if self._is_name_fuzzy_match(record.standard_name, input_name):
                outcome = self._resolve_by_validity(result, record)
                if outcome is not None:
                    return outcome

        return self._set_not_found_result(result)

    def _search_by_name_only(
        self,
        result: StandardMatchResult,
        input_name: str,
    ) -> StandardMatchResult:
        """Search by name alone (the number matched nothing, even by prefix)."""
        name_match = self.repo.find_by_name_exact(input_name)
        if name_match is not None:
            if name_match.validity in _ACTIVE_VALIDITIES:
                return self._set_mismatch_result(result, name_match)
            # NOTE(review): an exact-name hit on an abolished record is
            # reported as NOT_FOUND here, whereas the bracket-insensitive
            # pass below routes abolished records through _handle_abolished.
            # Confirm this asymmetry is intended.
            if name_match.validity == ValidityStatus.ABOLISHED.value:
                return self._set_not_found_result(result)

        fuzzy_matches = self.repo.find_by_name_fuzzy(input_name)

        exact_match = self._find_exact_name_match(fuzzy_matches, input_name)
        if exact_match is not None and exact_match.validity in _ACTIVE_VALIDITIES:
            return self._set_mismatch_result(result, exact_match)

        for candidate in fuzzy_matches:
            if self._is_name_fuzzy_match(candidate.standard_name, input_name):
                outcome = self._resolve_by_validity(result, candidate)
                if outcome is not None:
                    return outcome

        return self._set_not_found_result(result)

    def _handle_fuzzy_name_match(
        self,
        result: StandardMatchResult,
        match_record: StandardRecord,
    ) -> StandardMatchResult:
        """Handle a record found through the name while the number disagreed."""
        outcome = self._resolve_by_validity(result, match_record)
        if outcome is not None:
            return outcome
        return self._set_not_found_result(result)

    def _handle_abolished(
        self,
        result: StandardMatchResult,
        abolished_record: StandardRecord,
    ) -> StandardMatchResult:
        """Handle an abolished standard: report its substitute if one exists."""
        substitutes = self.repo.find_current_by_name(abolished_record.standard_name)
        if substitutes:
            # The list is already in descending-number order; take the first
            return self._set_substituted_result(result, substitutes[0])
        return self._set_abolished_result(result)

    def _resolve_by_validity(
        self,
        result: StandardMatchResult,
        record: StandardRecord,
    ) -> Optional[StandardMatchResult]:
        """
        Classify *record*, which matched the input only partially.

        Returns a MISMATCH result for an in-force / trial record, the
        abolished-handling result for an abolished record, and None for any
        other validity so the caller can keep searching.
        """
        if record.validity in _ACTIVE_VALIDITIES:
            return self._set_mismatch_result(result, record)
        if record.validity == ValidityStatus.ABOLISHED.value:
            return self._handle_abolished(result, record)
        return None

    # ========== Formatting helpers ==========

    def _format_standard_name(self, name: str) -> str:
        """Wrap *name* in exactly one pair of book-title marks (falsy input is returned as is)."""
        if not name:
            return name
        bare = name.strip().lstrip("《").rstrip("》")
        return f"《{bare}》"

    def _format_standard_number(self, number: str) -> str:
        """
        Wrap *number* in one pair of parentheses (falsy input is returned as is).

        One existing leading / trailing parenthesis, ASCII or full-width, is
        removed first so the output is not double-wrapped.
        """
        if not number:
            return number
        number = number.strip()
        if number.startswith(_OPEN_PARENS):
            number = number[1:]
        if number.endswith(_CLOSE_PARENS):
            number = number[:-1]
        return f"({number})"

    # ========== Result setters (one responsibility each) ==========

    def _set_ok_result(self, result: StandardMatchResult) -> StandardMatchResult:
        """Fill *result* for a standard that is in good standing."""
        result.process_result = "正常"
        result.status_code = MatchResultCode.OK.value
        result.final_result = "无问题"
        return result

    def _set_substituted_result(
        self,
        result: StandardMatchResult,
        substitute: StandardRecord,
    ) -> StandardMatchResult:
        """Fill *result* for an abolished standard replaced by *substitute*."""
        result.substitute_name = self._format_standard_name(substitute.standard_name)
        result.substitute_number = self._format_standard_number(substitute.standard_number)
        result.process_result = "被替代"
        result.status_code = MatchResultCode.SUBSTITUTED.value
        result.final_result = (
            f"{self._format_standard_name(result.original_name)}"
            f"{self._format_standard_number(result.original_number)}已废止,"
            f"替代{self._format_standard_name(substitute.standard_name)}"
            f"{self._format_standard_number(substitute.standard_number)}"
        )
        return result

    def _set_abolished_result(self, result: StandardMatchResult) -> StandardMatchResult:
        """Fill *result* for an abolished standard with no in-force replacement."""
        result.process_result = "废止无现行"
        result.status_code = MatchResultCode.ABOLISHED.value
        result.final_result = (
            f"{self._format_standard_name(result.original_name)}"
            f"{self._format_standard_number(result.original_number)}已废止,无现行状态"
        )
        return result

    def _set_mismatch_result(
        self,
        result: StandardMatchResult,
        actual: StandardRecord,
    ) -> StandardMatchResult:
        """Fill *result* for an input that disagrees with the library record *actual*."""
        result.substitute_name = self._format_standard_name(actual.standard_name)
        result.substitute_number = self._format_standard_number(actual.standard_number)
        result.process_result = "不匹配"
        result.status_code = MatchResultCode.MISMATCH.value
        result.final_result = (
            f"{self._format_standard_name(result.original_name)}"
            f"{self._format_standard_number(result.original_number)}"
            f"与实际{self._format_standard_name(actual.standard_name)}"
            f"{self._format_standard_number(actual.standard_number)}不匹配"
        )
        return result

    def _set_not_found_result(self, result: StandardMatchResult) -> StandardMatchResult:
        """Fill *result* for a standard that is not in the library."""
        result.process_result = "标准库不存在"
        result.status_code = MatchResultCode.NOT_FOUND.value
        result.final_result = (
            f"{self._format_standard_name(result.original_name)}"
            f"{self._format_standard_number(result.original_number)}标准库不存在,请确认"
        )
        return result

    # ========== Utilities ==========

    def _is_name_fuzzy_match(self, name1: str, name2: str) -> bool:
        """
        Compare two names ignoring book-title marks only.

        Inner spaces are kept on purpose: they are part of the name.
        """
        clean1 = name1.replace("《", "").replace("》", "")
        clean2 = name2.replace("《", "").replace("》", "")
        return clean1 == clean2

    def _clean_brackets_and_booknames(self, text: str) -> str:
        """
        Strip book-title marks and parentheses from both ends of *text*.

        Handles 《》 as well as ASCII and full-width parentheses, repeatedly,
        until neither end carries one. Falsy input is returned unchanged.
        """
        if not text:
            return text
        return text.strip(_WRAPPER_CHARS)

    def _find_exact_name_match(
        self,
        records: List[StandardRecord],
        target_name: str,
    ) -> Optional[StandardRecord]:
        """Return the first record in *records* named exactly *target_name*, or None."""
        for record in records:
            if record.standard_name == target_name:
                return record
        return None


class StandardMatchingService:
    """
    Standard-library matching service.

    The single entry point exposed to callers.
    """

    def __init__(self, db_pool=None):
        """
        Create the service.

        Args:
            db_pool: database connection pool; when None, mock data is used.
        """
        self.db_pool = db_pool
        self.repository = StandardRepository()
        self.matcher = StandardMatcher(self.repository)
        self._initialized = False

    async def initialize(self):
        """
        Load the standards into memory.

        Only the first call does any work; later calls return immediately.
        """
        if self._initialized:
            return

        if self.db_pool:
            # Load from the real database; imported lazily so the mock path
            # does not need the DAO module.
            from utils_test.standard_new_Test.standard_dao import StandardDAO

            dao = StandardDAO(self.db_pool)
            raw_data = await dao.load_all_standards()
            logger.info("raw_data=%d", len(raw_data))
        else:
            # Use mock data
            raw_data = self._get_mock_data()

        self.repository.load_data(raw_data)
        self._initialized = True

    async def close(self):
        """Close the connection pool (if any) and mark the service uninitialized."""
        if self.db_pool:
            await self.db_pool.close()
        self._initialized = False

    def check_standards(self, standards: List[Dict[str, str]]) -> List[StandardMatchResult]:
        """
        Check a batch of standards.

        Args:
            standards: list of dicts, each with:
                - standard_name: raw standard name
                - standard_number: raw standard number

        Returns:
            One StandardMatchResult per input, numbered from 1 in input order.

        Raises:
            RuntimeError: if ``initialize()`` has not completed.
        """
        if not self._initialized:
            raise RuntimeError("服务未初始化,请先调用 initialize()")

        return [
            self.matcher.match(
                seq_no=idx,
                input_name=std.get("standard_name", ""),
                input_number=std.get("standard_number", ""),
            )
            for idx, std in enumerate(standards, start=1)
        ]

    def check_single(
        self,
        seq_no: int,
        standard_name: str,
        standard_number: str,
    ) -> StandardMatchResult:
        """
        Check a single standard.

        Args:
            seq_no: sequence number
            standard_name: standard name
            standard_number: standard number

        Returns:
            The match result.

        Raises:
            RuntimeError: if ``initialize()`` has not completed.
        """
        if not self._initialized:
            raise RuntimeError("服务未初始化,请先调用 initialize()")
        return self.matcher.match(seq_no, standard_name, standard_number)

    def _get_mock_data(self) -> List[Dict]:
        """Return the mock rows used when no database pool is configured."""
        return [
            # Case 1: normal, in-force standards
            {"id": 1, "standard_name": "铁路桥涵设计规范", "standard_number": "TB 10002-2017", "validity": "XH"},
            {"id": 2, "standard_name": "铁路工程抗震设计规范", "standard_number": "GB 50111-2006", "validity": "XH"},
            {"id": 3, "standard_name": "铁路混凝土工程施工质量验收标准", "standard_number": "TB 10424-2018", "validity": "XH"},
            # Case 4: mismatch (wrong year) - input says 2023, library has 2024
            {"id": 4, "standard_name": "公路水运危险性较大工程专项施工方案编制审查规程", "standard_number": "JT/T 1495-2024", "validity": "XH"},
            # Case 2: substituted (abolished, with an in-force replacement)
            {"id": 5, "standard_name": "起重机 钢丝绳 保养、维护、检验和报废", "standard_number": "GB/T 5972-2016", "validity": "FZ"},
            {"id": 6, "standard_name": "起重机 钢丝绳 保养、维护、检验和报废", "standard_number": "GB/T 5972-2023", "validity": "XH"},
            # Case 3: abolished without replacement
            {"id": 7, "standard_name": "缆索起重机", "standard_number": "GB/T 28756-2012", "validity": "FZ"},
            {"id": 8, "standard_name": "电力高处作业防坠器", "standard_number": "DL/T 1147-2009", "validity": "FZ"},
        ]