| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- import json
- import time
- import uuid
- from datetime import datetime
- from typing import List, Dict, Optional
- from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
- from langchain_openai import ChatOpenAI
- from foundation.base.config import config_handler
- from foundation.logger.loggering import server_logger
def get_models():
    """Return the (llm, chat, embed) model triple for the configured backend.

    The backend is chosen via the ``model.MODEL_TYPE`` config entry:
    "QWEN" (case-insensitive) selects the deployed Qwen models; any other
    value — including a missing entry — falls back to DeepSeek.
    """
    model_type = config_handler.get("model", "MODEL_TYPE")
    server_logger.info(f"get_models -> model_type:{model_type}")
    # Guard against a missing config value: default to DeepSeek instead of
    # crashing on None.upper().
    if model_type is not None and model_type.upper() == "QWEN":
        return get_deploy_qwen_models()
    return get_deepseek_models()
def get_deepseek_models():
    """Build the DeepSeek chat model clients.

    Returns:
        tuple: ``(llm, chat, embed)`` — two ChatOpenAI clients (a generation
        model and a slightly more deterministic chat model) plus ``None``
        for the embedding slot (no embedding model is configured here).

    Raises:
        Exception: if any required DeepSeek config entry is missing.
    """
    deepseek_model_server_url = config_handler.get("deepseek", "DEEPSEEK_SERVER_URL")
    deepseek_chat_model_id = config_handler.get("deepseek", "DEEPSEEK_MODEL_ID")
    deepseek_api_key = config_handler.get("deepseek", "DEEPSEEK_API_KEY")
    # Security: never log the raw API key — only whether one is configured.
    server_logger.info(
        f"get_deepseek_models -> chat_model_id:{deepseek_chat_model_id},"
        f"api_key_set:{deepseek_api_key is not None}")
    if deepseek_model_server_url is None or deepseek_chat_model_id is None or deepseek_api_key is None:
        server_logger.error("请设置环境变量: DEEPSEEK_SERVER_URL, DEEPSEEK_MODEL_ID, DEEPSEEK_API_KEY")
        raise Exception("请设置环境变量: DEEPSEEK_SERVER_URL, DEEPSEEK_MODEL_ID, DEEPSEEK_API_KEY")
    # Generation model: higher top_p (0.7) for more varied output.
    llm = ChatOpenAI(base_url=deepseek_model_server_url,
                     api_key=deepseek_api_key,
                     model=deepseek_chat_model_id,
                     max_tokens=4096,
                     temperature=0.3,
                     top_p=0.7,
                     extra_body={
                         # Disable "thinking" mode to avoid server-side errors.
                         "enable_thinking": False
                     })
    # Chat model: lower top_p (0.2) for more deterministic replies.
    chat = ChatOpenAI(base_url=deepseek_model_server_url,
                      api_key=deepseek_api_key,
                      model=deepseek_chat_model_id,
                      max_tokens=4096,
                      temperature=0.3,
                      top_p=0.2,
                      extra_body={
                          # Disable "thinking" mode to avoid server-side errors.
                          "enable_thinking": False
                      })
    # No embedding model is configured for the DeepSeek backend.
    embed = None
    return llm, chat, embed
def get_deploy_qwen_models():
    """Build the Qwen model clients against the ModelScope online Qwen3 API.

    Returns:
        tuple: ``(llm, chat, embed)`` — two ChatOpenAI clients (generation
        and near-deterministic chat) plus ``None`` for the embedding slot
        (the DashScope embedding client is left commented out below).

    Raises:
        Exception: if any required Qwen config entry is missing.
    """
    model_server_url = config_handler.get("qwen", "MODEL_SERVER_URL")
    chat_model_id = config_handler.get("qwen", "CHAT_MODEL_ID")
    api_key = config_handler.get("qwen", "API_KEY")
    embedding_model_id = config_handler.get("qwen", "EMBED_MODEL_ID")
    # Security: never log the raw API key — only whether one is configured.
    server_logger.info(
        f"get_qwen_chat_model -> chat_model_id:{chat_model_id},"
        f"api_key_set:{api_key is not None},embedding_model_id:{embedding_model_id}")
    if model_server_url is None or chat_model_id is None or api_key is None:
        server_logger.error("请设置环境变量: MODEL_SERVER_URL, CHAT_MODEL_ID, API_KEY")
        raise Exception("请设置环境变量: MODEL_SERVER_URL, CHAT_MODEL_ID, API_KEY")
    # Generation model: moderate temperature/top_p for varied output.
    llm = ChatOpenAI(base_url=model_server_url,
                     api_key=api_key,
                     model=chat_model_id,
                     max_tokens=1024,
                     temperature=0.5,
                     top_p=0.7,
                     extra_body={
                         # Disable "thinking" mode to avoid server-side errors.
                         "enable_thinking": False
                     })
    # Chat model: near-zero temperature for near-deterministic replies.
    chat = ChatOpenAI(base_url=model_server_url,
                      api_key=api_key,
                      model=chat_model_id,
                      max_tokens=1024,
                      temperature=0.01,
                      top_p=0.2,
                      extra_body={
                          # Disable "thinking" mode to avoid server-side errors.
                          "enable_thinking": False
                      })
    # Embedding model (text-embedding-v3 / text-embedding-v4) — currently
    # disabled; re-enable via DashScopeEmbeddings when needed:
    # from langchain_community.embeddings import DashScopeEmbeddings
    embed = None  # DashScopeEmbeddings(model=embedding_model_id)
    return llm, chat, embed
def test_qwen_chat_model():
    """Smoke-test the deployed Qwen generation model with a greeting query."""
    llm, _chat, _embed = get_deploy_qwen_models()
    question = "你好,你是谁?"
    answer = llm.invoke(input=question)
    server_logger.info(f"result={answer}")
    print(f"result={answer}")
def test_deepseek_chat_model():
    """Smoke-test the DeepSeek generation model with a greeting query."""
    llm, _chat, _embed = get_deepseek_models()
    question = "你好,你是谁?"
    answer = llm.invoke(input=question)
    server_logger.info(f"result={answer}")
    print(f"result={answer}")
def serialize_messages(messages: List[Dict]) -> str:
    """Serialize a list of message dicts to a JSON string.

    Uses ``ensure_ascii=False`` so Chinese message content is stored as
    readable UTF-8 rather than ``\\uXXXX`` escape sequences; the output
    still round-trips through :func:`deserialize_messages` unchanged.
    """
    return json.dumps(messages, ensure_ascii=False)
def deserialize_messages(data: str) -> List[Dict]:
    """Parse a JSON string back into a list of message dicts.

    A falsy payload (``""`` or ``None``) yields an empty list.
    """
    if not data:
        return []
    return json.loads(data)
def to_langchain_messages(messages: List[Dict]) -> List:
    """Convert message dicts into LangChain message objects.

    Maps "user"/"assistant"/"system" roles to their message classes;
    dicts with any other role are skipped, matching prior behavior.
    """
    role_to_cls = {
        "user": HumanMessage,
        "assistant": AIMessage,
        "system": SystemMessage,
    }
    converted = []
    for msg in messages:
        cls = role_to_cls.get(msg["role"])
        if cls is not None:
            converted.append(cls(content=msg["content"]))
    return converted
def generate_session_id() -> str:
    """Return a unique session id: the "xiwuzc-" prefix plus a random UUID4."""
    return "xiwuzc-" + str(uuid.uuid4())
def get_current_timestamp() -> float:
    """Return the current Unix timestamp in seconds."""
    now = time.time()
    return now
def format_timestamp(timestamp: float) -> str:
    """Render a Unix timestamp as a human-readable local-time string.

    Output format: ``YYYY-MM-DD HH:MM:SS`` (local timezone).
    """
    moment = datetime.fromtimestamp(timestamp)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
def build_input_context(
    trace_id: str,
    task_prompt_info: dict,
    input_query: str,
    context: Optional[str] = None,
    supplement_info: Optional[str] = None,
    header_info: Optional[Dict] = None
) -> str:
    """Assemble the scene-optimized context prompt for the assistant.

    Falsy ``context``/``supplement_info`` fall back to placeholder text;
    the token and tenant id are pulled from ``header_info`` when present.
    """
    data_section = context if context else "无相关数据"
    extra_section = supplement_info if supplement_info else "无补充信息"
    headers = header_info or {}
    auth_token = headers.get('token', '')
    tenant_id = headers.get('tenantId', '')
    task_text = task_prompt_info["task_prompt"]
    now_text = format_timestamp(get_current_timestamp())
    prompt = (
        f"🐄 助手会话 [ID: {trace_id}] 🐖\n"
        f"⏰ 时间: {now_text}\n"
        f"📌 任务: {task_text}\n"
        f"📊 相关数据:\n"
        f"{data_section}\n"
        f"ℹ️ 补充信息:\n"
        f"{extra_section}\n"
        f"❓ 户问题:\n"
        f"{input_query}\n"
        f"🔒 安全验证: {auth_token}\n"
        f"🏠 场ID: {tenant_id}"
    )
    return prompt.strip()
def clean_json_output(raw_output: str) -> str:
    """Strip a surrounding Markdown code fence from raw LLM output.

    Handles both an opening ```` ```json ```` fence and a bare ```` ``` ````
    fence (the bare-fence case is a generalization; ``` ```json ``` ``-fenced
    input behaves exactly as before), plus a closing ```` ``` ````.
    Unfenced text is returned unchanged apart from whitespace stripping.
    """
    cleaned = raw_output.strip()
    # Remove the opening fence: prefer the language-tagged form, then
    # fall back to a bare fence.
    if cleaned.startswith("```json"):
        cleaned = cleaned[len("```json"):]
    elif cleaned.startswith("```"):
        cleaned = cleaned[len("```"):]
    # Remove the closing fence, if any.
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    return cleaned.strip()
if __name__ == "__main__":
    # Manual smoke test against the deployed Qwen endpoint.
    test_qwen_chat_model()
    # test_deepseek_chat_model()
|