| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- # coding=utf-8
- """
- 插件异常处理器
- 对插件运行时异常进行分类和标准化处理
- """
- from plugin.services.error_codes import PluginErrorCode
- from common.utils.logger import maxkb_logger
def _rebuild_plugin_exception(cls, args):
    """Recreate a plugin exception of type *cls* without calling its ``__init__``.

    Used by ``PluginException.__reduce__``; the instance attributes are
    restored afterwards from the saved ``__dict__`` state.
    """
    restored = cls.__new__(cls)
    restored.args = args
    return restored


class PluginException(Exception):
    """Base class for plugin exceptions.

    Attributes:
        error_code: Error code string supplied by the caller.
        message: Human-readable message; also used as ``str(exc)``.
        detail: Dict with extra context; an empty dict when none is given.
    """

    def __init__(self, error_code: str, message: str, detail: dict = None):
        self.error_code = error_code
        self.message = message
        # A falsy ``detail`` (None or empty) is replaced by a fresh dict.
        self.detail = detail or {}
        super().__init__(message)

    def __reduce__(self):
        """Support ``pickle`` and ``copy`` for this class and its subclasses.

        The default ``Exception.__reduce__`` rebuilds the object with
        ``cls(*self.args)``. Only ``message`` is stored in ``args``, so that
        call does not match this constructor (nor the subclasses' custom
        constructors). Rebuilding through ``__new__`` and restoring
        ``__dict__`` avoids calling ``__init__`` altogether.
        """
        return _rebuild_plugin_exception, (type(self), self.args), dict(self.__dict__)
class PluginNotFoundException(PluginException):
    """Exception for a plugin that does not exist."""

    def __init__(self, plugin_id: str = ''):
        # The offending plugin id is carried in the detail payload.
        context = {'plugin_id': plugin_id}
        # PLUGIN_NOT_FOUND unpacks into an (error code, message) pair.
        not_found_code, not_found_message = PluginErrorCode.PLUGIN_NOT_FOUND
        super().__init__(not_found_code, not_found_message, context)
class PluginAlreadyExistsException(PluginException):
    """Exception for a plugin that already exists."""

    def __init__(self, code: str = ''):
        # ``code`` is the plugin's own code, reported under 'plugin_code';
        # it is distinct from the error code taken from PluginErrorCode.
        context = {'plugin_code': code}
        exists_code, exists_message = PluginErrorCode.PLUGIN_ALREADY_EXISTS
        super().__init__(exists_code, exists_message, context)
class PluginNotInstalledException(PluginException):
    """Exception for a plugin that is not installed."""

    def __init__(self, plugin_id: str = ''):
        context = {'plugin_id': plugin_id}
        # PLUGIN_NOT_INSTALLED unpacks into an (error code, message) pair.
        missing_code, missing_message = PluginErrorCode.PLUGIN_NOT_INSTALLED
        super().__init__(missing_code, missing_message, context)
class PluginStatusException(PluginException):
    """Exception for a plugin status error."""

    def __init__(self, plugin_id: str = '', current_status: str = ''):
        # Report both the plugin and the status it was in.
        context = {'plugin_id': plugin_id, 'current_status': current_status}
        status_code, status_message = PluginErrorCode.PLUGIN_STATUS_ERROR
        super().__init__(status_code, status_message, context)
class PluginTestException(PluginException):
    """Exception for a plugin test failure.

    When no ``error_code`` is given, the code from
    ``PluginErrorCode.TEST_FAILED`` is used, and its message is used as well
    unless a non-empty ``message`` was supplied. When an ``error_code`` is
    given, ``message`` is passed through unchanged.
    """

    def __init__(self, error_code: str = None, message: str = '', detail: dict = None):
        if error_code:
            chosen_code, chosen_message = error_code, message
        else:
            chosen_code, fallback_message = PluginErrorCode.TEST_FAILED
            chosen_message = message if message else fallback_message
        super().__init__(chosen_code, chosen_message, detail if detail else {})
class PluginTestTimeoutException(PluginTestException):
    """Exception for a plugin test that timed out."""

    def __init__(self, timeout_ms: int = 0):
        # The timeout value is reported under 'timeout_ms' in the detail payload.
        context = {'timeout_ms': timeout_ms}
        timeout_code, timeout_message = PluginErrorCode.TEST_TIMEOUT
        super().__init__(timeout_code, timeout_message, context)
class PluginSchemaException(PluginException):
    """Exception for a plugin schema error.

    When no ``error_code`` is given, the code from
    ``PluginErrorCode.SCHEMA_INVALID`` is used, and its message is used as
    well unless a non-empty ``message`` was supplied. When an ``error_code``
    is given, ``message`` is passed through unchanged.
    """

    def __init__(self, error_code: str = None, message: str = '', detail: dict = None):
        if error_code:
            chosen_code, chosen_message = error_code, message
        else:
            chosen_code, fallback_message = PluginErrorCode.SCHEMA_INVALID
            chosen_message = message if message else fallback_message
        super().__init__(chosen_code, chosen_message, detail if detail else {})
class PluginExceptionHandler:
    """Turns exceptions into standardized plugin error dictionaries."""

    @staticmethod
    def handle(exception: Exception) -> dict:
        """Build a standardized error dict for *exception*.

        Args:
            exception: The exception to convert.

        Returns:
            A dict with the keys ``success`` (always ``False``),
            ``error_code``, ``message`` and ``detail``. For a
            ``PluginException`` the values come from the exception itself;
            any other exception is logged and reported as ``'PLUGIN_999'``.
        """
        if isinstance(exception, PluginException):
            return {
                'success': False,
                'error_code': exception.error_code,
                'message': exception.message,
                'detail': exception.detail
            }

        # Unknown exception. Pass the instance itself as ``exc_info``:
        # ``exc_info=True`` reads ``sys.exc_info()``, which is empty when this
        # method is called outside an ``except`` block, so the traceback
        # would be lost in that case.
        maxkb_logger.error(f"Unhandled plugin exception: {exception}", exc_info=exception)
        # NOTE(review): 'original_error' exposes the raw exception text to
        # whoever receives this dict -- confirm that is acceptable for
        # client-facing responses.
        return {
            'success': False,
            'error_code': 'PLUGIN_999',
            'message': '插件内部错误',
            'detail': {'original_error': str(exception)}
        }

    @staticmethod
    def to_api_response(exception: Exception) -> dict:
        """Convert *exception* into the API response format.

        Returns:
            A dict with ``code`` (always 400), ``message``, and ``data``
            holding the ``error_code`` and ``detail`` produced by
            ``handle``.
        """
        error = PluginExceptionHandler.handle(exception)
        # NOTE(review): unknown (non-plugin) exceptions are also reported
        # with code 400 -- confirm this is intended.
        return {
            'code': 400,
            'message': error['message'],
            'data': {
                'error_code': error['error_code'],
                'detail': error['detail']
            }
        }
|