import json import time import asyncio import sys from pathlib import Path # root_dir = Path(__file__).parent.parent.parent # print(root_dir) # sys.path.append(str(root_dir)) from typing import Dict, Optional, Any from .time_statistics import track_execution_time from foundation.base.config import config_handler from foundation.logger.loggering import server_logger from foundation.base.redis_connection import RedisConnectionFactory # 缓存数据有效期 默认 3 分钟 CACHE_DATA_EXPIRED_TIME = 3 * 60 async def set_redis_result_cache_data(data_type: str , trace_id: str, value: str): """ 设置redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID @param value: 缓存数据 """ expired_time = config_handler.get("api", "CACHE_DATA_EXPIRED_TIME" , CACHE_DATA_EXPIRED_TIME) key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() await redis_store.set(key, value , ex=expired_time) async def get_redis_result_cache_data(data_type: str , trace_id: str): """ 获取redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID """ key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() value = await redis_store.get(key) value = value.decode('utf-8') return value async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id: str): """ 获取redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID """ key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() value = await redis_store.get(key) server_logger.info(f"获取redis结果缓存数据: {key}-{value}") if value is None: return None # 第一步:转成字符串(decode) json_str = value.decode('utf-8') # 第二步:解析 JSON data = json.loads(json_str) # 删除key #await redis_store.delete(key) return data @track_execution_time async def _store_file_chunked(file_id: str, file_content: bytes, chunk_size: int = 1024*1024, expire_seconds: int = 3600) -> bool: """ 分块存储大文件内容(内部方法) """ try: redis_store = await RedisConnectionFactory.get_redis_store() file_size = len(file_content) server_logger.info(f"开始分块存储文件: {file_id}, 大小: {file_size/1024/1024:.2f}MB, 分块大小: {chunk_size/1024/1024:.2f}MB") # 计算分块数量 chunk_count = (file_size + chunk_size - 1) // chunk_size # 创建分块索引信息 chunk_index = { 'file_id': file_id, 'file_size': file_size, 'chunk_size': chunk_size, 'chunk_count': chunk_count, 'created_at': int(time.time()) } # 存储分块索引 await redis_store.setex(f"chunks:{file_id}", expire_seconds, json.dumps(chunk_index)) # 分块存储文件内容 tasks = [] for i in range(chunk_count): start = i * chunk_size end = min(start + chunk_size, file_size) chunk_data = file_content[start:end] chunk_key = f"chunk:{file_id}:{i}" task = redis_store.setex(chunk_key, expire_seconds, chunk_data) tasks.append(task) # 并行执行所有分块存储 await asyncio.gather(*tasks) server_logger.info(f"文件分块存储完成: {file_id}, {chunk_count}个块") return True except Exception as e: server_logger.error(f"分块存储文件失败: {file_id}, {str(e)}") return False async def _get_file_chunked(file_id: str) -> bytes: """ 获取大文件内容(从分块中组装)(内部方法) """ try: redis_store = await RedisConnectionFactory.get_redis_store() # 获取分块索引 chunk_index_json = await redis_store.get(f"chunks:{file_id}") if not chunk_index_json: server_logger.warning(f"文件分块索引不存在: {file_id}") return None chunk_index = json.loads(chunk_index_json.decode('utf-8')) chunk_count = chunk_index['chunk_count'] # 并行获取所有分块 tasks = [] for i in range(chunk_count): chunk_key = f"chunk:{file_id}:{i}" task = redis_store.get(chunk_key) tasks.append(task) # 并行执行获取 chunks = await asyncio.gather(*tasks) # 组装文件内容 file_content = b''.join(chunks) return file_content except Exception as e: server_logger.error(f"获取分块文件失败: {file_id}, {str(e)}") return None async def _delete_file_chunks(file_id: str) -> bool: """ 删除大文件分块(内部方法) """ try: redis_store = await RedisConnectionFactory.get_redis_store() # 获取分块索引 chunk_index_json = await redis_store.get(f"chunks:{file_id}") if not chunk_index_json: return True # 可能已经删除了 chunk_index = json.loads(chunk_index_json.decode('utf-8')) chunk_count = chunk_index['chunk_count'] # 构造要删除的所有键 keys_to_delete = [f"chunks:{file_id}"] keys_to_delete.extend([f"chunk:{file_id}:{i}" for i in range(chunk_count)]) # 批量删除 await redis_store.delete(*keys_to_delete) return True except Exception as e: server_logger.error(f"删除文件分块失败: {file_id}, {str(e)}") return False @track_execution_time async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600) -> bool: """ 存储文件信息(自动优化:小文件直接存储,大文件分块存储) Args: file_id: 文件ID file_info: 文件信息字典 expire_seconds: 过期时间(秒),默认1小时 Returns: bool: 存储是否成功 """ # 直接存储开关,True表示使用直接存储,False表示使用分块存储 direct_storage = True try: redis_store = await RedisConnectionFactory.get_redis_store() # 检查是否已存在,避免重复存储 existing_meta = await redis_store.get(f"meta:{file_id}") if existing_meta: server_logger.info(f"文件信息已存在,跳过存储: {file_id}") return True # 提取文件内容 file_content = file_info.get('file_content') if file_content: file_size = len(file_content) chunk_threshold = 50 * 1024 * 1024 # 50MB阈值 # 根据文件大小和强制参数选择存储策略 if direct_storage or file_size <= chunk_threshold: storage_method = "直接存储" if direct_storage else "直接存储" server_logger.info(f"使用{storage_method}策略: {file_id}, {file_size/1024/1024:.2f}MB") # 直接存储 metadata = {k: v for k, v in file_info.items() if k != 'file_content'} metadata['storage_type'] = 'direct_test' if direct_storage else 'direct' metadata['file_size'] = file_size # 并行执行元数据和内容存储以提高性能 tasks = [ redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)), redis_store.setex(f"content:{file_id}", expire_seconds, file_content) ] await asyncio.gather(*tasks) else: server_logger.info(f"使用分块存储策略: {file_id}, {file_size/1024/1024:.2f}MB > 50MB") # 分块存储文件内容 chunk_success = await _store_file_chunked(file_id, file_content, expire_seconds=expire_seconds) if not chunk_success: return False # 存储元数据(不含文件内容) metadata = {k: v for k, v in file_info.items() if k != 'file_content'} metadata['storage_type'] = 'chunked' metadata['file_size'] = file_size await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)) else: # 没有文件内容,只存元数据 metadata = file_info.copy() metadata['storage_type'] = 'metadata_only' await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)) server_logger.info(f"文件信息已存储到Redis: {file_id}") return True except Exception as e: server_logger.error(f"存储文件信息到Redis失败: {str(e)}") return False @track_execution_time async def get_file_info(file_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]: """ 根据file_id获取文件信息(自动适配分块和直接存储) Args: file_id: 文件ID include_content: 是否包含文件内容(默认True),可选False以提高效率 Returns: Dict: 文件信息字典,如果不存在返回None """ try: redis_store = await RedisConnectionFactory.get_redis_store() # 获取元数据 meta_key = f"meta:{file_id}" meta_bytes = await redis_store.get(meta_key) if not meta_bytes: server_logger.warning(f"文件元数据不存在: {meta_key}") return None # 解析元数据 file_info = json.loads(meta_bytes.decode('utf-8')) storage_type = file_info.get('storage_type', 'direct') # 根据存储类型获取文件内容 if include_content and 'file_size' in file_info: if storage_type == 'chunked': # 从分块中获取文件内容 file_content = await _get_file_chunked(file_id) if file_content: file_info['file_content'] = file_content else: server_logger.warning(f"分块文件内容获取失败: {file_id}") elif storage_type == 'direct': # 直接获取文件内容 content_key = f"content:{file_id}" file_content = await redis_store.get(content_key) if file_content: file_info['file_content'] = file_content else: server_logger.warning(f"文件内容不存在: {content_key}") server_logger.info(f"从Redis获取到文件信息: {meta_key}, 存储类型: {storage_type}") return file_info except json.JSONDecodeError as e: server_logger.error(f"解析文件元数据JSON失败: {str(e)}") return None except Exception as e: server_logger.error(f"获取文件信息失败: {str(e)}") return None @track_execution_time async def delete_file_info(file_id: str) -> bool: """ 删除文件信息(自动适配分块和直接存储) Args: file_id: 文件ID Returns: bool: 删除是否成功 """ try: redis_store = await RedisConnectionFactory.get_redis_store() # 获取元数据以确定存储类型 meta_key = f"meta:{file_id}" meta_bytes = await redis_store.get(meta_key) if not meta_bytes: server_logger.warning(f"文件元数据不存在: {meta_key}") return True # 可能已经删除了 # 解析元数据 file_info = json.loads(meta_bytes.decode('utf-8')) storage_type = file_info.get('storage_type', 'direct') # 根据存储类型删除相应的内容 deleted_count = 0 # 删除元数据 deleted_count += await redis_store.delete(meta_key) if storage_type == 'chunked': # 删除分块内容 chunk_success = await _delete_file_chunks(file_id) if chunk_success: server_logger.info(f"已删除分块文件内容: {file_id}") elif storage_type == 'direct': # 删除直接存储的内容 content_key = f"content:{file_id}" deleted_count += await redis_store.delete(content_key) if deleted_count > 0: server_logger.info(f"已删除文件信息: {file_id}, {deleted_count}个键") return True else: server_logger.warning(f"Redis缓存不存在,无法删除: {file_id}") return False except json.JSONDecodeError as e: server_logger.error(f"解析文件元数据JSON失败: {str(e)}") return False except Exception as e: server_logger.error(f"删除文件信息失败: {str(e)}") return False #asyncio.run(delete_file_info('e385049cde7d21a48c7de216182f0f23'))