# coding=utf-8
"""
Plugin exception handling.

Defines the plugin exception hierarchy and a handler that converts runtime
exceptions into standardized error payloads.
"""
from typing import Optional

from plugin.services.error_codes import PluginErrorCode
from common.utils.logger import maxkb_logger


class PluginException(Exception):
    """Base class for all plugin errors.

    Attributes are set before ``Exception.__init__`` is called with the
    message, so ``str(exc)`` yields ``message``.
    """

    def __init__(self, error_code: str, message: str, detail: Optional[dict] = None):
        # error_code: machine-readable code reported to the caller.
        self.error_code = error_code
        # message: human-readable description of the failure.
        self.message = message
        # detail: extra context; a falsy value is replaced by a fresh dict.
        self.detail = detail or {}
        super().__init__(message)


class PluginNotFoundException(PluginException):
    """Raised when a plugin does not exist."""

    def __init__(self, plugin_id: str = ''):
        # PluginErrorCode entries are unpacked here as a (code, message) pair.
        code, msg = PluginErrorCode.PLUGIN_NOT_FOUND
        super().__init__(code, msg, {'plugin_id': plugin_id})


class PluginAlreadyExistsException(PluginException):
    """Raised when a plugin already exists.

    ``code`` is the plugin's own code (reported under ``plugin_code`` in
    ``detail``), not the error code.
    """

    def __init__(self, code: str = ''):
        error_code, msg = PluginErrorCode.PLUGIN_ALREADY_EXISTS
        super().__init__(error_code, msg, {'plugin_code': code})


class PluginNotInstalledException(PluginException):
    """Raised when a plugin is not installed."""

    def __init__(self, plugin_id: str = ''):
        error_code, msg = PluginErrorCode.PLUGIN_NOT_INSTALLED
        super().__init__(error_code, msg, {'plugin_id': plugin_id})


class PluginStatusException(PluginException):
    """Raised on a plugin status error; ``detail`` carries the current status."""

    def __init__(self, plugin_id: str = '', current_status: str = ''):
        error_code, msg = PluginErrorCode.PLUGIN_STATUS_ERROR
        super().__init__(error_code, msg, {
            'plugin_id': plugin_id,
            'current_status': current_status,
        })


class PluginTestException(PluginException):
    """Raised when a plugin test fails.

    When ``error_code`` is falsy, both the code and (if ``message`` is empty)
    the message fall back to ``PluginErrorCode.TEST_FAILED``. When an
    ``error_code`` is supplied, ``message`` is used exactly as given.
    """

    def __init__(self, error_code: Optional[str] = None, message: str = '',
                 detail: Optional[dict] = None):
        if not error_code:
            error_code, default_msg = PluginErrorCode.TEST_FAILED
            message = message or default_msg
        super().__init__(error_code, message, detail or {})


class PluginTestTimeoutException(PluginTestException):
    """Raised when a plugin test times out; ``detail`` carries ``timeout_ms``."""

    def __init__(self, timeout_ms: int = 0):
        error_code, msg = PluginErrorCode.TEST_TIMEOUT
        super().__init__(error_code, msg, {'timeout_ms': timeout_ms})


class PluginSchemaException(PluginException):
    """Raised on a plugin schema error.

    When ``error_code`` is falsy, both the code and (if ``message`` is empty)
    the message fall back to ``PluginErrorCode.SCHEMA_INVALID``.
    """

    def __init__(self, error_code: Optional[str] = None, message: str = '',
                 detail: Optional[dict] = None):
        if not error_code:
            error_code, default_msg = PluginErrorCode.SCHEMA_INVALID
            message = message or default_msg
        super().__init__(error_code, message, detail or {})


class PluginExceptionHandler:
    """Converts exceptions into standardized plugin error payloads."""

    @staticmethod
    def handle(exception: Exception) -> dict:
        """Return a standardized error dict for ``exception``.

        A ``PluginException`` is reported with its own code, message and
        detail. Any other exception is logged and reported under the generic
        code ``PLUGIN_999`` with ``str(exception)`` in ``detail``.

        Returns:
            A dict with keys ``success`` (always ``False``), ``error_code``,
            ``message`` and ``detail``.
        """
        if isinstance(exception, PluginException):
            return {
                'success': False,
                'error_code': exception.error_code,
                'message': exception.message,
                'detail': exception.detail,
            }

        # Unknown exception. Pass the instance itself as exc_info rather than
        # True: True makes logging consult sys.exc_info(), which is empty when
        # this handler is called outside an active ``except`` block, so the
        # traceback would be lost.
        maxkb_logger.error(f"Unhandled plugin exception: {exception}", exc_info=exception)
        return {
            'success': False,
            'error_code': 'PLUGIN_999',
            'message': '插件内部错误',
            'detail': {'original_error': str(exception)},
        }

    @staticmethod
    def to_api_response(exception: Exception) -> dict:
        """Wrap the result of :meth:`handle` in the API response envelope.

        Returns:
            A dict with ``code``, ``message`` and ``data`` (holding
            ``error_code`` and ``detail``).
        """
        error = PluginExceptionHandler.handle(exception)
        # NOTE(review): ``code`` is 400 for every exception, including unknown
        # internal errors reported as PLUGIN_999 — confirm whether callers
        # expect a distinct code for those.
        return {
            'code': 400,
            'message': error['message'],
            'data': {
                'error_code': error['error_code'],
                'detail': error['detail'],
            },
        }