| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- import sys
- import os
- import subprocess
- import time
- import socket
- from pathlib import Path
- from contextlib import asynccontextmanager
- from typing import Optional
# Repository root: the parent of the directory that contains this file.
# It is put at the front of sys.path so the project-local packages imported
# further down resolve no matter which working directory the script runs from.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if not sys.path.count(str(PROJECT_ROOT)):
    sys.path.insert(0, str(PROJECT_ROOT))
- import uvicorn
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- from foundation.infrastructure.config.config import config_handler
- from foundation.observability.logger.loggering import server_logger
- from views import lifespan as views_lifespan
- from views.construction_write.content_completion import content_completion_router
- from views.construction_write.outline_views import outline_router
- from views.construction_write.similar_plan_recommend import similar_fragment_router
- from views.document_chat.views import document_chat_router
def _config_bool(section: str, option: str, default: bool = False) -> bool:
    """Read a boolean switch, letting an environment variable override config.

    The environment variable named ``option`` wins when it is set. Otherwise
    the value is read from ``config_handler`` under ``section``/``option``,
    with ``str(default)`` as the fallback passed to the handler.

    Args:
        section: Config section to read when the environment has no override.
        option: Option name; also used verbatim as the environment variable name.
        default: Result when neither source yields a value.

    Returns:
        True when the resolved value, stripped and lower-cased, is one of
        ``1``, ``true``, ``yes``, ``y`` or ``on``; False for any other text.
    """
    truthy_words = ("1", "true", "yes", "y", "on")

    raw = os.getenv(option)
    if raw is None:
        raw = config_handler.get(section, option, str(default))
        if raw is None:
            return default

    normalized = str(raw).strip().lower()
    return normalized in truthy_words
class CeleryWorkerManager:
    """Start the construction-write Celery worker with the API in local/dev runs.

    The worker runs as a child process of the API process. Its stdout and
    stderr are appended to log files under ``logs/construction_write`` inside
    the project root. The manager owns the open log handles and the
    ``Popen`` object, and releases both in :meth:`stop`.
    """

    def __init__(self):
        self.process: Optional[subprocess.Popen] = None
        # Log file handles passed to the child process; owned by this manager.
        self._stdout = None
        self._stderr = None
        self.stdout_log_path: Optional[Path] = None
        self.stderr_log_path: Optional[Path] = None
        self.last_command: list[str] = []
        self.last_error: Optional[str] = None
        self.started_at: Optional[float] = None

    def _close_log_handles(self):
        """Close and forget any log handles currently held by the manager."""
        for attr in ("_stdout", "_stderr"):
            handle = getattr(self, attr)
            # Clear the reference first so a failing close() cannot leave a
            # half-closed handle behind for a later call to trip over.
            setattr(self, attr, None)
            if handle is None:
                continue
            try:
                handle.close()
            except OSError as exc:
                server_logger.warning(f"Failed to close Celery worker log handle: {exc}")

    def _tail_file(self, path: Optional[Path], max_lines: int = 20) -> str:
        """Return the last ``max_lines`` lines of ``path`` joined with newlines.

        The file is streamed through a bounded deque, so an arbitrarily large
        append-only log never has to fit in memory.

        Args:
            path: Log file to read; may be None.
            max_lines: Maximum number of trailing lines to keep.

        Returns:
            The tail text; an empty string when ``path`` is None or missing,
            or when ``max_lines`` is not positive; a short diagnostic string
            when the file cannot be read.
        """
        from collections import deque

        if max_lines <= 0 or not path or not path.exists():
            return ""
        try:
            with path.open(encoding="utf-8", errors="ignore") as stream:
                tail = deque(stream, maxlen=max_lines)
        except Exception as exc:
            return f"failed to read log tail: {exc}"
        return "\n".join(line.rstrip("\n") for line in tail)

    def start(self):
        """Launch the Celery worker unless disabled or already running.

        Does nothing when ``AUTO_START_CELERY_WORKER`` resolves to false or a
        previously started worker is still alive. After launching, waits up to
        one second; if the worker has already exited by then, the failure and
        the tail of its stderr log are recorded in ``last_error`` and logged.

        Raises:
            Exception: whatever opening the log files or ``subprocess.Popen``
                raises; the log handles are released before it propagates.
        """
        if not _config_bool("construction_write", "AUTO_START_CELERY_WORKER", True):
            server_logger.info("Construction write Celery worker auto-start disabled")
            return
        if self.process and self.process.poll() is None:
            server_logger.info(f"Construction write Celery worker already running: pid={self.process.pid}")
            return

        # A previous worker may have died while its handles were still open;
        # release them before opening new ones so they are not leaked.
        self._close_log_handles()

        log_dir = PROJECT_ROOT / "logs" / "construction_write"
        log_dir.mkdir(parents=True, exist_ok=True)
        self.stdout_log_path = log_dir / "celery_worker.out.log"
        self.stderr_log_path = log_dir / "celery_worker.err.log"

        concurrency = config_handler.get("construction_write", "MAX_CELERY_TASKS", "1")
        args = [
            sys.executable,
            "-m",
            "celery",
            "-A",
            "foundation.infrastructure.messaging.celery_app.app",
            "worker",
            "-Q",
            "construction_write",
            "--loglevel=info",
            "--concurrency",
            str(concurrency),
        ]
        if sys.platform == "win32":
            args.extend(["--pool=solo"])
        self.last_command = args
        self.last_error = None
        self.started_at = time.time()

        env = os.environ.copy()
        env["CELERY_WORKER"] = "1"
        # Skip an unset/empty PYTHONPATH so the result has no trailing
        # separator (which would add an empty search-path entry).
        env["PYTHONPATH"] = os.pathsep.join(
            part for part in (str(PROJECT_ROOT), env.get("PYTHONPATH", "")) if part
        )
        creationflags = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0

        try:
            self._stdout = open(self.stdout_log_path, "a", encoding="utf-8", buffering=1)
            self._stderr = open(self.stderr_log_path, "a", encoding="utf-8", buffering=1)
            self.process = subprocess.Popen(
                args,
                cwd=str(PROJECT_ROOT),
                stdout=self._stdout,
                stderr=self._stderr,
                env=env,
                creationflags=creationflags,
            )
        except Exception as exc:
            # Do not leak the log handles when the launch itself fails.
            self._close_log_handles()
            self.last_error = f"Failed to launch construction write Celery worker: {exc}"
            server_logger.error(self.last_error)
            raise
        server_logger.info(f"Construction write Celery worker started: pid={self.process.pid}")

        # Give the worker a moment: still running after the timeout means the
        # launch is considered successful.
        try:
            self.process.wait(timeout=1.0)
        except subprocess.TimeoutExpired:
            return

        returncode = self.process.returncode
        if self._stderr:
            self._stderr.flush()
        stderr_tail = self._tail_file(self.stderr_log_path)
        self.last_error = (
            f"Construction write Celery worker exited immediately with code {returncode}. "
            f"stderr tail: {stderr_tail}"
        )
        server_logger.error(self.last_error)
        # The child is gone, so nothing writes through these handles any more.
        self._close_log_handles()

    def stop(self):
        """Terminate the worker if it is running and release all resources.

        Sends terminate, waits up to 10 seconds, then kills and waits up to 5
        more seconds. Log handles are closed and ``process`` is reset even if
        one of those steps raises.
        """
        worker = self.process
        try:
            if worker and worker.poll() is None:
                server_logger.info(f"Stopping construction write Celery worker: pid={worker.pid}")
                worker.terminate()
                try:
                    worker.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    server_logger.warning("Celery worker did not stop in time; killing it")
                    worker.kill()
                    worker.wait(timeout=5)
        finally:
            self._close_log_handles()
            self.process = None

    def status(self) -> dict:
        """Return a snapshot of the worker state for the status endpoints.

        Returns:
            A dict with the auto-start flag, whether the worker is running,
            its pid (only while running), its return code (only once exited),
            the start timestamp as whole seconds, the last recorded error, the
            launch command joined by spaces, and the two log file paths.
        """
        auto_start = _config_bool("construction_write", "AUTO_START_CELERY_WORKER", True)
        worker = self.process
        running = bool(worker and worker.poll() is None)
        return {
            "auto_start": auto_start,
            "is_running": running,
            "pid": worker.pid if running else None,
            "returncode": worker.returncode if worker and not running else None,
            "started_at": int(self.started_at) if self.started_at else None,
            "last_error": self.last_error,
            "command": " ".join(self.last_command) if self.last_command else None,
            "stdout_log": str(self.stdout_log_path) if self.stdout_log_path else None,
            "stderr_log": str(self.stderr_log_path) if self.stderr_log_path else None,
        }
# Single process-wide manager shared by the lifespan hook and the status routes.
celery_worker_manager = CeleryWorkerManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the Celery worker for exactly as long as the application is alive.

    Starts the worker, enters the views lifespan, and yields control to the
    server. The worker is stopped in a ``finally`` block so that a failure
    while starting the worker, while entering or leaving the views lifespan,
    or during shutdown cannot leave the child process running.
    """
    try:
        celery_worker_manager.start()
        async with views_lifespan(app):
            yield
    finally:
        celery_worker_manager.stop()
def create_app() -> FastAPI:
    """Build the FastAPI application with middleware, routers and status routes.

    Returns:
        The configured application: CORS middleware installed, the four
        feature routers included, and ``/health`` plus ``/celery/status``
        registered.
    """
    api = FastAPI(
        title="LQAgent Write API",
        description="施工方案编写服务",
        version="1.0.0",
        lifespan=lifespan,
    )

    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # very permissive - confirm this is intended outside local/dev runs.
    cors_options = {
        "allow_origins": ["*"],
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }
    api.add_middleware(CORSMiddleware, **cors_options)

    feature_routers = (
        outline_router,
        content_completion_router,
        similar_fragment_router,
        document_chat_router,
    )
    for router in feature_routers:
        api.include_router(router)

    # Reports "degraded" when the worker is supposed to auto-start but is not
    # currently running; "healthy" otherwise.
    @api.get("/health")
    async def health():
        worker = celery_worker_manager.status()
        expected_but_down = worker["auto_start"] and not worker["is_running"]
        return {
            "status": "degraded" if expected_but_down else "healthy",
            "service": "construction_write",
            "celery_worker": {
                key: worker[key] for key in ("auto_start", "is_running", "pid")
            },
        }

    # Exposes the full worker snapshot, including command and log paths.
    @api.get("/celery/status")
    async def celery_status():
        return {"celery_worker": celery_worker_manager.status()}

    return api


# Module-level application object; this is what main() hands to uvicorn.
app = create_app()
def _ensure_port_available(host: str, port: int):
    """Fail fast with a readable message when ``host:port`` cannot be bound.

    A short-lived probe socket is bound to the address and closed again
    immediately.

    Args:
        host: Interface to bind; an IPv4 address, a hostname, or an IPv6 literal.
        port: TCP port to probe.

    Raises:
        SystemExit: when the probe cannot be bound. The message says whether
            the port is in use or the bind failed for another reason; the
            original ``OSError`` is chained as the cause.
    """
    import errno

    # A colon can only appear in an IPv6 literal, never in an IPv4 address or
    # a hostname, so it is enough to choose the socket family.
    family = socket.AF_INET6 if host and ":" in host else socket.AF_INET
    try:
        with socket.socket(family, socket.SOCK_STREAM) as sock:
            if sys.platform != "win32":
                # Without SO_REUSEADDR a POSIX bind can fail while connections
                # from a just-stopped server linger in TIME_WAIT, which would
                # be a false "in use" report. Skipped on Windows, where the
                # option would let the probe bind over an active listener.
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port))
    except OSError as exc:
        if exc.errno == errno.EADDRINUSE:
            reason = "The port is already in use. Stop the existing process or change [launch] LAUNCH_PORT."
        else:
            reason = f"The address could not be bound ({exc}). Check [launch] HOST and LAUNCH_PORT."
        message = f"Cannot start LQAgent Write API on {host}:{port}. {reason}"
        server_logger.error(f"{message} Original error: {exc}")
        raise SystemExit(message) from exc
def main():
    """Read the launch settings, verify the port is free, and run the server.

    Host and port come from the ``[launch]`` config section, defaulting to
    ``0.0.0.0`` and ``8004``. The port probe runs before uvicorn starts so a
    conflict is reported with a clear message.
    """
    bind_host = config_handler.get("launch", "HOST", "0.0.0.0")
    raw_port = config_handler.get("launch", "LAUNCH_PORT", "8004")
    bind_port = int(raw_port)

    server_logger.info(f"LQAgent Write API starting on {bind_host}:{bind_port}")
    _ensure_port_available(bind_host, bind_port)
    uvicorn.run(app, host=bind_host, port=bind_port, reload=False)


if __name__ == "__main__":
    main()
|