""" 热词词表管理服务 """ import json import logging from typing import Dict, Any, Optional, List from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy import desc from app.models.tingwu import TranscriptionPhrase from app.services.tingwu_client import TingwuClient, TingwuAPIError logger = logging.getLogger(__name__) class PhraseService: """热词词表管理服务""" def __init__(self, db: Session, user_id: str, api_key: str): """ 初始化服务 Args: db: 数据库会话 user_id: 用户 ID api_key: DashScope API Key """ self.db = db self.user_id = user_id self.client = TingwuClient(api_key) async def create_phrase( self, name: str, word_weights: Dict[str, int], description: Optional[str] = None ) -> TranscriptionPhrase: """ 创建热词词表 Args: name: 词表名称 word_weights: 热词及权重字典 description: 词表描述 Returns: 创建的词表对象 Raises: ValueError: 参数错误 TingwuAPIError: API 调用失败 """ # 1. 参数验证 if not name or len(name) > 100: raise ValueError("词表名称不能为空且长度不超过 100 字符") if not word_weights or not isinstance(word_weights, dict): raise ValueError("热词权重必须是非空字典") # 验证权重值 for word, weight in word_weights.items(): if not isinstance(weight, int) or weight < 1 or weight > 10: raise ValueError(f"热词 '{word}' 的权重必须是 1-10 之间的整数") # 2. 调用 DashScope API 创建词表 result = await self.client.create_phrase(name, word_weights, description) phrase_id = result.get('PhraseId') if not phrase_id: raise ValueError("API 返回的词表 ID 为空") # 3. 保存到数据库 phrase = TranscriptionPhrase( user_id=self.user_id, phrase_id=phrase_id, name=name, description=description, word_weights=json.dumps(word_weights, ensure_ascii=False), status='ACTIVE' ) self.db.add(phrase) self.db.commit() self.db.refresh(phrase) logger.info(f"创建热词词表成功: phrase_id={phrase_id}, user_id={self.user_id}") return phrase async def update_phrase( self, phrase_id: str, name: Optional[str] = None, word_weights: Optional[Dict[str, int]] = None, description: Optional[str] = None ) -> TranscriptionPhrase: """ 更新热词词表 Args: phrase_id: 词表 ID name: 新名称 word_weights: 新的热词权重 description: 新描述 Returns: 更新后的词表对象 Raises: ValueError: 词表不存在或参数错误 TingwuAPIError: API 调用失败 """ # 1. 查询词表 phrase = self.db.query(TranscriptionPhrase).filter_by( phrase_id=phrase_id, user_id=self.user_id, status='ACTIVE' ).first() if not phrase: raise ValueError(f"词表不存在或已删除: {phrase_id}") # 2. 参数验证 if name and len(name) > 100: raise ValueError("词表名称长度不超过 100 字符") if word_weights: if not isinstance(word_weights, dict): raise ValueError("热词权重必须是字典") for word, weight in word_weights.items(): if not isinstance(weight, int) or weight < 1 or weight > 10: raise ValueError(f"热词 '{word}' 的权重必须是 1-10 之间的整数") # 3. 调用 DashScope API 更新 await self.client.update_phrase(phrase_id, name, word_weights, description) # 4. 更新数据库 if name: phrase.name = name if word_weights: phrase.word_weights = json.dumps(word_weights, ensure_ascii=False) if description is not None: phrase.description = description phrase.updated_at = datetime.now() self.db.commit() self.db.refresh(phrase) logger.info(f"更新热词词表成功: phrase_id={phrase_id}") return phrase async def delete_phrase(self, phrase_id: str) -> bool: """ 删除热词词表 Args: phrase_id: 词表 ID Returns: 是否删除成功 Raises: ValueError: 词表不存在 TingwuAPIError: API 调用失败 """ # 1. 查询词表 phrase = self.db.query(TranscriptionPhrase).filter_by( phrase_id=phrase_id, user_id=self.user_id, status='ACTIVE' ).first() if not phrase: raise ValueError(f"词表不存在或已删除: {phrase_id}") # 2. 调用 DashScope API 删除 await self.client.delete_phrase(phrase_id) # 3. 软删除(更新状态) phrase.status = 'DELETED' phrase.updated_at = datetime.now() self.db.commit() logger.info(f"删除热词词表成功: phrase_id={phrase_id}") return True def get_user_phrases( self, page: int = 1, page_size: int = 20, status: str = 'ACTIVE' ) -> Dict[str, Any]: """ 获取用户热词词表列表 Args: page: 页码 page_size: 每页数量 status: 状态筛选 Returns: 包含词表列表和总数的字典 """ query = self.db.query(TranscriptionPhrase).filter_by( user_id=self.user_id, status=status ) # 总数 total = query.count() # 分页查询 phrases = query.order_by(desc(TranscriptionPhrase.created_at)).offset( (page - 1) * page_size ).limit(page_size).all() return { 'total': total, 'page': page, 'page_size': page_size, 'items': [phrase.to_dict() for phrase in phrases] } def get_phrase_by_id(self, phrase_id: str) -> Optional[TranscriptionPhrase]: """ 根据 ID 获取词表 Args: phrase_id: 词表 ID Returns: 词表对象,不存在返回 None """ return self.db.query(TranscriptionPhrase).filter_by( phrase_id=phrase_id, user_id=self.user_id, status='ACTIVE' ).first()