ソースを参照

dev:目录审查模块的redis临时数据自动过期;

ChenJiSheng 1 ヶ月 前
コミット
a992b7c30b

+ 1 - 0
config/config.ini.template

@@ -60,6 +60,7 @@ REDIS_URL=redis://127.0.0.1:6379/0
 REDIS_HOST=127.0.0.1
 REDIS_PORT=6379
 REDIS_DB=0
+REDIS_TTL=600
 REDIS_PASSWORD=123456
 REDIS_MAX_CONNECTIONS=50
 

+ 95 - 7
core/construction_review/component/reviewers/catalogues_check/utils/redis_utils.py

@@ -25,8 +25,8 @@ class CataloguesRedisManager:
 
     def __init__(self):
         """初始化 Redis 配置"""
-        self._load_redis_config()
         self._redis_client = None
+        self._load_redis_config()
 
     def _load_redis_config(self):
         """从 config.ini 读取 Redis 配置"""
@@ -38,6 +38,16 @@ class CataloguesRedisManager:
         self.REDIS_PORT = config.getint('redis', 'REDIS_PORT', fallback=6379)
         self.REDIS_PASSWORD = config.get('redis', 'REDIS_PASSWORD', fallback='')
         self.REDIS_DB = config.getint('redis', 'REDIS_DB', fallback=0)
+        
+        # 读取默认 TTL 配置(秒)
+        try:
+            self.DEFAULT_TTL = config.getint('redis', 'REDIS_TTL', fallback=600)  # 默认 10 分钟
+            if self.DEFAULT_TTL == 0:
+                self.DEFAULT_TTL = None  # 0 表示不过期
+            logger.info(f"Redis 默认 TTL 配置: {self.DEFAULT_TTL} 秒" if self.DEFAULT_TTL else "Redis 默认 TTL: 不过期")
+        except Exception as e:
+            logger.warning(f"读取 Redis TTL 配置失败: {e},使用默认值(10分钟)")
+            self.DEFAULT_TTL = 600  # 默认 10 分钟
 
     def get_redis_client(self):
         """获取 Redis 客户端(单例模式)"""
@@ -62,13 +72,14 @@ class CataloguesRedisManager:
         """获取数据存储的 Redis key(直接使用 title 作为 field)"""
         return f"catalogues_check:{task_id}:data"
 
-    def store_dataframe(self, df: pd.DataFrame, task_id: str) -> int:
+    def store_dataframe(self, df: pd.DataFrame, task_id: str, ttl_seconds: Optional[int] = None) -> int:
         """
         将 DataFrame 存入 Redis(直接使用 title 作为 field key)
 
         Args:
             df: 要存储的 DataFrame
             task_id: 任务 ID(callback_task_id)
+            ttl_seconds: 过期时间(秒),None 表示使用配置文件的默认值或不过期
 
         Returns:
             int: 存储的行数
@@ -94,7 +105,13 @@ class CataloguesRedisManager:
         # 存储行数
         redis_client.hset(data_key, "_row_count", len(rows))
 
-        logger.info(f"数据存储完成,共 {len(rows)} 行")
+        # 设置过期时间
+        effective_ttl = ttl_seconds if ttl_seconds is not None else self.DEFAULT_TTL
+        if effective_ttl:
+            redis_client.expire(data_key, effective_ttl)
+            logger.info(f"数据存储完成,共 {len(rows)} 行,TTL: {effective_ttl} 秒")
+        else:
+            logger.info(f"数据存储完成,共 {len(rows)} 行,未设置过期时间")
         return len(rows)
 
     def read_all(self, task_id: str) -> pd.DataFrame:
@@ -177,7 +194,8 @@ class CataloguesRedisManager:
         logger.info(f"按 titles 读取完成,找到 {len(rows)} 行")
         return pd.DataFrame(rows)
 
-    def update_row_by_index(self, task_id: str, row_index: int, row_data: Dict[str, Any]) -> bool:
+    def update_row_by_index(self, task_id: str, row_index: int, row_data: Dict[str, Any],
+                           reset_ttl: bool = True, ttl_seconds: Optional[int] = None) -> bool:
         """
         按行号更新单行数据(需要先读取所有数据找到对应行)
 
@@ -185,6 +203,8 @@ class CataloguesRedisManager:
             task_id: 任务 ID
             row_index: 行号(从1开始)
             row_data: 新的行数据
+            reset_ttl: 是否重置过期时间,默认为 True
+            ttl_seconds: 新的过期时间(秒),None 表示使用配置文件的默认值或不过期
 
         Returns:
             bool: 更新是否成功
@@ -209,10 +229,20 @@ class CataloguesRedisManager:
         row_json = json.dumps(row_data, ensure_ascii=False)
         redis_client.hset(data_key, title, row_json)
 
-        logger.info(f"行号 {row_index} (title: {title}) 更新成功")
+        # 重置过期时间
+        if reset_ttl:
+            effective_ttl = ttl_seconds if ttl_seconds is not None else self.DEFAULT_TTL
+            if effective_ttl:
+                redis_client.expire(data_key, effective_ttl)
+                logger.info(f"行号 {row_index} (title: {title}) 更新成功,TTL 重置为 {effective_ttl} 秒")
+            else:
+                logger.info(f"行号 {row_index} (title: {title}) 更新成功,未重置过期时间")
+        else:
+            logger.info(f"行号 {row_index} (title: {title}) 更新成功")
         return True
 
-    def update_row_by_title(self, task_id: str, title: str, row_data: Dict[str, Any]) -> bool:
+    def update_row_by_title(self, task_id: str, title: str, row_data: Dict[str, Any],
+                           reset_ttl: bool = True, ttl_seconds: Optional[int] = None) -> bool:
         """
         按 title 字段更新单行数据
 
@@ -220,6 +250,8 @@ class CataloguesRedisManager:
             task_id: 任务 ID
             title: 要更新的 title 值
             row_data: 新的行数据
+            reset_ttl: 是否重置过期时间,默认为 True
+            ttl_seconds: 新的过期时间(秒),None 表示使用配置文件的默认值或不过期
 
         Returns:
             bool: 更新是否成功
@@ -238,7 +270,16 @@ class CataloguesRedisManager:
         row_json = json.dumps(row_data, ensure_ascii=False)
         redis_client.hset(data_key, title, row_json)
 
-        logger.info(f"title '{title}' 更新成功")
+        # 重置过期时间
+        if reset_ttl:
+            effective_ttl = ttl_seconds if ttl_seconds is not None else self.DEFAULT_TTL
+            if effective_ttl:
+                redis_client.expire(data_key, effective_ttl)
+                logger.info(f"title '{title}' 更新成功,TTL 重置为 {effective_ttl} 秒")
+            else:
+                logger.info(f"title '{title}' 更新成功,未重置过期时间")
+        else:
+            logger.info(f"title '{title}' 更新成功")
         return True
 
     def delete_task_data(self, task_id: str) -> bool:
@@ -334,6 +375,53 @@ class CataloguesRedisManager:
         data_key = self._get_data_key(task_id)
         return redis_client.exists(data_key) > 0
 
+    def set_ttl(self, task_id: str, ttl_seconds: int) -> bool:
+        """
+        为指定任务的数据设置过期时间
+
+        Args:
+            task_id: 任务 ID
+            ttl_seconds: 过期时间(秒)
+
+        Returns:
+            bool: 设置是否成功
+        """
+        redis_client = self.get_redis_client()
+        data_key = self._get_data_key(task_id)
+
+        logger.info(f"为 task_id '{task_id}' 设置 TTL: {ttl_seconds} 秒")
+
+        # 检查数据是否存在
+        if not redis_client.exists(data_key):
+            logger.warning(f"task_id '{task_id}' 对应的数据不存在")
+            return False
+
+        # 设置过期时间
+        redis_client.expire(data_key, ttl_seconds)
+        logger.info(f"TTL 设置成功")
+        return True
+
+    def get_ttl(self, task_id: str) -> Optional[int]:
+        """
+        获取指定任务数据的剩余过期时间
+
+        Args:
+            task_id: 任务 ID
+
+        Returns:
+            int: 剩余过期时间(秒),-1 表示永不过期,-2 表示 key 不存在,None 表示出错
+        """
+        redis_client = self.get_redis_client()
+        data_key = self._get_data_key(task_id)
+
+        try:
+            ttl = redis_client.ttl(data_key)
+            logger.info(f"task_id '{task_id}' 的剩余 TTL: {ttl} 秒")
+            return ttl
+        except Exception as e:
+            logger.error(f"获取 TTL 失败: {e}")
+            return None
+
     def read_catalogues_data_by_chapters(self, task_id: str, chapter_labels: List[str]) -> pd.DataFrame:
         """
         按章节标签列表读取目录审查结果数据

+ 19 - 2
core/construction_review/component/reviewers/check_completeness/utils/redis_csv_utils.py

@@ -25,6 +25,16 @@ REDIS_PORT = config.getint('redis', 'REDIS_PORT', fallback=6379)
 REDIS_PASSWORD = config.get('redis', 'REDIS_PASSWORD', fallback='')
 REDIS_DB = config.getint('redis', 'REDIS_DB', fallback=0)
 
+# 读取默认 TTL 配置(秒)
+try:
+    DEFAULT_TTL = config.getint('redis', 'REDIS_TTL', fallback=600)  # 默认 10 分钟
+    if DEFAULT_TTL == 0:
+        DEFAULT_TTL = None  # 0 表示不过期
+    logger.info(f"Redis 默认 TTL 配置: {DEFAULT_TTL} 秒" if DEFAULT_TTL else "Redis 默认 TTL: 不过期")
+except Exception as e:
+    logger.warning(f"读取 Redis TTL 配置失败: {e},使用默认值(10分钟)")
+    DEFAULT_TTL = 600  # 默认 10 分钟
+
 # 绑定ID
 BIND_ID = '2d5d99c823a6b1a19f770932f3237bf8-1768535328'
 
@@ -64,13 +74,14 @@ def store_header_to_redis(redis_client, bind_id, header):
     redis_client.hset(bind_id, "header", header_json)
 
 
-def df_store_to_redis(redis_client, data=None, bind_id=None):
+def df_store_to_redis(redis_client, data=None, bind_id=None, ttl_seconds=None):
     """读取CSV文件并按行存入Redis
     
     Args:
         redis_client: Redis客户端
         data: DataFrame数据(可选),如果为None则从CSV文件读取
         bind_id: 绑定ID
+        ttl_seconds: 过期时间(秒),None 表示使用配置文件的默认值或不过期
     
     Returns:
         list: 存储的行数据列表
@@ -100,7 +111,13 @@ def df_store_to_redis(redis_client, data=None, bind_id=None):
     # 存储行数
     redis_client.hset(bind_id, "row_count", len(rows))
     
-    logger.info(f"数据存储完成,共 {len(rows)} 行")
+    # 设置过期时间
+    effective_ttl = ttl_seconds if ttl_seconds is not None else DEFAULT_TTL
+    if effective_ttl:
+        redis_client.expire(bind_id, effective_ttl)
+        logger.info(f"数据存储完成,共 {len(rows)} 行,TTL: {effective_ttl} 秒")
+    else:
+        logger.info(f"数据存储完成,共 {len(rows)} 行,未设置过期时间")
     
     return rows