from fastapi import APIRouter, Depends, Request from sqlalchemy.orm import Session from pydantic import BaseModel from database import get_db, get_unix from models.tracking import TrackingRecord, ApiPathMapping import uuid router = APIRouter() class TrackingRequest(BaseModel): api_path: str api_name: str = "" def _get_tracking_user_id(user) -> str: """统一获取埋点所需的字符串用户标识。""" if not user: return "" return str( getattr(user, "userCode", "") or getattr(user, "user_code", "") or getattr(user, "account", "") or getattr(user, "account_id", "") or getattr(user, "user_id", "") or "" ) @router.post("/tracking/record") async def record_tracking( request: Request, data: TrackingRequest, db: Session = Depends(get_db) ): """记录埋点""" user = request.state.user user_id = _get_tracking_user_id(user) if not user_id: return {"statusCode": 401, "msg": "未授权"} # 获取客户端IP client_ip = request.client.host if request.client else "" # 生成请求ID request_id = str(uuid.uuid4()) # 创建埋点记录 record = TrackingRecord( user_id=user_id, api_path=data.api_path, api_name=data.api_name, method=request.method, request_id=request_id, ip_address=client_ip, created_at=get_unix() ) db.add(record) db.commit() return {"statusCode": 200, "msg": "记录成功", "data": {"request_id": request_id}} @router.get("/tracking/records") async def get_tracking_records( request: Request, limit: int = 100, db: Session = Depends(get_db) ): """获取埋点记录""" user = request.state.user user_id = _get_tracking_user_id(user) if not user_id: return {"statusCode": 401, "msg": "未授权"} records = db.query(TrackingRecord).filter( TrackingRecord.user_id == user_id ).order_by(TrackingRecord.created_at.desc()).limit(limit).all() return { "statusCode": 200, "msg": "success", "data": [ { "id": r.id, "api_path": r.api_path, "api_name": r.api_name, "created_at": r.created_at } for r in records ] } class ApiMappingRequest(BaseModel): api_path: str api_name: str api_desc: str = "" @router.post("/tracking/api_mapping") async def add_api_mapping( request: Request, data: ApiMappingRequest, db: Session = Depends(get_db) ): """添加API映射""" user = request.state.user if not user: return {"statusCode": 401, "msg": "未授权"} # 检查是否已存在 existing = db.query(ApiPathMapping).filter( ApiPathMapping.api_path == data.api_path ).first() if existing: return {"statusCode": 400, "msg": "API映射已存在"} mapping = ApiPathMapping( api_path=data.api_path, api_name=data.api_name, api_desc=data.api_desc, created_at=get_unix() ) db.add(mapping) db.commit() db.refresh(mapping) return { "statusCode": 200, "msg": "添加成功", "data": {"id": mapping.id} } @router.get("/tracking/api_mappings") async def get_api_mappings( request: Request, db: Session = Depends(get_db) ): """获取API映射""" user = request.state.user if not user: return {"statusCode": 401, "msg": "未授权"} mappings = db.query(ApiPathMapping).all() return { "statusCode": 200, "msg": "success", "data": [ { "id": m.id, "api_path": m.api_path, "api_name": m.api_name, "api_desc": m.api_desc, "status": m.status } for m in mappings ] } # get_user_data_id 已移至 routers/total.py,避免路由重复