app.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. import os
  2. import sys
  3. import time
  4. import uvicorn
  5. import datetime
  6. import threading
  7. from pathlib import Path
  8. import argparse
  9. import signal
  10. import traceback
  11. BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  12. sys.path.insert(0, BASE_DIR)
  13. from views import lifespan
  14. from fastapi.middleware.cors import CORSMiddleware
  15. from fastapi import FastAPI, HTTPException
  16. from fastapi.responses import JSONResponse
  17. from foundation.logger.loggering import server_logger
  18. from foundation.base.celery_app import app as celery_app
  19. # 导入所有路由
  20. from views.test_views import test_router
  21. from views.construction_review.file_upload import file_upload_router
  22. from views.construction_review.review_results import review_results_router
  23. from views.construction_review.launch_review import launch_review_router
  24. from views.construction_review.task_progress import task_progress_router
  25. # 创建 FastAPI 应用
  26. def create_app() -> FastAPI:
  27. """创建主应用服务"""
  28. app = FastAPI(
  29. title="Agent API - 施工方案审查系统",
  30. version="0.3",
  31. description="Agent API - 集成施工方案审查功能",
  32. lifespan=lifespan
  33. )
  34. # 添加 CORS 中间件
  35. app.add_middleware(
  36. CORSMiddleware,
  37. allow_origins=["*"], # 允许所有的来源
  38. allow_credentials=True,
  39. allow_methods=["*"], # 允许的HTTP方法
  40. allow_headers=["*"], # 允许的请求头
  41. )
  42. # 添加所有路由
  43. app.include_router(test_router)
  44. app.include_router(file_upload_router)
  45. app.include_router(review_results_router)
  46. app.include_router(launch_review_router)
  47. app.include_router(task_progress_router)
  48. # 全局异常处理
  49. @app.exception_handler(HTTPException)
  50. async def http_exception_handler(request, exc):
  51. return JSONResponse(
  52. status_code=exc.status_code,
  53. content=exc.detail
  54. )
  55. # 健康检查
  56. @app.get("/health")
  57. async def health_check():
  58. timestamp = datetime.datetime.now().isoformat()
  59. return {"status": "healthy", "timestamp": timestamp}
  60. # Celery状态检查
  61. @app.get("/celery/status")
  62. async def get_celery_status():
  63. """获取Celery Worker状态"""
  64. global celery_manager
  65. status = celery_manager.get_status()
  66. return {
  67. "celery_worker": status,
  68. "timestamp": datetime.datetime.now().isoformat()
  69. }
  70. # API文档
  71. @app.get("/api/docs")
  72. async def api_docs():
  73. return {
  74. "title": "Agent API服务文档",
  75. "description": "集成施工方案审查功能的API接口文档",
  76. "version": "V.0.3",
  77. "apis": [
  78. {
  79. "name": "测试接口",
  80. "path": "/test",
  81. "method": "GET",
  82. "description": "系统测试接口"
  83. },
  84. {
  85. "name": "文档上传",
  86. "path": "/sgsc/file_upload",
  87. "method": "POST",
  88. "description": "上传施工方案文档"
  89. },
  90. {
  91. "name": "审查启动",
  92. "path": "/sgsc/sse/launch_review",
  93. "method": "POST",
  94. "description": "启动AI审查工作流"
  95. },
  96. {
  97. "name": "进度推送",
  98. "path": "/sgsc/sse/progress/{callback_task_id}",
  99. "method": "GET",
  100. "description": "SSE实时进度推送"
  101. },
  102. {
  103. "name": "结果获取",
  104. "path": "/sgsc/review_results",
  105. "method": "POST",
  106. "description": "获取审查结果"
  107. }
  108. ],
  109. }
  110. return app
  111. # Celery Worker管理器
  112. class CeleryWorkerManager:
  113. """Celery Worker程序化管理器"""
  114. def __init__(self):
  115. self.worker = None
  116. self.is_running = False
  117. self.worker_thread = None
  118. self.shutdown_event = threading.Event()
  119. def start_worker(self, **kwargs):
  120. """启动Celery Worker"""
  121. if self.is_running:
  122. server_logger.warning("Celery Worker已在运行")
  123. return True
  124. try:
  125. # 创建Worker函数
  126. def run_celery_worker():
  127. try:
  128. # 使用最简单的启动方式
  129. server_logger.info("Celery Worker开始运行...")
  130. # 直接启动worker,使用默认配置
  131. celery_app.worker_main(['worker'])
  132. except KeyboardInterrupt:
  133. server_logger.info("收到停止信号,Celery Worker退出")
  134. except Exception as e:
  135. server_logger.error(f"Celery Worker运行时出错: {e}")
  136. server_logger.exception("详细错误信息:")
  137. finally:
  138. self.is_running = False
  139. server_logger.info("Celery Worker已停止")
  140. # 在单独线程中启动Worker
  141. self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
  142. self.worker_thread.start()
  143. self.is_running = True
  144. # 等待启动
  145. time.sleep(2)
  146. if self.is_running and self.worker_thread.is_alive():
  147. server_logger.info("Celery Worker启动成功")
  148. return True
  149. else:
  150. server_logger.error("Celery Worker启动失败")
  151. self.is_running = False
  152. return False
  153. except ImportError as e:
  154. server_logger.error(f"导入Celery失败: {e}")
  155. server_logger.info("请先安装Celery: pip install celery redis")
  156. return False
  157. except Exception as e:
  158. server_logger.error(f"启动Celery Worker失败: {e}")
  159. server_logger.exception("详细错误信息:")
  160. return False
  161. def stop_worker(self, timeout: int = 5):
  162. """停止Celery Worker"""
  163. if not self.is_running:
  164. server_logger.info("Celery Worker未运行")
  165. return True
  166. try:
  167. server_logger.info("停止Celery Worker...")
  168. self.shutdown_event.set()
  169. # 发送停止信号给线程
  170. if self.worker_thread and self.worker_thread.is_alive():
  171. # 尝试优雅停止
  172. start_time = time.time()
  173. while self.is_running and (time.time() - start_time) < timeout:
  174. time.sleep(0.1)
  175. # 如果还没停止,记录警告
  176. if self.is_running:
  177. server_logger.warning("Celery Worker优雅停止超时")
  178. else:
  179. server_logger.info("Celery Worker已优雅停止")
  180. self.is_running = False
  181. self.shutdown_event.clear()
  182. return True
  183. except Exception as e:
  184. server_logger.error(f"停止Celery Worker失败: {e}")
  185. return False
  186. def stop_worker_immediately(self):
  187. """立即停止Celery Worker,不等待"""
  188. if not self.is_running:
  189. server_logger.info("Celery Worker未运行")
  190. return True
  191. try:
  192. server_logger.info("立即停止Celery Worker...")
  193. self.shutdown_event.set()
  194. # 设置超时事件,强制停止
  195. import os
  196. # 发送中断信号给当前进程
  197. if hasattr(os, 'kill'):
  198. try:
  199. os.kill(os.getpid(), signal.SIGINT)
  200. server_logger.info("已发送中断信号")
  201. except:
  202. pass
  203. # 立即设置状态为停止
  204. self.is_running = False
  205. self.shutdown_event.clear()
  206. server_logger.info("Celery Worker已立即停止")
  207. return True
  208. except Exception as e:
  209. server_logger.error(f"立即停止Celery Worker失败: {e}")
  210. # 即使失败也要设置状态
  211. self.is_running = False
  212. return False
  213. def get_status(self):
  214. """获取Worker状态"""
  215. return {
  216. "is_running": self.is_running,
  217. "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
  218. }
  219. def __enter__(self):
  220. return self
  221. def __exit__(self, exc_type, exc_val, exc_tb):
  222. self.stop_worker()
  223. # 全局Worker管理器实例
  224. celery_manager = CeleryWorkerManager()
  225. def cleanup_redis_before_start():
  226. """启动前清理Redis中的残留Celery任务"""
  227. try:
  228. import redis
  229. from foundation.base.config import config_handler
  230. # 连接Redis
  231. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  232. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  233. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  234. if redis_password:
  235. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/0'
  236. else:
  237. redis_url = f'redis://{redis_host}:{redis_port}/0'
  238. r = redis.from_url(redis_url, decode_responses=True)
  239. server_logger.info("清理Redis中的残留Celery任务...")
  240. # 清理所有Celery相关的键
  241. keys_to_delete = []
  242. for key in r.keys():
  243. if any(keyword in key.lower() for keyword in ['celery', 'task:']):
  244. keys_to_delete.append(key)
  245. if keys_to_delete:
  246. for key in keys_to_delete:
  247. try:
  248. r.delete(key)
  249. server_logger.debug(f"已清理: {key}")
  250. except Exception as e:
  251. server_logger.warning(f"清理 {key} 失败: {e}")
  252. server_logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
  253. else:
  254. server_logger.info("没有发现需要清理的残留任务")
  255. return True
  256. except Exception as e:
  257. server_logger.error(f"清理Redis残留任务失败: {e}")
  258. return False
  259. def start_celery_worker_background():
  260. """在后台启动Celery Worker(异步方式)"""
  261. # 启动前清理残留任务
  262. cleanup_redis_before_start()
  263. # 添加调用栈调试
  264. server_logger.info("=== Celery Worker启动调用栈 ===")
  265. for line in traceback.format_stack():
  266. server_logger.debug(f" {line.strip()}")
  267. server_logger.info("=== 调用栈结束 ===")
  268. return celery_manager.start_worker()
  269. def stop_celery_worker():
  270. """停止Celery Worker"""
  271. global celery_manager
  272. # 立即取消所有任务注册
  273. try:
  274. import redis
  275. from foundation.base.config import config_handler
  276. # 连接Redis
  277. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  278. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  279. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  280. if redis_password:
  281. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/2'
  282. else:
  283. redis_url = f'redis://{redis_host}:{redis_port}/2'
  284. r = redis.from_url(redis_url, decode_responses=True)
  285. # 清理所有任务注册
  286. task_keys = r.keys('task:*')
  287. for key in task_keys:
  288. r.delete(key)
  289. server_logger.info(f"取消任务注册: {key}")
  290. server_logger.info(f"已取消 {len(task_keys)} 个任务注册")
  291. except Exception as e:
  292. server_logger.error(f"取消任务注册失败: {e}")
  293. # 立即停止Worker,不等待
  294. return celery_manager.stop_worker_immediately()
  295. def run_server(host: str = "127.0.0.1", port: int = 8001, reload: bool = False,
  296. with_celery: bool = True):
  297. """运行服务器"""
  298. if with_celery:
  299. # 启动Celery Worker
  300. start_celery_worker_background()
  301. # 注册退出时的清理函数
  302. import atexit
  303. atexit.register(stop_celery_worker)
  304. # 设置信号处理
  305. def signal_handler(signum, frame):
  306. server_logger.info(f"收到信号 {signum},正在停止服务...")
  307. stop_celery_worker()
  308. sys.exit(0)
  309. # Windows和Unix系统的信号处理
  310. try:
  311. signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
  312. signal.signal(signal.SIGTERM, signal_handler) # 终止信号
  313. except AttributeError:
  314. # Windows可能不支持某些信号
  315. pass
  316. # Windows特有的控制台事件处理
  317. if sys.platform == 'win32':
  318. try:
  319. import win32api
  320. def win32_handler(dwCtrlType):
  321. # 正确的控制台事件常量
  322. CTRL_C_EVENT = 0
  323. CTRL_BREAK_EVENT = 1
  324. CTRL_CLOSE_EVENT = 2
  325. CTRL_SHUTDOWN_EVENT = 6
  326. if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
  327. server_logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
  328. stop_celery_worker()
  329. sys.exit(0)
  330. return False
  331. win32api.SetConsoleCtrlHandler(win32_handler, True)
  332. except (ImportError, AttributeError) as e:
  333. # 如果win32api不可用,跳过Windows控制台处理
  334. server_logger.debug(f"Windows控制台事件处理不可用: {e}")
  335. pass
  336. try:
  337. if reload:
  338. # 重载模式需要正确的模块路径
  339. app_import_path = "server.app:app"
  340. uvicorn.run(app_import_path, host=host, port=port, reload=reload)
  341. else:
  342. # 直接运行模式,直接使用app对象
  343. uvicorn.run(app, host=host, port=port)
  344. finally:
  345. if with_celery:
  346. stop_celery_worker()
  347. # 创建应用实例
  348. app = create_app()
  349. server_logger.info(msg="APP init successfully - 集成施工方案审查系统")
  350. # 运行Uvicorn服务器
  351. if __name__ == "__main__":
  352. parser = argparse.ArgumentParser(description='Agent API - 施工方案审查系统')
  353. parser.add_argument('--host', default='0.0.0.0', help='服务器地址')
  354. parser.add_argument('--port', type=int, default=8001, help='服务器端口')
  355. parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
  356. parser.add_argument('--no-reload', action='store_true', help='关闭热重载')
  357. args = parser.parse_args()
  358. server_logger.info("Agent API服务启动中...")
  359. server_logger.info(f"服务地址: http://{args.host}:{args.port}")
  360. server_logger.info(f"API文档: http://{args.host}:{args.port}/docs")
  361. server_logger.info(f"健康检查: http://{args.host}:{args.port}/health")
  362. server_logger.info(f"API概览: http://{args.host}:{args.port}/api/docs")
  363. if not args.no_celery:
  364. server_logger.info("Celery Worker: 已集成启动")
  365. else:
  366. server_logger.warning("Celery Worker: 已禁用")
  367. run_server(
  368. host=args.host,
  369. port=args.port,
  370. reload=False,
  371. with_celery=not args.no_celery
  372. )