#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File    : loggering.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/11 10:48

Logging setup for the agent API.

Defines two logger flavours that write one rotating file per severity level
(each file receives records at that level and above), creates the shared
``server_logger`` / ``review_logger`` / ``write_logger`` instances at import
time, and offers ``configure_logging_for_subprocess`` for Celery workers.
'''
import logging
import os
import sys
from logging.handlers import RotatingFileHandler
from typing import Any, Dict, List, Optional

from foundation.infrastructure.config import config_handler
# Trace system
from foundation.infrastructure.tracing import TraceContext
from foundation.infrastructure.tracing.trace_context import trace_filter

# Default record layout. ``system_trace_id`` is expected to be injected by
# ``trace_filter`` (per the original author's notes); ``log_type`` is supplied
# through ``extra`` by the logging methods defined in this module.
_DEFAULT_LOG_FORMAT = (
    'P%(process)d.T%(thread)d | %(asctime)s | %(levelname)-8s | '
    '%(system_trace_id)-15s | %(log_type)-5s | %(message)s'
)
_DEFAULT_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
_SIMPLE_DATE_FORMAT = '%H:%M:%S'

# (threshold level, file-name suffix) for the per-level log files.
_LEVEL_SUFFIXES = (
    (logging.DEBUG, 'debug'),
    (logging.INFO, 'info'),
    (logging.WARNING, 'warning'),
    (logging.ERROR, 'error'),
    (logging.CRITICAL, 'critical'),
)


def _build_level_file_handlers(log_dir: str, prefix: str,
                               formatter: logging.Formatter,
                               max_bytes: int,
                               backup_count: int) -> List[logging.Handler]:
    """Return one rotating file handler per level, named ``<prefix>_<level>.log``."""
    handlers: List[logging.Handler] = []
    for level, suffix in _LEVEL_SUFFIXES:
        handler = RotatingFileHandler(
            filename=os.path.join(log_dir, f"{prefix}_{suffix}.log"),
            mode='a',
            maxBytes=max_bytes,
            backupCount=backup_count,
            encoding='utf-8',
            delay=True,  # open lazily; avoids file-in-use problems on Windows
        )
        # The handler level alone makes each file hold "this level and above";
        # no additional level filter is required.
        handler.setLevel(level)
        handler.setFormatter(formatter)
        handler.addFilter(trace_filter)
        handlers.append(handler)
    return handlers


def _build_console_handler(formatter: logging.Formatter,
                           level: int = logging.DEBUG,
                           with_trace_filter: bool = True) -> logging.Handler:
    """Return a stdout stream handler with the given formatter and level."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(level)
    handler.setFormatter(formatter)
    if with_trace_filter:
        handler.addFilter(trace_filter)
    return handler


def _with_trace_context(kwargs: Dict[str, Any], trace_id: str,
                        log_type: str) -> Dict[str, Any]:
    """Return a copy of ``kwargs`` whose ``extra`` carries trace_id and log_type.

    The caller's ``extra`` dict is copied rather than mutated, and
    ``extra=None`` is tolerated. An empty ``trace_id`` falls back to
    ``TraceContext.get_trace_id()``.
    """
    extra = dict(kwargs.get('extra') or {})
    extra['trace_id'] = trace_id or TraceContext.get_trace_id()
    extra['log_type'] = log_type
    return {**kwargs, 'extra': extra}


def _detach_handlers(logger: logging.Logger) -> None:
    """Remove and close every handler attached directly to ``logger``."""
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()


class CompatibleLogger(logging.Logger):
    """
    Drop-in ``logging.Logger`` subclass with per-level log files.

    Writes ``agent_<level>.log`` files under ``log_dir``; each file contains
    records of its level and every more severe level. The level methods accept
    optional ``trace_id`` and ``log_type`` keyword arguments that are attached
    to the record through ``extra``.
    """

    def __init__(self, name, log_dir="logs", console_output=True,
                 file_max_mb=10, backup_count=5, log_format=None, datefmt=None):
        """
        Args:
            name: logger name.
            log_dir: directory for the log files (created if missing).
            console_output: also emit records to stdout when true.
            file_max_mb: size in MB at which a log file is rotated.
            backup_count: number of rotated files to keep.
            log_format: record format; defaults to the module's standard layout.
            datefmt: timestamp format; defaults to ``%Y-%m-%d %H:%M:%S``.
        """
        super().__init__(name)
        # The logger itself passes everything; handlers decide what to keep.
        self.setLevel(logging.DEBUG)

        self.log_dir = log_dir
        self.console_output = console_output
        self.file_max_bytes = file_max_mb * 1024 * 1024
        self.backup_count = backup_count

        self._set_formatter(log_format, datefmt)
        os.makedirs(log_dir, exist_ok=True)

        # A freshly constructed Logger has no handlers and no parent, so
        # there is nothing to clear before attaching ours.
        self._create_file_handlers()
        if console_output:
            self._create_console_handler()

    def _set_formatter(self, log_format: Optional[str],
                       datefmt: Optional[str]) -> None:
        """Build ``self.formatter``, substituting defaults for ``None``."""
        if log_format is None:
            log_format = _DEFAULT_LOG_FORMAT
        if datefmt is None:
            datefmt = _DEFAULT_DATE_FORMAT
        self.formatter = logging.Formatter(log_format, datefmt)

    def _create_file_handlers(self) -> None:
        """Attach the five ``agent_<level>.log`` rotating file handlers."""
        for handler in _build_level_file_handlers(
                self.log_dir, "agent", self.formatter,
                self.file_max_bytes, self.backup_count):
            self.addHandler(handler)

    def _create_console_handler(self) -> None:
        """Attach a DEBUG-level stdout handler using the main format."""
        self.addHandler(_build_console_handler(self.formatter))

    def _log_with_context(self, level, msg, trace_id, log_type, *args, **kwargs):
        """Log ``msg`` at ``level`` with trace_id/log_type attached via ``extra``.

        An explicitly passed ``trace_id`` wins; otherwise the id is taken from
        ``TraceContext.get_trace_id()``.
        """
        super().log(level, msg, *args,
                    **_with_trace_context(kwargs, trace_id, log_type))

    def debug(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.DEBUG, msg, trace_id, log_type, *args, **kwargs)

    def info(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.INFO, msg, trace_id, log_type, *args, **kwargs)

    def warning(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.WARNING, msg, trace_id, log_type, *args, **kwargs)

    def error(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.ERROR, msg, trace_id, log_type, *args, **kwargs)

    def exception(self, msg, *args, trace_id="", log_type="system",
                  exc_info=True, **kwargs):
        """Log at ERROR level including exception/stack information.

        Routed through ``_log_with_context`` so that, like the other level
        methods, an empty ``trace_id`` falls back to the current trace context.
        """
        self._log_with_context(logging.ERROR, msg, trace_id, log_type,
                               *args, exc_info=exc_info, **kwargs)

    def critical(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.CRITICAL, msg, trace_id, log_type, *args, **kwargs)


class ModuleLogger:
    """
    Module-specific logger wrapper.

    Wraps ``logging.getLogger(name)`` and writes ``<module_name>_<level>.log``
    files into a ``<log_dir>/<module_name>`` sub-directory, optionally echoing
    to the console. Propagation to ancestor loggers is disabled.
    """

    def __init__(self, name: str, module_name: str, log_dir: str = "logs",
                 console_output: bool = True, file_max_mb: int = 10,
                 backup_count: int = 5):
        """
        Initialise the module logger.

        Args:
            name: logger name passed to ``logging.getLogger``.
            module_name: module identifier (used for the sub-directory and
                file names).
            log_dir: base log directory.
            console_output: whether to also write to the console.
            file_max_mb: maximum size of a single log file in MB.
            backup_count: number of rotated backup files to keep.
        """
        self.name = name
        self.module_name = module_name
        # Module logs live in their own sub-directory.
        self.log_dir = os.path.join(log_dir, module_name)
        self.console_output = console_output
        self.file_max_bytes = file_max_mb * 1024 * 1024
        self.backup_count = backup_count

        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)

        # getLogger returns a shared object: drop handlers left by an earlier
        # instantiation so records are not duplicated, and close them so
        # their file descriptors are released.
        _detach_handlers(self.logger)

        self._setup_formatter()
        os.makedirs(self.log_dir, exist_ok=True)
        self._create_file_handlers()

        if console_output:
            self._create_console_handler()
            # Additional simplified console handler intended for sub-process
            # output.
            # NOTE(review): together with the handler above this prints INFO+
            # records to stdout twice (two formats) - confirm this is intended.
            self._create_simple_console_handler()

        self.logger.addFilter(trace_filter)
        # Do not propagate upwards, to avoid duplicate records.
        self.logger.propagate = False

    def _setup_formatter(self) -> None:
        """Build ``self.formatter`` with the module's standard layout."""
        self.formatter = logging.Formatter(_DEFAULT_LOG_FORMAT, _DEFAULT_DATE_FORMAT)

    def _create_file_handlers(self) -> None:
        """Attach the five ``<module_name>_<level>.log`` rotating file handlers."""
        for handler in _build_level_file_handlers(
                self.log_dir, self.module_name, self.formatter,
                self.file_max_bytes, self.backup_count):
            self.logger.addHandler(handler)

    def _create_console_handler(self) -> None:
        """Attach a DEBUG-level stdout handler using the main format."""
        self.logger.addHandler(_build_console_handler(self.formatter))

    def _create_simple_console_handler(self) -> None:
        """Attach an INFO-level stdout handler with a short format."""
        simple_formatter = logging.Formatter(
            '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
            _SIMPLE_DATE_FORMAT,
        )
        self.logger.addHandler(_build_console_handler(
            simple_formatter, level=logging.INFO, with_trace_filter=False))

    def _log_with_context(self, level, msg, trace_id, log_type, *args, **kwargs):
        """Log ``msg`` at ``level`` with trace_id/log_type attached via ``extra``."""
        self.logger.log(level, msg, *args,
                        **_with_trace_context(kwargs, trace_id, log_type))

    def debug(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.DEBUG, msg, trace_id, log_type, *args, **kwargs)

    def info(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.INFO, msg, trace_id, log_type, *args, **kwargs)

    def warning(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.WARNING, msg, trace_id, log_type, *args, **kwargs)

    def error(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.ERROR, msg, trace_id, log_type, *args, **kwargs)

    def exception(self, msg, *args, trace_id="", log_type="system",
                  exc_info=True, **kwargs):
        """Log at ERROR level including exception/stack information.

        Routed through ``_log_with_context`` so that, like the other level
        methods, an empty ``trace_id`` falls back to the current trace context.
        """
        self._log_with_context(logging.ERROR, msg, trace_id, log_type,
                               *args, exc_info=exc_info, **kwargs)

    def critical(self, msg, *args, trace_id="", log_type="system", **kwargs):
        self._log_with_context(logging.CRITICAL, msg, trace_id, log_type, *args, **kwargs)


def configure_logging_for_subprocess():
    """
    Configure logging output for sub-processes (Celery workers).

    Replaces the root logger's handlers with a single INFO-level stdout
    handler, makes sure the module-specific loggers have a stdout handler
    (they do not propagate to the root), lets ``celery`` / ``billiard``
    records propagate at INFO, and raises ``kombu`` to WARNING.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    # Discard whatever handlers were installed on the root before.
    root_logger.handlers.clear()

    # One uniform console handler with a compact format.
    root_console = logging.StreamHandler(sys.stdout)
    root_console.setLevel(logging.INFO)
    root_console.setFormatter(logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
        _SIMPLE_DATE_FORMAT,
    ))
    root_logger.addHandler(root_console)

    # Module loggers set propagate=False, so they need their own handler.
    for module_name in ['construction_write', 'construction_review']:
        module_logger = logging.getLogger(module_name)
        already_on_stdout = any(
            isinstance(h, logging.StreamHandler) and h.stream is sys.stdout
            for h in module_logger.handlers
        )
        if already_on_stdout:
            continue
        simple_handler = logging.StreamHandler(sys.stdout)
        simple_handler.setLevel(logging.INFO)
        simple_handler.setFormatter(logging.Formatter(
            '%(asctime)s | %(levelname)-8s | ' + module_name + ' | %(message)s',
            _SIMPLE_DATE_FORMAT,
        ))
        module_logger.addHandler(simple_handler)

    # Make sure Celery-related records are emitted as well.
    for noisy_name in ('celery', 'billiard'):
        related_logger = logging.getLogger(noisy_name)
        related_logger.setLevel(logging.INFO)
        related_logger.propagate = True

    # Suppress excessive kombu output.
    logging.getLogger('kombu').setLevel(logging.WARNING)


# Shared logging configuration, read once from the "log" config section.
base_log_dir = config_handler.get("log", "LOG_FILE_PATH", "logs")
# Console output stays on unless the setting is literally "false"
# (case-insensitive); str() tolerates a non-string config value.
console_out = str(config_handler.get("log", "CONSOLE_OUTPUT", "True")).upper() != "FALSE"
file_max = int(config_handler.get("log", "LOG_FILE_MAX_MB", "10"))
backup = int(config_handler.get("log", "LOG_BACKUP_COUNT", "5"))

# Application-wide logger.
server_logger = CompatibleLogger(
    name="agent_log",
    log_dir=base_log_dir,
    console_output=console_out,
    file_max_mb=file_max,
    backup_count=backup,
)
# Logger-level trace filter (per the original notes it injects system_trace_id).
server_logger.addFilter(trace_filter)
server_logger.info("logging initialized")

# Dedicated logger for the construction-plan review module.
review_logger = ModuleLogger(
    name="construction_review",
    module_name="construction_review",
    log_dir=base_log_dir,
    console_output=True,  # console output forced on (development environment)
    file_max_mb=file_max,
    backup_count=backup,
)
review_logger.info(f"construction_review logger initialized, log_dir: {os.path.join(base_log_dir, 'construction_review')}")

# Dedicated logger for the construction-plan writing module.
write_logger = ModuleLogger(
    name="construction_write",
    module_name="construction_write",
    log_dir=base_log_dir,
    console_output=True,  # console output forced on (development environment)
    file_max_mb=file_max,
    backup_count=backup,
)
write_logger.info(f"construction_write logger initialized, log_dir: {os.path.join(base_log_dir, 'construction_write')}")

# Configure sub-process logging automatically when this module is imported
# inside a Celery worker. sys.argv can be empty in embedded interpreters.
_entry_point = sys.argv[0] if sys.argv else ''
if os.environ.get('CELERY_WORKER') or 'celery' in _entry_point.lower():
    configure_logging_for_subprocess()