from datetime import datetime from function.knowledge_dify import KnowledgeDify from logger.loggering import server_logger from utils.redis_utils import set_redis_result_cache_data from utils.tool_utils import verify_user_role from enums.common_enums import CacheDataKeyTypeEnum import json # 函数定义 要语义化,不能汉语拼音 def get_current_datetime() -> str: """ 获取当前的日期和时间 """ now = datetime.now() formatted_data = now.strftime("%Y-%m-%d %H:%M:%S") return formatted_data knowledge_dify = KnowledgeDify() async def get_knowledge_answer(question: str , trace_id: str , user_role: str) -> str: """ 相关知识问题检索的工具 知识分类包括:政策法规、市场分析、基础、管理技术、案例库、高频问题、综合类 参数: question: 用户问题中(用户输入问题) trace_id: 日志链路跟踪ID user_role: 用户角色 """ # 如果是普通用户直接返回无,只有租户用户才能检索查询知识库 if not verify_user_role(user_role): return "无" record_list = knowledge_dify.get_request_knowledge_dify(trace_id=trace_id, question=question) if record_list is None or len(record_list) == 0: return "无" # 设置缓存数据,用于智能查询 retriever_resources_list = [record["segment"]["document"] for record in record_list] server_logger.info(trace_id=trace_id, msg=f"知识库检索结果列表: retriever_resources_list={retriever_resources_list}") await set_redis_result_cache_data(CacheDataKeyTypeEnum.RETRIEVER_RESOURCES.code, trace_id, json.dumps(retriever_resources_list)) content_list = [record["segment"]["content"] for record in record_list] #server_logger.info(trace_id=trace_id, msg=f"知识库检索结果列表: content_list={content_list}") return "\n".join(content_list)