❌ 数据库连接在 init() 中初始化(models/mysql.go)
❌ 全局变量 DB 被整个项目共享
❌ 配置硬编码在 app.conf 中
❌ 缺乏依赖注入机制
影响:
优化方案:
// 当前代码(不推荐)
var DB *gorm.DB
func init() {
DB, err = gorm.Open(...)
}
// 推荐方案
type DBManager struct {
db *gorm.DB
}
func NewDBManager(config *Config) (*DBManager, error) {
db, err := gorm.Open(mysql.Open(config.DSN), &gorm.Config{...})
if err != nil {
return nil, err
}
return &DBManager{db: db}, nil
}
func (m *DBManager) GetDB() *gorm.DB {
return m.db
}
问题代码 (conf/app.conf):
# 数据库密码明文存储
mysqlpass = "88888888"
# API 密钥明文存储
deepseek_api_key = "sk-28625cb3738844e190cee62b2bcb25bf"
# OSS 凭证明文存储
OSS_ACCESS_KEY_ID="fnyfi2f368pbic74d8ll"
OSS_ACCESS_KEY_SECRET="jgqwk7sirqlz2602x2k7yx2eor0vii19wah6ywlv"
风险评估:
优化方案:
环境变量方案
# .env
MYSQL_PASSWORD=<encrypted>
DEEPSEEK_API_KEY=<encrypted>
OSS_ACCESS_KEY_SECRET=<encrypted>
# 使用 godotenv 加载
import "github.com/joho/godotenv"
godotenv.Load()
password := os.Getenv("MYSQL_PASSWORD")
配置加密方案
import "github.com/aead/chacha20poly1305"
func DecryptConfig(encryptedData []byte, key []byte) (string, error) {
aead, _ := chacha20poly1305.NewX(key)
plaintext, err := aead.Open(nil, nonce, encryptedData, nil)
return string(plaintext), err
}
Vault 集成方案 (推荐生产环境)
import "github.com/hashicorp/vault/api"
client, _ := api.NewClient(api.DefaultConfig())
secret, _ := client.Logical().Read("secret/data/mysql")
password := secret.Data["password"].(string)
问题代码 (controllers/chat.go):
func (c *ChatController) SendDeepSeekMessage() {
// 1. 解析请求参数
// 2. 意图识别(调用第三方 API)
// 3. ChromaDB 检索
// 4. 在线搜索
// 5. 构建提示词
// 6. 调用大模型
// 7. 解析响应
// 8. 替换引用来源
// 9. 数据库存储
// 10. 返回结果
}
问题分析:
优化方案:
// Service 层拆分
type ChatService struct {
intentService *IntentService
ragService *RAGService
llmService *LLMService
storageService *StorageService
}
func (s *ChatService) ProcessMessage(ctx context.Context, req *MessageRequest) (*MessageResponse, error) {
// 1. 意图识别
intent, _ := s.intentService.Recognize(ctx, req.Message)
// 2. 检索增强
context, _ := s.ragService.Retrieve(ctx, req.Message, intent)
// 3. 调用 LLM
response, _ := s.llmService.Generate(ctx, req.Message, context)
// 4. 存储结果
s.storageService.Save(ctx, req, response)
return response, nil
}
// Controller 只负责 HTTP 处理
func (c *ChatController) SendMessage() {
var req MessageRequest
c.BindJSON(&req)
resp, err := c.chatService.ProcessMessage(c.Ctx.Request.Context(), &req)
if err != nil {
c.Ctx.Output.SetStatus(500)
return
}
c.Data["json"] = resp
c.ServeJSON()
}
问题示例:
// controllers/chat.go
func (c *ChatController) sendQwen3Message(userMessage string, useStream bool) (string, error) {
// HTTP 请求代码
}
// controllers/liushi.go
func (c *LiushiController) sendQwen3Message(userMessage string, useStream bool) (string, error) {
// 几乎相同的代码
}
优化方案:
// services/llm_client.go
type LLMClient struct {
apiURL string
model string
}
func (c *LLMClient) SendMessage(ctx context.Context, message string, stream bool) (string, error) {
// 统一的 HTTP 请求逻辑
}
// 各 Controller 共享
func (c *ChatController) SendMessage() {
resp, _ := c.llmClient.SendMessage(c.Ctx.Request.Context(), message, false)
}
当前配置 (models/mysql.go):
sqlDB.SetMaxOpenConns(100) // 最大连接数
sqlDB.SetMaxIdleConns(10) // 最大空闲连接
sqlDB.SetConnMaxLifetime(time.Hour)
sqlDB.SetConnMaxIdleTime(time.Minute * 30)
问题分析:
优化建议:
// 根据实际并发量调整
sqlDB.SetMaxOpenConns(50) // 降低到 50
sqlDB.SetMaxIdleConns(25) // 提高空闲连接到 25
sqlDB.SetConnMaxLifetime(time.Minute * 30) // 缩短生命周期
sqlDB.SetConnMaxIdleTime(time.Minute * 10) // 缩短空闲时间
监控指标:
stats := sqlDB.Stats()
log.Printf("OpenConnections: %d, InUse: %d, Idle: %d",
stats.OpenConnections, stats.InUse, stats.Idle)
问题场景:
// controllers/total.go
func (c *TotalController) GetRecommendQuestion() {
// 每次都查询数据库
var questions []models.RecommendQuestion
models.DB.Find(&questions)
}
func (c *TotalController) GetFunctionCard() {
// 每次都查询数据库
var cards []models.FunctionCard
models.DB.Find(&cards)
}
问题分析:
优化方案:
内存缓存 (适合小数据量)
type Cache struct {
data map[string]interface{}
mu sync.RWMutex
ttl time.Duration
}
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
func (c *TotalController) GetRecommendQuestion() {
cacheKey := "recommend_questions"
if val, ok := cache.Get(cacheKey); ok {
c.Data["json"] = val
c.ServeJSON()
return
}
var questions []models.RecommendQuestion
models.DB.Find(&questions)
cache.Set(cacheKey, questions, 1*time.Hour)
c.Data["json"] = questions
c.ServeJSON()
}
Redis 缓存 (推荐)
import "github.com/go-redis/redis/v8"
func (c *TotalController) GetRecommendQuestion() {
val, err := redisClient.Get(ctx, "recommend_questions").Result()
if err == nil {
var questions []models.RecommendQuestion
json.Unmarshal([]byte(val), &questions)
c.Data["json"] = questions
c.ServeJSON()
return
}
var questions []models.RecommendQuestion
models.DB.Find(&questions)
data, _ := json.Marshal(questions)
redisClient.Set(ctx, "recommend_questions", data, 1*time.Hour)
}
问题代码 (controllers/hazard.go):
func (c *HazardController) Hazard() {
// 1. 下载 OSS 图片到内存
imageData, _ := downloadImageFromOSS(imageURL)
// 2. 解码图片
img, _ := decodePNGImage(imageData)
// 3. YOLO 识别
boxes, labels, scores := callYOLOAPI(imageData)
// 4. 绘制边界框和水印
resultImg := drawBoundingBox(img, boxes, labels, scores, username, account, date)
// 5. 重新编码
var buf bytes.Buffer
png.Encode(&buf, resultImg)
// 6. 上传回 OSS
uploadImageToOSS(buf.Bytes(), fileName)
}
问题分析:
优化方案:
流式处理
// 使用临时文件减少内存占用
tmpFile, _ := os.CreateTemp("", "image-*.png")
defer os.Remove(tmpFile.Name())
// 下载到文件
io.Copy(tmpFile, ossResponse.Body)
// 使用文件路径处理
processImageFile(tmpFile.Name())
并发控制
// 限制同时处理的图片数量
type ImageProcessor struct {
semaphore chan struct{}
}
func NewImageProcessor(maxConcurrent int) *ImageProcessor {
return &ImageProcessor{
semaphore: make(chan struct{}, maxConcurrent),
}
}
func (p *ImageProcessor) Process(imageURL string) error {
p.semaphore <- struct{}{} // 获取令牌
defer func() { <-p.semaphore }() // 释放令牌
// 处理图片
return processImage(imageURL)
}
异步处理队列
// 使用消息队列异步处理
type ImageTask struct {
ImageURL string
UserID int
}
func (c *HazardController) Hazard() {
task := ImageTask{
ImageURL: req.ImageURL,
UserID: req.UserID,
}
// 发送到队列
queueClient.Publish("image-process", task)
// 立即返回任务 ID
c.Data["json"] = map[string]interface{}{
"task_id": generateTaskID(),
"status": "processing",
}
c.ServeJSON()
}
// 后台 Worker 处理
func imageWorker() {
for task := range queueClient.Subscribe("image-process") {
processImageTask(task)
}
}
问题代码:
// controllers/chat.go
if err != nil {
c.Ctx.Output.SetStatus(500)
c.Data["json"] = map[string]interface{}{
"msg": "失败", // 错误信息不明确
}
c.ServeJSON()
return
}
// controllers/scene.go
if err != nil {
c.Data["json"] = map[string]string{
"error": err.Error(), // 直接暴露系统错误
}
c.ServeJSON()
}
问题分析:
优化方案:
// 定义标准错误响应
type ErrorResponse struct {
Code int `json:"code"`
Message string `json:"message"`
RequestID string `json:"request_id"`
Details string `json:"details,omitempty"`
}
// 错误码定义
const (
ErrCodeInvalidRequest = 40001
ErrCodeUnauthorized = 40100
ErrCodeNotFound = 40400
ErrCodeInternalError = 50000
ErrCodeDatabaseError = 50001
ErrCodeExternalAPI = 50002
)
// 错误处理中间件
func ErrorHandler(err error, c *beego.Controller) {
var errResp ErrorResponse
switch e := err.(type) {
case *ValidationError:
errResp = ErrorResponse{
Code: ErrCodeInvalidRequest,
Message: "请求参数错误",
RequestID: c.Ctx.Request.Header.Get("X-Request-ID"),
Details: e.Error(),
}
c.Ctx.Output.SetStatus(400)
case *DatabaseError:
errResp = ErrorResponse{
Code: ErrCodeDatabaseError,
Message: "数据库错误",
RequestID: c.Ctx.Request.Header.Get("X-Request-ID"),
// 不暴露详细错误
}
c.Ctx.Output.SetStatus(500)
log.Error("Database error: %v", e)
default:
errResp = ErrorResponse{
Code: ErrCodeInternalError,
Message: "服务器内部错误",
RequestID: c.Ctx.Request.Header.Get("X-Request-ID"),
}
c.Ctx.Output.SetStatus(500)
log.Error("Unknown error: %v", err)
}
c.Data["json"] = errResp
c.ServeJSON()
}
问题代码 (models/mysql.go):
func setCreateTimeCallback(db *gorm.DB) {
if _, ok := db.Statement.Schema.FieldsByName["CreatedAt"]; ok {
now := int(GetUnix()) // 可能被多个 goroutine 同时调用
db.Statement.SetColumn("CreatedAt", now)
}
}
问题分析:
GetUnix() 本身是线程安全的优化方案:
// 使用数据库时间戳
type BaseModel struct {
ID uint `gorm:"primarykey"`
CreatedAt time.Time `gorm:"autoCreateTime"`
UpdatedAt time.Time `gorm:"autoUpdateTime"`
}
// 或者使用 GORM 内置功能
DB, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
NowFunc: func() time.Time {
return time.Now().UTC()
},
})
| 模块 | 控制器 | 接口数量 | 核心功能 |
|---|---|---|---|
| 聊天 | ChatController | 17 | 发送消息、历史记录、意图识别、联网搜索、ChromaDB 检索 |
| 场景 | SceneController | 5 | 场景管理、识别记录、示例图片、评价 |
| 隐患识别 | HazardController | 2 | YOLO 识别、步骤保存 |
| 文件管理 | OssController + ShudaoOssController | 5 | 文件上传、图片压缩、OSS 解析 |
| 通用 | TotalController | 8 | 推荐问题、反馈、政策文件、统计 |
| 考试 | ExamController + PromptController | 3 | 题目生成、提示词构建 |
| 知识库 | ChromaController | 1 | 高级搜索 |
| 流式 | LiushiController | 2 | SSE 流式输出 |
| 埋点 | TrackingController | 4 | 行为追踪、统计分析 |
| 前端 | FrontendController | 4 | 测试页面 |
总计: 51 个接口
| 模块 | 文件 | 已实现接口 | 完成度 |
|---|---|---|---|
| 聊天 | routers/chat.py | 4/17 | 24% |
| 通用 | routers/common.py | 4/8 | 50% |
| 场景 | routers/scene.py | 5/5 | 100% ✅ |
| 文件 | routers/oss.py | 2/5 | 40% |
| 埋点 | routers/tracking.py | 1/4 | 25% |
总体完成度: 16/51 = 31.4%
SendDeepSeekMessage (chat.go)
StreamChatWithDB (liushi.go)
Hazard (hazard.go)
AdvancedSearch (chroma.go)
GuessYouWant (chat.go)
OnlineSearch (chat.go)
UploadImage (shudaooss.go)
BuildExamPrompt (prompt.go)
✅ 配置管理重构
- 环境变量分离
- 敏感信息加密
- 配置热重载
✅ 数据库层重构
- 连接池优化
- 依赖注入
- 事务管理
✅ 日志系统
- 结构化日志
- 日志等级
- 请求追踪
✅ 错误处理
- 统一错误响应
- 错误码规范
- Panic 恢复
✅ P0 接口迁移
- SendDeepSeekMessage
- StreamChatWithDB
- Hazard
- AdvancedSearch
✅ 服务层拆分
- IntentService
- RAGService
- LLMService
- ImageService
✅ 缓存层
- Redis 集成
- 缓存策略
✅ 异步处理
- 消息队列
- 后台任务
- 定时任务
✅ 并发控制
- 限流
- 熔断
- 降级
✅ 监控告警
- Prometheus
- Grafana
- 日志聚合
✅ P1 接口实现
- GuessYouWant
- OnlineSearch
- 考试模块
✅ 单元测试
- 覆盖率 > 80%
- 集成测试
✅ 文档补全
- API 文档
- 部署文档
| 风险项 | 风险等级 | 影响 | 缓解措施 |
|---|---|---|---|
| YOLO API 兼容性 | 🟡 中 | 图像识别失败 | 提前测试,准备降级方案 |
| ChromaDB 连接 | 🟡 中 | 知识库检索异常 | 连接池管理,超时重试 |
| 流式响应稳定性 | 🟡 中 | 聊天体验下降 | SSE 心跳机制,断线重连 |
| 数据库迁移 | 🔴 高 | 数据丢失/损坏 | 充分备份,灰度发布 |
| 并发性能 | 🟡 中 | 高负载下崩溃 | 压力测试,弹性伸缩 |
| 风险项 | 影响 | 缓解措施 |
|---|---|---|
| 接口不兼容 | 前端调用失败 | 保持接口签名一致 |
| 响应格式变化 | 前端解析错误 | 严格遵守现有格式 |
| 性能下降 | 用户体验变差 | 性能对比测试 |
| 功能缺失 | 业务中断 | 分批迁移,灰度发布 |
# 已选择 ✅
FastAPI # Web 框架
SQLAlchemy # ORM
Pydantic # 数据验证
httpx # HTTP 客户端
# 建议补充
redis # 缓存
celery # 异步任务
prometheus-client # 监控
python-multipart # 文件上传
Pillow # 图像处理
shudao-main-py/
├── config.yaml # 配置文件
├── main.py # 入口文件
├── requirements.txt # 依赖
├── .env.example # 环境变量模板
├── core/
│ ├── config.py # ✅ 已实现
│ ├── database.py # ✅ 已实现
│ ├── auth.py # ✅ 已实现
│ ├── cache.py # ❌ 缺失:Redis 缓存
│ ├── logger.py # ❌ 缺失:日志配置
│ └── exceptions.py # ❌ 缺失:自定义异常
├── models/
│ ├── __init__.py # ✅ 已实现
│ ├── base.py # ✅ 已实现
│ ├── chat.py # ✅ 已实现
│ ├── scene.py # ✅ 已实现
│ ├── user.py # ✅ 已实现
│ └── exam.py # ❌ 缺失:考试模型
├── routers/
│ ├── __init__.py # ✅ 已实现
│ ├── chat.py # ⚠️ 部分实现
│ ├── scene.py # ✅ 已实现
│ ├── oss.py # ⚠️ 部分实现
│ ├── exam.py # ❌ 缺失:考试路由
│ └── stream.py # ❌ 缺失:流式路由
├── services/
│ ├── __init__.py
│ ├── intent_service.py # ❌ 缺失
│ ├── rag_service.py # ❌ 缺失
│ ├── llm_service.py # ❌ 缺失
│ ├── image_service.py # ❌ 缺失
│ ├── search_service.py # ❌ 缺失
│ └── storage_service.py # ❌ 缺失
├── utils/
│ ├── image_processor.py # ❌ 缺失
│ ├── prompt_builder.py # ❌ 缺失
│ └── validators.py # ❌ 缺失
└── tests/
├── test_chat.py
├── test_scene.py
└── test_hazard.py
# core/exceptions.py
from fastapi import HTTPException, status
class BusinessException(HTTPException):
def __init__(self, code: int, message: str, details: str = None):
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"code": code,
"message": message,
"details": details
}
)
class DatabaseException(HTTPException):
def __init__(self, message: str = "数据库错误"):
super().__init__(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={
"code": 50001,
"message": message
}
)
# 全局异常处理
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"code": 50000,
"message": "服务器内部错误",
"request_id": request.headers.get("X-Request-ID")
}
)
# core/dependencies.py
from fastapi import Depends
from sqlalchemy.orm import Session
from redis import Redis
def get_db() -> Session:
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_redis() -> Redis:
return redis_client
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
# 验证 token
payload = verify_token(token)
user = db.query(User).filter(User.id == payload["user_id"]).first()
if not user:
raise HTTPException(status_code=401, detail="用户不存在")
return user
# 使用
@router.post("/send_message")
async def send_message(
request: MessageRequest,
db: Session = Depends(get_db),
redis: Redis = Depends(get_redis),
user: User = Depends(get_current_user)
):
# 业务逻辑
pass
# services/llm_service.py
import httpx
from typing import AsyncIterator
class LLMService:
def __init__(self, api_url: str, model: str):
self.api_url = api_url
self.model = model
self.client = httpx.AsyncClient(timeout=60.0)
async def generate(
self,
message: str,
context: str = None,
stream: bool = False
) -> str | AsyncIterator[str]:
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": context or ""},
{"role": "user", "content": message}
],
"stream": stream
}
if stream:
return self._stream_generate(payload)
else:
response = await self.client.post(
f"{self.api_url}/v1/chat/completions",
json=payload
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
async def _stream_generate(self, payload: dict) -> AsyncIterator[str]:
async with self.client.stream(
"POST",
f"{self.api_url}/v1/chat/completions",
json=payload
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
if content := data["choices"][0]["delta"].get("content"):
yield content
# routers/stream.py
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from services.llm_service import LLMService
router = APIRouter()
@router.post("/stream/chat")
async def stream_chat(request: ChatRequest):
async def event_generator():
async for chunk in llm_service.generate(
message=request.message,
context=request.context,
stream=True
):
yield f"data: {json.dumps({'content': chunk})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
# tests/test_llm_service.py
import pytest
from services.llm_service import LLMService
@pytest.fixture
def llm_service():
return LLMService(
api_url="http://test.com",
model="test-model"
)
@pytest.mark.asyncio
async def test_generate_success(llm_service, httpx_mock):
httpx_mock.add_response(
json={
"choices": [{"message": {"content": "Hello"}}]
}
)
result = await llm_service.generate("Hi")
assert result == "Hello"
@pytest.mark.asyncio
async def test_generate_stream(llm_service, httpx_mock):
httpx_mock.add_response(
text="data: {\"choices\":[{\"delta\":{\"content\":\"H\"}}]}\n\n"
)
chunks = []
async for chunk in llm_service.generate("Hi", stream=True):
chunks.append(chunk)
assert chunks == ["H"]
# tests/test_chat_api.py
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_send_message():
response = client.post(
"/apiv1/send_message",
json={
"message": "测试消息",
"user_id": 1
},
headers={"Authorization": "Bearer test_token"}
)
assert response.status_code == 200
assert "reply" in response.json()
def test_stream_chat():
with client.stream(
"POST",
"/apiv1/stream/chat",
json={"message": "测试"}
) as response:
chunks = list(response.iter_lines())
assert len(chunks) > 0
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
libpq-dev \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY . .
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# 启动服务
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- "22000:8000"
environment:
- DATABASE_URL=mysql://user:pass@db:3306/shudao
- REDIS_URL=redis://redis:6379/0
depends_on:
- db
- redis
volumes:
- ./logs:/app/logs
db:
image: mysql:8.0
environment:
MYSQL_DATABASE: shudao
MYSQL_ROOT_PASSWORD: ${DB_PASSWORD}
volumes:
- mysql_data:/var/lib/mysql
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
mysql_data:
redis_data:
# main.py
from prometheus_client import Counter, Histogram, make_asgi_app
# 指标定义
REQUEST_COUNT = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"]
)
REQUEST_DURATION = Histogram(
"http_request_duration_seconds",
"HTTP request duration",
["method", "endpoint"]
)
# 中间件
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
# 暴露指标
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
安全性 ⚠️
架构设计 ⚠️
代码质量 ⚠️
性能优化 ⚠️
立即执行 (P0):
尽快完成 (P1):
持续优化 (P2):
预估时间线: 8-12 周完成完整迁移
人力需求: 2-3 名后端开发 + 1 名测试
建议开始时间: 立即启动基础设施优化,并行进行核心接口开发