#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Accuracy verification tests for the completeness review.

Using real classification-result data, these tests verify that the
completeness review module counts categories correctly, so that a category
which is present in the classification result is never reported as missing.

Test data:
    temp/construction_review/final_result/67d45692fb97aeef8f896e78475ce539-1773153034.json
"""

import json
import sys
from pathlib import Path
from typing import Dict, List, Set, Tuple
from collections import defaultdict

# Add the project root to the import path so the `core` package resolves.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

import pytest
import asyncio

from core.construction_review.component.reviewers.completeness_reviewer import (
    LightweightCompletenessChecker,
    result_to_dict,
    TertiarySpecLoader,
)


class TestCompletenessAccuracy:
    """Accuracy tests for the completeness review."""

    # Tertiary codes that do not denote a real standard category.
    _INVALID_TERTIARY_CODES = frozenset({"", "none", "non_standard"})

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @pytest.fixture(scope="class")
    def test_data_path(self) -> Path:
        """Return the path of the classification-result JSON used as test data."""
        return (
            project_root
            / "temp"
            / "construction_review"
            / "final_result"
            / "67d45692fb97aeef8f896e78475ce539-1773153034.json"
        )

    @pytest.fixture(scope="class")
    def standard_csv_path(self) -> Path:
        """Return the path of the standard category table CSV."""
        return (
            project_root
            / "core"
            / "construction_review"
            / "component"
            / "doc_worker"
            / "config"
            / "StandardCategoryTable.csv"
        )

    @pytest.fixture(scope="class")
    def test_chunks(self, test_data_path: Path) -> List[Dict]:
        """Load the chunks from the test data file.

        Skips the requesting test when the data file does not exist. The
        chunks are read from ``document_result.structured_content.chunks``;
        any missing level yields an empty list.
        """
        if not test_data_path.exists():
            pytest.skip(f"测试数据文件不存在: {test_data_path}")

        with open(test_data_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        chunks = data.get("document_result", {}).get("structured_content", {}).get("chunks", [])
        print(f"\n加载了 {len(chunks)} 个 chunks")
        return chunks

    @pytest.fixture(scope="class")
    def checker(self, standard_csv_path: Path) -> LightweightCompletenessChecker:
        """Create the completeness checker from the standard CSV.

        Skips the requesting test when the CSV file does not exist.
        """
        if not standard_csv_path.exists():
            pytest.skip(f"标准CSV文件不存在: {standard_csv_path}")
        return LightweightCompletenessChecker(str(standard_csv_path))

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _chapter_code(chunk: Dict):
        """Return the chunk's first-level code, preferring ``chapter_classification``."""
        return chunk.get("chapter_classification") or chunk.get("first_code")

    def _chunks_for_chapter(self, chunks: List[Dict], chapter_code) -> List[Dict]:
        """Return the chunks whose first-level code equals ``chapter_code``.

        Uses the same code resolution as ``extract_actual_categories`` so the
        "actual" categories of a chapter and the chunks handed to the checker
        always come from the same set of chunks.
        """
        return [c for c in chunks if self._chapter_code(c) == chapter_code]

    @staticmethod
    def _missing_key(item: Dict) -> Tuple:
        """Build the (first, secondary, tertiary) code tuple of a missing-detail item."""
        return (
            item.get("first_code"),
            item.get("secondary_code"),
            item.get("tertiary_code"),
        )

    @staticmethod
    def _sort_key(key: Tuple) -> Tuple:
        """Sort key for code tuples that tolerates ``None`` components."""
        return tuple("" if part is None else str(part) for part in key)

    @staticmethod
    def _normalize_code(code) -> str:
        """Fold case and drop underscores so naming-style variants compare equal."""
        return str(code).replace("_", "").casefold()

    def extract_actual_categories(self, chunks: List[Dict]) -> Dict[str, Set[Tuple]]:
        """Extract the categories that actually occur in ``chunks``.

        Each chunk's codes are read from ``chapter_classification`` /
        ``secondary_category_code`` / ``tertiary_category_code``, falling back
        to ``first_code`` / ``second_code`` / ``third_code``. Chunks without a
        first- or second-level code are ignored. Tertiary codes that are empty
        or listed in ``_INVALID_TERTIARY_CODES`` are not recorded.

        Returns:
            A dict of the form::

                {
                    "tertiary": {(first_code, second_code, third_code), ...},
                    "secondary": {(first_code, second_code), ...},
                    "by_chapter": {
                        "<first_code>": {"tertiary": {...}, "secondary": {...}},
                        ...
                    },
                }

            Note that ``by_chapter`` is a ``defaultdict`` keyed by first-level
            code, not a set, despite the declared return annotation.
        """
        result = {
            "tertiary": set(),
            "secondary": set(),
            "by_chapter": defaultdict(lambda: {"tertiary": set(), "secondary": set()}),
        }

        for chunk in chunks:
            cat1 = self._chapter_code(chunk)
            cat2 = chunk.get("secondary_category_code") or chunk.get("second_code")
            cat3 = chunk.get("tertiary_category_code") or chunk.get("third_code")

            if not cat1 or not cat2:
                continue

            # Record the secondary category.
            sec_key = (cat1, cat2)
            result["secondary"].add(sec_key)
            result["by_chapter"][cat1]["secondary"].add(sec_key)

            # Record the tertiary category, excluding placeholder values.
            if cat3 and cat3 not in self._INVALID_TERTIARY_CODES:
                ter_key = (cat1, cat2, cat3)
                result["tertiary"].add(ter_key)
                result["by_chapter"][cat1]["tertiary"].add(ter_key)

        return result

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_data_file_exists(self, test_data_path: Path):
        """The test data file must exist."""
        assert test_data_path.exists(), f"测试数据文件不存在: {test_data_path}"
        print(f"\n测试数据文件: {test_data_path}")

    def test_standard_csv_exists(self, standard_csv_path: Path):
        """The standard category CSV must exist."""
        assert standard_csv_path.exists(), f"标准CSV文件不存在: {standard_csv_path}"
        print(f"\n标准CSV文件: {standard_csv_path}")

    def test_extract_actual_categories(self, test_chunks: List[Dict]):
        """The category extraction must find at least one secondary category."""
        actual = self.extract_actual_categories(test_chunks)

        print(f"\n实际存在的分类统计:")
        print(f" - 二级分类总数: {len(actual['secondary'])}")
        print(f" - 三级分类总数: {len(actual['tertiary'])}")
        print(f" - 涉及章节: {list(actual['by_chapter'])}")

        # Per-chapter breakdown.
        for chapter, cats in actual["by_chapter"].items():
            print(f"\n 章节 '{chapter}':")
            print(f" - 二级分类: {len(cats['secondary'])} 个")
            print(f" - 三级分类: {len(cats['tertiary'])} 个")

        assert len(actual["secondary"]) > 0, "应至少存在一个二级分类"

    @pytest.mark.asyncio
    async def test_no_false_positives_for_existing_categories(
        self, test_chunks: List[Dict], checker: LightweightCompletenessChecker
    ):
        """Key test: a category that exists must never be reported as missing.

        For every chapter found in the data, runs the checker on that
        chapter's chunks and asserts that the set of tertiary categories it
        reports as missing does not intersect the set actually present.
        """
        actual = self.extract_actual_categories(test_chunks)

        for chapter_code in actual["by_chapter"]:
            chapter_chunks = self._chunks_for_chapter(test_chunks, chapter_code)

            print(f"\n{'='*60}")
            print(f"测试章节: {chapter_code}")
            print(f" chunks 数量: {len(chapter_chunks)}")

            # Run the completeness check for this chapter.
            result = await checker.check(
                chunks=chapter_chunks,
                outline=None,
                chapter_classification=chapter_code,
            )
            result_dict = result_to_dict(result)
            tertiary_result = result_dict.get("tertiary_completeness", {})

            # Collect what the checker reports as missing; keep the first
            # detail item per key for diagnostics.
            missing_details = tertiary_result.get("missing_details", [])
            details_by_key = {}
            for item in missing_details:
                details_by_key.setdefault(self._missing_key(item), item)
            reported_missing = set(details_by_key)

            actual_tertiary = actual["by_chapter"][chapter_code]["tertiary"]

            print(f" 实际存在的三级分类: {len(actual_tertiary)} 个")
            print(f" 报告缺失的三级分类: {len(reported_missing)} 个")
            print(f" 完整率: {tertiary_result.get('completeness_rate', 'N/A')}")

            # Key verification: present categories that were reported missing.
            false_positives = reported_missing & actual_tertiary

            if false_positives:
                print(f"\n ❌ 发现误报!以下分类实际存在但被报告为缺失:")
                for fp in false_positives:
                    print(f" - {fp}")
                    item = details_by_key[fp]
                    print(f" 名称: {item.get('tertiary_name')}")
                    print(f" 二级: {item.get('secondary_name')}")

                # Dump both sets for debugging. The sort key tolerates None
                # components so the diagnostics cannot mask the assertion below.
                print(f"\n 该章节实际存在的所有三级分类:")
                for act in sorted(actual_tertiary, key=self._sort_key):
                    print(f" - {act}")

                print(f"\n 该章节报告缺失的所有三级分类:")
                for miss in sorted(reported_missing, key=self._sort_key):
                    print(f" - {miss}")

            assert len(false_positives) == 0, \
                f"章节 '{chapter_code}' 存在 {len(false_positives)} 个误报"
            print(f" [OK] 该章节无分类误报")

    @pytest.mark.asyncio
    async def test_check_result_structure(
        self, test_chunks: List[Dict], checker: LightweightCompletenessChecker
    ):
        """The check result for the ``basis`` chapter must have the expected keys.

        Skips when the data contains no ``basis`` chunks.
        """
        basis_chunks = self._chunks_for_chapter(test_chunks, "basis")

        if not basis_chunks:
            pytest.skip("没有找到 basis 章节的 chunks")

        result = await checker.check(
            chunks=basis_chunks,
            outline=None,
            chapter_classification="basis",
        )
        result_dict = result_to_dict(result)

        # Top-level structure.
        assert "overall_status" in result_dict
        assert "tertiary_completeness" in result_dict
        assert "catalogue_check" in result_dict

        # Tertiary completeness structure.
        tertiary = result_dict["tertiary_completeness"]
        assert tertiary.get("level") == "tertiary"
        for field in (
            "total",
            "present",
            "missing",
            "completeness_rate",
            "missing_details",
            "secondary_stats",
        ):
            assert field in tertiary

        # The counters must add up.
        total = tertiary["total"]
        present = tertiary["present"]
        missing = tertiary["missing"]
        assert total == present + missing, f"统计数据不一致: {total} != {present} + {missing}"

        print(f"\n结果结构验证通过:")
        print(f" - 总体状态: {result_dict['overall_status']}")
        print(f" - 三级分类: 总计={total}, 存在={present}, 缺失={missing}")
        print(f" - 完整率: {tertiary['completeness_rate']}")

    @pytest.mark.asyncio
    async def test_secondary_stats_accuracy(
        self, test_chunks: List[Dict], checker: LightweightCompletenessChecker
    ):
        """Per-secondary-category counters must add up in every chapter."""
        actual = self.extract_actual_categories(test_chunks)

        for chapter_code in actual["by_chapter"]:
            chapter_chunks = self._chunks_for_chapter(test_chunks, chapter_code)

            result = await checker.check(
                chunks=chapter_chunks,
                outline=None,
                chapter_classification=chapter_code,
            )
            result_dict = result_to_dict(result)
            secondary_stats = result_dict.get("tertiary_completeness", {}).get("secondary_stats", [])

            for stat in secondary_stats:
                sec_code = stat.get("secondary_code")
                sec_total = stat.get("total_tertiary", 0)
                sec_present = stat.get("present", 0)
                sec_missing = stat.get("missing", 0)

                assert sec_total == sec_present + sec_missing, \
                    f"章节 {chapter_code} > {sec_code} 统计不一致: {sec_total} != {sec_present} + {sec_missing}"

            print(f"\n章节 '{chapter_code}' 二级分类统计验证通过,共 {len(secondary_stats)} 个二级分类")

    def test_category_code_consistency(self, test_chunks: List[Dict]):
        """Category codes must not differ only by case or naming style.

        Within each level, two distinct codes that become equal after case
        folding and underscore removal (e.g. 'basis' vs 'Basis',
        'LawsAndRegulations' vs 'laws_and_regulations') are reported as a
        conflict.
        """
        all_codes = {
            "chapter": set(),
            "secondary": set(),
            "tertiary": set(),
        }

        for chunk in test_chunks:
            cat1 = chunk.get("chapter_classification")
            cat2 = chunk.get("secondary_category_code")
            cat3 = chunk.get("tertiary_category_code")

            if cat1:
                all_codes["chapter"].add(cat1)
            if cat2:
                all_codes["secondary"].add(cat2)
            if cat3 and cat3 not in self._INVALID_TERTIARY_CODES:
                all_codes["tertiary"].add(cat3)

        print("\n分类代码统计:")
        print(f" - 一级分类代码: {sorted(all_codes['chapter'])}")
        print(f" - 二级分类代码样例 (前10个): {sorted(all_codes['secondary'])[:10]}")
        print(f" - 三级分类代码样例 (前10个): {sorted(all_codes['tertiary'])[:10]}")

        # Group the codes of each level by their normalized spelling; a group
        # with more than one member is a case/naming-style inconsistency.
        for level, codes in all_codes.items():
            variants = defaultdict(set)
            for code in codes:
                variants[self._normalize_code(code)].add(code)
            conflicts = {
                norm: sorted(group, key=str)
                for norm, group in variants.items()
                if len(group) > 1
            }
            assert not conflicts, f"{level} 分类代码存在大小写/命名风格不一致: {conflicts}"

    def test_chunks_with_invalid_categories(self, test_chunks: List[Dict]):
        """Report how the tertiary codes split into valid and placeholder values.

        Informational only: the distribution and the valid ratio are printed,
        no threshold is enforced.
        """
        invalid_counts = {
            "none": 0,
            "non_standard": 0,
            "empty": 0,
            "valid": 0,
        }

        for chunk in test_chunks:
            cat3 = chunk.get("tertiary_category_code", "")
            if cat3 == "none":
                invalid_counts["none"] += 1
            elif cat3 == "non_standard":
                invalid_counts["non_standard"] += 1
            elif not cat3:
                invalid_counts["empty"] += 1
            else:
                invalid_counts["valid"] += 1

        print("\n三级分类代码分布:")
        print(f" - 有效分类: {invalid_counts['valid']}")
        print(f" - none: {invalid_counts['none']}")
        print(f" - non_standard: {invalid_counts['non_standard']}")
        print(f" - 空值: {invalid_counts['empty']}")

        total = sum(invalid_counts.values())
        valid_ratio = invalid_counts["valid"] / total if total > 0 else 0
        print(f" - 有效率: {valid_ratio:.1%}")

    @pytest.mark.asyncio
    async def test_completeness_accuracy_report(
        self, test_chunks: List[Dict], checker: LightweightCompletenessChecker
    ):
        """Print a detailed accuracy report and verify what it claims.

        For every chapter the checker is run once; the report then states,
        based on those results, whether any present category was reported as
        missing and whether ``total == present + missing`` holds. The test
        fails if either check fails, so the printed conclusion is always
        backed by the data.
        """
        print("\n" + "="*70)
        print("完整性审查准确性测试报告")
        print("="*70)

        actual = self.extract_actual_categories(test_chunks)

        total_chapters = len(actual["by_chapter"])
        total_secondary = len(actual["secondary"])
        total_tertiary = len(actual["tertiary"])

        print(f"\n[测试数据概览]")
        print(f" - 文档总块数: {len(test_chunks)}")
        print(f" - 涉及章节数: {total_chapters}")
        print(f" - 二级分类数: {total_secondary}")
        print(f" - 三级分类数: {total_tertiary}")

        print(f"\n[各章节详细统计]")

        false_positive_chapters = []
        inconsistent_chapters = []

        for chapter_code in sorted(actual["by_chapter"]):
            chapter_chunks = self._chunks_for_chapter(test_chunks, chapter_code)

            result = await checker.check(
                chunks=chapter_chunks,
                outline=None,
                chapter_classification=chapter_code,
            )
            result_dict = result_to_dict(result)
            tertiary_result = result_dict.get("tertiary_completeness", {})

            present = tertiary_result.get("present", 0)
            missing = tertiary_result.get("missing", 0)
            total = tertiary_result.get("total", 0)
            rate = tertiary_result.get("completeness_rate", "0%")

            actual_tertiary = actual["by_chapter"][chapter_code]["tertiary"]
            reported_missing = {
                self._missing_key(item)
                for item in tertiary_result.get("missing_details", [])
            }

            print(f"\n 章节: {chapter_code}")
            print(f" - 块数: {len(chapter_chunks)}")
            print(f" - 存在分类: {len(actual_tertiary)}")
            print(f" - 标准分类: {total}")
            print(f" - 缺失: {missing}")
            print(f" - 完整率: {rate}")

            if reported_missing & actual_tertiary:
                false_positive_chapters.append(chapter_code)
            if total != present + missing:
                inconsistent_chapters.append(chapter_code)

        print(f"\n[验证结果汇总]")
        if false_positive_chapters:
            print(f" [FAIL] 存在分类误报的章节: {false_positive_chapters}")
        else:
            print(f" [OK] 无分类误报情况")
        if inconsistent_chapters:
            print(f" [FAIL] 统计数据不一致的章节: {inconsistent_chapters}")
        else:
            print(f" [OK] 统计数据一致性正确")

        print(f"\n[结论]")
        if not false_positive_chapters and not inconsistent_chapters:
            print(f" 完整性审查模块工作正常,没有出现")
            print(f" '分类结果中存在但被误报为缺失'的情况")
        else:
            print(f" 完整性审查模块存在问题,详见上方验证结果汇总")
        print("="*70)

        assert not false_positive_chapters, \
            f"以下章节存在分类误报: {false_positive_chapters}"
        assert not inconsistent_chapters, \
            f"以下章节统计数据不一致: {inconsistent_chapters}"


if __name__ == "__main__":
    # Run the tests directly and propagate pytest's exit code to the shell.
    sys.exit(pytest.main([__file__, "-v", "-s"]))