app.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. """
  2. 服务主应用
  3. 整合所有接口,提供统一的测试服务
  4. """
  5. import sys
  6. import time
  7. import uvicorn
  8. import datetime
  9. import threading
  10. from pathlib import Path
  11. root_dir = Path(__file__).parent.parent.parent
  12. sys.path.append(str(root_dir))
  13. from fastapi import FastAPI, HTTPException
  14. from fastapi.responses import JSONResponse
  15. from fastapi.middleware.cors import CORSMiddleware
  16. from foundation.base.celery_app import app as celery_app
  17. from foundation.logger.loggering import server_logger as logger
  18. from views.construction_review.file_upload import file_upload_router
  19. from views.construction_review.review_results import review_results_router
  20. from views.construction_review.launch_review import launch_review_router
  21. def create_app() -> FastAPI:
  22. """创建接口服务"""
  23. app = FastAPI(
  24. title="施工方案审查API服务",
  25. description="用于前端开发和接口联调服务",
  26. version="0.0.1"
  27. )
  28. # 添加CORS中间件(允许前端访问)
  29. app.add_middleware(
  30. CORSMiddleware,
  31. allow_origins=["*"], # 在生产环境中应该设置具体的域名
  32. allow_credentials=True,
  33. allow_methods=["*"],
  34. allow_headers=["*"],
  35. )
  36. # 添加路由
  37. app.include_router(file_upload_router)
  38. app.include_router(review_results_router)
  39. app.include_router(launch_review_router)
  40. # 全局异常处理
  41. @app.exception_handler(HTTPException)
  42. async def http_exception_handler(request, exc):
  43. return JSONResponse(
  44. status_code=exc.status_code,
  45. content=exc.detail
  46. )
  47. # 健康检查
  48. @app.get("/health")
  49. async def health_check():
  50. timestamp = datetime.datetime.now().isoformat()
  51. return {"status": "healthy", "timestamp": timestamp}
  52. # Celery状态检查
  53. @app.get("/celery/status")
  54. async def get_celery_status():
  55. """获取Celery Worker状态"""
  56. global celery_manager
  57. status = celery_manager.get_status()
  58. return {
  59. "celery_worker": status,
  60. "timestamp": datetime.datetime.now().isoformat()
  61. }
  62. # API文档
  63. @app.get("/api/docs")
  64. async def api_docs():
  65. return {
  66. "title": "施工方案审查服务API文档",
  67. "description": "API接口文档",
  68. "version": "V.0.1",
  69. "apis": [
  70. {
  71. "name": "文档上传",
  72. "path": "/sgsc/file_upload",
  73. "method": "POST",
  74. "description": "上传施工方案文档"
  75. },
  76. {
  77. "name": "审查启动",
  78. "path": "/sgsc/launch_review",
  79. "method": "POST",
  80. "description": "启动AI审查工作流"
  81. },
  82. {
  83. "name": "结果获取",
  84. "path": "/sgsc/review_results",
  85. "method": "POST",
  86. "description": "获取审查结果"
  87. }
  88. ],
  89. }
  90. return app
  91. # Celery Worker管理器
  92. class CeleryWorkerManager:
  93. """Celery Worker程序化管理器"""
  94. def __init__(self):
  95. self.worker = None
  96. self.is_running = False
  97. self.worker_thread = None
  98. self.shutdown_event = threading.Event()
  99. def start_worker(self, **kwargs):
  100. """启动Celery Worker"""
  101. if self.is_running:
  102. logger.warning("Celery Worker已在运行")
  103. return True
  104. try:
  105. # 创建Worker函数
  106. def run_celery_worker():
  107. try:
  108. # 使用最简单的启动方式
  109. logger.info("Celery Worker开始运行...")
  110. # 直接启动worker,使用默认配置
  111. celery_app.worker_main(['worker'])
  112. except KeyboardInterrupt:
  113. logger.info("收到停止信号,Celery Worker退出")
  114. except Exception as e:
  115. logger.error(f"Celery Worker运行时出错: {e}")
  116. logger.exception("详细错误信息:")
  117. finally:
  118. self.is_running = False
  119. logger.info("Celery Worker已停止")
  120. # 在单独线程中启动Worker
  121. self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
  122. self.worker_thread.start()
  123. self.is_running = True
  124. # 等待启动
  125. time.sleep(2)
  126. if self.is_running and self.worker_thread.is_alive():
  127. logger.info("Celery Worker启动成功")
  128. return True
  129. else:
  130. logger.error("Celery Worker启动失败")
  131. self.is_running = False
  132. return False
  133. except ImportError as e:
  134. logger.error(f"导入Celery失败: {e}")
  135. logger.info("请先安装Celery: pip install celery redis")
  136. return False
  137. except Exception as e:
  138. logger.error(f"启动Celery Worker失败: {e}")
  139. logger.exception("详细错误信息:")
  140. return False
  141. def stop_worker(self, timeout: int = 5):
  142. """停止Celery Worker"""
  143. if not self.is_running:
  144. logger.info("Celery Worker未运行")
  145. return True
  146. try:
  147. logger.info("停止Celery Worker...")
  148. self.shutdown_event.set()
  149. # 发送停止信号给线程
  150. if self.worker_thread and self.worker_thread.is_alive():
  151. # 尝试优雅停止
  152. start_time = time.time()
  153. while self.is_running and (time.time() - start_time) < timeout:
  154. time.sleep(0.1)
  155. # 如果还没停止,记录警告
  156. if self.is_running:
  157. logger.warning("Celery Worker优雅停止超时")
  158. else:
  159. logger.info("Celery Worker已优雅停止")
  160. self.is_running = False
  161. self.shutdown_event.clear()
  162. return True
  163. except Exception as e:
  164. logger.error(f"停止Celery Worker失败: {e}")
  165. return False
  166. def stop_worker_immediately(self):
  167. """立即停止Celery Worker,不等待"""
  168. if not self.is_running:
  169. logger.info("Celery Worker未运行")
  170. return True
  171. try:
  172. logger.info("立即停止Celery Worker...")
  173. self.shutdown_event.set()
  174. # 设置超时事件,强制停止
  175. import signal
  176. import os
  177. # 发送中断信号给当前进程
  178. if hasattr(os, 'kill'):
  179. try:
  180. os.kill(os.getpid(), signal.SIGINT)
  181. logger.info("已发送中断信号")
  182. except:
  183. pass
  184. # 立即设置状态为停止
  185. self.is_running = False
  186. self.shutdown_event.clear()
  187. logger.info("Celery Worker已立即停止")
  188. return True
  189. except Exception as e:
  190. logger.error(f"立即停止Celery Worker失败: {e}")
  191. # 即使失败也要设置状态
  192. self.is_running = False
  193. return False
  194. def get_status(self):
  195. """获取Worker状态"""
  196. return {
  197. "is_running": self.is_running,
  198. "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
  199. }
  200. def __enter__(self):
  201. return self
  202. def __exit__(self, exc_type, exc_val, exc_tb):
  203. self.stop_worker()
  204. # 全局Worker管理器实例
  205. celery_manager = CeleryWorkerManager()
  206. def start_celery_worker():
  207. """启动Celery Worker(同步方式,用于测试)"""
  208. return celery_manager.start_worker()
  209. def cleanup_redis_before_start():
  210. """启动前清理Redis中的残留Celery任务"""
  211. try:
  212. import redis
  213. from foundation.base.config import config_handler
  214. # 连接Redis
  215. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  216. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  217. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  218. if redis_password:
  219. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/0'
  220. else:
  221. redis_url = f'redis://{redis_host}:{redis_port}/0'
  222. r = redis.from_url(redis_url, decode_responses=True)
  223. logger.info("清理Redis中的残留Celery任务...")
  224. # 清理所有Celery相关的键
  225. keys_to_delete = []
  226. for key in r.keys():
  227. if any(keyword in key.lower() for keyword in ['celery', 'task:']):
  228. keys_to_delete.append(key)
  229. if keys_to_delete:
  230. for key in keys_to_delete:
  231. try:
  232. r.delete(key)
  233. logger.debug(f"已清理: {key}")
  234. except Exception as e:
  235. logger.warning(f"清理 {key} 失败: {e}")
  236. logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
  237. else:
  238. logger.info("没有发现需要清理的残留任务")
  239. return True
  240. except Exception as e:
  241. logger.error(f"清理Redis残留任务失败: {e}")
  242. return False
  243. def start_celery_worker_background():
  244. """在后台启动Celery Worker(异步方式)"""
  245. # 启动前清理残留任务
  246. cleanup_redis_before_start()
  247. # 添加调用栈调试
  248. import traceback
  249. logger.info("=== Celery Worker启动调用栈 ===")
  250. for line in traceback.format_stack():
  251. logger.debug(f" {line.strip()}")
  252. logger.info("=== 调用栈结束 ===")
  253. return celery_manager.start_worker()
  254. def stop_celery_worker():
  255. """停止Celery Worker"""
  256. global celery_manager
  257. # 立即取消所有任务注册
  258. try:
  259. import redis
  260. from foundation.base.config import config_handler
  261. # 连接Redis
  262. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  263. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  264. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  265. if redis_password:
  266. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/2'
  267. else:
  268. redis_url = f'redis://{redis_host}:{redis_port}/2'
  269. r = redis.from_url(redis_url, decode_responses=True)
  270. # 清理所有任务注册
  271. task_keys = r.keys('task:*')
  272. for key in task_keys:
  273. r.delete(key)
  274. logger.info(f"取消任务注册: {key}")
  275. logger.info(f"已取消 {len(task_keys)} 个任务注册")
  276. except Exception as e:
  277. logger.error(f"取消任务注册失败: {e}")
  278. # 立即停止Worker,不等待
  279. return celery_manager.stop_worker_immediately()
  280. def run_server(host: str = "127.0.0.1", port: int = 8034, reload: bool = False,
  281. with_celery: bool = True):
  282. """运行服务器"""
  283. if with_celery:
  284. # 启动Celery Worker
  285. start_celery_worker_background()
  286. # 注册退出时的清理函数
  287. import atexit
  288. atexit.register(stop_celery_worker)
  289. # 设置信号处理
  290. import signal
  291. def signal_handler(signum, frame):
  292. logger.info(f"收到信号 {signum},正在停止服务...")
  293. stop_celery_worker()
  294. sys.exit(0)
  295. # Windows和Unix系统的信号处理
  296. try:
  297. signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
  298. signal.signal(signal.SIGTERM, signal_handler) # 终止信号
  299. except AttributeError:
  300. # Windows可能不支持某些信号
  301. pass
  302. # Windows特有的控制台事件处理
  303. if sys.platform == 'win32':
  304. try:
  305. import win32api
  306. def win32_handler(dwCtrlType):
  307. # 正确的控制台事件常量
  308. CTRL_C_EVENT = 0
  309. CTRL_BREAK_EVENT = 1
  310. CTRL_CLOSE_EVENT = 2
  311. CTRL_SHUTDOWN_EVENT = 6
  312. if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
  313. logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
  314. stop_celery_worker()
  315. sys.exit(0)
  316. return False
  317. win32api.SetConsoleCtrlHandler(win32_handler, True)
  318. except (ImportError, AttributeError) as e:
  319. # 如果win32api不可用,跳过Windows控制台处理
  320. logger.debug(f"Windows控制台事件处理不可用: {e}")
  321. pass
  322. try:
  323. if reload:
  324. # 重载模式需要正确的模块路径
  325. app_import_path = "views.construction_review.app:app"
  326. uvicorn.run(app_import_path, host=host, port=port, reload=reload)
  327. else:
  328. # 直接运行模式,直接使用app对象
  329. uvicorn.run(app, host=host, port=port)
  330. finally:
  331. if with_celery:
  332. stop_celery_worker()
  333. app = create_app()
  334. if __name__ == "__main__":
  335. import argparse
  336. parser = argparse.ArgumentParser(description='施工方案审查API服务')
  337. parser.add_argument('--host', default='127.0.0.1', help='服务器地址')
  338. parser.add_argument('--port', type=int, default=8035, help='服务器端口')
  339. parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
  340. parser.add_argument('--no-reload', action='store_true', help='关闭热重载')
  341. args = parser.parse_args()
  342. logger.info("施工方案审查API服务启动中...")
  343. logger.info(f"服务地址: http://{args.host}:{args.port}")
  344. logger.info(f"API文档: http://{args.host}:{args.port}/docs")
  345. logger.info(f"健康检查: http://{args.host}:{args.port}/health")
  346. if not args.no_celery:
  347. logger.info("Celery Worker: 已集成启动")
  348. else:
  349. logger.warning("Celery Worker: 已禁用")
  350. run_server(
  351. host=args.host,
  352. port=args.port,
  353. reload=False,
  354. with_celery=not args.no_celery
  355. )