|
|
@@ -0,0 +1,194 @@
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+import os
|
|
|
+import sys
|
|
|
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
|
|
|
+import requests
|
|
|
+from dotenv import load_dotenv
|
|
|
+from foundation.models.base_online_platform import BaseApiPlatform
|
|
|
+from foundation.base.config import config_handler
|
|
|
+from foundation.logger.loggering import server_logger
|
|
|
+from foundation.utils.common import handler_err
|
|
|
+from openai import OpenAI
|
|
|
+from langchain_core.embeddings import Embeddings
|
|
|
+from chromadb.utils.embedding_functions import EmbeddingFunction
|
|
|
+from typing import List
|
|
|
+import numpy as np
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class SiliconFlowEmbeddings(Embeddings):
    """LangChain-compatible client for SiliconFlow embedding models.

    Usage:
        embeddings = SiliconFlowEmbeddings(
            base_url="https://...",
            api_key="sk-...",
            embed_model_id="netease-youdao/bce-embedding-base_v1",
        )
        vectors = embeddings.embed_documents(["text 1", "text 2"])
    """

    def __init__(self, base_url: str, api_key: str, embed_model_id: str):
        self.model = embed_model_id
        self.api_key = api_key
        # Fail fast when no key is available instead of erroring on first request.
        if not self.api_key:
            raise ValueError("必须提供 api_key 或设置环境变量 SILICONFLOW_API_KEY")
        self.client = OpenAI(api_key=self.api_key, base_url=base_url)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a batch of documents; returns one vector per input text."""
        if not texts:
            return []
        resp = self.client.embeddings.create(model=self.model, input=texts)
        return [row.embedding for row in resp.data]

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string."""
        return self.embed_documents([text])[0]
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class ChromaSiliconFlowEmbedding(EmbeddingFunction):
    """
    Adapts SiliconFlowEmbeddings to ChromaDB's EmbeddingFunction interface.

    Every returned vector is L2-normalized, so cosine similarity reduces to a
    dot product in the vector store.
    """
    def __init__(self, embeddings):
        # `embeddings` must expose embed_documents(List[str]) -> List[List[float]]
        # (e.g. a SiliconFlowEmbeddings instance).
        self.embeddings = embeddings

    def __call__(self, input: List[str]) -> List[List[float]]:
        """ChromaDB entry point: embed a batch of texts and L2-normalize each vector."""
        raw_embeddings = self.embeddings.embed_documents(input)
        return self.normalized_embeddings(raw_embeddings)

    def embed_documents(self, input: List[str]) -> List[List[float]]:
        """LangChain-style alias for __call__ (batch embed + normalize)."""
        raw_embeddings = self.embeddings.embed_documents(input)
        return self.normalized_embeddings(raw_embeddings)

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string and L2-normalize the resulting vector.

        Bug fix: normalized_embeddings expects a *batch* (list of vectors), but
        the previous code passed the single vector directly, so each scalar
        component was "normalized" on its own (collapsing the vector to +/-1
        entries). Wrap the vector in a one-element batch and unwrap the result.
        """
        raw_embedding = self.embeddings.embed_documents([text])[0]
        return self.normalized_embeddings([raw_embedding])[0]

    def normalized_embeddings(self, raw_embeddings):
        """L2-normalize each vector in a batch.

        Zero vectors are passed through unchanged to avoid division by zero.
        """
        normalized = []
        for vector in raw_embeddings:
            norm = np.linalg.norm(vector)
            if norm > 0:
                normalized.append(vector / norm)
            else:
                normalized.append(vector)
        return normalized
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
class SiliconFlowAPI(BaseApiPlatform):
    """SiliconFlow platform client: OpenAI-compatible embeddings plus an HTTP rerank endpoint.

    Configuration is read from the "siliconflow" section via config_handler
    (server URL, API key, embedding and reranker model ids).
    """
    def __init__(self, trace_id=""):
        self.trace_id = trace_id
        self.config_prefix = "siliconflow"
        self.model_server_url = config_handler.get(self.config_prefix, "SLCF_MODEL_SERVER_URL")
        self.api_key = config_handler.get(self.config_prefix, "SLCF_API_KEY")
        # Endpoint URLs are derived from the base server URL.
        self.embed_url = self.model_server_url + "/embeddings"
        self.rerank_url = self.model_server_url + "/rerank"
        self.embed_model_id = config_handler.get(self.config_prefix, "SLCF_EMBED_MODEL_ID")
        self.rerank_model_id = config_handler.get(self.config_prefix, "SLCF_REANKER_MODEL_ID")
        server_logger.info(f"SiliconFlowAPI -> embed_url:{self.embed_url},rerank_url:{self.rerank_url}")
        server_logger.info(f"SiliconFlowAPI -> embed_model_id:{self.embed_model_id},rerank_model_id:{self.rerank_model_id}")
        self.client = self.get_openai_client(self.model_server_url, self.api_key)
        # LangChain-compatible embeddings, wrapped so ChromaDB can consume them
        # (with L2 normalization applied by the wrapper).
        langchain_embeddings = SiliconFlowEmbeddings(
            base_url=self.model_server_url,
            api_key=self.api_key,
            embed_model_id=self.embed_model_id,
        )
        self.embed_model = ChromaSiliconFlowEmbedding(embeddings=langchain_embeddings)

    def get_embed_model(self):
        """Return the ChromaDB-compatible embedding function built in __init__."""
        return self.embed_model

    def get_embeddings(self, texts: list[str]):
        """Return embedding vectors for `texts`.

        A bare string is auto-wrapped into a one-element list.

        :param texts: list of texts (or a single string) to embed
        :return: list of embedding vectors, one per input text
        :raises: re-raises any API error after logging via handler_err
        """
        try:
            response = self.client.embeddings.create(
                model=self.embed_model_id,
                input=texts if isinstance(texts, list) else [texts],
            )
            return [data.embedding for data in response.data]
        except Exception as e:
            handler_err(server_logger, trace_id=self.trace_id, err=e, err_name='Embedding 调用失败')
            raise

    def rerank(self, input_query: str, documents: list, top_n: int = 5, return_documents: bool = True,
               timeout: float = 30.0):
        """Score and rank candidate documents against a query via the /rerank endpoint.

        :param input_query: user query string
        :param documents: candidate texts to rank
        :param top_n: number of top results to return
        :param return_documents: ask the API to echo the document text in each result
        :param timeout: HTTP timeout in seconds (added: without it the request
            could block the caller forever on an unresponsive server)
        :return: list of {"index", "relevance_score", "document"} dicts
        :raises: re-raises any HTTP/parse error after logging via handler_err
        """
        try:
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            payload = {
                "model": self.rerank_model_id,
                "query": input_query,
                "documents": documents,
                "top_n": top_n,
                "return_documents": return_documents
            }
            response = requests.post(self.rerank_url, json=payload, headers=headers, timeout=timeout)
            response.raise_for_status()
            data = response.json()

            results = []
            for item in data['results']:
                results.append({
                    "index": item['index'],
                    "relevance_score": item['relevance_score'],
                    # 'document' is absent when return_documents=False; default to None.
                    "document": item.get('document', {}).get('text', None)
                })
            return results
        except Exception as e:
            handler_err(server_logger, trace_id=self.trace_id, err=e, err_name='重排序调用失败')
            raise
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
# Usage example
if __name__ == "__main__":
    # Build the client (API key must be configured beforehand, e.g. via
    # the SILICONFLOW_API_KEY environment variable / config).
    client = SiliconFlowAPI()

    # Example 1: embed a batch of texts and inspect the vector dimension.
    texts = ["奶牛养殖技术", "牛肉市场价格分析"]
    embeddings = client.get_embeddings(texts)
    print(f"向量维度:{len(embeddings[0])}")

    # Example 2: rerank candidate documents against a query.
    user_query = "如何提高牛奶产量?"
    documents = [
        "奶牛饲料配比指南",
        "牧场管理规范",
        "牛奶加工工艺流程",
        "提高产奶量的10个技巧"
    ]
    rerank_results = client.rerank(user_query, documents)

    print("\n重排序结果:")
    ranked = sorted(rerank_results, key=lambda x: x['relevance_score'], reverse=True)
    for result in ranked:
        print(f"{result['index']} (得分: {result['relevance_score']:.2f}): {documents[result['index']]}")
|