import sys
import os
import subprocess
import time
import socket
from collections import deque
from pathlib import Path
from contextlib import asynccontextmanager
from typing import Optional

PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Make the project root importable before the project imports below; those
# imports must stay after this sys.path adjustment.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

import uvicorn  # noqa: E402
from fastapi import FastAPI  # noqa: E402
from fastapi.middleware.cors import CORSMiddleware  # noqa: E402

from foundation.infrastructure.config.config import config_handler  # noqa: E402
from foundation.observability.logger.loggering import server_logger  # noqa: E402
from views import lifespan as views_lifespan  # noqa: E402
from views.construction_write.content_completion import content_completion_router  # noqa: E402
from views.construction_write.outline_views import outline_router  # noqa: E402
from views.construction_write.similar_plan_recommend import similar_fragment_router  # noqa: E402
from views.document_chat.views import document_chat_router  # noqa: E402


def _config_bool(section: str, option: str, default: bool = False) -> bool:
    """Read a boolean flag, letting an environment variable override the config.

    The environment variable named exactly ``option`` takes precedence; otherwise
    the value comes from ``config_handler.get(section, option, str(default))``.
    The value is stripped and compared case-insensitively against
    ``1/true/yes/y/on``; anything else is ``False``. A ``None`` from the config
    handler falls back to ``default``.
    """
    value = os.getenv(option)
    if value is None:
        value = config_handler.get(section, option, str(default))
    if value is None:
        return default
    return str(value).strip().lower() in {"1", "true", "yes", "y", "on"}


class CeleryWorkerManager:
    """Start the construction-write Celery worker with the API in local/dev runs."""

    def __init__(self):
        self.process: Optional[subprocess.Popen] = None
        # Append-mode log file handles passed to the child as stdout/stderr.
        self._stdout = None
        self._stderr = None
        self.stdout_log_path: Optional[Path] = None
        self.stderr_log_path: Optional[Path] = None
        self.last_command: list[str] = []
        self.last_error: Optional[str] = None
        self.started_at: Optional[float] = None

    def _tail_file(self, path: Optional[Path], max_lines: int = 20) -> str:
        """Return the last ``max_lines`` lines of ``path`` joined by newlines.

        Returns ``""`` when the path is unset or missing, and an error
        description (instead of raising) when the file cannot be read. The file
        is streamed through a bounded deque, so an ever-growing append-only log
        is never loaded into memory in full.
        """
        if not path or not path.exists():
            return ""
        try:
            with path.open("r", encoding="utf-8", errors="ignore") as fh:
                tail = deque(fh, maxlen=max(max_lines, 0))
        except Exception as exc:
            return f"failed to read log tail: {exc}"
        return "\n".join(line.rstrip("\r\n") for line in tail)

    def _close_log_handles(self) -> None:
        """Close the worker's stdout/stderr log files (best effort) and forget them."""
        for handle in (self._stdout, self._stderr):
            if handle is None:
                continue
            try:
                handle.close()
            except OSError as exc:
                server_logger.warning(f"Failed to close Celery worker log file: {exc}")
        self._stdout = None
        self._stderr = None

    def start(self):
        """Spawn the Celery worker subprocess unless disabled or already running.

        The worker consumes the ``construction_write`` queue. Its stdout and
        stderr are appended to ``logs/construction_write/celery_worker.{out,err}.log``
        under the project root. The call then blocks for up to one second; if
        the child has already exited by then, the failure and the tail of its
        stderr log are recorded in ``last_error`` and logged.

        Raises:
            Whatever ``subprocess.Popen`` raises if the process cannot be
            launched (``last_error`` is set and the log files are closed first).
        """
        if not _config_bool("construction_write", "AUTO_START_CELERY_WORKER", True):
            server_logger.info("Construction write Celery worker auto-start disabled")
            return
        if self.process and self.process.poll() is None:
            server_logger.info(f"Construction write Celery worker already running: pid={self.process.pid}")
            return

        # A previous worker may have exited on its own; release its log handles
        # before reopening so repeated start() calls don't leak descriptors.
        self._close_log_handles()
        self.process = None

        log_dir = PROJECT_ROOT / "logs" / "construction_write"
        log_dir.mkdir(parents=True, exist_ok=True)
        self.stdout_log_path = log_dir / "celery_worker.out.log"
        self.stderr_log_path = log_dir / "celery_worker.err.log"

        concurrency = config_handler.get("construction_write", "MAX_CELERY_TASKS", "1")
        args = [
            sys.executable,
            "-m",
            "celery",
            "-A",
            "foundation.infrastructure.messaging.celery_app.app",
            "worker",
            "-Q",
            "construction_write",
            "--loglevel=info",
            "--concurrency",
            str(concurrency),
        ]
        if sys.platform == "win32":
            # NOTE: Celery's default prefork pool is not reliable on Windows,
            # hence the single-threaded solo pool there.
            args.append("--pool=solo")
        self.last_command = args
        self.last_error = None
        self.started_at = time.time()

        env = os.environ.copy()
        env["CELERY_WORKER"] = "1"
        # Prepend the project root without leaving an empty PYTHONPATH entry
        # (an empty entry means "current directory") when none was set.
        env["PYTHONPATH"] = os.pathsep.join(
            part for part in (str(PROJECT_ROOT), env.get("PYTHONPATH", "")) if part
        )

        # Avoid popping up a console window for the child on Windows.
        creationflags = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0

        self._stdout = open(self.stdout_log_path, "a", encoding="utf-8", buffering=1)
        self._stderr = open(self.stderr_log_path, "a", encoding="utf-8", buffering=1)
        try:
            self.process = subprocess.Popen(
                args,
                cwd=str(PROJECT_ROOT),
                stdout=self._stdout,
                stderr=self._stderr,
                env=env,
                creationflags=creationflags,
            )
        except Exception as exc:
            self._close_log_handles()
            self.last_error = f"Failed to launch construction write Celery worker: {exc}"
            server_logger.error(self.last_error)
            raise
        server_logger.info(f"Construction write Celery worker started: pid={self.process.pid}")

        # Short grace period to catch immediate crashes (bad broker, import
        # errors, ...). This blocks the caller, including the event loop when
        # called from the async lifespan.
        try:
            self.process.wait(timeout=1.0)
        except subprocess.TimeoutExpired:
            return  # still running: treat as a successful start

        returncode = self.process.returncode
        if self._stderr:
            self._stderr.flush()
        stderr_tail = self._tail_file(self.stderr_log_path)
        self.last_error = (
            f"Construction write Celery worker exited immediately with code {returncode}. "
            f"stderr tail: {stderr_tail}"
        )
        server_logger.error(self.last_error)
        # The child is gone, so its log handles are no longer needed; keep
        # self.process so status() can still report the return code.
        self._close_log_handles()

    def stop(self):
        """Terminate the worker if running, killing it after 10 s, then close its logs.

        Safe to call when the worker was never started or has already exited.
        """
        process = self.process
        if process is not None and process.poll() is None:
            server_logger.info(f"Stopping construction write Celery worker: pid={process.pid}")
            process.terminate()
            try:
                process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                server_logger.warning("Celery worker did not stop in time; killing it")
                process.kill()
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    # Don't let shutdown crash (and skip cleanup) on a stuck child.
                    server_logger.error(f"Celery worker pid={process.pid} did not exit after kill")
        self._close_log_handles()
        self.process = None

    def status(self) -> dict:
        """Return a JSON-serializable snapshot of the worker's configuration and state."""
        auto_start = _config_bool("construction_write", "AUTO_START_CELERY_WORKER", True)
        running = bool(self.process and self.process.poll() is None)
        return {
            "auto_start": auto_start,
            "is_running": running,
            "pid": self.process.pid if running else None,
            # Only meaningful for a worker that has exited but not been stop()ped.
            "returncode": self.process.returncode if self.process and not running else None,
            "started_at": int(self.started_at) if self.started_at else None,
            "last_error": self.last_error,
            "command": " ".join(self.last_command) if self.last_command else None,
            "stdout_log": str(self.stdout_log_path) if self.stdout_log_path else None,
            "stderr_log": str(self.stderr_log_path) if self.stderr_log_path else None,
        }


celery_worker_manager = CeleryWorkerManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the Celery worker for the app's lifetime, wrapping the views' own lifespan."""
    celery_worker_manager.start()
    try:
        async with views_lifespan(app):
            yield
    finally:
        # Runs even if views_lifespan startup/shutdown raises, so the worker
        # subprocess is never left orphaned.
        celery_worker_manager.stop()


def create_app() -> FastAPI:
    """Build the FastAPI app with CORS, the feature routers, and status endpoints."""
    app = FastAPI(
        title="LQAgent Write API",
        description="施工方案编写服务",
        version="1.0.0",
        lifespan=lifespan,
    )
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    app.include_router(outline_router)
    app.include_router(content_completion_router)
    app.include_router(similar_fragment_router)
    app.include_router(document_chat_router)

    @app.get("/health")
    async def health():
        """Report "degraded" when the worker should be auto-started but isn't running."""
        worker_status = celery_worker_manager.status()
        status = "healthy"
        if worker_status["auto_start"] and not worker_status["is_running"]:
            status = "degraded"
        return {
            "status": status,
            "service": "construction_write",
            "celery_worker": {
                "auto_start": worker_status["auto_start"],
                "is_running": worker_status["is_running"],
                "pid": worker_status["pid"],
            },
        }

    @app.get("/celery/status")
    async def celery_status():
        """Return the full worker status snapshot."""
        return {"celery_worker": celery_worker_manager.status()}

    return app


app = create_app()


def _ensure_port_available(host: str, port: int):
    """Exit with a clear message if ``host:port`` cannot be bound.

    Raises:
        SystemExit: when the trial bind fails.
    """
    # Literal IPv6 hosts (e.g. "::") need an AF_INET6 socket; binding them on
    # AF_INET would fail and be misreported as a busy port.
    family = socket.AF_INET6 if ":" in host else socket.AF_INET
    try:
        with socket.socket(family, socket.SOCK_STREAM) as sock:
            if sys.platform != "win32":
                # Mirror the server's listener (asyncio create_server enables
                # SO_REUSEADDR on Unix) so a port lingering in TIME_WAIT after a
                # restart isn't reported as in use. Skipped on Windows, where the
                # option would allow binding over a live listener.
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port))
    except OSError as exc:
        message = (
            f"Cannot start LQAgent Write API on {host}:{port}. "
            "The port is already in use. Stop the existing process or change [launch] LAUNCH_PORT."
        )
        server_logger.error(f"{message} Original error: {exc}")
        raise SystemExit(message) from exc


def main():
    """Read host/port from the ``[launch]`` config section and serve the app with uvicorn."""
    host = config_handler.get("launch", "HOST", "0.0.0.0")
    port = int(config_handler.get("launch", "LAUNCH_PORT", "8004"))
    server_logger.info(f"LQAgent Write API starting on {host}:{port}")
    _ensure_port_available(host, port)
    uvicorn.run(app, host=host, port=port, reload=False)


if __name__ == "__main__":
    main()