| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674 |
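"""
Standard-library matching rule service (in-memory version).

Implements the matching logic for the timeliness (validity) check of the
construction-plan review.

Architecture:
- StandardRepository: in-memory data storage and indexes
- StandardMatcher: matching rule logic
- StandardMatchingService: public service interface
"""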
- import sys
- import os
# Put the project root on the Python import path.
# The root is taken to be two directory levels above the directory that
# contains this file; it is inserted at the front of sys.path only if it is
# not already listed there.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(current_dir))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
- from typing import List, Dict, Optional, Set
- from dataclasses import dataclass, field
- from enum import Enum
class ValidityStatus(Enum):
    """Validity (timeliness) codes carried by a standard record."""

    # Standard is currently in force.
    CURRENT = "XH"
    # Standard is in force on a trial basis.
    TRIAL = "SX"
    # Standard has been abolished.
    ABOLISHED = "FZ"
class MatchResultCode(Enum):
    """Status codes reported for a single standard match."""

    # The cited standard is fine.
    OK = "OK"
    # The cited standard was replaced by another one.
    SUBSTITUTED = "SUBSTITUTED"
    # The cited standard is abolished and nothing current replaces it.
    ABOLISHED = "ABOLISHED"
    # The cited name/number pair disagrees with the library.
    MISMATCH = "MISMATCH"
    # The cited standard is not in the library.
    NOT_FOUND = "NOT_FOUND"
@dataclass
class StandardMatchResult:
    """Outcome of matching one cited standard against the library."""

    # Sequence number of the cited standard.
    seq_no: int = 0
    # Standard name as cited.
    original_name: str = ""
    # Standard number as cited.
    original_number: str = ""
    # Number of the substitute standard, if there is one.
    substitute_number: Optional[str] = None
    # Name of the substitute standard, if there is one.
    substitute_name: Optional[str] = None
    # Processing result label.
    process_result: str = ""
    # Status code.
    status_code: str = ""
    # Final result message.
    final_result: str = ""
@dataclass
class StandardRecord:
    """One standard as held in the library."""

    # Record identifier.
    id: int
    # Name of the standard.
    standard_name: str
    # Number of the standard.
    standard_number: str
    # Validity code of the standard.
    validity: str
class StandardRepository:
    """In-memory store of standard records with lookup indexes.

    ``load_data`` builds the record list, a number -> record index, a
    name -> records index and a list of current/trial records sorted by
    standard number in descending order. All finder methods read only these
    in-memory structures.
    """

    def __init__(self):
        # Every accepted record, in load order.
        self._records: List[StandardRecord] = []
        # Standard number -> record.
        self._number_index: Dict[str, StandardRecord] = {}
        # Standard name -> all records carrying that name.
        self._name_index: Dict[str, List[StandardRecord]] = {}
        # Current/trial records, sorted by standard number (descending).
        self._current_records: List[StandardRecord] = []

    def load_data(self, raw_data: List[Dict]):
        """Replace the repository content with ``raw_data`` and rebuild indexes.

        Rows missing a standard number or a standard name are skipped. The new
        structures are built locally and swapped in at the end, so a failure
        part-way through leaves the previously loaded data untouched.

        Args:
            raw_data: Row dicts with the keys ``id``, ``standard_name``,
                ``standard_number`` and ``validity``.
        """
        import logging  # local import: the module header does not import logging

        active_codes = (ValidityStatus.CURRENT.value, ValidityStatus.TRIAL.value)
        records: List[StandardRecord] = []
        number_index: Dict[str, StandardRecord] = {}
        name_index: Dict[str, List[StandardRecord]] = {}
        current_records: List[StandardRecord] = []

        for item in raw_data:
            standard_number = item.get("standard_number")
            standard_name = item.get("standard_name")
            if not standard_number or not standard_name:
                continue  # unusable row

            record = StandardRecord(
                id=item.get("id", 0),
                standard_name=standard_name,
                standard_number=standard_number,
                validity=item.get("validity", ""),
            )
            records.append(record)
            # A later row with the same number replaces the earlier one here.
            number_index[standard_number] = record
            # One name may belong to several standard numbers.
            name_index.setdefault(standard_name, []).append(record)
            if record.validity in active_codes:
                current_records.append(record)

        # Descending by number so the first hit is the "latest" substitute.
        current_records.sort(key=lambda rec: rec.standard_number or "", reverse=True)

        self._records = records
        self._number_index = number_index
        self._name_index = name_index
        self._current_records = current_records

        logging.getLogger(__name__).debug("loaded %d standard records", len(records))

    def find_by_number_exact(self, standard_number: str) -> Optional[StandardRecord]:
        """Return the record whose number equals ``standard_number``, or None."""
        return self._number_index.get(standard_number)

    def find_by_name_exact(self, standard_name: str) -> Optional[StandardRecord]:
        """Return the first loaded record named exactly ``standard_name``, or None."""
        same_name = self._name_index.get(standard_name)
        return same_name[0] if same_name else None

    def find_by_name_fuzzy(self, standard_name: str) -> List[StandardRecord]:
        """Return records whose name contains, or is contained in, ``standard_name``."""
        hits: List[StandardRecord] = []
        for name, same_name in self._name_index.items():
            if standard_name in name or name in standard_name:
                hits.extend(same_name)
        return hits

    def find_by_number_fuzzy(self, standard_number: str) -> List[StandardRecord]:
        """Return records whose number starts with the part before the first ``-``.

        For example ``GB/T 5972-2016`` is reduced to the prefix ``GB/T 5972``.
        """
        prefix = standard_number.split("-", 1)[0]
        return [
            record
            for number, record in self._number_index.items()
            if number.startswith(prefix)
        ]

    def find_current_by_name(self, standard_name: str) -> List[StandardRecord]:
        """Return current/trial records whose name matches ``standard_name``.

        A name matches when it is equal, or equal after removing book-title
        marks and whitespace. Results keep the descending number order.
        """
        return [
            record
            for record in self._current_records
            if record.standard_name == standard_name
            or self._is_name_fuzzy_match_for_repo(record.standard_name, standard_name)
        ]

    def _is_name_fuzzy_match_for_repo(self, name1: str, name2: str) -> bool:
        """Return True when both names are equal ignoring 《》 and whitespace.

        ``str.split()`` without arguments splits on every Unicode whitespace
        character, so full-width spaces (U+3000) are ignored as well as ASCII
        spaces.
        """

        def squeeze(name: str) -> str:
            return "".join(name.replace("《", "").replace("》", "").split())

        return squeeze(name1) == squeeze(name2)

    def get_all_records(self) -> List[StandardRecord]:
        """Return a shallow copy of all loaded records."""
        return list(self._records)
class StandardMatcher:
    """Core rules for matching a cited standard against the repository.

    ``match`` is the entry point. It looks the cited number up exactly and
    then branches on whether the number and the name agree with the library,
    producing a ``StandardMatchResult`` whose status is one of the
    ``MatchResultCode`` values.
    """

    # Validity codes that mean "in force" (current or trial).
    _ACTIVE_CODES = (ValidityStatus.CURRENT.value, ValidityStatus.TRIAL.value)
    # Characters stripped from both ends of user input: book-title marks,
    # ASCII parentheses and full-width parentheses (U+FF08 / U+FF09).
    _WRAPPER_CHARS = "《》()\uff08\uff09"
    _OPENING_PARENS = ("(", "\uff08")
    _CLOSING_PARENS = (")", "\uff09")

    def __init__(self, repository: StandardRepository):
        self.repo = repository

    def match(self, seq_no: int, input_name: str, input_number: str) -> StandardMatchResult:
        """Match one cited standard.

        Args:
            seq_no: Sequence number copied into the result.
            input_name: Cited standard name; ``None`` is treated as ``""``.
            input_number: Cited standard number; ``None`` is treated as ``""``.

        Returns:
            The populated match result. ``original_name`` / ``original_number``
            hold the normalised (trimmed, unwrapped) inputs.
        """
        name = self._normalize_input(input_name)
        number = self._normalize_input(input_number)

        result = StandardMatchResult(
            seq_no=seq_no,
            original_name=name,
            original_number=number,
        )

        # Step 1: exact lookup by number decides which branch handles the rest.
        number_hit = self.repo.find_by_number_exact(number)
        if number_hit:
            return self._handle_number_matched(result, number_hit, name)
        return self._handle_number_not_matched(result, name, number)

    def _normalize_input(self, text: Optional[str]) -> str:
        """Trim whitespace and wrapper characters from both ends of ``text``.

        Stripping repeats until nothing changes, so whitespace left behind by
        a removed wrapper (``《 X 》``) is removed too. ``None`` and ``""``
        both yield ``""``.
        """
        if not text:
            return ""
        previous = None
        while text != previous:
            previous = text
            text = self._clean_brackets_and_booknames(text.strip())
        return text

    def _is_active(self, record: StandardRecord) -> bool:
        """Return True when the record is current or trial."""
        return record.validity in self._ACTIVE_CODES

    def _resolve_by_validity(
        self,
        result: StandardMatchResult,
        record: StandardRecord
    ) -> Optional[StandardMatchResult]:
        """Report a name-matched record: mismatch if in force, abolished flow if abolished.

        Returns None, leaving ``result`` untouched, for any other validity code.
        """
        if self._is_active(record):
            return self._set_mismatch_result(result, record)
        if record.validity == ValidityStatus.ABOLISHED.value:
            return self._handle_abolished(result, record)
        return None

    def _handle_number_matched(
        self,
        result: StandardMatchResult,
        db_record: StandardRecord,
        input_name: str
    ) -> StandardMatchResult:
        """Handle the case where the cited number exists in the library."""
        if db_record.standard_name == input_name:
            return self._handle_full_match(result, db_record)
        return self._handle_name_mismatch(result, db_record, input_name)

    def _handle_full_match(
        self,
        result: StandardMatchResult,
        db_record: StandardRecord
    ) -> StandardMatchResult:
        """Handle a record whose number and name both match the citation."""
        if self._is_active(db_record):
            return self._set_ok_result(result)
        # Anything not in force goes through the abolished flow.
        return self._handle_abolished(result, db_record)

    def _handle_name_mismatch(
        self,
        result: StandardMatchResult,
        db_record: StandardRecord,
        input_name: str
    ) -> StandardMatchResult:
        """Handle a matched number whose library name differs from the cited name."""
        # Names that differ only by book-title marks count as a full match.
        if self._is_name_fuzzy_match(db_record.standard_name, input_name):
            return self._handle_full_match(result, db_record)

        # Otherwise look for the cited name elsewhere in the library.
        name_matches = self.repo.find_by_name_fuzzy(input_name)
        exact_match = self._find_exact_name_match(name_matches, input_name)
        if exact_match:
            return self._handle_fuzzy_name_match(result, exact_match)
        for candidate in name_matches:
            if self._is_name_fuzzy_match(candidate.standard_name, input_name):
                return self._handle_fuzzy_name_match(result, candidate)

        # The cited name is unknown, but the number exists: report against the
        # record found by number rather than "not found".
        resolved = self._resolve_by_validity(result, db_record)
        if resolved is not None:
            return resolved
        return self._set_not_found_result(result)

    def _handle_number_not_matched(
        self,
        result: StandardMatchResult,
        input_name: str,
        input_number: str
    ) -> StandardMatchResult:
        """Handle the case where the cited number is not in the library."""
        fuzzy_number_matches = self.repo.find_by_number_fuzzy(input_number)
        if fuzzy_number_matches:
            return self._check_name_in_records(result, fuzzy_number_matches, input_name)
        return self._search_by_name_only(result, input_name)

    def _check_name_in_records(
        self,
        result: StandardMatchResult,
        records: List[StandardRecord],
        input_name: str
    ) -> StandardMatchResult:
        """Look for the cited name among ``records``.

        Exact name equality is tried over all records first, then equality
        ignoring book-title marks. A name hit with an unknown validity code is
        skipped and the scan continues.
        """
        def same_name(record: StandardRecord) -> bool:
            return record.standard_name == input_name

        def same_name_ignoring_marks(record: StandardRecord) -> bool:
            return self._is_name_fuzzy_match(record.standard_name, input_name)

        for is_hit in (same_name, same_name_ignoring_marks):
            for record in records:
                if is_hit(record):
                    resolved = self._resolve_by_validity(result, record)
                    if resolved is not None:
                        return resolved
        return self._set_not_found_result(result)

    def _search_by_name_only(
        self,
        result: StandardMatchResult,
        input_name: str
    ) -> StandardMatchResult:
        """Match by name when the number gave no candidates at all.

        Among records carrying exactly the cited name, a current/trial record
        is preferred regardless of load order and reported as a mismatch.
        """
        candidates = self.repo.find_by_name_fuzzy(input_name)
        same_name = [rec for rec in candidates if rec.standard_name == input_name]

        for record in same_name:
            if self._is_active(record):
                return self._set_mismatch_result(result, record)

        if same_name and same_name[0].validity == ValidityStatus.ABOLISHED.value:
            # NOTE(review): an exactly-named abolished record is reported as
            # "not found" here, while the book-title-mark-insensitive branch
            # below uses the abolished flow. Kept as is; confirm the rule.
            return self._set_not_found_result(result)

        # Names that differ only by book-title marks.
        for record in candidates:
            if self._is_name_fuzzy_match(record.standard_name, input_name):
                resolved = self._resolve_by_validity(result, record)
                if resolved is not None:
                    return resolved
        return self._set_not_found_result(result)

    def _handle_fuzzy_name_match(
        self,
        result: StandardMatchResult,
        match_record: StandardRecord
    ) -> StandardMatchResult:
        """Report a record that was found through the cited name."""
        resolved = self._resolve_by_validity(result, match_record)
        if resolved is not None:
            return resolved
        return self._set_not_found_result(result)

    def _handle_abolished(
        self,
        result: StandardMatchResult,
        abolished_record: StandardRecord
    ) -> StandardMatchResult:
        """Report an abolished record, naming a same-named current standard if any."""
        substitutes = self.repo.find_current_by_name(abolished_record.standard_name)
        if substitutes:
            # The repository returns these sorted by number, descending.
            return self._set_substituted_result(result, substitutes[0])
        return self._set_abolished_result(result)

    # ========== Formatting helpers ==========

    def _format_standard_name(self, name: str) -> str:
        """Wrap ``name`` in exactly one pair of 《》; falsy input is returned as is."""
        if not name:
            return name
        bare = name.strip().lstrip("《").rstrip("》")
        return f"《{bare}》"

    def _format_standard_number(self, number: str) -> str:
        """Wrap ``number`` in one pair of ASCII parentheses.

        One existing leading and one trailing parenthesis, ASCII or
        full-width, is removed first. Falsy input is returned as is.
        """
        if not number:
            return number
        number = number.strip()
        if number.startswith(self._OPENING_PARENS):
            number = number[1:]
        if number.endswith(self._CLOSING_PARENS):
            number = number[:-1]
        return f"({number})"

    # ========== Result setters (one status each) ==========

    def _set_ok_result(self, result: StandardMatchResult) -> StandardMatchResult:
        """Mark ``result`` as OK."""
        result.process_result = "正常"
        result.status_code = MatchResultCode.OK.value
        result.final_result = "无问题"
        return result

    def _set_substituted_result(
        self,
        result: StandardMatchResult,
        substitute: StandardRecord
    ) -> StandardMatchResult:
        """Mark ``result`` as abolished and replaced by ``substitute``."""
        new_name = self._format_standard_name(substitute.standard_name)
        new_number = self._format_standard_number(substitute.standard_number)
        result.substitute_name = new_name
        result.substitute_number = new_number
        result.process_result = "被替代"
        result.status_code = MatchResultCode.SUBSTITUTED.value
        result.final_result = (
            f"{self._format_standard_name(result.original_name)}"
            f"{self._format_standard_number(result.original_number)}已废止,"
            f"替代{new_name}"
            f"{new_number}"
        )
        return result

    def _set_abolished_result(self, result: StandardMatchResult) -> StandardMatchResult:
        """Mark ``result`` as abolished with no current replacement."""
        result.process_result = "废止无现行"
        result.status_code = MatchResultCode.ABOLISHED.value
        result.final_result = (
            f"{self._format_standard_name(result.original_name)}"
            f"{self._format_standard_number(result.original_number)}已废止,无现行状态"
        )
        return result

    def _set_mismatch_result(
        self,
        result: StandardMatchResult,
        actual: StandardRecord
    ) -> StandardMatchResult:
        """Mark ``result`` as not matching the library record ``actual``."""
        actual_name = self._format_standard_name(actual.standard_name)
        actual_number = self._format_standard_number(actual.standard_number)
        result.substitute_name = actual_name
        result.substitute_number = actual_number
        result.process_result = "不匹配"
        result.status_code = MatchResultCode.MISMATCH.value
        result.final_result = (
            f"{self._format_standard_name(result.original_name)}"
            f"{self._format_standard_number(result.original_number)}"
            f"与实际{actual_name}"
            f"{actual_number}不匹配"
        )
        return result

    def _set_not_found_result(self, result: StandardMatchResult) -> StandardMatchResult:
        """Mark ``result`` as not present in the library."""
        result.process_result = "标准库不存在"
        result.status_code = MatchResultCode.NOT_FOUND.value
        result.final_result = (
            f"{self._format_standard_name(result.original_name)}"
            f"{self._format_standard_number(result.original_number)}标准库不存在,请确认"
        )
        return result

    # ========== Utilities ==========

    def _is_name_fuzzy_match(self, name1: str, name2: str) -> bool:
        """Return True when the names are equal after removing every 《 and 》.

        Whitespace is kept, because inner spaces are part of a standard name.
        """
        def bare(name: str) -> str:
            return name.replace("《", "").replace("》", "")

        return bare(name1) == bare(name2)

    def _clean_brackets_and_booknames(self, text: str) -> str:
        """Strip book-title marks and parentheses from both ends of ``text``.

        Removes any run of 《, 》, ASCII parentheses and full-width
        parentheses at either end; characters inside the text are untouched.
        Falsy input is returned as is.
        """
        if not text:
            return text
        return text.strip(self._WRAPPER_CHARS)

    def _find_exact_name_match(
        self,
        records: List[StandardRecord],
        target_name: str
    ) -> Optional[StandardRecord]:
        """Return the first record named exactly ``target_name``, or None."""
        return next(
            (record for record in records if record.standard_name == target_name),
            None,
        )
class StandardMatchingService:
    """Public entry point for checking cited standards against the library.

    Call ``await initialize()`` once before ``check_standards`` or
    ``check_single``; both raise ``RuntimeError`` otherwise.
    """

    def __init__(self, db_pool=None):
        """Create the service.

        Args:
            db_pool: Database connection pool. When falsy, built-in mock data
                is loaded instead of querying a database.
        """
        self.db_pool = db_pool
        self.repository = StandardRepository()
        self.matcher = StandardMatcher(self.repository)
        self._initialized = False

    async def initialize(self):
        """Load the standards into memory; later calls are no-ops."""
        if self._initialized:
            return

        if self.db_pool:
            # Imported lazily so the mock-data path needs no DAO module.
            from utils_test.standard_new_Test.standard_dao import StandardDAO
            import logging  # local import: the module header does not import logging

            raw_data = await StandardDAO(self.db_pool).load_all_standards()
            logging.getLogger(__name__).debug("fetched %d raw standard rows", len(raw_data))
        else:
            raw_data = self._get_mock_data()

        self.repository.load_data(raw_data)
        self._initialized = True

    async def close(self):
        """Close the database pool, if any, and mark the service uninitialised."""
        if self.db_pool:
            await self.db_pool.close()
        self._initialized = False

    def _ensure_initialized(self) -> None:
        """Raise RuntimeError unless ``initialize()`` has completed."""
        if not self._initialized:
            raise RuntimeError("服务未初始化,请先调用 initialize()")

    def check_standards(self, standards: List[Dict[str, str]]) -> List[StandardMatchResult]:
        """Check a batch of cited standards.

        Args:
            standards: Dicts with the keys ``standard_name`` and
                ``standard_number`` (raw cited values). A missing key or a
                ``None`` value is treated as an empty string.

        Returns:
            One result per input, numbered from 1 in input order.

        Raises:
            RuntimeError: If the service has not been initialised.
        """
        self._ensure_initialized()
        return [
            self.matcher.match(
                seq_no=idx,
                input_name=std.get("standard_name") or "",
                input_number=std.get("standard_number") or "",
            )
            for idx, std in enumerate(standards, start=1)
        ]

    def check_single(
        self,
        seq_no: int,
        standard_name: str,
        standard_number: str
    ) -> StandardMatchResult:
        """Check one cited standard.

        Args:
            seq_no: Sequence number copied into the result.
            standard_name: Cited standard name; ``None`` is treated as ``""``.
            standard_number: Cited standard number; ``None`` is treated as ``""``.

        Returns:
            The match result.

        Raises:
            RuntimeError: If the service has not been initialised.
        """
        self._ensure_initialized()
        return self.matcher.match(seq_no, standard_name or "", standard_number or "")

    def _get_mock_data(self) -> List[Dict]:
        """Return the eight built-in mock records used when no pool is given."""
        return [
            # Case 1: current standards (normal).
            {"id": 1, "standard_name": "铁路桥涵设计规范", "standard_number": "TB 10002-2017", "validity": "XH"},
            {"id": 2, "standard_name": "铁路工程抗震设计规范", "standard_number": "GB 50111-2006", "validity": "XH"},
            {"id": 3, "standard_name": "铁路混凝土工程施工质量验收标准", "standard_number": "TB 10424-2018", "validity": "XH"},
            # Case 4: mismatch (wrong year) - cited as 2023, actually 2024.
            {"id": 4, "standard_name": "公路水运危险性较大工程专项施工方案编制审查规程", "standard_number": "JT/T 1495-2024", "validity": "XH"},
            # Case 2: substituted (abolished, with a current replacement).
            {"id": 5, "standard_name": "起重机 钢丝绳 保养、维护、检验和报废", "standard_number": "GB/T 5972-2016", "validity": "FZ"},
            {"id": 6, "standard_name": "起重机 钢丝绳 保养、维护、检验和报废", "standard_number": "GB/T 5972-2023", "validity": "XH"},
            # Case 3: abolished with no replacement.
            {"id": 7, "standard_name": "缆索起重机", "standard_number": "GB/T 28756-2012", "validity": "FZ"},
            {"id": 8, "standard_name": "电力高处作业防坠器", "standard_number": "DL/T 1147-2009", "validity": "FZ"},
        ]
|