| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- from fastapi import APIRouter, Depends, Request
- from sqlalchemy.orm import Session
- from pydantic import BaseModel
- from database import get_db, get_unix
- from models.tracking import TrackingRecord, ApiPathMapping
- import uuid
- router = APIRouter()
- class TrackingRequest(BaseModel):
- api_path: str
- api_name: str = ""
- @router.post("/tracking/record")
- async def record_tracking(
- request: Request,
- data: TrackingRequest,
- db: Session = Depends(get_db)
- ):
- """记录埋点"""
- user = request.state.user
-
- # 获取客户端IP
- client_ip = request.client.host if request.client else ""
-
- # 生成请求ID
- request_id = str(uuid.uuid4())
-
- # 创建埋点记录
- record = TrackingRecord(
- user_id=user.userCode,
- api_path=data.api_path,
- api_name=data.api_name,
- method=request.method,
- request_id=request_id,
- ip_address=client_ip,
- created_at=get_unix()
- )
-
- db.add(record)
- db.commit()
-
- return {"statusCode": 200, "msg": "记录成功", "data": {"request_id": request_id}}
- @router.get("/tracking/records")
- async def get_tracking_records(
- request: Request,
- limit: int = 100,
- db: Session = Depends(get_db)
- ):
- """获取埋点记录"""
- user = request.state.user
-
- records = db.query(TrackingRecord).filter(
- TrackingRecord.user_id == user.userCode
- ).order_by(TrackingRecord.created_at.desc()).limit(limit).all()
-
- return {
- "statusCode": 200,
- "msg": "success",
- "data": [
- {
- "id": r.id,
- "api_path": r.api_path,
- "api_name": r.api_name,
- "created_at": r.created_at
- }
- for r in records
- ]
- }
- class ApiMappingRequest(BaseModel):
- api_path: str
- api_name: str
- api_desc: str = ""
- @router.post("/tracking/api_mapping")
- async def add_api_mapping(
- request: Request,
- data: ApiMappingRequest,
- db: Session = Depends(get_db)
- ):
- """添加API映射"""
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
-
- # 检查是否已存在
- existing = db.query(ApiPathMapping).filter(
- ApiPathMapping.api_path == data.api_path
- ).first()
-
- if existing:
- return {"statusCode": 400, "msg": "API映射已存在"}
-
- mapping = ApiPathMapping(
- api_path=data.api_path,
- api_name=data.api_name,
- api_desc=data.api_desc,
- created_at=get_unix()
- )
- db.add(mapping)
- db.commit()
- db.refresh(mapping)
-
- return {
- "statusCode": 200,
- "msg": "添加成功",
- "data": {"id": mapping.id}
- }
- @router.get("/tracking/api_mappings")
- async def get_api_mappings(
- request: Request,
- db: Session = Depends(get_db)
- ):
- """获取API映射"""
- user = request.state.user
- if not user:
- return {"statusCode": 401, "msg": "未授权"}
-
- mappings = db.query(ApiPathMapping).all()
-
- return {
- "statusCode": 200,
- "msg": "success",
- "data": [
- {
- "id": m.id,
- "api_path": m.api_path,
- "api_name": m.api_name,
- "api_desc": m.api_desc,
- "status": m.status
- }
- for m in mappings
- ]
- }
- # get_user_data_id 已移至 routers/total.py,避免路由重复
|