|
|
@@ -0,0 +1,243 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+Redis CSV处理器
|
|
|
+功能:读取CSV文件,存入Redis,然后一次性读取并保存为新CSV文件
|
|
|
+"""
|
|
|
+
|
|
|
+import csv
|
|
|
+import pandas as pd
|
|
|
+import redis
|
|
|
+import json
|
|
|
+import configparser
|
|
|
+import os
|
|
|
+
|
|
|
+from foundation.observability.logger.loggering import server_logger as logger
|
|
|
+
|
|
|
+# 从config.ini读取Redis配置
|
|
|
+config = configparser.ConfigParser()
|
|
|
+config_path = os.path.join(os.path.dirname(__file__), '../../../../../../config/config.ini')
|
|
|
+config.read(config_path, encoding='utf-8')
|
|
|
+
|
|
|
+# Redis配置
|
|
|
+REDIS_HOST = config.get('redis', 'REDIS_HOST', fallback='localhost')
|
|
|
+REDIS_PORT = config.getint('redis', 'REDIS_PORT', fallback=6379)
|
|
|
+REDIS_PASSWORD = config.get('redis', 'REDIS_PASSWORD', fallback='')
|
|
|
+REDIS_DB = config.getint('redis', 'REDIS_DB', fallback=0)
|
|
|
+
|
|
|
+# 绑定ID
|
|
|
+BIND_ID = '2d5d99c823a6b1a19f770932f3237bf8-1768535328'
|
|
|
+
|
|
|
+# 文件路径
|
|
|
+INPUT_CSV = 'outlines_review_results.csv'
|
|
|
+OUTPUT_CSV = 'outlines_review_results_redis.csv'
|
|
|
+
|
|
|
+
|
|
|
+def get_redis_connection():
|
|
|
+ """获取Redis连接"""
|
|
|
+ try:
|
|
|
+ r = redis.Redis(
|
|
|
+ host=REDIS_HOST,
|
|
|
+ port=REDIS_PORT,
|
|
|
+ password=REDIS_PASSWORD,
|
|
|
+ db=REDIS_DB,
|
|
|
+ decode_responses=True
|
|
|
+ )
|
|
|
+ # 测试连接
|
|
|
+ r.ping()
|
|
|
+ logger.info(f"[OK] Redis连接成功 (host={REDIS_HOST}, port={REDIS_PORT})")
|
|
|
+ return r
|
|
|
+ except Exception as e:
|
|
|
+ logger.info(f"[ERROR] Redis连接失败: {e}")
|
|
|
+ raise
|
|
|
+
|
|
|
+
|
|
|
+def store_row_to_redis(redis_client, bind_id, row_key, row_data):
|
|
|
+ """将单行数据存储到Redis"""
|
|
|
+ row_json = json.dumps(row_data, ensure_ascii=False)
|
|
|
+ redis_client.hset(bind_id, row_key, row_json)
|
|
|
+
|
|
|
+
|
|
|
+def store_header_to_redis(redis_client, bind_id, header):
|
|
|
+ """将表头存储到Redis"""
|
|
|
+ header_json = json.dumps(header, ensure_ascii=False)
|
|
|
+ redis_client.hset(bind_id, "header", header_json)
|
|
|
+
|
|
|
+
|
|
|
+def df_store_to_redis(redis_client, data=None, bind_id=None):
|
|
|
+ """读取CSV文件并按行存入Redis
|
|
|
+
|
|
|
+ Args:
|
|
|
+ redis_client: Redis客户端
|
|
|
+ data: DataFrame数据(可选),如果为None则从CSV文件读取
|
|
|
+ bind_id: 绑定ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ list: 存储的行数据列表
|
|
|
+ """
|
|
|
+ logger.info(f"\n[Redis存储] 开始存储数据到Redis,bind_id: {bind_id}")
|
|
|
+
|
|
|
+ # 调试信息:记录Redis连接信息
|
|
|
+ try:
|
|
|
+ logger.info(f"[DEBUG] Redis连接信息: host={redis_client.connection_pool.connection_kwargs.get('host')}, "
|
|
|
+ f"port={redis_client.connection_pool.connection_kwargs.get('port')}, "
|
|
|
+ f"db={redis_client.connection_pool.connection_kwargs.get('db')}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"[DEBUG] 无法获取Redis连接信息: {e}")
|
|
|
+
|
|
|
+ if data is None:
|
|
|
+ # 使用pandas读取CSV文件
|
|
|
+ logger.info(f"[Redis存储] 从CSV文件读取数据: {INPUT_CSV}")
|
|
|
+ df = pd.read_csv(INPUT_CSV, encoding='utf-8-sig')
|
|
|
+ # header = df.columns.tolist()
|
|
|
+ rows = df.to_dict('records')
|
|
|
+
|
|
|
+ logger.info(f"[OK] 读取到 {len(rows)} 行数据")
|
|
|
+ # logger.info(f"[OK] CSV表头: {header}")
|
|
|
+ else:
|
|
|
+ logger.info(f"[Redis存储] 使用传入的DataFrame数据,共 {len(data)} 行")
|
|
|
+ rows = data.to_dict('records')
|
|
|
+
|
|
|
+ # 清空Redis中该ID的数据
|
|
|
+ redis_client.delete(bind_id)
|
|
|
+ logger.info(f"[OK] 清空Redis中ID '{bind_id}' 的旧数据")
|
|
|
+
|
|
|
+ # 按行循环存入Redis(使用Hash结构)
|
|
|
+ for idx, row in enumerate(rows, start=1):
|
|
|
+ store_row_to_redis(redis_client, bind_id, f"row_{idx}", row)
|
|
|
+
|
|
|
+ # # 存储表头
|
|
|
+ # store_header_to_redis(redis_client, BIND_ID)
|
|
|
+
|
|
|
+ # 存储行数
|
|
|
+ redis_client.hset(bind_id, "row_count", len(rows))
|
|
|
+
|
|
|
+ logger.info(f"[OK] 成功将 {len(rows)} 行数据存入Redis")
|
|
|
+ logger.info(f"[OK] Redis Key: {bind_id}")
|
|
|
+ logger.info(f"[Redis存储] 数据存储完成")
|
|
|
+
|
|
|
+ return rows
|
|
|
+
|
|
|
+
|
|
|
+def read_from_redis_and_save_csv(redis_client, bind_id=None, csv_save_path=None):
|
|
|
+ """从Redis一次性读取所有数据并保存为CSV文件
|
|
|
+
|
|
|
+ Args:
|
|
|
+ redis_client: Redis客户端
|
|
|
+ bind_id: 绑定ID
|
|
|
+ csv_save_path: CSV文件保存路径(可选)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ pandas.DataFrame: 包含数据的DataFrame,如果Redis中不存在数据则返回空DataFrame
|
|
|
+ """
|
|
|
+ logger.info(f"\n从Redis读取数据 (ID: {bind_id})")
|
|
|
+
|
|
|
+ # 调试信息:记录Redis连接信息
|
|
|
+ try:
|
|
|
+ logger.info(f"[DEBUG] Redis连接信息: host={redis_client.connection_pool.connection_kwargs.get('host')}, "
|
|
|
+ f"port={redis_client.connection_pool.connection_kwargs.get('port')}, "
|
|
|
+ f"db={redis_client.connection_pool.connection_kwargs.get('db')}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"[DEBUG] 无法获取Redis连接信息: {e}")
|
|
|
+
|
|
|
+ # 调试信息:检查键是否存在
|
|
|
+ key_exists = redis_client.exists(bind_id)
|
|
|
+ logger.info(f"[DEBUG] Redis键 '{bind_id}' 存在状态: {key_exists}")
|
|
|
+
|
|
|
+ # 调试信息:列出所有匹配的键
|
|
|
+ try:
|
|
|
+ all_keys = redis_client.keys(f"*{bind_id}*")
|
|
|
+ logger.info(f"[DEBUG] Redis中匹配的键: {all_keys}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"[DEBUG] 无法列出Redis键: {e}")
|
|
|
+
|
|
|
+ # 检查数据是否存在
|
|
|
+ if not key_exists:
|
|
|
+ logger.warning(f"[WARN] Redis中不存在ID '{bind_id}' 的数据,返回空DataFrame")
|
|
|
+ return pd.DataFrame() # 返回空DataFrame而不是None
|
|
|
+
|
|
|
+ # # 获取表头
|
|
|
+ # header_json = redis_client.hget(bind_id, "header")
|
|
|
+ # if not header_json:
|
|
|
+ # logger.info("[ERROR] 未找到表头数据")
|
|
|
+ # return
|
|
|
+
|
|
|
+ # header = json.loads(header_json)
|
|
|
+ # logger.info(f"[OK] 读取到表头: {header}")
|
|
|
+
|
|
|
+ # 获取行数
|
|
|
+ row_count = int(redis_client.hget(bind_id, "row_count") or 0)
|
|
|
+ logger.info(f"[OK] 总行数: {row_count}")
|
|
|
+
|
|
|
+ # 读取所有行数据
|
|
|
+ rows = []
|
|
|
+ for idx in range(1, row_count + 1):
|
|
|
+ row_key = f"row_{idx}"
|
|
|
+ row_json = redis_client.hget(bind_id, row_key)
|
|
|
+ if row_json:
|
|
|
+ row = json.loads(row_json)
|
|
|
+ rows.append(row)
|
|
|
+
|
|
|
+ logger.info(f"[OK] 成功从Redis读取 {len(rows)} 行数据")
|
|
|
+
|
|
|
+ # 使用pandas保存为新的CSV文件
|
|
|
+ df_output = pd.DataFrame(rows)
|
|
|
+ if csv_save_path:
|
|
|
+ df_output.to_csv(OUTPUT_CSV, index=False, encoding='utf-8-sig')
|
|
|
+
|
|
|
+ logger.info(f"[OK] 数据已保存到: {OUTPUT_CSV}")
|
|
|
+
|
|
|
+ # 读取完成后删除Redis中的bind_id数据
|
|
|
+ redis_client.delete(bind_id)
|
|
|
+ logger.info(f"[OK] 已删除Redis中ID '{bind_id}' 的数据")
|
|
|
+
|
|
|
+ return df_output
|
|
|
+
|
|
|
+
|
|
|
+def display_redis_data(redis_client, bind_id=None):
|
|
|
+ """显示Redis中存储的数据摘要"""
|
|
|
+ logger.info(f"\nRedis数据摘要 (ID: {bind_id})")
|
|
|
+ logger.info("-" * 50)
|
|
|
+
|
|
|
+ # 获取所有字段
|
|
|
+ all_fields = redis_client.hkeys(bind_id)
|
|
|
+ logger.info(f"字段数量: {len(all_fields)}")
|
|
|
+ logger.info(f"字段列表: {all_fields}")
|
|
|
+
|
|
|
+ logger.info("-" * 50)
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ """主函数"""
|
|
|
+ logger.info("=" * 60)
|
|
|
+ logger.info("Redis CSV 处理器")
|
|
|
+ logger.info("=" * 60)
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 获取Redis连接
|
|
|
+ redis_client = get_redis_connection()
|
|
|
+
|
|
|
+ # 读取CSV并存入Redis
|
|
|
+ df_store_to_redis(redis_client)
|
|
|
+
|
|
|
+ # 显示Redis数据摘要
|
|
|
+ display_redis_data(redis_client)
|
|
|
+
|
|
|
+ # 从Redis读取并保存为新的CSV文件
|
|
|
+ rows = read_from_redis_and_save_csv(redis_client)
|
|
|
+ # 将字典列表转换为JSON字符串列表
|
|
|
+ # rows_str = [json.dumps(row, ensure_ascii=False) for row in rows]
|
|
|
+ # logger.info(f"[OK] 保存到CSV文件成功 (\n{header}\n{'\n'.join(rows_str)})")
|
|
|
+ logger.info(f"{pd.DataFrame(rows)}")
|
|
|
+ logger.info("\n" + "=" * 60)
|
|
|
+ logger.info("[OK] 处理完成!")
|
|
|
+ logger.info("=" * 60)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.info(f"\n[ERROR] 处理过程中发生错误: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.logger.info_exc()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|