#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Manual smoke test for the Redis-backed storage integrations.

@Project : lq-agent-api
@File    : test.py.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/11 12:23

Run this file directly as a script. It exercises, in order:

1. the project's own store from ``RedisConnectionFactory.get_redis_store()``,
2. a langgraph ``RedisSaver`` checkpointer built on an existing Redis client,
3. a langchain ``ConversationBufferMemory`` backed by
   ``RedisChatMessageHistory``.

The script targets the hard-coded Redis instance ``localhost:6379`` (db 0).
"""
import asyncio
import os
import sys

# NOTE(review): `session` is not referenced in the visible code of this file;
# the import is kept because removing file-level imports is out of scope here.
from sqlalchemy.orm import session

# Put the parent directory of this file's folder on sys.path so that the
# project-local `base` package below can be imported when the script is run
# directly. This must happen before the `base...` import.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from base.redis_connection import RedisConnectionFactory


async def main():
    """Run the Redis integration checks and print each constructed object.

    Steps, in execution order:

    1. Obtain the project store from ``RedisConnectionFactory``, write
       ``"some_key"``, read it back and print the value.
    2. Create one synchronous ``redis.Redis`` client and wrap it in a
       langgraph ``RedisSaver``; print the checkpointer.
    3. Build a langchain ``RedisStore`` on the same client, a
       ``RedisChatMessageHistory`` for a fixed session id, and a
       ``ConversationBufferMemory`` on top of that history; print the memory.

    The Redis client created in step 2 is always closed before returning,
    including when a later step raises.

    Returns:
        None. All results are reported via ``print``.
    """
    # --- 1. Project store obtained directly from the connection factory ----
    project_store = await RedisConnectionFactory.get_redis_store()
    await project_store.set("some_key", "some_value")
    data = await project_store.get("some_key")
    print(data)

    # --- 2. langgraph checkpointer on an explicit Redis client -------------
    # These packages are imported lazily, after step 1, so the project-store
    # check above still runs (and prints) even if they are not installed.
    import redis
    from langgraph.checkpoint.redis import RedisSaver

    session_id = "session_id"
    redis_url = "redis://localhost:6379/0"

    # A single client is shared by the checkpointer and the langchain store.
    # Both receive it as a caller-supplied client, so this function is
    # responsible for closing it (see the `finally` clause below).
    redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    try:
        checkpointer = RedisSaver(redis_client=redis_client)
        print(f"checkpointer={checkpointer}")

        # Notes carried over from earlier experiments in this file:
        #   * `RedisStore.from_client(redis_client)` does not exist:
        #       AttributeError: type object 'RedisStore' has no attribute 'from_client'
        #   * Referencing `RedisStore` before the function-local import below
        #     raised:
        #       UnboundLocalError: cannot access local variable 'RedisStore'
        #       where it is not associated with a value
        #   * `from langchain.storage import RedisStore` is deprecated; use
        #     `langchain_community.storage` instead.
        #   * No `RedisCheckpointer` could be used from
        #     `langchain_community.checkpoint`, `langchain.agents` or
        #     `langchain.memory` (those attempts were abandoned).

        # --- 3. langchain store + Redis-backed conversation memory ---------
        from langchain_community.storage import RedisStore
        from langchain.memory import ConversationBufferMemory
        from langchain_community.chat_message_histories import RedisChatMessageHistory

        # Not used after construction; presumably kept as a check that the
        # constructor accepts an existing client (alternative that was tried:
        # `RedisStore.from_url(redis_url)`).
        langchain_store = RedisStore(client=redis_client)

        # Conversation history is stored in Redis under a unique session id.
        chat_history = RedisChatMessageHistory(
            session_id=session_id,  # uniquely identifies the conversation
            url=redis_url,
        )

        # Memory object whose messages are persisted through `chat_history`.
        memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            chat_memory=chat_history,  # any compatible history store works
        )
        print(f"memory={memory}")

        # Alternative checkpointer setup that was tried earlier:
        #   redis_client = Redis.from_url("redis://localhost:6379")
        #   checkpointer = RedisSaver(connection=redis_client)
        #   checkpointer.setup()
    finally:
        # Release the client's connections even if one of the steps failed.
        redis_client.close()


# Only run the checks when executed as a script, so that importing this
# module does not trigger Redis I/O as a side effect.
if __name__ == "__main__":
    asyncio.run(main())