"""Model factories and chat-session helpers.

Builds OpenAI-compatible LangChain chat clients for the configured provider
(DeepSeek or Qwen) and provides small utilities for serializing chat history,
generating session ids, building prompt context and cleaning model output.
"""

import json
import time
import uuid
from datetime import datetime
from typing import List, Dict, Optional

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI

from foundation.base.config import config_handler
from foundation.logger.loggering import server_logger

# Maps the "role" field of a stored message dict to its LangChain message class.
_ROLE_TO_MESSAGE = {
    "user": HumanMessage,
    "assistant": AIMessage,
    "system": SystemMessage,
}


def _mask_secret(secret: Optional[str], visible: int = 4) -> str:
    """Return a log-safe rendering of *secret*.

    Only the last ``visible`` characters are kept; secrets too short to be
    partially revealed are fully masked, and a missing secret is reported as
    ``<unset>`` so the log still shows whether a key was configured.
    """
    if not secret:
        return "<unset>"
    text = str(secret)
    if len(text) <= visible * 2:
        return "*" * len(text)
    return f"***{text[-visible:]}"


def _build_chat_model(base_url, api_key, model, max_tokens, temperature, top_p):
    """Create a ``ChatOpenAI`` client with the options shared by all providers."""
    return ChatOpenAI(
        base_url=base_url,
        api_key=api_key,
        model=model,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        # Passed through to the provider; the original author added this
        # parameter to avoid a request error.
        extra_body={"enable_thinking": False},
    )


def get_models():
    """Return ``(llm, chat, embed)`` for the configured model type.

    Reads ``MODEL_TYPE`` from the ``model`` config section. ``QWEN``
    (case-insensitive) selects the Qwen models; any other value, including a
    missing one, falls back to DeepSeek.
    """
    model_type = config_handler.get("model", "MODEL_TYPE")
    server_logger.info(f"get_models -> model_type:{model_type}")
    # Guard against an unset MODEL_TYPE: calling .upper() on None would raise
    # AttributeError instead of using the documented DeepSeek default.
    if str(model_type or "").strip().upper() == "QWEN":
        return get_deploy_qwen_models()
    return get_deepseek_models()


def get_deepseek_models():
    """Build the DeepSeek ``(llm, chat, embed)`` triple.

    Reads ``DEEPSEEK_SERVER_URL``, ``DEEPSEEK_MODEL_ID`` and
    ``DEEPSEEK_API_KEY`` from the ``deepseek`` config section.

    Returns:
        ``(llm, chat, embed)`` where ``llm`` and ``chat`` are ``ChatOpenAI``
        clients differing only in ``top_p`` and ``embed`` is always ``None``.

    Raises:
        Exception: if any of the three settings is ``None``.
    """
    deepseek_model_server_url = config_handler.get("deepseek", "DEEPSEEK_SERVER_URL")
    deepseek_chat_model_id = config_handler.get("deepseek", "DEEPSEEK_MODEL_ID")
    deepseek_api_key = config_handler.get("deepseek", "DEEPSEEK_API_KEY")
    # Never write the raw API key to the log; only a masked form is emitted.
    server_logger.info(
        f"get_deepseek_models -> chat_model_id:{deepseek_chat_model_id},"
        f"api_key:{_mask_secret(deepseek_api_key)}"
    )
    if deepseek_model_server_url is None or deepseek_chat_model_id is None or deepseek_api_key is None:
        server_logger.error("请设置环境变量: DEEPSEEK_SERVER_URL, DEEPSEEK_MODEL_ID, DEEPSEEK_API_KEY")
        raise Exception("设置环境变量: DEEPSEEK_SERVER_URL, DEEPSEEK_MODEL_ID, DEEPSEEK_API_KEY")

    # General-purpose LLM client.
    llm = _build_chat_model(
        deepseek_model_server_url,
        deepseek_api_key,
        deepseek_chat_model_id,
        max_tokens=4096,
        temperature=0.3,
        top_p=0.7,
    )
    # Chat client: same settings with a lower top_p.
    chat = _build_chat_model(
        deepseek_model_server_url,
        deepseek_api_key,
        deepseek_chat_model_id,
        max_tokens=4096,
        temperature=0.3,
        top_p=0.2,
    )
    embed = None
    return llm, chat, embed


def get_deploy_qwen_models():
    """Build the Qwen ``(llm, chat, embed)`` triple (ModelScope online Qwen3 API service).

    Reads ``MODEL_SERVER_URL``, ``CHAT_MODEL_ID``, ``API_KEY`` and
    ``EMBED_MODEL_ID`` from the ``qwen`` config section.

    Returns:
        ``(llm, chat, embed)`` where ``llm`` and ``chat`` are ``ChatOpenAI``
        clients with different sampling settings and ``embed`` is always
        ``None`` (no embedding model is constructed here).

    Raises:
        Exception: if the server URL, chat model id or API key is ``None``.
    """
    model_server_url = config_handler.get("qwen", "MODEL_SERVER_URL")
    chat_model_id = config_handler.get("qwen", "CHAT_MODEL_ID")
    api_key = config_handler.get("qwen", "API_KEY")
    embedding_model_id = config_handler.get("qwen", "EMBED_MODEL_ID")
    # Never write the raw API key to the log; only a masked form is emitted.
    server_logger.info(
        f"get_qwen_chat_model -> chat_model_id:{chat_model_id},"
        f"api_key:{_mask_secret(api_key)},embedding_model_id:{embedding_model_id}"
    )
    if model_server_url is None or chat_model_id is None or api_key is None:
        server_logger.error("请设置环境变量: MODEL_SERVER_URL, CHAT_MODEL_ID, API_KEY")
        raise Exception("请设置环境变量: MODEL_SERVER_URL, CHAT_MODEL_ID, API_KEY")

    # General-purpose LLM client.
    llm = _build_chat_model(
        model_server_url,
        api_key,
        chat_model_id,
        max_tokens=1024,
        temperature=0.5,
        top_p=0.7,
    )
    # Chat client: near-deterministic sampling.
    chat = _build_chat_model(
        model_server_url,
        api_key,
        chat_model_id,
        max_tokens=1024,
        temperature=0.01,
        top_p=0.2,
    )
    # Embeddings are currently disabled; embedding_model_id is read and logged
    # but not used to build a model.
    embed = None
    return llm, chat, embed


def test_qwen_chat_model():
    """Smoke test: send one greeting to the Qwen LLM and log/print the reply."""
    llm, chat, embed = get_deploy_qwen_models()
    example_query = "你好,你是谁?"
    result = llm.invoke(input=example_query)
    server_logger.info(f"result={result}")
    print(f"result={result}")


def test_deepseek_chat_model():
    """Smoke test: send one greeting to the DeepSeek LLM and log/print the reply."""
    llm, chat, embed = get_deepseek_models()
    example_query = "你好,你是谁?"
    result = llm.invoke(input=example_query)
    server_logger.info(f"result={result}")
    print(f"result={result}")


def serialize_messages(messages: List[Dict]) -> str:
    """Serialize a list of message dicts to a JSON string."""
    return json.dumps(messages)


def deserialize_messages(data: str) -> List[Dict]:
    """Parse a JSON string into a list of message dicts.

    Empty or ``None`` input yields an empty list.
    """
    return json.loads(data) if data else []


def to_langchain_messages(messages: List[Dict]) -> List:
    """Convert message dicts into LangChain message objects.

    Each dict must have a ``role`` key; ``user``, ``assistant`` and ``system``
    map to ``HumanMessage``, ``AIMessage`` and ``SystemMessage`` built from the
    dict's ``content``. Messages with any other role are skipped.
    """
    converted = []
    for msg in messages:
        message_cls = _ROLE_TO_MESSAGE.get(msg["role"])
        if message_cls is None:
            # Unknown role: skipped, matching the long-standing behavior.
            continue
        converted.append(message_cls(content=msg["content"]))
    return converted


def generate_session_id() -> str:
    """Generate a unique session id of the form ``xiwuzc-<uuid4>``."""
    return f"xiwuzc-{uuid.uuid4()}"


def get_current_timestamp() -> float:
    """Return the current time as seconds since the epoch."""
    return time.time()


def format_timestamp(timestamp: float) -> str:
    """Format an epoch timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time."""
    return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")


def build_input_context(
    trace_id: str,
    task_prompt_info: dict,
    input_query: str,
    context: Optional[str] = None,
    supplement_info: Optional[str] = None,
    header_info: Optional[Dict] = None
) -> str:
    """Build the scenario-specific context prompt for one request.

    Args:
        trace_id: Identifier shown in the prompt header.
        task_prompt_info: Dict that must contain a ``task_prompt`` entry.
        input_query: The user's question.
        context: Related data; a placeholder text is used when empty.
        supplement_info: Extra information; a placeholder text is used when empty.
        header_info: Optional dict from which ``token`` and ``tenantId`` are
            read (each defaults to an empty string).

    Returns:
        The assembled prompt with surrounding whitespace stripped.

    Raises:
        KeyError: if ``task_prompt_info`` has no ``task_prompt`` key.
    """
    context = context or "无相关数据"
    supplement_info = supplement_info or "无补充信息"
    headers = header_info or {}
    token = headers.get('token', '')
    tenant_id = headers.get('tenantId', '')
    task_prompt_info_str = task_prompt_info["task_prompt"]

    # NOTE(review): the auth token is embedded verbatim in the prompt text, so
    # it is sent to the model provider with every request — confirm this is
    # intended.
    prompt_lines = [
        f"🐄 助手会话 [ID: {trace_id}] 🐖",
        f"⏰ 时间: {format_timestamp(get_current_timestamp())}",
        f"📌 任务: {task_prompt_info_str}",
        f"📊 相关数据: {context}",
        f"ℹ️ 补充信息: {supplement_info}",
        f"❓ 户问题: {input_query}",
        f"🔒 安全验证: {token}",
        f"🏠 场ID: {tenant_id}",
    ]
    return "\n".join(prompt_lines).strip()


def clean_json_output(raw_output: str) -> str:
    """Strip a surrounding Markdown code fence from model output.

    Removes a leading "```json" (any letter case) or bare "```" fence and a
    trailing "```" fence, then trims surrounding whitespace. Text without
    fences is returned trimmed but otherwise unchanged.
    """
    cleaned = raw_output.strip()
    if cleaned[:7].lower() == "```json":
        cleaned = cleaned[7:]
    elif cleaned.startswith("```"):
        # Opening fence without a language tag.
        cleaned = cleaned[3:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    return cleaned.strip()


if __name__ == "__main__":
    # Manual smoke test against the Qwen endpoint; call
    # test_deepseek_chat_model() instead to exercise DeepSeek.
    test_qwen_chat_model()