domains.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. from fastapi import APIRouter, Depends, HTTPException, Query
  2. from pydantic import BaseModel
  3. from sqlalchemy import select, func
  4. from sqlalchemy.ext.asyncio import AsyncSession
  5. from app.database import get_db
  6. from app.models.domain import MonitoredDomain
  7. from app.models.monitoring import SuperAdmin
  8. from app.schemas.domain import (
  9. MonitoredDomainCreate,
  10. MonitoredDomainResponse,
  11. )
  12. from app.services.domain_fetch import fetch_domain_transactions
  13. from app.redis import get_redis
  14. router = APIRouter(prefix="/api/domains", tags=["domains"])
  15. @router.post("/", response_model=MonitoredDomainResponse, status_code=201)
  16. async def add_domain(
  17. payload: MonitoredDomainCreate,
  18. db: AsyncSession = Depends(get_db),
  19. ):
  20. """添加需要监控的域名,同时创建对应的超管记录"""
  21. existing = await db.execute(
  22. select(MonitoredDomain).where(MonitoredDomain.domain == payload.domain)
  23. )
  24. if existing.scalar_one_or_none():
  25. raise HTTPException(status_code=409, detail="域名已在监控中")
  26. # 如果未指定超管,自动创建一条超管记录
  27. sa_id = payload.super_admin_id
  28. if sa_id is None:
  29. max_id_result = await db.execute(select(func.max(SuperAdmin.id)))
  30. max_id = max_id_result.scalar() or 0
  31. new_sa = SuperAdmin(
  32. id=max_id + 1,
  33. username=payload.domain,
  34. nickname=payload.domain,
  35. remark=payload.remark or None,
  36. )
  37. db.add(new_sa)
  38. await db.flush()
  39. sa_id = new_sa.id
  40. record = MonitoredDomain(domain=payload.domain, remark=payload.remark or None, super_admin_id=sa_id)
  41. db.add(record)
  42. await db.commit()
  43. await db.refresh(record)
  44. return record
  45. @router.get("/", response_model=list[MonitoredDomainResponse])
  46. async def list_domains(db: AsyncSession = Depends(get_db)):
  47. """获取所有已监控的域名列表"""
  48. result = await db.execute(select(MonitoredDomain))
  49. return result.scalars().all()
  50. class MonitoredDomainUpdate(BaseModel):
  51. """更新域名备注"""
  52. remark: str = ""
  53. @router.patch("/{domain_id}", response_model=MonitoredDomainResponse)
  54. async def update_domain_remark(
  55. domain_id: int,
  56. payload: MonitoredDomainUpdate,
  57. db: AsyncSession = Depends(get_db),
  58. ):
  59. """更新域名备注,并同步到关联的超管"""
  60. result = await db.execute(
  61. select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
  62. )
  63. record = result.scalar_one_or_none()
  64. if not record:
  65. raise HTTPException(status_code=404, detail="域名不存在")
  66. record.remark = payload.remark or None
  67. # 同步到关联的超管
  68. if record.super_admin_id:
  69. sa_result = await db.execute(
  70. select(SuperAdmin).where(SuperAdmin.id == record.super_admin_id)
  71. )
  72. sa = sa_result.scalar_one_or_none()
  73. if sa:
  74. sa.remark = payload.remark or None
  75. await db.commit()
  76. await db.refresh(record)
  77. return record
  78. @router.delete("/{domain_id}", status_code=204)
  79. async def remove_domain(domain_id: int, db: AsyncSession = Depends(get_db)):
  80. """移除指定 ID 的监控域名"""
  81. result = await db.execute(
  82. select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
  83. )
  84. record = result.scalar_one_or_none()
  85. if not record:
  86. raise HTTPException(status_code=404, detail="域名不存在")
  87. await db.delete(record)
  88. await db.commit()
  89. @router.get("/{domain_id}/transactions")
  90. async def get_domain_transactions(
  91. domain_id: int,
  92. fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则查全部"),
  93. db: AsyncSession = Depends(get_db),
  94. ):
  95. """爬取指定域名的监控数据并入库"""
  96. result = await db.execute(
  97. select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
  98. )
  99. record = result.scalar_one_or_none()
  100. if not record:
  101. raise HTTPException(status_code=404, detail="域名不存在")
  102. data = await fetch_domain_transactions(record.domain, db, fetch_date=fetch_date)
  103. return {"status": "ok", "domain": record.domain, "data": data}
  104. @router.post("/fetch-all", status_code=202)
  105. async def fetch_all_transactions(
  106. fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则爬取当天"),
  107. db: AsyncSession = Depends(get_db),
  108. ):
  109. """批量爬取所有已启用域名的监控数据,默认只爬取当天"""
  110. if not fetch_date:
  111. from datetime import datetime, timezone, timedelta
  112. CST = timezone(timedelta(hours=8))
  113. fetch_date = datetime.now(CST).strftime("%Y-%m-%d")
  114. result = await db.execute(
  115. select(MonitoredDomain).where(MonitoredDomain.is_active == True)
  116. )
  117. domains = result.scalars().all()
  118. errors = []
  119. for d in domains:
  120. try:
  121. await fetch_domain_transactions(d.domain, db, fetch_date=fetch_date)
  122. except Exception as e:
  123. errors.append({"domain": d.domain, "error": str(e)})
  124. return {"status": "ok", "total": len(domains), "errors": errors}
  125. class ScheduleConfigUpdate(BaseModel):
  126. """更新定时爬取配置"""
  127. enabled: bool
  128. schedule_time: str # HH:MM
  129. @router.get("/schedule")
  130. async def get_schedule_config():
  131. """获取定时爬取配置(Redis)"""
  132. r = await get_redis()
  133. enabled = await r.get("fetch_schedule:enabled")
  134. schedule_time = await r.get("fetch_schedule:time")
  135. return {
  136. "enabled": enabled == "true",
  137. "schedule_time": schedule_time or "02:00",
  138. }
  139. @router.put("/schedule")
  140. async def update_schedule_config(payload: ScheduleConfigUpdate):
  141. """更新定时爬取配置(Redis)"""
  142. r = await get_redis()
  143. await r.set("fetch_schedule:enabled", "true" if payload.enabled else "false")
  144. await r.set("fetch_schedule:time", payload.schedule_time)
  145. return {"message": "配置已保存", "enabled": payload.enabled, "schedule_time": payload.schedule_time}