aichat_proxy.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. """
  2. AIChat 代理服务
  3. 负责将请求代理到 shudao-aichat 服务
  4. """
  5. import httpx
  6. from typing import AsyncGenerator, Optional
  7. from fastapi import Request
  8. from fastapi.responses import StreamingResponse, JSONResponse
  9. from utils.config import settings
  10. from utils.logger import logger
  11. class AIChatProxy:
  12. """AIChat 服务代理"""
  13. def __init__(self):
  14. self.base_url = settings.aichat.api_url.rstrip('/')
  15. self.timeout = settings.aichat.timeout
  16. def _get_auth_headers(self, request: Request) -> dict:
  17. """提取并转发认证 headers"""
  18. headers = {}
  19. # 支持多种 header 名称
  20. for header_name in ["Authorization", "Token", "token"]:
  21. header_value = request.headers.get(header_name, "").strip()
  22. if header_value:
  23. headers[header_name] = header_value
  24. return headers
  25. async def proxy_sse(
  26. self,
  27. path: str,
  28. request: Request,
  29. request_body: bytes
  30. ) -> StreamingResponse:
  31. """
  32. 代理 SSE 流式请求到 aichat
  33. Args:
  34. path: API 路径(如 /report/complete-flow)
  35. request: FastAPI Request 对象
  36. request_body: 请求体
  37. Returns:
  38. StreamingResponse
  39. """
  40. url = f"{self.base_url}{path}"
  41. headers = self._get_auth_headers(request)
  42. headers["Content-Type"] = "application/json"
  43. logger.info(f"[AIChat代理] SSE 请求: {url}")
  44. async def stream_generator() -> AsyncGenerator[bytes, None]:
  45. try:
  46. async with httpx.AsyncClient(timeout=self.timeout) as client:
  47. async with client.stream(
  48. "POST",
  49. url,
  50. content=request_body,
  51. headers=headers
  52. ) as response:
  53. if response.status_code != 200:
  54. error_text = await response.aread()
  55. logger.error(
  56. f"[AIChat代理] SSE 请求失败: {response.status_code} {error_text.decode()}")
  57. error_msg = f"data: {{\"type\": \"online_error\", \"message\": \"AIChat服务返回异常: {response.status_code}\"}}\n\n"
  58. yield error_msg.encode('utf-8')
  59. yield b"data: {\"type\": \"completed\"}\n\n"
  60. return
  61. # 流式转发响应
  62. async for chunk in response.aiter_bytes(chunk_size=4096):
  63. yield chunk
  64. except httpx.TimeoutException:
  65. logger.error("[AIChat代理] SSE 请求超时")
  66. yield f'data: {{"type": "online_error", "message": "AIChat服务请求超时"}}\n\n'.encode('utf-8')
  67. yield b"data: {\"type\": \"completed\"}\n\n"
  68. except Exception as e:
  69. logger.error(f"[AIChat代理] SSE 请求异常: {e}")
  70. yield f'data: {{"type": "online_error", "message": "AIChat服务请求超时"}}\n\n'.encode('utf-8')
  71. yield error_msg.encode('utf-8')
  72. yield b"data: {\"type\": \"completed\"}\n\n"
  73. return StreamingResponse(
  74. stream_generator(),
  75. media_type="text/event-stream",
  76. headers={
  77. "Cache-Control": "no-cache",
  78. "Connection": "keep-alive",
  79. "Access-Control-Allow-Origin": "*",
  80. }
  81. )
  82. async def proxy_json(
  83. self,
  84. path: str,
  85. request: Request,
  86. request_body: bytes
  87. ) -> JSONResponse:
  88. """
  89. 代理 JSON 请求到 aichat
  90. Args:
  91. path: API 路径(如 /report/update-ai-message)
  92. request: FastAPI Request 对象
  93. request_body: 请求体
  94. Returns:
  95. JSONResponse
  96. """
  97. url = f"{self.base_url}{path}"
  98. headers = self._get_auth_headers(request)
  99. headers["Content-Type"] = "application/json"
  100. logger.info(f"[AIChat代理] JSON 请求: {url}")
  101. try:
  102. async with httpx.AsyncClient(timeout=30) as client:
  103. response = await client.post(
  104. url,
  105. content=request_body,
  106. headers=headers
  107. )
  108. # 转发响应
  109. return JSONResponse(
  110. content=response.json(),
  111. status_code=response.status_code
  112. )
  113. except httpx.TimeoutException:
  114. logger.error("[AIChat代理] JSON 请求超时")
  115. return JSONResponse(
  116. content={"success": False, "message": "AIChat服务请求超时"},
  117. status_code=504
  118. )
  119. except Exception as e:
  120. logger.error(f"[AIChat代理] JSON 请求异常: {e}")
  121. return JSONResponse(
  122. content={"success": False, "message": f"AIChat服务异常: {str(e)}"},
  123. status_code=500
  124. )
  125. async def health_check(self) -> bool:
  126. """
  127. 检查 aichat 服务健康状态
  128. Returns:
  129. True 表示服务可用,False 表示不可用
  130. """
  131. try:
  132. async with httpx.AsyncClient(timeout=5) as client:
  133. response = await client.get(f"{self.base_url}/health")
  134. return response.status_code == 200
  135. except Exception as e:
  136. logger.warning(f"[AIChat代理] 健康检查失败: {e}")
  137. return False
# Global proxy instance, created when this module is imported.
aichat_proxy = AIChatProxy()