| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- import logging
- from typing import List, Dict, Any, Tuple, Optional
- from app.base.async_mysql_connection import get_db_connection
- from app.base.minio_connection import get_minio_manager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TaskService:
    """Service for reading and maintaining rows of ``t_task_management``.

    Every public method obtains its own connection from
    ``get_db_connection()`` and closes it before returning. Database errors
    are logged and reported through the return value instead of being raised.

    Rows are accessed by column name (``row['id']``, ``row.get(...)``), so the
    cursor is expected to yield mapping-style rows.

    NOTE(review): the methods are declared ``async`` but none of the database
    calls is awaited. If ``get_db_connection()`` hands out a blocking
    connection, these coroutines block the event loop while they run --
    confirm against the connection module.
    """

    # 'data' tasks: the display name is the title of the joined
    # t_samp_document_main row.
    _DATA_TASKS_SQL = """
        SELECT
            t.id,
            t.business_id,
            t.task_id,
            t.type,
            d.title as name
        FROM t_task_management t
        JOIN t_samp_document_main d ON t.business_id COLLATE utf8mb4_unicode_ci = d.id COLLATE utf8mb4_unicode_ci
        WHERE t.type = 'data' AND d.whether_to_task = 1
        ORDER BY d.created_time DESC
    """

    # 'image' tasks: name and URL come from the joined t_image_info row.
    _IMAGE_TASKS_SQL = """
        SELECT
            t.id,
            t.business_id,
            t.task_id,
            t.type,
            i.image_name as name,
            i.image_url
        FROM t_task_management t
        JOIN t_image_info i ON t.business_id COLLATE utf8mb4_unicode_ci = i.id COLLATE utf8mb4_unicode_ci
        WHERE t.type = 'image' AND i.whether_to_task = 1
        ORDER BY i.created_time DESC
    """

    def __init__(self):
        # Used by get_task_list() to expand stored image paths into full URLs.
        self.minio_manager = get_minio_manager()

    @staticmethod
    def _release(conn, cursor) -> None:
        """Close ``cursor`` (if one was created) and then ``conn``.

        The connection is closed even when closing the cursor raises, so a
        failing cursor cannot leak the connection.
        """
        try:
            if cursor is not None:
                cursor.close()
        finally:
            conn.close()

    @staticmethod
    def _rollback(conn) -> None:
        """Roll back ``conn``; a failing rollback is logged, not raised.

        This keeps the caller able to return its own failure result for the
        error that triggered the rollback.
        """
        try:
            conn.rollback()
        except Exception as e:
            logger.exception(f"事务回滚失败: {e}")

    def _delete(self, sql: str, params: tuple, error_label: str) -> Tuple[bool, str]:
        """Run one DELETE statement in its own connection and commit it.

        Args:
            sql: Parameterised DELETE statement.
            params: Values bound to the statement's placeholders.
            error_label: Prefix of the log message written on failure.

        Returns:
            ``(True, "删除成功")`` once the statement is committed (the number
            of deleted rows is not checked), otherwise ``(False, <message>)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            # Created inside the try so a failure here still closes conn.
            cursor = conn.cursor()
            cursor.execute(sql, params)
            conn.commit()
            return True, "删除成功"
        except Exception as e:
            logger.exception(f"{error_label}: {e}")
            self._rollback(conn)
            return False, f"删除失败: {str(e)}"
        finally:
            self._release(conn, cursor)

    async def get_task_list(self, task_type: str) -> List[Dict[str, Any]]:
        """Return the task rows of one type.

        Args:
            task_type: ``'data'`` or ``'image'``.

        Returns:
            The rows produced by the type's list query (``id``,
            ``business_id``, ``task_id``, ``type``, ``name``, plus
            ``image_url`` for images), ordered by the joined record's
            ``created_time`` descending. For image rows a non-empty
            ``image_url`` is replaced by
            ``self.minio_manager.get_full_url(image_url)``.
            An empty list is returned for any other ``task_type``, when no
            connection is available, or when the query fails.
        """
        # Pick the query first so an unsupported type never opens a connection.
        if task_type == 'data':
            sql = self._DATA_TASKS_SQL
        elif task_type == 'image':
            sql = self._IMAGE_TASKS_SQL
        else:
            return []

        conn = get_db_connection()
        if not conn:
            return []

        cursor = None
        try:
            # Created inside the try so a failure here still closes conn.
            cursor = conn.cursor()
            cursor.execute(sql)
            tasks = cursor.fetchall()

            # Image rows store a path; expand it so the frontend can preview it.
            if task_type == 'image':
                for item in tasks:
                    if item.get('image_url'):
                        item['image_url'] = self.minio_manager.get_full_url(item['image_url'])

            return tasks
        except Exception as e:
            logger.exception(f"获取任务列表失败 ({task_type}): {e}")
            return []
        finally:
            self._release(conn, cursor)

    async def add_task(
        self,
        business_id: str,
        task_type: str,
        task_id: Optional[str] = None,
    ) -> Tuple[bool, str, Optional[int]]:
        """Insert a task row, or update ``task_id`` of the existing one.

        The statement is an ``INSERT ... ON DUPLICATE KEY UPDATE``; it relies
        on a unique key on the table to detect the existing row (presumably on
        ``business_id`` -- confirm against the schema).

        Args:
            business_id: Business identifier of the record the task belongs to.
            task_type: Value stored in the ``type`` column.
            task_id: External task identifier; may be ``None``.

        Returns:
            ``(success, message, record_id)`` where ``record_id`` is the row's
            ``id`` column, or ``None`` on failure or when it cannot be found.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        cursor = None
        try:
            # Created inside the try so a failure here still closes conn.
            cursor = conn.cursor()
            sql = """
                INSERT INTO t_task_management (business_id, task_id, type)
                VALUES (%s, %s, %s)
                ON DUPLICATE KEY UPDATE task_id = VALUES(task_id)
            """
            cursor.execute(sql, (business_id, task_id, task_type))

            # NOTE(review): lastrowid is trusted whenever it is truthy; whether
            # it identifies the row on the update path depends on the server
            # and driver -- confirm.
            record_id = cursor.lastrowid

            # No usable lastrowid (e.g. an existing row was matched): look the
            # row up by its business id instead.
            if not record_id:
                cursor.execute("SELECT id FROM t_task_management WHERE business_id = %s", (business_id,))
                res = cursor.fetchone()
                if res:
                    record_id = res['id']

            conn.commit()
            return True, "添加成功", record_id
        except Exception as e:
            logger.exception(f"添加任务记录失败: {e}")
            self._rollback(conn)
            return False, f"添加失败: {str(e)}", None
        finally:
            self._release(conn, cursor)

    async def delete_task_by_id(self, id: int) -> Tuple[bool, str]:
        """Delete the task row whose primary key ``id`` matches.

        Returns:
            ``(True, "删除成功")`` when the DELETE was committed (also when no
            row matched), otherwise ``(False, <message>)``.
        """
        return self._delete(
            "DELETE FROM t_task_management WHERE id = %s",
            (id,),
            "根据id删除任务失败",
        )

    async def delete_task(self, business_id: str) -> Tuple[bool, str]:
        """Delete the task row(s) whose ``business_id`` matches.

        Returns:
            ``(True, "删除成功")`` when the DELETE was committed (also when no
            row matched), otherwise ``(False, <message>)``.
        """
        return self._delete(
            "DELETE FROM t_task_management WHERE business_id = %s",
            (business_id,),
            "删除任务记录失败",
        )
# Shared module-level instance. Constructing it runs TaskService.__init__,
# which calls get_minio_manager() when this module is imported.
task_service = TaskService()
|