|
|
@@ -1,16 +1,21 @@
|
|
|
|
|
|
import json
|
|
|
+import time
|
|
|
+import asyncio
|
|
|
+import sys
|
|
|
+from pathlib import Path
|
|
|
+# root_dir = Path(__file__).parent.parent.parent
|
|
|
+# print(root_dir)
|
|
|
+# sys.path.append(str(root_dir))
|
|
|
+from typing import Dict, Optional, Any
|
|
|
+from .time_statistics import track_execution_time
|
|
|
+from foundation.base.config import config_handler
|
|
|
from foundation.logger.loggering import server_logger
|
|
|
from foundation.base.redis_connection import RedisConnectionFactory
|
|
|
-from foundation.base.config import config_handler
|
|
|
# Cache TTL for result data: 3 minutes by default (in seconds).
CACHE_DATA_EXPIRED_TIME = 3 * 60
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
async def set_redis_result_cache_data(data_type: str , trace_id: str, value: str):
|
|
|
"""
|
|
|
设置redis结果缓存数据
|
|
|
@@ -24,9 +29,6 @@ async def set_redis_result_cache_data(data_type: str , trace_id: str, value: str
|
|
|
redis_store = await RedisConnectionFactory.get_redis_store()
|
|
|
await redis_store.set(key, value , ex=expired_time)
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
async def get_redis_result_cache_data(data_type: str , trace_id: str):
|
|
|
"""
|
|
|
获取redis结果缓存数据
|
|
|
@@ -41,7 +43,6 @@ async def get_redis_result_cache_data(data_type: str , trace_id: str):
|
|
|
return value
|
|
|
|
|
|
|
|
|
-
|
|
|
async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id: str):
|
|
|
"""
|
|
|
获取redis结果缓存数据
|
|
|
@@ -61,4 +62,304 @@ async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id:
|
|
|
data = json.loads(json_str)
|
|
|
# 删除key
|
|
|
#await redis_store.delete(key)
|
|
|
- return data
|
|
|
+ return data
|
|
|
+
|
|
|
+
|
|
|
@track_execution_time
async def _store_file_chunked(file_id: str, file_content: bytes, chunk_size: int = 1024*1024, expire_seconds: int = 3600) -> bool:
    """
    Split a large file into fixed-size chunks and store them in Redis
    (internal helper).

    Writes one "chunks:<file_id>" index entry describing the layout, then
    every chunk under "chunk:<file_id>:<i>", all with the same TTL.

    Args:
        file_id: unique identifier of the file.
        file_content: raw file bytes to persist.
        chunk_size: size of each chunk in bytes (default 1 MiB).
        expire_seconds: TTL applied to the index and every chunk.

    Returns:
        True on success, False if any Redis operation failed.
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()
        file_size = len(file_content)

        server_logger.info(f"开始分块存储文件: {file_id}, 大小: {file_size/1024/1024:.2f}MB, 分块大小: {chunk_size/1024/1024:.2f}MB")

        # Ceiling division: number of chunks needed to cover the payload.
        chunk_count = (file_size + chunk_size - 1) // chunk_size

        # Persist the layout first so readers can reassemble the chunks.
        await redis_store.setex(
            f"chunks:{file_id}",
            expire_seconds,
            json.dumps({
                'file_id': file_id,
                'file_size': file_size,
                'chunk_size': chunk_size,
                'chunk_count': chunk_count,
                'created_at': int(time.time()),
            }),
        )

        # Issue every chunk write concurrently rather than one at a time.
        writes = [
            redis_store.setex(
                f"chunk:{file_id}:{i}",
                expire_seconds,
                file_content[i * chunk_size:min((i + 1) * chunk_size, file_size)],
            )
            for i in range(chunk_count)
        ]
        await asyncio.gather(*writes)

        server_logger.info(f"文件分块存储完成: {file_id}, {chunk_count}个块")
        return True

    except Exception as e:
        server_logger.error(f"分块存储文件失败: {file_id}, {str(e)}")
        return False
|
|
|
+
|
|
|
+
|
|
|
async def _get_file_chunked(file_id: str) -> Optional[bytes]:
    """
    Reassemble a chunked file from Redis (internal helper).

    Reads the "chunks:<file_id>" index written by _store_file_chunked,
    fetches every "chunk:<file_id>:<i>" key concurrently and joins the
    pieces back into a single bytes object.

    Args:
        file_id: unique identifier of the file.

    Returns:
        The full file content, or None when the index or any chunk is
        missing, or on Redis error.  (The annotation was `bytes` before,
        but None was always a possible return.)
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()

        # Fetch the chunk layout first.
        chunk_index_json = await redis_store.get(f"chunks:{file_id}")
        if not chunk_index_json:
            server_logger.warning(f"文件分块索引不存在: {file_id}")
            return None

        # json.loads accepts both bytes and str, so this works whether or
        # not the Redis client is configured to decode responses.
        chunk_index = json.loads(chunk_index_json)
        chunk_count = chunk_index['chunk_count']

        # Fetch all chunks concurrently.
        tasks = [redis_store.get(f"chunk:{file_id}:{i}") for i in range(chunk_count)]
        chunks = await asyncio.gather(*tasks)

        # A chunk can expire independently of the index; joining a None
        # would raise TypeError (previously swallowed by the broad except),
        # so fail explicitly with a clear log message instead.
        if any(chunk is None for chunk in chunks):
            server_logger.warning(f"文件分块缺失: {file_id}")
            return None

        return b''.join(chunks)

    except Exception as e:
        server_logger.error(f"获取分块文件失败: {file_id}, {str(e)}")
        return None
|
|
|
+
|
|
|
+
|
|
|
async def _delete_file_chunks(file_id: str) -> bool:
    """
    Remove a chunked file's index and all of its chunk keys from Redis
    (internal helper).

    Args:
        file_id: unique identifier of the file.

    Returns:
        True when the chunks are gone (including when nothing was stored),
        False if a Redis error occurred.
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()

        index_payload = await redis_store.get(f"chunks:{file_id}")
        if not index_payload:
            # Nothing to do — the file was probably deleted already.
            return True

        chunk_total = json.loads(index_payload.decode('utf-8'))['chunk_count']

        # Delete the index key and every chunk key in one round trip.
        doomed = [f"chunks:{file_id}"] + [f"chunk:{file_id}:{i}" for i in range(chunk_total)]
        await redis_store.delete(*doomed)
        return True

    except Exception as e:
        server_logger.error(f"删除文件分块失败: {file_id}, {str(e)}")
        return False
|
|
|
+
|
|
|
+
|
|
|
@track_execution_time
async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600) -> bool:
    """
    Store file info in Redis, choosing the storage strategy automatically:
    the content is stored whole under "content:<file_id>" (or, when the
    direct-storage switch is off and the payload exceeds 50MB, split into
    chunks), while the remaining metadata goes to "meta:<file_id>".

    Args:
        file_id: unique identifier of the file.
        file_info: file info dict; the optional 'file_content' entry holds
            the raw bytes, everything else is treated as metadata.
        expire_seconds: TTL in seconds (default 1 hour).

    Returns:
        True on success, False on failure.
    """
    # Storage switch: True forces direct storage, False enables chunked
    # storage for payloads above the 50MB threshold.
    direct_storage = True
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()

        # Skip the write if this file's metadata is already cached.
        existing_meta = await redis_store.get(f"meta:{file_id}")
        if existing_meta:
            server_logger.info(f"文件信息已存在,跳过存储: {file_id}")
            return True

        # Extract the raw content; everything else is metadata.
        file_content = file_info.get('file_content')

        if file_content:
            file_size = len(file_content)
            chunk_threshold = 50 * 1024 * 1024  # 50MB chunking threshold

            if direct_storage or file_size <= chunk_threshold:
                # Direct storage, whether forced by the switch or because
                # the payload is small enough — the label is the same.
                storage_method = "直接存储"
                server_logger.info(f"使用{storage_method}策略: {file_id}, {file_size/1024/1024:.2f}MB")

                metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
                # Always record 'direct': the readers (get_file_info /
                # delete_file_info) never recognized the former
                # 'direct_test' marker, so such content could never be
                # loaded back or deleted.
                metadata['storage_type'] = 'direct'
                metadata['file_size'] = file_size

                # Write metadata and content concurrently for performance.
                await asyncio.gather(
                    redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)),
                    redis_store.setex(f"content:{file_id}", expire_seconds, file_content),
                )

            else:
                server_logger.info(f"使用分块存储策略: {file_id}, {file_size/1024/1024:.2f}MB > 50MB")

                # Chunk the content first; bail out if that fails.
                if not await _store_file_chunked(file_id, file_content, expire_seconds=expire_seconds):
                    return False

                # Store metadata only (content lives in the chunks).
                metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
                metadata['storage_type'] = 'chunked'
                metadata['file_size'] = file_size
                await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata))
        else:
            # No content supplied — persist metadata only.
            metadata = file_info.copy()
            metadata['storage_type'] = 'metadata_only'
            await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata))

        server_logger.info(f"文件信息已存储到Redis: {file_id}")
        return True

    except Exception as e:
        server_logger.error(f"存储文件信息到Redis失败: {str(e)}")
        return False
|
|
|
+
|
|
|
@track_execution_time
async def get_file_info(file_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]:
    """
    Fetch file info by file_id, transparently handling both direct and
    chunked storage layouts.

    Args:
        file_id: unique identifier of the file.
        include_content: whether to also load the file bytes (default
            True); pass False to fetch metadata only, which is cheaper.

    Returns:
        The file info dict (with 'file_content' populated when available
        and requested), or None when missing or on error.
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()

        # Fetch metadata first; it tells us where the content lives.
        meta_key = f"meta:{file_id}"
        meta_bytes = await redis_store.get(meta_key)

        if not meta_bytes:
            server_logger.warning(f"文件元数据不存在: {meta_key}")
            return None

        file_info = json.loads(meta_bytes.decode('utf-8'))
        storage_type = file_info.get('storage_type', 'direct')

        # metadata_only entries carry no 'file_size' and are returned as-is.
        if include_content and 'file_size' in file_info:
            if storage_type == 'chunked':
                # Reassemble the content from its chunks.
                file_content = await _get_file_chunked(file_id)
                if file_content:
                    file_info['file_content'] = file_content
                else:
                    server_logger.warning(f"分块文件内容获取失败: {file_id}")
            elif storage_type in ('direct', 'direct_test'):
                # 'direct_test' is a legacy marker written by an earlier
                # store_file_info; its layout is identical to 'direct'.
                # Without this branch such entries silently lost content.
                content_key = f"content:{file_id}"
                file_content = await redis_store.get(content_key)
                if file_content:
                    file_info['file_content'] = file_content
                else:
                    server_logger.warning(f"文件内容不存在: {content_key}")

        server_logger.info(f"从Redis获取到文件信息: {meta_key}, 存储类型: {storage_type}")
        return file_info

    except json.JSONDecodeError as e:
        server_logger.error(f"解析文件元数据JSON失败: {str(e)}")
        return None
    except Exception as e:
        server_logger.error(f"获取文件信息失败: {str(e)}")
        return None
|
|
|
+
|
|
|
@track_execution_time
async def delete_file_info(file_id: str) -> bool:
    """
    Delete a file's metadata and content from Redis, handling both direct
    and chunked storage layouts.

    Args:
        file_id: unique identifier of the file.

    Returns:
        True if the file's keys were deleted (or nothing was stored),
        False otherwise.
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()

        # Read the metadata first to learn the storage layout.
        meta_key = f"meta:{file_id}"
        meta_bytes = await redis_store.get(meta_key)

        if not meta_bytes:
            server_logger.warning(f"文件元数据不存在: {meta_key}")
            return True  # probably deleted already

        file_info = json.loads(meta_bytes.decode('utf-8'))
        storage_type = file_info.get('storage_type', 'direct')

        deleted_count = 0

        # Delete the metadata entry itself.
        deleted_count += await redis_store.delete(meta_key)

        if storage_type == 'chunked':
            # Delete every chunk plus the chunk index.
            if await _delete_file_chunks(file_id):
                server_logger.info(f"已删除分块文件内容: {file_id}")
        elif storage_type in ('direct', 'direct_test'):
            # 'direct_test' is a legacy marker from an earlier
            # store_file_info; without this branch its content key leaked
            # until TTL expiry.
            deleted_count += await redis_store.delete(f"content:{file_id}")

        if deleted_count > 0:
            server_logger.info(f"已删除文件信息: {file_id}, {deleted_count}个键")
            return True
        else:
            server_logger.warning(f"Redis缓存不存在,无法删除: {file_id}")
            return False

    except json.JSONDecodeError as e:
        server_logger.error(f"解析文件元数据JSON失败: {str(e)}")
        return False
    except Exception as e:
        server_logger.error(f"删除文件信息失败: {str(e)}")
        return False
|
|
|
+
|
|
|
+#asyncio.run(delete_file_info('e385049cde7d21a48c7de216182f0f23'))
|
|
|
+
|