""" 翻译记忆服务 处理翻译记忆的增删改查和相似句子匹配 """ import logging from datetime import datetime from typing import Optional, List from sqlalchemy.orm import Session from sqlalchemy import and_, func from app.models.translation import TranslationMemory from app.schemas.translation_schema import ( MemoryCreate, MemoryResponse, MemoryListResponse, MemoryMatch, MemoryMatchResponse ) logger = logging.getLogger(__name__) class TranslationMemoryService: """翻译记忆服务类""" def __init__(self, db: Session): """ 初始化服务 Args: db: 数据库会话 """ self.db = db def add_memory( self, user_id: str, memory: MemoryCreate, source_type: str = "manual", task_id: Optional[str] = None ) -> MemoryResponse: """ 添加翻译记忆 Args: user_id: 用户ID memory: 翻译记忆创建请求 source_type: 来源类型 task_id: 关联的任务ID Returns: 翻译记忆响应 """ try: # 检查是否已存在相同的记忆 existing = self.db.query(TranslationMemory).filter( and_( TranslationMemory.user_id == user_id, TranslationMemory.source_lang == memory.source_lang, TranslationMemory.target_lang == memory.target_lang, TranslationMemory.source_text == memory.source_text, TranslationMemory.is_deleted == False ) ).first() if existing: # 更新已存在的记忆 existing.target_text = memory.target_text existing.updated_at = datetime.now() self.db.commit() self.db.refresh(existing) logger.info(f"更新翻译记忆: user_id={user_id}, id={existing.id}") return MemoryResponse( id=existing.id, source_lang=existing.source_lang, target_lang=existing.target_lang, source_text=existing.source_text, target_text=existing.target_text, source_type=existing.source_type, usage_count=existing.usage_count, created_at=existing.created_at.strftime("%Y-%m-%d %H:%M:%S") ) # 创建新记忆 new_memory = TranslationMemory( user_id=user_id, source_lang=memory.source_lang, target_lang=memory.target_lang, source_text=memory.source_text, target_text=memory.target_text, source_type=source_type, task_id=task_id ) self.db.add(new_memory) self.db.commit() self.db.refresh(new_memory) logger.info(f"添加翻译记忆成功: user_id={user_id}, id={new_memory.id}") return MemoryResponse( id=new_memory.id, source_lang=new_memory.source_lang, target_lang=new_memory.target_lang, source_text=new_memory.source_text, target_text=new_memory.target_text, source_type=new_memory.source_type, usage_count=new_memory.usage_count, created_at=new_memory.created_at.strftime("%Y-%m-%d %H:%M:%S") ) except Exception as e: logger.error(f"添加翻译记忆失败: {str(e)}") self.db.rollback() raise Exception(f"添加翻译记忆失败: {str(e)}") def get_user_memories( self, user_id: str, source_lang: Optional[str] = None, target_lang: Optional[str] = None, keyword: Optional[str] = None ) -> MemoryListResponse: """ 查询用户翻译记忆 Args: user_id: 用户ID source_lang: 源语言筛选 target_lang: 目标语言筛选 keyword: 关键词搜索 Returns: 翻译记忆列表响应 """ query = self.db.query(TranslationMemory).filter( TranslationMemory.user_id == user_id, TranslationMemory.is_deleted == False ) # 应用筛选条件 if source_lang: query = query.filter(TranslationMemory.source_lang == source_lang) if target_lang: query = query.filter(TranslationMemory.target_lang == target_lang) if keyword: query = query.filter( (TranslationMemory.source_text.ilike(f"%{keyword}%")) | (TranslationMemory.target_text.ilike(f"%{keyword}%")) ) # 排序(按使用次数和创建时间) query = query.order_by( TranslationMemory.usage_count.desc(), TranslationMemory.created_at.desc() ) items = query.all() # 构建响应 memory_items = [ MemoryResponse( id=item.id, source_lang=item.source_lang, target_lang=item.target_lang, source_text=item.source_text, target_text=item.target_text, source_type=item.source_type, usage_count=item.usage_count, created_at=item.created_at.strftime("%Y-%m-%d %H:%M:%S") ) for item in items ] return MemoryListResponse(items=memory_items) def delete_memory( self, user_id: str, memory_id: int ) -> bool: """ 删除翻译记忆(软删除) Args: user_id: 用户ID memory_id: 记忆ID Returns: 是否删除成功 """ try: memory = self.db.query(TranslationMemory).filter( TranslationMemory.id == memory_id, TranslationMemory.user_id == user_id, TranslationMemory.is_deleted == False ).first() if not memory: logger.warning(f"翻译记忆不存在或已删除: memory_id={memory_id}, user_id={user_id}") return False # 软删除 memory.is_deleted = True memory.deleted_at = datetime.now() self.db.commit() logger.info(f"软删除翻译记忆成功: memory_id={memory_id}, user_id={user_id}") return True except Exception as e: logger.error(f"软删除翻译记忆失败: {str(e)}") self.db.rollback() return False def find_similar( self, user_id: str, source_text: str, source_lang: str, target_lang: str, threshold: float = 0.8, limit: int = 5 ) -> MemoryMatchResponse: """ 查找相似翻译 使用简单的文本相似度算法(基于字符串包含关系) Args: user_id: 用户ID source_text: 源文本 source_lang: 源语言 target_lang: 目标语言 threshold: 相似度阈值 limit: 返回结果数量限制 Returns: 相似句子匹配响应 """ try: # 查询相同语言对的翻译记忆 memories = self.db.query(TranslationMemory).filter( TranslationMemory.user_id == user_id, TranslationMemory.source_lang == source_lang, TranslationMemory.target_lang == target_lang, TranslationMemory.is_deleted == False ).all() matches = [] for memory in memories: # 计算简单的相似度(基于字符串包含和长度) similarity = self._calculate_similarity(source_text, memory.source_text) if similarity >= threshold: matches.append({ "memory": memory, "similarity": similarity }) # 按相似度排序 matches.sort(key=lambda x: x["similarity"], reverse=True) # 限制返回数量 matches = matches[:limit] # 更新使用统计 for match in matches: memory = match["memory"] memory.usage_count += 1 memory.last_used_at = datetime.now() self.db.commit() # 构建响应 match_results = [ MemoryMatch( source_text=match["memory"].source_text, target_text=match["memory"].target_text, similarity=match["similarity"] ) for match in matches ] return MemoryMatchResponse(matches=match_results) except Exception as e: logger.error(f"查找相似翻译失败: {str(e)}") return MemoryMatchResponse(matches=[]) def _calculate_similarity(self, text1: str, text2: str) -> float: """ 计算文本相似度(简单算法) 基于最长公共子序列和字符串长度 Args: text1: 文本1 text2: 文本2 Returns: 相似度(0-1) """ if text1 == text2: return 1.0 # 完全包含关系 if text1 in text2 or text2 in text1: shorter = min(len(text1), len(text2)) longer = max(len(text1), len(text2)) return shorter / longer # 计算最长公共子序列长度 lcs_length = self._lcs_length(text1, text2) # 相似度 = LCS长度 / 较长字符串长度 max_length = max(len(text1), len(text2)) if max_length == 0: return 0.0 return lcs_length / max_length def _lcs_length(self, text1: str, text2: str) -> int: """ 计算最长公共子序列长度 Args: text1: 文本1 text2: 文本2 Returns: LCS长度 """ m, n = len(text1), len(text2) # 创建DP表 dp = [[0] * (n + 1) for _ in range(m + 1)] # 填充DP表 for i in range(1, m + 1): for j in range(1, n + 1): if text1[i - 1] == text2[j - 1]: dp[i][j] = dp[i - 1][j - 1] + 1 else: dp[i][j] = max(dp[i - 1][j], dp[i][j - 1]) return dp[m][n]