Sfoglia il codice sorgente

feat: 时效性查询结果对比、去重、格式转换等操作

ai02 4 settimane fa
parent
commit
687275594e

+ 13 - 0
src/app/scripts/tool/excel_info_csv.py

@@ -0,0 +1,13 @@
+import pandas as pd
+
+# 配置你的文件路径
+xlsx_file = "F:\时效性相关文档\新增标准原材料\工作簿4.xlsx"   # 要转换的xlsx文件
+csv_file = "F:\时效性相关文档\新增标准原材料\工作簿4.csv"   # 输出的csv文件
+
+# 读取Excel
+df = pd.read_excel(xlsx_file)
+
+# 保存为CSV
+df.to_csv(csv_file, index=False, encoding="utf-8-sig")
+
+print(f"转换完成!文件已保存为:{csv_file}")

+ 105 - 0
src/app/scripts/tool/时效性去重.py

@@ -0,0 +1,105 @@
+import csv
+import os
+
+def process_standard_csv(input_file: str, output_file: str, delete_summary_file: str):
+    """
+    处理CSV文件去重,保留空字段最少的数据,删除数据单独汇总
+    :param input_file: 原始CSV文件路径
+    :param output_file: 去重后输出文件路径
+    :param delete_summary_file: 删除数据汇总文件路径
+    """
+    # 定义需要处理的6个关键字段
+    target_fields = [
+        "标准编号", "标准名称", "状态",
+        "发布日期", "实施日期", "发布部门"
+    ]
+
+    # 存储去重规则:key=标准编号+标准名称+状态,value=该行数据+空字段数量
+    unique_data = {}
+    # 存储所有被删除的数据
+    deleted_data = []
+
+    try:
+        # 1. 读取原始CSV文件(修复编码和表头问题)
+        with open(input_file, 'r', encoding='utf-8-sig', newline='') as f:
+            reader = csv.DictReader(f)
+            # 保留原始表头,确保输出格式一致
+            fieldnames = reader.fieldnames
+
+            # 打印表头用于调试
+            print("📋 读取到CSV表头:", fieldnames)
+
+            # 校验必须包含目标字段
+            missing_fields = [field for field in target_fields if field not in fieldnames]
+            if missing_fields:
+                raise ValueError(f"CSV文件缺少必填字段:{missing_fields}")
+
+            print(f"✅ 成功读取原始文件,共{len(fieldnames)}列,开始处理数据...")
+
+            for row in reader:
+                # 提取去重关键字段值
+                code = row["标准编号"].strip() if row["标准编号"] else ""
+                name = row["标准名称"].strip() if row["标准名称"] else ""
+                status = row["状态"].strip() if row["状态"] else ""
+                key = f"{code}|{name}|{status}"
+
+                # 计算当前行6个目标字段的空字段数量(空字符串/None/纯空格都算空)
+                empty_count = 0
+                for field in target_fields:
+                    val = row[field]
+                    if not val or val.strip() == "":
+                        empty_count += 1
+
+                # 2. 去重逻辑:相同key只保留空字段最少的数据
+                if key not in unique_data:
+                    # 首次出现,直接保存
+                    unique_data[key] = {"data": row, "empty_count": empty_count}
+                else:
+                    # 已存在,比较空字段数量
+                    existing = unique_data[key]
+                    if empty_count < existing["empty_count"]:
+                        # 新数据空字段更少,替换旧数据,旧数据移入删除列表
+                        deleted_data.append(existing["data"])
+                        unique_data[key] = {"data": row, "empty_count": empty_count}
+                    else:
+                        # 新数据空字段更多/相等,直接移入删除列表
+                        deleted_data.append(row)
+
+        # 3. 导出【去重后的数据】
+        with open(output_file, 'w', encoding='utf-8-sig', newline='') as f:
+            writer = csv.DictWriter(f, fieldnames=fieldnames)
+            writer.writeheader()
+            for item in unique_data.values():
+                writer.writerow(item["data"])
+
+        # 4. 导出【删除数据汇总】
+        with open(delete_summary_file, 'w', encoding='utf-8-sig', newline='') as f:
+            writer = csv.DictWriter(f, fieldnames=fieldnames)
+            writer.writeheader()
+            writer.writerows(deleted_data)
+
+        # 打印处理结果
+        print("=" * 50)
+        print(f"📊 处理完成!")
+        print(f"原始数据总数:{len(unique_data) + len(deleted_data)} 条")
+        print(f"去重后保留:{len(unique_data)} 条")
+        print(f"删除并汇总:{len(deleted_data)} 条")
+        print(f"✅ 去重文件:{output_file}")
+        print(f"✅ 删除汇总文件:{delete_summary_file}")
+        print("=" * 50)
+
+    except FileNotFoundError:
+        print(f"❌ 错误:未找到文件 {input_file}")
+    except Exception as e:
+        print(f"❌ 处理失败:{str(e)}")
+
+
+if __name__ == "__main__":
+    # ===================== 配置参数(已修复)=====================
+    INPUT_CSV = r"F:\时效性相关文档\新增标准原材料\原文档重新查询.csv"
+    OUTPUT_CSV = r"F:\时效性相关文档\新增标准原材料\原文档重新查询_去重后数据.csv"
+    DELETE_SUMMARY = r"F:\时效性相关文档\新增标准原材料\原文档重新查询_删除数据汇总.csv"
+    # =========================================================================
+
+    # 执行处理
+    process_standard_csv(INPUT_CSV, OUTPUT_CSV, DELETE_SUMMARY)

+ 85 - 0
src/app/scripts/tool/时效性查询结果对比.py

@@ -0,0 +1,85 @@
+import pandas as pd
+import json
+
+# ===================== 路径配置 =====================
+PATH_SOURCE = r"F:\时效性相关文档\时效性新增名单(建筑与市政).csv"
+PATH_RESULT = r"F:\时效性相关文档\标准查询结果.csv"
+OUTPUT_JSON = r"F:\时效性相关文档\对比结果.json"
+# 新增:不存在数据的输出CSV路径
+OUTPUT_NOT_EXIST_CSV = r"F:\时效性相关文档\源文件有_查询结果无.csv"
+
+# ===================== 读取文件 =====================
+df_source = pd.read_csv(PATH_SOURCE, encoding="utf-8-sig").fillna("")
+df_result = pd.read_csv(PATH_RESULT, encoding="utf-8-sig").fillna("")
+
+# 转为字典方便匹配
+result_dict = {}
+for _, row in df_result.iterrows():
+    code = str(row["标准编号"]).strip()
+    name = str(row["标准名称"].strip())
+    result_dict[code] = {
+        "标准名称": name,
+        "现行状态": row.get("现行状态", "")
+    }
+
+# 源列表
+source_list = []
+for _, row in df_source.iterrows():
+    code = str(row["标准编号"]).strip()
+    name = str(row["标准名称"].strip())
+    source_list.append({"标准编号": code, "标准名称": name})
+
+# ===================== 分组存储 =====================
+result = {
+    "不存在:源文件有,查询结果无": [],
+    "标准名称不同": [],
+    "多余:查询结果有,源文件无": []
+}
+
+# 1. 对比【源文件】每条
+for item in source_list:
+    s_code = item["标准编号"]
+    s_name = item["标准名称"]
+
+    if not s_code:
+        continue
+
+    # 不存在
+    if s_code not in result_dict:
+        result["不存在:源文件有,查询结果无"].append({
+            "标准编号": s_code,
+            "标准名称": s_name
+        })
+        continue
+
+    # 存在 → 对比名称
+    r_name = result_dict[s_code]["标准名称"]
+    if s_name != r_name:
+        result["标准名称不同"].append({
+            "标准编号": s_code,
+            "源文件名称": s_name,
+            "查询结果名称": r_name
+        })
+
+# 2. 检查【查询结果】多余的
+source_codes = [x["标准编号"] for x in source_list]
+for code in result_dict:
+    if code not in source_codes:
+        result["多余:查询结果有,源文件无"].append({
+            "标准编号": code,
+            "标准名称": result_dict[code]["标准名称"]
+        })
+
+# ===================== 输出 JSON =====================
+with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
+    json.dump(result, f, ensure_ascii=False, indent=4)
+
+# ===================== 新增:输出不存在数据到CSV =====================
+df_not_exist = pd.DataFrame(result["不存在:源文件有,查询结果无"])
+df_not_exist.to_csv(OUTPUT_NOT_EXIST_CSV, encoding="utf-8-sig", index=False)
+
+# ===================== 打印结果 =====================
+print("✅ 对比完成!JSON 已按 3 类分组输出")
+print(f"✅ JSON 文件路径:{OUTPUT_JSON}")
+print(f"✅ 缺失数据已导出到 CSV:{OUTPUT_NOT_EXIST_CSV}")
+print(f"📊 共找到 {len(df_not_exist)} 条【源文件有、查询结果无】的记录")