| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- import os
- import sys
- sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
- import requests
- from dotenv import load_dotenv
- from foundation.models.base_online_platform import BaseApiPlatform
- from foundation.base.config import config_handler
- from foundation.logger.loggering import server_logger
- from foundation.utils.common import handler_err
- from openai import OpenAI
- from langchain_core.embeddings import Embeddings
- #from chromadb.utils.embedding_functions import EmbeddingFunction
- from typing import List
- import numpy as np
- class SiliconFlowEmbeddings(Embeddings):
- """
- LangChain 兼容的硅基流动嵌入模型客户端
- 使用方式:
- embeddings = SiliconFlowEmbeddings(
- model="netease-youdao/bce-embedding-base_v1",
- api_key="sk-..."
- )
- vectors = embeddings.embed_documents(["文本1", "文本2"])
- """
- def __init__(self, base_url: str, api_key: str, embed_model_id: str):
- self.model = embed_model_id
- self.api_key = api_key
- if not self.api_key:
- raise ValueError("必须提供 api_key 或设置环境变量 SILICONFLOW_API_KEY")
- self.client = OpenAI(
- api_key=self.api_key,
- base_url=base_url
- )
- def embed_documents(self, texts: List[str]) -> List[List[float]]:
- """对文档列表进行向量化"""
- if not texts:
- return []
- response = self.client.embeddings.create(
- model=self.model,
- input=texts
- )
- return [item.embedding for item in response.data]
- def embed_query(self, text: str) -> List[float]:
- """对查询文本进行向量化"""
- return self.embed_documents([text])[0]
- class SiliconFlowAPI(BaseApiPlatform):
- def __init__(self , trace_id=""):
- self.trace_id = trace_id
- self.config_prefix = "siliconflow"
- self.model_server_url = config_handler.get(self.config_prefix, "SLCF_MODEL_SERVER_URL")
- self.api_key = config_handler.get(self.config_prefix, "SLCF_API_KEY")
- self.embed_url = self.model_server_url +"/embeddings" #/embeddings
- self.rerank_url = self.model_server_url +"/rerank" #/rerank
- self.embed_model_id = config_handler.get(self.config_prefix, "SLCF_EMBED_MODEL_ID")
- self.rerank_model_id = config_handler.get(self.config_prefix, "SLCF_REANKER_MODEL_ID")
- server_logger.info(f"SiliconFlowAPI -> embed_url:{self.embed_url},rerank_url:{self.rerank_url}")
- server_logger.info(f"SiliconFlowAPI -> embed_model_id:{self.embed_model_id},rerank_model_id:{self.rerank_model_id}")
- self.client = self.get_openai_client(self.model_server_url, self.api_key)
- # 创建LangChain兼容的嵌入对象
- langchain_embeddings = SiliconFlowEmbeddings(base_url = self.model_server_url , api_key=self.api_key , embed_model_id=self.embed_model_id)
- #self.embed_model = ChromaSiliconFlowEmbedding(embeddings=langchain_embeddings)
- def get_embed_model(self):
- """
- 获取嵌入模型
- """
- return self.embed_model
- def get_embeddings(self, texts: list[str]):
- """获取文本向量(embedding)"""
- try:
- response = self.client.embeddings.create(
- model=self.embed_model_id, # 指定向量模型
- input=texts if isinstance(texts, list) else [texts]
- )
- # 返回 embeddings 列表
- return [data.embedding for data in response.data]
- except Exception as e:
- handler_err(server_logger, trace_id=self.trace_id, err=e, err_name='Embedding 调用失败')
- raise
- def rerank(self, input_query: str, documents: list, top_n: int = 5, return_documents: bool = True):
- """
- 使用 BGE 重排序模型进行相关性打分
- 使用重排序模型对候选文档进行排序
- :param query: 用户查询语句
- :param documents: 候选文本列表
- :param top_n: 返回前 N 个结果
- :return: 排序后的结果列表,包含文本和相似度分数
- """
- try:
- headers = {
- "Authorization": f"Bearer {self.api_key}",
- "Content-Type": "application/json"
- }
- payload = {
- "model": self.rerank_model_id,
- "query": input_query,
- "documents": documents,
- "top_n": top_n,
- "return_documents": return_documents
- }
- response = requests.post(self.rerank_url, json=payload, headers=headers)
- response.raise_for_status()
- data = response.json()
- results = []
- for item in data['results']:
- results.append({
- "index": item['index'],
- "relevance_score": item['relevance_score'],
- "document": item.get('document', {}).get('text', None)
- })
- return results
- except Exception as e:
- handler_err(server_logger, trace_id=self.trace_id, err=e, err_name='重排序调用失败')
- raise
- # 使用示例
- if __name__ == "__main__":
- # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
- client = SiliconFlowAPI()
- # 示例1:向量化文本
- texts = ["奶牛养殖技术", "牛肉市场价格分析"]
- embeddings = client.get_embeddings(texts)
- print(f"向量维度:{len(embeddings[0])}") # 输出向量维度
- # 示例2:重排序文档
- query = "如何提高牛奶产量?"
- documents = [
- "奶牛饲料配比指南",
- "牧场管理规范",
- "牛奶加工工艺流程",
- "提高产奶量的10个技巧"
- ]
- rerank_results = client.rerank(query, documents)
-
- print("\n重排序结果:")
- for result in sorted(rerank_results, key=lambda x: x['relevance_score'], reverse=True):
- print(f"{result['index']} (得分: {result['relevance_score']:.2f}): {documents[result['index']]}")
|