tracking.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  import uuid

  from fastapi import APIRouter, Depends, Request
  from pydantic import BaseModel
  from sqlalchemy.exc import IntegrityError
  from sqlalchemy.orm import Session

  from database import get_db, get_unix
  from models.tracking import TrackingRecord, ApiPathMapping
  # Router object that the @router.post / @router.get decorators in this module attach to.
  router = APIRouter()
  class TrackingRequest(BaseModel):
      """Request body for ``POST /tracking/record``."""

      # Path of the API being tracked, as reported by the client (required).
      api_path: str
      # Name reported for that API; defaults to an empty string when omitted.
      api_name: str = ""
  @router.post("/tracking/record")
  async def record_tracking(
      request: Request,
      data: TrackingRequest,
      db: Session = Depends(get_db),
  ):
      """Persist one tracking record for the user attached to the request.

      Args:
          request: Incoming request. ``request.state.user`` is expected to be
              set upstream (presumably by an auth middleware -- confirm).
          data: Payload carrying the tracked ``api_path`` and ``api_name``.
          db: SQLAlchemy session supplied by ``get_db``.

      Returns:
          ``{"statusCode": 200, ...}`` with the generated ``request_id`` under
          ``data`` on success, or a ``statusCode`` 401 payload when no user is
          attached to the request.

      NOTE(review): this is an ``async def`` handler that performs synchronous
      ``Session`` calls, so the commit runs on the event-loop thread. Consider
      a plain ``def`` handler if that becomes a bottleneck.
      """
      # ``request.state`` raises AttributeError for attributes that were never
      # set, so read it defensively and answer like the sibling endpoints do.
      user = getattr(request.state, "user", None)
      if not user:
          return {"statusCode": 401, "msg": "未授权"}

      # Fresh identifier for this record; it is echoed back to the caller.
      request_id = str(uuid.uuid4())

      record = TrackingRecord(
          user_id=user.userCode,
          api_path=data.api_path,
          api_name=data.api_name,
          # NOTE(review): this is the method of the tracking call itself (always
          # "POST" on this route), not of the tracked API -- confirm intended.
          method=request.method,
          request_id=request_id,
          # ``request.client`` may be None; store an empty string in that case.
          ip_address=request.client.host if request.client else "",
          created_at=get_unix(),
      )
      db.add(record)
      db.commit()

      return {"statusCode": 200, "msg": "记录成功", "data": {"request_id": request_id}}
  # Upper bound on rows returned per call so a client cannot request an
  # effectively unbounded result set.
  _MAX_TRACKING_RECORDS = 1000


  @router.get("/tracking/records")
  async def get_tracking_records(
      request: Request,
      limit: int = 100,
      db: Session = Depends(get_db),
  ):
      """Return the current user's tracking records, newest first.

      Args:
          request: Incoming request. ``request.state.user`` is expected to be
              set upstream (presumably by an auth middleware -- confirm).
          limit: Maximum number of records to return. Values are clamped to
              the range ``0 .. _MAX_TRACKING_RECORDS``.
          db: SQLAlchemy session supplied by ``get_db``.

      Returns:
          ``{"statusCode": 200, "msg": "success", "data": [...]}`` where each
          item exposes ``id``, ``api_path``, ``api_name`` and ``created_at``,
          or a ``statusCode`` 401 payload when no user is attached to the
          request.
      """
      # ``request.state`` raises AttributeError for attributes that were never
      # set, so read it defensively and answer like the sibling endpoints do.
      user = getattr(request.state, "user", None)
      if not user:
          return {"statusCode": 401, "msg": "未授权"}

      # ``limit`` comes straight from the query string: a negative value is
      # invalid (or means "no limit") depending on the database, and a huge
      # value would load an unbounded number of rows.
      row_limit = max(0, min(limit, _MAX_TRACKING_RECORDS))

      records = (
          db.query(TrackingRecord)
          .filter(TrackingRecord.user_id == user.userCode)
          .order_by(TrackingRecord.created_at.desc())
          .limit(row_limit)
          .all()
      )

      return {
          "statusCode": 200,
          "msg": "success",
          "data": [
              {
                  "id": r.id,
                  "api_path": r.api_path,
                  "api_name": r.api_name,
                  "created_at": r.created_at,
              }
              for r in records
          ],
      }
  class ApiMappingRequest(BaseModel):
      """Request body for ``POST /tracking/api_mapping``."""

      # API path the mapping is registered for (required).
      api_path: str
      # Name to associate with that path (required).
      api_name: str
      # Free-form description; defaults to an empty string when omitted.
      api_desc: str = ""
  @router.post("/tracking/api_mapping")
  async def add_api_mapping(
      request: Request,
      data: ApiMappingRequest,
      db: Session = Depends(get_db),
  ):
      """Create an API path mapping unless one already exists for the path.

      Args:
          request: Incoming request. ``request.state.user`` is expected to be
              set upstream (presumably by an auth middleware -- confirm).
          data: Payload with ``api_path``, ``api_name`` and optional
              ``api_desc``.
          db: SQLAlchemy session supplied by ``get_db``.

      Returns:
          ``{"statusCode": 200, ...}`` with the new mapping ``id`` under
          ``data`` on success; a ``statusCode`` 401 payload when no user is
          attached to the request; a ``statusCode`` 400 payload when a mapping
          for ``api_path`` already exists.
      """
      # ``request.state`` raises AttributeError for attributes that were never
      # set; without getattr the guard below could never handle that case.
      user = getattr(request.state, "user", None)
      if not user:
          return {"statusCode": 401, "msg": "未授权"}

      # Fast path: reject a duplicate before attempting the insert.
      existing = (
          db.query(ApiPathMapping)
          .filter(ApiPathMapping.api_path == data.api_path)
          .first()
      )
      if existing:
          return {"statusCode": 400, "msg": "API映射已存在"}

      mapping = ApiPathMapping(
          api_path=data.api_path,
          api_name=data.api_name,
          api_desc=data.api_desc,
          created_at=get_unix(),
      )
      db.add(mapping)
      try:
          db.commit()
      except IntegrityError:
          # The check above is not atomic with the insert: a concurrent request
          # may have created the same api_path in between. Roll back so the
          # session stays usable and report the duplicate.
          # NOTE(review): assumes uniqueness of api_path is the only constraint
          # that can fail here -- confirm against the ApiPathMapping model.
          db.rollback()
          return {"statusCode": 400, "msg": "API映射已存在"}

      # Reload the row so database-populated attributes are available.
      db.refresh(mapping)

      return {
          "statusCode": 200,
          "msg": "添加成功",
          "data": {"id": mapping.id},
      }
  @router.get("/tracking/api_mappings")
  async def get_api_mappings(
      request: Request,
      db: Session = Depends(get_db),
  ):
      """List every stored API path mapping.

      Args:
          request: Incoming request. ``request.state.user`` is expected to be
              set upstream (presumably by an auth middleware -- confirm).
          db: SQLAlchemy session supplied by ``get_db``.

      Returns:
          ``{"statusCode": 200, "msg": "success", "data": [...]}`` where each
          item exposes ``id``, ``api_path``, ``api_name``, ``api_desc`` and
          ``status``, or a ``statusCode`` 401 payload when no user is attached
          to the request.
      """
      # ``request.state`` raises AttributeError for attributes that were never
      # set; without getattr the guard below could never handle that case.
      user = getattr(request.state, "user", None)
      if not user:
          return {"statusCode": 401, "msg": "未授权"}

      mappings = db.query(ApiPathMapping).all()

      return {
          "statusCode": 200,
          "msg": "success",
          "data": [
              {
                  "id": m.id,
                  "api_path": m.api_path,
                  "api_name": m.api_name,
                  "api_desc": m.api_desc,
                  "status": m.status,
              }
              for m in mappings
          ],
      }
  # get_user_data_id has been moved to routers/total.py to avoid registering a duplicate route.