# !/usr/bin/ python # -*- coding: utf-8 -*- ''' @Project : lq-agent-api @File :test.py.py @IDE :PyCharm @Author : @Date :2025/7/11 12:23 ''' import os import sys sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import redis #from langchain.storage import RedisStore from langchain_community.storage import RedisStore import asyncio from redis import Redis as SyncRedisClient async def main(): # 创建同步Redis客户端(注意不是aioredis) redis_client = redis.Redis.from_url( "redis://localhost:6379", decode_responses=False # LangChain需要bytes ) # redis_client = SyncRedisClient.from_url( # "redis://localhost:6379", # decode_responses=False # LangChain需要bytes # ) # 创建存储 store = RedisStore(client=redis_client) # 存储数据(注意是set不是aset) # 存储数据:使用 mset(即使只有一个键) store.mset([("test_key", b"test_value")]) # 获取数据:使用 mget value = store.mget(["test_key"]) print(f"Retrieved: {value}") # [b'test_value'] # 如果你想提取第一个值 if value and value[0] is not None: print(f"Value: {value[0].decode('utf-8')}") # 输出: Value: test_value # 关闭连接(可选) redis_client.close() asyncio.run(main())