loggering.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
#!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :loggering.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/11 10:48
  9. '''
  10. from foundation.infrastructure.config import config_handler
  11. import os
  12. import sys
  13. import logging
  14. from logging.handlers import RotatingFileHandler
  15. # 导入trace系统
  16. from foundation.infrastructure.tracing import TraceContext
  17. from foundation.infrastructure.tracing.trace_context import trace_filter
  18. class CompatibleLogger(logging.Logger):
  19. """
  20. 完全兼容的日志记录器,继承自 logging.Logger
  21. 提供按级别分文件的日志记录,每个文件包含指定级别及更高级别的日志
  22. """
  23. def __init__(self, name, log_dir="logs", console_output=True,
  24. file_max_mb=10, backup_count=5,
  25. log_format=None, datefmt=None):
  26. # 初始化父类
  27. super().__init__(name)
  28. self.setLevel(logging.DEBUG) # 设置logger自身为最低级别
  29. # 存储配置
  30. self.log_dir = log_dir
  31. self.console_output = console_output
  32. self.file_max_bytes = file_max_mb * 1024 * 1024
  33. self.backup_count = backup_count
  34. # 设置日志格式
  35. self._set_formatter(log_format, datefmt)
  36. # 确保日志目录存在
  37. os.makedirs(log_dir, exist_ok=True)
  38. # 清除可能存在的旧处理器
  39. if self.hasHandlers():
  40. self.handlers.clear()
  41. # 创建文件处理器
  42. self._create_file_handlers()
  43. # 创建控制台处理器
  44. if console_output:
  45. self._create_console_handler()
  46. def _set_formatter(self, log_format, datefmt):
  47. """设置日志格式"""
  48. if log_format is None:
  49. # 使用system_trace_id字段,通过TraceFilter自动注入
  50. log_format = 'P%(process)d.T%(thread)d | %(asctime)s | %(levelname)-8s | %(system_trace_id)-15s | %(log_type)-5s | %(message)s'
  51. if datefmt is None:
  52. datefmt = '%Y-%m-%d %H:%M:%S'
  53. self.formatter = logging.Formatter(log_format, datefmt)
  54. def _create_file_handlers(self):
  55. """为每个日志级别创建文件处理器,每个文件包含该级别及更高级别的日志"""
  56. level_files = {
  57. logging.DEBUG: os.path.join(self.log_dir, "agent_debug.log"),
  58. logging.INFO: os.path.join(self.log_dir, "agent_info.log"),
  59. logging.WARNING: os.path.join(self.log_dir, "agent_warning.log"),
  60. logging.ERROR: os.path.join(self.log_dir, "agent_error.log"),
  61. logging.CRITICAL: os.path.join(self.log_dir, "agent_critical.log"),
  62. }
  63. for level, filename in level_files.items():
  64. handler = RotatingFileHandler(
  65. filename=filename,
  66. mode='a',
  67. maxBytes=self.file_max_bytes,
  68. backupCount=self.backup_count,
  69. encoding='utf-8',
  70. delay=True # ✅ 延迟打开文件,避免Windows下文件占用问题
  71. )
  72. handler.setLevel(level) # 设置级别为对应文件级别
  73. handler.setFormatter(self.formatter)
  74. # 为每个级别的日志文件都添加一个筛选器,确保记录该级别及其更高级别
  75. handler.addFilter(lambda record, lvl=level: record.levelno >= lvl)
  76. # 添加trace_filter,自动注入system_trace_id
  77. handler.addFilter(trace_filter)
  78. self.addHandler(handler)
  79. def _create_console_handler(self):
  80. """创建控制台日志处理器"""
  81. console_handler = logging.StreamHandler(sys.stdout)
  82. console_handler.setLevel(logging.DEBUG)
  83. console_handler.setFormatter(self.formatter)
  84. # 添加trace_filter,自动注入system_trace_id
  85. console_handler.addFilter(trace_filter)
  86. self.addHandler(console_handler)
  87. def _log_with_context(self, level, msg, trace_id, log_type, *args, **kwargs):
  88. """统一的日志记录方法 - 兼容手动传递trace_id和自动获取trace_id"""
  89. extra = kwargs.get('extra', {})
  90. # 如果没有手动传递trace_id,则从TraceContext自动获取
  91. if not trace_id:
  92. trace_id = TraceContext.get_trace_id()
  93. extra.update({
  94. 'trace_id': trace_id,
  95. 'log_type': log_type
  96. })
  97. kwargs['extra'] = extra
  98. super().log(level, msg, *args, **kwargs)
  99. def debug(self, msg, *args, trace_id="", log_type="system", **kwargs):
  100. self._log_with_context(logging.DEBUG, msg, trace_id, log_type, *args, **kwargs)
  101. def info(self, msg, *args, trace_id="", log_type="system", **kwargs):
  102. self._log_with_context(logging.INFO, msg, trace_id, log_type, *args, **kwargs)
  103. def warning(self, msg, *args, trace_id="", log_type="system", **kwargs):
  104. self._log_with_context(logging.WARNING, msg, trace_id, log_type, *args, **kwargs)
  105. def error(self, msg, *args, trace_id="", log_type="system", **kwargs):
  106. self._log_with_context(logging.ERROR, msg, trace_id, log_type, *args, **kwargs)
  107. def exception(self, msg, *args, trace_id="", log_type="system", exc_info=True, **kwargs):
  108. """记录异常信息,包含堆栈跟踪"""
  109. extra = kwargs.get('extra', {})
  110. extra.update({
  111. 'trace_id': trace_id,
  112. 'log_type': log_type
  113. })
  114. kwargs['extra'] = extra
  115. kwargs['exc_info'] = exc_info # 确保异常信息被记录
  116. super().error(msg, *args, **kwargs) # 使用 error 级别记录异常
  117. def critical(self, msg, *args, trace_id="", log_type="system", **kwargs):
  118. self._log_with_context(logging.CRITICAL, msg, trace_id, log_type, *args, **kwargs)
  119. server_logger = CompatibleLogger(
  120. name="agent_log",
  121. log_dir=config_handler.get("log", "LOG_FILE_PATH" , "logs"),
  122. console_output=False if config_handler.get("log", "CONSOLE_OUTPUT" , "True").upper() == "FALSE" else True,
  123. file_max_mb=int(config_handler.get("log", "LOG_FILE_MAX_MB", "10")),
  124. backup_count=int(config_handler.get("log", "LOG_BACKUP_COUNT", "5"))
  125. )
  126. # 添加trace_filter到logger,自动注入system_trace_id
  127. server_logger.addFilter(trace_filter)
  128. # 设置日志级别
  129. server_logger.info("logging initialized")
  130. class ModuleLogger:
  131. """
  132. 模块专用日志记录器
  133. 为特定模块创建独立的日志文件,同时保留控制台输出
  134. """
  135. def __init__(self, name: str, module_name: str, log_dir: str = "logs",
  136. console_output: bool = True, file_max_mb: int = 10, backup_count: int = 5):
  137. """
  138. 初始化模块日志记录器
  139. Args:
  140. name: logger名称
  141. module_name: 模块标识名(用于文件名)
  142. log_dir: 日志目录
  143. console_output: 是否输出到控制台
  144. file_max_mb: 单个日志文件最大大小(MB)
  145. backup_count: 备份文件数量
  146. """
  147. self.name = name
  148. self.module_name = module_name
  149. self.log_dir = os.path.join(log_dir, module_name) # 模块日志放在子目录中
  150. self.console_output = console_output
  151. self.file_max_bytes = file_max_mb * 1024 * 1024
  152. self.backup_count = backup_count
  153. # 创建logger
  154. self.logger = logging.getLogger(name)
  155. self.logger.setLevel(logging.DEBUG)
  156. # 清除旧处理器(防止重复添加)
  157. if self.logger.hasHandlers():
  158. self.logger.handlers.clear()
  159. # 设置日志格式
  160. self._setup_formatter()
  161. # 确保日志目录存在
  162. os.makedirs(self.log_dir, exist_ok=True)
  163. # 创建文件处理器
  164. self._create_file_handlers()
  165. # 创建控制台处理器(强制为开发环境启用)
  166. if console_output:
  167. self._create_console_handler()
  168. # 同时添加一个简化的控制台处理器用于子进程
  169. self._create_simple_console_handler()
  170. # 添加trace_filter
  171. self.logger.addFilter(trace_filter)
  172. # 禁止向上传播,避免重复日志
  173. self.logger.propagate = False
  174. def _setup_formatter(self):
  175. """设置日志格式"""
  176. log_format = 'P%(process)d.T%(thread)d | %(asctime)s | %(levelname)-8s | %(system_trace_id)-15s | %(log_type)-5s | %(message)s'
  177. datefmt = '%Y-%m-%d %H:%M:%S'
  178. self.formatter = logging.Formatter(log_format, datefmt)
  179. def _create_file_handlers(self):
  180. """为模块创建独立的日志文件处理器"""
  181. level_files = {
  182. logging.DEBUG: os.path.join(self.log_dir, f"{self.module_name}_debug.log"),
  183. logging.INFO: os.path.join(self.log_dir, f"{self.module_name}_info.log"),
  184. logging.WARNING: os.path.join(self.log_dir, f"{self.module_name}_warning.log"),
  185. logging.ERROR: os.path.join(self.log_dir, f"{self.module_name}_error.log"),
  186. logging.CRITICAL: os.path.join(self.log_dir, f"{self.module_name}_critical.log"),
  187. }
  188. for level, filename in level_files.items():
  189. handler = RotatingFileHandler(
  190. filename=filename,
  191. mode='a',
  192. maxBytes=self.file_max_bytes,
  193. backupCount=self.backup_count,
  194. encoding='utf-8',
  195. delay=True
  196. )
  197. handler.setLevel(level)
  198. handler.setFormatter(self.formatter)
  199. handler.addFilter(lambda record, lvl=level: record.levelno >= lvl)
  200. handler.addFilter(trace_filter)
  201. self.logger.addHandler(handler)
  202. def _create_console_handler(self):
  203. """创建控制台日志处理器"""
  204. console_handler = logging.StreamHandler(sys.stdout)
  205. console_handler.setLevel(logging.DEBUG)
  206. console_handler.setFormatter(self.formatter)
  207. console_handler.addFilter(trace_filter)
  208. self.logger.addHandler(console_handler)
  209. def _create_simple_console_handler(self):
  210. """创建简化的控制台处理器(用于子进程输出)"""
  211. simple_formatter = logging.Formatter(
  212. '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
  213. '%H:%M:%S'
  214. )
  215. console_handler = logging.StreamHandler(sys.stdout)
  216. console_handler.setLevel(logging.INFO)
  217. console_handler.setFormatter(simple_formatter)
  218. self.logger.addHandler(console_handler)
  219. def _log_with_context(self, level, msg, trace_id, log_type, *args, **kwargs):
  220. """统一的日志记录方法"""
  221. extra = kwargs.get('extra', {})
  222. if not trace_id:
  223. trace_id = TraceContext.get_trace_id()
  224. extra.update({
  225. 'trace_id': trace_id,
  226. 'log_type': log_type
  227. })
  228. kwargs['extra'] = extra
  229. self.logger.log(level, msg, *args, **kwargs)
  230. def debug(self, msg, *args, trace_id="", log_type="system", **kwargs):
  231. self._log_with_context(logging.DEBUG, msg, trace_id, log_type, *args, **kwargs)
  232. def info(self, msg, *args, trace_id="", log_type="system", **kwargs):
  233. self._log_with_context(logging.INFO, msg, trace_id, log_type, *args, **kwargs)
  234. def warning(self, msg, *args, trace_id="", log_type="system", **kwargs):
  235. self._log_with_context(logging.WARNING, msg, trace_id, log_type, *args, **kwargs)
  236. def error(self, msg, *args, trace_id="", log_type="system", **kwargs):
  237. self._log_with_context(logging.ERROR, msg, trace_id, log_type, *args, **kwargs)
  238. def exception(self, msg, *args, trace_id="", log_type="system", exc_info=True, **kwargs):
  239. """记录异常信息,包含堆栈跟踪"""
  240. extra = kwargs.get('extra', {})
  241. extra.update({
  242. 'trace_id': trace_id,
  243. 'log_type': log_type
  244. })
  245. kwargs['extra'] = extra
  246. kwargs['exc_info'] = exc_info
  247. self.logger.error(msg, *args, **kwargs)
  248. def critical(self, msg, *args, trace_id="", log_type="system", **kwargs):
  249. self._log_with_context(logging.CRITICAL, msg, trace_id, log_type, *args, **kwargs)
  250. # 基础日志目录
  251. base_log_dir = config_handler.get("log", "LOG_FILE_PATH", "logs")
  252. console_out = False if config_handler.get("log", "CONSOLE_OUTPUT", "True").upper() == "FALSE" else True
  253. file_max = int(config_handler.get("log", "LOG_FILE_MAX_MB", "10"))
  254. backup = int(config_handler.get("log", "LOG_BACKUP_COUNT", "5"))
  255. # 施工方案审查模块专用logger
  256. review_logger = ModuleLogger(
  257. name="construction_review",
  258. module_name="construction_review",
  259. log_dir=base_log_dir,
  260. console_output=True, # 强制启用控制台输出(开发环境)
  261. file_max_mb=file_max,
  262. backup_count=backup
  263. )
  264. review_logger.info(f"construction_review logger initialized, log_dir: {os.path.join(base_log_dir, 'construction_review')}")
  265. # 施工方案编写模块专用logger
  266. write_logger = ModuleLogger(
  267. name="construction_write",
  268. module_name="construction_write",
  269. log_dir=base_log_dir,
  270. console_output=True, # 强制启用控制台输出(开发环境)
  271. file_max_mb=file_max,
  272. backup_count=backup
  273. )
  274. write_logger.info(f"construction_write logger initialized, log_dir: {os.path.join(base_log_dir, 'construction_write')}")
  275. def configure_logging_for_subprocess():
  276. """
  277. 为子进程(Celery Worker)配置日志输出
  278. 确保所有子进程的日志都能正确输出到父进程终端
  279. """
  280. # 配置根日志记录器
  281. root_logger = logging.getLogger()
  282. root_logger.setLevel(logging.DEBUG)
  283. # 清除现有处理器
  284. if root_logger.hasHandlers():
  285. root_logger.handlers.clear()
  286. # 创建统一的控制台处理器
  287. console_handler = logging.StreamHandler(sys.stdout)
  288. console_handler.setLevel(logging.INFO)
  289. # 使用简洁格式
  290. formatter = logging.Formatter(
  291. '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s',
  292. '%H:%M:%S'
  293. )
  294. console_handler.setFormatter(formatter)
  295. root_logger.addHandler(console_handler)
  296. # 为模块专用 logger 添加控制台处理器(因为它们设置了 propagate=False)
  297. for module_name in ['construction_write', 'construction_review']:
  298. module_logger = logging.getLogger(module_name)
  299. # 检查是否已有控制台处理器
  300. has_console_handler = any(
  301. isinstance(h, logging.StreamHandler) and h.stream == sys.stdout
  302. for h in module_logger.handlers
  303. )
  304. if not has_console_handler:
  305. # 添加简洁的控制台处理器
  306. simple_formatter = logging.Formatter(
  307. '%(asctime)s | %(levelname)-8s | ' + module_name + ' | %(message)s',
  308. '%H:%M:%S'
  309. )
  310. simple_handler = logging.StreamHandler(sys.stdout)
  311. simple_handler.setLevel(logging.INFO)
  312. simple_handler.setFormatter(simple_formatter)
  313. module_logger.addHandler(simple_handler)
  314. # 确保 Celery 相关日志也输出
  315. celery_logger = logging.getLogger('celery')
  316. celery_logger.setLevel(logging.INFO)
  317. celery_logger.propagate = True
  318. billiard_logger = logging.getLogger('billiard')
  319. billiard_logger.setLevel(logging.INFO)
  320. billiard_logger.propagate = True
  321. # 禁止过多警告
  322. logging.getLogger('kombu').setLevel(logging.WARNING)
  323. # 自动为子进程配置日志(当此模块被导入时)
  324. import os
  325. if os.environ.get('CELERY_WORKER') or 'celery' in sys.argv[0].lower():
  326. configure_logging_for_subprocess()