GO_OPTIMIZATION_PLAN.md 28 KB

shudao-chat-go Go 代码优化方案

项目概况

  • 项目名称: shudao-chat-go
  • 框架: Beego v2
  • 语言: Go
  • 端口: 22000
  • 当前状态: 运行中的遗留代码(屎山)
  • 重构目标: 迁移至 Python (FastAPI) → shudao-main-py

一、核心问题分析

1.1 架构设计问题

问题描述

❌ 数据库连接在 init() 中初始化(models/mysql.go)
❌ 全局变量 DB 被整个项目共享
❌ 配置硬编码在 app.conf 中
❌ 缺乏依赖注入机制

影响

  • 测试困难(无法 mock 数据库)
  • 并发安全隐患
  • 配置修改需要重启服务

优化方案

// 当前代码(不推荐)
var DB *gorm.DB
func init() {
    DB, err = gorm.Open(...)
}

// 推荐方案
type DBManager struct {
    db *gorm.DB
}

func NewDBManager(config *Config) (*DBManager, error) {
    db, err := gorm.Open(mysql.Open(config.DSN), &gorm.Config{...})
    if err != nil {
        return nil, err
    }
    return &DBManager{db: db}, nil
}

func (m *DBManager) GetDB() *gorm.DB {
    return m.db
}

1.2 安全性问题

🔴 严重问题:敏感信息泄露

问题代码 (conf/app.conf):

# 数据库密码明文存储
mysqlpass = "88888888"

# API 密钥明文存储
deepseek_api_key = "sk-28625cb3738844e190cee62b2bcb25bf"

# OSS 凭证明文存储
OSS_ACCESS_KEY_ID="fnyfi2f368pbic74d8ll"
OSS_ACCESS_KEY_SECRET="jgqwk7sirqlz2602x2k7yx2eor0vii19wah6ywlv"

风险评估

  • ⚠️ 数据库密码泄露 → 数据库被攻击
  • ⚠️ API 密钥泄露 → 账单爆炸
  • ⚠️ OSS 凭证泄露 → 存储被删除/篡改

优化方案

  1. 环境变量方案

    # .env
    MYSQL_PASSWORD=<encrypted>
    DEEPSEEK_API_KEY=<encrypted>
    OSS_ACCESS_KEY_SECRET=<encrypted>
    
    # 使用 godotenv 加载
    import "github.com/joho/godotenv"
    godotenv.Load()
    password := os.Getenv("MYSQL_PASSWORD")
    
  2. 配置加密方案

    import "github.com/aead/chacha20poly1305"
    
    func DecryptConfig(encryptedData []byte, key []byte) (string, error) {
    aead, _ := chacha20poly1305.NewX(key)
    plaintext, err := aead.Open(nil, nonce, encryptedData, nil)
    return string(plaintext), err
    }
    
  3. Vault 集成方案 (推荐生产环境)

    import "github.com/hashicorp/vault/api"
    
    client, _ := api.NewClient(api.DefaultConfig())
    secret, _ := client.Logical().Read("secret/data/mysql")
    password := secret.Data["password"].(string)
    

1.3 代码组织问题

问题 1: 控制器职责过重

问题代码 (controllers/chat.go):

func (c *ChatController) SendDeepSeekMessage() {
    // 1. 解析请求参数
    // 2. 意图识别(调用第三方 API)
    // 3. ChromaDB 检索
    // 4. 在线搜索
    // 5. 构建提示词
    // 6. 调用大模型
    // 7. 解析响应
    // 8. 替换引用来源
    // 9. 数据库存储
    // 10. 返回结果
}

问题分析

  • 单一方法超过 500 行
  • 违反单一职责原则
  • 难以测试和维护

优化方案

// Service 层拆分
type ChatService struct {
    intentService  *IntentService
    ragService     *RAGService
    llmService     *LLMService
    storageService *StorageService
}

func (s *ChatService) ProcessMessage(ctx context.Context, req *MessageRequest) (*MessageResponse, error) {
    // 1. 意图识别
    intent, _ := s.intentService.Recognize(ctx, req.Message)
    
    // 2. 检索增强
    context, _ := s.ragService.Retrieve(ctx, req.Message, intent)
    
    // 3. 调用 LLM
    response, _ := s.llmService.Generate(ctx, req.Message, context)
    
    // 4. 存储结果
    s.storageService.Save(ctx, req, response)
    
    return response, nil
}

// Controller 只负责 HTTP 处理
func (c *ChatController) SendMessage() {
    var req MessageRequest
    c.BindJSON(&req)
    
    resp, err := c.chatService.ProcessMessage(c.Ctx.Request.Context(), &req)
    if err != nil {
        c.Ctx.Output.SetStatus(500)
        return
    }
    
    c.Data["json"] = resp
    c.ServeJSON()
}

问题 2: 重复代码严重

问题示例

// controllers/chat.go
func (c *ChatController) sendQwen3Message(userMessage string, useStream bool) (string, error) {
    // HTTP 请求代码
}

// controllers/liushi.go
func (c *LiushiController) sendQwen3Message(userMessage string, useStream bool) (string, error) {
    // 几乎相同的代码
}

优化方案

// services/llm_client.go
type LLMClient struct {
    apiURL string
    model  string
}

func (c *LLMClient) SendMessage(ctx context.Context, message string, stream bool) (string, error) {
    // 统一的 HTTP 请求逻辑
}

// 各 Controller 共享
func (c *ChatController) SendMessage() {
    resp, _ := c.llmClient.SendMessage(c.Ctx.Request.Context(), message, false)
}

1.4 性能问题

问题 1: 数据库连接池配置不合理

当前配置 (models/mysql.go):

sqlDB.SetMaxOpenConns(100)    // 最大连接数
sqlDB.SetMaxIdleConns(10)     // 最大空闲连接
sqlDB.SetConnMaxLifetime(time.Hour)
sqlDB.SetConnMaxIdleTime(time.Minute * 30)

问题分析

  • 100 个最大连接对于单机过高
  • MySQL 默认 max_connections = 151,可能耗尽
  • 空闲连接 10 个偏少,频繁创建新连接

优化建议

// 根据实际并发量调整
sqlDB.SetMaxOpenConns(50)     // 降低到 50
sqlDB.SetMaxIdleConns(25)     // 提高空闲连接到 25
sqlDB.SetConnMaxLifetime(time.Minute * 30)  // 缩短生命周期
sqlDB.SetConnMaxIdleTime(time.Minute * 10)  // 缩短空闲时间

监控指标

stats := sqlDB.Stats()
log.Printf("OpenConnections: %d, InUse: %d, Idle: %d", 
    stats.OpenConnections, stats.InUse, stats.Idle)

问题 2: 缺少缓存机制

问题场景

// controllers/total.go
func (c *TotalController) GetRecommendQuestion() {
    // 每次都查询数据库
    var questions []models.RecommendQuestion
    models.DB.Find(&questions)
}

func (c *TotalController) GetFunctionCard() {
    // 每次都查询数据库
    var cards []models.FunctionCard
    models.DB.Find(&cards)
}

问题分析

  • 推荐问题和功能卡片数据几乎不变
  • 高频访问接口
  • 浪费数据库资源

优化方案

  1. 内存缓存 (适合小数据量)

    type Cache struct {
    data map[string]interface{}
    mu   sync.RWMutex
    ttl  time.Duration
    }
    
    func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
    }
    
    func (c *TotalController) GetRecommendQuestion() {
    cacheKey := "recommend_questions"
    if val, ok := cache.Get(cacheKey); ok {
        c.Data["json"] = val
        c.ServeJSON()
        return
    }
        
    var questions []models.RecommendQuestion
    models.DB.Find(&questions)
    cache.Set(cacheKey, questions, 1*time.Hour)
    c.Data["json"] = questions
    c.ServeJSON()
    }
    
  2. Redis 缓存 (推荐)

    import "github.com/go-redis/redis/v8"
    
    func (c *TotalController) GetRecommendQuestion() {
    val, err := redisClient.Get(ctx, "recommend_questions").Result()
    if err == nil {
        var questions []models.RecommendQuestion
        json.Unmarshal([]byte(val), &questions)
        c.Data["json"] = questions
        c.ServeJSON()
        return
    }
        
    var questions []models.RecommendQuestion
    models.DB.Find(&questions)
    data, _ := json.Marshal(questions)
    redisClient.Set(ctx, "recommend_questions", data, 1*time.Hour)
    }
    

问题 3: 图片处理性能问题

问题代码 (controllers/hazard.go):

func (c *HazardController) Hazard() {
    // 1. 下载 OSS 图片到内存
    imageData, _ := downloadImageFromOSS(imageURL)
    
    // 2. 解码图片
    img, _ := decodePNGImage(imageData)
    
    // 3. YOLO 识别
    boxes, labels, scores := callYOLOAPI(imageData)
    
    // 4. 绘制边界框和水印
    resultImg := drawBoundingBox(img, boxes, labels, scores, username, account, date)
    
    // 5. 重新编码
    var buf bytes.Buffer
    png.Encode(&buf, resultImg)
    
    // 6. 上传回 OSS
    uploadImageToOSS(buf.Bytes(), fileName)
}

问题分析

  • 图片在内存中多次编解码
  • 大图片(4K+)占用大量内存
  • 没有并发控制,高并发时 OOM

优化方案

  1. 流式处理

    // 使用临时文件减少内存占用
    tmpFile, _ := os.CreateTemp("", "image-*.png")
    defer os.Remove(tmpFile.Name())
    
    // 下载到文件
    io.Copy(tmpFile, ossResponse.Body)
    
    // 使用文件路径处理
    processImageFile(tmpFile.Name())
    
  2. 并发控制

    // 限制同时处理的图片数量
    type ImageProcessor struct {
    semaphore chan struct{}
    }
    
    func NewImageProcessor(maxConcurrent int) *ImageProcessor {
    return &ImageProcessor{
        semaphore: make(chan struct{}, maxConcurrent),
    }
    }
    
    func (p *ImageProcessor) Process(imageURL string) error {
    p.semaphore <- struct{}{}        // 获取令牌
    defer func() { <-p.semaphore }() // 释放令牌
        
    // 处理图片
    return processImage(imageURL)
    }
    
  3. 异步处理队列

    // 使用消息队列异步处理
    type ImageTask struct {
    ImageURL string
    UserID   int
    }
    
    func (c *HazardController) Hazard() {
    task := ImageTask{
        ImageURL: req.ImageURL,
        UserID:   req.UserID,
    }
        
    // 发送到队列
    queueClient.Publish("image-process", task)
        
    // 立即返回任务 ID
    c.Data["json"] = map[string]interface{}{
        "task_id": generateTaskID(),
        "status": "processing",
    }
    c.ServeJSON()
    }
    
    // 后台 Worker 处理
    func imageWorker() {
    for task := range queueClient.Subscribe("image-process") {
        processImageTask(task)
    }
    }
    

1.5 错误处理问题

问题 1: 错误信息不规范

问题代码

// controllers/chat.go
if err != nil {
    c.Ctx.Output.SetStatus(500)
    c.Data["json"] = map[string]interface{}{
        "msg": "失败",  // 错误信息不明确
    }
    c.ServeJSON()
    return
}

// controllers/scene.go
if err != nil {
    c.Data["json"] = map[string]string{
        "error": err.Error(),  // 直接暴露系统错误
    }
    c.ServeJSON()
}

问题分析

  • 错误格式不统一
  • 错误信息过于简略或过于详细
  • 缺少错误码
  • 缺少请求追踪 ID

优化方案

// 定义标准错误响应
type ErrorResponse struct {
    Code      int    `json:"code"`
    Message   string `json:"message"`
    RequestID string `json:"request_id"`
    Details   string `json:"details,omitempty"`
}

// 错误码定义
const (
    ErrCodeInvalidRequest = 40001
    ErrCodeUnauthorized   = 40100
    ErrCodeNotFound       = 40400
    ErrCodeInternalError  = 50000
    ErrCodeDatabaseError  = 50001
    ErrCodeExternalAPI    = 50002
)

// 错误处理中间件
func ErrorHandler(err error, c *beego.Controller) {
    var errResp ErrorResponse
    
    switch e := err.(type) {
    case *ValidationError:
        errResp = ErrorResponse{
            Code:      ErrCodeInvalidRequest,
            Message:   "请求参数错误",
            RequestID: c.Ctx.Request.Header.Get("X-Request-ID"),
            Details:   e.Error(),
        }
        c.Ctx.Output.SetStatus(400)
        
    case *DatabaseError:
        errResp = ErrorResponse{
            Code:      ErrCodeDatabaseError,
            Message:   "数据库错误",
            RequestID: c.Ctx.Request.Header.Get("X-Request-ID"),
            // 不暴露详细错误
        }
        c.Ctx.Output.SetStatus(500)
        log.Error("Database error: %v", e)
        
    default:
        errResp = ErrorResponse{
            Code:      ErrCodeInternalError,
            Message:   "服务器内部错误",
            RequestID: c.Ctx.Request.Header.Get("X-Request-ID"),
        }
        c.Ctx.Output.SetStatus(500)
        log.Error("Unknown error: %v", err)
    }
    
    c.Data["json"] = errResp
    c.ServeJSON()
}

1.6 并发安全问题

问题:全局回调函数中的竞态条件

问题代码 (models/mysql.go):

func setCreateTimeCallback(db *gorm.DB) {
    if _, ok := db.Statement.Schema.FieldsByName["CreatedAt"]; ok {
        now := int(GetUnix())  // 可能被多个 goroutine 同时调用
        db.Statement.SetColumn("CreatedAt", now)
    }
}

问题分析

  • GetUnix() 本身是线程安全的
  • 但多个并发请求可能导致时间戳不一致
  • 建议在事务中确保原子性

优化方案

// 使用数据库时间戳
type BaseModel struct {
    ID        uint      `gorm:"primarykey"`
    CreatedAt time.Time `gorm:"autoCreateTime"`
    UpdatedAt time.Time `gorm:"autoUpdateTime"`
}

// 或者使用 GORM 内置功能
DB, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
    NowFunc: func() time.Time {
        return time.Now().UTC()
    },
})

二、接口实现完整性分析

2.1 Go 版本接口统计

模块 控制器 接口数量 核心功能
聊天 ChatController 17 发送消息、历史记录、意图识别、联网搜索、ChromaDB 检索
场景 SceneController 5 场景管理、识别记录、示例图片、评价
隐患识别 HazardController 2 YOLO 识别、步骤保存
文件管理 OssController + ShudaoOssController 5 文件上传、图片压缩、OSS 解析
通用 TotalController 8 推荐问题、反馈、政策文件、统计
考试 ExamController + PromptController 3 题目生成、提示词构建
知识库 ChromaController 1 高级搜索
流式 LiushiController 2 SSE 流式输出
埋点 TrackingController 4 行为追踪、统计分析
前端 FrontendController 4 测试页面

总计: 51 个接口


2.2 Python 版本实现进度

模块 文件 已实现接口 完成度
聊天 routers/chat.py 4/17 24%
通用 routers/common.py 4/8 50%
场景 routers/scene.py 5/5 100% ✅
文件 routers/oss.py 2/5 40%
埋点 routers/tracking.py 1/4 25%

总体完成度: 16/51 = 31.4%


2.3 缺失的高优先级接口

🔴 P0 (必须实现)

  1. SendDeepSeekMessage (chat.go)

    • 核心聊天接口
    • 集成意图识别、RAG、LLM 调用
    • 依赖:IntentRecognition, GetChromaDBDocument, OnlineSearch
  2. StreamChatWithDB (liushi.go)

    • SSE 流式聊天
    • 实时返回 AI 回复
    • 保存对话历史
  3. Hazard (hazard.go)

    • YOLO 图像识别
    • 边界框绘制
    • 水印添加
  4. AdvancedSearch (chroma.go)

    • 知识库高级检索
    • 与 shudao-aichat 服务集成

🟡 P1 (重要功能)

  1. GuessYouWant (chat.go)

    • 智能推荐问题
    • 提升用户体验
  2. OnlineSearch (chat.go)

    • 联网搜索
    • 搜索结果摘要
  3. UploadImage (shudaooss.go)

    • 图片压缩上传
    • 支持多种格式
  4. BuildExamPrompt (prompt.go)

    • 考试题目生成
    • 提示词构建

三、重构优先级建议

阶段 1: 基础设施优化 (1-2 周)

✅ 配置管理重构
   - 环境变量分离
   - 敏感信息加密
   - 配置热重载

✅ 数据库层重构
   - 连接池优化
   - 依赖注入
   - 事务管理

✅ 日志系统
   - 结构化日志
   - 日志等级
   - 请求追踪

✅ 错误处理
   - 统一错误响应
   - 错误码规范
   - Panic 恢复

阶段 2: 核心接口实现 (3-4 周)

✅ P0 接口迁移
   - SendDeepSeekMessage
   - StreamChatWithDB
   - Hazard
   - AdvancedSearch

✅ 服务层拆分
   - IntentService
   - RAGService
   - LLMService
   - ImageService

✅ 缓存层
   - Redis 集成
   - 缓存策略

阶段 3: 性能优化 (1-2 周)

✅ 异步处理
   - 消息队列
   - 后台任务
   - 定时任务

✅ 并发控制
   - 限流
   - 熔断
   - 降级

✅ 监控告警
   - Prometheus
   - Grafana
   - 日志聚合

阶段 4: 功能完善 (2-3 周)

✅ P1 接口实现
   - GuessYouWant
   - OnlineSearch
   - 考试模块

✅ 单元测试
   - 覆盖率 > 80%
   - 集成测试

✅ 文档补全
   - API 文档
   - 部署文档

四、迁移风险评估

4.1 技术风险

风险项 风险等级 影响 缓解措施
YOLO API 兼容性 🟡 中 图像识别失败 提前测试,准备降级方案
ChromaDB 连接 🟡 中 知识库检索异常 连接池管理,超时重试
流式响应稳定性 🟡 中 聊天体验下降 SSE 心跳机制,断线重连
数据库迁移 🔴 高 数据丢失/损坏 充分备份,灰度发布
并发性能 🟡 中 高负载下崩溃 压力测试,弹性伸缩

4.2 业务风险

风险项 影响 缓解措施
接口不兼容 前端调用失败 保持接口签名一致
响应格式变化 前端解析错误 严格遵守现有格式
性能下降 用户体验变差 性能对比测试
功能缺失 业务中断 分批迁移,灰度发布

五、Python 重构建议

5.1 技术栈选型

# 已选择 ✅
FastAPI      # Web 框架
SQLAlchemy   # ORM
Pydantic     # 数据验证
httpx        # HTTP 客户端

# 建议补充
redis        # 缓存
celery       # 异步任务
prometheus-client  # 监控
python-multipart   # 文件上传
Pillow       # 图像处理

5.2 项目结构建议

shudao-main-py/
├── config.yaml          # 配置文件
├── main.py             # 入口文件
├── requirements.txt    # 依赖
├── .env.example        # 环境变量模板
├── core/
│   ├── config.py       # ✅ 已实现
│   ├── database.py     # ✅ 已实现
│   ├── auth.py         # ✅ 已实现
│   ├── cache.py        # ❌ 缺失:Redis 缓存
│   ├── logger.py       # ❌ 缺失:日志配置
│   └── exceptions.py   # ❌ 缺失:自定义异常
├── models/
│   ├── __init__.py     # ✅ 已实现
│   ├── base.py         # ✅ 已实现
│   ├── chat.py         # ✅ 已实现
│   ├── scene.py        # ✅ 已实现
│   ├── user.py         # ✅ 已实现
│   └── exam.py         # ❌ 缺失:考试模型
├── routers/
│   ├── __init__.py     # ✅ 已实现
│   ├── chat.py         # ⚠️ 部分实现
│   ├── scene.py        # ✅ 已实现
│   ├── oss.py          # ⚠️ 部分实现
│   ├── exam.py         # ❌ 缺失:考试路由
│   └── stream.py       # ❌ 缺失:流式路由
├── services/
│   ├── __init__.py
│   ├── intent_service.py    # ❌ 缺失
│   ├── rag_service.py       # ❌ 缺失
│   ├── llm_service.py       # ❌ 缺失
│   ├── image_service.py     # ❌ 缺失
│   ├── search_service.py    # ❌ 缺失
│   └── storage_service.py   # ❌ 缺失
├── utils/
│   ├── image_processor.py   # ❌ 缺失
│   ├── prompt_builder.py    # ❌ 缺失
│   └── validators.py        # ❌ 缺失
└── tests/
    ├── test_chat.py
    ├── test_scene.py
    └── test_hazard.py

5.3 关键代码示例

示例 1: 统一错误处理

# core/exceptions.py
from fastapi import HTTPException, status

class BusinessException(HTTPException):
    def __init__(self, code: int, message: str, details: str = None):
        super().__init__(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "code": code,
                "message": message,
                "details": details
            }
        )

class DatabaseException(HTTPException):
    def __init__(self, message: str = "数据库错误"):
        super().__init__(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "code": 50001,
                "message": message
            }
        )

# 全局异常处理
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error(f"Unhandled exception: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={
            "code": 50000,
            "message": "服务器内部错误",
            "request_id": request.headers.get("X-Request-ID")
        }
    )

示例 2: 依赖注入

# core/dependencies.py
from fastapi import Depends
from sqlalchemy.orm import Session
from redis import Redis

def get_db() -> Session:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def get_redis() -> Redis:
    return redis_client

def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    # 验证 token
    payload = verify_token(token)
    user = db.query(User).filter(User.id == payload["user_id"]).first()
    if not user:
        raise HTTPException(status_code=401, detail="用户不存在")
    return user

# 使用
@router.post("/send_message")
async def send_message(
    request: MessageRequest,
    db: Session = Depends(get_db),
    redis: Redis = Depends(get_redis),
    user: User = Depends(get_current_user)
):
    # 业务逻辑
    pass

示例 3: 服务层实现

# services/llm_service.py
import httpx
from typing import AsyncIterator

class LLMService:
    def __init__(self, api_url: str, model: str):
        self.api_url = api_url
        self.model = model
        self.client = httpx.AsyncClient(timeout=60.0)
    
    async def generate(
        self, 
        message: str, 
        context: str = None,
        stream: bool = False
    ) -> str | AsyncIterator[str]:
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": context or ""},
                {"role": "user", "content": message}
            ],
            "stream": stream
        }
        
        if stream:
            return self._stream_generate(payload)
        else:
            response = await self.client.post(
                f"{self.api_url}/v1/chat/completions",
                json=payload
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]
    
    async def _stream_generate(self, payload: dict) -> AsyncIterator[str]:
        async with self.client.stream(
            "POST",
            f"{self.api_url}/v1/chat/completions",
            json=payload
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    if content := data["choices"][0]["delta"].get("content"):
                        yield content

示例 4: 流式响应

# routers/stream.py
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from services.llm_service import LLMService

router = APIRouter()

@router.post("/stream/chat")
async def stream_chat(request: ChatRequest):
    async def event_generator():
        async for chunk in llm_service.generate(
            message=request.message,
            context=request.context,
            stream=True
        ):
            yield f"data: {json.dumps({'content': chunk})}\n\n"
        
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

六、测试策略

6.1 单元测试

# tests/test_llm_service.py
import pytest
from services.llm_service import LLMService

@pytest.fixture
def llm_service():
    return LLMService(
        api_url="http://test.com",
        model="test-model"
    )

@pytest.mark.asyncio
async def test_generate_success(llm_service, httpx_mock):
    httpx_mock.add_response(
        json={
            "choices": [{"message": {"content": "Hello"}}]
        }
    )
    
    result = await llm_service.generate("Hi")
    assert result == "Hello"

@pytest.mark.asyncio
async def test_generate_stream(llm_service, httpx_mock):
    httpx_mock.add_response(
        text="data: {\"choices\":[{\"delta\":{\"content\":\"H\"}}]}\n\n"
    )
    
    chunks = []
    async for chunk in llm_service.generate("Hi", stream=True):
        chunks.append(chunk)
    
    assert chunks == ["H"]

6.2 集成测试

# tests/test_chat_api.py
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_send_message():
    response = client.post(
        "/apiv1/send_message",
        json={
            "message": "测试消息",
            "user_id": 1
        },
        headers={"Authorization": "Bearer test_token"}
    )
    
    assert response.status_code == 200
    assert "reply" in response.json()

def test_stream_chat():
    with client.stream(
        "POST",
        "/apiv1/stream/chat",
        json={"message": "测试"}
    ) as response:
        chunks = list(response.iter_lines())
        assert len(chunks) > 0

七、部署建议

7.1 Docker 化

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    libpq-dev \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# 启动服务
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "22000:8000"
    environment:
      - DATABASE_URL=mysql://user:pass@db:3306/shudao
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs
  
  db:
    image: mysql:8.0
    environment:
      MYSQL_DATABASE: shudao
      MYSQL_ROOT_PASSWORD: ${DB_PASSWORD}
    volumes:
      - mysql_data:/var/lib/mysql
  
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  mysql_data:
  redis_data:

7.2 监控配置

# main.py
from prometheus_client import Counter, Histogram, make_asgi_app

# 指标定义
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration",
    ["method", "endpoint"]
)

# 中间件
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    
    REQUEST_DURATION.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    
    return response

# 暴露指标
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

八、总结与建议

当前 Go 代码主要问题

  1. 安全性 ⚠️

    • 敏感信息明文存储(最严重)
    • 缺少输入验证
    • 错误信息泄露
  2. 架构设计 ⚠️

    • 全局变量滥用
    • 缺少依赖注入
    • 控制器职责过重
  3. 代码质量 ⚠️

    • 重复代码严重
    • 缺少单元测试
    • 文档不完善
  4. 性能优化 ⚠️

    • 缺少缓存
    • 图片处理效率低
    • 没有并发控制

Python 重构优先级

立即执行 (P0):

  1. 环境变量配置 + 敏感信息加密
  2. 核心聊天接口实现
  3. 流式响应支持
  4. 图像识别模块

尽快完成 (P1):

  1. 缓存层(Redis)
  2. 服务层拆分
  3. 错误处理规范
  4. 单元测试覆盖

持续优化 (P2):

  1. 性能监控
  2. 异步任务队列
  3. API 文档
  4. 部署自动化

风险控制建议

  • ✅ 灰度发布:先迁移 10% 流量
  • ✅ 接口兼容性测试:确保前端无需改动
  • ✅ 性能对比测试:Python 版本不低于 Go 版本
  • ✅ 回滚方案:保留 Go 版本至少 1 个月
  • ✅ 监控告警:及时发现问题

预估时间线: 8-12 周完成完整迁移

人力需求: 2-3 名后端开发 + 1 名测试

建议开始时间: 立即启动基础设施优化,并行进行核心接口开发