#!/usr/bin/env python # -*- coding: utf-8 -*- """ 测试完整的编制依据审查工作流 """ import asyncio import sys import os # 添加项目根目录到sys.path current_file = os.path.abspath(__file__) root_dir = os.path.dirname(os.path.dirname(os.path.dirname(current_file))) sys.path.append(root_dir) from core.base.task_models import TaskFileInfo from core.construction_review.workflows.ai_review_workflow import AIReviewWorkflow async def test_prep_basis_workflow(): """测试完整的编制依据审查工作流""" print("开始测试编制依据审查工作流") # 模拟结构化内容,包含编制依据 structured_content = { 'file_id': 'test_file_002', 'file_name': 'construction_plan.doc', 'prep_basis': ''' (1)《架桥机通用技术条件》(GB/T 26470-2011); (2)《起重机设计规范》(GB/T 3811-2008); (3)《起重机械安全规程 第 5 部分:桥式和门式起重机》(GB 6067.5-2014); (4)《电气装置安装工程 起重机电气装置施工及验收规范》(GB 50256-2014); (5)《起重设备安装工程施工及验收规范》(GB50278-2010); (6)《施工现场机械设备检查技术规范》(JGJ 160-2016); (7)《公路桥涵施工技术规范》(JTG/T 3650-2020); (8)《建设工程安全生产管理条例》《四川省安全生产条例》(2023); (9)《重要用途钢丝绳》(GB 8918-2006); (10)《起重机用钢丝绳》(GB T 34198-2017); (11)《起重机钢丝绳保养、维护、检验和报废》(GBT5972-2023); (12)《建筑施工起重吊装工程安全技术规范》(JGJ 276-2012); (13)《建筑施工高处作业安全技术规范》(JGJ 80-2016); ''', 'chunks': [ { 'section_label': '第一章 编制依据', 'content': '''第一章 编制依据 本专项施工方案编制依据如下: (1)《架桥机通用技术条件》(GB/T 26470-2011); (2)《起重机设计规范》(GB/T 3811-2008); (3)《起重机械安全规程 第 5 部分:桥式和门式起重机》(GB 6067.5-2014); ''', 'page': '1' }, { 'section_label': '第二章 工程概况', 'content': '''第二章 工程概况 本工程为某某大桥上部结构施工项目... ''', 'page': '2' } ], 'outline': { 'chapters': [ {'title': '第一章 编制依据', 'page': '1'}, {'title': '第二章 工程概况', 'page': '2'} ] } } # 创建模拟的TaskFileInfo,包含prep_basis_check配置 file_info_dict = { 'file_id': "test_file_002", 'callback_task_id': "test_callback_prep_basis_002", 'user_id': "test_user_002", 'review_config': ['completeness_check', 'prep_basis_check'], # 包含编制依据审查 'project_plan_type': "construction_plan", 'tendency_review_role': "safety_reviewer", 'file_name': "test_prep_basis_document.doc", 'file_type': "doc", 'launched_at': 1234567890 } task_info = TaskFileInfo(file_info_dict) try: # 创建AI审查工作流实例 workflow = AIReviewWorkflow( task_file_info=task_info, structured_content=structured_content, progress_manager=None, # 简化测试,不使用进度管理器 max_review_units=1, # 只测试一个单元,减少执行时间 review_mode="first" ) print("工作流创建成功,开始执行...") # 执行工作流 result = await workflow.execute() print("工作流执行完成!") print(f"执行结果:") print(f" - 文件ID: {result.get('file_id')}") print(f" - 总单元数: {result.get('total_units')}") print(f" - 成功单元数: {result.get('successful_units')}") print(f" - 失败单元数: {result.get('failed_units')}") print(f" - 状态: {result.get('status')}") # 检查审查结果中是否包含编制依据审查 review_results = result.get('review_results', []) if review_results: print(f" - 审查结果数量: {len(review_results)}") # 查找编制依据审查结果 prep_basis_result = None for item in review_results: if isinstance(item, dict) and 'review_results' in item: # 这可能是编制依据审查结果 if 'total_basis_items' in item: prep_basis_result = item break if prep_basis_result: print(f"编制依据审查结果:") print(f" - 成功状态: {prep_basis_result.get('success', False)}") print(f" - 总编制依据数: {prep_basis_result.get('total_basis_items', 0)}") print(f" - 有效项目数: {prep_basis_result.get('valid_items', 0)}") print(f" - 标准项目数: {prep_basis_result.get('standard_items', 0)}") print(f" - 执行时间: {prep_basis_result.get('execution_time', 0):.2f}秒") else: print("未找到编制依据审查结果") else: print("没有审查结果返回") except Exception as e: print(f"测试失败: {str(e)}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(test_prep_basis_workflow())