| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- # !/usr/bin/ python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :loggering.py
- @IDE :PyCharm
- @Author :
- @Date :2025/7/11 10:48
- '''
- from foundation.infrastructure.config import config_handler
- import os
- import sys
- import logging
- from logging.handlers import RotatingFileHandler
- # 导入trace系统
- from foundation.infrastructure.tracing import TraceContext
- from foundation.infrastructure.tracing.trace_context import trace_filter
class CompatibleLogger(logging.Logger):
    """Drop-in ``logging.Logger`` subclass that writes one rotating file per level.

    Five rotating file handlers are attached, one per standard level. Each
    handler accepts records at its own level and above, so for example
    ``agent_warning.log`` receives WARNING, ERROR and CRITICAL records.
    An optional console handler mirrors every record to ``sys.stdout``.

    The level methods (``debug`` ... ``critical``, ``exception``) accept two
    extra keyword-only arguments:

    * ``trace_id`` -- attached to the record as ``trace_id``; when empty it
      is looked up through ``TraceContext.get_trace_id()``.
    * ``log_type`` -- attached to the record as ``log_type`` (default
      ``"system"``).

    NOTE(review): the default format references ``%(log_type)s`` and
    ``%(system_trace_id)s``. ``log_type`` is supplied by the level methods
    overridden here; ``system_trace_id`` is expected to be injected by
    ``trace_filter`` (defined outside this file -- confirm there). Calling
    the inherited ``log()`` directly does not set ``log_type``.
    """

    def __init__(self, name, log_dir="logs", console_output=True,
                 file_max_mb=10, backup_count=5,
                 log_format=None, datefmt=None):
        """Create the logger, its log directory and all handlers.

        Args:
            name: Logger name passed to ``logging.Logger``.
            log_dir: Directory that holds the per-level log files; created
                if it does not exist.
            console_output: When true, also log to ``sys.stdout``.
            file_max_mb: Size in megabytes at which a log file rotates.
            backup_count: Number of rotated files kept per level.
            log_format: ``logging.Formatter`` format string; a built-in
                default is used when ``None``.
            datefmt: Timestamp format; ``'%Y-%m-%d %H:%M:%S'`` when ``None``.
        """
        super().__init__(name)
        # The logger itself lets everything through; the handlers filter.
        self.setLevel(logging.DEBUG)

        # Keep the configuration for the handler factories below.
        self.log_dir = log_dir
        self.console_output = console_output
        self.file_max_bytes = file_max_mb * 1024 * 1024
        self.backup_count = backup_count

        self._set_formatter(log_format, datefmt)

        # The directory must exist before any handler opens its file.
        os.makedirs(log_dir, exist_ok=True)

        # Drop any handlers that may already be attached.
        if self.hasHandlers():
            self.handlers.clear()

        self._create_file_handlers()
        if console_output:
            self._create_console_handler()

    def _set_formatter(self, log_format, datefmt):
        """Build ``self.formatter`` from the given or the default formats."""
        if log_format is None:
            # Uses the system_trace_id field, which trace_filter is expected
            # to inject into every record.
            log_format = 'P%(process)d.T%(thread)d | %(asctime)s | %(levelname)-8s | %(system_trace_id)-15s | %(log_type)-5s | %(message)s'
        if datefmt is None:
            datefmt = '%Y-%m-%d %H:%M:%S'
        self.formatter = logging.Formatter(log_format, datefmt)

    def _create_file_handlers(self):
        """Attach one rotating file handler per level.

        Each file receives records of its own level and every higher level.
        """
        level_files = {
            logging.DEBUG: os.path.join(self.log_dir, "agent_debug.log"),
            logging.INFO: os.path.join(self.log_dir, "agent_info.log"),
            logging.WARNING: os.path.join(self.log_dir, "agent_warning.log"),
            logging.ERROR: os.path.join(self.log_dir, "agent_error.log"),
            logging.CRITICAL: os.path.join(self.log_dir, "agent_critical.log"),
        }
        for level, filename in level_files.items():
            handler = RotatingFileHandler(
                filename=filename,
                mode='a',
                maxBytes=self.file_max_bytes,
                backupCount=self.backup_count,
                encoding='utf-8',
                delay=True,  # open lazily to avoid file-in-use problems on Windows
            )
            handler.setLevel(level)
            handler.setFormatter(self.formatter)
            # Explicit threshold filter; ``lvl=level`` binds the current loop
            # value so each handler keeps its own threshold.
            handler.addFilter(lambda record, lvl=level: record.levelno >= lvl)
            # trace_filter is expected to inject system_trace_id.
            handler.addFilter(trace_filter)
            self.addHandler(handler)

    def _create_console_handler(self):
        """Attach a DEBUG-level handler that writes to ``sys.stdout``."""
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(self.formatter)
        # trace_filter is expected to inject system_trace_id.
        console_handler.addFilter(trace_filter)
        self.addHandler(console_handler)

    @staticmethod
    def _merge_extra(extra, trace_id, log_type):
        """Return a new ``extra`` dict carrying ``trace_id`` and ``log_type``.

        The caller's mapping is copied, never mutated, and ``None`` is
        treated as an empty mapping.
        """
        merged = dict(extra) if extra else {}
        merged['trace_id'] = trace_id
        merged['log_type'] = log_type
        return merged

    def _log_with_context(self, level, msg, trace_id, log_type, *args, **kwargs):
        """Log ``msg`` at ``level`` with ``trace_id`` and ``log_type`` attached.

        Works both with a manually supplied ``trace_id`` and without one, in
        which case the id is taken from ``TraceContext.get_trace_id()``.
        """
        if not trace_id:
            trace_id = TraceContext.get_trace_id()
        kwargs['extra'] = self._merge_extra(kwargs.get('extra'), trace_id, log_type)
        super().log(level, msg, *args, **kwargs)

    def debug(self, msg, *args, trace_id="", log_type="system", **kwargs):
        """Log ``msg`` at DEBUG level."""
        self._log_with_context(logging.DEBUG, msg, trace_id, log_type, *args, **kwargs)

    def info(self, msg, *args, trace_id="", log_type="system", **kwargs):
        """Log ``msg`` at INFO level."""
        self._log_with_context(logging.INFO, msg, trace_id, log_type, *args, **kwargs)

    def warning(self, msg, *args, trace_id="", log_type="system", **kwargs):
        """Log ``msg`` at WARNING level."""
        self._log_with_context(logging.WARNING, msg, trace_id, log_type, *args, **kwargs)

    def error(self, msg, *args, trace_id="", log_type="system", **kwargs):
        """Log ``msg`` at ERROR level."""
        self._log_with_context(logging.ERROR, msg, trace_id, log_type, *args, **kwargs)

    def exception(self, msg, *args, trace_id="", log_type="system", exc_info=True, **kwargs):
        """Log ``msg`` at ERROR level together with exception information.

        Goes through the same path as the other level methods, so an empty
        ``trace_id`` falls back to ``TraceContext.get_trace_id()`` here too.
        """
        kwargs['exc_info'] = exc_info  # make sure the traceback is recorded
        self._log_with_context(logging.ERROR, msg, trace_id, log_type, *args, **kwargs)

    def critical(self, msg, *args, trace_id="", log_type="system", **kwargs):
        """Log ``msg`` at CRITICAL level."""
        self._log_with_context(logging.CRITICAL, msg, trace_id, log_type, *args, **kwargs)
# Shared module-level logger; each option is read from the "log" section of
# config_handler, with the literal passed as the third argument to get().
server_logger = CompatibleLogger(
    name="agent_log",
    log_dir=config_handler.get("log", "LOG_FILE_PATH", "logs"),
    # Console output stays enabled unless the setting is "false" (any case).
    console_output=config_handler.get("log", "CONSOLE_OUTPUT", "True").upper() != "FALSE",
    file_max_mb=int(config_handler.get("log", "LOG_FILE_MAX_MB", "10")),
    backup_count=int(config_handler.get("log", "LOG_BACKUP_COUNT", "5")),
)

# Attach trace_filter to the logger itself as well, so system_trace_id is
# expected to be injected before any handler sees the record.
server_logger.addFilter(trace_filter)

# Emit a first record to mark that logging is set up.
server_logger.info("logging initialized")
|