redis_utils.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. import json
  2. import time
  3. import asyncio
  4. import sys
  5. from pathlib import Path
  6. # root_dir = Path(__file__).parent.parent.parent
  7. # print(root_dir)
  8. # sys.path.append(str(root_dir))
  9. from typing import Dict, Optional, Any
  10. from foundation.observability.monitoring.time_statistics import track_execution_time
  11. from foundation.infrastructure.config import config_handler
  12. from foundation.observability.logger.loggering import server_logger
  13. from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
  14. # 缓存数据有效期 默认 3 分钟
  15. CACHE_DATA_EXPIRED_TIME = 3 * 60
  16. async def set_redis_result_cache_data(data_type: str , trace_id: str, value: str):
  17. """
  18. 设置redis结果缓存数据
  19. @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk
  20. @param trace_id: 链路跟踪ID
  21. @param value: 缓存数据
  22. """
  23. expired_time = config_handler.get("api", "CACHE_DATA_EXPIRED_TIME" , CACHE_DATA_EXPIRED_TIME)
  24. key = f"{trace_id}:{data_type}"
  25. # 直接获取 RedisStore
  26. redis_store = await RedisConnectionFactory.get_redis_store()
  27. await redis_store.set(key, value , ex=expired_time)
  28. async def get_redis_result_cache_data(data_type: str , trace_id: str):
  29. """
  30. 获取redis结果缓存数据
  31. @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk
  32. @param trace_id: 链路跟踪ID
  33. """
  34. key = f"{trace_id}:{data_type}"
  35. # 直接获取 RedisStore
  36. redis_store = await RedisConnectionFactory.get_redis_store()
  37. value = await redis_store.get(key)
  38. value = value.decode('utf-8')
  39. return value
  40. async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id: str):
  41. """
  42. 获取redis结果缓存数据
  43. @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk
  44. @param trace_id: 链路跟踪ID
  45. """
  46. key = f"{trace_id}:{data_type}"
  47. # 直接获取 RedisStore
  48. redis_store = await RedisConnectionFactory.get_redis_store()
  49. value = await redis_store.get(key)
  50. server_logger.info(f"获取redis结果缓存数据: {key}-{value}")
  51. if value is None:
  52. return None
  53. # 第一步:转成字符串(decode)
  54. json_str = value.decode('utf-8')
  55. # 第二步:解析 JSON
  56. data = json.loads(json_str)
  57. # 删除key
  58. #await redis_store.delete(key)
  59. return data
  60. @track_execution_time
  61. async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600, force_update: bool = False) -> bool:
  62. """
  63. 存储文件信息(直接存储模式)
  64. Args:
  65. file_id: 文件ID
  66. file_info: 文件信息字典
  67. expire_seconds: 过期时间(秒),默认1小时
  68. force_update: 是否强制更新已存在的文件信息
  69. Returns:
  70. bool: 存储是否成功
  71. """
  72. try:
  73. redis_store = await RedisConnectionFactory.get_redis_store()
  74. # 检查是否已存在,如果存在则更新callback_task_id
  75. existing_meta = await redis_store.get(f"meta:{file_id}")
  76. if existing_meta:
  77. # 解析现有元数据
  78. existing_file_info = json.loads(existing_meta.decode('utf-8'))
  79. # 更新callback_task_id为最新的
  80. if 'callback_task_id' in file_info:
  81. existing_file_info['callback_task_id'] = file_info['callback_task_id']
  82. elif 'callback_task_id' not in existing_file_info:
  83. # 如果两者都没有callback_task_id,添加一个新的
  84. existing_file_info['callback_task_id'] = None
  85. # 并行更新meta和content的TTL,确保同步过期
  86. update_tasks = [
  87. redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(existing_file_info))
  88. ]
  89. # 如果存在content,也需要更新其TTL以保持同步
  90. content_key = f"content:{file_id}"
  91. existing_content = await redis_store.get(content_key)
  92. if existing_content:
  93. update_tasks.append(redis_store.setex(content_key, expire_seconds, existing_content))
  94. server_logger.info(f"同步更新content的TTL: {content_key}")
  95. else:
  96. server_logger.warning(f"未找到content键,只更新meta TTL: {content_key}")
  97. # 执行并行更新
  98. await asyncio.gather(*update_tasks)
  99. server_logger.info(f"文件信息已存在,同步更新TTL: {file_id} -> {existing_file_info['callback_task_id']}")
  100. return True
  101. # 提取文件内容
  102. file_content = file_info.get('file_content')
  103. if file_content:
  104. file_size = len(file_content)
  105. server_logger.info(f"使用直接存储策略: {file_id}, {file_size/1024/1024:.2f}MB")
  106. # 直接存储
  107. metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
  108. metadata['file_size'] = file_size
  109. # 并行执行元数据和内容存储以提高性能
  110. tasks = [
  111. redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)),
  112. redis_store.setex(f"content:{file_id}", expire_seconds, file_content)
  113. ]
  114. await asyncio.gather(*tasks)
  115. else:
  116. # 没有文件内容,只存元数据
  117. metadata = file_info.copy()
  118. await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata))
  119. server_logger.info(f"文件信息已存储到Redis: {file_id}")
  120. return True
  121. except Exception as e:
  122. server_logger.error(f"存储文件信息到Redis失败: {str(e)}")
  123. return False
  124. @track_execution_time
  125. async def get_file_info(file_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]:
  126. """
  127. 根据file_id获取文件信息
  128. Args:
  129. file_id: 文件ID
  130. include_content: 是否包含文件内容(默认True),可选False以提高效率
  131. Returns:
  132. Dict: 文件信息字典,如果不存在返回None
  133. """
  134. try:
  135. redis_store = await RedisConnectionFactory.get_redis_store()
  136. # 获取元数据
  137. meta_key = f"meta:{file_id}"
  138. meta_bytes = await redis_store.get(meta_key)
  139. if not meta_bytes:
  140. server_logger.warning(f"文件元数据不存在: {meta_key}")
  141. return None
  142. # 解析元数据
  143. file_info = json.loads(meta_bytes.decode('utf-8'))
  144. # 根据存储类型获取文件内容
  145. if include_content and 'file_size' in file_info:
  146. # 直接获取文件内容
  147. content_key = f"content:{file_id}"
  148. file_content = await redis_store.get(content_key)
  149. if file_content:
  150. file_info['file_content'] = file_content
  151. else:
  152. server_logger.warning(f"文件内容不存在: {content_key}")
  153. return None # 文件内容缺失,返回None
  154. server_logger.info(f"从Redis获取到文件信息: {meta_key}")
  155. return file_info
  156. except json.JSONDecodeError as e:
  157. server_logger.error(f"解析文件元数据JSON失败: {str(e)}")
  158. return None
  159. except Exception as e:
  160. server_logger.error(f"获取文件信息失败: {str(e)}")
  161. return None
  162. async def delete_file_info(file_id: str) -> bool:
  163. """
  164. 删除文件信息
  165. Args:
  166. file_id: 文件ID
  167. Returns:
  168. bool: 删除是否成功
  169. """
  170. try:
  171. # 为了避免事件循环冲突,直接创建新的Redis连接
  172. from foundation.infrastructure.cache.redis_config import load_config_from_env
  173. from foundation.infrastructure.cache.redis_connection import RedisAdapter
  174. redis_config = load_config_from_env()
  175. adapter = RedisAdapter(redis_config)
  176. await adapter.connect()
  177. redis_store = adapter.get_langchain_redis_client()
  178. # 获取元数据以确定存储类型
  179. meta_key = f"meta:{file_id}"
  180. meta_bytes = await redis_store.get(meta_key)
  181. if not meta_bytes:
  182. server_logger.warning(f"文件元数据不存在: {meta_key}")
  183. # 清理连接
  184. await adapter.close()
  185. return True # 可能已经删除了
  186. # 解析元数据
  187. file_info = json.loads(meta_bytes.decode('utf-8'))
  188. # 删除相应的内容
  189. deleted_count = 0
  190. # 删除元数据
  191. deleted_count += await redis_store.delete(meta_key)
  192. # 如果有文件大小信息,说明有文件内容,需要删除
  193. if 'file_size' in file_info:
  194. # 删除文件内容
  195. content_key = f"content:{file_id}"
  196. deleted_count += await redis_store.delete(content_key)
  197. if deleted_count > 0:
  198. server_logger.info(f"已删除文件信息: {file_id}, {deleted_count}个键")
  199. else:
  200. server_logger.warning(f"Redis缓存不存在,无法删除: {file_id}")
  201. # 清理连接
  202. await adapter.close()
  203. return True if deleted_count > 0 else False
  204. except json.JSONDecodeError as e:
  205. server_logger.error(f"解析文件元数据JSON失败: {str(e)}")
  206. # 清理连接
  207. await adapter.close()
  208. return False
  209. except Exception as e:
  210. server_logger.error(f"删除文件信息失败: {str(e)}")
  211. # 清理连接
  212. await adapter.close()
  213. return False
  214. finally:
  215. # 确保连接被关闭
  216. await adapter.close()
  217. #asyncio.run(delete_file_info('e385049cde7d21a48c7de216182f0f23'))