|
|
@@ -0,0 +1,116 @@
|
|
|
+import csv
|
|
|
+from pathlib import Path
|
|
|
+from typing import Optional, Dict, List
|
|
|
+from datetime import date, datetime
|
|
|
+
|
|
|
+from foundation.database.base.sql.async_mysql_base_dao import AsyncBaseDAO
|
|
|
+from foundation.database.base.sql.async_mysql_conn_pool import AsyncMySQLPool
|
|
|
+from foundation.observability.logger.loggering import server_logger
|
|
|
+
|
|
|
+
|
|
|
+_CSV_DIR = Path("temp/construction_review/doc_callback_task_id_table")
|
|
|
+_CSV_PATH = _CSV_DIR / "doc_callback_task_id_table.csv"
|
|
|
+_CSV_COLUMNS = ["file_name", "file_id", "callback_task_id", "upload_date", "created_at"]
|
|
|
+
|
|
|
+
|
|
|
+def _ensure_csv() -> Path:
|
|
|
+ _CSV_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
+ if not _CSV_PATH.exists():
|
|
|
+ with open(_CSV_PATH, "w", newline="", encoding="utf-8") as f:
|
|
|
+ import csv as _csv
|
|
|
+ writer = _csv.writer(f)
|
|
|
+ writer.writerow(_CSV_COLUMNS)
|
|
|
+ return _CSV_PATH
|
|
|
+
|
|
|
+
|
|
|
+def _read_csv() -> List[Dict]:
|
|
|
+ path = _ensure_csv()
|
|
|
+ with open(path, "r", newline="", encoding="utf-8") as f:
|
|
|
+ return list(csv.DictReader(f))
|
|
|
+
|
|
|
+
|
|
|
+def _append_csv(row: Dict) -> None:
|
|
|
+ path = _ensure_csv()
|
|
|
+ with open(path, "a", newline="", encoding="utf-8") as f:
|
|
|
+ writer = csv.DictWriter(f, fieldnames=_CSV_COLUMNS)
|
|
|
+ writer.writerow(row)
|
|
|
+
|
|
|
+
|
|
|
+class DocCallbackTaskIdDAO(AsyncBaseDAO):
|
|
|
+
|
|
|
+ def __init__(self, db_pool: AsyncMySQLPool):
|
|
|
+ super().__init__(db_pool)
|
|
|
+ self._table_exists_cache = None
|
|
|
+
|
|
|
+ async def _check_table_exists(self) -> bool:
|
|
|
+ if self._table_exists_cache is not None:
|
|
|
+ return self._table_exists_cache
|
|
|
+ try:
|
|
|
+ async with self.db_pool.get_cursor() as cursor:
|
|
|
+ await cursor.execute(
|
|
|
+ "SELECT COUNT(*) AS cnt FROM information_schema.tables "
|
|
|
+ "WHERE table_schema = DATABASE() AND table_name = %s",
|
|
|
+ ("doc_callback_task_id_table",),
|
|
|
+ )
|
|
|
+ row = await cursor.fetchone()
|
|
|
+ self._table_exists_cache = bool(row and row.get("cnt", 0) > 0)
|
|
|
+ except Exception:
|
|
|
+ self._table_exists_cache = False
|
|
|
+ return self._table_exists_cache
|
|
|
+
|
|
|
+ async def insert(self, file_name: str, file_id: str,
|
|
|
+ callback_task_id: str, upload_date: date) -> int:
|
|
|
+ if await self._check_table_exists():
|
|
|
+ sql = """
|
|
|
+ INSERT INTO doc_callback_task_id_table
|
|
|
+ (file_name, file_id, callback_task_id, upload_date)
|
|
|
+ VALUES (%s, %s, %s, %s)
|
|
|
+ """
|
|
|
+ from foundation.utils.common import handler_err
|
|
|
+ try:
|
|
|
+ async with self.db_pool.get_cursor() as cursor:
|
|
|
+ await cursor.execute(sql, (file_name, file_id, callback_task_id, upload_date))
|
|
|
+ return cursor.lastrowid
|
|
|
+ except Exception as err:
|
|
|
+ handler_err(logger=server_logger, err=err, err_name="插入文件上传记录失败")
|
|
|
+ raise
|
|
|
+
|
|
|
+ server_logger.info("doc_callback_task_id_table 表不存在,保存到 CSV")
|
|
|
+ _append_csv({
|
|
|
+ "file_name": file_name,
|
|
|
+ "file_id": file_id,
|
|
|
+ "callback_task_id": callback_task_id,
|
|
|
+ "upload_date": str(upload_date),
|
|
|
+ "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
+ })
|
|
|
+ return 0
|
|
|
+
|
|
|
+ async def get_by_callback_task_id(self, callback_task_id: str) -> Optional[Dict]:
|
|
|
+ if await self._check_table_exists():
|
|
|
+ sql = "SELECT * FROM doc_callback_task_id_table WHERE callback_task_id = %s"
|
|
|
+ return await self.fetch_one(sql, (callback_task_id,))
|
|
|
+
|
|
|
+ for row in _read_csv():
|
|
|
+ if row.get("callback_task_id") == callback_task_id:
|
|
|
+ return row
|
|
|
+ return None
|
|
|
+
|
|
|
+ async def get_by_file_id(self, file_id: str) -> List[Dict]:
|
|
|
+ if await self._check_table_exists():
|
|
|
+ sql = "SELECT * FROM doc_callback_task_id_table WHERE file_id = %s ORDER BY created_at DESC"
|
|
|
+ return await self.fetch_all(sql, (file_id,))
|
|
|
+
|
|
|
+ result = [r for r in _read_csv() if r.get("file_id") == file_id]
|
|
|
+ result.sort(key=lambda r: r.get("created_at", ""), reverse=True)
|
|
|
+ return result
|
|
|
+
|
|
|
+ async def get_by_date_range(self, start_date: date, end_date: date) -> List[Dict]:
|
|
|
+ if await self._check_table_exists():
|
|
|
+ sql = "SELECT * FROM doc_callback_task_id_table WHERE upload_date BETWEEN %s AND %s ORDER BY created_at DESC"
|
|
|
+ return await self.fetch_all(sql, (start_date, end_date))
|
|
|
+
|
|
|
+ start_str = str(start_date)
|
|
|
+ end_str = str(end_date)
|
|
|
+ result = [r for r in _read_csv() if start_str <= r.get("upload_date", "") <= end_str]
|
|
|
+ result.sort(key=lambda r: r.get("created_at", ""), reverse=True)
|
|
|
+ return result
|