import os
import sys

# Make the project root importable when this file is executed directly.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

import requests
from dotenv import load_dotenv
from foundation.models.base_online_platform import BaseApiPlatform
from foundation.base.config import config_handler
from foundation.logger.loggering import server_logger
from foundation.utils.common import handler_err
from openai import OpenAI
from langchain_core.embeddings import Embeddings
#from chromadb.utils.embedding_functions import EmbeddingFunction
from typing import List
import numpy as np


class SiliconFlowEmbeddings(Embeddings):
    """
    LangChain-compatible embedding client backed by an OpenAI-style endpoint.

    Usage:
        embeddings = SiliconFlowEmbeddings(
            base_url="https://<host>/v1",
            api_key="sk-...",
            embed_model_id="netease-youdao/bce-embedding-base_v1",
        )
        vectors = embeddings.embed_documents(["text 1", "text 2"])
    """

    def __init__(self, base_url: str, api_key: str, embed_model_id: str):
        """
        Create the underlying OpenAI client.

        :param base_url: Base URL handed to the OpenAI client.
        :param api_key: API key; must be non-empty.
        :param embed_model_id: Model identifier used for every embedding call.
        :raises ValueError: If ``api_key`` is empty or None.
        """
        self.model = embed_model_id
        self.api_key = api_key

        if not self.api_key:
            raise ValueError("必须提供 api_key 或设置环境变量 SILICONFLOW_API_KEY")

        self.client = OpenAI(
            api_key=self.api_key,
            base_url=base_url
        )

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of documents; an empty list yields an empty result without a request."""
        if not texts:
            return []

        response = self.client.embeddings.create(
            model=self.model,
            input=texts
        )
        return [item.embedding for item in response.data]

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string by delegating to ``embed_documents``."""
        return self.embed_documents([text])[0]


class SiliconFlowAPI(BaseApiPlatform):
    """Client exposing embedding and rerank calls, configured through ``config_handler``."""

    def __init__(self, trace_id=""):
        """
        Read the ``siliconflow`` configuration section and build the clients.

        :param trace_id: Identifier forwarded to ``handler_err`` when a call fails.
        """
        self.trace_id = trace_id
        self.config_prefix = "siliconflow"
        self.model_server_url = config_handler.get(self.config_prefix, "SLCF_MODEL_SERVER_URL")
        self.api_key = config_handler.get(self.config_prefix, "SLCF_API_KEY")
        self.embed_url = self.model_server_url + "/embeddings"
        self.rerank_url = self.model_server_url + "/rerank"
        self.embed_model_id = config_handler.get(self.config_prefix, "SLCF_EMBED_MODEL_ID")
        # NOTE(review): the key is spelled 'REANKER'; left unchanged because the
        # configuration presumably uses the same spelling - confirm before renaming.
        self.rerank_model_id = config_handler.get(self.config_prefix, "SLCF_REANKER_MODEL_ID")

        server_logger.info(f"SiliconFlowAPI -> embed_url:{self.embed_url},rerank_url:{self.rerank_url}")
        server_logger.info(f"SiliconFlowAPI -> embed_model_id:{self.embed_model_id},rerank_model_id:{self.rerank_model_id}")

        self.client = self.get_openai_client(self.model_server_url, self.api_key)

        # Keep the LangChain-compatible embeddings object on the instance so that
        # get_embed_model() has something to return (it was previously discarded,
        # which made get_embed_model() raise AttributeError).
        self.embed_model = SiliconFlowEmbeddings(
            base_url=self.model_server_url,
            api_key=self.api_key,
            embed_model_id=self.embed_model_id,
        )

    def get_embed_model(self):
        """Return the LangChain-compatible embeddings object created in ``__init__``."""
        return self.embed_model

    def get_embeddings(self, texts: list[str]):
        """
        Return the embedding vector for each input text.

        :param texts: List of strings; a single non-list value is wrapped in a list.
        :return: One embedding per input, in response order. An empty list
            yields ``[]`` without calling the API.
        :raises Exception: Any error from the embeddings call is logged through
            ``handler_err`` and re-raised.
        """
        inputs = texts if isinstance(texts, list) else [texts]
        if not inputs:
            return []

        try:
            response = self.client.embeddings.create(
                model=self.embed_model_id,
                input=inputs
            )
            return [data.embedding for data in response.data]
        except Exception as e:
            handler_err(server_logger, trace_id=self.trace_id, err=e, err_name='Embedding 调用失败')
            raise

    def rerank(self, input_query: str, documents: list, top_n: int = 5,
               return_documents: bool = True, timeout: float = 30.0):
        """
        Score candidate documents against a query using the configured rerank model.

        :param input_query: The user query.
        :param documents: Candidate texts to score.
        :param top_n: Number of results requested from the service.
        :param return_documents: Whether the service should echo document text back.
        :param timeout: Seconds to wait for the HTTP request before giving up.
        :return: List of dicts with keys ``index``, ``relevance_score`` and
            ``document`` (the document text, or None when the response carries
            none). An empty ``documents`` list yields ``[]`` without a request.
        :raises Exception: Any request or parsing error is logged through
            ``handler_err`` and re-raised.
        """
        if not documents:
            return []

        try:
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }

            payload = {
                "model": self.rerank_model_id,
                "query": input_query,
                "documents": documents,
                "top_n": top_n,
                "return_documents": return_documents
            }

            # A timeout keeps a stalled server from blocking the caller forever.
            response = requests.post(self.rerank_url, json=payload, headers=headers, timeout=timeout)
            response.raise_for_status()
            data = response.json()

            return [
                {
                    "index": item['index'],
                    "relevance_score": item['relevance_score'],
                    # 'document' may be absent or null; treat both as "no text".
                    "document": (item.get('document') or {}).get('text'),
                }
                for item in data['results']
            ]
        except Exception as e:
            handler_err(server_logger, trace_id=self.trace_id, err=e, err_name='重排序调用失败')
            raise


# Usage example
if __name__ == "__main__":
    # Initialize the client (URL, key and model ids are read via config_handler).
    client = SiliconFlowAPI()

    # Example 1: embed texts
    texts = ["奶牛养殖技术", "牛肉市场价格分析"]
    embeddings = client.get_embeddings(texts)
    print(f"向量维度:{len(embeddings[0])}")  # print the vector dimension

    # Example 2: rerank documents
    query = "如何提高牛奶产量?"
    documents = [
        "奶牛饲料配比指南",
        "牧场管理规范",
        "牛奶加工工艺流程",
        "提高产奶量的10个技巧"
    ]
    rerank_results = client.rerank(query, documents)
    print("\n重排序结果:")
    for result in sorted(rerank_results, key=lambda x: x['relevance_score'], reverse=True):
        print(f"{result['index']} (得分: {result['relevance_score']:.2f}): {documents[result['index']]}")