""" AIChat 代理服务 负责将请求代理到 shudao-aichat 服务 """ import httpx from typing import AsyncGenerator, Optional from fastapi import Request from fastapi.responses import StreamingResponse, JSONResponse from utils.config import settings from utils.logger import logger class AIChatProxy: """AIChat 服务代理""" def __init__(self): self.base_url = settings.aichat.api_url.rstrip('/') self.timeout = settings.aichat.timeout def _get_auth_headers(self, request: Request) -> dict: """提取并转发认证 headers""" headers = {} # 支持多种 header 名称 for header_name in ["Authorization", "Token", "token"]: header_value = request.headers.get(header_name, "").strip() if header_value: headers[header_name] = header_value return headers async def proxy_sse( self, path: str, request: Request, request_body: bytes ) -> StreamingResponse: """ 代理 SSE 流式请求到 aichat Args: path: API 路径(如 /report/complete-flow) request: FastAPI Request 对象 request_body: 请求体 Returns: StreamingResponse """ url = f"{self.base_url}{path}" headers = self._get_auth_headers(request) headers["Content-Type"] = "application/json" logger.info(f"[AIChat代理] SSE 请求: {url}") async def stream_generator() -> AsyncGenerator[bytes, None]: try: async with httpx.AsyncClient(timeout=self.timeout) as client: async with client.stream( "POST", url, content=request_body, headers=headers ) as response: if response.status_code != 200: error_text = await response.aread() logger.error( f"[AIChat代理] SSE 请求失败: {response.status_code} {error_text.decode()}") error_msg = f"data: {{\"type\": \"online_error\", \"message\": \"AIChat服务返回异常: {response.status_code}\"}}\n\n" yield error_msg.encode('utf-8') yield b"data: {\"type\": \"completed\"}\n\n" return # 流式转发响应 async for chunk in response.aiter_bytes(chunk_size=4096): yield chunk except httpx.TimeoutException: logger.error("[AIChat代理] SSE 请求超时") yield f'data: {{"type": "online_error", "message": "AIChat服务请求超时"}}\n\n'.encode('utf-8') yield b"data: {\"type\": \"completed\"}\n\n" except Exception as e: logger.error(f"[AIChat代理] SSE 请求异常: {e}") yield f'data: {{"type": "online_error", "message": "AIChat服务请求超时"}}\n\n'.encode('utf-8') yield error_msg.encode('utf-8') yield b"data: {\"type\": \"completed\"}\n\n" return StreamingResponse( stream_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", } ) async def proxy_json( self, path: str, request: Request, request_body: bytes ) -> JSONResponse: """ 代理 JSON 请求到 aichat Args: path: API 路径(如 /report/update-ai-message) request: FastAPI Request 对象 request_body: 请求体 Returns: JSONResponse """ url = f"{self.base_url}{path}" headers = self._get_auth_headers(request) headers["Content-Type"] = "application/json" logger.info(f"[AIChat代理] JSON 请求: {url}") try: async with httpx.AsyncClient(timeout=30) as client: response = await client.post( url, content=request_body, headers=headers ) # 转发响应 return JSONResponse( content=response.json(), status_code=response.status_code ) except httpx.TimeoutException: logger.error("[AIChat代理] JSON 请求超时") return JSONResponse( content={"success": False, "message": "AIChat服务请求超时"}, status_code=504 ) except Exception as e: logger.error(f"[AIChat代理] JSON 请求异常: {e}") return JSONResponse( content={"success": False, "message": f"AIChat服务异常: {str(e)}"}, status_code=500 ) async def health_check(self) -> bool: """ 检查 aichat 服务健康状态 Returns: True 表示服务可用,False 表示不可用 """ try: async with httpx.AsyncClient(timeout=5) as client: response = await client.get(f"{self.base_url}/health") return response.status_code == 200 except Exception as e: logger.warning(f"[AIChat代理] 健康检查失败: {e}") return False # 全局代理实例 aichat_proxy = AIChatProxy()