| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- """
- 管理员数据统计服务
- 提供多维度的数据统计和分析,包括用户统计、API调用统计、模型使用统计等
- """
- import json
- import hashlib
- from datetime import date, datetime, timedelta
- from typing import Optional, List, Dict, Any
- import logging
- from sqlalchemy import func
- from sqlalchemy.orm import Session
- from app.models.user import User
- from app.models.api_call_log import ApiCallLog
- logger = logging.getLogger(__name__)
- # 统计缓存 TTL(秒)
- _STATS_CACHE_TTL = 300 # 5 分钟
- def _stats_cache_key(method: str, **kwargs) -> str:
- params = json.dumps(kwargs, default=str, sort_keys=True)
- return f"admin_stats:{method}:{hashlib.md5(params.encode()).hexdigest()}"
- def _get_sync_redis():
- try:
- from app.core.redis import redis_manager
- return redis_manager.get_sync_client()
- except Exception:
- return None
- def _cache_get(key: str):
- r = _get_sync_redis()
- if not r:
- return None
- try:
- raw = r.get(key)
- return json.loads(raw) if raw else None
- except Exception:
- return None
- def _cache_set(key: str, value, ttl: int = _STATS_CACHE_TTL):
- r = _get_sync_redis()
- if not r:
- return
- try:
- r.setex(key, ttl, json.dumps(value, default=str))
- except Exception as e:
- logger.debug(f"统计缓存写入失败: {e}")
- def stats_cached(ttl: int = _STATS_CACHE_TTL):
- import functools
- def decorator(fn):
- @functools.wraps(fn)
- async def wrapper(self, *args, **kwargs):
- import inspect
- sig = inspect.signature(fn)
- bound = sig.bind(self, *args, **kwargs)
- bound.apply_defaults()
- params = {k: v for k, v in bound.arguments.items() if k != "self"}
- key = _stats_cache_key(fn.__name__, **params)
- cached = _cache_get(key)
- if cached is not None:
- return cached
- result = await fn(self, *args, **kwargs)
- _cache_set(key, result, ttl)
- return result
- return wrapper
- return decorator
- class AdminStatsService:
- """管理员数据统计服务类"""
- def __init__(self, db: Session):
- self.db = db
- @stats_cached()
- async def get_user_overview(
- self,
- start_date: Optional[date] = None,
- end_date: Optional[date] = None
- ) -> Dict[str, Any]:
- """获取用户统计概览"""
- new_users_query = self.db.query(func.count(User.id))
- if start_date:
- new_users_query = new_users_query.filter(func.date(User.created_at) >= start_date)
- if end_date:
- new_users_query = new_users_query.filter(func.date(User.created_at) <= end_date)
- new_users = new_users_query.scalar() or 0
- total_users = self.db.query(func.count(User.id)).scalar() or 0
- # 活跃用户数:有API调用记录的用户
- active_query = self.db.query(func.count(func.distinct(ApiCallLog.user_id)))
- if start_date:
- active_query = active_query.filter(func.date(ApiCallLog.created_at) >= start_date)
- if end_date:
- active_query = active_query.filter(func.date(ApiCallLog.created_at) <= end_date)
- active_users = active_query.scalar() or 0
- retention_rate = (active_users / new_users * 100) if new_users > 0 else 0
- return {
- "new_users": new_users,
- "total_users": total_users,
- "active_users": active_users,
- "retention_rate": round(retention_rate, 2)
- }
- @stats_cached()
- async def get_user_growth_trend(
- self,
- start_date: date,
- end_date: date
- ) -> List[Dict[str, Any]]:
- """获取用户增长趋势"""
- new_users_by_date = self.db.query(
- func.date(User.created_at).label('date'),
- func.count(User.id).label('count')
- ).filter(
- func.date(User.created_at) >= start_date,
- func.date(User.created_at) <= end_date
- ).group_by(func.date(User.created_at)).all()
- # 活跃用户:有API调用的用户
- active_users_by_date = self.db.query(
- func.date(ApiCallLog.created_at).label('date'),
- func.count(func.distinct(ApiCallLog.user_id)).label('count')
- ).filter(
- func.date(ApiCallLog.created_at) >= start_date,
- func.date(ApiCallLog.created_at) <= end_date
- ).group_by(func.date(ApiCallLog.created_at)).all()
- date_range = []
- current = start_date
- while current <= end_date:
- date_range.append(current)
- current += timedelta(days=1)
- new_users_dict = {item.date: item.count for item in new_users_by_date}
- active_users_dict = {item.date: item.count for item in active_users_by_date}
- return [
- {
- "date": d.isoformat(),
- "new_users": new_users_dict.get(d, 0),
- "active_users": active_users_dict.get(d, 0)
- }
- for d in date_range
- ]
- @stats_cached()
- async def get_business_overview(
- self,
- start_date: Optional[date] = None,
- end_date: Optional[date] = None
- ) -> Dict[str, Any]:
- """获取 API 调用统计概览"""
- query = self.db.query(func.count(ApiCallLog.id))
- if start_date:
- query = query.filter(func.date(ApiCallLog.created_at) >= start_date)
- if end_date:
- query = query.filter(func.date(ApiCallLog.created_at) <= end_date)
- total_calls = query.scalar() or 0
- success_query = self.db.query(func.count(ApiCallLog.id)).filter(ApiCallLog.status == 'success')
- if start_date:
- success_query = success_query.filter(func.date(ApiCallLog.created_at) >= start_date)
- if end_date:
- success_query = success_query.filter(func.date(ApiCallLog.created_at) <= end_date)
- success_calls = success_query.scalar() or 0
- return {
- "total_calls": total_calls,
- "success_calls": success_calls,
- "failed_calls": total_calls - success_calls,
- }
- @stats_cached()
- async def get_business_trend(
- self,
- start_date: date,
- end_date: date
- ) -> List[Dict[str, Any]]:
- """获取 API 调用趋势"""
- calls_by_date = self.db.query(
- func.date(ApiCallLog.created_at).label('date'),
- func.count(ApiCallLog.id).label('count')
- ).filter(
- func.date(ApiCallLog.created_at) >= start_date,
- func.date(ApiCallLog.created_at) <= end_date
- ).group_by(func.date(ApiCallLog.created_at)).all()
- date_range = []
- current = start_date
- while current <= end_date:
- date_range.append(current)
- current += timedelta(days=1)
- calls_dict = {item.date: item.count for item in calls_by_date}
- return [
- {
- "date": d.isoformat(),
- "api_calls": calls_dict.get(d, 0),
- }
- for d in date_range
- ]
- @stats_cached()
- async def get_model_usage_ranking(
- self,
- start_date: Optional[date] = None,
- end_date: Optional[date] = None,
- top_n: int = 10
- ) -> List[Dict[str, Any]]:
- """获取模型使用排行(基于 API 调用日志)"""
- query = self.db.query(
- ApiCallLog.model_name,
- func.count(ApiCallLog.id).label('count')
- )
- if start_date:
- query = query.filter(func.date(ApiCallLog.created_at) >= start_date)
- if end_date:
- query = query.filter(func.date(ApiCallLog.created_at) <= end_date)
- query = query.filter(ApiCallLog.model_name.isnot(None))
- query = query.group_by(ApiCallLog.model_name)
- query = query.order_by(func.count(ApiCallLog.id).desc())
- query = query.limit(top_n)
- results = query.all()
- total_count = sum(r.count for r in results)
- return [
- {
- "model_name": r.model_name,
- "usage_count": r.count,
- "percentage": round((r.count / total_count * 100) if total_count > 0 else 0, 2)
- }
- for r in results
- ]
- @stats_cached()
- async def get_dashboard_metrics(self) -> Dict[str, Any]:
- """获取仪表盘核心指标"""
- today = date.today()
- yesterday = today - timedelta(days=1)
- seven_days_ago = today - timedelta(days=7)
- # 今日新增用户
- today_new_users = self.db.query(func.count(User.id)).filter(
- func.date(User.created_at) == today
- ).scalar() or 0
- yesterday_new_users = self.db.query(func.count(User.id)).filter(
- func.date(User.created_at) == yesterday
- ).scalar() or 0
- today_new_users_growth = (
- ((today_new_users - yesterday_new_users) / yesterday_new_users * 100)
- if yesterday_new_users > 0 else 0.0
- )
- total_users = self.db.query(func.count(User.id)).scalar() or 0
- # 活跃用户数(近7日有API调用的用户)
- active_users = self.db.query(func.count(func.distinct(ApiCallLog.user_id))).filter(
- func.date(ApiCallLog.created_at) >= seven_days_ago
- ).scalar() or 0
- # API调用量(今日)
- today_api_calls = self.db.query(func.count(ApiCallLog.id)).filter(
- func.date(ApiCallLog.created_at) == today
- ).scalar() or 0
- return {
- "today_new_users": today_new_users,
- "today_new_users_growth": round(today_new_users_growth, 2),
- "total_users": total_users,
- "active_users": active_users,
- "today_api_calls": today_api_calls,
- }
|