# test_real_scenario.py (4.3 KB)
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 模拟实际工作流场景测试
  5. 使用 DocumentWorkflow 完整流程测试
  6. """
  7. import sys
  8. import os
  9. import asyncio
  10. from pathlib import Path
  11. project_root = Path(__file__).parent.parent.parent
  12. os.chdir(project_root)
  13. sys.path.insert(0, str(project_root))
  14. from core.construction_review.workflows.document_workflow import DocumentWorkflow
  15. from core.base.task_models import TaskFileInfo
  16. TARGET_FILE = Path(__file__).parent / "330测试fa56f7a8-bd36-4140-8cb1-f9f973ce8745.pdf"
  17. async def test_document_workflow():
  18. """测试完整文档工作流"""
  19. if not TARGET_FILE.exists():
  20. print(f"[ERROR] 文件不存在: {TARGET_FILE}")
  21. return 1
  22. # 读取文件内容
  23. with open(TARGET_FILE, "rb") as f:
  24. file_content = f.read()
  25. # 创建 TaskFileInfo
  26. task_info = TaskFileInfo({
  27. "file_id": "test_330_file",
  28. "user_id": "test_user",
  29. "callback_task_id": "test_callback_001",
  30. "file_name": TARGET_FILE.name,
  31. "file_type": "pdf",
  32. "file_content": file_content,
  33. "review_config": [],
  34. "review_item_config": [],
  35. "project_plan_type": "T梁运输及安装专项施工方案",
  36. "tendency_review_role": "",
  37. "launched_at": 0
  38. })
  39. # 创建工作流
  40. workflow = DocumentWorkflow(
  41. task_file_info=task_info,
  42. progress_manager=None, # 不依赖进度管理器
  43. redis_duplicate_checker=None
  44. )
  45. print(f"\n[INFO] 开始处理文件: {TARGET_FILE.name}")
  46. print(f" 文件大小: {len(file_content)} bytes")
  47. try:
  48. result = await workflow.execute(
  49. file_content=file_content,
  50. file_type="pdf"
  51. )
  52. print("\n" + "=" * 80)
  53. print("文档处理结果")
  54. print("=" * 80)
  55. print(f" file_id: {result.get('file_id')}")
  56. print(f" document_name: {result.get('document_name')}")
  57. print(f" total_chunks: {result.get('total_chunks')}")
  58. structured = result.get('structured_content', {})
  59. chunks = structured.get('chunks', [])
  60. # 分析章节分布
  61. print("\n" + "=" * 80)
  62. print("章节分布分析")
  63. print("=" * 80)
  64. first_levels = set()
  65. for c in chunks:
  66. label = c.get('section_label', '') or c.get('title', '')
  67. if '->' in label:
  68. first = label.split('->')[0].strip()
  69. else:
  70. first = label.strip()
  71. first_levels.add(first)
  72. print(f" 共 {len(first_levels)} 个一级章节:")
  73. for fl in sorted(first_levels):
  74. # 计算该章节下的chunks数量
  75. count = sum(1 for c in chunks if (c.get('section_label', '') or c.get('title', '')).startswith(fl))
  76. print(f" - {fl}: {count} chunks")
  77. # 检查是否有缺失的章节
  78. expected_chapters = ["第一章", "第二章", "第三章", "第四章", "第五章", "第六章", "第七章", "第八章", "第九章", "第十章"]
  79. found_chapters = []
  80. missing_chapters = []
  81. for ec in expected_chapters:
  82. found = any(ec in fl for fl in first_levels)
  83. if found:
  84. found_chapters.append(ec)
  85. else:
  86. missing_chapters.append(ec)
  87. print(f"\n 找到的章节: {found_chapters}")
  88. if missing_chapters:
  89. print(f" [WARNING] 缺失的章节: {missing_chapters}")
  90. # 详细信息
  91. print("\n" + "=" * 80)
  92. print("所有 Chunks 详情")
  93. print("=" * 80)
  94. for i, chunk in enumerate(chunks, 1):
  95. label = chunk.get('section_label', '') or chunk.get('title', '')
  96. content = chunk.get('content', '') or chunk.get('review_chunk_content', '')
  97. print(f"\n {i}. {label}")
  98. print(f" chunk_id: {chunk.get('chunk_id')}")
  99. print(f" page: {chunk.get('page')}")
  100. print(f" content_length: {len(content)}")
  101. if content:
  102. preview = content[:80].replace('\n', ' ')
  103. print(f" preview: {preview}...")
  104. return 0
  105. except Exception as e:
  106. print(f"\n[ERROR] 处理失败: {e}")
  107. import traceback
  108. traceback.print_exc()
  109. return 1
  110. if __name__ == "__main__":
  111. result = asyncio.run(test_document_workflow())
  112. sys.exit(result)