# !/usr/bin/ python
# -*- coding: utf-8 -*-
'''
@Project    : lq-agent-api
@File       :request_tool.py
@IDE        :Cursor
@Author     :
@Date       :2025/08/21
'''

import json
import time

import requests
from requests.exceptions import RequestException

from base.config import config_handler
from logger.loggering import server_logger
from utils.common import handler_err


class KnowledgeDifyAction:
    """
    HTTP client for Dify knowledge-base (dataset) retrieval.

    The server URL, API key and dataset-id list are read from the
    ``knowledge_dify`` section of the project configuration. Requests are
    authenticated with a ``Bearer`` token and, when enabled, sent through a
    shared ``requests.Session``.
    """

    def __init__(self, token=None, use_session=True):
        """
        Load the Dify settings and optionally create a reusable HTTP session.

        :param token: accepted for backward compatibility; it is currently
            ignored and the configured ``DIFY_API_KEY`` is always used.
        :param use_session: when True (default) a ``requests.Session`` is
            created and used for every request issued by this instance.
        """
        self.dify_server_url = config_handler.get("knowledge_dify", "DIFY_SERVER_URL")
        self.dify_api_key = config_handler.get("knowledge_dify", "DIFY_API_KEY")
        self.dify_dataset_id_list_str = config_handler.get("knowledge_dify", "DIFY_DATASET_ID_LIST")
        self.dify_dataset_id_list = self._parse_dataset_ids(self.dify_dataset_id_list_str)
        self.token = self.dify_api_key
        self.use_session = use_session

        # Always define the attribute so later code can test it safely.
        self.session = None
        if self.use_session:
            self.session = requests.Session()
            # Pre-authorize the session when a token is available.
            if self.token:
                self.session.headers.update({"Authorization": f"Bearer {self.token}"})

    @staticmethod
    def _parse_dataset_ids(raw):
        """Split a comma-separated id string, dropping blanks and padding."""
        if not raw:
            return []
        stripped = (part.strip() for part in str(raw).split(","))
        return [part for part in stripped if part]

    @staticmethod
    def _redact_headers(headers):
        """Return a copy of *headers* that is safe to log (no credentials)."""
        return {
            key: ("***" if str(key).lower() == "authorization" else value)
            for key, value in headers.items()
        }

    def get_request_knowledge_retrieve_list(self, params=None, headers=None, timeout=20, trace_id=""):
        """
        Run the same retrieval against every configured dataset and merge
        the results.

        Datasets whose request fails, or whose response carries an error
        ``code``, are logged and skipped so one bad dataset does not abort
        the whole batch.

        :param params: JSON body sent with each request (optional)
        :param headers: extra request headers (optional)
        :param timeout: per-request timeout in seconds (default 20)
        :param trace_id: identifier attached to every log entry
        :return: list with the ``records`` of all successful responses
        """
        server_logger.info(trace_id=trace_id, msg=f"开始执行 dify 批量知识库检索,dify_dataset_id_list:{self.dify_dataset_id_list}")
        all_record_list = []
        for dataset_id in self.dify_dataset_id_list:
            response = self.get_request_knowledge_retrieve(
                trace_id=trace_id,
                dataset_id=dataset_id,
                params=params,
                headers=headers,
                timeout=timeout,
            )
            # A non-dict payload cannot carry records; the raw response has
            # already been logged by the single-dataset call.
            if not isinstance(response, dict):
                continue
            # A "code" key marks an error payload from the server.
            if "code" in response:
                error_msg = (
                    f"检索知识库内容失败,详细信息:"
                    f"{response.get('status')}-{response.get('code')}-{response.get('message')}"
                )
                server_logger.info(f"dataset_id:{dataset_id}, {error_msg}", trace_id=trace_id)
                continue
            # The single-dataset call returns {} on transport failure, so
            # "records" may be missing; treat that as no results.
            all_record_list.extend(response.get("records") or [])
        return all_record_list

    def get_request_knowledge_retrieve(self, dataset_id, params=None, headers=None, timeout=20, trace_id=""):
        """
        POST one retrieval request for a single dataset.

        The endpoint is built from ``DIFY_SERVER_URL`` plus the
        ``DIFY_DATASET_URL`` template with ``dataset_id`` substituted.

        :param dataset_id: id substituted into the dataset URL template
        :param params: JSON body of the request (optional)
        :param headers: extra request headers, merged over the defaults (optional)
        :param timeout: request timeout in seconds (default 20)
        :param trace_id: identifier attached to every log entry
        :return: decoded JSON response, or ``{}`` when the request fails or
            the body is not valid JSON
        """
        dify_dataset_url = config_handler.get("knowledge_dify", "DIFY_DATASET_URL")
        # Fill the dataset id into the configured URL template.
        dify_dataset_url = dify_dataset_url.format(dataset_id=dataset_id)

        request_headers = {"Content-Type": "application/json"}
        if self.token:
            # Needed when no pre-authorized session is in use.
            request_headers["Authorization"] = f"Bearer {self.token}"
        if headers:
            # Caller-supplied headers take precedence over the defaults.
            request_headers.update(headers)

        request_url = f"{self.dify_server_url}{dify_dataset_url}"
        # Log a redacted copy so the API key never reaches the log files.
        server_logger.info(
            f"{request_url}, header: {self._redact_headers(request_headers)}, params={params}",
            trace_id=trace_id,
        )

        # Use the shared session when one was created; otherwise fall back
        # to a one-off request through the requests module.
        session = getattr(self, "session", None)
        http_client = session if session is not None else requests
        try:
            response = http_client.post(
                request_url,
                json=params,
                headers=request_headers,
                timeout=timeout,
            )
            # Error responses are returned to the caller as JSON too, so the
            # HTTP status code is deliberately not raised on here.
            response_data = response.json()
            server_logger.info(f"url: {request_url}, dataset_id:{dataset_id}, response: {response_data}", trace_id=trace_id)
            return response_data
        except (RequestException, ValueError) as e:
            # ValueError covers JSON decode failures on requests versions
            # whose JSONDecodeError is not a RequestException.
            handler_err(server_logger, trace_id=trace_id, err=e, err_name='get_request_knowledge_retrieve')
            return {}