terminology_service.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. """
  2. 术语管理服务
  3. 处理用户术语库的增删改查
  4. """
  5. import logging
  6. from datetime import datetime
  7. from typing import Optional, List
  8. from sqlalchemy.orm import Session
  9. from sqlalchemy import and_
  10. from app.models.translation import UserTerminology
  11. from app.schemas.translation_schema import (
  12. TerminologyCreate,
  13. TerminologyResponse,
  14. TerminologyListResponse,
  15. TerminologyImportResponse
  16. )
  17. logger = logging.getLogger(__name__)
  18. class TerminologyService:
  19. """术语管理服务类"""
  20. def __init__(self, db: Session):
  21. """
  22. 初始化服务
  23. Args:
  24. db: 数据库会话
  25. """
  26. self.db = db
  27. def add_term(
  28. self,
  29. user_id: str,
  30. term: TerminologyCreate
  31. ) -> TerminologyResponse:
  32. """
  33. 添加术语
  34. Args:
  35. user_id: 用户ID
  36. term: 术语创建请求
  37. Returns:
  38. 术语响应
  39. """
  40. try:
  41. # 检查是否已存在
  42. existing = self.db.query(UserTerminology).filter(
  43. and_(
  44. UserTerminology.user_id == user_id,
  45. UserTerminology.source_lang == term.source_lang,
  46. UserTerminology.target_lang == term.target_lang,
  47. UserTerminology.source_term == term.source_term,
  48. UserTerminology.is_deleted == False
  49. )
  50. ).first()
  51. if existing:
  52. # 更新已存在的术语
  53. existing.target_term = term.target_term
  54. existing.updated_at = datetime.now()
  55. self.db.commit()
  56. self.db.refresh(existing)
  57. logger.info(f"更新术语: user_id={user_id}, id={existing.id}")
  58. return TerminologyResponse(
  59. id=existing.id,
  60. source_lang=existing.source_lang,
  61. target_lang=existing.target_lang,
  62. source_term=existing.source_term,
  63. target_term=existing.target_term,
  64. created_at=existing.created_at.strftime("%Y-%m-%d %H:%M:%S")
  65. )
  66. # 创建新术语
  67. new_term = UserTerminology(
  68. user_id=user_id,
  69. source_lang=term.source_lang,
  70. target_lang=term.target_lang,
  71. source_term=term.source_term,
  72. target_term=term.target_term
  73. )
  74. self.db.add(new_term)
  75. self.db.commit()
  76. self.db.refresh(new_term)
  77. logger.info(f"添加术语成功: user_id={user_id}, id={new_term.id}")
  78. return TerminologyResponse(
  79. id=new_term.id,
  80. source_lang=new_term.source_lang,
  81. target_lang=new_term.target_lang,
  82. source_term=new_term.source_term,
  83. target_term=new_term.target_term,
  84. created_at=new_term.created_at.strftime("%Y-%m-%d %H:%M:%S")
  85. )
  86. except Exception as e:
  87. logger.error(f"添加术语失败: {str(e)}")
  88. self.db.rollback()
  89. raise Exception(f"添加术语失败: {str(e)}")
  90. def get_user_terms(
  91. self,
  92. user_id: str,
  93. source_lang: Optional[str] = None,
  94. target_lang: Optional[str] = None,
  95. keyword: Optional[str] = None
  96. ) -> TerminologyListResponse:
  97. """
  98. 查询用户术语库
  99. Args:
  100. user_id: 用户ID
  101. source_lang: 源语言筛选
  102. target_lang: 目标语言筛选
  103. keyword: 关键词搜索
  104. Returns:
  105. 术语列表响应
  106. """
  107. query = self.db.query(UserTerminology).filter(
  108. UserTerminology.user_id == user_id,
  109. UserTerminology.is_deleted == False
  110. )
  111. # 应用筛选条件
  112. if source_lang:
  113. query = query.filter(UserTerminology.source_lang == source_lang)
  114. if target_lang:
  115. query = query.filter(UserTerminology.target_lang == target_lang)
  116. if keyword:
  117. query = query.filter(
  118. (UserTerminology.source_term.ilike(f"%{keyword}%")) |
  119. (UserTerminology.target_term.ilike(f"%{keyword}%"))
  120. )
  121. # 排序
  122. query = query.order_by(UserTerminology.created_at.desc())
  123. items = query.all()
  124. # 构建响应
  125. term_items = [
  126. TerminologyResponse(
  127. id=item.id,
  128. source_lang=item.source_lang,
  129. target_lang=item.target_lang,
  130. source_term=item.source_term,
  131. target_term=item.target_term,
  132. created_at=item.created_at.strftime("%Y-%m-%d %H:%M:%S")
  133. )
  134. for item in items
  135. ]
  136. return TerminologyListResponse(items=term_items)
  137. def delete_term(
  138. self,
  139. user_id: str,
  140. term_id: int
  141. ) -> bool:
  142. """
  143. 删除术语(软删除)
  144. Args:
  145. user_id: 用户ID
  146. term_id: 术语ID
  147. Returns:
  148. 是否删除成功
  149. """
  150. try:
  151. term = self.db.query(UserTerminology).filter(
  152. UserTerminology.id == term_id,
  153. UserTerminology.user_id == user_id,
  154. UserTerminology.is_deleted == False
  155. ).first()
  156. if not term:
  157. logger.warning(f"术语不存在或已删除: term_id={term_id}, user_id={user_id}")
  158. return False
  159. # 软删除
  160. term.is_deleted = True
  161. term.deleted_at = datetime.now()
  162. self.db.commit()
  163. logger.info(f"软删除术语成功: term_id={term_id}, user_id={user_id}")
  164. return True
  165. except Exception as e:
  166. logger.error(f"软删除术语失败: {str(e)}")
  167. self.db.rollback()
  168. return False
  169. def import_terms(
  170. self,
  171. user_id: str,
  172. source_lang: str,
  173. target_lang: str,
  174. terms: List[dict]
  175. ) -> TerminologyImportResponse:
  176. """
  177. 批量导入术语
  178. Args:
  179. user_id: 用户ID
  180. source_lang: 源语言
  181. target_lang: 目标语言
  182. terms: 术语列表
  183. Returns:
  184. 导入结果响应
  185. """
  186. success_count = 0
  187. failed_count = 0
  188. errors = []
  189. for term_data in terms:
  190. try:
  191. source_term = term_data.get("source_term", "").strip()
  192. target_term = term_data.get("target_term", "").strip()
  193. if not source_term or not target_term:
  194. failed_count += 1
  195. errors.append(f"术语不能为空: {term_data}")
  196. continue
  197. # 检查是否已存在
  198. existing = self.db.query(UserTerminology).filter(
  199. and_(
  200. UserTerminology.user_id == user_id,
  201. UserTerminology.source_lang == source_lang,
  202. UserTerminology.target_lang == target_lang,
  203. UserTerminology.source_term == source_term,
  204. UserTerminology.is_deleted == False
  205. )
  206. ).first()
  207. if existing:
  208. # 更新已存在的术语
  209. existing.target_term = target_term
  210. existing.updated_at = datetime.now()
  211. else:
  212. # 创建新术语
  213. new_term = UserTerminology(
  214. user_id=user_id,
  215. source_lang=source_lang,
  216. target_lang=target_lang,
  217. source_term=source_term,
  218. target_term=target_term
  219. )
  220. self.db.add(new_term)
  221. success_count += 1
  222. except Exception as e:
  223. failed_count += 1
  224. errors.append(f"导入失败: {term_data}, 错误: {str(e)}")
  225. logger.error(f"导入术语失败: {str(e)}")
  226. try:
  227. self.db.commit()
  228. logger.info(f"批量导入术语完成: user_id={user_id}, success={success_count}, failed={failed_count}")
  229. except Exception as e:
  230. self.db.rollback()
  231. logger.error(f"批量导入术语提交失败: {str(e)}")
  232. return TerminologyImportResponse(
  233. success_count=0,
  234. failed_count=len(terms),
  235. total_count=len(terms),
  236. errors=[f"数据库提交失败: {str(e)}"]
  237. )
  238. return TerminologyImportResponse(
  239. success_count=success_count,
  240. failed_count=failed_count,
  241. total_count=len(terms),
  242. errors=errors
  243. )