| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- """
- 术语管理服务
- 处理用户术语库的增删改查
- """
- import logging
- from datetime import datetime
- from typing import Optional, List
- from sqlalchemy.orm import Session
- from sqlalchemy import and_
- from app.models.translation import UserTerminology
- from app.schemas.translation_schema import (
- TerminologyCreate,
- TerminologyResponse,
- TerminologyListResponse,
- TerminologyImportResponse
- )
- logger = logging.getLogger(__name__)
class TerminologyService:
    """Service for managing a user's terminology (glossary) entries.

    Offers create-or-update, filtered listing, soft deletion and bulk import
    on top of a SQLAlchemy session. Every write operation commits on the
    session it was given and rolls back when the commit path fails.
    """

    # Format applied to ``created_at`` in every response object.
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Escape character for LIKE patterns built from user input. "/" is used
    # instead of a backslash because backslash handling in LIKE/ESCAPE
    # clauses differs between database backends.
    _LIKE_ESCAPE = "/"

    def __init__(self, db: Session):
        """Remember the database session used by all operations.

        Args:
            db: SQLAlchemy session; this service commits and rolls back on it.
        """
        self.db = db

    @classmethod
    def _to_response(cls, record: UserTerminology) -> TerminologyResponse:
        """Convert a ``UserTerminology`` row into its response schema."""
        return TerminologyResponse(
            id=record.id,
            source_lang=record.source_lang,
            target_lang=record.target_lang,
            source_term=record.source_term,
            target_term=record.target_term,
            created_at=record.created_at.strftime(cls._TIMESTAMP_FORMAT),
        )

    @classmethod
    def _escape_like(cls, text: str) -> str:
        """Escape LIKE wildcards in *text* so it is matched literally."""
        esc = cls._LIKE_ESCAPE
        # The escape character itself must be doubled first, otherwise the
        # escapes added for "%" and "_" would be escaped again.
        return (
            text.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )

    def _find_active_term(
        self,
        user_id: str,
        source_lang: str,
        target_lang: str,
        source_term: str
    ) -> Optional[UserTerminology]:
        """Return the user's non-deleted row for this term, or ``None``."""
        return (
            self.db.query(UserTerminology)
            .filter(
                UserTerminology.user_id == user_id,
                UserTerminology.source_lang == source_lang,
                UserTerminology.target_lang == target_lang,
                UserTerminology.source_term == source_term,
                # SQL expression, not a Python comparison: "is False" would
                # not build a WHERE clause.
                UserTerminology.is_deleted == False,  # noqa: E712
            )
            .first()
        )

    def add_term(
        self,
        user_id: str,
        term: TerminologyCreate
    ) -> TerminologyResponse:
        """Add a term, or update the translation if the term already exists.

        A term is identified by user, source language, target language and
        source text; soft-deleted rows are ignored when looking for a match.

        Args:
            user_id: Owner of the terminology entry.
            term: Creation request carrying languages and both term texts.

        Returns:
            The stored (new or updated) term.

        Raises:
            Exception: If anything fails; the session is rolled back first
                and the original error is attached as the cause.
        """
        try:
            existing = self._find_active_term(
                user_id, term.source_lang, term.target_lang, term.source_term
            )

            if existing:
                # Same source term already stored: only the translation and
                # the modification timestamp change.
                existing.target_term = term.target_term
                existing.updated_at = datetime.now()
                self.db.commit()
                self.db.refresh(existing)

                logger.info(f"更新术语: user_id={user_id}, id={existing.id}")
                return self._to_response(existing)

            new_term = UserTerminology(
                user_id=user_id,
                source_lang=term.source_lang,
                target_lang=term.target_lang,
                source_term=term.source_term,
                target_term=term.target_term
            )

            self.db.add(new_term)
            self.db.commit()
            # Reload so database-generated columns are populated before the
            # response is built.
            self.db.refresh(new_term)

            logger.info(f"添加术语成功: user_id={user_id}, id={new_term.id}")
            return self._to_response(new_term)

        except Exception as e:
            logger.exception(f"添加术语失败: {e}")
            self.db.rollback()
            # Same exception type and message as before; chaining keeps the
            # root cause visible in tracebacks.
            raise Exception(f"添加术语失败: {e}") from e

    def get_user_terms(
        self,
        user_id: str,
        source_lang: Optional[str] = None,
        target_lang: Optional[str] = None,
        keyword: Optional[str] = None
    ) -> TerminologyListResponse:
        """List a user's non-deleted terms, newest first.

        Args:
            user_id: Owner whose terms are listed.
            source_lang: If given, keep only terms with this source language.
            target_lang: If given, keep only terms with this target language.
            keyword: If given, keep only terms whose source or target text
                contains it (case-insensitive). The keyword is matched
                literally; "%" and "_" are not treated as wildcards.

        Returns:
            Response wrapping the matching terms.
        """
        query = self.db.query(UserTerminology).filter(
            UserTerminology.user_id == user_id,
            UserTerminology.is_deleted == False,  # noqa: E712 - SQL expression
        )

        if source_lang:
            query = query.filter(UserTerminology.source_lang == source_lang)
        if target_lang:
            query = query.filter(UserTerminology.target_lang == target_lang)
        if keyword:
            # Escape user input so a keyword such as "100%" or "a_b" does not
            # silently turn into a wildcard search.
            pattern = f"%{self._escape_like(keyword)}%"
            query = query.filter(
                UserTerminology.source_term.ilike(pattern, escape=self._LIKE_ESCAPE)
                | UserTerminology.target_term.ilike(pattern, escape=self._LIKE_ESCAPE)
            )

        items = query.order_by(UserTerminology.created_at.desc()).all()

        return TerminologyListResponse(
            items=[self._to_response(item) for item in items]
        )

    def delete_term(
        self,
        user_id: str,
        term_id: int
    ) -> bool:
        """Soft-delete one of the user's terms.

        The row is kept; it is flagged as deleted and stamped with the
        deletion time.

        Args:
            user_id: Owner of the term; terms of other users are never matched.
            term_id: Primary key of the term to delete.

        Returns:
            ``True`` if the term was found and flagged. ``False`` if it does
            not exist, is already deleted, or the operation failed (the
            failure is logged and the session rolled back).
        """
        try:
            term = self.db.query(UserTerminology).filter(
                UserTerminology.id == term_id,
                UserTerminology.user_id == user_id,
                UserTerminology.is_deleted == False,  # noqa: E712 - SQL expression
            ).first()

            if not term:
                logger.warning(f"术语不存在或已删除: term_id={term_id}, user_id={user_id}")
                return False

            term.is_deleted = True
            term.deleted_at = datetime.now()
            self.db.commit()

            logger.info(f"软删除术语成功: term_id={term_id}, user_id={user_id}")
            return True

        except Exception as e:
            # Best-effort contract: callers get False instead of an exception.
            logger.exception(f"软删除术语失败: {e}")
            self.db.rollback()
            return False

    def import_terms(
        self,
        user_id: str,
        source_lang: str,
        target_lang: str,
        terms: List[dict]
    ) -> TerminologyImportResponse:
        """Bulk-import terms for one language pair in a single transaction.

        Each entry is read through its ``"source_term"`` and
        ``"target_term"`` keys; surrounding whitespace is stripped. Entries
        whose source term already exists (in the database or earlier in the
        same batch) update the translation instead of creating a new row.

        Args:
            user_id: Owner of the imported terms.
            source_lang: Source language applied to every entry.
            target_lang: Target language applied to every entry.
            terms: Entries to import.

        Returns:
            Counts of successful and failed entries plus error messages. If
            the final commit fails, every entry is reported as failed.
        """
        success_count = 0
        failed_count = 0
        errors: List[str] = []
        # Rows already touched in this batch, keyed by source term. Without
        # this, a repeated source term would be inserted twice whenever the
        # session does not autoflush pending rows before the lookup query.
        batch_records = {}

        for term_data in terms:
            try:
                # "or ''" also covers keys that are present but set to None.
                source_term = (term_data.get("source_term") or "").strip()
                target_term = (term_data.get("target_term") or "").strip()

                if not source_term or not target_term:
                    failed_count += 1
                    errors.append(f"术语不能为空: {term_data}")
                    continue

                record = batch_records.get(source_term)
                if record is None:
                    record = self._find_active_term(
                        user_id, source_lang, target_lang, source_term
                    )

                if record is not None:
                    record.target_term = target_term
                    record.updated_at = datetime.now()
                else:
                    record = UserTerminology(
                        user_id=user_id,
                        source_lang=source_lang,
                        target_lang=target_lang,
                        source_term=source_term,
                        target_term=target_term
                    )
                    self.db.add(record)

                batch_records[source_term] = record
                success_count += 1

            except Exception as e:
                # One bad entry must not abort the rest of the batch.
                failed_count += 1
                errors.append(f"导入失败: {term_data}, 错误: {e}")
                logger.exception(f"导入术语失败: {e}")

        try:
            self.db.commit()
            logger.info(f"批量导入术语完成: user_id={user_id}, success={success_count}, failed={failed_count}")
        except Exception as e:
            self.db.rollback()
            logger.exception(f"批量导入术语提交失败: {e}")
            # Nothing was persisted, so the whole batch counts as failed.
            return TerminologyImportResponse(
                success_count=0,
                failed_count=len(terms),
                total_count=len(terms),
                errors=[f"数据库提交失败: {e}"]
            )

        return TerminologyImportResponse(
            success_count=success_count,
            failed_count=failed_count,
            total_count=len(terms),
            errors=errors
        )
|