app.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893
  1. import os
  2. import sys
  3. # Windows 平台 Celery 兼容性设置(必须在导入 celery 之前)
  4. if sys.platform == 'win32':
  5. os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
  6. import time
  7. import redis
  8. import signal
  9. import uvicorn
  10. import datetime
  11. import traceback
  12. import threading
  13. BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  14. sys.path.insert(0, BASE_DIR)
  15. from views import lifespan
  16. from fastapi import FastAPI, HTTPException
  17. from fastapi.responses import JSONResponse
  18. from pydantic import BaseModel
  19. from typing import Optional, Dict, Any
  20. from fastapi.middleware.cors import CORSMiddleware
  21. from foundation.infrastructure.config.config import config_handler
  22. from foundation.infrastructure.cache import RedisConnectionFactory
  23. from foundation.observability.logger.loggering import server_logger
  24. from foundation.infrastructure.messaging.celery_app import app as celery_app
  25. # 导入所有路由
  26. from views.test_views import test_router
  27. from views.construction_review.file_upload import file_upload_router
  28. from views.construction_review.review_results import review_results_router
  29. from views.construction_review.launch_review import launch_review_router
  30. from views.construction_review.task_control import task_control_router
  31. # 导入施工方案编写路由
  32. from views.construction_write.outline_views import outline_router
  33. class ServerUtils:
  34. """服务器工具函数类 - 集中管理工具方法"""
  35. @staticmethod
  36. def get_redis_connection():
  37. """获取Redis连接的统一工具函数
  38. Returns:
  39. redis.Redis: Redis连接对象
  40. """
  41. # 从配置文件获取Redis连接参数
  42. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  43. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  44. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  45. redis_db = config_handler.get('redis', 'REDIS_DB', '0')
  46. # 构建Redis URL
  47. if redis_password:
  48. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
  49. else:
  50. redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
  51. return redis.from_url(redis_url, decode_responses=True)
  52. class RouteManager:
  53. """路由管理类 - 负责路由配置和中间件设置"""
  54. def __init__(self, app: FastAPI):
  55. """初始化路由管理器
  56. Args:
  57. app: FastAPI应用实例
  58. """
  59. self.app = app
  60. self._setup_cors()
  61. self._setup_routes()
  62. self._setup_exception_handlers()
  63. self._setup_health_checks()
  64. self._setup_api_docs()
  65. def _setup_cors(self):
  66. """配置CORS中间件"""
  67. self.app.add_middleware(
  68. CORSMiddleware,
  69. allow_origins=["*"], # 允许所有的来源
  70. allow_credentials=True,
  71. allow_methods=["*"], # 允许的HTTP方法
  72. allow_headers=["*"], # 允许的请求头
  73. )
  74. def _setup_routes(self):
  75. """配置所有路由"""
  76. self.app.include_router(test_router)
  77. self.app.include_router(file_upload_router)
  78. self.app.include_router(review_results_router)
  79. self.app.include_router(launch_review_router)
  80. self.app.include_router(task_control_router) # 任务控制路由
  81. # 施工方案编写路由
  82. self.app.include_router(outline_router)
  83. def _setup_exception_handlers(self):
  84. """配置全局异常处理"""
  85. @self.app.exception_handler(HTTPException)
  86. async def http_exception_handler(request, exc):
  87. return JSONResponse(
  88. status_code=exc.status_code,
  89. content=exc.detail
  90. )
  91. def _setup_health_checks(self):
  92. """配置健康检查接口"""
  93. @self.app.get("/health" ,tags=["系统状态"])
  94. async def health_check():
  95. timestamp = datetime.datetime.now().isoformat()
  96. return {"status": "healthy", "timestamp": timestamp}
  97. @self.app.get("/celery/status", tags=["系统状态"])
  98. async def get_celery_status():
  99. """获取Celery Worker状态"""
  100. # 延迟导入避免循环引用
  101. from server.app import celery_manager
  102. status = celery_manager.get_status()
  103. return {
  104. "celery_worker": status,
  105. "timestamp": datetime.datetime.now().isoformat()
  106. }
  107. def _setup_api_docs(self):
  108. """配置Swagger API文档"""
  109. # 添加API文档信息接口
  110. @self.app.get("/api/docs/info", tags=["系统状态"])
  111. async def api_info():
  112. """获取API文档信息"""
  113. return {
  114. "title": "Agent API - 施工方案审查系统",
  115. "description": "集成施工方案审查功能的API接口文档",
  116. "version": "0.3",
  117. "docs_urls": {
  118. "swagger_ui": "/docs",
  119. "redoc": "/redoc",
  120. "openapi_json": "/openapi.json"
  121. },
  122. "features": [
  123. "自动生成API文档",
  124. "交互式API测试",
  125. "OpenAPI 3.0规范",
  126. "支持多种认证方式"
  127. ]
  128. }
  129. @self.app.get("/api/docs/health", tags=["系统状态"])
  130. async def docs_health_check():
  131. """API文档健康检查"""
  132. return {
  133. "status": "healthy",
  134. "service": "API Documentation",
  135. "version": "0.3",
  136. "timestamp": datetime.datetime.now().isoformat()
  137. }
  138. class CeleryWorkerManager:
  139. """Celery Worker程序化管理器 - 独立的Celery管理模块"""
  140. def __init__(self, server_utils: ServerUtils = None):
  141. """初始化Celery Worker管理器
  142. Args:
  143. server_utils: 服务器工具类实例
  144. """
  145. self.worker = None
  146. self.is_running = False
  147. self.worker_thread = None
  148. self.shutdown_event = threading.Event()
  149. self.server_utils = server_utils or ServerUtils()
  150. def start_worker(self, **kwargs) -> bool:
  151. """启动Celery Worker
  152. Returns:
  153. bool: 启动是否成功
  154. """
  155. if self.is_running:
  156. server_logger.warning("Celery Worker已在运行")
  157. return True
  158. try:
  159. # 启动前清理残留任务
  160. self._cleanup_redis_tasks("启动前")
  161. # 创建Worker函数
  162. def run_celery_worker():
  163. try:
  164. server_logger.info("Celery Worker开始运行...")
  165. server_logger.info("Worker配置: 并发数=4, 进程池=prefork, 日志输出=终端")
  166. # 配置子进程日志输出
  167. from foundation.observability.logger.loggering import configure_logging_for_subprocess
  168. configure_logging_for_subprocess()
  169. # 构建 Celery worker 参数
  170. worker_args = [
  171. 'worker', # 子命令
  172. '-c', '4', # 并发数:4个worker进程
  173. '-P', 'prefork', # 进程池类型:prefork
  174. '-l', 'info', # 日志级别
  175. '--without-heartbeat', # 禁用心跳(Windows兼容)
  176. '--without-gossip', # 禁用gossip(Windows兼容)
  177. '--without-mingle', # 禁用mingle(Windows兼容)
  178. ]
  179. # Windows 平台额外设置
  180. if sys.platform == 'win32':
  181. server_logger.info("Windows平台 detected,启用兼容性设置")
  182. os.environ['FORKED_BY_MULTIPROCESSING'] = '1'
  183. celery_app.worker_main(worker_args)
  184. except KeyboardInterrupt:
  185. server_logger.info("收到停止信号,Celery Worker退出")
  186. except Exception as e:
  187. server_logger.error(f"Celery Worker运行时出错: {e}")
  188. server_logger.exception("详细错误信息:")
  189. finally:
  190. self.is_running = False
  191. server_logger.info("Celery Worker已停止")
  192. # 在单独线程中启动Worker
  193. self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
  194. self.worker_thread.start()
  195. self.is_running = True
  196. # 等待启动
  197. time.sleep(2)
  198. success = self.is_running and self.worker_thread.is_alive()
  199. if success:
  200. server_logger.info("Celery Worker启动成功")
  201. else:
  202. server_logger.error("Celery Worker启动失败")
  203. self.is_running = False
  204. return success
  205. except ImportError as e:
  206. server_logger.error(f"导入Celery失败: {e}")
  207. server_logger.info("请先安装Celery: pip install celery redis")
  208. return False
  209. except Exception as e:
  210. server_logger.error(f"启动Celery Worker失败: {e}")
  211. server_logger.exception("详细错误信息:")
  212. return False
  213. def stop_worker(self, timeout: int = 5) -> bool:
  214. """优雅停止Celery Worker
  215. Args:
  216. timeout: 停止超时时间(秒)
  217. Returns:
  218. bool: 停止是否成功
  219. """
  220. if not self.is_running:
  221. server_logger.info("Celery Worker未运行")
  222. return True
  223. try:
  224. server_logger.info("停止Celery Worker...")
  225. self.shutdown_event.set()
  226. if self.worker_thread and self.worker_thread.is_alive():
  227. # 尝试优雅停止
  228. start_time = time.time()
  229. while self.is_running and (time.time() - start_time) < timeout:
  230. time.sleep(0.1)
  231. if self.is_running:
  232. server_logger.warning("Celery Worker优雅停止超时")
  233. else:
  234. server_logger.info("Celery Worker已优雅停止")
  235. # 停止后清理Redis任务
  236. self._cleanup_redis_tasks("停止时")
  237. self.is_running = False
  238. self.shutdown_event.clear()
  239. return True
  240. except Exception as e:
  241. server_logger.error(f"停止Celery Worker失败: {e}")
  242. return False
  243. def stop_worker_immediately(self) -> bool:
  244. """立即停止Celery Worker,不等待
  245. Returns:
  246. bool: 停止是否成功
  247. """
  248. if not self.is_running:
  249. server_logger.info("Celery Worker未运行")
  250. return True
  251. try:
  252. server_logger.info("立即停止Celery Worker...")
  253. self.shutdown_event.set()
  254. # 发送中断信号
  255. if hasattr(os, 'kill'):
  256. try:
  257. os.kill(os.getpid(), signal.SIGINT)
  258. server_logger.info("已发送中断信号")
  259. except:
  260. pass
  261. # 停止后清理Redis任务
  262. self._cleanup_redis_tasks("立即停止时")
  263. self.is_running = False
  264. self.shutdown_event.clear()
  265. server_logger.info("Celery Worker已立即停止")
  266. return True
  267. except Exception as e:
  268. server_logger.error(f"立即停止Celery Worker失败: {e}")
  269. self.is_running = False
  270. return False
  271. def get_status(self) -> dict:
  272. """获取Worker状态
  273. Returns:
  274. dict: 包含worker状态的字典
  275. """
  276. return {
  277. "is_running": self.is_running,
  278. "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
  279. }
  280. def _cleanup_redis_tasks(self, phase: str):
  281. """清理Redis中的Celery任务 - 增强版
  282. Args:
  283. phase: 清理阶段(启动前/停止时/立即停止时)
  284. """
  285. try:
  286. r = self.server_utils.get_redis_connection()
  287. server_logger.info(f"{phase}清理Redis中的Celery任务...")
  288. # 1. 首先停止所有正在运行的Worker(如果是启动前清理)
  289. if phase == "启动前":
  290. self._kill_existing_celery_workers()
  291. # 2. 清理任务相关键
  292. task_keys = r.keys('task:*')
  293. celery_meta_keys = r.keys('celery-task-meta-*')
  294. current_keys = r.keys('current:*')
  295. kombu_keys = r.keys('_kombu.binding.*')
  296. # 3. 清理Celery内部未确认任务队列(关键!)
  297. unacked_keys = r.keys('unacked*') # 未确认的任务
  298. qos_keys = r.keys('celery@*') # QoS/预取相关键
  299. all_keys = (
  300. task_keys + celery_meta_keys + current_keys +
  301. kombu_keys + unacked_keys + qos_keys
  302. )
  303. for key in all_keys:
  304. try:
  305. r.delete(key)
  306. server_logger.debug(f"{phase}清理: {key}")
  307. except Exception as e:
  308. server_logger.warning(f"{phase}清理 {key} 失败: {e}")
  309. # 4. 清理标准队列和优先级队列
  310. queues = ['celery', 'celery.pidbox', 'celeryev']
  311. for queue in queues:
  312. try:
  313. r.delete(queue)
  314. server_logger.debug(f"{phase}清理队列: {queue}")
  315. except Exception as e:
  316. server_logger.warning(f"{phase}清理队列 {queue} 失败: {e}")
  317. # 5. 清理优先级队列(celery~1, celery~2 等)
  318. priority_queues = r.keys('celery~*')
  319. for queue in priority_queues:
  320. try:
  321. r.delete(queue)
  322. server_logger.debug(f"{phase}清理优先级队列: {queue}")
  323. except Exception as e:
  324. server_logger.warning(f"{phase}清理优先级队列 {queue} 失败: {e}")
  325. # 6. 最后验证:确保队列为空
  326. for queue in ['celery']:
  327. try:
  328. queue_len = r.llen(queue)
  329. if queue_len > 0:
  330. server_logger.warning(f"队列 {queue} 仍有 {queue_len} 个任务,强制清空")
  331. r.delete(queue)
  332. except Exception as e:
  333. server_logger.warning(f"验证队列 {queue} 失败: {e}")
  334. if all_keys or priority_queues:
  335. server_logger.info(f"{phase}已清理 {len(all_keys)} 个Redis键和 {len(priority_queues)} 个优先级队列")
  336. else:
  337. server_logger.info(f"{phase}未发现需要清理的残留任务")
  338. # 7. 启动前清理时,等待一小段时间确保Redis同步完成
  339. if phase == "启动前":
  340. time.sleep(0.5)
  341. except Exception as e:
  342. server_logger.error(f"{phase}清理Redis任务失败: {e}")
  343. def _kill_existing_celery_workers(self):
  344. """终止所有现有的Celery Worker进程"""
  345. try:
  346. import subprocess
  347. import platform
  348. system = platform.system()
  349. server_logger.info("检查并终止现有的Celery Worker进程...")
  350. if system == "Windows":
  351. # Windows: 使用 taskkill
  352. try:
  353. # 查找 celery 进程
  354. result = subprocess.run(
  355. ['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'],
  356. capture_output=True, text=True
  357. )
  358. if 'celery' in result.stdout.lower():
  359. subprocess.run(['taskkill', '/F', '/IM', 'celery.exe'], capture_output=True)
  360. server_logger.info("已终止现有的Celery Worker进程")
  361. except Exception as e:
  362. server_logger.debug(f"终止Celery进程时出错: {e}")
  363. else:
  364. # Linux/Mac: 使用 pkill
  365. try:
  366. subprocess.run(['pkill', '-f', 'celery worker'], capture_output=True)
  367. server_logger.info("已终止现有的Celery Worker进程")
  368. except Exception as e:
  369. server_logger.debug(f"终止Celery进程时出错: {e}")
  370. # 短暂等待确保进程完全终止
  371. time.sleep(0.5)
  372. except Exception as e:
  373. server_logger.warning(f"终止现有Celery Worker失败: {e}")
  374. def __enter__(self):
  375. return self
  376. def __exit__(self, exc_type, exc_val, exc_tb):
  377. self.stop_worker()
  378. class ApplicationFactory:
  379. """应用工厂类 - 负责创建和配置FastAPI应用"""
  380. def __init__(self):
  381. """初始化应用工厂"""
  382. self.server_utils = ServerUtils()
  383. self.celery_manager = CeleryWorkerManager(self.server_utils)
  384. def create_app(self) -> FastAPI:
  385. """创建FastAPI应用实例
  386. Returns:
  387. FastAPI: 配置完成的应用实例
  388. """
  389. app = FastAPI(
  390. title="Agent API - 施工方案审查系统",
  391. version="0.3",
  392. description="Agent API - 集成施工方案审查功能",
  393. lifespan=lifespan
  394. )
  395. # 使用路由管理器配置应用
  396. route_manager = RouteManager(app)
  397. return app
  398. def create_server_config(self) -> dict:
  399. """创建服务器配置
  400. Returns:
  401. dict: 服务器配置字典
  402. """
  403. # 确保端口号是整数类型
  404. port = config_handler.get('launch', 'LAUNCH_PORT', 8002)
  405. try:
  406. port = int(port)
  407. except (ValueError, TypeError):
  408. port = 8002
  409. return {
  410. 'host': config_handler.get('launch', 'HOST', '0.0.0.0'),
  411. 'port': port,
  412. 'reload': False,
  413. 'with_celery': True
  414. }
  415. # 全局实例
  416. app_factory = ApplicationFactory()
  417. celery_manager = app_factory.celery_manager
  418. # 创建 FastAPI 应用
  419. def create_app() -> FastAPI:
  420. """创建主应用服务"""
  421. app = FastAPI(
  422. title="Agent API - 施工方案审查系统",
  423. version="0.3",
  424. description="Agent API - 集成施工方案审查功能",
  425. lifespan=lifespan
  426. )
  427. # 添加 CORS 中间件
  428. app.add_middleware(
  429. CORSMiddleware,
  430. allow_origins=["*"], # 允许所有的来源
  431. allow_credentials=True,
  432. allow_methods=["*"], # 允许的HTTP方法
  433. allow_headers=["*"], # 允许的请求头
  434. )
  435. # 添加所有路由
  436. app.include_router(test_router)
  437. app.include_router(file_upload_router)
  438. app.include_router(review_results_router)
  439. app.include_router(launch_review_router)
  440. # 施工方案编写路由
  441. app.include_router(outline_router)
  442. # 全局异常处理
  443. @app.exception_handler(HTTPException)
  444. async def http_exception_handler(request, exc):
  445. return JSONResponse(
  446. status_code=exc.status_code,
  447. content=exc.detail
  448. )
  449. # 健康检查
  450. @app.get("/health")
  451. async def health_check():
  452. timestamp = datetime.datetime.now().isoformat()
  453. return {"status": "healthy", "timestamp": timestamp}
  454. # Celery状态检查
  455. @app.get("/celery/status")
  456. async def get_celery_status():
  457. """获取Celery Worker状态"""
  458. global celery_manager
  459. status = celery_manager.get_status()
  460. return {
  461. "celery_worker": status,
  462. "timestamp": datetime.datetime.now().isoformat()
  463. }
  464. return app
  465. def cleanup_redis_before_start():
  466. """启动前清理Redis中的残留Celery任务"""
  467. try:
  468. # 使用统一的Redis连接工具函数
  469. r = server_utils.get_redis_connection()
  470. server_logger.info("清理Redis中的残留Celery任务...")
  471. # 清理所有Celery相关的键,包括更多模式
  472. keys_to_delete = []
  473. for key in r.keys():
  474. key_lower = key.lower()
  475. # 扩展匹配模式,包括你遇到的实际键格式
  476. if any(keyword in key_lower for keyword in [
  477. 'celery', 'task:', 'celery-task', 'kombu', 'current:'
  478. ]):
  479. keys_to_delete.append(key)
  480. # 匹配特定模式
  481. elif key.startswith('celery-task-meta-') or key.startswith('current:'):
  482. keys_to_delete.append(key)
  483. # 临时键
  484. elif key == 't_key':
  485. keys_to_delete.append(key)
  486. # 清理消息队列
  487. try:
  488. # 清理所有Celery队列
  489. queues = ['celery', 'celery.pidbox', 'celeryev']
  490. for queue in queues:
  491. # 删除队列
  492. r.delete(queue)
  493. server_logger.debug(f"已清理队列: {queue}")
  494. # 清理Kombu绑定
  495. kombu_keys = r.keys('_kombu.binding.*')
  496. for key in kombu_keys:
  497. r.delete(key)
  498. server_logger.debug(f"已清理Kombu绑定: {key}")
  499. except Exception as e:
  500. server_logger.warning(f"清理队列失败: {e}")
  501. # 清理识别到的键
  502. if keys_to_delete:
  503. for key in keys_to_delete:
  504. try:
  505. r.delete(key)
  506. server_logger.debug(f"已清理: {key}")
  507. except Exception as e:
  508. server_logger.warning(f"清理 {key} 失败: {e}")
  509. server_logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
  510. else:
  511. server_logger.info("没有发现需要清理的残留任务")
  512. # 额外检查:确保关键队列被清空
  513. try:
  514. # 使用FLUSHDB只清空Celery相关的数据,而不是整个数据库
  515. # 这里我们检查是否还有残留,如果有则进行更彻底的清理
  516. remaining_keys = []
  517. for key in r.keys():
  518. if any(pattern in key.lower() for pattern in ['celery', 'kombu']):
  519. remaining_keys.append(key)
  520. if remaining_keys:
  521. server_logger.warning(f"发现 {len(remaining_keys)} 个残留键,进行彻底清理")
  522. for key in remaining_keys:
  523. try:
  524. r.delete(key)
  525. server_logger.debug(f"彻底清理: {key}")
  526. except Exception as e:
  527. server_logger.warning(f"彻底清理 {key} 失败: {e}")
  528. except Exception as e:
  529. server_logger.warning(f"彻底清理检查失败: {e}")
  530. return True
  531. except Exception as e:
  532. server_logger.error(f"清理Redis残留任务失败: {e}")
  533. return False
  534. def start_celery_worker_background():
  535. """在后台启动Celery Worker(异步方式)"""
  536. # 添加调用栈调试
  537. server_logger.info("=== Celery Worker启动调用栈 ===")
  538. for line in traceback.format_stack():
  539. server_logger.debug(f" {line.strip()}")
  540. server_logger.info("=== 调用栈结束 ===")
  541. # 清理逻辑已在 celery_manager.start_worker() 内部的 _cleanup_redis_tasks() 中处理
  542. return celery_manager.start_worker()
  543. def stop_celery_worker():
  544. """停止Celery Worker"""
  545. global celery_manager
  546. # 立即取消所有任务注册(使用DB0,与启动时保持一致)
  547. try:
  548. # 使用统一的Redis连接工具函数
  549. r = server_utils.get_redis_connection()
  550. server_logger.info("停止时清理Redis中的Celery任务...")
  551. # 清理任务相关键
  552. task_keys = r.keys('task:*') # 重复任务检查器的数据
  553. celery_meta_keys = r.keys('celery-task-meta-*')
  554. current_keys = r.keys('current:*')
  555. kombu_keys = r.keys('_kombu.binding.*')
  556. all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys
  557. for key in all_keys:
  558. try:
  559. r.delete(key)
  560. server_logger.debug(f"停止时清理: {key}")
  561. except Exception as e:
  562. server_logger.warning(f"停止时清理 {key} 失败: {e}")
  563. # 清理队列
  564. queues = ['celery', 'celery.pidbox', 'celeryev']
  565. for queue in queues:
  566. try:
  567. r.delete(queue)
  568. server_logger.debug(f"停止时清理队列: {queue}")
  569. except Exception as e:
  570. server_logger.warning(f"停止时清理队列 {queue} 失败: {e}")
  571. server_logger.info(f"停止时已清理 {len(all_keys)} 个Redis键")
  572. except Exception as e:
  573. server_logger.error(f"停止时清理Redis任务失败: {e}")
  574. # 立即停止Worker,不等待
  575. return celery_manager.stop_worker_immediately()
  576. def run_server(host: str = None, port: int = None, reload: bool = False,
  577. with_celery: bool = True):
  578. """运行服务器"""
  579. # 从配置文件获取默认值
  580. if host is None:
  581. host = config_handler.get('launch', 'HOST', '0.0.0.0')
  582. if port is None:
  583. port = config_handler.get('launch', 'LAUNCH_PORT')
  584. if with_celery:
  585. # 启动Celery Worker
  586. start_celery_worker_background()
  587. # 注册退出时的清理函数
  588. import atexit
  589. atexit.register(stop_celery_worker)
  590. # 设置信号处理
  591. def signal_handler(signum, frame):
  592. server_logger.info(f"收到信号 {signum},正在停止服务...")
  593. stop_celery_worker()
  594. sys.exit(0)
  595. # Windows和Unix系统的信号处理
  596. try:
  597. signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
  598. signal.signal(signal.SIGTERM, signal_handler) # 终止信号
  599. except AttributeError:
  600. # Windows可能不支持某些信号
  601. pass
  602. # Windows特有的控制台事件处理
  603. if sys.platform == 'win32':
  604. try:
  605. import win32api
  606. def win32_handler(dwCtrlType):
  607. # 正确的控制台事件常量
  608. CTRL_C_EVENT = 0
  609. CTRL_BREAK_EVENT = 1
  610. CTRL_CLOSE_EVENT = 2
  611. CTRL_SHUTDOWN_EVENT = 6
  612. if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
  613. server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
  614. stop_celery_worker()
  615. sys.exit(0)
  616. return False
  617. win32api.SetConsoleCtrlHandler(win32_handler, True)
  618. except (ImportError, AttributeError) as e:
  619. # 如果win32api不可用,跳过Windows控制台处理
  620. server_logger.debug(f"Windows控制台事件处理不可用: {e}")
  621. pass
  622. try:
  623. if reload:
  624. # 重载模式需要正确的模块路径
  625. app_import_path = "server.app:app"
  626. uvicorn.run(app_import_path, host=host, port=port, reload=reload)
  627. else:
  628. # 直接运行模式,直接使用app对象
  629. uvicorn.run(app, host=host, port=port)
  630. finally:
  631. if with_celery:
  632. stop_celery_worker()
  633. app = create_app()
  634. server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
  635. class ServerRunner:
  636. """服务器运行器 - 简化的主启动入口"""
  637. def __init__(self, app_factory: ApplicationFactory):
  638. """初始化服务器运行器
  639. Args:
  640. app_factory: 应用工厂实例
  641. """
  642. self.app_factory = app_factory
  643. self.celery_manager = app_factory.celery_manager
  644. def run_server(self, **kwargs):
  645. """运行服务器
  646. Args:
  647. **kwargs: 服务器配置参数
  648. """
  649. # 获取配置
  650. config = self.app_factory.create_server_config()
  651. config.update(kwargs)
  652. host = config.get('host', '0.0.0.0')
  653. port = config.get('port', 8002)
  654. # 确保端口号是整数类型
  655. try:
  656. port = int(port)
  657. except (ValueError, TypeError):
  658. port = 8002
  659. reload = config.get('reload', False)
  660. with_celery = config.get('with_celery', True)
  661. if with_celery:
  662. self._setup_celery_integration()
  663. # 创建应用实例
  664. app = self.app_factory.create_app()
  665. try:
  666. if reload:
  667. app_import_path = "server.app:app"
  668. uvicorn.run(app_import_path, host=host, port=port, reload=reload)
  669. else:
  670. uvicorn.run(app, host=host, port=port)
  671. finally:
  672. if with_celery:
  673. self.celery_manager.stop_worker()
  674. def _setup_celery_integration(self):
  675. """设置Celery集成"""
  676. # 启动Celery Worker
  677. self.celery_manager.start_worker()
  678. # 注册退出处理
  679. import atexit
  680. atexit.register(self.celery_manager.stop_worker_immediately)
  681. # 设置信号处理
  682. self._setup_signal_handlers()
  683. def _setup_signal_handlers(self):
  684. """设置信号处理器"""
  685. def signal_handler(signum, frame):
  686. server_logger.info(f"收到信号 {signum},正在停止服务...")
  687. self.celery_manager.stop_worker_immediately()
  688. sys.exit(0)
  689. # 通用信号处理
  690. try:
  691. signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
  692. signal.signal(signal.SIGTERM, signal_handler) # 终止信号
  693. except AttributeError:
  694. # Windows可能不支持某些信号
  695. pass
  696. # Windows特有处理
  697. if sys.platform == 'win32':
  698. self._setup_windows_signal_handler()
  699. def _setup_windows_signal_handler(self):
  700. """设置Windows信号处理器"""
  701. try:
  702. import win32api
  703. def win32_handler(dwCtrlType):
  704. CTRL_C_EVENT = 0
  705. CTRL_BREAK_EVENT = 1
  706. CTRL_CLOSE_EVENT = 2
  707. CTRL_SHUTDOWN_EVENT = 6
  708. if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
  709. server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
  710. self.celery_manager.stop_worker_immediately()
  711. sys.exit(0)
  712. return False
  713. win32api.SetConsoleCtrlHandler(win32_handler, True)
  714. except (ImportError, AttributeError) as e:
  715. server_logger.debug(f"Windows控制台事件处理不可用: {e}")
  716. # 创建应用实例和运行器
  717. app = app_factory.create_app()
  718. server_runner = ServerRunner(app_factory)
  719. server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
  720. # 运行Uvicorn服务器
  721. if __name__ == "__main__":
  722. # 使用新的服务器运行器启动
  723. config = app_factory.create_server_config()
  724. server_logger.info(f"Agent API服务启动中...运行在{config['host']}:{config['port']}")
  725. if config['with_celery']:
  726. server_logger.info("Celery Worker: 已集成启动")
  727. else:
  728. server_logger.warning("Celery Worker: 已禁用")
  729. server_runner.run_server(**config)