""" 管理员模型管理服务(新表结构) """ from datetime import datetime from decimal import Decimal from typing import Optional, List, Tuple from sqlalchemy.orm import Session from sqlalchemy import or_, desc from app.models.model import ModelNew, ModelPriceNew from app.schemas.admin_schema import ( ModelListParams, ModelListItem, ModelDetailResponse, ModelCreateRequest, ModelUpdateRequest, ModelPriceRequest ) class AdminModelService: def __init__(self, db: Session): self.db = db def list_models(self, params: ModelListParams) -> Tuple[List[ModelListItem], int]: query = self.db.query(ModelNew).filter(ModelNew.is_local == False) if params.keyword: kw = f"%{params.keyword}%" query = query.filter(or_( ModelNew.model_code.ilike(kw), ModelNew.display_name.ilike(kw), ModelNew.supplier.ilike(kw), )) if params.category is not None: query = query.filter(ModelNew.categories.any(params.category)) if params.supplier: query = query.filter(ModelNew.supplier == params.supplier) if params.is_show_enabled is not None: query = query.filter(ModelNew.is_show_enabled == params.is_show_enabled) if params.is_api_enabled is not None: query = query.filter(ModelNew.is_api_enabled == params.is_api_enabled) total = query.count() from sqlalchemy.orm import selectinload models = query.order_by(desc(ModelNew.id)).options( selectinload(ModelNew.prices) ).offset((params.page - 1) * params.size).limit(params.size).all() result = [ ModelListItem( id=m.id, model_code=m.model_code, display_name=m.display_name, img=m.img or "", category=m.categories[0] if m.categories else 0, categories=m.categories or [], supplier=m.supplier, description=(m.custom_description or m.description or "")[:80] or None, # 取第一条有效价格记录作摘要 price_label=m.prices[0].label if m.prices else None, input_price_original=m.prices[0].input_price_original if m.prices else None, output_price_original=m.prices[0].output_price_original if m.prices else None, input_price_discounted=m.prices[0].input_price_discounted if m.prices else None, output_price_discounted=m.prices[0].output_price_discounted if m.prices else None, discount_rate=m.prices[0].discount_rate if m.prices else None, discount_label=m.prices[0].discount_label if m.prices else None, price_unit=m.prices[0].unit if m.prices else None, is_show_enabled=m.is_show_enabled, is_api_enabled=m.is_api_enabled, is_featured=m.is_featured, is_search=m.is_search, is_thinking=m.is_thinking, ) for m in models ] return result, total def get_model_detail(self, model_id: int) -> Optional[ModelDetailResponse]: model = self.db.query(ModelNew).filter(ModelNew.id == model_id).first() if not model: return None return ModelDetailResponse( id=model.id, model_code=model.model_code, display_name=model.display_name, img=model.img, category=model.categories[0] if model.categories else 0, supplier=model.supplier, description=model.description, custom_description=model.custom_description, tag1=model.tag1, tag2=model.tag2, keywords=model.keywords, is_featured=model.is_featured, is_search=model.is_search, is_thinking=model.is_thinking, is_show_enabled=model.is_show_enabled, is_api_enabled=model.is_api_enabled, source_keys=model.source_keys, normalized_keys=model.normalized_keys, created_at=model.created_at, updated_at=model.updated_at, ) def create_model(self, data: ModelCreateRequest) -> int: existing = self.db.query(ModelNew).filter( ModelNew.model_code == data.model_code ).first() if existing: raise ValueError("MODEL_CODE_EXISTS") model = ModelNew( model_code=data.model_code, display_name=data.display_name, img=data.img, categories=[data.category] if data.category is not None else [], supplier=data.supplier, description=data.description, custom_description=data.custom_description, tag1=data.tag1, tag2=data.tag2, keywords=data.keywords, is_featured=data.is_featured, is_search=data.is_search, is_thinking=data.is_thinking, is_show_enabled=data.is_show_enabled, is_api_enabled=data.is_api_enabled, ) self.db.add(model) self.db.commit() self.db.refresh(model) return model.id def update_model(self, model_id: int, data: ModelUpdateRequest) -> bool: model = self.db.query(ModelNew).filter(ModelNew.id == model_id).first() if not model: raise ValueError("MODEL_NOT_FOUND") for field, value in data.model_dump(exclude_unset=True).items(): setattr(model, field, value) self.db.commit() return True def update_model_price(self, model_id: int, data: ModelPriceRequest) -> bool: """Upsert 价格:旧记录 is_active=false,插入新记录""" model = self.db.query(ModelNew).filter(ModelNew.id == model_id).first() if not model: raise ValueError("MODEL_NOT_FOUND") now = datetime.utcnow() label = data.label or "default" # 旧记录失效 self.db.query(ModelPriceNew).filter( ModelPriceNew.model_code == model.model_code, ModelPriceNew.label == label, ModelPriceNew.is_active == True, ).update({"is_active": False}) input_orig = data.input_price or Decimal("0") output_orig = data.output_price or Decimal("0") rate = data.discount_rate if data.discount_rate is not None else Decimal("1") # discount_rate 范围 0~1,1=无折扣 new_price = ModelPriceNew( model_code=model.model_code, label=label, tier_min=data.tier_min, tier_max=data.tier_max, tier_unit=data.tier_unit, input_price_original=input_orig, output_price_original=output_orig, discount_rate=rate, input_price_discounted=input_orig * rate, output_price_discounted=output_orig * rate, currency=data.currency or "CNY", unit=data.unit or "元/每百万tokens", display_multiplier=data.display_multiplier or 1000000, source_url=None, crawled_at=now, is_active=True, ) self.db.add(new_price) self.db.commit() return True def update_model_status(self, model_id: int, field: str, value: bool) -> bool: model = self.db.query(ModelNew).filter(ModelNew.id == model_id).first() if not model: raise ValueError("MODEL_NOT_FOUND") if field not in ("is_show_enabled", "is_api_enabled", "is_featured"): raise ValueError("INVALID_FIELD") setattr(model, field, value) self.db.commit() return True