| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 测试完整的编制依据审查工作流
- """
- import asyncio
- import sys
- import os
- # 添加项目根目录到sys.path
- current_file = os.path.abspath(__file__)
- root_dir = os.path.dirname(os.path.dirname(os.path.dirname(current_file)))
- sys.path.append(root_dir)
- from core.base.task_models import TaskFileInfo
- from core.construction_review.workflows.ai_review_workflow import AIReviewWorkflow
- async def test_prep_basis_workflow():
- """测试完整的编制依据审查工作流"""
- print("开始测试编制依据审查工作流")
- # 模拟结构化内容,包含编制依据
- structured_content = {
- 'file_id': 'test_file_002',
- 'file_name': 'construction_plan.doc',
- 'prep_basis': '''
- (1)《架桥机通用技术条件》(GB/T 26470-2011);
- (2)《起重机设计规范》(GB/T 3811-2008);
- (3)《起重机械安全规程 第 5 部分:桥式和门式起重机》(GB 6067.5-2014);
- (4)《电气装置安装工程 起重机电气装置施工及验收规范》(GB 50256-2014);
- (5)《起重设备安装工程施工及验收规范》(GB50278-2010);
- (6)《施工现场机械设备检查技术规范》(JGJ 160-2016);
- (7)《公路桥涵施工技术规范》(JTG/T 3650-2020);
- (8)《建设工程安全生产管理条例》《四川省安全生产条例》(2023);
- (9)《重要用途钢丝绳》(GB 8918-2006);
- (10)《起重机用钢丝绳》(GB T 34198-2017);
- (11)《起重机钢丝绳保养、维护、检验和报废》(GBT5972-2023);
- (12)《建筑施工起重吊装工程安全技术规范》(JGJ 276-2012);
- (13)《建筑施工高处作业安全技术规范》(JGJ 80-2016);
- ''',
- 'chunks': [
- {
- 'section_label': '第一章 编制依据',
- 'content': '''第一章 编制依据
- 本专项施工方案编制依据如下:
- (1)《架桥机通用技术条件》(GB/T 26470-2011);
- (2)《起重机设计规范》(GB/T 3811-2008);
- (3)《起重机械安全规程 第 5 部分:桥式和门式起重机》(GB 6067.5-2014);
- ''',
- 'page': '1'
- },
- {
- 'section_label': '第二章 工程概况',
- 'content': '''第二章 工程概况
- 本工程为某某大桥上部结构施工项目...
- ''',
- 'page': '2'
- }
- ],
- 'outline': {
- 'chapters': [
- {'title': '第一章 编制依据', 'page': '1'},
- {'title': '第二章 工程概况', 'page': '2'}
- ]
- }
- }
- # 创建模拟的TaskFileInfo,包含prep_basis_check配置
- file_info_dict = {
- 'file_id': "test_file_002",
- 'callback_task_id': "test_callback_prep_basis_002",
- 'user_id': "test_user_002",
- 'review_config': ['completeness_check', 'prep_basis_check'], # 包含编制依据审查
- 'project_plan_type': "construction_plan",
- 'tendency_review_role': "safety_reviewer",
- 'file_name': "test_prep_basis_document.doc",
- 'file_type': "doc",
- 'launched_at': 1234567890
- }
- task_info = TaskFileInfo(file_info_dict)
- try:
- # 创建AI审查工作流实例
- workflow = AIReviewWorkflow(
- task_file_info=task_info,
- structured_content=structured_content,
- progress_manager=None, # 简化测试,不使用进度管理器
- max_review_units=1, # 只测试一个单元,减少执行时间
- review_mode="first"
- )
- print("工作流创建成功,开始执行...")
- # 执行工作流
- result = await workflow.execute()
- print("工作流执行完成!")
- print(f"执行结果:")
- print(f" - 文件ID: {result.get('file_id')}")
- print(f" - 总单元数: {result.get('total_units')}")
- print(f" - 成功单元数: {result.get('successful_units')}")
- print(f" - 失败单元数: {result.get('failed_units')}")
- print(f" - 状态: {result.get('status')}")
- # 检查审查结果中是否包含编制依据审查
- review_results = result.get('review_results', [])
- if review_results:
- print(f" - 审查结果数量: {len(review_results)}")
- # 查找编制依据审查结果
- prep_basis_result = None
- for item in review_results:
- if isinstance(item, dict) and 'review_results' in item:
- # 这可能是编制依据审查结果
- if 'total_basis_items' in item:
- prep_basis_result = item
- break
- if prep_basis_result:
- print(f"编制依据审查结果:")
- print(f" - 成功状态: {prep_basis_result.get('success', False)}")
- print(f" - 总编制依据数: {prep_basis_result.get('total_basis_items', 0)}")
- print(f" - 有效项目数: {prep_basis_result.get('valid_items', 0)}")
- print(f" - 标准项目数: {prep_basis_result.get('standard_items', 0)}")
- print(f" - 执行时间: {prep_basis_result.get('execution_time', 0):.2f}秒")
- else:
- print("未找到编制依据审查结果")
- else:
- print("没有审查结果返回")
- except Exception as e:
- print(f"测试失败: {str(e)}")
- import traceback
- traceback.print_exc()
- if __name__ == "__main__":
- asyncio.run(test_prep_basis_workflow())
|