# -*- coding: utf-8 -*- """ 大纲审查功能单元测试 测试OutlineReviewer.outline_review方法的入参出参 """ import pytest import asyncio import json from typing import Dict, Any import sys import os # 添加项目根目录到Python路径 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))) from core.construction_review.component.reviewers.outline_reviewer import OutlineReviewer class TestOutlineReview: """大纲审查功能测试类""" def setup_method(self): """测试前置设置""" self.outline_reviewer = OutlineReviewer() self.test_trace_id = "test_outline_review_001" self.test_stage_name = "test_stage" self.test_state = {"status": "testing"} def load_test_data(self) -> Dict[str, Any]: """从测试数据文件加载大纲结构""" test_data_path = os.path.join( os.path.dirname(__file__), '..', '..', '..', 'temp', 'document_temp', '文档切分预处理结果.json' ) try: with open(test_data_path, 'r', encoding='utf-8') as f: data = json.load(f) return data.get('outline', {}) except Exception as e: print(f"加载测试数据失败: {e}") return self._get_mock_outline_data() def _get_mock_outline_data(self) -> Dict[str, Any]: """获取模拟大纲数据""" return { "chapters": [ { "index": 1, "title": "第一章编制依据", "page": "1", "original": "第一章编制依据...................................................... 1", "subsections": [ { "title": "一、编制依据", "page": "1", "level": 2, "original": "一、编制依据................................................................. 1" }, { "title": "二、编制原则", "page": "3", "level": 2, "original": "二、编制原则................................................................. 3" } ] }, { "index": 2, "title": "第二章工程概况", "page": "5", "original": "第二章工程概况...................................................... 5", "subsections": [ { "title": "一、项目背景", "page": "5", "level": 2, "original": "一、项目背景................................................................. 5" }, { "title": "二、工程地质", "page": "6", "level": 2, "original": "二、工程地质................................................................. 6" } ] } ] } def prepare_review_data(self, outline_data: Dict[str, Any]) -> Dict[str, Any]: """准备审查数据,模拟ai_review_engine.py中的数据处理逻辑""" chapters = outline_data.get('chapters', []) # 1. 获取整体大纲(1级大纲目录) overall_outline = "" for chapter in chapters: overall_outline += f"{chapter['title']} (页码: {chapter['page']})\n" # 2. 获取大纲各章节及其子目录的详细信息 detailed_outline = [] for chapter in chapters: # 追加章节标题和页码 detailed_outline.append(f"\n{chapter['title']}\n") detailed_outline.append(f"页码: {chapter['page']}") # 添加子目录 subsections = chapter.get('subsections', []) if subsections: detailed_outline.append("\n") for subsection in subsections: indent = " " * (subsection['level'] - 1) detailed_outline.append(f"{indent}- {subsection['title']} (页码: {subsection['page']})\n") else: detailed_outline.append("\n") return { 'outline_content': outline_data, 'overall_outline': overall_outline, 'detailed_outline': detailed_outline, 'state': self.test_state, 'stage_name': self.test_stage_name } @pytest.mark.asyncio async def test_outline_review_basic_functionality(self): """测试大纲审查基本功能""" print("\n=== 测试大纲审查基本功能 ===") # 加载测试数据 outline_data = self.load_test_data() print(f"加载的大纲章节数量: {len(outline_data.get('chapters', []))}") # 准备审查数据 review_data = self.prepare_review_data(outline_data) print(f"整体大纲内容预览:") print(f"{review_data['overall_outline'][:200]}...") print(f"详细大纲项目数量: {len(review_data['detailed_outline'])}") for i, item in enumerate(review_data['detailed_outline'][:5]): if item.strip(): print(f" {i+1}. {item.strip()[:50]}...") try: # 执行大纲审查 print("\n开始执行大纲审查...") result = await self.outline_reviewer.outline_review( review_data, self.test_trace_id, self.test_stage_name, self.test_state ) # 验证结果 print(f"\n=== 审查结果 ===") print(f"审查成功状态: {result.get('success', False)}") print(f"执行时间: {result.get('execution_time', 0):.2f}秒") print(f"详细项目总数: {result.get('total_detailed_items', 0)}") # 阶段1结果 stage1_result = result.get('stage1_overall_review') if stage1_result: print(f"\n--- 阶段1:整体大纲审查结果 ---") print(f"整体大纲审查成功: {stage1_result.get('success', False)}") parsed_overall = stage1_result.get('parsed_result') if parsed_overall and isinstance(parsed_overall, list): print(f"十大类章节审查结果数量: {len(parsed_overall)}") for item in parsed_overall[:3]: # 只显示前3个 chapter_category = item.get('章节类别', 'N/A') status = item.get('完整性状态', 'N/A') # 替换特殊字符避免编码问题 status_safe = status.replace('✔', '[完整]').replace('⚠', '[部分]').replace('✘', '[缺失]') print(f" - {chapter_category}: {status_safe}") # 阶段2结果 stage2_result = result.get('stage2_detailed_review', []) print(f"\n--- 阶段2:详细大纲逐项审查结果 ---") print(f"详细项目审查结果数量: {len(stage2_result)}") success_count = 0 for i, item_result in enumerate(stage2_result[:5]): # 只显示前5个 item_index = item_result.get('item_index', i) outline_item = item_result.get('outline_item', '')[:50] review_result = item_result.get('review_result', {}) print(f" 项目{item_index+1}: {outline_item}...") print(f" 审查成功: {review_result.get('success', False)}") if review_result.get('success'): category = review_result.get('category', 'N/A') parsed_result = review_result.get('parsed_result') if parsed_result and isinstance(parsed_result, list): print(f" 分类: {category}") print(f" 审查项目数: {len(parsed_result)}") success_count += 1 else: print(f" 错误: {review_result.get('error_message', 'N/A')}") print(f"\n前5个项目中成功审查数量: {success_count}") # 断言基本结果 assert result.get('success', False), "大纲审查应该成功" assert 'stage1_overall_review' in result, "应该包含阶段1审查结果" assert 'stage2_detailed_review' in result, "应该包含阶段2审查结果" assert isinstance(result.get('stage2_detailed_review', []), list), "阶段2结果应该是列表" print(f"\n[测试通过] 大纲审查基本功能正常") except Exception as e: print(f"\n[测试失败] {str(e)}") raise @pytest.mark.asyncio async def test_outline_review_with_empty_data(self): """测试空数据的大纲审查""" print("\n=== 测试空数据的大纲审查 ===") # 准备空数据 empty_review_data = { 'outline_content': {}, 'overall_outline': '', 'detailed_outline': [], 'state': self.test_state, 'stage_name': self.test_stage_name } try: result = await self.outline_reviewer.outline_review( empty_review_data, self.test_trace_id, self.test_stage_name, self.test_state ) print(f"空数据审查结果: {result.get('success', False)}") print(f"错误信息: {result.get('error_message', 'N/A')}") # 空数据应该处理失败 assert not result.get('success', False), "空数据应该审查失败" print(f"\n[测试通过] 空数据正确处理失败") except Exception as e: print(f"期望的异常: {str(e)}") print(f"\n[测试通过] 空数据正确抛出异常") def test_data_preparation_logic(self): """测试数据准备逻辑""" print("\n=== 测试数据准备逻辑 ===") # 加载测试数据 outline_data = self.load_test_data() review_data = self.prepare_review_data(outline_data) # 验证数据格式 assert 'overall_outline' in review_data, "应该包含整体大纲" assert 'detailed_outline' in review_data, "应该包含详细大纲" assert isinstance(review_data['detailed_outline'], list), "详细大纲应该是列表格式" print(f"整体大纲长度: {len(review_data['overall_outline'])}") print(f"详细大纲项目数: {len(review_data['detailed_outline'])}") # 验证整体大纲格式 overall_lines = review_data['overall_outline'].strip().split('\n') print(f"整体大纲行数: {len(overall_lines)}") # 验证详细大纲格式 non_empty_items = [item for item in review_data['detailed_outline'] if item.strip()] print(f"非空详细大纲项目数: {len(non_empty_items)}") print(f"\n[测试通过] 数据准备逻辑正确") if __name__ == "__main__": # 运行测试 pytest.main([__file__, "-v", "-s"])