import json from foundation.logger.loggering import server_logger from foundation.base.redis_connection import RedisConnectionFactory from foundation.base.config import config_handler # 缓存数据有效期 默认 3 分钟 CACHE_DATA_EXPIRED_TIME = 3 * 60 async def set_redis_result_cache_data(data_type: str , trace_id: str, value: str): """ 设置redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID @param value: 缓存数据 """ expired_time = config_handler.get("api", "CACHE_DATA_EXPIRED_TIME" , CACHE_DATA_EXPIRED_TIME) key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() await redis_store.set(key, value , ex=expired_time) async def get_redis_result_cache_data(data_type: str , trace_id: str): """ 获取redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID """ key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() value = await redis_store.get(key) value = value.decode('utf-8') return value async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id: str): """ 获取redis结果缓存数据 @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk @param trace_id: 链路跟踪ID """ key = f"{trace_id}:{data_type}" # 直接获取 RedisStore redis_store = await RedisConnectionFactory.get_redis_store() value = await redis_store.get(key) server_logger.info(f"获取redis结果缓存数据: {key}-{value}") if value is None: return None # 第一步:转成字符串(decode) json_str = value.decode('utf-8') # 第二步:解析 JSON data = json.loads(json_str) # 删除key #await redis_store.delete(key) return data