app.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. import os
  2. import sys
  3. import time
  4. import uvicorn
  5. import datetime
  6. import threading
  7. from pathlib import Path
  8. import argparse
  9. import signal
  10. import traceback
  11. BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  12. sys.path.insert(0, BASE_DIR)
  13. from views import lifespan
  14. from fastapi.middleware.cors import CORSMiddleware
  15. from fastapi import FastAPI, HTTPException
  16. from fastapi.responses import JSONResponse
  17. from foundation.logger.loggering import server_logger
  18. from foundation.base.celery_app import app as celery_app
  19. # 导入所有路由
  20. from views.test_views import test_router
  21. from views.construction_review.file_upload import file_upload_router
  22. from views.construction_review.review_results import review_results_router
  23. from views.construction_review.launch_review import launch_review_router
  24. # 创建 FastAPI 应用
  25. def create_app() -> FastAPI:
  26. """创建主应用服务"""
  27. app = FastAPI(
  28. title="Agent API - 施工方案审查系统",
  29. version="0.3",
  30. description="Agent API - 集成施工方案审查功能",
  31. lifespan=lifespan
  32. )
  33. # 添加 CORS 中间件
  34. app.add_middleware(
  35. CORSMiddleware,
  36. allow_origins=["*"], # 允许所有的来源
  37. allow_credentials=True,
  38. allow_methods=["*"], # 允许的HTTP方法
  39. allow_headers=["*"], # 允许的请求头
  40. )
  41. # 添加所有路由
  42. app.include_router(test_router)
  43. app.include_router(file_upload_router)
  44. app.include_router(review_results_router)
  45. app.include_router(launch_review_router)
  46. # 全局异常处理
  47. @app.exception_handler(HTTPException)
  48. async def http_exception_handler(request, exc):
  49. return JSONResponse(
  50. status_code=exc.status_code,
  51. content=exc.detail
  52. )
  53. # 健康检查
  54. @app.get("/health")
  55. async def health_check():
  56. timestamp = datetime.datetime.now().isoformat()
  57. return {"status": "healthy", "timestamp": timestamp}
  58. # Celery状态检查
  59. @app.get("/celery/status")
  60. async def get_celery_status():
  61. """获取Celery Worker状态"""
  62. global celery_manager
  63. status = celery_manager.get_status()
  64. return {
  65. "celery_worker": status,
  66. "timestamp": datetime.datetime.now().isoformat()
  67. }
  68. # API文档
  69. @app.get("/api/docs")
  70. async def api_docs():
  71. return {
  72. "title": "Agent API服务文档",
  73. "description": "集成施工方案审查功能的API接口文档",
  74. "version": "V.0.3",
  75. "apis": [
  76. {
  77. "name": "测试接口",
  78. "path": "/test",
  79. "method": "GET",
  80. "description": "系统测试接口"
  81. },
  82. {
  83. "name": "文档上传",
  84. "path": "/sgsc/file_upload",
  85. "method": "POST",
  86. "description": "上传施工方案文档"
  87. },
  88. {
  89. "name": "审查启动",
  90. "path": "/sgsc/sse/launch_review",
  91. "method": "POST",
  92. "description": "启动AI审查工作流"
  93. },
  94. {
  95. "name": "进度推送",
  96. "path": "/sgsc/sse/progress/{callback_task_id}",
  97. "method": "GET",
  98. "description": "SSE实时进度推送"
  99. },
  100. {
  101. "name": "结果获取",
  102. "path": "/sgsc/review_results",
  103. "method": "POST",
  104. "description": "获取审查结果"
  105. }
  106. ],
  107. }
  108. return app
  109. # Celery Worker管理器
  110. class CeleryWorkerManager:
  111. """Celery Worker程序化管理器"""
  112. def __init__(self):
  113. self.worker = None
  114. self.is_running = False
  115. self.worker_thread = None
  116. self.shutdown_event = threading.Event()
  117. def start_worker(self, **kwargs):
  118. """启动Celery Worker"""
  119. if self.is_running:
  120. server_logger.warning("Celery Worker已在运行")
  121. return True
  122. try:
  123. # 创建Worker函数
  124. def run_celery_worker():
  125. try:
  126. # 使用最简单的启动方式
  127. server_logger.info("Celery Worker开始运行...")
  128. # 直接启动worker,使用默认配置
  129. celery_app.worker_main(['worker'])
  130. except KeyboardInterrupt:
  131. server_logger.info("收到停止信号,Celery Worker退出")
  132. except Exception as e:
  133. server_logger.error(f"Celery Worker运行时出错: {e}")
  134. server_logger.exception("详细错误信息:")
  135. finally:
  136. self.is_running = False
  137. server_logger.info("Celery Worker已停止")
  138. # 在单独线程中启动Worker
  139. self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
  140. self.worker_thread.start()
  141. self.is_running = True
  142. # 等待启动
  143. time.sleep(2)
  144. if self.is_running and self.worker_thread.is_alive():
  145. server_logger.info("Celery Worker启动成功")
  146. return True
  147. else:
  148. server_logger.error("Celery Worker启动失败")
  149. self.is_running = False
  150. return False
  151. except ImportError as e:
  152. server_logger.error(f"导入Celery失败: {e}")
  153. server_logger.info("请先安装Celery: pip install celery redis")
  154. return False
  155. except Exception as e:
  156. server_logger.error(f"启动Celery Worker失败: {e}")
  157. server_logger.exception("详细错误信息:")
  158. return False
  159. def stop_worker(self, timeout: int = 5):
  160. """停止Celery Worker"""
  161. if not self.is_running:
  162. server_logger.info("Celery Worker未运行")
  163. return True
  164. try:
  165. server_logger.info("停止Celery Worker...")
  166. self.shutdown_event.set()
  167. # 发送停止信号给线程
  168. if self.worker_thread and self.worker_thread.is_alive():
  169. # 尝试优雅停止
  170. start_time = time.time()
  171. while self.is_running and (time.time() - start_time) < timeout:
  172. time.sleep(0.1)
  173. # 如果还没停止,记录警告
  174. if self.is_running:
  175. server_logger.warning("Celery Worker优雅停止超时")
  176. else:
  177. server_logger.info("Celery Worker已优雅停止")
  178. self.is_running = False
  179. self.shutdown_event.clear()
  180. return True
  181. except Exception as e:
  182. server_logger.error(f"停止Celery Worker失败: {e}")
  183. return False
  184. def stop_worker_immediately(self):
  185. """立即停止Celery Worker,不等待"""
  186. if not self.is_running:
  187. server_logger.info("Celery Worker未运行")
  188. return True
  189. try:
  190. server_logger.info("立即停止Celery Worker...")
  191. self.shutdown_event.set()
  192. # 设置超时事件,强制停止
  193. import os
  194. # 发送中断信号给当前进程
  195. if hasattr(os, 'kill'):
  196. try:
  197. os.kill(os.getpid(), signal.SIGINT)
  198. server_logger.info("已发送中断信号")
  199. except:
  200. pass
  201. # 立即设置状态为停止
  202. self.is_running = False
  203. self.shutdown_event.clear()
  204. server_logger.info("Celery Worker已立即停止")
  205. return True
  206. except Exception as e:
  207. server_logger.error(f"立即停止Celery Worker失败: {e}")
  208. # 即使失败也要设置状态
  209. self.is_running = False
  210. return False
  211. def get_status(self):
  212. """获取Worker状态"""
  213. return {
  214. "is_running": self.is_running,
  215. "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
  216. }
  217. def __enter__(self):
  218. return self
  219. def __exit__(self, exc_type, exc_val, exc_tb):
  220. self.stop_worker()
  221. # 全局Worker管理器实例
  222. celery_manager = CeleryWorkerManager()
  223. def cleanup_redis_before_start():
  224. """启动前清理Redis中的残留Celery任务"""
  225. try:
  226. import redis
  227. from foundation.base.config import config_handler
  228. # 连接Redis
  229. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  230. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  231. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  232. redis_db = config_handler.get('redis', 'REDIS_DB', '0')
  233. if redis_password:
  234. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
  235. else:
  236. redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
  237. r = redis.from_url(redis_url, decode_responses=True)
  238. server_logger.info("清理Redis中的残留Celery任务...")
  239. # 清理所有Celery相关的键,包括更多模式
  240. keys_to_delete = []
  241. for key in r.keys():
  242. key_lower = key.lower()
  243. # 扩展匹配模式,包括你遇到的实际键格式
  244. if any(keyword in key_lower for keyword in [
  245. 'celery', 'task:', 'celery-task', 'kombu', 'current:'
  246. ]):
  247. keys_to_delete.append(key)
  248. # 匹配特定模式
  249. elif key.startswith('celery-task-meta-') or key.startswith('current:'):
  250. keys_to_delete.append(key)
  251. # 临时键
  252. elif key == 't_key':
  253. keys_to_delete.append(key)
  254. # 清理消息队列
  255. try:
  256. # 清理所有Celery队列
  257. queues = ['celery', 'celery.pidbox', 'celeryev']
  258. for queue in queues:
  259. # 删除队列
  260. r.delete(queue)
  261. server_logger.debug(f"已清理队列: {queue}")
  262. # 清理Kombu绑定
  263. kombu_keys = r.keys('_kombu.binding.*')
  264. for key in kombu_keys:
  265. r.delete(key)
  266. server_logger.debug(f"已清理Kombu绑定: {key}")
  267. except Exception as e:
  268. server_logger.warning(f"清理队列失败: {e}")
  269. # 清理识别到的键
  270. if keys_to_delete:
  271. for key in keys_to_delete:
  272. try:
  273. r.delete(key)
  274. server_logger.debug(f"已清理: {key}")
  275. except Exception as e:
  276. server_logger.warning(f"清理 {key} 失败: {e}")
  277. server_logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
  278. else:
  279. server_logger.info("没有发现需要清理的残留任务")
  280. # 额外检查:确保关键队列被清空
  281. try:
  282. # 使用FLUSHDB只清空Celery相关的数据,而不是整个数据库
  283. # 这里我们检查是否还有残留,如果有则进行更彻底的清理
  284. remaining_keys = []
  285. for key in r.keys():
  286. if any(pattern in key.lower() for pattern in ['celery', 'kombu']):
  287. remaining_keys.append(key)
  288. if remaining_keys:
  289. server_logger.warning(f"发现 {len(remaining_keys)} 个残留键,进行彻底清理")
  290. for key in remaining_keys:
  291. try:
  292. r.delete(key)
  293. server_logger.debug(f"彻底清理: {key}")
  294. except Exception as e:
  295. server_logger.warning(f"彻底清理 {key} 失败: {e}")
  296. except Exception as e:
  297. server_logger.warning(f"彻底清理检查失败: {e}")
  298. return True
  299. except Exception as e:
  300. server_logger.error(f"清理Redis残留任务失败: {e}")
  301. return False
  302. def start_celery_worker_background():
  303. """在后台启动Celery Worker(异步方式)"""
  304. # 启动前清理残留任务
  305. cleanup_redis_before_start()
  306. # 添加调用栈调试
  307. server_logger.info("=== Celery Worker启动调用栈 ===")
  308. for line in traceback.format_stack():
  309. server_logger.debug(f" {line.strip()}")
  310. server_logger.info("=== 调用栈结束 ===")
  311. return celery_manager.start_worker()
  312. def stop_celery_worker():
  313. """停止Celery Worker"""
  314. global celery_manager
  315. # 立即取消所有任务注册(使用DB0,与启动时保持一致)
  316. try:
  317. import redis
  318. from foundation.base.config import config_handler
  319. # 连接Redis(使用配置文件的数据库)
  320. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  321. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  322. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  323. redis_db = config_handler.get('redis', 'REDIS_DB', '0')
  324. if redis_password:
  325. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{redis_db}'
  326. else:
  327. redis_url = f'redis://{redis_host}:{redis_port}/{redis_db}'
  328. r = redis.from_url(redis_url, decode_responses=True)
  329. server_logger.info("停止时清理Redis中的Celery任务...")
  330. # 清理任务相关键
  331. task_keys = r.keys('task:*') # 重复任务检查器的数据
  332. celery_meta_keys = r.keys('celery-task-meta-*')
  333. current_keys = r.keys('current:*')
  334. kombu_keys = r.keys('_kombu.binding.*')
  335. all_keys = task_keys + celery_meta_keys + current_keys + kombu_keys
  336. for key in all_keys:
  337. try:
  338. r.delete(key)
  339. server_logger.debug(f"停止时清理: {key}")
  340. except Exception as e:
  341. server_logger.warning(f"停止时清理 {key} 失败: {e}")
  342. # 清理队列
  343. queues = ['celery', 'celery.pidbox', 'celeryev']
  344. for queue in queues:
  345. try:
  346. r.delete(queue)
  347. server_logger.debug(f"停止时清理队列: {queue}")
  348. except Exception as e:
  349. server_logger.warning(f"停止时清理队列 {queue} 失败: {e}")
  350. server_logger.info(f"停止时已清理 {len(all_keys)} 个Redis键")
  351. except Exception as e:
  352. server_logger.error(f"停止时清理Redis任务失败: {e}")
  353. # 立即停止Worker,不等待
  354. return celery_manager.stop_worker_immediately()
  355. def run_server(host: str = "127.0.0.1", port: int = 8002, reload: bool = False,
  356. with_celery: bool = True):
  357. """运行服务器"""
  358. if with_celery:
  359. # 启动Celery Worker
  360. start_celery_worker_background()
  361. # 注册退出时的清理函数
  362. import atexit
  363. atexit.register(stop_celery_worker)
  364. # 设置信号处理
  365. def signal_handler(signum, frame):
  366. server_logger.info(f"收到信号 {signum},正在停止服务...")
  367. stop_celery_worker()
  368. sys.exit(0)
  369. # Windows和Unix系统的信号处理
  370. try:
  371. signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
  372. signal.signal(signal.SIGTERM, signal_handler) # 终止信号
  373. except AttributeError:
  374. # Windows可能不支持某些信号
  375. pass
  376. # Windows特有的控制台事件处理
  377. if sys.platform == 'win32':
  378. try:
  379. import win32api
  380. def win32_handler(dwCtrlType):
  381. # 正确的控制台事件常量
  382. CTRL_C_EVENT = 0
  383. CTRL_BREAK_EVENT = 1
  384. CTRL_CLOSE_EVENT = 2
  385. CTRL_SHUTDOWN_EVENT = 6
  386. if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
  387. server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
  388. stop_celery_worker()
  389. sys.exit(0)
  390. return False
  391. win32api.SetConsoleCtrlHandler(win32_handler, True)
  392. except (ImportError, AttributeError) as e:
  393. # 如果win32api不可用,跳过Windows控制台处理
  394. server_logger.debug(f"Windows控制台事件处理不可用: {e}")
  395. pass
  396. try:
  397. if reload:
  398. # 重载模式需要正确的模块路径
  399. app_import_path = "server.app:app"
  400. uvicorn.run(app_import_path, host=host, port=port, reload=reload)
  401. else:
  402. # 直接运行模式,直接使用app对象
  403. uvicorn.run(app, host=host, port=port)
  404. finally:
  405. if with_celery:
  406. stop_celery_worker()
  407. # 创建应用实例
  408. app = create_app()
  409. server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
  410. # 运行Uvicorn服务器
  411. if __name__ == "__main__":
  412. parser = argparse.ArgumentParser(description='Agent API - 施工方案审查系统')
  413. parser.add_argument('--host', default='0.0.0.0', help='服务器地址')
  414. parser.add_argument('--port', type=int, default=8001, help='服务器端口')
  415. parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
  416. parser.add_argument('--no-reload', action='store_true', help='关闭热重载')
  417. args = parser.parse_args()
  418. server_logger.info("Agent API服务启动中...")
  419. server_logger.info(f"服务地址: http://{args.host}:{args.port}")
  420. server_logger.info(f"API文档: http://{args.host}:{args.port}/docs")
  421. server_logger.info(f"健康检查: http://{args.host}:{args.port}/health")
  422. server_logger.info(f"API概览: http://{args.host}:{args.port}/api/docs")
  423. if not args.no_celery:
  424. server_logger.info("Celery Worker: 已集成启动")
  425. else:
  426. server_logger.warning("Celery Worker: 已禁用")
  427. run_server(
  428. host=args.host,
  429. port=8002,
  430. reload=False,
  431. with_celery=not args.no_celery
  432. )