|
|
@@ -20,6 +20,9 @@ try:
|
|
|
from core.construction_review.component.reviewers.utils.inter_tool import InterTool
|
|
|
from foundation.observability.logger.loggering import server_logger as logger
|
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
|
+ from langchain_milvus import Milvus, BM25BuiltInFunction
|
|
|
+ from functools import partial
|
|
|
+
|
|
|
except ImportError as e:
|
|
|
logger.warning(f"Warning: 无法导入依赖: {e}")
|
|
|
# 设置默认值,避免程序崩溃
|
|
|
@@ -106,37 +109,7 @@ class MessageBuilder:
|
|
|
("system", data["timeliness_basis_reviewer"]["system_prompt"]),
|
|
|
("user", data["timeliness_basis_reviewer"]["user_prompt_template"])
|
|
|
])
|
|
|
-
|
|
|
- def build_user_content(
|
|
|
- self,
|
|
|
- basis_items: List[str],
|
|
|
- grouped_candidates: List[List[Dict[str, Any]]],
|
|
|
- ) -> str:
|
|
|
- """构建用户内容"""
|
|
|
- items = []
|
|
|
- for raw, cands in zip(basis_items, grouped_candidates):
|
|
|
- items.append({
|
|
|
- "raw_text": raw,
|
|
|
- "candidates": [
|
|
|
- {
|
|
|
- "id": c.get("id"),
|
|
|
- "similarity": c.get("similarity"),
|
|
|
- "text": c.get("text") or c.get("text_content") or "",
|
|
|
- }
|
|
|
- for c in (cands or [])
|
|
|
- ],
|
|
|
- })
|
|
|
-
|
|
|
- user_content = {
|
|
|
- "items": items,
|
|
|
- "required_output_example": [
|
|
|
- {"is_standard": False, "status": "", "meg": ""} for _ in items
|
|
|
- ],
|
|
|
- }
|
|
|
-
|
|
|
- return json.dumps(user_content, ensure_ascii=False)
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
class BasisSearchEngine:
|
|
|
"""编制依据向量搜索引擎"""
|
|
|
|
|
|
@@ -153,16 +126,6 @@ class BasisSearchEngine:
|
|
|
self.user = config_handler.get('milvus', 'MILVUS_USER')
|
|
|
self.password = config_handler.get('milvus', 'MILVUS_PASSWORD')
|
|
|
|
|
|
- # 连接到 Milvus
|
|
|
- connections.connect(
|
|
|
- alias="default",
|
|
|
- host=self.host,
|
|
|
- port=self.port,
|
|
|
- user=self.user,
|
|
|
- db_name="lq_db"
|
|
|
- )
|
|
|
- logger.info(f" 成功连接到 Milvus {self.host}:{self.port}")
|
|
|
-
|
|
|
# 初始化嵌入模型
|
|
|
if mh:
|
|
|
self.emdmodel = mh._get_lq_qwen3_8b_emd()
|
|
|
@@ -172,94 +135,63 @@ class BasisSearchEngine:
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f" BasisSearchEngine 初始化失败: {e}")
|
|
|
- self.emdmodel = None
|
|
|
-
|
|
|
- def text_to_vector(self, text: str) -> List[float]:
|
|
|
- """将文本转换为向量"""
|
|
|
- if not self.emdmodel:
|
|
|
- raise ValueError("嵌入模型未初始化")
|
|
|
-
|
|
|
- try:
|
|
|
- embedding = self.emdmodel.embed_query(text)
|
|
|
- return embedding.tolist() if hasattr(embedding, 'tolist') else list(embedding)
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"文本向量化失败: {e}")
|
|
|
- raise
|
|
|
|
|
|
- def similarity_search(self, collection_name: str, query_text: str,
|
|
|
- min_score: float = 0.3, top_k: int = 3,
|
|
|
- filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
|
|
|
- """执行相似度搜索"""
|
|
|
def hybrid_search(self, collection_name: str, query_text: str,
                  top_k: int = 3, ranker_type: str = "weighted",
                  dense_weight: float = 0.7, sparse_weight: float = 0.3) -> List[Dict[str, Any]]:
    """Run a dense + BM25 hybrid search against an existing Milvus collection.

    Args:
        collection_name: Name of the Milvus collection to query.
        query_text: Free-text query; it is embedded with ``self.emdmodel`` for
            the dense field and passed to the built-in BM25 function for the
            sparse field.
        top_k: Maximum number of documents to return.
        ranker_type: ``"weighted"`` combines both rankings using
            ``dense_weight`` / ``sparse_weight``. Any other value selects
            reciprocal-rank fusion ("rrf") with ``k=60``.
        dense_weight: Weight of the dense ranking (weighted ranker only).
        sparse_weight: Weight of the sparse ranking (weighted ranker only).

    Returns:
        One dict per hit with the keys ``id``, ``text_content``, ``metadata``,
        ``distance`` and ``similarity``. An empty list is returned when the
        query is empty or when the search fails for any reason; failures are
        logged, never raised.
    """
    # Nothing to search for: skip the round trip to Milvus entirely.
    if not query_text:
        return []

    # Both ranker modes issue the same call and differ only in their
    # parameters, so resolve those once up front.
    if ranker_type == "weighted":
        ranker_name = "weighted"
        # presumably matched positionally against ``vector_field`` below
        # (dense first, sparse second) -- confirm against langchain_milvus.
        ranker_params = {"weights": [dense_weight, sparse_weight]}
    else:
        ranker_name = "rrf"
        ranker_params = {"k": 60}

    try:
        # Attach to the existing collection; the password is only sent when
        # one is configured.
        connection_args = {
            "uri": f"http://{self.host}:{self.port}",
            "user": self.user,
            "db_name": "lq_db",
        }
        if self.password:
            connection_args["password"] = self.password

        # NOTE(review): a new vector store is constructed on every call;
        # consider reusing one per collection if this shows up in profiles.
        vectorstore = Milvus(
            embedding_function=self.emdmodel,
            collection_name=collection_name,
            connection_args=connection_args,
            consistency_level="Strong",
            builtin_function=BM25BuiltInFunction(),
            vector_field=["dense", "sparse"],
        )

        results = vectorstore.similarity_search(
            query=query_text,
            k=top_k,
            ranker_type=ranker_name,
            ranker_params=ranker_params,
        )

        # ``distance`` and ``similarity`` are constant placeholders: no score
        # is read from the returned documents here. They are kept so the
        # result dicts retain the keys used by the other search results.
        return [
            {
                'id': doc.metadata.get('pk', 0),
                'text_content': doc.page_content,
                'metadata': doc.metadata,
                'distance': 0.0,
                'similarity': 1.0,
            }
            for doc in results
        ]

    except Exception as e:
        # Best-effort search: log the failure and hand back an empty result
        # instead of implicitly returning None, so callers always get a list.
        logger.error(f" 搜索失败: {e}")
        return []
|
|
|
|
|
|
|
|
|
class LLMReviewClient:
|
|
|
@@ -332,7 +264,7 @@ class BasisReviewService:
|
|
|
search_tasks = []
|
|
|
for basis in basis_items:
|
|
|
task = asyncio.create_task(
|
|
|
- self._async_search_basis(basis, collection_name, min_score, top_k_each, filters)
|
|
|
+ self._async_search_basis(basis,collection_name, top_k_each)
|
|
|
)
|
|
|
search_tasks.append(task)
|
|
|
|
|
|
@@ -342,10 +274,13 @@ class BasisReviewService:
|
|
|
grouped_candidates = []
|
|
|
for i, result in enumerate(search_results):
|
|
|
if isinstance(result, Exception):
|
|
|
- logger.error(f" 搜索失败 '{basis_items[i]}': {result}")
|
|
|
+ logger.error(f"搜索失败 '{basis_items[i]}': {result}")
|
|
|
grouped_candidates.append([])
|
|
|
else:
|
|
|
- grouped_candidates.append(result)
|
|
|
+ # result 是 List[dict],需要遍历
|
|
|
+ texts = [item["text_content"] for item in result if "text_content" in item]
|
|
|
+ grouped_candidates.append(texts)
|
|
|
+ print("搜索结果:\n"+str(grouped_candidates))
|
|
|
|
|
|
# 构建提示词模板和用户内容
|
|
|
prompt_template = self.message_builder.get_prompt_template()
|
|
|
@@ -374,27 +309,26 @@ class BasisReviewService:
|
|
|
}]
|
|
|
|
|
|
|
|
|
+
|
|
|
async def _async_search_basis(
|
|
|
self,
|
|
|
basis: str,
|
|
|
collection_name: str,
|
|
|
- min_score: float,
|
|
|
- top_k_each: int,
|
|
|
- filters: Optional[Dict[str, Any]]
|
|
|
- ) -> List[Dict[str, Any]]:
|
|
|
- """异步搜索单个编制依据"""
|
|
|
+ top_k_each: int
|
|
|
+ ) -> List[dict]:
|
|
|
+ """异步搜索单个编制依据(Hybrid Search)"""
|
|
|
try:
|
|
|
- # 在线程池中执行同步搜索操作
|
|
|
- loop = asyncio.get_event_loop()
|
|
|
- retrieved = await loop.run_in_executor(
|
|
|
- None,
|
|
|
- self.search_engine.similarity_search,
|
|
|
- collection_name,
|
|
|
- basis,
|
|
|
- min_score,
|
|
|
- top_k_each,
|
|
|
- filters
|
|
|
+ loop = asyncio.get_running_loop()
|
|
|
+ func = partial(
|
|
|
+ self.search_engine.hybrid_search,
|
|
|
+ collection_name=collection_name,
|
|
|
+ query_text=basis,
|
|
|
+ top_k=top_k_each,
|
|
|
+ ranker_type="weighted",
|
|
|
+ dense_weight=0.3,
|
|
|
+ sparse_weight=0.7
|
|
|
)
|
|
|
+ retrieved = await loop.run_in_executor(None, func)
|
|
|
logger.info(f" 搜索 '{basis}' -> 找到 {len(retrieved or [])} 个结果")
|
|
|
return retrieved or []
|
|
|
except Exception as e:
|
|
|
@@ -402,7 +336,7 @@ class BasisReviewService:
|
|
|
return []
|
|
|
|
|
|
|
|
|
- async def review_all(self, text: str, collection_name: str = "already_basis",
|
|
|
+ async def review_all(self, text: str, collection_name: str = "first_bfp_collection_status",
|
|
|
progress_manager=None, callback_task_id: str = None) -> List[List[Dict[str, Any]]]:
|
|
|
"""异步批量审查所有编制依据"""
|
|
|
items = self.text_processor.extract_basis(text)
|
|
|
@@ -568,5 +502,5 @@ async def review_all_basis_async(text: str, max_concurrent: int = 4) -> List[Lis
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
# 简单测试
|
|
|
- test_text = "根据《建筑设计防火规范》(GB50016-2014)和《高层民用建筑设计防火规范》(GB50045-95)进行设计。"
|
|
|
+ test_text = "《起重机械安全规程》(GB6067-2010)"
|
|
|
result = asyncio.run(review_all_basis_async(test_text))
|