| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- """
- 热词词表管理服务
- """
- import json
- import logging
- from typing import Dict, Any, Optional, List
- from datetime import datetime
- from sqlalchemy.orm import Session
- from sqlalchemy import desc
- from app.models.tingwu import TranscriptionPhrase
- from app.services.tingwu_client import TingwuClient, TingwuAPIError
- logger = logging.getLogger(__name__)
- class PhraseService:
- """热词词表管理服务"""
-
- def __init__(self, db: Session, user_id: str, api_key: str):
- """
- 初始化服务
-
- Args:
- db: 数据库会话
- user_id: 用户 ID
- api_key: DashScope API Key
- """
- self.db = db
- self.user_id = user_id
- self.client = TingwuClient(api_key)
-
- async def create_phrase(
- self,
- name: str,
- word_weights: Dict[str, int],
- description: Optional[str] = None
- ) -> TranscriptionPhrase:
- """
- 创建热词词表
-
- Args:
- name: 词表名称
- word_weights: 热词及权重字典
- description: 词表描述
-
- Returns:
- 创建的词表对象
-
- Raises:
- ValueError: 参数错误
- TingwuAPIError: API 调用失败
- """
- # 1. 参数验证
- if not name or len(name) > 100:
- raise ValueError("词表名称不能为空且长度不超过 100 字符")
-
- if not word_weights or not isinstance(word_weights, dict):
- raise ValueError("热词权重必须是非空字典")
-
- # 验证权重值
- for word, weight in word_weights.items():
- if not isinstance(weight, int) or weight < 1 or weight > 10:
- raise ValueError(f"热词 '{word}' 的权重必须是 1-10 之间的整数")
-
- # 2. 调用 DashScope API 创建词表
- result = await self.client.create_phrase(name, word_weights, description)
-
- phrase_id = result.get('PhraseId')
- if not phrase_id:
- raise ValueError("API 返回的词表 ID 为空")
-
- # 3. 保存到数据库
- phrase = TranscriptionPhrase(
- user_id=self.user_id,
- phrase_id=phrase_id,
- name=name,
- description=description,
- word_weights=json.dumps(word_weights, ensure_ascii=False),
- status='ACTIVE'
- )
-
- self.db.add(phrase)
- self.db.commit()
- self.db.refresh(phrase)
-
- logger.info(f"创建热词词表成功: phrase_id={phrase_id}, user_id={self.user_id}")
-
- return phrase
-
- async def update_phrase(
- self,
- phrase_id: str,
- name: Optional[str] = None,
- word_weights: Optional[Dict[str, int]] = None,
- description: Optional[str] = None
- ) -> TranscriptionPhrase:
- """
- 更新热词词表
-
- Args:
- phrase_id: 词表 ID
- name: 新名称
- word_weights: 新的热词权重
- description: 新描述
-
- Returns:
- 更新后的词表对象
-
- Raises:
- ValueError: 词表不存在或参数错误
- TingwuAPIError: API 调用失败
- """
- # 1. 查询词表
- phrase = self.db.query(TranscriptionPhrase).filter_by(
- phrase_id=phrase_id,
- user_id=self.user_id,
- status='ACTIVE'
- ).first()
-
- if not phrase:
- raise ValueError(f"词表不存在或已删除: {phrase_id}")
-
- # 2. 参数验证
- if name and len(name) > 100:
- raise ValueError("词表名称长度不超过 100 字符")
-
- if word_weights:
- if not isinstance(word_weights, dict):
- raise ValueError("热词权重必须是字典")
-
- for word, weight in word_weights.items():
- if not isinstance(weight, int) or weight < 1 or weight > 10:
- raise ValueError(f"热词 '{word}' 的权重必须是 1-10 之间的整数")
-
- # 3. 调用 DashScope API 更新
- await self.client.update_phrase(phrase_id, name, word_weights, description)
-
- # 4. 更新数据库
- if name:
- phrase.name = name
- if word_weights:
- phrase.word_weights = json.dumps(word_weights, ensure_ascii=False)
- if description is not None:
- phrase.description = description
-
- phrase.updated_at = datetime.now()
- self.db.commit()
- self.db.refresh(phrase)
-
- logger.info(f"更新热词词表成功: phrase_id={phrase_id}")
-
- return phrase
-
- async def delete_phrase(self, phrase_id: str) -> bool:
- """
- 删除热词词表
-
- Args:
- phrase_id: 词表 ID
-
- Returns:
- 是否删除成功
-
- Raises:
- ValueError: 词表不存在
- TingwuAPIError: API 调用失败
- """
- # 1. 查询词表
- phrase = self.db.query(TranscriptionPhrase).filter_by(
- phrase_id=phrase_id,
- user_id=self.user_id,
- status='ACTIVE'
- ).first()
-
- if not phrase:
- raise ValueError(f"词表不存在或已删除: {phrase_id}")
-
- # 2. 调用 DashScope API 删除
- await self.client.delete_phrase(phrase_id)
-
- # 3. 软删除(更新状态)
- phrase.status = 'DELETED'
- phrase.updated_at = datetime.now()
- self.db.commit()
-
- logger.info(f"删除热词词表成功: phrase_id={phrase_id}")
-
- return True
-
- def get_user_phrases(
- self,
- page: int = 1,
- page_size: int = 20,
- status: str = 'ACTIVE'
- ) -> Dict[str, Any]:
- """
- 获取用户热词词表列表
-
- Args:
- page: 页码
- page_size: 每页数量
- status: 状态筛选
-
- Returns:
- 包含词表列表和总数的字典
- """
- query = self.db.query(TranscriptionPhrase).filter_by(
- user_id=self.user_id,
- status=status
- )
-
- # 总数
- total = query.count()
-
- # 分页查询
- phrases = query.order_by(desc(TranscriptionPhrase.created_at)).offset(
- (page - 1) * page_size
- ).limit(page_size).all()
-
- return {
- 'total': total,
- 'page': page,
- 'page_size': page_size,
- 'items': [phrase.to_dict() for phrase in phrases]
- }
-
- def get_phrase_by_id(self, phrase_id: str) -> Optional[TranscriptionPhrase]:
- """
- 根据 ID 获取词表
-
- Args:
- phrase_id: 词表 ID
-
- Returns:
- 词表对象,不存在返回 None
- """
- return self.db.query(TranscriptionPhrase).filter_by(
- phrase_id=phrase_id,
- user_id=self.user_id,
- status='ACTIVE'
- ).first()
|