#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Simulated real-world workflow scenario test.

Runs a PDF through the complete DocumentWorkflow pipeline and prints the
resulting chunks together with their chapter distribution.
"""

import asyncio
import os
import sys
import traceback
from collections import Counter
from pathlib import Path

# Resolve to an absolute path *before* changing the working directory: when
# ``__file__`` is relative, any path derived from it after ``os.chdir`` would
# point to the wrong location.
_THIS_FILE = Path(__file__).resolve()
project_root = _THIS_FILE.parent.parent.parent
os.chdir(project_root)
sys.path.insert(0, str(project_root))

# Project imports must come after the sys.path adjustment above.
from core.construction_review.workflows.document_workflow import DocumentWorkflow  # noqa: E402
from core.base.task_models import TaskFileInfo  # noqa: E402

TARGET_FILE = _THIS_FILE.parent / "330测试fa56f7a8-bd36-4140-8cb1-f9f973ce8745.pdf"

# Chapter markers that the report looks for among the first-level sections.
_EXPECTED_CHAPTERS = (
    "第一章", "第二章", "第三章", "第四章", "第五章",
    "第六章", "第七章", "第八章", "第九章", "第十章",
)


def _print_banner(title):
    """Print *title* framed by two 80-character separator lines."""
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)


def _chunk_label(chunk):
    """Return the chunk's ``section_label``, falling back to ``title``, else ''."""
    return chunk.get('section_label', '') or chunk.get('title', '') or ''


def _first_level(label):
    """Return the part of *label* before the first '->', stripped."""
    return label.partition('->')[0].strip()


def _print_chapter_distribution(chunks):
    """Print how many chunks fall under each first-level section.

    Also reports which of the expected chapter markers were found and which
    are missing.
    """
    _print_banner("章节分布分析")

    # Count by the derived first-level label itself. A prefix comparison
    # against the full label would attribute every chunk to an empty label
    # and over-count titles that are prefixes of other titles.
    counts = Counter(_first_level(_chunk_label(c)) for c in chunks)

    print(f" 共 {len(counts)} 个一级章节:")
    for first in sorted(counts):
        print(f" - {first}: {counts[first]} chunks")

    # Check for missing chapters.
    found_chapters = []
    missing_chapters = []
    for expected in _EXPECTED_CHAPTERS:
        if any(expected in first for first in counts):
            found_chapters.append(expected)
        else:
            missing_chapters.append(expected)

    print(f"\n 找到的章节: {found_chapters}")
    if missing_chapters:
        print(f" [WARNING] 缺失的章节: {missing_chapters}")


def _print_chunk_details(chunks):
    """Print id, page, content length and a short preview for every chunk."""
    _print_banner("所有 Chunks 详情")

    for i, chunk in enumerate(chunks, 1):
        label = _chunk_label(chunk)
        # Normalise to '' so a None field cannot break len() below.
        content = (
            chunk.get('content', '')
            or chunk.get('review_chunk_content', '')
            or ''
        )
        print(f"\n {i}. {label}")
        print(f" chunk_id: {chunk.get('chunk_id')}")
        print(f" page: {chunk.get('page')}")
        print(f" content_length: {len(content)}")
        if content:
            preview = content[:80].replace('\n', ' ')
            print(f" preview: {preview}...")


async def test_document_workflow():
    """Run the full document workflow on ``TARGET_FILE`` and print a report.

    Returns:
        int: 0 when the workflow and the report complete; 1 when the target
        file does not exist or any exception is raised during processing.
    """
    if not TARGET_FILE.exists():
        print(f"[ERROR] 文件不存在: {TARGET_FILE}")
        return 1

    # Read the file content.
    file_content = TARGET_FILE.read_bytes()

    # Build the TaskFileInfo.
    task_info = TaskFileInfo({
        "file_id": "test_330_file",
        "user_id": "test_user",
        "callback_task_id": "test_callback_001",
        "file_name": TARGET_FILE.name,
        "file_type": "pdf",
        "file_content": file_content,
        "review_config": [],
        "review_item_config": [],
        "project_plan_type": "T梁运输及安装专项施工方案",
        "tendency_review_role": "",
        "launched_at": 0,
    })

    # Build the workflow.
    workflow = DocumentWorkflow(
        task_file_info=task_info,
        progress_manager=None,  # no dependency on a progress manager
        redis_duplicate_checker=None,
    )

    print(f"\n[INFO] 开始处理文件: {TARGET_FILE.name}")
    print(f" 文件大小: {len(file_content)} bytes")

    # Top-level boundary of the script: any failure is reported with a
    # traceback and turned into a non-zero exit code.
    try:
        result = await workflow.execute(
            file_content=file_content,
            file_type="pdf",
        )

        _print_banner("文档处理结果")
        print(f" file_id: {result.get('file_id')}")
        print(f" document_name: {result.get('document_name')}")
        print(f" total_chunks: {result.get('total_chunks')}")

        # Tolerate a missing or None 'structured_content' / 'chunks' entry.
        structured = result.get('structured_content') or {}
        chunks = structured.get('chunks') or []

        _print_chapter_distribution(chunks)
        _print_chunk_details(chunks)

        return 0

    except Exception as e:
        print(f"\n[ERROR] 处理失败: {e}")
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    result = asyncio.run(test_document_workflow())
    sys.exit(result)