| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- """
- 管理员模型管理服务(新表结构)
- """
- from datetime import datetime
- from decimal import Decimal
- from typing import Optional, List, Tuple
- from sqlalchemy.orm import Session
- from sqlalchemy import or_, desc
- from app.models.model import ModelNew, ModelPriceNew
- from app.schemas.admin_schema import (
- ModelListParams, ModelListItem, ModelDetailResponse,
- ModelCreateRequest, ModelUpdateRequest, ModelPriceRequest
- )
- class AdminModelService:
- def __init__(self, db: Session):
- self.db = db
- def list_models(self, params: ModelListParams) -> Tuple[List[ModelListItem], int]:
- query = self.db.query(ModelNew).filter(ModelNew.is_local == False)
- if params.keyword:
- kw = f"%{params.keyword}%"
- query = query.filter(or_(
- ModelNew.model_code.ilike(kw),
- ModelNew.display_name.ilike(kw),
- ModelNew.supplier.ilike(kw),
- ))
- if params.category is not None:
- query = query.filter(ModelNew.categories.any(params.category))
- if params.supplier:
- query = query.filter(ModelNew.supplier == params.supplier)
- if params.is_show_enabled is not None:
- query = query.filter(ModelNew.is_show_enabled == params.is_show_enabled)
- if params.is_api_enabled is not None:
- query = query.filter(ModelNew.is_api_enabled == params.is_api_enabled)
- total = query.count()
- from sqlalchemy.orm import selectinload
- models = query.order_by(desc(ModelNew.id)).options(
- selectinload(ModelNew.prices)
- ).offset((params.page - 1) * params.size).limit(params.size).all()
- result = [
- ModelListItem(
- id=m.id,
- model_code=m.model_code,
- display_name=m.display_name,
- img=m.img or "",
- category=m.categories[0] if m.categories else 0,
- categories=m.categories or [],
- supplier=m.supplier,
- description=(m.custom_description or m.description or "")[:80] or None,
- # 取第一条有效价格记录作摘要
- price_label=m.prices[0].label if m.prices else None,
- input_price_original=m.prices[0].input_price_original if m.prices else None,
- output_price_original=m.prices[0].output_price_original if m.prices else None,
- input_price_discounted=m.prices[0].input_price_discounted if m.prices else None,
- output_price_discounted=m.prices[0].output_price_discounted if m.prices else None,
- discount_rate=m.prices[0].discount_rate if m.prices else None,
- discount_label=m.prices[0].discount_label if m.prices else None,
- price_unit=m.prices[0].unit if m.prices else None,
- is_show_enabled=m.is_show_enabled,
- is_api_enabled=m.is_api_enabled,
- is_featured=m.is_featured,
- is_search=m.is_search,
- is_thinking=m.is_thinking,
- )
- for m in models
- ]
- return result, total
- def get_model_detail(self, model_id: int) -> Optional[ModelDetailResponse]:
- model = self.db.query(ModelNew).filter(ModelNew.id == model_id).first()
- if not model:
- return None
- return ModelDetailResponse(
- id=model.id,
- model_code=model.model_code,
- display_name=model.display_name,
- img=model.img,
- category=model.categories[0] if model.categories else 0,
- supplier=model.supplier,
- description=model.description,
- custom_description=model.custom_description,
- tag1=model.tag1,
- tag2=model.tag2,
- keywords=model.keywords,
- is_featured=model.is_featured,
- is_search=model.is_search,
- is_thinking=model.is_thinking,
- is_show_enabled=model.is_show_enabled,
- is_api_enabled=model.is_api_enabled,
- source_keys=model.source_keys,
- normalized_keys=model.normalized_keys,
- created_at=model.created_at,
- updated_at=model.updated_at,
- )
- def create_model(self, data: ModelCreateRequest) -> int:
- existing = self.db.query(ModelNew).filter(
- ModelNew.model_code == data.model_code
- ).first()
- if existing:
- raise ValueError("MODEL_CODE_EXISTS")
- model = ModelNew(
- model_code=data.model_code,
- display_name=data.display_name,
- img=data.img,
- categories=[data.category] if data.category is not None else [],
- supplier=data.supplier,
- description=data.description,
- custom_description=data.custom_description,
- tag1=data.tag1,
- tag2=data.tag2,
- keywords=data.keywords,
- is_featured=data.is_featured,
- is_search=data.is_search,
- is_thinking=data.is_thinking,
- is_show_enabled=data.is_show_enabled,
- is_api_enabled=data.is_api_enabled,
- )
- self.db.add(model)
- self.db.commit()
- self.db.refresh(model)
- return model.id
- def update_model(self, model_id: int, data: ModelUpdateRequest) -> bool:
- model = self.db.query(ModelNew).filter(ModelNew.id == model_id).first()
- if not model:
- raise ValueError("MODEL_NOT_FOUND")
- for field, value in data.model_dump(exclude_unset=True).items():
- setattr(model, field, value)
- self.db.commit()
- return True
- def update_model_price(self, model_id: int, data: ModelPriceRequest) -> bool:
- """Upsert 价格:旧记录 is_active=false,插入新记录"""
- model = self.db.query(ModelNew).filter(ModelNew.id == model_id).first()
- if not model:
- raise ValueError("MODEL_NOT_FOUND")
- now = datetime.utcnow()
- label = data.label or "default"
- # 旧记录失效
- self.db.query(ModelPriceNew).filter(
- ModelPriceNew.model_code == model.model_code,
- ModelPriceNew.label == label,
- ModelPriceNew.is_active == True,
- ).update({"is_active": False})
- input_orig = data.input_price or Decimal("0")
- output_orig = data.output_price or Decimal("0")
- rate = data.discount_rate if data.discount_rate is not None else Decimal("1")
- # discount_rate 范围 0~1,1=无折扣
- new_price = ModelPriceNew(
- model_code=model.model_code,
- label=label,
- tier_min=data.tier_min,
- tier_max=data.tier_max,
- tier_unit=data.tier_unit,
- input_price_original=input_orig,
- output_price_original=output_orig,
- discount_rate=rate,
- input_price_discounted=input_orig * rate,
- output_price_discounted=output_orig * rate,
- currency=data.currency or "CNY",
- unit=data.unit or "元/每百万tokens",
- display_multiplier=data.display_multiplier or 1000000,
- source_url=None,
- crawled_at=now,
- is_active=True,
- )
- self.db.add(new_price)
- self.db.commit()
- return True
- def update_model_status(self, model_id: int, field: str, value: bool) -> bool:
- model = self.db.query(ModelNew).filter(ModelNew.id == model_id).first()
- if not model:
- raise ValueError("MODEL_NOT_FOUND")
- if field not in ("is_show_enabled", "is_api_enabled", "is_featured"):
- raise ValueError("INVALID_FIELD")
- setattr(model, field, value)
- self.db.commit()
- return True
|