| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- # coding=utf-8
- """
- 样本中心 API 客户端
- 用于对接外部样本中心系统的知识库查询和批量入库接口
- """
- import time
- from typing import Dict, Optional
- import requests
- from django.conf import settings
- from django.utils.translation import gettext as _
- from common.exception.app_exception import AppApiException
- from common.utils.logger import maxkb_logger
- class SampleCenterClient:
- """样本中心 API 客户端"""
- def __init__(self, base_url: str = None, app_id: str = None, app_secret: str = None):
- self.base_url = (base_url or getattr(settings, 'SAMPLE_CENTER_BASE_URL', '')).rstrip('/')
- self.app_id = app_id or getattr(settings, 'SAMPLE_CENTER_APP_ID', '')
- self.app_secret = app_secret or getattr(settings, 'SAMPLE_CENTER_APP_SECRET', '')
- self._token = None
- self._token_expires_at = 0
- def _get_token(self) -> str:
- """获取访问令牌(自动缓存和刷新)"""
- if self._token and time.time() < self._token_expires_at - 60:
- return self._token
- url = f"{self.base_url}/api/v1/auth/token"
- payload = {
- "app_id": self.app_id,
- "app_secret": self.app_secret,
- }
- try:
- response = requests.post(url, json=payload, timeout=10)
- response.raise_for_status()
- result = response.json()
- if result.get('code') != '000000':
- raise AppApiException(500, _('Sample center auth failed: {msg}').format(
- msg=result.get('message', 'unknown error')))
- data = result['data']
- self._token = data['access_token']
- self._token_expires_at = time.time() + data.get('expires_in', 7200)
- return self._token
- except requests.exceptions.RequestException as e:
- maxkb_logger.error(f'Sample center auth request failed: {e}')
- raise AppApiException(502, _('Failed to connect to sample center: {error}').format(error=str(e)))
- def _request(self, method: str, path: str, **kwargs) -> Dict:
- """统一请求方法"""
- token = self._get_token()
- url = f"{self.base_url}{path}"
- headers = {
- 'Authorization': f'Bearer {token}',
- 'X-App-Id': self.app_id,
- 'Content-Type': 'application/json',
- }
- headers.update(kwargs.pop('headers', {}))
- try:
- response = requests.request(method, url, headers=headers, timeout=30, **kwargs)
- response.raise_for_status()
- result = response.json()
- if result.get('code') != '000000':
- raise AppApiException(500, _('Sample center API error: {msg}').format(
- msg=result.get('message', 'unknown error')))
- return result.get('data', {})
- except requests.exceptions.RequestException as e:
- maxkb_logger.error(f'Sample center API request failed: {e}')
- raise AppApiException(502, _('Failed to connect to sample center: {error}').format(error=str(e)))
- def list_knowledge_bases(self, page: int = 1, page_size: int = 20) -> Dict:
- """查询知识库列表"""
- params = {'page': page, 'page_size': page_size}
- return self._request('GET', '/api/v1/knowledge-bases', params=params)
- def get_knowledge_base(self, kb_id: str) -> Dict:
- """查询知识库详情"""
- return self._request('GET', f'/api/v1/knowledge-bases/{kb_id}')
- def batch_import(self, kb_id: str, task_no: str, parents: list,
- children: list = None, callback_url: str = None) -> Dict:
- """提交批量入库任务"""
- payload = {
- 'task_no': task_no,
- 'parents': parents,
- }
- if children:
- payload['children'] = children
- if callback_url:
- payload['callback_url'] = callback_url
- return self._request('POST', f'/api/v1/knowledge-bases/{kb_id}/batch-import', json=payload)
- def get_import_task(self, task_id: str) -> Dict:
- """查询批量入库任务状态"""
- return self._request('GET', f'/api/v1/knowledge-bases/batch-import/{task_id}')
- def get_sample_center_client(**kwargs) -> SampleCenterClient:
- """获取样本中心客户端实例"""
- return SampleCenterClient(**kwargs)
|