# coding=utf-8
"""
Sample center API client.

Integrates with the external sample center system's knowledge-base query
and batch-import endpoints.
"""
import time
from typing import Dict, Optional

import requests
from django.conf import settings
from django.utils.translation import gettext as _

from common.exception.app_exception import AppApiException
from common.utils.logger import maxkb_logger

# Value of the response envelope's ``code`` field that signals success.
_SUCCESS_CODE = '000000'
# Token lifetime (seconds) used when the auth response has no usable ``expires_in``.
_DEFAULT_TOKEN_TTL = 7200
# The cached token is refreshed this many seconds before its recorded expiry.
_TOKEN_REFRESH_MARGIN = 60


class SampleCenterClient:
    """Client for the sample center HTTP API."""

    def __init__(self, base_url: Optional[str] = None, app_id: Optional[str] = None,
                 app_secret: Optional[str] = None):
        """Store connection settings, falling back to Django settings.

        Args:
            base_url: API root URL; falls back to ``settings.SAMPLE_CENTER_BASE_URL``.
                Trailing slashes are stripped.
            app_id: Application id; falls back to ``settings.SAMPLE_CENTER_APP_ID``.
            app_secret: Application secret; falls back to
                ``settings.SAMPLE_CENTER_APP_SECRET``.
        """
        # The trailing ``or ''`` guards against the setting being explicitly None,
        # which would otherwise make ``.rstrip`` raise AttributeError.
        self.base_url = (base_url or getattr(settings, 'SAMPLE_CENTER_BASE_URL', '') or '').rstrip('/')
        self.app_id = app_id or getattr(settings, 'SAMPLE_CENTER_APP_ID', '')
        self.app_secret = app_secret or getattr(settings, 'SAMPLE_CENTER_APP_SECRET', '')
        self._token = None
        self._token_expires_at = 0

    @staticmethod
    def _extract_data(result, error_template: str):
        """Validate a decoded response envelope and return its ``data`` member.

        Args:
            result: Decoded JSON body of a sample center response.
            error_template: Already-translated message containing a ``{msg}``
                placeholder, used when the envelope is rejected.

        Returns:
            The value stored under ``data``, or ``None`` when it is absent.

        Raises:
            AppApiException: With code 500 when the body is not a JSON object
                or its ``code`` field is not the success code.
        """
        if not isinstance(result, dict):
            raise AppApiException(500, error_template.format(msg='invalid response format'))
        if result.get('code') != _SUCCESS_CODE:
            raise AppApiException(500, error_template.format(
                msg=result.get('message', 'unknown error')))
        return result.get('data')

    def _get_token(self) -> str:
        """Return an access token, reusing the cached one while it is still fresh.

        Raises:
            AppApiException: With code 502 when the HTTP request fails or the
                body is not valid JSON; with code 500 when the sample center
                rejects the credentials or returns no access token.
        """
        if self._token and time.time() < self._token_expires_at - _TOKEN_REFRESH_MARGIN:
            return self._token

        url = f"{self.base_url}/api/v1/auth/token"
        payload = {
            "app_id": self.app_id,
            "app_secret": self.app_secret,
        }

        try:
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
            result = response.json()
        # ValueError covers JSON decode failures on requests versions whose
        # decode error does not derive from RequestException.
        except (requests.exceptions.RequestException, ValueError) as e:
            maxkb_logger.error(f'Sample center auth request failed: {e}')
            raise AppApiException(
                502, _('Failed to connect to sample center: {error}').format(error=str(e))) from e

        data = self._extract_data(result, _('Sample center auth failed: {msg}'))
        token = data.get('access_token') if isinstance(data, dict) else None
        if not token:
            raise AppApiException(500, _('Sample center auth failed: {msg}').format(
                msg='missing access_token in response'))

        try:
            ttl = float(data.get('expires_in', _DEFAULT_TOKEN_TTL))
        except (TypeError, ValueError):
            # A null or non-numeric lifetime must not break authentication.
            ttl = _DEFAULT_TOKEN_TTL

        self._token = token
        self._token_expires_at = time.time() + ttl
        return self._token

    def _request(self, method: str, path: str, **kwargs) -> Dict:
        """Send an authenticated request and return the envelope's ``data``.

        Args:
            method: HTTP method name.
            path: Path appended to ``base_url``.
            **kwargs: Forwarded to ``requests.request``. ``headers`` are merged
                over the default headers; ``timeout`` defaults to 30 seconds.

        Returns:
            The ``data`` member of the response, or ``{}`` when it is missing
            or null.

        Raises:
            AppApiException: With code 502 when the HTTP request fails or the
                body is not valid JSON; with code 500 when the sample center
                reports an error.
        """
        token = self._get_token()
        url = f"{self.base_url}{path}"
        headers = {
            'Authorization': f'Bearer {token}',
            'X-App-Id': self.app_id,
            'Content-Type': 'application/json',
        }
        headers.update(kwargs.pop('headers', None) or {})
        # setdefault lets a caller override the timeout without a
        # "multiple values for keyword argument" TypeError.
        kwargs.setdefault('timeout', 30)

        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            result = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            failed_response = getattr(e, 'response', None)
            if failed_response is not None and failed_response.status_code == 401:
                # The server rejected the token: drop it so the next call
                # re-authenticates instead of reusing it until expiry.
                self._token = None
                self._token_expires_at = 0
            maxkb_logger.error(f'Sample center API request failed: {e}')
            raise AppApiException(
                502, _('Failed to connect to sample center: {error}').format(error=str(e))) from e

        data = self._extract_data(result, _('Sample center API error: {msg}'))
        return {} if data is None else data

    def list_knowledge_bases(self, page: int = 1, page_size: int = 20) -> Dict:
        """Query one page of the knowledge base list."""
        params = {'page': page, 'page_size': page_size}
        return self._request('GET', '/api/v1/knowledge-bases', params=params)

    def get_knowledge_base(self, kb_id: str) -> Dict:
        """Query the details of one knowledge base."""
        return self._request('GET', f'/api/v1/knowledge-bases/{kb_id}')

    def batch_import(self, kb_id: str, task_no: str, parents: list,
                     children: Optional[list] = None,
                     callback_url: Optional[str] = None) -> Dict:
        """Submit a batch import task for a knowledge base.

        ``children`` and ``callback_url`` are only added to the payload when
        they are truthy.
        """
        payload = {
            'task_no': task_no,
            'parents': parents,
        }
        if children:
            payload['children'] = children
        if callback_url:
            payload['callback_url'] = callback_url
        return self._request('POST', f'/api/v1/knowledge-bases/{kb_id}/batch-import', json=payload)

    def get_import_task(self, task_id: str) -> Dict:
        """Query the status of a batch import task."""
        return self._request('GET', f'/api/v1/knowledge-bases/batch-import/{task_id}')


def get_sample_center_client(**kwargs) -> SampleCenterClient:
    """Build a sample center client; ``kwargs`` go to ``SampleCenterClient``."""
    return SampleCenterClient(**kwargs)