app.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. """
  2. 服务主应用
  3. 整合所有接口,提供统一的测试服务
  4. """
  5. import sys
  6. import time
  7. import uvicorn
  8. import datetime
  9. import threading
  10. from pathlib import Path
  11. root_dir = Path(__file__).parent.parent.parent
  12. sys.path.append(str(root_dir))
  13. from fastapi import FastAPI, HTTPException
  14. from fastapi.responses import JSONResponse
  15. from fastapi.middleware.cors import CORSMiddleware
  16. from foundation.base.celery_app import app as celery_app
  17. from foundation.logger.loggering import server_logger as logger
  18. from views.construction_review.file_upload import file_upload_router
  19. from views.construction_review.task_progress import task_progress_router
  20. from views.construction_review.review_results import review_results_router
  21. from views.construction_review.launch_review import launch_review_router
  22. def create_app() -> FastAPI:
  23. """创建接口服务"""
  24. app = FastAPI(
  25. title="施工方案审查API服务",
  26. description="用于前端开发和接口联调服务",
  27. version="0.0.1"
  28. )
  29. # 添加CORS中间件(允许前端访问)
  30. app.add_middleware(
  31. CORSMiddleware,
  32. allow_origins=["*"], # 在生产环境中应该设置具体的域名
  33. allow_credentials=True,
  34. allow_methods=["*"],
  35. allow_headers=["*"],
  36. )
  37. # 添加路由
  38. app.include_router(file_upload_router)
  39. app.include_router(task_progress_router)
  40. app.include_router(review_results_router)
  41. app.include_router(launch_review_router)
  42. # 全局异常处理
  43. @app.exception_handler(HTTPException)
  44. async def http_exception_handler(request, exc):
  45. return JSONResponse(
  46. status_code=exc.status_code,
  47. content=exc.detail
  48. )
  49. # 健康检查
  50. @app.get("/health")
  51. async def health_check():
  52. timestamp = datetime.datetime.now().isoformat()
  53. return {"status": "healthy", "timestamp": timestamp}
  54. # Celery状态检查
  55. @app.get("/celery/status")
  56. async def get_celery_status():
  57. """获取Celery Worker状态"""
  58. global celery_manager
  59. status = celery_manager.get_status()
  60. return {
  61. "celery_worker": status,
  62. "timestamp": datetime.datetime.now().isoformat()
  63. }
  64. # API文档
  65. @app.get("/api/docs")
  66. async def api_docs():
  67. return {
  68. "title": "施工方案审查服务API文档",
  69. "description": "API接口文档",
  70. "version": "V.0.1",
  71. "apis": [
  72. {
  73. "name": "文档上传",
  74. "path": "/sgsc/file_upload",
  75. "method": "POST",
  76. "description": "上传施工方案文档"
  77. },
  78. {
  79. "name": "审查启动",
  80. "path": "/sgsc/launch_review",
  81. "method": "POST",
  82. "description": "启动AI审查工作流"
  83. },
  84. {
  85. "name": "进度查询",
  86. "path": "/sgsc/task_progress/{callback_task_id}",
  87. "method": "GET",
  88. "description": "查询审查任务进度"
  89. },
  90. {
  91. "name": "结果获取",
  92. "path": "/sgsc/review_results",
  93. "method": "POST",
  94. "description": "获取审查结果"
  95. }
  96. ],
  97. }
  98. return app
  99. # Celery Worker管理器
  100. class CeleryWorkerManager:
  101. """Celery Worker程序化管理器"""
  102. def __init__(self):
  103. self.worker = None
  104. self.is_running = False
  105. self.worker_thread = None
  106. self.shutdown_event = threading.Event()
  107. def start_worker(self, **kwargs):
  108. """启动Celery Worker"""
  109. if self.is_running:
  110. logger.warning("Celery Worker已在运行")
  111. return True
  112. try:
  113. # 创建Worker函数
  114. def run_celery_worker():
  115. try:
  116. # 使用最简单的启动方式
  117. logger.info("Celery Worker开始运行...")
  118. # 直接启动worker,使用默认配置
  119. celery_app.worker_main(['worker'])
  120. except KeyboardInterrupt:
  121. logger.info("收到停止信号,Celery Worker退出")
  122. except Exception as e:
  123. logger.error(f"Celery Worker运行时出错: {e}")
  124. logger.exception("详细错误信息:")
  125. finally:
  126. self.is_running = False
  127. logger.info("Celery Worker已停止")
  128. # 在单独线程中启动Worker
  129. self.worker_thread = threading.Thread(target=run_celery_worker, daemon=True)
  130. self.worker_thread.start()
  131. self.is_running = True
  132. # 等待启动
  133. time.sleep(2)
  134. if self.is_running and self.worker_thread.is_alive():
  135. logger.info("Celery Worker启动成功")
  136. return True
  137. else:
  138. logger.error("Celery Worker启动失败")
  139. self.is_running = False
  140. return False
  141. except ImportError as e:
  142. logger.error(f"导入Celery失败: {e}")
  143. logger.info("请先安装Celery: pip install celery redis")
  144. return False
  145. except Exception as e:
  146. logger.error(f"启动Celery Worker失败: {e}")
  147. logger.exception("详细错误信息:")
  148. return False
  149. def stop_worker(self, timeout: int = 5):
  150. """停止Celery Worker"""
  151. if not self.is_running:
  152. logger.info("Celery Worker未运行")
  153. return True
  154. try:
  155. logger.info("停止Celery Worker...")
  156. self.shutdown_event.set()
  157. # 发送停止信号给线程
  158. if self.worker_thread and self.worker_thread.is_alive():
  159. # 尝试优雅停止
  160. start_time = time.time()
  161. while self.is_running and (time.time() - start_time) < timeout:
  162. time.sleep(0.1)
  163. # 如果还没停止,记录警告
  164. if self.is_running:
  165. logger.warning("Celery Worker优雅停止超时")
  166. else:
  167. logger.info("Celery Worker已优雅停止")
  168. self.is_running = False
  169. self.shutdown_event.clear()
  170. return True
  171. except Exception as e:
  172. logger.error(f"停止Celery Worker失败: {e}")
  173. return False
  174. def stop_worker_immediately(self):
  175. """立即停止Celery Worker,不等待"""
  176. if not self.is_running:
  177. logger.info("Celery Worker未运行")
  178. return True
  179. try:
  180. logger.info("立即停止Celery Worker...")
  181. self.shutdown_event.set()
  182. # 设置超时事件,强制停止
  183. import signal
  184. import os
  185. # 发送中断信号给当前进程
  186. if hasattr(os, 'kill'):
  187. try:
  188. os.kill(os.getpid(), signal.SIGINT)
  189. logger.info("已发送中断信号")
  190. except:
  191. pass
  192. # 立即设置状态为停止
  193. self.is_running = False
  194. self.shutdown_event.clear()
  195. logger.info("Celery Worker已立即停止")
  196. return True
  197. except Exception as e:
  198. logger.error(f"立即停止Celery Worker失败: {e}")
  199. # 即使失败也要设置状态
  200. self.is_running = False
  201. return False
  202. def get_status(self):
  203. """获取Worker状态"""
  204. return {
  205. "is_running": self.is_running,
  206. "thread_alive": self.worker_thread.is_alive() if self.worker_thread else False,
  207. }
  208. def __enter__(self):
  209. return self
  210. def __exit__(self, exc_type, exc_val, exc_tb):
  211. self.stop_worker()
  212. # 全局Worker管理器实例
  213. celery_manager = CeleryWorkerManager()
  214. def start_celery_worker():
  215. """启动Celery Worker(同步方式,用于测试)"""
  216. return celery_manager.start_worker()
  217. def cleanup_redis_before_start():
  218. """启动前清理Redis中的残留Celery任务"""
  219. try:
  220. import redis
  221. from foundation.base.config import config_handler
  222. # 连接Redis
  223. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  224. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  225. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  226. if redis_password:
  227. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/0'
  228. else:
  229. redis_url = f'redis://{redis_host}:{redis_port}/0'
  230. r = redis.from_url(redis_url, decode_responses=True)
  231. logger.info("清理Redis中的残留Celery任务...")
  232. # 清理所有Celery相关的键
  233. keys_to_delete = []
  234. for key in r.keys():
  235. if any(keyword in key.lower() for keyword in ['celery', 'task:']):
  236. keys_to_delete.append(key)
  237. if keys_to_delete:
  238. for key in keys_to_delete:
  239. try:
  240. r.delete(key)
  241. logger.debug(f"已清理: {key}")
  242. except Exception as e:
  243. logger.warning(f"清理 {key} 失败: {e}")
  244. logger.info(f"成功清理 {len(keys_to_delete)} 个Redis键")
  245. else:
  246. logger.info("没有发现需要清理的残留任务")
  247. return True
  248. except Exception as e:
  249. logger.error(f"清理Redis残留任务失败: {e}")
  250. return False
  251. def start_celery_worker_background():
  252. """在后台启动Celery Worker(异步方式)"""
  253. # 启动前清理残留任务
  254. cleanup_redis_before_start()
  255. # 添加调用栈调试
  256. import traceback
  257. logger.info("=== Celery Worker启动调用栈 ===")
  258. for line in traceback.format_stack():
  259. logger.debug(f" {line.strip()}")
  260. logger.info("=== 调用栈结束 ===")
  261. return celery_manager.start_worker()
  262. def stop_celery_worker():
  263. """停止Celery Worker"""
  264. global celery_manager
  265. # 立即取消所有任务注册
  266. try:
  267. import redis
  268. from foundation.base.config import config_handler
  269. # 连接Redis
  270. redis_host = config_handler.get('redis', 'REDIS_HOST', 'localhost')
  271. redis_port = config_handler.get('redis', 'REDIS_PORT', '6379')
  272. redis_password = config_handler.get('redis', 'REDIS_PASSWORD', '')
  273. if redis_password:
  274. redis_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/2'
  275. else:
  276. redis_url = f'redis://{redis_host}:{redis_port}/2'
  277. r = redis.from_url(redis_url, decode_responses=True)
  278. # 清理所有任务注册
  279. task_keys = r.keys('task:*')
  280. for key in task_keys:
  281. r.delete(key)
  282. logger.info(f"取消任务注册: {key}")
  283. logger.info(f"已取消 {len(task_keys)} 个任务注册")
  284. except Exception as e:
  285. logger.error(f"取消任务注册失败: {e}")
  286. # 立即停止Worker,不等待
  287. return celery_manager.stop_worker_immediately()
  288. def run_server(host: str = "127.0.0.1", port: int = 8034, reload: bool = False,
  289. with_celery: bool = True):
  290. """运行服务器"""
  291. if with_celery:
  292. # 启动Celery Worker
  293. start_celery_worker_background()
  294. # 注册退出时的清理函数
  295. import atexit
  296. atexit.register(stop_celery_worker)
  297. # 设置信号处理
  298. import signal
  299. def signal_handler(signum, frame):
  300. logger.info(f"收到信号 {signum},正在停止服务...")
  301. stop_celery_worker()
  302. sys.exit(0)
  303. # Windows和Unix系统的信号处理
  304. try:
  305. signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
  306. signal.signal(signal.SIGTERM, signal_handler) # 终止信号
  307. except AttributeError:
  308. # Windows可能不支持某些信号
  309. pass
  310. # Windows特有的控制台事件处理
  311. if sys.platform == 'win32':
  312. try:
  313. import win32api
  314. def win32_handler(dwCtrlType):
  315. # 正确的控制台事件常量
  316. CTRL_C_EVENT = 0
  317. CTRL_BREAK_EVENT = 1
  318. CTRL_CLOSE_EVENT = 2
  319. CTRL_SHUTDOWN_EVENT = 6
  320. if dwCtrlType in (CTRL_C_EVENT, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_SHUTDOWN_EVENT):
  321. logger.info(f"收到Windows控制台事件 {dwCtrlType},正在停止服务...")
  322. stop_celery_worker()
  323. sys.exit(0)
  324. return False
  325. win32api.SetConsoleCtrlHandler(win32_handler, True)
  326. except (ImportError, AttributeError) as e:
  327. # 如果win32api不可用,跳过Windows控制台处理
  328. logger.debug(f"Windows控制台事件处理不可用: {e}")
  329. pass
  330. try:
  331. if reload:
  332. # 重载模式需要正确的模块路径
  333. app_import_path = "views.construction_review.app:app"
  334. uvicorn.run(app_import_path, host=host, port=port, reload=reload)
  335. else:
  336. # 直接运行模式,直接使用app对象
  337. uvicorn.run(app, host=host, port=port)
  338. finally:
  339. if with_celery:
  340. stop_celery_worker()
  341. app = create_app()
  342. if __name__ == "__main__":
  343. import argparse
  344. parser = argparse.ArgumentParser(description='施工方案审查API服务')
  345. parser.add_argument('--host', default='127.0.0.1', help='服务器地址')
  346. parser.add_argument('--port', type=int, default=8035, help='服务器端口')
  347. parser.add_argument('--no-celery', action='store_true', help='不启动Celery Worker')
  348. parser.add_argument('--no-reload', action='store_true', help='关闭热重载')
  349. args = parser.parse_args()
  350. logger.info("施工方案审查API服务启动中...")
  351. logger.info(f"服务地址: http://{args.host}:{args.port}")
  352. logger.info(f"API文档: http://{args.host}:{args.port}/docs")
  353. logger.info(f"健康检查: http://{args.host}:{args.port}/health")
  354. if not args.no_celery:
  355. logger.info("Celery Worker: 已集成启动")
  356. else:
  357. logger.warning("Celery Worker: 已禁用")
  358. run_server(
  359. host=args.host,
  360. port=args.port,
  361. reload=False,
  362. with_celery=not args.no_celery
  363. )