# !/usr/bin/ python # -*- coding: utf-8 -*- ''' @Project : lq-agent-api @File :loggering.py @IDE :PyCharm @Author : @Date :2025/7/11 10:48 ''' from foundation.base.config import config_handler import os import sys import logging from logging.handlers import RotatingFileHandler # 导入trace系统 from foundation.trace.trace_context import TraceContext, trace_filter class CompatibleLogger(logging.Logger): """ 完全兼容的日志记录器,继承自 logging.Logger 提供按级别分文件的日志记录,每个文件包含指定级别及更高级别的日志 """ def __init__(self, name, log_dir="logs", console_output=True, file_max_mb=10, backup_count=5, log_format=None, datefmt=None): # 初始化父类 super().__init__(name) self.setLevel(logging.INFO) # 设置logger自身为最低级别 # 存储配置 self.log_dir = log_dir self.console_output = console_output self.file_max_bytes = file_max_mb * 1024 * 1024 self.backup_count = backup_count # 设置日志格式 self._set_formatter(log_format, datefmt) # 确保日志目录存在 os.makedirs(log_dir, exist_ok=True) # 清除可能存在的旧处理器 if self.hasHandlers(): self.handlers.clear() # 创建文件处理器 self._create_file_handlers() # 创建控制台处理器 if console_output: self._create_console_handler() def _set_formatter(self, log_format, datefmt): """设置日志格式""" if log_format is None: # 使用system_trace_id字段,通过TraceFilter自动注入 log_format = 'P%(process)d.T%(thread)d | %(asctime)s | %(levelname)-8s | %(system_trace_id)-15s | %(log_type)-5s | %(message)s' if datefmt is None: datefmt = '%Y-%m-%d %H:%M:%S' self.formatter = logging.Formatter(log_format, datefmt) def _create_file_handlers(self): """为每个日志级别创建文件处理器,每个文件包含该级别及更高级别的日志""" level_files = { logging.DEBUG: os.path.join(self.log_dir, "agent_debug.log"), logging.INFO: os.path.join(self.log_dir, "agent_info.log"), logging.WARNING: os.path.join(self.log_dir, "agent_warning.log"), logging.ERROR: os.path.join(self.log_dir, "agent_error.log"), logging.CRITICAL: os.path.join(self.log_dir, "agent_critical.log"), } for level, filename in level_files.items(): handler = RotatingFileHandler( filename=filename, mode='a', maxBytes=self.file_max_bytes, backupCount=self.backup_count, encoding='utf-8' ) handler.setLevel(level) # 设置级别为对应文件级别 handler.setFormatter(self.formatter) # 为每个级别的日志文件都添加一个筛选器,确保记录该级别及其更高级别 handler.addFilter(lambda record, lvl=level: record.levelno >= lvl) # 添加trace_filter,自动注入system_trace_id handler.addFilter(trace_filter) self.addHandler(handler) def _create_console_handler(self): """创建控制台日志处理器""" console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.DEBUG) console_handler.setFormatter(self.formatter) # 添加trace_filter,自动注入system_trace_id console_handler.addFilter(trace_filter) self.addHandler(console_handler) def _log_with_context(self, level, msg, trace_id, log_type, *args, **kwargs): """统一的日志记录方法 - 兼容手动传递trace_id和自动获取trace_id""" extra = kwargs.get('extra', {}) # 如果没有手动传递trace_id,则从TraceContext自动获取 if not trace_id: trace_id = TraceContext.get_trace_id() extra.update({ 'trace_id': trace_id, 'log_type': log_type }) kwargs['extra'] = extra super().log(level, msg, *args, **kwargs) def debug(self, msg, *args, trace_id="", log_type="system", **kwargs): self._log_with_context(logging.DEBUG, msg, trace_id, log_type, *args, **kwargs) def info(self, msg, *args, trace_id="", log_type="system", **kwargs): self._log_with_context(logging.INFO, msg, trace_id, log_type, *args, **kwargs) def warning(self, msg, *args, trace_id="", log_type="system", **kwargs): self._log_with_context(logging.WARNING, msg, trace_id, log_type, *args, **kwargs) def error(self, msg, *args, trace_id="", log_type="system", **kwargs): self._log_with_context(logging.ERROR, msg, trace_id, log_type, *args, **kwargs) def exception(self, msg, *args, trace_id="", log_type="system", exc_info=True, **kwargs): """记录异常信息,包含堆栈跟踪""" extra = kwargs.get('extra', {}) extra.update({ 'trace_id': trace_id, 'log_type': log_type }) kwargs['extra'] = extra kwargs['exc_info'] = exc_info # 确保异常信息被记录 super().error(msg, *args, **kwargs) # 使用 error 级别记录异常 def critical(self, msg, *args, trace_id="", log_type="system", **kwargs): self._log_with_context(logging.CRITICAL, msg, trace_id, log_type, *args, **kwargs) server_logger = CompatibleLogger( name="agent_log", log_dir=config_handler.get("log", "LOG_FILE_PATH" , "logs"), console_output=False if config_handler.get("log", "CONSOLE_OUTPUT" , "True").upper() == "FALSE" else True, file_max_mb=int(config_handler.get("log", "LOG_FILE_MAX_MB", "10")), backup_count=int(config_handler.get("log", "LOG_BACKUP_COUNT", "5")) ) # 添加trace_filter到logger,自动注入system_trace_id server_logger.addFilter(trace_filter) # 设置日志级别 server_logger.info("logging initialized")