|
|
@@ -3,17 +3,26 @@ from __future__ import annotations
|
|
|
import json
|
|
|
import time
|
|
|
import asyncio
|
|
|
-from typing import Any, Dict, List
|
|
|
+import re
|
|
|
+from typing import Any, Dict, List, Optional, Tuple
|
|
|
from functools import partial
|
|
|
|
|
|
-from langchain_milvus import Milvus, BM25BuiltInFunction
|
|
|
-from foundation.infrastructure.config.config import config_handler
|
|
|
-from foundation.ai.models.model_handler import model_handler as mh
|
|
|
+# [已注释] 旧的向量搜索和LLM判断相关导入
|
|
|
+# from langchain_milvus import Milvus, BM25BuiltInFunction
|
|
|
+# from foundation.infrastructure.config.config import config_handler
|
|
|
+# from foundation.ai.models.model_handler import model_handler as mh
|
|
|
from core.construction_review.component.reviewers.utils.inter_tool import InterTool
|
|
|
from core.construction_review.component.reviewers.utils.directory_extraction import BasisItems, BasisItem
|
|
|
from foundation.observability.logger.loggering import review_logger as logger
|
|
|
-from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
|
|
|
-from core.construction_review.component.reviewers.utils.timeliness_determiner import determine_timeliness_issue
|
|
|
+# [已注释] 旧的匹配和判定逻辑
|
|
|
+# from core.construction_review.component.reviewers.utils.reference_matcher import match_reference_files
|
|
|
+# from core.construction_review.component.reviewers.utils.timeliness_determiner import determine_timeliness_issue
|
|
|
+
|
|
|
+# [新增] 新的标准时效性审查模块
|
|
|
+from core.construction_review.component.reviewers.standard_timeliness_reviewer import (
|
|
|
+ StandardTimelinessReviewer,
|
|
|
+ review_standard_timeliness_with_standardized_output,
|
|
|
+)
|
|
|
|
|
|
class StandardizedResponseProcessor:
|
|
|
"""标准化响应处理器"""
|
|
|
@@ -26,7 +35,7 @@ class StandardizedResponseProcessor:
|
|
|
处理LLM响应,返回标准格式
|
|
|
|
|
|
Args:
|
|
|
- response_text: LLM原始响应文本(JSON字符串)
|
|
|
+ response_text: LLM原始响应文本(JSON字符串)
|
|
|
check_name: 检查项名称
|
|
|
chapter_code: 章节代码
|
|
|
check_item_code: 检查项代码
|
|
|
@@ -64,143 +73,246 @@ class StandardizedResponseProcessor:
|
|
|
}]
|
|
|
|
|
|
|
|
|
-class BasisSearchEngine:
|
|
|
- """编制依据向量搜索引擎"""
|
|
|
-
|
|
|
- # 类级别的缓存,避免重复创建 Milvus 实例
|
|
|
- _vectorstore_cache = {}
|
|
|
-
|
|
|
- def __init__(self):
|
|
|
- self.emdmodel = None
|
|
|
- self.host = None
|
|
|
- self.port = None
|
|
|
- self.user = None
|
|
|
- self.password = None
|
|
|
- self._initialize()
|
|
|
-
|
|
|
- def _initialize(self):
|
|
|
- """初始化搜索引擎"""
|
|
|
- try:
|
|
|
- # 连接配置
|
|
|
- self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
|
|
|
- self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
|
|
|
- self.user = config_handler.get('milvus', 'MILVUS_USER')
|
|
|
- self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
|
|
|
-
|
|
|
- # 初始化嵌入模型
|
|
|
- self.emdmodel = mh._get_lq_qwen3_8b_emd()
|
|
|
- logger.info("嵌入模型初始化成功")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f" BasisSearchEngine 初始化失败: {e}")
|
|
|
-
|
|
|
- def _get_vectorstore(self, collection_name: str):
|
|
|
- """获取或创建 Milvus vectorstore 实例(使用缓存)"""
|
|
|
- cache_key = f"{self.host}:{self.port}:{collection_name}"
|
|
|
-
|
|
|
- if cache_key not in BasisSearchEngine._vectorstore_cache:
|
|
|
- connection_args = {
|
|
|
- "uri": f"http://{self.host}:{self.port}",
|
|
|
- "user": self.user,
|
|
|
- "db_name": "lq_db"
|
|
|
- }
|
|
|
- if self.password:
|
|
|
- connection_args["password"] = self.password
|
|
|
-
|
|
|
- # 抑制 AsyncMilvusClient 的警告日志
|
|
|
- import logging
|
|
|
- original_level = logging.getLogger('pymilvus').level
|
|
|
- logging.getLogger('pymilvus').setLevel(logging.ERROR)
|
|
|
-
|
|
|
- try:
|
|
|
- vectorstore = Milvus(
|
|
|
- embedding_function=self.emdmodel,
|
|
|
- collection_name=collection_name,
|
|
|
- connection_args=connection_args,
|
|
|
- consistency_level="Strong",
|
|
|
- builtin_function=BM25BuiltInFunction(),
|
|
|
- vector_field=["dense", "sparse"]
|
|
|
- )
|
|
|
- BasisSearchEngine._vectorstore_cache[cache_key] = vectorstore
|
|
|
- logger.info(f"创建并缓存 Milvus 连接: {cache_key}")
|
|
|
- finally:
|
|
|
- logging.getLogger('pymilvus').setLevel(original_level)
|
|
|
-
|
|
|
- return BasisSearchEngine._vectorstore_cache[cache_key]
|
|
|
-
|
|
|
- def hybrid_search(self, collection_name: str, query_text: str,
|
|
|
- top_k: int = 3, ranker_type: str = "weighted",
|
|
|
- dense_weight: float = 0.7, sparse_weight: float = 0.3):
|
|
|
- try:
|
|
|
- # 使用缓存的 vectorstore
|
|
|
- vectorstore = self._get_vectorstore(collection_name)
|
|
|
-
|
|
|
- # 执行混合搜索
|
|
|
- if ranker_type == "weighted":
|
|
|
- results = vectorstore.similarity_search(
|
|
|
- query=query_text,
|
|
|
- k=top_k,
|
|
|
- ranker_type="weighted",
|
|
|
- ranker_params={"weights": [dense_weight, sparse_weight]}
|
|
|
- )
|
|
|
- else: # rrf
|
|
|
- results = vectorstore.similarity_search(
|
|
|
- query=query_text,
|
|
|
- k=top_k,
|
|
|
- ranker_type="rrf",
|
|
|
- ranker_params={"k": 60}
|
|
|
- )
|
|
|
-
|
|
|
- # 格式化结果,保持与其他搜索方法一致
|
|
|
- formatted_results = []
|
|
|
- for doc in results:
|
|
|
- formatted_results.append({
|
|
|
- 'id': doc.metadata.get('pk', 0),
|
|
|
- 'text_content': doc.page_content,
|
|
|
- 'metadata': doc.metadata,
|
|
|
- 'distance': 0.0,
|
|
|
- 'similarity': 1.0
|
|
|
- })
|
|
|
-
|
|
|
- return formatted_results
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- # 回退到传统的向量搜索
|
|
|
- logger.error(f" 搜索失败: {e}")
|
|
|
+# [已注释] 旧的向量搜索引擎类,已被新的规则匹配替代
|
|
|
+# class BasisSearchEngine:
|
|
|
+# """编制依据向量搜索引擎"""
|
|
|
+#
|
|
|
+# # 类级别的缓存,避免重复创建 Milvus 实例
|
|
|
+# _vectorstore_cache = {}
|
|
|
+#
|
|
|
+# def __init__(self):
|
|
|
+# self.emdmodel = None
|
|
|
+# self.host = None
|
|
|
+# self.port = None
|
|
|
+# self.user = None
|
|
|
+# self.password = None
|
|
|
+# self._initialize()
|
|
|
+#
|
|
|
+# def _initialize(self):
|
|
|
+# """初始化搜索引擎"""
|
|
|
+# try:
|
|
|
+# # 连接配置
|
|
|
+# self.host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
|
|
|
+# self.port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
|
|
|
+# self.user = config_handler.get('milvus', 'MILVUS_USER')
|
|
|
+# self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
|
|
|
+#
|
|
|
+# # 初始化嵌入模型
|
|
|
+# self.emdmodel = mh._get_lq_qwen3_8b_emd()
|
|
|
+# logger.info("嵌入模型初始化成功")
|
|
|
+#
|
|
|
+# except Exception as e:
|
|
|
+# logger.error(f" BasisSearchEngine 初始化失败: {e}")
|
|
|
+#
|
|
|
+# def _get_vectorstore(self, collection_name: str):
|
|
|
+# """获取或创建 Milvus vectorstore 实例(使用缓存)"""
|
|
|
+# cache_key = f"{self.host}:{self.port}:{collection_name}"
|
|
|
+#
|
|
|
+# if cache_key not in BasisSearchEngine._vectorstore_cache:
|
|
|
+# connection_args = {
|
|
|
+# "uri": f"http://{self.host}:{self.port}",
|
|
|
+# "user": self.user,
|
|
|
+# "db_name": "lq_db"
|
|
|
+# }
|
|
|
+# if self.password:
|
|
|
+# connection_args["password"] = self.password
|
|
|
+#
|
|
|
+# # 抑制 AsyncMilvusClient 的警告日志
|
|
|
+# import logging
|
|
|
+# original_level = logging.getLogger('pymilvus').level
|
|
|
+# logging.getLogger('pymilvus').setLevel(logging.ERROR)
|
|
|
+#
|
|
|
+# try:
|
|
|
+# vectorstore = Milvus(
|
|
|
+# embedding_function=self.emdmodel,
|
|
|
+# collection_name=collection_name,
|
|
|
+# connection_args=connection_args,
|
|
|
+# consistency_level="Strong",
|
|
|
+# builtin_function=BM25BuiltInFunction(),
|
|
|
+# vector_field=["dense", "sparse"]
|
|
|
+# )
|
|
|
+# BasisSearchEngine._vectorstore_cache[cache_key] = vectorstore
|
|
|
+# logger.info(f"创建并缓存 Milvus 连接: {cache_key}")
|
|
|
+# finally:
|
|
|
+# logging.getLogger('pymilvus').setLevel(original_level)
|
|
|
+#
|
|
|
+# return BasisSearchEngine._vectorstore_cache[cache_key]
|
|
|
+#
|
|
|
+# def hybrid_search(self, collection_name: str, query_text: str,
|
|
|
+# top_k: int = 3, ranker_type: str = "weighted",
|
|
|
+# dense_weight: float = 0.7, sparse_weight: float = 0.3):
|
|
|
+# try:
|
|
|
+# # 使用缓存的 vectorstore
|
|
|
+# vectorstore = self._get_vectorstore(collection_name)
|
|
|
+#
|
|
|
+# # 执行混合搜索
|
|
|
+# if ranker_type == "weighted":
|
|
|
+# results = vectorstore.similarity_search(
|
|
|
+# query=query_text,
|
|
|
+# k=top_k,
|
|
|
+# ranker_type="weighted",
|
|
|
+# ranker_params={"weights": [dense_weight, sparse_weight]}
|
|
|
+# )
|
|
|
+# else: # rrf
|
|
|
+# results = vectorstore.similarity_search(
|
|
|
+# query=query_text,
|
|
|
+# k=top_k,
|
|
|
+# ranker_type="rrf",
|
|
|
+# ranker_params={"k": 60}
|
|
|
+# )
|
|
|
+#
|
|
|
+# # 格式化结果,保持与其他搜索方法一致
|
|
|
+# formatted_results = []
|
|
|
+# for doc in results:
|
|
|
+# formatted_results.append({
|
|
|
+# 'id': doc.metadata.get('pk', 0),
|
|
|
+# 'text_content': doc.page_content,
|
|
|
+# 'metadata': doc.metadata,
|
|
|
+# 'distance': 0.0,
|
|
|
+# 'similarity': 1.0
|
|
|
+# })
|
|
|
+#
|
|
|
+# return formatted_results
|
|
|
+#
|
|
|
+# except Exception as e:
|
|
|
+# # 回退到传统的向量搜索
|
|
|
+# logger.error(f" 搜索失败: {e}")
|
|
|
|
|
|
|
|
|
class BasisReviewService:
|
|
|
"""编制依据审查服务核心类"""
|
|
|
|
|
|
- def __init__(self, max_concurrent: int = 4):
|
|
|
- self.search_engine = BasisSearchEngine()
|
|
|
- self.response_processor = StandardizedResponseProcessor()
|
|
|
+ def __init__(self, max_concurrent: int = 4, db_pool=None):
|
|
|
+ # [已注释] 旧的向量搜索引擎
|
|
|
+ # self.search_engine = BasisSearchEngine()
|
|
|
+ # self.response_processor = StandardizedResponseProcessor()
|
|
|
self.max_concurrent = max_concurrent
|
|
|
self._semaphore = None
|
|
|
+ self.db_pool = db_pool
|
|
|
+ self._timeliness_reviewer = None
|
|
|
|
|
|
async def __aenter__(self):
|
|
|
"""异步上下文管理器入口"""
|
|
|
if self._semaphore is None:
|
|
|
self._semaphore = asyncio.Semaphore(self.max_concurrent)
|
|
|
+ # [新增] 初始化新的时效性审查器
|
|
|
+ if self._timeliness_reviewer is None:
|
|
|
+ self._timeliness_reviewer = StandardTimelinessReviewer(db_pool=self.db_pool)
|
|
|
+ # 预初始化数据(如果还没初始化)
|
|
|
+ if not self._timeliness_reviewer._service or not self._timeliness_reviewer._service._initialized:
|
|
|
+ await self._timeliness_reviewer.__aenter__()
|
|
|
return self
|
|
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
|
"""异步上下文管理器出口"""
|
|
|
+ # [新增] 关闭时效性审查器
|
|
|
+ if self._timeliness_reviewer:
|
|
|
+ await self._timeliness_reviewer.__aexit__(exc_type, exc_val, exc_tb)
|
|
|
return False
|
|
|
|
|
|
+ def _extract_standard_from_basis(self, basis_text: str) -> Optional[Dict[str, str]]:
|
|
|
+ """
|
|
|
+ [新增] 从编制依据文本中提取标准名称和编号
|
|
|
+
|
|
|
+ 支持格式:
|
|
|
+ - 《标准名称》(标准号)
|
|
|
+ - 《标准名称》(标准号)其他文字
|
|
|
+ - 标准名称(标准号)
|
|
|
+ """
|
|
|
+ if not basis_text:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 模式1: 《名称》(编号)
|
|
|
+ pattern1 = r'《([^《》]+)》\s*(([^)]+))'
|
|
|
+ match = re.search(pattern1, basis_text)
|
|
|
+ if match:
|
|
|
+ return {
|
|
|
+ "standard_name": match.group(1).strip(),
|
|
|
+ "standard_number": match.group(2).strip()
|
|
|
+ }
|
|
|
+
|
|
|
+ # 模式2: 《名称》(编号) - 半角括号
|
|
|
+ pattern2 = r'《([^《》]+)》\s*\(([^)]+)\)'
|
|
|
+ match = re.search(pattern2, basis_text)
|
|
|
+ if match:
|
|
|
+ return {
|
|
|
+ "standard_name": match.group(1).strip(),
|
|
|
+ "standard_number": match.group(2).strip()
|
|
|
+ }
|
|
|
+
|
|
|
+ # 模式3: 尝试匹配标准号格式(如 GB 1234-2020)
|
|
|
+    standard_pattern = r'([A-Z]{2,6}(?:/[A-Z])?\s*\d{1,6}(?:\.\d+)?(?:-\d{4})?)'
|
|
|
+ std_match = re.search(standard_pattern, basis_text.upper())
|
|
|
+ if std_match:
|
|
|
+ standard_number = std_match.group(1).strip()
|
|
|
+ # 尝试提取名称(在编号前的书名号内)
|
|
|
+ name_match = re.search(r'《([^《》]+)》', basis_text)
|
|
|
+ if name_match:
|
|
|
+ return {
|
|
|
+ "standard_name": name_match.group(1).strip(),
|
|
|
+ "standard_number": standard_number
|
|
|
+ }
|
|
|
+ # 如果没有书名号,使用空名称
|
|
|
+ return {
|
|
|
+ "standard_name": "",
|
|
|
+ "standard_number": standard_number
|
|
|
+ }
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
async def review_batch(
|
|
|
self,
|
|
|
basis_items: List[str],
|
|
|
- collection_name: str = "first_bfp_collection_status",
|
|
|
- top_k_each: int = 10, # 增加召回数量,提高精确匹配机会
|
|
|
+ collection_name: str = "first_bfp_collection_status", # [保留参数但不再使用]
|
|
|
+ top_k_each: int = 10, # [保留参数但不再使用]
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
- """异步批次审查(通常3条)"""
|
|
|
+ """
|
|
|
+ [已修改] 异步批次审查(通常3条)
|
|
|
+
|
|
|
+ 新逻辑:使用基于内存的规则匹配替代向量搜索+LLM判断
|
|
|
+ """
|
|
|
basis_items = [x for x in (basis_items or []) if isinstance(x, str) and x.strip()]
|
|
|
if not basis_items:
|
|
|
return []
|
|
|
|
|
|
async with self._semaphore:
|
|
|
try:
|
|
|
+ # [新增] 从编制依据中提取标准信息
|
|
|
+ standards_list = []
|
|
|
+ for basis in basis_items:
|
|
|
+ std_info = self._extract_standard_from_basis(basis)
|
|
|
+ if std_info:
|
|
|
+ standards_list.append(std_info)
|
|
|
+ logger.debug(f"提取到标准: {std_info['standard_name']} ({std_info['standard_number']})")
|
|
|
+ else:
|
|
|
+ logger.warning(f"无法从编制依据提取标准信息: {basis}")
|
|
|
+
|
|
|
+ if not standards_list:
|
|
|
+ logger.info(f"批次中未提取到有效标准信息,跳过审查")
|
|
|
+ return []
|
|
|
+
|
|
|
+ # [新增] 使用新的时效性审查逻辑
|
|
|
+ if not self._timeliness_reviewer:
|
|
|
+ raise RuntimeError("时效性审查器未初始化,请使用异步上下文管理器")
|
|
|
+
|
|
|
+ review_results = self._timeliness_reviewer.review_standards(standards_list)
|
|
|
+
|
|
|
+ # 转换为标准格式
|
|
|
+ standardized_results = self._timeliness_reviewer.convert_to_standardized_format(
|
|
|
+ review_results,
|
|
|
+ check_item="timeliness_check",
|
|
|
+ chapter_code="basis",
|
|
|
+ check_item_code="basis_timeliness_check"
|
|
|
+ )
|
|
|
+
|
|
|
+ # 统计结果
|
|
|
+ issue_count = sum(1 for item in standardized_results if item.get('exist_issue', False))
|
|
|
+ logger.info(f"编制依据批次审查完成:总计 {len(standards_list)} 项,发现问题 {issue_count} 项")
|
|
|
+
|
|
|
+ return standardized_results
|
|
|
+
|
|
|
+ # [已注释] 旧的向量搜索+LLM判断逻辑
|
|
|
+ """
|
|
|
# 并发搜索每个编制依据
|
|
|
search_tasks = []
|
|
|
for basis in basis_items:
|
|
|
@@ -218,77 +330,15 @@ class BasisReviewService:
|
|
|
logger.error(f"搜索失败 '{basis_items[i]}': {result}")
|
|
|
grouped_candidates.append([])
|
|
|
else:
|
|
|
- # result 是 List[dict],需要遍历
|
|
|
texts = [item["text_content"] for item in result if "text_content" in item]
|
|
|
grouped_candidates.append(texts)
|
|
|
-
|
|
|
- # 获取match_reference_files的结果并过滤
|
|
|
- match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
|
|
|
|
|
|
- # 记录完整的匹配结果用于调试
|
|
|
- logger.info(f"批次 match_reference_files 原始结果: {match_result[:500]}...")
|
|
|
-
|
|
|
- # 解析JSON并过滤:保留有相关信息的项
|
|
|
- try:
|
|
|
- match_data = json.loads(match_result)
|
|
|
- # 提取items字段(match_reference_files返回{items: [...]}格式)
|
|
|
- items = match_data.get('items', match_data) if isinstance(match_data, dict) else match_data
|
|
|
-
|
|
|
- logger.info(f"解析到 {len(items)} 个匹配项")
|
|
|
- for idx, item in enumerate(items):
|
|
|
- logger.info(f" 项{idx}: review_item={item.get('review_item', 'unknown')}, "
|
|
|
- f"has_related_file={item.get('has_related_file')}, "
|
|
|
- f"exact_match_info={item.get('exact_match_info')}, "
|
|
|
- f"same_name_current={item.get('same_name_current')}")
|
|
|
-
|
|
|
- # 放宽过滤条件:只要有相关文件信息就进行审查
|
|
|
- filtered_data = [
|
|
|
- item for item in items
|
|
|
- if item.get('has_related_file') or
|
|
|
- item.get('exact_match_info') or
|
|
|
- item.get('same_name_current')
|
|
|
- ]
|
|
|
-
|
|
|
- logger.info(f"过滤后保留 {len(filtered_data)} 个项")
|
|
|
-
|
|
|
- # 记录被过滤掉的项目用于调试
|
|
|
- skipped_items = [
|
|
|
- item for item in items
|
|
|
- if not (item.get('has_related_file') or
|
|
|
- item.get('exact_match_info') or
|
|
|
- item.get('same_name_current'))
|
|
|
- ]
|
|
|
- if skipped_items:
|
|
|
- logger.warning(f"跳过了 {len(skipped_items)} 个无参考信息的编制依据: "
|
|
|
- f"{[item.get('review_item', 'unknown') for item in skipped_items]}")
|
|
|
-
|
|
|
- # 如果没有过滤出数据,直接返回空结果
|
|
|
- if not filtered_data:
|
|
|
- logger.info(f"过滤后没有符合条件的编制依据,跳过后续检查")
|
|
|
- standardized_result = []
|
|
|
- else:
|
|
|
- # 重新构建JSON格式
|
|
|
- if isinstance(match_data, dict) and 'items' in match_data:
|
|
|
- match_result = json.dumps({"items": filtered_data}, ensure_ascii=False, indent=2)
|
|
|
- else:
|
|
|
- match_result = json.dumps(filtered_data, ensure_ascii=False, indent=2)
|
|
|
-
|
|
|
- llm_out = await determine_timeliness_issue(match_result)
|
|
|
-
|
|
|
- standardized_result = self.response_processor.process_llm_response(llm_out, "timeliness_check", "basis", "basis_timeliness_check")
|
|
|
- # 统计问题数量
|
|
|
- issue_count = sum(1 for item in standardized_result if item.get('exist_issue', False))
|
|
|
- logger.info(f"编制依据批次审查完成:总计 {len(filtered_data)} 项,发现问题 {issue_count} 项")
|
|
|
-
|
|
|
- return standardized_result if standardized_result else []
|
|
|
-
|
|
|
- except (json.JSONDecodeError, TypeError) as e:
|
|
|
- logger.warning(f"过滤match_reference_files结果时出错: {e}")
|
|
|
- # 如果解析失败,返回空结果
|
|
|
- return []
|
|
|
+ match_result = await match_reference_files(reference_text=grouped_candidates, review_text=basis_items)
|
|
|
+ ... # 其余旧逻辑已省略
|
|
|
+ """
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f" 批次处理失败: {e}")
|
|
|
+ logger.error(f"批次处理失败: {e}")
|
|
|
return [{
|
|
|
"check_item": "timeliness_check",
|
|
|
"chapter_code": "basis",
|
|
|
@@ -298,15 +348,15 @@ class BasisReviewService:
|
|
|
"risk_info": {"risk_level": "high"}
|
|
|
}]
|
|
|
|
|
|
-
|
|
|
-
|
|
|
+ # [已注释] 旧的向量搜索方法,已被新的规则匹配替代
|
|
|
+ """
|
|
|
async def _async_search_basis(
|
|
|
self,
|
|
|
basis: str,
|
|
|
collection_name: str,
|
|
|
top_k_each: int
|
|
|
) -> List[dict]:
|
|
|
- """异步搜索单个编制依据(Hybrid Search)"""
|
|
|
+ # 异步搜索单个编制依据(Hybrid Search)
|
|
|
try:
|
|
|
loop = asyncio.get_running_loop()
|
|
|
func = partial(
|
|
|
@@ -324,11 +374,11 @@ class BasisReviewService:
|
|
|
except Exception as e:
|
|
|
logger.error(f" 搜索失败 '{basis}': {e}")
|
|
|
return []
|
|
|
+ """
|
|
|
|
|
|
-
|
|
|
async def review_all(self, basis_items: BasisItems, collection_name: str = "first_bfp_collection_status",
|
|
|
progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
|
|
|
- """异步批量审查所有编制依据(入参为 BasisItems)"""
|
|
|
+ """异步批量审查所有编制依据(入参为 BasisItems)"""
|
|
|
if not basis_items or not getattr(basis_items, "items", None):
|
|
|
return []
|
|
|
|
|
|
@@ -339,7 +389,7 @@ class BasisReviewService:
|
|
|
start_time = time.time()
|
|
|
total_batches = (len(items) + 2) // 3 # 计算总批次数
|
|
|
|
|
|
- # 发送开始审查的SSE推送(使用独立命名空间,避免与主流程进度冲突)
|
|
|
+ # 发送开始审查的SSE推送(使用独立命名空间,避免与主流程进度冲突)
|
|
|
if progress_manager and callback_task_id:
|
|
|
try:
|
|
|
await progress_manager.update_stage_progress(
|
|
|
@@ -373,7 +423,7 @@ class BasisReviewService:
|
|
|
if isinstance(item, dict) and item.get('is_standard', False):
|
|
|
batch_standard_count += 1
|
|
|
|
|
|
- # 立即推送当前批次完成的SSE消息(使用独立命名空间)
|
|
|
+ # 立即推送当前批次完成的SSE消息(使用独立命名空间)
|
|
|
logger.info(f"批次{batch_index + 1}完成,准备推送SSE")
|
|
|
if progress_manager and callback_task_id:
|
|
|
try:
|
|
|
@@ -398,7 +448,7 @@ class BasisReviewService:
|
|
|
error_result = [{"name": name, "is_standard": False, "status": "", "meg": f"批次处理失败2: {str(e)}"}
|
|
|
for name in batch]
|
|
|
|
|
|
- # 即使失败也要推送结果(使用独立命名空间)
|
|
|
+ # 即使失败也要推送结果(使用独立命名空间)
|
|
|
if progress_manager and callback_task_id:
|
|
|
try:
|
|
|
await progress_manager.update_stage_progress(
|
|
|
@@ -463,7 +513,7 @@ class BasisReviewService:
|
|
|
logger.info(f"并发执行完成,成功批次: {successful_batches}/{total_batches}")
|
|
|
|
|
|
|
|
|
- # 发送完成审查的SSE推送(使用独立命名空间,不设置current避免覆盖主流程进度)
|
|
|
+ # 发送完成审查的SSE推送(使用独立命名空间,不设置current避免覆盖主流程进度)
|
|
|
elapsed_time = time.time() - start_time
|
|
|
if progress_manager and callback_task_id:
|
|
|
try:
|
|
|
@@ -486,15 +536,37 @@ class BasisReviewService:
|
|
|
|
|
|
|
|
|
# 便捷函数
|
|
|
-async def review_basis_batch_async(basis_items: List[str], max_concurrent: int = 4) -> List[Dict[str, Any]]:
|
|
|
- """异步批次审查便捷函数"""
|
|
|
- async with BasisReviewService(max_concurrent=max_concurrent) as service:
|
|
|
+async def review_basis_batch_async(
|
|
|
+ basis_items: List[str],
|
|
|
+ max_concurrent: int = 4,
|
|
|
+ db_pool=None
|
|
|
+) -> List[Dict[str, Any]]:
|
|
|
+ """
|
|
|
+ [已修改] 异步批次审查便捷函数
|
|
|
+
|
|
|
+ Args:
|
|
|
+ basis_items: 编制依据列表
|
|
|
+ max_concurrent: 最大并发数
|
|
|
+ db_pool: 数据库连接池(用于新的规则匹配)
|
|
|
+ """
|
|
|
+ async with BasisReviewService(max_concurrent=max_concurrent, db_pool=db_pool) as service:
|
|
|
return await service.review_batch(basis_items)
|
|
|
|
|
|
|
|
|
-async def review_all_basis_async(basis_items: BasisItems, max_concurrent: int = 4) -> List[List[Dict[str, Any]]]:
|
|
|
- """异步全部审查便捷函数(BasisItems 入参)"""
|
|
|
- async with BasisReviewService(max_concurrent=max_concurrent) as service:
|
|
|
+async def review_all_basis_async(
|
|
|
+ basis_items: BasisItems,
|
|
|
+ max_concurrent: int = 4,
|
|
|
+ db_pool=None
|
|
|
+) -> List[List[Dict[str, Any]]]:
|
|
|
+ """
|
|
|
+ [已修改] 异步全部审查便捷函数(BasisItems 入参)
|
|
|
+
|
|
|
+ Args:
|
|
|
+ basis_items: BasisItems 对象
|
|
|
+ max_concurrent: 最大并发数
|
|
|
+ db_pool: 数据库连接池(用于新的规则匹配)
|
|
|
+ """
|
|
|
+ async with BasisReviewService(max_concurrent=max_concurrent, db_pool=db_pool) as service:
|
|
|
return await service.review_all(basis_items)
|
|
|
|
|
|
if __name__ == "__main__":
|