| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- # !/usr/bin/ python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :request_tool.py
- @IDE :Cursor
- @Author :
- @Date :2025/08/21
- '''
- import json
- import requests
- import time
- from requests.exceptions import RequestException
- from base.config import config_handler
- from logger.loggering import server_logger
- from utils.common import handler_err
- class KnowledgeDifyAction:
- """
- 与后端http对接交互类,支持 Token 认证和会话保持
- """
- def __init__(self, token=None, use_session=True):
- """
- 初始化连接
- :param host: 主机地址
- :param port: 端口号
- :param token: 认证令牌(可选)
- :param use_session: 是否使用会话保持(默认True)
- """
- self.dify_server_url = config_handler.get("knowledge_dify","DIFY_SERVER_URL")
- self.dify_api_key = config_handler.get("knowledge_dify","DIFY_API_KEY")
- self.dify_dataset_id_list_str = config_handler.get("knowledge_dify","DIFY_DATASET_ID_LIST")
- self.dify_dataset_id_list = [dataset_id for dataset_id in self.dify_dataset_id_list_str.split(",")]
- self.token = self.dify_api_key
- self.use_session = use_session
-
- # 创建会话对象(如果需要会话保持)
- if self.use_session:
- self.session = requests.Session()
- # 如果提供了token,添加到会话头
- if self.token:
- self.session.headers.update({"Authorization": f"Bearer {self.token}"})
-
- def get_request_knowledge_retrieve_list(self , params=None, headers=None, timeout=20, trace_id=""):
- """
- dify 批量知识库检索
- :param params: 请求参数(可选)
- :param headers: 额外请求头(可选)
- :param timeout: 超时时间(秒,默认5秒)
- :param trace_id: 操作id
- :return: 响应内容(json格式data 内容)
- """
- server_logger.info(trace_id=trace_id, msg=f"开始执行 dify 批量知识库检索,dify_dataset_id_list:{self.dify_dataset_id_list}")
- all_record_list = []
- for dataset_id in self.dify_dataset_id_list:
- response = self.get_request_knowledge_retrieve(trace_id=trace_id ,dataset_id=dataset_id , params=params , headers=headers , timeout=timeout)
- # 判断返回结果 ,如果 存在code 则返回错误
- if "code" in response:
- error_msg = f"检索知识库内容失败,详细信息:{response['status']-{response["code"]}-{response['message']}}"
- server_logger.info(f"dataset_id:{dataset_id}, {error_msg}", trace_id=trace_id)
- continue
- # 把 所有 检索列表 合并 为一个 list
- all_record_list += response["records"]
- return all_record_list
- def get_request_knowledge_retrieve(self , dataset_id , params=None, headers=None, timeout=20, trace_id=""):
- """
- dify 知识检索 -发送HTTP请求
- :param params: 请求参数(可选)
- :param headers: 额外请求头(可选)
- :param timeout: 超时时间(秒,默认5秒)
- :param trace_id: 操作id
- :return: 响应内容(json格式data 内容)
- """
- dify_dataset_url = config_handler.get("knowledge_dify","DIFY_DATASET_URL")
- # 替换 知识库id 配置
- dify_dataset_url = dify_dataset_url.format(dataset_id=dataset_id)
- # 准备请求头
- request_headers = {}
- request_headers["Content-Type"] = "application/json"
- if self.token: # 非会话模式需单独添加token
- request_headers["Authorization"] = f"Bearer {self.token}"
- if headers: # 合并额外请求头
- request_headers.update(headers)
- request_url = "{dify_server_url}{dify_dataset_url}".format(
- dify_server_url=self.dify_server_url, dify_dataset_url=dify_dataset_url)
- server_logger.info(f"{request_url}, header: {request_headers}, params={params}", trace_id=trace_id)
- try:
- response = requests.post(
- request_url,
- json=params,
- headers=request_headers,
- timeout=timeout
- )
- # 检查HTTP状态码
- response_data = response.json()
- server_logger.info(f"url: {request_url}, dataset_id:{dataset_id}, response: {response_data}", trace_id=trace_id)
- return response_data
- except RequestException as e:
- handler_err(server_logger, trace_id=trace_id, err=e, err_name='get_request_knowledge_retrieve')
- #raise e
- return {}
-
-
|