# task_service.py
import logging
from contextlib import closing
from typing import List, Dict, Any, Tuple, Optional

from app.base.async_mysql_connection import get_db_connection
from app.base.minio_connection import get_minio_manager
  5. logger = logging.getLogger(__name__)
  6. class TaskService:
  7. """任务管理服务类"""
  8. def __init__(self):
  9. self.minio_manager = get_minio_manager()
  10. async def get_task_list(self, task_type: str) -> List[Dict[str, Any]]:
  11. """获取任务列表
  12. Args:
  13. task_type: 任务类型, 'data' 或 'image'
  14. """
  15. conn = get_db_connection()
  16. if not conn:
  17. return []
  18. cursor = conn.cursor()
  19. try:
  20. if task_type == 'data':
  21. # 类型为数据的,从 t_samp_document_main 拿名称
  22. sql = """
  23. SELECT
  24. t.id,
  25. t.business_id,
  26. t.task_id,
  27. t.type,
  28. d.title as name
  29. FROM t_task_management t
  30. JOIN t_samp_document_main d ON t.business_id COLLATE utf8mb4_unicode_ci = d.id COLLATE utf8mb4_unicode_ci
  31. WHERE t.type = 'data' AND d.whether_to_task = 1
  32. ORDER BY d.created_time DESC
  33. """
  34. elif task_type == 'image':
  35. # 类型为图片的,从 t_image_info 拿名称和 URL
  36. sql = """
  37. SELECT
  38. t.id,
  39. t.business_id,
  40. t.task_id,
  41. t.type,
  42. i.image_name as name,
  43. i.image_url
  44. FROM t_task_management t
  45. JOIN t_image_info i ON t.business_id COLLATE utf8mb4_unicode_ci = i.id COLLATE utf8mb4_unicode_ci
  46. WHERE t.type = 'image' AND i.whether_to_task = 1
  47. ORDER BY i.created_time DESC
  48. """
  49. else:
  50. return []
  51. cursor.execute(sql)
  52. tasks = cursor.fetchall()
  53. # 如果是图片类型,处理 URL 转换以支持前端预览
  54. if task_type == 'image':
  55. for item in tasks:
  56. if item.get('image_url'):
  57. item['image_url'] = self.minio_manager.get_full_url(item['image_url'])
  58. return tasks
  59. except Exception as e:
  60. logger.exception(f"获取任务列表失败 ({task_type}): {e}")
  61. return []
  62. finally:
  63. cursor.close()
  64. conn.close()
  65. async def add_task(self, business_id: str, task_type: str, task_id: str = None) -> Tuple[bool, str, Optional[int]]:
  66. """添加或更新任务记录
  67. Returns:
  68. Tuple[bool, str, Optional[int]]: (是否成功, 消息, 记录的自增id)
  69. """
  70. conn = get_db_connection()
  71. if not conn:
  72. return False, "数据库连接失败", None
  73. cursor = conn.cursor()
  74. try:
  75. # 1. 插入或更新记录
  76. # 使用 business_id 作为唯一标识(业务主键),id 为数据库自增主键
  77. sql = """
  78. INSERT INTO t_task_management (business_id, task_id, type)
  79. VALUES (%s, %s, %s)
  80. ON DUPLICATE KEY UPDATE task_id = VALUES(task_id)
  81. """
  82. cursor.execute(sql, (business_id, task_id, task_type))
  83. # 2. 获取当前记录的 id (如果是更新,lastrowid 也是有效的)
  84. record_id = cursor.lastrowid
  85. # 如果是更新且 lastrowid 没拿到,通过 business_id 查一下
  86. if not record_id:
  87. cursor.execute("SELECT id FROM t_task_management WHERE business_id = %s", (business_id,))
  88. res = cursor.fetchone()
  89. if res:
  90. record_id = res['id']
  91. conn.commit()
  92. return True, "添加成功", record_id
  93. except Exception as e:
  94. logger.exception(f"添加任务记录失败: {e}")
  95. conn.rollback()
  96. return False, f"添加失败: {str(e)}", None
  97. finally:
  98. cursor.close()
  99. conn.close()
  100. async def delete_task_by_id(self, id: int) -> Tuple[bool, str]:
  101. """根据主键 id 删除任务记录"""
  102. conn = get_db_connection()
  103. if not conn:
  104. return False, "数据库连接失败"
  105. cursor = conn.cursor()
  106. try:
  107. sql = "DELETE FROM t_task_management WHERE id = %s"
  108. cursor.execute(sql, (id,))
  109. conn.commit()
  110. return True, "删除成功"
  111. except Exception as e:
  112. logger.exception(f"根据id删除任务失败: {e}")
  113. conn.rollback()
  114. return False, f"删除失败: {str(e)}"
  115. finally:
  116. cursor.close()
  117. conn.close()
  118. async def delete_task(self, business_id: str) -> Tuple[bool, str]:
  119. """删除任务记录"""
  120. conn = get_db_connection()
  121. if not conn:
  122. return False, "数据库连接失败"
  123. cursor = conn.cursor()
  124. try:
  125. sql = "DELETE FROM t_task_management WHERE business_id = %s"
  126. cursor.execute(sql, (business_id,))
  127. conn.commit()
  128. return True, "删除成功"
  129. except Exception as e:
  130. logger.exception(f"删除任务记录失败: {e}")
  131. conn.rollback()
  132. return False, f"删除失败: {str(e)}"
  133. finally:
  134. cursor.close()
  135. conn.close()
  136. task_service = TaskService()