#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Check that the content timeliness review correctly handles the JTG B01-2011 case.

The script prints diagnostic output for three scenarios: extracting standard
references from tertiary content, comparing the old and new filtering rules on
mocked match results, and (optionally) running the full asynchronous review.
"""

import json
import asyncio

from core.construction_review.component.reviewers.timeliness_content_reviewer import (
    StandardExtractor,
    ContentTimelinessReviewer
)

# Test data - mirrors the situation found in problem.json.
# The content is three numbered lines (80-82), matching start_line/end_line.
test_tertiary_details = [
    {
        "third_category_name": "国家方针、政策、标准和设计文件",
        "third_category_code": "NationalPoliciesStandardsAndDesignDocument",
        "start_line": 80,
        "end_line": 82,
        "content": """<80> 国家方针、政策、标准和设计文件
<81> 《公路工程技术标准》(JTG B01-2011)
<82> 《公路桥涵设计通用规范》(JTG D60-2015)"""
    }
]


# Extractor test
def test_extractor():
    """Run StandardExtractor over the sample details and print what it finds.

    Returns:
        The references extracted from the last processed detail, or an empty
        list when ``test_tertiary_details`` contains no entries.
    """
    print("=" * 60)
    print("测试规范提取器")
    print("=" * 60)

    extractor = StandardExtractor()
    # Bind up front so the final return cannot hit an unbound name when the
    # sample list is empty.
    refs = []

    for detail in test_tertiary_details:
        refs = extractor.extract_from_content(detail["content"])
        print(f"\n从 '{detail['third_category_name']}' 提取到 {len(refs)} 个规范引用:")
        for ref in refs:
            print(f" - 原始文本: {ref.original_text}")
            print(f" 名称: {ref.name}")
            print(f" 编号: {ref.number}")
            # Only the first 100 characters of the context are shown.
            print(f" 上下文: {ref.context[:100]}...")

    return refs


# Filter logic test
def test_filter_logic():
    """Compare the old and new filtering rules on mocked match results.

    The old rule keeps only items with a non-empty ``exact_match_info``; the
    new rule keeps any item that has ``has_related_file``, ``exact_match_info``
    or ``same_name_current`` set. Items dropped by the old rule are printed as
    warnings. Nothing is returned.
    """
    print("\n" + "=" * 60)
    print("测试过滤逻辑")
    print("=" * 60)

    # Simulated return data of match_reference_files: the first entry has no
    # exact match but a same-name current edition, the second matches exactly.
    mock_match_result = [
        {
            "review_item": "《公路工程技术标准》(JTG B01-2011)",
            "has_related_file": True,
            "has_exact_match": False,
            "exact_match_info": "",
            "same_name_current": "《公路工程技术标准》(JTG B01-2014)状态为现行"
        },
        {
            "review_item": "《公路桥涵设计通用规范》(JTG D60-2015)",
            "has_related_file": True,
            "has_exact_match": True,
            "exact_match_info": "《公路桥涵设计通用规范》(JTG D60-2015)状态为现行",
            "same_name_current": ""
        }
    ]

    print("\n模拟 match_reference_files 返回数据:")
    for idx, item in enumerate(mock_match_result):
        print(f"\n 项{idx}:")
        print(f" review_item: {item['review_item']}")
        print(f" has_related_file: {item['has_related_file']}")
        print(f" has_exact_match: {item['has_exact_match']}")
        print(f" exact_match_info: {item['exact_match_info']}")
        print(f" same_name_current: {item['same_name_current']}")

    # Old filter: keep only items whose exact_match_info is non-empty.
    old_filtered = [item for item in mock_match_result if item.get('exact_match_info')]
    print(f"\n旧过滤逻辑(只保留 exact_match_info 不为空的): {len(old_filtered)} 个项")
    for item in old_filtered:
        print(f" - {item['review_item']}")

    # New filter: keep items that carry any related information.
    new_filtered = [
        item for item in mock_match_result
        if item.get('has_related_file')
        or item.get('exact_match_info')
        or item.get('same_name_current')
    ]
    print(f"\n新过滤逻辑(保留有相关信息的): {len(new_filtered)} 个项")
    for item in new_filtered:
        print(f" - {item['review_item']}")

    # Difference analysis: everything the old filter dropped.
    missed = [item for item in mock_match_result if item not in old_filtered]
    if missed:
        print("\n[警告] 旧逻辑漏检的项:")
        for item in missed:
            print(f" - {item['review_item']}")
            print(f" has_related_file: {item['has_related_file']}")
            print(f" same_name_current: {item['same_name_current']}")


# Full test
async def test_full_review():
    """Run the complete review through ContentTimelinessReviewer and print results.

    According to the banner printed below, this needs a Milvus connection. Any
    exception is caught, reported together with its traceback, and not
    re-raised, so the script keeps running when the backend is unavailable.
    """
    print("\n" + "=" * 60)
    print("完整审查测试(需要 Milvus 连接)")
    print("=" * 60)

    try:
        async with ContentTimelinessReviewer(max_concurrent=4) as reviewer:
            results = await reviewer.review_tertiary_content(
                tertiary_details=test_tertiary_details,
                collection_name="first_bfp_collection_status"
            )

            print(f"\n审查完成,共 {len(results)} 个结果:")
            for idx, result in enumerate(results):
                print(f"\n 结果{idx}:")
                print(f" check_item: {result.get('check_item')}")
                print(f" exist_issue: {result.get('exist_issue')}")
                print(f" risk_info: {result.get('risk_info')}")
                # check_result may be missing; fall back to an empty mapping.
                check_result = result.get('check_result', {})
                print(f" issue_point: {check_result.get('issue_point')}")
                print(f" suggestion: {check_result.get('suggestion')}")
                print(f" reason: {check_result.get('reason')}")

    except Exception as e:
        print(f"测试失败: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    # Extractor test
    refs = test_extractor()

    # Filter logic test
    test_filter_logic()

    # Full test (optional)
    # asyncio.run(test_full_review())