| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- import json
- import time
- import asyncio
- import sys
- from pathlib import Path
- # root_dir = Path(__file__).parent.parent.parent
- # print(root_dir)
- # sys.path.append(str(root_dir))
- from typing import Dict, Optional, Any
- from .time_statistics import track_execution_time
- from foundation.base.config import config_handler
- from foundation.logger.loggering import server_logger
- from foundation.base.redis_connection import RedisConnectionFactory
- # 缓存数据有效期 默认 3 分钟
- CACHE_DATA_EXPIRED_TIME = 3 * 60
- async def set_redis_result_cache_data(data_type: str , trace_id: str, value: str):
- """
- 设置redis结果缓存数据
- @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk
- @param trace_id: 链路跟踪ID
- @param value: 缓存数据
- """
- expired_time = config_handler.get("api", "CACHE_DATA_EXPIRED_TIME" , CACHE_DATA_EXPIRED_TIME)
- key = f"{trace_id}:{data_type}"
- # 直接获取 RedisStore
- redis_store = await RedisConnectionFactory.get_redis_store()
- await redis_store.set(key, value , ex=expired_time)
- async def get_redis_result_cache_data(data_type: str , trace_id: str):
- """
- 获取redis结果缓存数据
- @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk
- @param trace_id: 链路跟踪ID
- """
- key = f"{trace_id}:{data_type}"
- # 直接获取 RedisStore
- redis_store = await RedisConnectionFactory.get_redis_store()
- value = await redis_store.get(key)
- value = value.decode('utf-8')
- return value
- async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id: str):
- """
- 获取redis结果缓存数据
- @param data_type: 数据类型,基本信息 cattle_info、体温信息 cattle_temperature 、步数信息 cattle_walk
- @param trace_id: 链路跟踪ID
- """
- key = f"{trace_id}:{data_type}"
- # 直接获取 RedisStore
- redis_store = await RedisConnectionFactory.get_redis_store()
- value = await redis_store.get(key)
- server_logger.info(f"获取redis结果缓存数据: {key}-{value}")
- if value is None:
- return None
- # 第一步:转成字符串(decode)
- json_str = value.decode('utf-8')
- # 第二步:解析 JSON
- data = json.loads(json_str)
- # 删除key
- #await redis_store.delete(key)
- return data
- @track_execution_time
- async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600) -> bool:
- """
- 存储文件信息(直接存储模式)
- Args:
- file_id: 文件ID
- file_info: 文件信息字典
- expire_seconds: 过期时间(秒),默认1小时
- Returns:
- bool: 存储是否成功
- """
- try:
- redis_store = await RedisConnectionFactory.get_redis_store()
- # 检查是否已存在,避免重复存储
- existing_meta = await redis_store.get(f"meta:{file_id}")
- if existing_meta:
- server_logger.info(f"文件信息已存在,跳过存储: {file_id}")
- return True
- # 提取文件内容
- file_content = file_info.get('file_content')
- if file_content:
- file_size = len(file_content)
- server_logger.info(f"使用直接存储策略: {file_id}, {file_size/1024/1024:.2f}MB")
- # 直接存储
- metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
- metadata['file_size'] = file_size
- # 并行执行元数据和内容存储以提高性能
- tasks = [
- redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)),
- redis_store.setex(f"content:{file_id}", expire_seconds, file_content)
- ]
- await asyncio.gather(*tasks)
- else:
- # 没有文件内容,只存元数据
- metadata = file_info.copy()
- await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata))
- server_logger.info(f"文件信息已存储到Redis: {file_id}")
- return True
- except Exception as e:
- server_logger.error(f"存储文件信息到Redis失败: {str(e)}")
- return False
- @track_execution_time
- async def get_file_info(file_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]:
- """
- 根据file_id获取文件信息
- Args:
- file_id: 文件ID
- include_content: 是否包含文件内容(默认True),可选False以提高效率
- Returns:
- Dict: 文件信息字典,如果不存在返回None
- """
- try:
- redis_store = await RedisConnectionFactory.get_redis_store()
- # 获取元数据
- meta_key = f"meta:{file_id}"
- meta_bytes = await redis_store.get(meta_key)
- if not meta_bytes:
- server_logger.warning(f"文件元数据不存在: {meta_key}")
- return None
- # 解析元数据
- file_info = json.loads(meta_bytes.decode('utf-8'))
- # 根据存储类型获取文件内容
- if include_content and 'file_size' in file_info:
- # 直接获取文件内容
- content_key = f"content:{file_id}"
- file_content = await redis_store.get(content_key)
- if file_content:
- file_info['file_content'] = file_content
- else:
- server_logger.warning(f"文件内容不存在: {content_key}")
- return None # 文件内容缺失,返回None
- server_logger.info(f"从Redis获取到文件信息: {meta_key}")
- return file_info
- except json.JSONDecodeError as e:
- server_logger.error(f"解析文件元数据JSON失败: {str(e)}")
- return None
- except Exception as e:
- server_logger.error(f"获取文件信息失败: {str(e)}")
- return None
- async def delete_file_info(file_id: str) -> bool:
- """
- 删除文件信息
- Args:
- file_id: 文件ID
- Returns:
- bool: 删除是否成功
- """
- try:
- # 为了避免事件循环冲突,直接创建新的Redis连接
- from foundation.base.redis_config import load_config_from_env
- from foundation.base.redis_connection import RedisAdapter
- redis_config = load_config_from_env()
- adapter = RedisAdapter(redis_config)
- await adapter.connect()
- redis_store = adapter.get_langchain_redis_client()
- # 获取元数据以确定存储类型
- meta_key = f"meta:{file_id}"
- meta_bytes = await redis_store.get(meta_key)
- if not meta_bytes:
- server_logger.warning(f"文件元数据不存在: {meta_key}")
- # 清理连接
- await adapter.close()
- return True # 可能已经删除了
- # 解析元数据
- file_info = json.loads(meta_bytes.decode('utf-8'))
- # 删除相应的内容
- deleted_count = 0
- # 删除元数据
- deleted_count += await redis_store.delete(meta_key)
- # 如果有文件大小信息,说明有文件内容,需要删除
- if 'file_size' in file_info:
- # 删除文件内容
- content_key = f"content:{file_id}"
- deleted_count += await redis_store.delete(content_key)
- if deleted_count > 0:
- server_logger.info(f"已删除文件信息: {file_id}, {deleted_count}个键")
- else:
- server_logger.warning(f"Redis缓存不存在,无法删除: {file_id}")
- # 清理连接
- await adapter.close()
- return True if deleted_count > 0 else False
- except json.JSONDecodeError as e:
- server_logger.error(f"解析文件元数据JSON失败: {str(e)}")
- # 清理连接
- await adapter.close()
- return False
- except Exception as e:
- server_logger.error(f"删除文件信息失败: {str(e)}")
- # 清理连接
- await adapter.close()
- return False
- finally:
- # 确保连接被关闭
- await adapter.close()
- #asyncio.run(delete_file_info('e385049cde7d21a48c7de216182f0f23'))
|