| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448 |
- # coding=utf-8
- """
- @project: MaxKB
- @Author:虎虎
- @file: common.py
- @date:2025/6/9 13:42
- @desc:
- """
- from typing import List
- from django.core.cache import cache
- from django.db.models import QuerySet
- from django.utils import timezone
- from django.utils.translation import gettext_lazy as _
- from application.models import Application, ChatRecord, Chat, ApplicationVersion, ChatUserType, ApplicationTypeChoices
- from application.serializers.application_chat import ChatCountSerializer
- from common.constants.cache_version import Cache_Version
- from common.database_model_manage.database_model_manage import DatabaseModelManage
- from common.exception.app_exception import ChatException
- from knowledge.models import Document
- from models_provider.models import Model
- from models_provider.tools import get_model_credential
- from system_manage.models.resource_mapping import ResourceMapping
- from tools.models import ToolRecord
- class ToolExecute:
- def __init__(self, tool_id: str,
- tool_record_id: str,
- workspace_id: str,
- source_type,
- source_id,
- debug=False):
- self.tool_id = tool_id
- self.workspace_id = workspace_id
- self.source_type = source_type
- self.source_id = source_id
- self.tool_record_id = tool_record_id
- self.debug = debug
- def get_record(self):
- if self.tool_record_id:
- if self.debug:
- return self.to_record(cache.get(Cache_Version.TOOL_WORKFLOW_EXECUTE.get_key(key=self.tool_record_id),
- version=Cache_Version.TOOL_WORKFLOW_EXECUTE.get_version()))
- else:
- return QuerySet(ToolRecord).filter(tool_id=self.tool_id, id=self.tool_record_id).first()
- return None
- def to_record(self, tool_record_dict):
- if tool_record_dict is None:
- return None
- return ToolRecord(id=tool_record_dict.get('id'),
- tool_id=tool_record_dict.get('tool_id'),
- workspace_id=tool_record_dict.get('workspace_id'),
- source_type=tool_record_dict.get('source_type'),
- source_id=tool_record_dict.get('source_id'),
- meta=tool_record_dict.get('meta'),
- state=tool_record_dict.get('state'),
- run_time=tool_record_dict.get('run_time'))
- def to_dict(self, tool_record):
- return {'id': tool_record.id,
- 'tool_id': tool_record.tool_id,
- 'workspace_id': tool_record.workspace_id,
- 'source_type': tool_record.source_type,
- 'source_id': tool_record.source_id,
- 'meta': tool_record.meta,
- 'state': tool_record.state,
- 'run_time': tool_record.run_time}
- def set_record(self, tool_record):
- cache.set(Cache_Version.TOOL_WORKFLOW_EXECUTE.get_key(key=self.tool_record_id), self.to_dict(tool_record),
- version=Cache_Version.TOOL_WORKFLOW_EXECUTE.get_version(),
- timeout=60 * 30)
- if not self.debug:
- QuerySet(ToolRecord).update_or_create(id=tool_record.id,
- create_defaults={'id': tool_record.id,
- 'tool_id': tool_record.tool_id,
- 'state': tool_record.state,
- 'workspace_id': tool_record.workspace_id,
- "source_type": tool_record.source_type,
- 'source_id': tool_record.source_id,
- 'meta': tool_record.meta,
- 'run_time': tool_record.run_time},
- defaults={
- 'workspace_id': tool_record.workspace_id,
- 'tool_id': tool_record.tool_id,
- "source_type": tool_record.source_type,
- 'source_id': tool_record.source_id,
- 'state': tool_record.state,
- 'meta': tool_record.meta,
- 'run_time': tool_record.run_time
- })
- class ChatInfo:
- def __init__(self,
- chat_id: str,
- chat_user_id: str,
- chat_user_type: str,
- ip_address: str,
- source: {},
- knowledge_id_list: List[str],
- exclude_document_id_list: list[str],
- application_id: str,
- debug=False):
- """
- :param chat_id: 对话id
- :param chat_user_id 对话用户id
- :param chat_user_type 对话用户类型
- :param knowledge_id_list: 知识库列表
- :param exclude_document_id_list: 排除的文档
- :param application_id 应用id
- :param debug 是否是调试
- :param ip_address: 用户ip地址
- :param source: 用户来源
- """
- self.chat_id = chat_id
- self.chat_user_id = chat_user_id
- self.chat_user_type = chat_user_type
- self.knowledge_id_list = knowledge_id_list
- self.exclude_document_id_list = exclude_document_id_list
- self.application_id = application_id
- self.chat_record_list: List[ChatRecord] = []
- self.application = None
- self.chat_user = None
- self.ip_address = ip_address
- self.source = source
- self.debug = debug
- @staticmethod
- def get_no_references_setting(knowledge_setting, model_setting):
- no_references_setting = knowledge_setting.get(
- 'no_references_setting', {
- 'status': 'ai_questioning',
- 'value': '{question}'})
- if no_references_setting.get('status') == 'ai_questioning':
- no_references_prompt = model_setting.get('no_references_prompt', '{question}')
- no_references_setting['value'] = no_references_prompt if len(no_references_prompt) > 0 else "{question}"
- return no_references_setting
- def get_application(self):
- if self.debug:
- application = QuerySet(Application).filter(id=self.application_id).first()
- if not application:
- raise ChatException(500, _('The application does not exist'))
- else:
- application = QuerySet(ApplicationVersion).filter(application_id=self.application_id).order_by(
- '-create_time')[0:1].first()
- if not application:
- raise ChatException(500, _("The application has not been published. Please use it after publishing."))
- if application.type == ApplicationTypeChoices.SIMPLE.value:
- # 数据集id列表
- knowledge_id_list = [str(row.target_id) for row in
- QuerySet(ResourceMapping).filter(source_id=self.application_id,
- source_type='APPLICATION',
- target_type='KNOWLEDGE')]
- # 需要排除的文档
- exclude_document_id_list = [str(document.id) for document in
- QuerySet(Document).filter(
- knowledge_id__in=knowledge_id_list,
- is_active=False)]
- self.knowledge_id_list = knowledge_id_list
- self.exclude_document_id_list = exclude_document_id_list
- self.application = application
- return application
- def get_chat_user(self, asker=None):
- if self.chat_user:
- return self.chat_user
- chat_user_model = DatabaseModelManage.get_model("chat_user")
- if self.chat_user_type == ChatUserType.CHAT_USER.value and chat_user_model:
- chat_user = QuerySet(chat_user_model).filter(id=self.chat_user_id).first()
- return {
- 'id': str(chat_user.id),
- 'email': chat_user.email,
- 'phone': chat_user.phone,
- 'nick_name': chat_user.nick_name,
- 'username': chat_user.username,
- 'source': chat_user.source
- }
- else:
- if asker:
- if isinstance(asker, dict):
- self.chat_user = asker
- else:
- self.chat_user = {'username': asker}
- else:
- self.chat_user = {'username': '游客'}
- return self.chat_user
- def get_chat_user_group(self, asker=None):
- chat_user = self.get_chat_user(asker=asker)
- chat_user_id = chat_user.get('id')
- if not chat_user_id:
- return []
- user_group_relation_model = DatabaseModelManage.get_model("user_group_relation")
- if user_group_relation_model:
- return [{
- 'id': user_group_relation.group_id,
- 'name': user_group_relation.group.name
- } for user_group_relation in
- QuerySet(user_group_relation_model).select_related('group').filter(user_id=chat_user_id)]
- return []
- def to_base_pipeline_manage_params(self):
- self.get_application()
- self.get_chat_user()
- knowledge_setting = self.application.knowledge_setting
- model_setting = self.application.model_setting
- model_id = self.application.model_id
- model_params_setting = None
- if model_id is not None:
- model = QuerySet(Model).filter(id=model_id).first()
- credential = get_model_credential(model.provider, model.model_type, model.model_name)
- model_params_setting = credential.get_model_params_setting_form(model.model_name).get_default_form_data()
- return {
- 'knowledge_id_list': self.knowledge_id_list,
- 'exclude_document_id_list': self.exclude_document_id_list,
- 'exclude_paragraph_id_list': [],
- 'top_n': 3 if knowledge_setting.get('top_n') is None else knowledge_setting.get('top_n'),
- 'similarity': 0.6 if knowledge_setting.get('similarity') is None else knowledge_setting.get('similarity'),
- 'max_paragraph_char_number': knowledge_setting.get('max_paragraph_char_number') or 5000,
- 'history_chat_record': self.chat_record_list,
- 'chat_id': self.chat_id,
- 'dialogue_number': self.application.dialogue_number,
- 'problem_optimization_prompt': self.application.problem_optimization_prompt if self.application.problem_optimization_prompt is not None and len(
- self.application.problem_optimization_prompt) > 0 else _(
- "() contains the user's question. Answer the guessed user's question based on the context ({question}) Requirement: Output a complete question and put it in the <data></data> tag"),
- 'prompt': model_setting.get(
- 'prompt') if 'prompt' in model_setting and len(model_setting.get(
- 'prompt')) > 0 else Application.get_default_model_prompt(),
- 'system': model_setting.get(
- 'system', None),
- 'model_id': model_id,
- 'problem_optimization': self.application.problem_optimization,
- 'stream': True,
- 'model_setting': model_setting,
- 'model_params_setting': model_params_setting if self.application.model_params_setting is None or len(
- self.application.model_params_setting.keys()) == 0 else self.application.model_params_setting,
- 'search_mode': self.application.knowledge_setting.get('search_mode') or 'embedding',
- 'no_references_setting': self.get_no_references_setting(self.application.knowledge_setting, model_setting),
- 'workspace_id': self.application.workspace_id,
- 'application_id': self.application_id,
- 'mcp_enable': self.application.mcp_enable,
- 'mcp_tool_ids': self.application.mcp_tool_ids,
- 'mcp_servers': self.application.mcp_servers,
- 'mcp_source': self.application.mcp_source,
- 'tool_enable': self.application.tool_enable,
- 'tool_ids': self.application.tool_ids,
- 'application_enable': self.application.application_enable,
- 'application_ids': self.application.application_ids,
- 'skill_tool_ids': self.application.skill_tool_ids,
- 'mcp_output_enable': self.application.mcp_output_enable,
- }
- def to_pipeline_manage_params(self, problem_text: str, post_response_handler,
- exclude_paragraph_id_list, chat_user_id: str, chat_user_type, ip_address, source,
- stream=True,
- form_data=None):
- if form_data is None:
- form_data = {}
- params = self.to_base_pipeline_manage_params()
- return {**params, 'problem_text': problem_text, 'post_response_handler': post_response_handler,
- 'exclude_paragraph_id_list': exclude_paragraph_id_list, 'stream': stream, 'chat_user_id': chat_user_id,
- 'chat_user_type': chat_user_type, 'ip_address': ip_address, 'source': source, 'form_data': form_data}
- def set_chat(self, question):
- if not self.debug:
- if not QuerySet(Chat).filter(id=self.chat_id).exists():
- Chat(id=self.chat_id, application_id=self.application_id, abstract=question[0:1024],
- chat_user_id=self.chat_user_id, chat_user_type=self.chat_user_type,
- ip_address=self.ip_address, source=self.source,
- asker=self.get_chat_user()).save()
- def set_chat_variable(self, chat_context):
- if not self.debug:
- chat = QuerySet(Chat).filter(id=self.chat_id).first()
- if chat:
- chat.meta = {**(chat.meta if isinstance(chat.meta, dict) else {}), **chat_context}
- chat.save()
- else:
- cache.set(Cache_Version.CHAT_VARIABLE.get_key(key=self.chat_id), chat_context,
- version=Cache_Version.CHAT_VARIABLE.get_version(),
- timeout=60 * 30)
- def get_chat_variable(self):
- if not self.debug:
- chat = QuerySet(Chat).filter(id=self.chat_id).first()
- if chat:
- return chat.meta
- return {}
- else:
- return cache.get(Cache_Version.CHAT_VARIABLE.get_key(key=self.chat_id),
- version=Cache_Version.CHAT_VARIABLE.get_version()) or {}
- def append_chat_record(self, chat_record: ChatRecord):
- chat_record.problem_text = chat_record.problem_text[0:10240] if chat_record.problem_text is not None else ""
- chat_record.answer_text = chat_record.answer_text[0:40960] if chat_record.problem_text is not None else ""
- is_save = True
- # 存入缓存中
- for index in range(len(self.chat_record_list)):
- record = self.chat_record_list[index]
- if record.id == chat_record.id:
- self.chat_record_list[index] = chat_record
- is_save = False
- break
- if is_save:
- self.chat_record_list.append(chat_record)
- if not self.debug:
- if not QuerySet(Chat).filter(id=self.chat_id).exists():
- Chat(id=self.chat_id, application_id=self.application_id, abstract=chat_record.problem_text[0:1024],
- chat_user_id=self.chat_user_id, chat_user_type=self.chat_user_type,
- ip_address=self.ip_address, source=self.source,
- asker=self.get_chat_user()).save()
- else:
- QuerySet(Chat).filter(id=self.chat_id).update(update_time=timezone.now())
- # 插入会话记录
- QuerySet(ChatRecord).update_or_create(id=chat_record.id,
- create_defaults={'id': chat_record.id,
- 'chat_id': chat_record.chat_id,
- "vote_status": chat_record.vote_status,
- 'problem_text': chat_record.problem_text,
- 'answer_text': chat_record.answer_text,
- 'answer_text_list': chat_record.answer_text_list,
- 'message_tokens': chat_record.message_tokens,
- 'answer_tokens': chat_record.answer_tokens,
- 'const': chat_record.const,
- 'details': chat_record.details,
- 'improve_paragraph_id_list': chat_record.improve_paragraph_id_list,
- 'run_time': chat_record.run_time,
- 'source': chat_record.source,
- 'ip_address': chat_record.ip_address or '',
- 'index': chat_record.index},
- defaults={
- "vote_status": chat_record.vote_status,
- 'problem_text': chat_record.problem_text,
- 'answer_text': chat_record.answer_text,
- 'answer_text_list': chat_record.answer_text_list,
- 'message_tokens': chat_record.message_tokens,
- 'answer_tokens': chat_record.answer_tokens,
- 'const': chat_record.const,
- 'details': chat_record.details,
- 'improve_paragraph_id_list': chat_record.improve_paragraph_id_list,
- 'run_time': chat_record.run_time,
- 'index': chat_record.index,
- 'source': chat_record.source,
- 'ip_address': chat_record.ip_address or '',
- })
- ChatCountSerializer(data={'chat_id': self.chat_id}).update_chat()
- def to_dict(self):
- return {
- 'chat_id': self.chat_id,
- 'chat_user_id': self.chat_user_id,
- 'chat_user_type': self.chat_user_type,
- 'ip_address': self.ip_address,
- 'source': self.source,
- 'knowledge_id_list': self.knowledge_id_list,
- 'exclude_document_id_list': self.exclude_document_id_list,
- 'application_id': self.application_id,
- 'chat_record_list': [self.chat_record_to_map(c) for c in self.chat_record_list][-20:],
- 'debug': self.debug
- }
- def chat_record_to_map(self, chat_record):
- return {'id': chat_record.id,
- 'chat_id': chat_record.chat_id,
- 'vote_status': chat_record.vote_status,
- 'problem_text': chat_record.problem_text,
- 'answer_text': chat_record.answer_text,
- 'answer_text_list': chat_record.answer_text_list,
- 'message_tokens': chat_record.message_tokens,
- 'answer_tokens': chat_record.answer_tokens,
- 'const': chat_record.const,
- 'details': chat_record.details,
- 'improve_paragraph_id_list': chat_record.improve_paragraph_id_list,
- 'run_time': chat_record.run_time,
- 'source': chat_record.source,
- 'ip_address': chat_record.ip_address,
- 'index': chat_record.index}
- @staticmethod
- def map_to_chat_record(chat_record_dict):
- return ChatRecord(id=chat_record_dict.get('id'),
- chat_id=chat_record_dict.get('chat_id'),
- vote_status=chat_record_dict.get('vote_status'),
- problem_text=chat_record_dict.get('problem_text'),
- answer_text=chat_record_dict.get('answer_text'),
- answer_text_list=chat_record_dict.get('answer_text_list'),
- message_tokens=chat_record_dict.get('message_tokens'),
- answer_tokens=chat_record_dict.get('answer_tokens'),
- const=chat_record_dict.get('const'),
- details=chat_record_dict.get('details'),
- improve_paragraph_id_list=chat_record_dict.get('improve_paragraph_id_list'),
- run_time=chat_record_dict.get('run_time'),
- index=chat_record_dict.get('index'),
- source=chat_record_dict.get('source'),
- ip_address=chat_record_dict.get('ip_address'))
- def set_cache(self):
- cache.set(Cache_Version.CHAT.get_key(key=self.chat_id), self.to_dict(),
- version=Cache_Version.CHAT_INFO.get_version(),
- timeout=60 * 30)
- @staticmethod
- def map_to_chat_info(chat_info_dict):
- c = ChatInfo(chat_info_dict.get('chat_id'), chat_info_dict.get('chat_user_id'),
- chat_info_dict.get('chat_user_type'), chat_info_dict.get('ip_address'),
- chat_info_dict.get('source'),
- chat_info_dict.get('knowledge_id_list'),
- chat_info_dict.get('exclude_document_id_list'),
- chat_info_dict.get('application_id'),
- debug=chat_info_dict.get('debug'))
- c.chat_record_list = [ChatInfo.map_to_chat_record(c_r) for c_r in chat_info_dict.get('chat_record_list')]
- return c
- @staticmethod
- def get_cache(chat_id):
- chat_info_dict = cache.get(Cache_Version.CHAT.get_key(key=chat_id),
- version=Cache_Version.CHAT_INFO.get_version())
- if chat_info_dict:
- return ChatInfo.map_to_chat_info(chat_info_dict)
- return None
- def update_resource_mapping_by_application(application_id: str, other_resource_mapping=None):
- from application.flow.tools import get_instance_resource, save_workflow_mapping, \
- application_instance_field_call_dict
- from system_manage.models.resource_mapping import ResourceType
- if other_resource_mapping is None:
- other_resource_mapping = []
- application = QuerySet(Application).filter(id=application_id).first()
- instance_mapping = get_instance_resource(application, ResourceType.APPLICATION, str(application.id),
- application_instance_field_call_dict)
- if application.type == 'WORK_FLOW':
- save_workflow_mapping(application.work_flow, ResourceType.APPLICATION, str(application_id),
- instance_mapping + other_resource_mapping)
- return
- else:
- save_workflow_mapping({}, ResourceType.APPLICATION, str(application_id),
- instance_mapping + other_resource_mapping)
|