| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- # -*- coding: utf-8 -*-
- """
- 大纲审查功能单元测试
- 测试OutlineReviewer.outline_review方法的入参出参
- """
- import pytest
- import asyncio
- import json
- from typing import Dict, Any
- import sys
- import os
- # 添加项目根目录到Python路径
- sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')))
- from core.construction_review.component.reviewers.outline_reviewer import OutlineReviewer
- class TestOutlineReview:
- """大纲审查功能测试类"""
- def setup_method(self):
- """测试前置设置"""
- self.outline_reviewer = OutlineReviewer()
- self.test_trace_id = "test_outline_review_001"
- self.test_stage_name = "test_stage"
- self.test_state = {"status": "testing"}
- def load_test_data(self) -> Dict[str, Any]:
- """从测试数据文件加载大纲结构"""
- test_data_path = os.path.join(
- os.path.dirname(__file__), '..', '..', '..',
- 'temp', 'document_temp', '文档切分预处理结果.json'
- )
- try:
- with open(test_data_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
- return data.get('outline', {})
- except Exception as e:
- print(f"加载测试数据失败: {e}")
- return self._get_mock_outline_data()
- def _get_mock_outline_data(self) -> Dict[str, Any]:
- """获取模拟大纲数据"""
- return {
- "chapters": [
- {
- "index": 1,
- "title": "第一章编制依据",
- "page": "1",
- "original": "第一章编制依据...................................................... 1",
- "subsections": [
- {
- "title": "一、编制依据",
- "page": "1",
- "level": 2,
- "original": "一、编制依据................................................................. 1"
- },
- {
- "title": "二、编制原则",
- "page": "3",
- "level": 2,
- "original": "二、编制原则................................................................. 3"
- }
- ]
- },
- {
- "index": 2,
- "title": "第二章工程概况",
- "page": "5",
- "original": "第二章工程概况...................................................... 5",
- "subsections": [
- {
- "title": "一、项目背景",
- "page": "5",
- "level": 2,
- "original": "一、项目背景................................................................. 5"
- },
- {
- "title": "二、工程地质",
- "page": "6",
- "level": 2,
- "original": "二、工程地质................................................................. 6"
- }
- ]
- }
- ]
- }
- def prepare_review_data(self, outline_data: Dict[str, Any]) -> Dict[str, Any]:
- """准备审查数据,模拟ai_review_engine.py中的数据处理逻辑"""
- chapters = outline_data.get('chapters', [])
- # 1. 获取整体大纲(1级大纲目录)
- overall_outline = ""
- for chapter in chapters:
- overall_outline += f"{chapter['title']} (页码: {chapter['page']})\n"
- # 2. 获取大纲各章节及其子目录的详细信息
- detailed_outline = []
- for chapter in chapters:
- # 追加章节标题和页码
- detailed_outline.append(f"\n{chapter['title']}\n")
- detailed_outline.append(f"页码: {chapter['page']}")
- # 添加子目录
- subsections = chapter.get('subsections', [])
- if subsections:
- detailed_outline.append("\n")
- for subsection in subsections:
- indent = " " * (subsection['level'] - 1)
- detailed_outline.append(f"{indent}- {subsection['title']} (页码: {subsection['page']})\n")
- else:
- detailed_outline.append("\n")
- return {
- 'outline_content': outline_data,
- 'overall_outline': overall_outline,
- 'detailed_outline': detailed_outline,
- 'state': self.test_state,
- 'stage_name': self.test_stage_name
- }
- @pytest.mark.asyncio
- async def test_outline_review_basic_functionality(self):
- """测试大纲审查基本功能"""
- print("\n=== 测试大纲审查基本功能 ===")
- # 加载测试数据
- outline_data = self.load_test_data()
- print(f"加载的大纲章节数量: {len(outline_data.get('chapters', []))}")
- # 准备审查数据
- review_data = self.prepare_review_data(outline_data)
- print(f"整体大纲内容预览:")
- print(f"{review_data['overall_outline'][:200]}...")
- print(f"详细大纲项目数量: {len(review_data['detailed_outline'])}")
- for i, item in enumerate(review_data['detailed_outline'][:5]):
- if item.strip():
- print(f" {i+1}. {item.strip()[:50]}...")
- try:
- # 执行大纲审查
- print("\n开始执行大纲审查...")
- result = await self.outline_reviewer.outline_review(
- review_data, self.test_trace_id, self.test_stage_name, self.test_state
- )
- # 验证结果
- print(f"\n=== 审查结果 ===")
- print(f"审查成功状态: {result.get('success', False)}")
- print(f"执行时间: {result.get('execution_time', 0):.2f}秒")
- print(f"详细项目总数: {result.get('total_detailed_items', 0)}")
- # 阶段1结果
- stage1_result = result.get('stage1_overall_review')
- if stage1_result:
- print(f"\n--- 阶段1:整体大纲审查结果 ---")
- print(f"整体大纲审查成功: {stage1_result.get('success', False)}")
- parsed_overall = stage1_result.get('parsed_result')
- if parsed_overall and isinstance(parsed_overall, list):
- print(f"十大类章节审查结果数量: {len(parsed_overall)}")
- for item in parsed_overall[:3]: # 只显示前3个
- chapter_category = item.get('章节类别', 'N/A')
- status = item.get('完整性状态', 'N/A')
- # 替换特殊字符避免编码问题
- status_safe = status.replace('✔', '[完整]').replace('⚠', '[部分]').replace('✘', '[缺失]')
- print(f" - {chapter_category}: {status_safe}")
- # 阶段2结果
- stage2_result = result.get('stage2_detailed_review', [])
- print(f"\n--- 阶段2:详细大纲逐项审查结果 ---")
- print(f"详细项目审查结果数量: {len(stage2_result)}")
- success_count = 0
- for i, item_result in enumerate(stage2_result[:5]): # 只显示前5个
- item_index = item_result.get('item_index', i)
- outline_item = item_result.get('outline_item', '')[:50]
- review_result = item_result.get('review_result', {})
- print(f" 项目{item_index+1}: {outline_item}...")
- print(f" 审查成功: {review_result.get('success', False)}")
- if review_result.get('success'):
- category = review_result.get('category', 'N/A')
- parsed_result = review_result.get('parsed_result')
- if parsed_result and isinstance(parsed_result, list):
- print(f" 分类: {category}")
- print(f" 审查项目数: {len(parsed_result)}")
- success_count += 1
- else:
- print(f" 错误: {review_result.get('error_message', 'N/A')}")
- print(f"\n前5个项目中成功审查数量: {success_count}")
- # 断言基本结果
- assert result.get('success', False), "大纲审查应该成功"
- assert 'stage1_overall_review' in result, "应该包含阶段1审查结果"
- assert 'stage2_detailed_review' in result, "应该包含阶段2审查结果"
- assert isinstance(result.get('stage2_detailed_review', []), list), "阶段2结果应该是列表"
- print(f"\n[测试通过] 大纲审查基本功能正常")
- except Exception as e:
- print(f"\n[测试失败] {str(e)}")
- raise
- @pytest.mark.asyncio
- async def test_outline_review_with_empty_data(self):
- """测试空数据的大纲审查"""
- print("\n=== 测试空数据的大纲审查 ===")
- # 准备空数据
- empty_review_data = {
- 'outline_content': {},
- 'overall_outline': '',
- 'detailed_outline': [],
- 'state': self.test_state,
- 'stage_name': self.test_stage_name
- }
- try:
- result = await self.outline_reviewer.outline_review(
- empty_review_data, self.test_trace_id, self.test_stage_name, self.test_state
- )
- print(f"空数据审查结果: {result.get('success', False)}")
- print(f"错误信息: {result.get('error_message', 'N/A')}")
- # 空数据应该处理失败
- assert not result.get('success', False), "空数据应该审查失败"
- print(f"\n[测试通过] 空数据正确处理失败")
- except Exception as e:
- print(f"期望的异常: {str(e)}")
- print(f"\n[测试通过] 空数据正确抛出异常")
- def test_data_preparation_logic(self):
- """测试数据准备逻辑"""
- print("\n=== 测试数据准备逻辑 ===")
- # 加载测试数据
- outline_data = self.load_test_data()
- review_data = self.prepare_review_data(outline_data)
- # 验证数据格式
- assert 'overall_outline' in review_data, "应该包含整体大纲"
- assert 'detailed_outline' in review_data, "应该包含详细大纲"
- assert isinstance(review_data['detailed_outline'], list), "详细大纲应该是列表格式"
- print(f"整体大纲长度: {len(review_data['overall_outline'])}")
- print(f"详细大纲项目数: {len(review_data['detailed_outline'])}")
- # 验证整体大纲格式
- overall_lines = review_data['overall_outline'].strip().split('\n')
- print(f"整体大纲行数: {len(overall_lines)}")
- # 验证详细大纲格式
- non_empty_items = [item for item in review_data['detailed_outline'] if item.strip()]
- print(f"非空详细大纲项目数: {len(non_empty_items)}")
- print(f"\n[测试通过] 数据准备逻辑正确")
- if __name__ == "__main__":
- # 运行测试
- pytest.main([__file__, "-v", "-s"])
|