| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- """
- AIChat 代理服务
- 负责将请求代理到 shudao-aichat 服务
- """
- import httpx
- from typing import AsyncGenerator, Optional
- from fastapi import Request
- from fastapi.responses import StreamingResponse, JSONResponse
- from utils.config import settings
- from utils.logger import logger
- class AIChatProxy:
- """AIChat 服务代理"""
- def __init__(self):
- self.base_url = settings.aichat.api_url.rstrip('/')
- self.timeout = settings.aichat.timeout
- def _get_auth_headers(self, request: Request) -> dict:
- """提取并转发认证 headers"""
- headers = {}
- # 支持多种 header 名称
- for header_name in ["Authorization", "Token", "token"]:
- header_value = request.headers.get(header_name, "").strip()
- if header_value:
- headers[header_name] = header_value
- return headers
- async def proxy_sse(
- self,
- path: str,
- request: Request,
- request_body: bytes
- ) -> StreamingResponse:
- """
- 代理 SSE 流式请求到 aichat
- Args:
- path: API 路径(如 /report/complete-flow)
- request: FastAPI Request 对象
- request_body: 请求体
- Returns:
- StreamingResponse
- """
- url = f"{self.base_url}{path}"
- headers = self._get_auth_headers(request)
- headers["Content-Type"] = "application/json"
- logger.info(f"[AIChat代理] SSE 请求: {url}")
- async def stream_generator() -> AsyncGenerator[bytes, None]:
- try:
- async with httpx.AsyncClient(timeout=self.timeout) as client:
- async with client.stream(
- "POST",
- url,
- content=request_body,
- headers=headers
- ) as response:
- if response.status_code != 200:
- error_text = await response.aread()
- logger.error(
- f"[AIChat代理] SSE 请求失败: {response.status_code} {error_text.decode()}")
- error_msg = f"data: {{\"type\": \"online_error\", \"message\": \"AIChat服务返回异常: {response.status_code}\"}}\n\n"
- yield error_msg.encode('utf-8')
- yield b"data: {\"type\": \"completed\"}\n\n"
- return
- # 流式转发响应
- async for chunk in response.aiter_bytes(chunk_size=4096):
- yield chunk
- except httpx.TimeoutException:
- logger.error("[AIChat代理] SSE 请求超时")
- yield f'data: {{"type": "online_error", "message": "AIChat服务请求超时"}}\n\n'.encode('utf-8')
- yield b"data: {\"type\": \"completed\"}\n\n"
- except Exception as e:
- logger.error(f"[AIChat代理] SSE 请求异常: {e}")
- yield f'data: {{"type": "online_error", "message": "AIChat服务请求超时"}}\n\n'.encode('utf-8')
- yield error_msg.encode('utf-8')
- yield b"data: {\"type\": \"completed\"}\n\n"
- return StreamingResponse(
- stream_generator(),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache",
- "Connection": "keep-alive",
- "Access-Control-Allow-Origin": "*",
- }
- )
- async def proxy_json(
- self,
- path: str,
- request: Request,
- request_body: bytes
- ) -> JSONResponse:
- """
- 代理 JSON 请求到 aichat
- Args:
- path: API 路径(如 /report/update-ai-message)
- request: FastAPI Request 对象
- request_body: 请求体
- Returns:
- JSONResponse
- """
- url = f"{self.base_url}{path}"
- headers = self._get_auth_headers(request)
- headers["Content-Type"] = "application/json"
- logger.info(f"[AIChat代理] JSON 请求: {url}")
- try:
- async with httpx.AsyncClient(timeout=30) as client:
- response = await client.post(
- url,
- content=request_body,
- headers=headers
- )
- # 转发响应
- return JSONResponse(
- content=response.json(),
- status_code=response.status_code
- )
- except httpx.TimeoutException:
- logger.error("[AIChat代理] JSON 请求超时")
- return JSONResponse(
- content={"success": False, "message": "AIChat服务请求超时"},
- status_code=504
- )
- except Exception as e:
- logger.error(f"[AIChat代理] JSON 请求异常: {e}")
- return JSONResponse(
- content={"success": False, "message": f"AIChat服务异常: {str(e)}"},
- status_code=500
- )
- async def health_check(self) -> bool:
- """
- 检查 aichat 服务健康状态
- Returns:
- True 表示服务可用,False 表示不可用
- """
- try:
- async with httpx.AsyncClient(timeout=5) as client:
- response = await client.get(f"{self.base_url}/health")
- return response.status_code == 200
- except Exception as e:
- logger.warning(f"[AIChat代理] 健康检查失败: {e}")
- return False
- # 全局代理实例
- aichat_proxy = AIChatProxy()
|