# app.py (8.2 KB)
  1. import sys
  2. import os
  3. import subprocess
  4. import time
  5. import socket
  6. from pathlib import Path
  7. from contextlib import asynccontextmanager
  8. from typing import Optional
  9. PROJECT_ROOT = Path(__file__).resolve().parents[1]
  10. if str(PROJECT_ROOT) not in sys.path:
  11. sys.path.insert(0, str(PROJECT_ROOT))
  12. import uvicorn
  13. from fastapi import FastAPI
  14. from fastapi.middleware.cors import CORSMiddleware
  15. from foundation.infrastructure.config.config import config_handler
  16. from foundation.observability.logger.loggering import server_logger
  17. from views import lifespan as views_lifespan
  18. from views.construction_write.content_completion import content_completion_router
  19. from views.construction_write.outline_views import outline_router
  20. from views.construction_write.similar_plan_recommend import similar_fragment_router
  21. from views.document_chat.views import document_chat_router
  22. def _config_bool(section: str, option: str, default: bool = False) -> bool:
  23. value = os.getenv(option)
  24. if value is None:
  25. value = config_handler.get(section, option, str(default))
  26. if value is None:
  27. return default
  28. return str(value).strip().lower() in {"1", "true", "yes", "y", "on"}
  29. class CeleryWorkerManager:
  30. """Start the construction-write Celery worker with the API in local/dev runs."""
  31. def __init__(self):
  32. self.process: Optional[subprocess.Popen] = None
  33. self._stdout = None
  34. self._stderr = None
  35. self.stdout_log_path: Optional[Path] = None
  36. self.stderr_log_path: Optional[Path] = None
  37. self.last_command: list[str] = []
  38. self.last_error: Optional[str] = None
  39. self.started_at: Optional[float] = None
  40. def _tail_file(self, path: Optional[Path], max_lines: int = 20) -> str:
  41. if not path or not path.exists():
  42. return ""
  43. try:
  44. lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
  45. return "\n".join(lines[-max_lines:])
  46. except Exception as exc:
  47. return f"failed to read log tail: {exc}"
  48. def start(self):
  49. if not _config_bool("construction_write", "AUTO_START_CELERY_WORKER", True):
  50. server_logger.info("Construction write Celery worker auto-start disabled")
  51. return
  52. if self.process and self.process.poll() is None:
  53. server_logger.info(f"Construction write Celery worker already running: pid={self.process.pid}")
  54. return
  55. log_dir = PROJECT_ROOT / "logs" / "construction_write"
  56. log_dir.mkdir(parents=True, exist_ok=True)
  57. self.stdout_log_path = log_dir / "celery_worker.out.log"
  58. self.stderr_log_path = log_dir / "celery_worker.err.log"
  59. self._stdout = open(self.stdout_log_path, "a", encoding="utf-8", buffering=1)
  60. self._stderr = open(self.stderr_log_path, "a", encoding="utf-8", buffering=1)
  61. concurrency = config_handler.get("construction_write", "MAX_CELERY_TASKS", "1")
  62. args = [
  63. sys.executable,
  64. "-m",
  65. "celery",
  66. "-A",
  67. "foundation.infrastructure.messaging.celery_app.app",
  68. "worker",
  69. "-Q",
  70. "construction_write",
  71. "--loglevel=info",
  72. "--concurrency",
  73. str(concurrency),
  74. ]
  75. if sys.platform == "win32":
  76. args.extend(["--pool=solo"])
  77. self.last_command = args
  78. self.last_error = None
  79. self.started_at = time.time()
  80. env = os.environ.copy()
  81. env["CELERY_WORKER"] = "1"
  82. env["PYTHONPATH"] = f"{PROJECT_ROOT}{os.pathsep}{env.get('PYTHONPATH', '')}"
  83. creationflags = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
  84. self.process = subprocess.Popen(
  85. args,
  86. cwd=str(PROJECT_ROOT),
  87. stdout=self._stdout,
  88. stderr=self._stderr,
  89. env=env,
  90. creationflags=creationflags,
  91. )
  92. server_logger.info(f"Construction write Celery worker started: pid={self.process.pid}")
  93. try:
  94. self.process.wait(timeout=1.0)
  95. except subprocess.TimeoutExpired:
  96. return
  97. returncode = self.process.returncode
  98. if self._stderr:
  99. self._stderr.flush()
  100. stderr_tail = self._tail_file(self.stderr_log_path)
  101. self.last_error = (
  102. f"Construction write Celery worker exited immediately with code {returncode}. "
  103. f"stderr tail: {stderr_tail}"
  104. )
  105. server_logger.error(self.last_error)
  106. def stop(self):
  107. if self.process and self.process.poll() is None:
  108. server_logger.info(f"Stopping construction write Celery worker: pid={self.process.pid}")
  109. self.process.terminate()
  110. try:
  111. self.process.wait(timeout=10)
  112. except subprocess.TimeoutExpired:
  113. server_logger.warning("Celery worker did not stop in time; killing it")
  114. self.process.kill()
  115. self.process.wait(timeout=5)
  116. for handle in (self._stdout, self._stderr):
  117. if handle:
  118. handle.close()
  119. self.process = None
  120. self._stdout = None
  121. self._stderr = None
  122. def status(self) -> dict:
  123. auto_start = _config_bool("construction_write", "AUTO_START_CELERY_WORKER", True)
  124. running = bool(self.process and self.process.poll() is None)
  125. return {
  126. "auto_start": auto_start,
  127. "is_running": running,
  128. "pid": self.process.pid if running else None,
  129. "returncode": self.process.returncode if self.process and not running else None,
  130. "started_at": int(self.started_at) if self.started_at else None,
  131. "last_error": self.last_error,
  132. "command": " ".join(self.last_command) if self.last_command else None,
  133. "stdout_log": str(self.stdout_log_path) if self.stdout_log_path else None,
  134. "stderr_log": str(self.stderr_log_path) if self.stderr_log_path else None,
  135. }
  136. celery_worker_manager = CeleryWorkerManager()
  137. @asynccontextmanager
  138. async def lifespan(app: FastAPI):
  139. celery_worker_manager.start()
  140. async with views_lifespan(app):
  141. yield
  142. celery_worker_manager.stop()
  143. def create_app() -> FastAPI:
  144. app = FastAPI(
  145. title="LQAgent Write API",
  146. description="施工方案编写服务",
  147. version="1.0.0",
  148. lifespan=lifespan,
  149. )
  150. app.add_middleware(
  151. CORSMiddleware,
  152. allow_origins=["*"],
  153. allow_credentials=True,
  154. allow_methods=["*"],
  155. allow_headers=["*"],
  156. )
  157. app.include_router(outline_router)
  158. app.include_router(content_completion_router)
  159. app.include_router(similar_fragment_router)
  160. app.include_router(document_chat_router)
  161. @app.get("/health")
  162. async def health():
  163. worker_status = celery_worker_manager.status()
  164. status = "healthy"
  165. if worker_status["auto_start"] and not worker_status["is_running"]:
  166. status = "degraded"
  167. return {
  168. "status": status,
  169. "service": "construction_write",
  170. "celery_worker": {
  171. "auto_start": worker_status["auto_start"],
  172. "is_running": worker_status["is_running"],
  173. "pid": worker_status["pid"],
  174. },
  175. }
  176. @app.get("/celery/status")
  177. async def celery_status():
  178. return {"celery_worker": celery_worker_manager.status()}
  179. return app
  180. app = create_app()
  181. def _ensure_port_available(host: str, port: int):
  182. try:
  183. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
  184. sock.bind((host, port))
  185. except OSError as exc:
  186. message = (
  187. f"Cannot start LQAgent Write API on {host}:{port}. "
  188. "The port is already in use. Stop the existing process or change [launch] LAUNCH_PORT."
  189. )
  190. server_logger.error(f"{message} Original error: {exc}")
  191. raise SystemExit(message) from exc
  192. def main():
  193. host = config_handler.get("launch", "HOST", "0.0.0.0")
  194. port = int(config_handler.get("launch", "LAUNCH_PORT", "8004"))
  195. server_logger.info(f"LQAgent Write API starting on {host}:{port}")
  196. _ensure_port_available(host, port)
  197. uvicorn.run(app, host=host, port=port, reload=False)
  198. if __name__ == "__main__":
  199. main()