import json import time import asyncio import sys from pathlib import Path # root_dir = Path(__file__).parent.parent.parent # print(root_dir) # sys.path.append(str(root_dir)) from typing import Dict, Optional, Any from .time_statistics import track_execution_time from foundation.base.config import config_handler from foundation.logger.loggering import server_logger from foundation.base.redis_connection import RedisConnectionFactory # 缓存数据有效期 默认 3 分钟 CACHE_DATA_EXPIRED_TIME = 3 * 60 async def set_redis_result_cache_data(data_type: str , trace_id: str, value: str): """ 设置redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID @param value: 缓存数据 """ expired_time = config_handler.get("api", "CACHE_DATA_EXPIRED_TIME" , CACHE_DATA_EXPIRED_TIME) key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() await redis_store.set(key, value , ex=expired_time) async def get_redis_result_cache_data(data_type: str , trace_id: str): """ 获取redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID """ key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() value = await redis_store.get(key) value = value.decode('utf-8') return value async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id: str): """ 获取redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID """ key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() value = await redis_store.get(key) server_logger.info(f"获取redis结果缓存数据: {key}-{value}") if value is None: return None # 第一步:转成字符串(decode) json_str = value.decode('utf-8') # 第二步:解析 JSON data = json.loads(json_str) # 删除key #await redis_store.delete(key) return data @track_execution_time async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600) -> bool: """ 存储文件信息(直接存储模式) Args: file_id: 文件ID file_info: 文件信息字典 expire_seconds: 过期时间(秒),默认1小时 Returns: bool: 存储是否成功 """ try: redis_store = await RedisConnectionFactory.get_redis_store() # 检查是否已存在,避免重复存储 existing_meta = await redis_store.get(f"meta:{file_id}") if existing_meta: server_logger.info(f"文件信息已存在,跳过存储: {file_id}") return True # 提取文件内容 file_content = file_info.get('file_content') if file_content: file_size = len(file_content) server_logger.info(f"使用直接存储策略: {file_id}, {file_size/1024/1024:.2f}MB") # 直接存储 metadata = {k: v for k, v in file_info.items() if k != 'file_content'} metadata['file_size'] = file_size # 并行执行元数据和内容存储以提高性能 tasks = [ redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)), redis_store.setex(f"content:{file_id}", expire_seconds, file_content) ] await asyncio.gather(*tasks) else: # 没有文件内容,只存元数据 metadata = file_info.copy() await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)) server_logger.info(f"文件信息已存储到Redis: {file_id}") return True except Exception as e: server_logger.error(f"存储文件信息到Redis失败: {str(e)}") return False @track_execution_time async def get_file_info(file_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]: """ 根据file_id获取文件信息 Args: file_id: 文件ID include_content: 是否包含文件内容(默认True),可选False以提高效率 Returns: Dict: 文件信息字典,如果不存在返回None """ try: redis_store = await RedisConnectionFactory.get_redis_store() # 获取元数据 meta_key = f"meta:{file_id}" meta_bytes = await redis_store.get(meta_key) if not meta_bytes: server_logger.warning(f"文件元数据不存在: {meta_key}") return None # 解析元数据 file_info = json.loads(meta_bytes.decode('utf-8')) # 根据存储类型获取文件内容 if include_content and 'file_size' in file_info: # 直接获取文件内容 content_key = f"content:{file_id}" file_content = await redis_store.get(content_key) if file_content: file_info['file_content'] = file_content else: server_logger.warning(f"文件内容不存在: {content_key}") return None # 文件内容缺失,返回None server_logger.info(f"从Redis获取到文件信息: {meta_key}") return file_info except json.JSONDecodeError as e: server_logger.error(f"解析文件元数据JSON失败: {str(e)}") return None except Exception as e: server_logger.error(f"获取文件信息失败: {str(e)}") return None async def delete_file_info(file_id: str) -> bool: """ 删除文件信息 Args: file_id: 文件ID Returns: bool: 删除是否成功 """ try: # 为了避免事件循环冲突,直接创建新的Redis连接 from foundation.base.redis_config import load_config_from_env from foundation.base.redis_connection import RedisAdapter redis_config = load_config_from_env() adapter = RedisAdapter(redis_config) await adapter.connect() redis_store = adapter.get_langchain_redis_client() # 获取元数据以确定存储类型 meta_key = f"meta:{file_id}" meta_bytes = await redis_store.get(meta_key) if not meta_bytes: server_logger.warning(f"文件元数据不存在: {meta_key}") # 清理连接 await adapter.close() return True # 可能已经删除了 # 解析元数据 file_info = json.loads(meta_bytes.decode('utf-8')) # 删除相应的内容 deleted_count = 0 # 删除元数据 deleted_count += await redis_store.delete(meta_key) # 如果有文件大小信息,说明有文件内容,需要删除 if 'file_size' in file_info: # 删除文件内容 content_key = f"content:{file_id}" deleted_count += await redis_store.delete(content_key) if deleted_count > 0: server_logger.info(f"已删除文件信息: {file_id}, {deleted_count}个键") else: server_logger.warning(f"Redis缓存不存在,无法删除: {file_id}") # 清理连接 await adapter.close() return True if deleted_count > 0 else False except json.JSONDecodeError as e: server_logger.error(f"解析文件元数据JSON失败: {str(e)}") # 清理连接 await adapter.close() return False except Exception as e: server_logger.error(f"删除文件信息失败: {str(e)}") # 清理连接 await adapter.close() return False finally: # 确保连接被关闭 await adapter.close() #asyncio.run(delete_file_info('e385049cde7d21a48c7de216182f0f23'))