""" 术语管理服务 处理用户术语库的增删改查 """ import logging from datetime import datetime from typing import Optional, List from sqlalchemy.orm import Session from sqlalchemy import and_ from app.models.translation import UserTerminology from app.schemas.translation_schema import ( TerminologyCreate, TerminologyResponse, TerminologyListResponse, TerminologyImportResponse ) logger = logging.getLogger(__name__) class TerminologyService: """术语管理服务类""" def __init__(self, db: Session): """ 初始化服务 Args: db: 数据库会话 """ self.db = db def add_term( self, user_id: str, term: TerminologyCreate ) -> TerminologyResponse: """ 添加术语 Args: user_id: 用户ID term: 术语创建请求 Returns: 术语响应 """ try: # 检查是否已存在 existing = self.db.query(UserTerminology).filter( and_( UserTerminology.user_id == user_id, UserTerminology.source_lang == term.source_lang, UserTerminology.target_lang == term.target_lang, UserTerminology.source_term == term.source_term, UserTerminology.is_deleted == False ) ).first() if existing: # 更新已存在的术语 existing.target_term = term.target_term existing.updated_at = datetime.now() self.db.commit() self.db.refresh(existing) logger.info(f"更新术语: user_id={user_id}, id={existing.id}") return TerminologyResponse( id=existing.id, source_lang=existing.source_lang, target_lang=existing.target_lang, source_term=existing.source_term, target_term=existing.target_term, created_at=existing.created_at.strftime("%Y-%m-%d %H:%M:%S") ) # 创建新术语 new_term = UserTerminology( user_id=user_id, source_lang=term.source_lang, target_lang=term.target_lang, source_term=term.source_term, target_term=term.target_term ) self.db.add(new_term) self.db.commit() self.db.refresh(new_term) logger.info(f"添加术语成功: user_id={user_id}, id={new_term.id}") return TerminologyResponse( id=new_term.id, source_lang=new_term.source_lang, target_lang=new_term.target_lang, source_term=new_term.source_term, target_term=new_term.target_term, created_at=new_term.created_at.strftime("%Y-%m-%d %H:%M:%S") ) except Exception as e: logger.error(f"添加术语失败: {str(e)}") self.db.rollback() raise Exception(f"添加术语失败: {str(e)}") def get_user_terms( self, user_id: str, source_lang: Optional[str] = None, target_lang: Optional[str] = None, keyword: Optional[str] = None ) -> TerminologyListResponse: """ 查询用户术语库 Args: user_id: 用户ID source_lang: 源语言筛选 target_lang: 目标语言筛选 keyword: 关键词搜索 Returns: 术语列表响应 """ query = self.db.query(UserTerminology).filter( UserTerminology.user_id == user_id, UserTerminology.is_deleted == False ) # 应用筛选条件 if source_lang: query = query.filter(UserTerminology.source_lang == source_lang) if target_lang: query = query.filter(UserTerminology.target_lang == target_lang) if keyword: query = query.filter( (UserTerminology.source_term.ilike(f"%{keyword}%")) | (UserTerminology.target_term.ilike(f"%{keyword}%")) ) # 排序 query = query.order_by(UserTerminology.created_at.desc()) items = query.all() # 构建响应 term_items = [ TerminologyResponse( id=item.id, source_lang=item.source_lang, target_lang=item.target_lang, source_term=item.source_term, target_term=item.target_term, created_at=item.created_at.strftime("%Y-%m-%d %H:%M:%S") ) for item in items ] return TerminologyListResponse(items=term_items) def delete_term( self, user_id: str, term_id: int ) -> bool: """ 删除术语(软删除) Args: user_id: 用户ID term_id: 术语ID Returns: 是否删除成功 """ try: term = self.db.query(UserTerminology).filter( UserTerminology.id == term_id, UserTerminology.user_id == user_id, UserTerminology.is_deleted == False ).first() if not term: logger.warning(f"术语不存在或已删除: term_id={term_id}, user_id={user_id}") return False # 软删除 term.is_deleted = True term.deleted_at = datetime.now() self.db.commit() logger.info(f"软删除术语成功: term_id={term_id}, user_id={user_id}") return True except Exception as e: logger.error(f"软删除术语失败: {str(e)}") self.db.rollback() return False def import_terms( self, user_id: str, source_lang: str, target_lang: str, terms: List[dict] ) -> TerminologyImportResponse: """ 批量导入术语 Args: user_id: 用户ID source_lang: 源语言 target_lang: 目标语言 terms: 术语列表 Returns: 导入结果响应 """ success_count = 0 failed_count = 0 errors = [] for term_data in terms: try: source_term = term_data.get("source_term", "").strip() target_term = term_data.get("target_term", "").strip() if not source_term or not target_term: failed_count += 1 errors.append(f"术语不能为空: {term_data}") continue # 检查是否已存在 existing = self.db.query(UserTerminology).filter( and_( UserTerminology.user_id == user_id, UserTerminology.source_lang == source_lang, UserTerminology.target_lang == target_lang, UserTerminology.source_term == source_term, UserTerminology.is_deleted == False ) ).first() if existing: # 更新已存在的术语 existing.target_term = target_term existing.updated_at = datetime.now() else: # 创建新术语 new_term = UserTerminology( user_id=user_id, source_lang=source_lang, target_lang=target_lang, source_term=source_term, target_term=target_term ) self.db.add(new_term) success_count += 1 except Exception as e: failed_count += 1 errors.append(f"导入失败: {term_data}, 错误: {str(e)}") logger.error(f"导入术语失败: {str(e)}") try: self.db.commit() logger.info(f"批量导入术语完成: user_id={user_id}, success={success_count}, failed={failed_count}") except Exception as e: self.db.rollback() logger.error(f"批量导入术语提交失败: {str(e)}") return TerminologyImportResponse( success_count=0, failed_count=len(terms), total_count=len(terms), errors=[f"数据库提交失败: {str(e)}"] ) return TerminologyImportResponse( success_count=success_count, failed_count=failed_count, total_count=len(terms), errors=errors )