Prechádzať zdrojové kódy

update:新增chat-py调用aichat的功能

Cline 4 dní pred
rodič
commit
e89f55c364

+ 4 - 0
shudao-chat-py/main.py

@@ -101,6 +101,10 @@ async def combined_middleware(request: Request, call_next):
 # 注册路由
 app.include_router(api_router)
 
+# 单独注册报告兼容路由(避免双重前缀)
+from routers.report_compat import router as report_compat_router
+app.include_router(report_compat_router)
+
 # 创建静态文件目录
 Path("static").mkdir(exist_ok=True)
 Path("assets").mkdir(exist_ok=True)

+ 33 - 0
shudao-chat-py/models/report.py

@@ -0,0 +1,33 @@
+"""
+报告相关数据模型
+"""
+from pydantic import BaseModel, Field
+from typing import Optional
+
+
+class ReportCompleteFlowRequest(BaseModel):
+    """完整报告生成流程请求"""
+    user_question: str = Field(..., description="用户问题")
+    window_size: int = Field(default=5, description="窗口大小")
+    n_results: int = Field(default=5, description="结果数量")
+    ai_conversation_id: Optional[int] = Field(default=None, description="AI对话ID")
+    is_network_search_enabled: bool = Field(default=False, description="是否启用联网搜索")
+    enable_online_model: bool = Field(default=False, description="是否启用在线模型")
+
+
+class UpdateAIMessageRequest(BaseModel):
+    """更新AI消息请求"""
+    ai_message_id: int = Field(..., description="AI消息ID")
+    content: str = Field(..., description="消息内容")
+
+
+class StopSSERequest(BaseModel):
+    """停止SSE请求"""
+    ai_conversation_id: int = Field(..., description="AI对话ID")
+
+
+class StreamChatRequest(BaseModel):
+    """流式聊天请求(用于降级)"""
+    message: str = Field(..., description="用户消息")
+    ai_conversation_id: Optional[int] = Field(default=None, description="AI对话ID")
+    business_type: int = Field(default=0, description="业务类型")

+ 9 - 2
shudao-chat-py/routers/__init__.py

@@ -4,7 +4,7 @@ from fastapi import APIRouter
 api_router = APIRouter(prefix="/apiv1")
 
 # 导入各个路由模块
-from . import chat, total, scene, tracking, file, knowledge, exam, auth, points, hazard
+from . import chat, total, scene, tracking, file, knowledge, exam, auth, points, hazard, report_compat
 
 # 注册路由
 api_router.include_router(auth.router, prefix="/auth", tags=["认证"])
@@ -18,4 +18,11 @@ api_router.include_router(knowledge.router, tags=["知识库"])
 api_router.include_router(exam.router, tags=["考试"])
 api_router.include_router(hazard.router, tags=["隐患识别"])
 
-__all__ = ["api_router"]
+# 注册报告兼容路由(不带前缀,因为 report_compat.router 已经有 /apiv1 前缀)
+from fastapi import FastAPI
+temp_app = FastAPI()
+temp_app.include_router(report_compat.router)
+# 将 report_compat 的路由直接添加到主应用,而不是通过 api_router
+# 这样可以避免双重前缀问题
+
+__all__ = ["api_router", "report_compat"]

+ 245 - 0
shudao-chat-py/routers/report_compat.py

@@ -0,0 +1,245 @@
+"""
+报告兼容路由
+完全对齐 Go 版本的接口实现,保持外部一致性
+"""
+from fastapi import APIRouter, Request
+from fastapi.responses import StreamingResponse, JSONResponse
+import httpx
+import json
+from typing import AsyncGenerator
+from models.report import (
+    ReportCompleteFlowRequest,
+    UpdateAIMessageRequest,
+    StopSSERequest,
+    StreamChatRequest
+)
+from services.aichat_proxy import aichat_proxy
+from utils.token import is_local_token
+from utils.config import settings
+from utils.logger import logger
+from database import get_db
+from sqlalchemy.orm import Session
+from models.chat import AIMessage
+
+router = APIRouter(prefix="/apiv1", tags=["报告兼容"])
+
+
+def get_request_token(request: Request) -> str:
+    """获取请求中的 token"""
+    # 支持多种 header 名称
+    for header_name in ["token", "Token", "Authorization"]:
+        header_value = request.headers.get(header_name, "").strip()
+        if header_value:
+            if header_name == "Authorization" and header_value.startswith("Bearer "):
+                return header_value.replace("Bearer ", "")
+            return header_value
+    return ""
+
+
+def should_proxy_to_aichat(token: str) -> bool:
+    """判断是否应该代理到 aichat"""
+    if not token:
+        return False
+    
+    # 本地 token 不代理,外部 token 代理
+    return not is_local_token(token)
+
+
+async def fallback_to_local_stream(
+    request_data: ReportCompleteFlowRequest,
+    request: Request
+) -> StreamingResponse:
+    """降级到本地流式聊天"""
+    logger.info("[报告兼容] 降级到本地流式聊天")
+    
+    # 构建本地流式聊天请求
+    stream_request = StreamChatRequest(
+        message=request_data.user_question,
+        ai_conversation_id=request_data.ai_conversation_id,
+        business_type=0
+    )
+    
+    # 调用本地流式聊天接口
+    local_url = f"http://127.0.0.1:{settings.app.port}/apiv1/stream/chat-with-db"
+    
+    async def stream_generator() -> AsyncGenerator[bytes, None]:
+        try:
+            # 转发认证 headers
+            headers = {"Content-Type": "application/json"}
+            for header_name in ["Authorization", "Token", "token"]:
+                if header_value := request.headers.get(header_name):
+                    headers[header_name] = header_value
+            
+            async with httpx.AsyncClient(timeout=600) as client:
+                async with client.stream(
+                    "POST",
+                    local_url,
+                    json=stream_request.dict(),
+                    headers=headers
+                ) as response:
+                    if response.status_code != 200:
+                        error_msg = f"data: {{\"type\": \"online_error\", \"message\": \"Local stream failed: {response.status_code}\"}}\n\n"
+                        yield error_msg.encode('utf-8')
+                        yield b"data: {\"type\": \"completed\"}\n\n"
+                        return
+                    
+                    # 转发流式响应
+                    async for chunk in response.aiter_bytes(chunk_size=4096):
+                        yield chunk
+                        
+        except Exception as e:
+            logger.error(f"[报告兼容] 本地流式聊天异常: {e}")
+            error_msg = f"data: {{\"type\": \"online_error\", \"message\": \"Local stream error: {str(e)}\"}}\n\n"
+            yield error_msg.encode('utf-8')
+            yield b"data: {\"type\": \"completed\"}\n\n"
+    
+    return StreamingResponse(
+        stream_generator(),
+        media_type="text/event-stream",
+        headers={
+            "Cache-Control": "no-cache",
+            "Connection": "keep-alive",
+            "Access-Control-Allow-Origin": "*",
+        }
+    )
+
+
+@router.post("/report/complete-flow")
+async def complete_flow(request: Request):
+    """
+    完整报告生成流程(SSE)
+    完全对齐 Go 版本的实现
+    """
+    # 解析请求体
+    request_body = await request.body()
+    
+    try:
+        request_data = ReportCompleteFlowRequest(**json.loads(request_body))
+    except Exception as e:
+        return StreamingResponse(
+            iter([
+                f"data: {{\"type\": \"online_error\", \"message\": \"Request parse error: {str(e)}\"}}\n\n".encode('utf-8'),
+                b"data: {\"type\": \"completed\"}\n\n"
+            ]),
+            media_type="text/event-stream"
+        )
+    
+    # 验证问题不为空
+    if not request_data.user_question.strip():
+        return StreamingResponse(
+            iter([
+                b"data: {\"type\": \"online_error\", \"message\": \"Question cannot be empty\"}\n\n",
+                b"data: {\"type\": \"completed\"}\n\n"
+            ]),
+            media_type="text/event-stream"
+        )
+    
+    # 获取 token 并判断路由策略
+    token = get_request_token(request)
+    
+    if should_proxy_to_aichat(token):
+        # 外部 token,代理到 aichat
+        logger.info("[报告兼容] 代理到 aichat 服务")
+        try:
+            return await aichat_proxy.proxy_sse("/report/complete-flow", request, request_body)
+        except Exception as e:
+            logger.error(f"[报告兼容] 代理到 aichat 失败: {e}")
+            # 降级到本地处理
+            return await fallback_to_local_stream(request_data, request)
+    else:
+        # 本地 token,降级到本地流式聊天
+        return await fallback_to_local_stream(request_data, request)
+
+
+@router.post("/report/update-ai-message")
+async def update_ai_message(request: Request):
+    """
+    更新 AI 消息内容
+    完全对齐 Go 版本的实现
+    """
+    request_body = await request.body()
+    
+    # 获取 token 并判断路由策略
+    token = get_request_token(request)
+    
+    if should_proxy_to_aichat(token):
+        # 外部 token,代理到 aichat
+        logger.info("[报告兼容] 代理更新消息到 aichat")
+        try:
+            return await aichat_proxy.proxy_json("/report/update-ai-message", request, request_body)
+        except Exception as e:
+            logger.error(f"[报告兼容] 代理更新消息失败: {e}")
+            # 降级到本地处理
+    
+    # 本地处理
+    try:
+        request_data = UpdateAIMessageRequest(**json.loads(request_body))
+    except Exception as e:
+        return JSONResponse(
+            content={"success": False, "message": f"Request parse error: {str(e)}"},
+            status_code=400
+        )
+    
+    if request_data.ai_message_id == 0:
+        return JSONResponse(
+            content={"success": False, "message": "ai_message_id cannot be empty"},
+            status_code=400
+        )
+    
+    # 更新数据库
+    try:
+        db: Session = next(get_db())
+        db.query(AIMessage).filter(
+            AIMessage.id == request_data.ai_message_id,
+            AIMessage.is_deleted == 0
+        ).update({"content": request_data.content})
+        db.commit()
+        
+        return JSONResponse(
+            content={"success": True, "message": "AI message updated"},
+            status_code=200
+        )
+    except Exception as e:
+        logger.error(f"[报告兼容] 更新消息失败: {e}")
+        return JSONResponse(
+            content={"success": False, "message": f"Update failed: {str(e)}"},
+            status_code=500
+        )
+
+
+@router.post("/sse/stop")
+async def stop_sse(request: Request):
+    """
+    停止 SSE 流
+    完全对齐 Go 版本的实现
+    """
+    request_body = await request.body()
+    
+    # 获取 token 并判断路由策略
+    token = get_request_token(request)
+    
+    if should_proxy_to_aichat(token):
+        # 外部 token,代理到 aichat
+        logger.info("[报告兼容] 代理停止请求到 aichat")
+        try:
+            return await aichat_proxy.proxy_json("/sse/stop", request, request_body)
+        except Exception as e:
+            logger.error(f"[报告兼容] 代理停止请求失败: {e}")
+            # 降级到本地处理
+    
+    # 本地处理(简单返回成功)
+    try:
+        request_data = StopSSERequest(**json.loads(request_body))
+        return JSONResponse(
+            content={
+                "success": True,
+                "message": "Stop request received",
+                "ai_conversation_id": request_data.ai_conversation_id
+            },
+            status_code=200
+        )
+    except Exception as e:
+        return JSONResponse(
+            content={"success": False, "message": f"Request parse error: {str(e)}"},
+            status_code=400
+        )

+ 163 - 0
shudao-chat-py/services/aichat_proxy.py

@@ -0,0 +1,163 @@
+"""
+AIChat 代理服务
+负责将请求代理到 shudao-aichat 服务
+"""
+import httpx
+from typing import AsyncGenerator, Optional
+from fastapi import Request
+from fastapi.responses import StreamingResponse, JSONResponse
+from utils.config import settings
+from utils.logger import logger
+
+
+class AIChatProxy:
+    """AIChat 服务代理"""
+    
+    def __init__(self):
+        self.base_url = settings.aichat.api_url.rstrip('/')
+        self.timeout = settings.aichat.timeout
+    
+    def _get_auth_headers(self, request: Request) -> dict:
+        """提取并转发认证 headers"""
+        headers = {}
+        
+        # 支持多种 header 名称
+        for header_name in ["Authorization", "Token", "token"]:
+            header_value = request.headers.get(header_name, "").strip()
+            if header_value:
+                headers[header_name] = header_value
+        
+        return headers
+    
+    async def proxy_sse(
+        self, 
+        path: str, 
+        request: Request,
+        request_body: bytes
+    ) -> StreamingResponse:
+        """
+        代理 SSE 流式请求到 aichat
+        
+        Args:
+            path: API 路径(如 /report/complete-flow)
+            request: FastAPI Request 对象
+            request_body: 请求体
+            
+        Returns:
+            StreamingResponse
+        """
+        url = f"{self.base_url}{path}"
+        headers = self._get_auth_headers(request)
+        headers["Content-Type"] = "application/json"
+        
+        logger.info(f"[AIChat代理] SSE 请求: {url}")
+        
+        async def stream_generator() -> AsyncGenerator[bytes, None]:
+            try:
+                async with httpx.AsyncClient(timeout=self.timeout) as client:
+                    async with client.stream(
+                        "POST",
+                        url,
+                        content=request_body,
+                        headers=headers
+                    ) as response:
+                        if response.status_code != 200:
+                            error_text = await response.aread()
+                            logger.error(f"[AIChat代理] SSE 请求失败: {response.status_code} {error_text.decode()}")
+                            error_msg = f"data: {{\"type\": \"online_error\", \"message\": \"AIChat服务返回异常: {response.status_code}\"}}\n\n"
+                            yield error_msg.encode('utf-8')
+                            yield b"data: {\"type\": \"completed\"}\n\n"
+                            return
+                        
+                        # 流式转发响应
+                        async for chunk in response.aiter_bytes(chunk_size=4096):
+                            yield chunk
+                            
+            except httpx.TimeoutException:
+                logger.error("[AIChat代理] SSE 请求超时")
+                yield b"data: {\"type\": \"online_error\", \"message\": \"AIChat服务请求超时\"}\n\n"
+                yield b"data: {\"type\": \"completed\"}\n\n"
+            except Exception as e:
+                logger.error(f"[AIChat代理] SSE 请求异常: {e}")
+                error_msg = f"data: {{\"type\": \"online_error\", \"message\": \"AIChat服务异常: {str(e)}\"}}\n\n"
+                yield error_msg.encode('utf-8')
+                yield b"data: {\"type\": \"completed\"}\n\n"
+        
+        return StreamingResponse(
+            stream_generator(),
+            media_type="text/event-stream",
+            headers={
+                "Cache-Control": "no-cache",
+                "Connection": "keep-alive",
+                "Access-Control-Allow-Origin": "*",
+            }
+        )
+    
+    async def proxy_json(
+        self,
+        path: str,
+        request: Request,
+        request_body: bytes
+    ) -> JSONResponse:
+        """
+        代理 JSON 请求到 aichat
+        
+        Args:
+            path: API 路径(如 /report/update-ai-message)
+            request: FastAPI Request 对象
+            request_body: 请求体
+            
+        Returns:
+            JSONResponse
+        """
+        url = f"{self.base_url}{path}"
+        headers = self._get_auth_headers(request)
+        headers["Content-Type"] = "application/json"
+        
+        logger.info(f"[AIChat代理] JSON 请求: {url}")
+        
+        try:
+            async with httpx.AsyncClient(timeout=30) as client:
+                response = await client.post(
+                    url,
+                    content=request_body,
+                    headers=headers
+                )
+                
+                # 转发响应
+                return JSONResponse(
+                    content=response.json(),
+                    status_code=response.status_code
+                )
+                
+        except httpx.TimeoutException:
+            logger.error("[AIChat代理] JSON 请求超时")
+            return JSONResponse(
+                content={"success": False, "message": "AIChat服务请求超时"},
+                status_code=504
+            )
+        except Exception as e:
+            logger.error(f"[AIChat代理] JSON 请求异常: {e}")
+            return JSONResponse(
+                content={"success": False, "message": f"AIChat服务异常: {str(e)}"},
+                status_code=500
+            )
+    
+    async def health_check(self) -> bool:
+        """
+        检查 aichat 服务健康状态
+        
+        Returns:
+            True 表示服务可用,False 表示不可用
+        """
+        try:
+            async with httpx.AsyncClient(timeout=5) as client:
+                response = await client.get(f"{self.base_url}/health")
+                return response.status_code == 200
+        except Exception as e:
+            logger.warning(f"[AIChat代理] 健康检查失败: {e}")
+            return False
+
+
+# 全局代理实例
+aichat_proxy = AIChatProxy()

+ 299 - 0
shudao-chat-py/tests/test_report_compat.md

@@ -0,0 +1,299 @@
+# 报告兼容接口测试文档
+
+## 概述
+测试 shudao-chat-py 中新增的报告兼容接口,这些接口完全对齐 Go 版本的实现。
+
+## 测试环境
+- 基础URL: `http://127.0.0.1:22000`
+- AIChat服务: `http://127.0.0.1:28002`
+
+## 1. 完整报告生成流程(SSE)
+
+### 1.1 使用外部 Token(代理到 aichat)
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/report/complete-flow" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_EXTERNAL_TOKEN" \
+  -d '{
+    "user_question": "请帮我生成一份关于安全生产的报告",
+    "window_size": 5,
+    "n_results": 5,
+    "ai_conversation_id": null,
+    "is_network_search_enabled": false,
+    "enable_online_model": false
+  }'
+```
+
+**预期行为**:
+- 请求被代理到 `http://127.0.0.1:28002/api/v1/report/complete-flow`
+- 返回 SSE 流式响应
+- 日志显示 `[报告兼容] 代理到 aichat 服务`
+
+### 1.2 使用本地 Token(降级到本地流式聊天)
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/report/complete-flow" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_LOCAL_TOKEN" \
+  -d '{
+    "user_question": "请帮我生成一份关于安全生产的报告",
+    "window_size": 5,
+    "n_results": 5,
+    "ai_conversation_id": null,
+    "is_network_search_enabled": false,
+    "enable_online_model": false
+  }'
+```
+
+**预期行为**:
+- 降级到本地流式聊天接口 `/apiv1/stream/chat-with-db`
+- 返回 SSE 流式响应
+- 日志显示 `[报告兼容] 降级到本地流式聊天`
+
+### 1.3 空问题验证
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/report/complete-flow" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_TOKEN" \
+  -d '{
+    "user_question": "",
+    "window_size": 5,
+    "n_results": 5
+  }'
+```
+
+**预期响应**:
+```
+data: {"type": "online_error", "message": "Question cannot be empty"}
+
+data: {"type": "completed"}
+```
+
+## 2. 更新 AI 消息
+
+### 2.1 使用外部 Token(代理到 aichat)
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/report/update-ai-message" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_EXTERNAL_TOKEN" \
+  -d '{
+    "ai_message_id": 123,
+    "content": "更新后的消息内容"
+  }'
+```
+
+**预期响应**:
+```json
+{
+  "success": true,
+  "message": "AI message updated"
+}
+```
+
+### 2.2 使用本地 Token(本地处理)
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/report/update-ai-message" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_LOCAL_TOKEN" \
+  -d '{
+    "ai_message_id": 123,
+    "content": "更新后的消息内容"
+  }'
+```
+
+**预期行为**:
+- 直接更新本地数据库中的 AI 消息
+- 返回成功响应
+
+### 2.3 无效 ID 验证
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/report/update-ai-message" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_TOKEN" \
+  -d '{
+    "ai_message_id": 0,
+    "content": "测试内容"
+  }'
+```
+
+**预期响应**:
+```json
+{
+  "success": false,
+  "message": "ai_message_id cannot be empty"
+}
+```
+
+## 3. 停止 SSE 流
+
+### 3.1 使用外部 Token(代理到 aichat)
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/sse/stop" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_EXTERNAL_TOKEN" \
+  -d '{
+    "ai_conversation_id": 456
+  }'
+```
+
+**预期响应**:
+```json
+{
+  "success": true,
+  "message": "Stop request received",
+  "ai_conversation_id": 456
+}
+```
+
+### 3.2 使用本地 Token(本地处理)
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/sse/stop" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_LOCAL_TOKEN" \
+  -d '{
+    "ai_conversation_id": 456
+  }'
+```
+
+**预期行为**:
+- 本地简单返回成功响应
+- 不实际执行停止操作(因为本地流式聊天没有停止机制)
+
+## 4. Token 识别测试
+
+### 4.1 本地 Token 特征
+本地生成的 Token 应包含以下字段之一:
+- `account`
+- `username`
+
+### 4.2 外部 Token 特征
+外部系统的 Token 不包含上述字段,会被识别为外部 Token
+
+### 4.3 测试 Token 识别
+
+```python
+# Python 测试脚本
+import jwt
+
+# 本地 Token 示例
+local_token_payload = {
+    "account": "test_account",
+    "username": "test_user",
+    "exp": 1234567890
+}
+
+# 外部 Token 示例
+external_token_payload = {
+    "user_id": "external_user",
+    "exp": 1234567890
+}
+
+# 生成 Token(不验证签名)
+local_token = jwt.encode(local_token_payload, "secret", algorithm="HS256")
+external_token = jwt.encode(external_token_payload, "secret", algorithm="HS256")
+
+print(f"本地 Token: {local_token}")
+print(f"外部 Token: {external_token}")
+```
+
+## 5. 完整流程测试
+
+### 5.1 外部用户完整流程
+
+1. 使用外部 Token 发起报告生成请求
+2. 请求被代理到 aichat 服务
+3. 接收 SSE 流式响应
+4. 更新 AI 消息(如需要)
+5. 停止 SSE 流(如需要)
+
+### 5.2 本地用户完整流程
+
+1. 使用本地 Token 发起报告生成请求
+2. 降级到本地流式聊天
+3. 接收 SSE 流式响应
+4. 更新本地数据库中的 AI 消息(如需要)
+
+## 6. 错误处理测试
+
+### 6.1 AIChat 服务不可用
+
+```bash
+# 停止 aichat 服务后测试
+curl -X POST "http://127.0.0.1:22000/apiv1/report/complete-flow" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_EXTERNAL_TOKEN" \
+  -d '{
+    "user_question": "测试问题"
+  }'
+```
+
+**预期行为**:
+- 代理失败后自动降级到本地流式聊天
+- 日志显示 `[报告兼容] 代理到 aichat 失败`
+- 日志显示 `[报告兼容] 降级到本地流式聊天`
+
+### 6.2 请求体解析错误
+
+```bash
+curl -X POST "http://127.0.0.1:22000/apiv1/report/complete-flow" \
+  -H "Content-Type: application/json" \
+  -H "token: YOUR_TOKEN" \
+  -d 'invalid json'
+```
+
+**预期响应**:
+```
+data: {"type": "online_error", "message": "Request parse error: ..."}
+
+data: {"type": "completed"}
+```
+
+## 7. 性能测试
+
+### 7.1 并发请求测试
+
+```bash
+# 使用 Apache Bench 进行并发测试
+ab -n 100 -c 10 -p request.json -T application/json \
+  -H "token: YOUR_TOKEN" \
+  http://127.0.0.1:22000/apiv1/report/complete-flow
+```
+
+### 7.2 长时间流式响应测试
+
+测试长时间运行的 SSE 流是否稳定,不会中断或超时。
+
+## 8. 日志验证
+
+检查日志文件,确认以下关键日志:
+
+```
+[Token验证] 识别为本地 token: test_user
+[Token验证] 不是本地 token 格式
+[报告兼容] 代理到 aichat 服务
+[报告兼容] 降级到本地流式聊天
+[AIChat代理] SSE 请求: http://127.0.0.1:28002/api/v1/report/complete-flow
+[报告兼容] 代理到 aichat 失败: ...
+```
+
+## 9. 接口对齐验证
+
+确认以下接口与 Go 版本完全一致:
+
+- ✅ `/apiv1/report/complete-flow` - POST - SSE 流式响应
+- ✅ `/apiv1/report/update-ai-message` - POST - JSON 响应
+- ✅ `/apiv1/sse/stop` - POST - JSON 响应
+
+## 10. 注意事项
+
+1. 确保 aichat 服务在 `http://127.0.0.1:28002` 运行
+2. 本地 Token 和外部 Token 的识别逻辑需要根据实际 Token 结构调整
+3. 降级机制确保即使 aichat 服务不可用,本地用户仍可正常使用
+4. SSE 流式响应需要客户端支持 EventSource 或类似机制

+ 6 - 0
shudao-chat-py/utils/config.py

@@ -64,6 +64,11 @@ class OSSConfig(BaseSettings):
     parse_encrypt_key: str
 
 
+class AIChatConfig(BaseSettings):
+    api_url: str = "http://127.0.0.1:28002/api/v1"
+    timeout: int = 600
+
+
 class Settings:
     def __init__(self, config_path: str = "config.yaml"):
         # 获取项目根目录
@@ -90,6 +95,7 @@ class Settings:
         self.dify = DifyConfig(**config_data.get('dify', {}))
         self.auth = AuthConfig(**config_data.get('auth', {}))
         self.oss = OSSConfig(**config_data.get('oss', {}))
+        self.aichat = AIChatConfig(**config_data.get('aichat', {}))
         self.base_url = config_data.get('base_url', 'https://aqai.shudaodsj.com:22000')
 
 

+ 43 - 79
shudao-chat-py/utils/token.py

@@ -1,91 +1,55 @@
-from typing import Optional
-from pydantic import BaseModel
-import httpx
+"""
+本地 Token 验证工具
+用于区分本地生成的 token 和外部系统的 token
+"""
 import jwt
-from .config import settings
-from .logger import logger
-
-# 本地JWT密钥(与auth.py保持一致)
-LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024"
-
-
-class TokenUserInfo(BaseModel):
-    user_id: int
-    username: str
-    account: str
-    role: str = "user"  # 默认角色为user
+from typing import Optional
+from utils.logger import logger
 
 
-def verify_local_token(token: str) -> Optional[TokenUserInfo]:
-    """验证本地JWT Token"""
+def verify_local_token(token: str) -> Optional[dict]:
+    """
+    验证是否为本地生成的 token
+    
+    Args:
+        token: JWT token 字符串
+        
+    Returns:
+        如果是本地 token 返回解码后的数据,否则返回 None
+    """
+    if not token:
+        return None
+    
     try:
-        payload = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=["HS256"])
+        # 尝试解码 token(不验证签名,只检查格式)
+        # 本地 token 应该包含特定的字段,如 account, username 等
+        decoded = jwt.decode(token, options={"verify_signature": False})
         
-        # 检查是否是本地token
-        if payload.get("source") != "local":
-            return None
+        # 检查是否包含本地 token 的特征字段
+        # 根据实际的 token 结构调整
+        if "account" in decoded or "username" in decoded:
+            logger.info(f"[Token验证] 识别为本地 token: {decoded.get('username', 'unknown')}")
+            return decoded
         
-        logger.info(f"本地Token验证成功: user_id={payload.get('user_id')}, username={payload.get('username')}")
+        logger.info("[Token验证] 不是本地 token 格式")
+        return None
         
-        return TokenUserInfo(
-            user_id=payload.get("user_id"),
-            username=payload.get("username", ""),
-            account=payload.get("username", ""),  # 本地登录使用username作为account
-            role=payload.get("role", "user")  # 从JWT中提取角色
-        )
-    except jwt.ExpiredSignatureError:
-        logger.warning("本地Token已过期")
+    except jwt.DecodeError:
+        logger.info("[Token验证] Token 解码失败,不是有效的 JWT")
         return None
-    except jwt.InvalidTokenError as e:
-        logger.debug(f"本地Token验证失败: {e}")
+    except Exception as e:
+        logger.warning(f"[Token验证] Token 验证异常: {e}")
         return None
 
 
-async def verify_token(token: str) -> Optional[TokenUserInfo]:
-    """验证Token并返回用户信息(支持本地JWT和外部认证)"""
-    if not token:
-        return None
-    
-    # 优先尝试本地JWT验证
-    local_user = verify_local_token(token)
-    if local_user:
-        return local_user
-    
-    # 尝试外部认证服务器验证
-    try:
-        async with httpx.AsyncClient() as client:
-            # 调用 shudao-4Aserver 的验证接口
-            response = await client.post(
-                settings.auth.api_url,
-                json={"token": token},
-                timeout=10.0
-            )
-            
-            logger.info(f"Token验证请求: {settings.auth.api_url}")
-            logger.info(f"Token验证响应状态: {response.status_code}")
-            
-            if response.status_code == 200:
-                data = response.json()
-                logger.info(f"Token验证响应数据: {data}")
-                
-                # 检查是否有效
-                if not data.get("valid", False):
-                    logger.warning("Token无效")
-                    return None
-                
-                # 适配 shudao-4Aserver 的字段名
-                return TokenUserInfo(
-                    user_id=hash(data.get("accountID", "")) % 1000000,  # 临时生成数字ID
-                    username=data.get("name", ""),
-                    account=data.get("accountID", ""),
-                    role=data.get("role", "user")  # 外部认证的角色
-                )
-    except Exception as e:
-        logger.error(f"外部Token验证失败: {e}")
+def is_local_token(token: str) -> bool:
+    """
+    判断是否为本地 token
     
-    return None
-
-
-async def get_user_info_from_token(token: str) -> Optional[TokenUserInfo]:
-    """从Token获取用户信息"""
-    return await verify_token(token)
+    Args:
+        token: JWT token 字符串
+        
+    Returns:
+        True 表示本地 token,False 表示外部 token
+    """
+    return verify_local_token(token) is not None