# redis_utils.py

import json
import time
import asyncio
import sys
from pathlib import Path
# root_dir = Path(__file__).parent.parent.parent
# print(root_dir)
# sys.path.append(str(root_dir))
from typing import Dict, Optional, Any

from .time_statistics import track_execution_time
from foundation.base.config import config_handler
from foundation.logger.loggering import server_logger
from foundation.base.redis_connection import RedisConnectionFactory

# Cache data TTL, default 3 minutes
CACHE_DATA_EXPIRED_TIME = 3 * 60

async def set_redis_result_cache_data(data_type: str, trace_id: str, value: str):
    """
    Store result cache data in Redis.
    @param data_type: data type: basic info cattle_info, body temperature cattle_temperature, step count cattle_walk
    @param trace_id: trace ID for request tracking
    @param value: data to cache
    """
    expired_time = config_handler.get("api", "CACHE_DATA_EXPIRED_TIME", CACHE_DATA_EXPIRED_TIME)
    key = f"{trace_id}:{data_type}"
    # Fetch the RedisStore directly
    redis_store = await RedisConnectionFactory.get_redis_store()
    await redis_store.set(key, value, ex=expired_time)

async def get_redis_result_cache_data(data_type: str, trace_id: str):
    """
    Get result cache data from Redis.
    @param data_type: data type: basic info cattle_info, body temperature cattle_temperature, step count cattle_walk
    @param trace_id: trace ID for request tracking
    """
    key = f"{trace_id}:{data_type}"
    # Fetch the RedisStore directly
    redis_store = await RedisConnectionFactory.get_redis_store()
    value = await redis_store.get(key)
    # Guard against a missing key: decoding None would raise AttributeError
    if value is None:
        return None
    return value.decode('utf-8')

async def get_redis_result_cache_data_and_delete_key(data_type: str, trace_id: str):
    """
    Get result cache data from Redis, parse it as JSON, and delete the key.
    @param data_type: data type: basic info cattle_info, body temperature cattle_temperature, step count cattle_walk
    @param trace_id: trace ID for request tracking
    """
    key = f"{trace_id}:{data_type}"
    # Fetch the RedisStore directly
    redis_store = await RedisConnectionFactory.get_redis_store()
    value = await redis_store.get(key)
    server_logger.info(f"Fetched cached result from Redis: {key}-{value}")
    if value is None:
        return None
    # Step 1: decode bytes to string
    json_str = value.decode('utf-8')
    # Step 2: parse the JSON
    data = json.loads(json_str)
    # Delete the key (currently disabled)
    #await redis_store.delete(key)
    return data

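# A minimal usage sketch of the result-cache helpers above, assuming a
# configured RedisConnectionFactory and a running event loop. The trace_id
# and payload values here are hypothetical.
async def _result_cache_demo():
    trace_id = "demo-trace-001"
    payload = json.dumps({"cattle_id": 42, "temperature": 38.5})
    await set_redis_result_cache_data("cattle_temperature", trace_id, payload)
    raw = await get_redis_result_cache_data("cattle_temperature", trace_id)
    print(raw)   # JSON string, or None if the key expired
    data = await get_redis_result_cache_data_and_delete_key("cattle_temperature", trace_id)
    print(data)  # parsed dict, or None

# asyncio.run(_result_cache_demo())
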
@track_execution_time
async def _store_file_chunked(file_id: str, file_content: bytes, chunk_size: int = 1024*1024, expire_seconds: int = 3600) -> bool:
    """
    Store a large file's content in chunks (internal helper).
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()
        file_size = len(file_content)
        server_logger.info(f"Starting chunked store of file: {file_id}, size: {file_size/1024/1024:.2f}MB, chunk size: {chunk_size/1024/1024:.2f}MB")
        # Compute the number of chunks (ceiling division)
        chunk_count = (file_size + chunk_size - 1) // chunk_size
        # Build the chunk index record
        chunk_index = {
            'file_id': file_id,
            'file_size': file_size,
            'chunk_size': chunk_size,
            'chunk_count': chunk_count,
            'created_at': int(time.time())
        }
        # Store the chunk index
        await redis_store.setex(f"chunks:{file_id}", expire_seconds, json.dumps(chunk_index))
        # Store the file content chunk by chunk
        tasks = []
        for i in range(chunk_count):
            start = i * chunk_size
            end = min(start + chunk_size, file_size)
            chunk_data = file_content[start:end]
            chunk_key = f"chunk:{file_id}:{i}"
            task = redis_store.setex(chunk_key, expire_seconds, chunk_data)
            tasks.append(task)
        # Execute all chunk writes in parallel
        await asyncio.gather(*tasks)
        server_logger.info(f"Chunked store of file complete: {file_id}, {chunk_count} chunks")
        return True
    except Exception as e:
        server_logger.error(f"Chunked store of file failed: {file_id}, {str(e)}")
        return False

async def _get_file_chunked(file_id: str) -> Optional[bytes]:
    """
    Get a large file's content by reassembling its chunks (internal helper).
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()
        # Fetch the chunk index
        chunk_index_json = await redis_store.get(f"chunks:{file_id}")
        if not chunk_index_json:
            server_logger.warning(f"Chunk index does not exist for file: {file_id}")
            return None
        chunk_index = json.loads(chunk_index_json.decode('utf-8'))
        chunk_count = chunk_index['chunk_count']
        # Build the fetch tasks for all chunks
        tasks = []
        for i in range(chunk_count):
            chunk_key = f"chunk:{file_id}:{i}"
            task = redis_store.get(chunk_key)
            tasks.append(task)
        # Fetch all chunks in parallel
        chunks = await asyncio.gather(*tasks)
        # If any chunk expired or is missing, the file cannot be reassembled
        if any(chunk is None for chunk in chunks):
            server_logger.warning(f"One or more chunks are missing for file: {file_id}")
            return None
        # Reassemble the file content
        file_content = b''.join(chunks)
        return file_content
    except Exception as e:
        server_logger.error(f"Failed to get chunked file: {file_id}, {str(e)}")
        return None

async def _delete_file_chunks(file_id: str) -> bool:
    """
    Delete a large file's chunks (internal helper).
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()
        # Fetch the chunk index
        chunk_index_json = await redis_store.get(f"chunks:{file_id}")
        if not chunk_index_json:
            return True  # Probably already deleted
        chunk_index = json.loads(chunk_index_json.decode('utf-8'))
        chunk_count = chunk_index['chunk_count']
        # Build the full list of keys to delete
        keys_to_delete = [f"chunks:{file_id}"]
        keys_to_delete.extend([f"chunk:{file_id}:{i}" for i in range(chunk_count)])
        # Delete in one batch
        await redis_store.delete(*keys_to_delete)
        return True
    except Exception as e:
        server_logger.error(f"Failed to delete file chunks: {file_id}, {str(e)}")
        return False

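# Key layout used by the chunk helpers above: "chunks:{file_id}" holds the
# JSON index, and "chunk:{file_id}:{i}" holds the raw bytes of chunk i; all
# keys share the same TTL, so the index and its chunks expire together.
# Below is a minimal round-trip sketch, assuming a reachable Redis instance;
# the file_id and content are hypothetical.
async def _chunk_roundtrip_demo():
    content = b"x" * (3 * 1024 * 1024 + 17)  # just over three 1 MB chunks
    if await _store_file_chunked("demo-file-001", content):
        restored = await _get_file_chunked("demo-file-001")
        assert restored == content
    await _delete_file_chunks("demo-file-001")

# asyncio.run(_chunk_roundtrip_demo())
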
@track_execution_time
async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600) -> bool:
    """
    Store file info (auto-optimized: small files stored directly, large files stored in chunks).
    Args:
        file_id: file ID
        file_info: file info dict
        expire_seconds: expiry time in seconds, default 1 hour
    Returns:
        bool: whether the store succeeded
    """
    # Direct-storage switch: True forces direct storage, False allows chunked storage
    direct_storage = True
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()
        # Skip if the file info already exists, to avoid duplicate stores
        existing_meta = await redis_store.get(f"meta:{file_id}")
        if existing_meta:
            server_logger.info(f"File info already exists, skipping store: {file_id}")
            return True
        # Extract the file content
        file_content = file_info.get('file_content')
        if file_content:
            file_size = len(file_content)
            chunk_threshold = 50 * 1024 * 1024  # 50MB threshold
            # Choose a storage strategy based on file size and the force switch
            if direct_storage or file_size <= chunk_threshold:
                storage_method = "direct storage (forced)" if direct_storage else "direct storage (small file)"
                server_logger.info(f"Using {storage_method} strategy: {file_id}, {file_size/1024/1024:.2f}MB")
                # Direct storage
                metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
                metadata['storage_type'] = 'direct_test' if direct_storage else 'direct'
                metadata['file_size'] = file_size
                # Store metadata and content in parallel for better performance
                tasks = [
                    redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)),
                    redis_store.setex(f"content:{file_id}", expire_seconds, file_content)
                ]
                await asyncio.gather(*tasks)
            else:
                server_logger.info(f"Using chunked storage strategy: {file_id}, {file_size/1024/1024:.2f}MB > 50MB")
                # Store the file content in chunks
                chunk_success = await _store_file_chunked(file_id, file_content, expire_seconds=expire_seconds)
                if not chunk_success:
                    return False
                # Store the metadata (without the file content)
                metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
                metadata['storage_type'] = 'chunked'
                metadata['file_size'] = file_size
                await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata))
        else:
            # No file content; store metadata only
            metadata = file_info.copy()
            metadata['storage_type'] = 'metadata_only'
            await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata))
        server_logger.info(f"File info stored in Redis: {file_id}")
        return True
    except Exception as e:
        server_logger.error(f"Failed to store file info in Redis: {str(e)}")
        return False

@track_execution_time
async def get_file_info(file_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]:
    """
    Get file info by file_id (auto-adapts to chunked and direct storage).
    Args:
        file_id: file ID
        include_content: whether to include the file content (default True); pass False for efficiency
    Returns:
        Dict: file info dict, or None if it does not exist
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()
        # Fetch the metadata
        meta_key = f"meta:{file_id}"
        meta_bytes = await redis_store.get(meta_key)
        if not meta_bytes:
            server_logger.warning(f"File metadata does not exist: {meta_key}")
            return None
        # Parse the metadata
        file_info = json.loads(meta_bytes.decode('utf-8'))
        storage_type = file_info.get('storage_type', 'direct')
        # Fetch the file content according to the storage type
        if include_content and 'file_size' in file_info:
            if storage_type == 'chunked':
                # Reassemble the file content from its chunks
                file_content = await _get_file_chunked(file_id)
                if file_content:
                    file_info['file_content'] = file_content
                else:
                    server_logger.warning(f"Failed to get chunked file content: {file_id}")
            elif storage_type in ('direct', 'direct_test'):
                # Fetch the file content directly ('direct_test' is written by the
                # direct-storage switch in store_file_info)
                content_key = f"content:{file_id}"
                file_content = await redis_store.get(content_key)
                if file_content:
                    file_info['file_content'] = file_content
                else:
                    server_logger.warning(f"File content does not exist: {content_key}")
        server_logger.info(f"Got file info from Redis: {meta_key}, storage type: {storage_type}")
        return file_info
    except json.JSONDecodeError as e:
        server_logger.error(f"Failed to parse file metadata JSON: {str(e)}")
        return None
    except Exception as e:
        server_logger.error(f"Failed to get file info: {str(e)}")
        return None

@track_execution_time
async def delete_file_info(file_id: str) -> bool:
    """
    Delete file info (auto-adapts to chunked and direct storage).
    Args:
        file_id: file ID
    Returns:
        bool: whether the delete succeeded
    """
    try:
        redis_store = await RedisConnectionFactory.get_redis_store()
        # Fetch the metadata to determine the storage type
        meta_key = f"meta:{file_id}"
        meta_bytes = await redis_store.get(meta_key)
        if not meta_bytes:
            server_logger.warning(f"File metadata does not exist: {meta_key}")
            return True  # Probably already deleted
        # Parse the metadata
        file_info = json.loads(meta_bytes.decode('utf-8'))
        storage_type = file_info.get('storage_type', 'direct')
        # Delete the stored content according to the storage type
        deleted_count = 0
        # Delete the metadata
        deleted_count += await redis_store.delete(meta_key)
        if storage_type == 'chunked':
            # Delete the chunked content
            chunk_success = await _delete_file_chunks(file_id)
            if chunk_success:
                server_logger.info(f"Deleted chunked file content: {file_id}")
        elif storage_type in ('direct', 'direct_test'):
            # Delete the directly stored content
            content_key = f"content:{file_id}"
            deleted_count += await redis_store.delete(content_key)
        if deleted_count > 0:
            server_logger.info(f"Deleted file info: {file_id}, {deleted_count} keys")
            return True
        else:
            server_logger.warning(f"Redis cache does not exist, nothing to delete: {file_id}")
            return False
    except json.JSONDecodeError as e:
        server_logger.error(f"Failed to parse file metadata JSON: {str(e)}")
        return False
    except Exception as e:
        server_logger.error(f"Failed to delete file info: {str(e)}")
        return False

#asyncio.run(delete_file_info('e385049cde7d21a48c7de216182f0f23'))
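
# A minimal end-to-end sketch of the file-info API above, assuming a reachable
# Redis instance behind RedisConnectionFactory. The file_id and metadata
# fields are hypothetical.
async def _file_info_demo():
    file_info = {
        'file_name': 'report.pdf',
        'content_type': 'application/pdf',
        'file_content': b'%PDF-1.4 ...',  # raw bytes; small, so stored directly
    }
    await store_file_info("demo-file-002", file_info, expire_seconds=600)
    meta_only = await get_file_info("demo-file-002", include_content=False)
    print(meta_only)
    full = await get_file_info("demo-file-002")  # includes 'file_content'
    assert full and 'file_content' in full
    await delete_file_info("demo-file-002")

# asyncio.run(_file_info_demo())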