| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 模拟实际工作流场景测试
- 使用 DocumentWorkflow 完整流程测试
- """
- import sys
- import os
- import asyncio
- from pathlib import Path
# Anchor the script's directory to an absolute path *before* the working
# directory is changed below. When ``__file__`` is relative (e.g. the
# ``__main__`` script on interpreters older than 3.9), a path derived from it
# after ``os.chdir`` would be resolved against the new cwd and point at the
# wrong location.
_script_dir = Path(__file__).absolute().parent
project_root = _script_dir.parent.parent

# Switch to the project root and put it at the front of ``sys.path``.
# Kept ahead of the ``core`` imports, as in the original ordering --
# presumably so the project package resolves when this file is run directly.
os.chdir(project_root)
sys.path.insert(0, str(project_root))

from core.construction_review.workflows.document_workflow import DocumentWorkflow
from core.base.task_models import TaskFileInfo

# Sample PDF looked up next to this script.
TARGET_FILE = _script_dir / "330测试fa56f7a8-bd36-4140-8cb1-f9f973ce8745.pdf"
def _chunk_label(chunk):
    """Return the chunk's ``section_label``, else its ``title``, else ``''``."""
    return chunk.get('section_label') or chunk.get('title') or ''


def _first_level(label):
    """Return the part of *label* before the first ``->``, whitespace-stripped."""
    return label.split('->')[0].strip()


async def test_document_workflow():
    """Run ``DocumentWorkflow`` on ``TARGET_FILE`` and print a report.

    The report contains the top-level result fields, the number of chunks per
    first-level section, which of the ten expected chapter markers were found
    or are missing, and a per-chunk detail listing.

    Returns:
        int: ``0`` if the workflow ran and the report was printed; ``1`` if
        ``TARGET_FILE`` does not exist or any exception was raised while
        executing the workflow or building the report.
    """
    if not TARGET_FILE.exists():
        print(f"[ERROR] 文件不存在: {TARGET_FILE}")
        return 1

    # Read the raw file bytes.
    with open(TARGET_FILE, "rb") as f:
        file_content = f.read()

    # Build the task description passed to the workflow.
    task_info = TaskFileInfo({
        "file_id": "test_330_file",
        "user_id": "test_user",
        "callback_task_id": "test_callback_001",
        "file_name": TARGET_FILE.name,
        "file_type": "pdf",
        "file_content": file_content,
        "review_config": [],
        "review_item_config": [],
        "project_plan_type": "T梁运输及安装专项施工方案",
        "tendency_review_role": "",
        "launched_at": 0
    })

    # Create the workflow without a progress manager or duplicate checker.
    workflow = DocumentWorkflow(
        task_file_info=task_info,
        progress_manager=None,
        redis_duplicate_checker=None
    )

    print(f"\n[INFO] 开始处理文件: {TARGET_FILE.name}")
    print(f" 文件大小: {len(file_content)} bytes")

    try:
        result = await workflow.execute(
            file_content=file_content,
            file_type="pdf"
        )

        print("\n" + "=" * 80)
        print("文档处理结果")
        print("=" * 80)
        print(f" file_id: {result.get('file_id')}")
        print(f" document_name: {result.get('document_name')}")
        print(f" total_chunks: {result.get('total_chunks')}")

        # ``or`` fallbacks also cover keys that are present but set to None.
        structured = result.get('structured_content') or {}
        chunks = structured.get('chunks') or []

        # Section distribution.
        print("\n" + "=" * 80)
        print("章节分布分析")
        print("=" * 80)

        # Count chunks per first-level section in one pass, by exact match on
        # the first-level label. Prefix matching on the raw label miscounts:
        # an empty label is a prefix of every label, one section title can be
        # a prefix of another, and a label with leading whitespace never
        # matches its own stripped key.
        section_counts = {}
        for chunk in chunks:
            first = _first_level(_chunk_label(chunk))
            section_counts[first] = section_counts.get(first, 0) + 1
        first_levels = set(section_counts)

        print(f" 共 {len(first_levels)} 个一级章节:")
        for fl in sorted(first_levels):
            print(f" - {fl}: {section_counts[fl]} chunks")

        # Check which expected chapter markers appear in some first-level label.
        expected_chapters = ["第一章", "第二章", "第三章", "第四章", "第五章", "第六章", "第七章", "第八章", "第九章", "第十章"]
        found_chapters = []
        missing_chapters = []
        for ec in expected_chapters:
            if any(ec in fl for fl in first_levels):
                found_chapters.append(ec)
            else:
                missing_chapters.append(ec)

        print(f"\n 找到的章节: {found_chapters}")
        if missing_chapters:
            print(f" [WARNING] 缺失的章节: {missing_chapters}")

        # Per-chunk details.
        print("\n" + "=" * 80)
        print("所有 Chunks 详情")
        print("=" * 80)
        for i, chunk in enumerate(chunks, 1):
            label = _chunk_label(chunk)
            content = chunk.get('content') or chunk.get('review_chunk_content') or ''
            print(f"\n {i}. {label}")
            print(f" chunk_id: {chunk.get('chunk_id')}")
            print(f" page: {chunk.get('page')}")
            print(f" content_length: {len(content)}")
            if content:
                # Single-line preview of the first 80 characters.
                preview = content[:80].replace('\n', ' ')
                print(f" preview: {preview}...")

        return 0
    except Exception as e:
        # Top-level boundary of the script: report the failure with a full
        # traceback and signal it through the return code.
        print(f"\n[ERROR] 处理失败: {e}")
        import traceback
        traceback.print_exc()
        return 1
if __name__ == "__main__":
    # Exit with the coroutine's return value (0 or 1) as the process status.
    sys.exit(asyncio.run(test_document_workflow()))
|