""" 管理员数据统计服务 提供多维度的数据统计和分析,包括用户统计、API调用统计、模型使用统计等 """ import json import hashlib from datetime import date, datetime, timedelta from typing import Optional, List, Dict, Any import logging from sqlalchemy import func from sqlalchemy.orm import Session from app.models.user import User from app.models.api_call_log import ApiCallLog logger = logging.getLogger(__name__) # 统计缓存 TTL(秒) _STATS_CACHE_TTL = 300 # 5 分钟 def _stats_cache_key(method: str, **kwargs) -> str: params = json.dumps(kwargs, default=str, sort_keys=True) return f"admin_stats:{method}:{hashlib.md5(params.encode()).hexdigest()}" def _get_sync_redis(): try: from app.core.redis import redis_manager return redis_manager.get_sync_client() except Exception: return None def _cache_get(key: str): r = _get_sync_redis() if not r: return None try: raw = r.get(key) return json.loads(raw) if raw else None except Exception: return None def _cache_set(key: str, value, ttl: int = _STATS_CACHE_TTL): r = _get_sync_redis() if not r: return try: r.setex(key, ttl, json.dumps(value, default=str)) except Exception as e: logger.debug(f"统计缓存写入失败: {e}") def stats_cached(ttl: int = _STATS_CACHE_TTL): import functools def decorator(fn): @functools.wraps(fn) async def wrapper(self, *args, **kwargs): import inspect sig = inspect.signature(fn) bound = sig.bind(self, *args, **kwargs) bound.apply_defaults() params = {k: v for k, v in bound.arguments.items() if k != "self"} key = _stats_cache_key(fn.__name__, **params) cached = _cache_get(key) if cached is not None: return cached result = await fn(self, *args, **kwargs) _cache_set(key, result, ttl) return result return wrapper return decorator class AdminStatsService: """管理员数据统计服务类""" def __init__(self, db: Session): self.db = db @stats_cached() async def get_user_overview( self, start_date: Optional[date] = None, end_date: Optional[date] = None ) -> Dict[str, Any]: """获取用户统计概览""" new_users_query = self.db.query(func.count(User.id)) if start_date: new_users_query = new_users_query.filter(func.date(User.created_at) >= start_date) if end_date: new_users_query = new_users_query.filter(func.date(User.created_at) <= end_date) new_users = new_users_query.scalar() or 0 total_users = self.db.query(func.count(User.id)).scalar() or 0 # 活跃用户数:有API调用记录的用户 active_query = self.db.query(func.count(func.distinct(ApiCallLog.user_id))) if start_date: active_query = active_query.filter(func.date(ApiCallLog.created_at) >= start_date) if end_date: active_query = active_query.filter(func.date(ApiCallLog.created_at) <= end_date) active_users = active_query.scalar() or 0 retention_rate = (active_users / new_users * 100) if new_users > 0 else 0 return { "new_users": new_users, "total_users": total_users, "active_users": active_users, "retention_rate": round(retention_rate, 2) } @stats_cached() async def get_user_growth_trend( self, start_date: date, end_date: date ) -> List[Dict[str, Any]]: """获取用户增长趋势""" new_users_by_date = self.db.query( func.date(User.created_at).label('date'), func.count(User.id).label('count') ).filter( func.date(User.created_at) >= start_date, func.date(User.created_at) <= end_date ).group_by(func.date(User.created_at)).all() # 活跃用户:有API调用的用户 active_users_by_date = self.db.query( func.date(ApiCallLog.created_at).label('date'), func.count(func.distinct(ApiCallLog.user_id)).label('count') ).filter( func.date(ApiCallLog.created_at) >= start_date, func.date(ApiCallLog.created_at) <= end_date ).group_by(func.date(ApiCallLog.created_at)).all() date_range = [] current = start_date while current <= end_date: date_range.append(current) current += timedelta(days=1) new_users_dict = {item.date: item.count for item in new_users_by_date} active_users_dict = {item.date: item.count for item in active_users_by_date} return [ { "date": d.isoformat(), "new_users": new_users_dict.get(d, 0), "active_users": active_users_dict.get(d, 0) } for d in date_range ] @stats_cached() async def get_business_overview( self, start_date: Optional[date] = None, end_date: Optional[date] = None ) -> Dict[str, Any]: """获取 API 调用统计概览""" query = self.db.query(func.count(ApiCallLog.id)) if start_date: query = query.filter(func.date(ApiCallLog.created_at) >= start_date) if end_date: query = query.filter(func.date(ApiCallLog.created_at) <= end_date) total_calls = query.scalar() or 0 success_query = self.db.query(func.count(ApiCallLog.id)).filter(ApiCallLog.status == 'success') if start_date: success_query = success_query.filter(func.date(ApiCallLog.created_at) >= start_date) if end_date: success_query = success_query.filter(func.date(ApiCallLog.created_at) <= end_date) success_calls = success_query.scalar() or 0 return { "total_calls": total_calls, "success_calls": success_calls, "failed_calls": total_calls - success_calls, } @stats_cached() async def get_business_trend( self, start_date: date, end_date: date ) -> List[Dict[str, Any]]: """获取 API 调用趋势""" calls_by_date = self.db.query( func.date(ApiCallLog.created_at).label('date'), func.count(ApiCallLog.id).label('count') ).filter( func.date(ApiCallLog.created_at) >= start_date, func.date(ApiCallLog.created_at) <= end_date ).group_by(func.date(ApiCallLog.created_at)).all() date_range = [] current = start_date while current <= end_date: date_range.append(current) current += timedelta(days=1) calls_dict = {item.date: item.count for item in calls_by_date} return [ { "date": d.isoformat(), "api_calls": calls_dict.get(d, 0), } for d in date_range ] @stats_cached() async def get_model_usage_ranking( self, start_date: Optional[date] = None, end_date: Optional[date] = None, top_n: int = 10 ) -> List[Dict[str, Any]]: """获取模型使用排行(基于 API 调用日志)""" query = self.db.query( ApiCallLog.model_name, func.count(ApiCallLog.id).label('count') ) if start_date: query = query.filter(func.date(ApiCallLog.created_at) >= start_date) if end_date: query = query.filter(func.date(ApiCallLog.created_at) <= end_date) query = query.filter(ApiCallLog.model_name.isnot(None)) query = query.group_by(ApiCallLog.model_name) query = query.order_by(func.count(ApiCallLog.id).desc()) query = query.limit(top_n) results = query.all() total_count = sum(r.count for r in results) return [ { "model_name": r.model_name, "usage_count": r.count, "percentage": round((r.count / total_count * 100) if total_count > 0 else 0, 2) } for r in results ] @stats_cached() async def get_dashboard_metrics(self) -> Dict[str, Any]: """获取仪表盘核心指标""" today = date.today() yesterday = today - timedelta(days=1) seven_days_ago = today - timedelta(days=7) # 今日新增用户 today_new_users = self.db.query(func.count(User.id)).filter( func.date(User.created_at) == today ).scalar() or 0 yesterday_new_users = self.db.query(func.count(User.id)).filter( func.date(User.created_at) == yesterday ).scalar() or 0 today_new_users_growth = ( ((today_new_users - yesterday_new_users) / yesterday_new_users * 100) if yesterday_new_users > 0 else 0.0 ) total_users = self.db.query(func.count(User.id)).scalar() or 0 # 活跃用户数(近7日有API调用的用户) active_users = self.db.query(func.count(func.distinct(ApiCallLog.user_id))).filter( func.date(ApiCallLog.created_at) >= seven_days_ago ).scalar() or 0 # API调用量(今日) today_api_calls = self.db.query(func.count(ApiCallLog.id)).filter( func.date(ApiCallLog.created_at) == today ).scalar() or 0 return { "today_new_users": today_new_users, "today_new_users_growth": round(today_new_users_growth, 2), "total_users": total_users, "active_users": active_users, "today_api_calls": today_api_calls, }