""" 施工方案审查API测试客户端 用于测试Mock接口和前端联调 """ import requests import json import time import uuid from pathlib import Path from typing import Optional, Dict, Any class ConstructionReviewAPIClient: """施工方案审查API客户端""" def __init__(self, base_url: str = "http://127.0.0.1:8034", api_key: Optional[str] = None): self.base_url = base_url.rstrip('/') self.api_key = api_key self.session = requests.Session() if api_key: self.session.headers.update({ 'Authorization': f'Bearer {api_key}' }) def upload_file(self, file_path: str, project_plan_type: str, user: str, callback_url: Optional[str] = None) -> Dict[str, Any]: """ 上传文件 Args: file_path: 文件路径 project_plan_type: 工程方案类型 user: 用户标识 callback_url: 回调URL(可选) Returns: 上传响应结果 """ url = f"{self.base_url}/sgsc/file_upload" if not Path(file_path).exists(): raise FileNotFoundError(f"文件不存在: {file_path}") with open(file_path, 'rb') as f: files = {'file': f} data = { 'project_plan_type': project_plan_type, 'user': user } if callback_url: data['callback_url'] = callback_url response = self.session.post(url, files=files, data=data) response.raise_for_status() return response.json() def get_task_progress(self, callback_task_id: str, user: str) -> Dict[str, Any]: """ 查询任务进度 Args: callback_task_id: 任务ID user: 用户标识 Returns: 进度查询结果 """ url = f"{self.base_url}/sgsc/task_progress/{callback_task_id}" params = {'user': user} response = self.session.get(url, params=params) response.raise_for_status() return response.json() def get_review_results(self, file_id: str, user: str, result_type: str) -> Dict[str, Any]: """ 获取审查结果 Args: file_id: 文件ID user: 用户标识 result_type: 结果类型 ("summary" 或 "issues") Returns: 审查结果 """ url = f"{self.base_url}/sgsc/review_results" data = { 'id': file_id, 'user': user, 'type': result_type } response = self.session.post(url, json=data) response.raise_for_status() return response.json() def wait_for_completion(self, callback_task_id: str, user: str, timeout: int = 1800, interval: int = 10) -> Dict[str, Any]: """ 等待任务完成 Args: callback_task_id: 任务ID user: 用户标识 timeout: 超时时间(秒) interval: 轮询间隔(秒) Returns: 最终任务状态 """ start_time = time.time() while time.time() - start_time < timeout: try: result = self.get_task_progress(callback_task_id, user) if result['data']['review_task_status'] == 'completed': print(f"任务完成! 总进度: {result['data']['overall_progress']}%") return result else: progress = result['data']['overall_progress'] print(f"任务进行中... 进度: {progress}%") time.sleep(interval) except Exception as e: print(f"查询进度失败: {e}") time.sleep(interval) raise TimeoutError(f"任务超时,等待时间超过 {timeout} 秒") class MockTestRunner: """Mock测试运行器""" def __init__(self, client: ConstructionReviewAPIClient): self.client = client def test_file_upload(self, file_path: str = None) -> Dict[str, Any]: """测试文件上传""" print("=== 测试文件上传 ===") # 创建测试文件(如果没有提供文件路径) if not file_path: test_file = Path(r"D:\wx_work\sichuan_luqiao\LQAgentPlatform\data_pipeline\test_rawdata\1f3e1d98-5b4a-4a06-87b3-c7f0413b901a.pdf") if not test_file.exists(): # 创建一个简单的测试PDF文件内容 test_file.write_bytes(b"%PDF-1.4\n%Mock PDF for testing\n") file_path = str(test_file) try: result = self.client.upload_file( file_path=file_path, project_plan_type="bridge_up_part", user=str(uuid.uuid4()), callback_url="https://client.example.com/callback" ) print(f"✅ 文件上传成功") print(f"文件ID: {result['data']['id']}") print(f"任务ID: {result['data']['callback_task_id']}") return result except Exception as e: print(f"❌ 文件上传失败: {e}") raise def test_progress_query(self, callback_task_id: str, user: str) -> None: """测试进度查询""" print("\n=== 测试进度查询 ===") try: result = self.client.get_task_progress(callback_task_id, user) print(f"✅ 进度查询成功") print(f"任务状态: {result['data']['review_task_status']}") print(f"总进度: {result['data']['overall_progress']}%") for stage in result['data']['stages']: print(f" - {stage['stage_name']}: {stage['progress']}% ({stage['stage_status']})") except Exception as e: print(f"❌ 进度查询失败: {e}") raise def test_review_results(self, file_id: str, user: str) -> None: """测试审查结果获取""" print("\n=== 测试审查结果获取 ===") # 测试获取总结报告 try: result = self.client.get_review_results(file_id, user, "summary") print(f"✅ 总结报告获取成功") print(f"风险统计: {result['data']['risk_stats']}") print(f"四维评分: {result['data']['dimension_scores']}") print(f"总结报告: {result['data']['summary_report']}") except Exception as e: print(f"❌ 总结报告获取失败: {e}") # 测试获取问题条文 try: result = self.client.get_review_results(file_id, user, "issues") print(f"\n✅ 问题条文获取成功") issues = result['data']['issues'] print(f"发现问题数量: {len(issues)}") for i, issue in enumerate(issues): print(f"\n问题 {i+1}:") print(f" ID: {issue['issue_id']}") print(f" 页码: {issue['metadata']['page']}") print(f" 章节: {issue['metadata']['chapter']}") print(f" 风险等级: {issue['risk_summary']['max_risk_level']}") print(f" 检查项数量: {len(issue['review_lists'])}") except Exception as e: print(f"❌ 问题条文获取失败: {e}") def run_complete_test(self) -> None: """运行完整测试流程""" print("开始施工方案审查API完整测试...") try: # 1. 上传文件 upload_result = self.test_file_upload() file_id = upload_result['data']['id'] callback_task_id = upload_result['data']['callback_task_id'] user = str(uuid.uuid4()) # 实际应该从上传响应中获取,这里简化 # 2. 查询进度(等待一段时间让任务完成) print("\n等待任务完成...") time.sleep(2) # 短暂等待 # 先测试进度查询 self.test_progress_query(callback_task_id, user) # 3. 获取审查结果(可能需要等待任务完成) print("\n获取审查结果...") # 如果任务还未完成,直接标记完成(仅用于Mock测试) try: self.test_review_results(file_id, user) except Exception as e: print(f"审查结果获取失败,尝试完成任务: {e}") # 完成任务(Mock功能) response = requests.post(f"{self.client.base_url}/sgsc/mock/complete_task", data={"callback_task_id": callback_task_id}) print("任务已强制完成,重新获取结果...") self.test_review_results(file_id, user) print("\n🎉 完整测试流程执行成功!") except Exception as e: print(f"\n❌ 测试失败: {e}") raise def main(): """主函数 - 运行测试""" print("施工方案审查API Mock测试客户端") print("=" * 50) # 创建客户端 client = ConstructionReviewAPIClient( base_url="http://127.0.0.1:8034", api_key="mock-api-key-12345" ) # 创建测试运行器 test_runner = MockTestRunner(client) # 运行完整测试 test_runner.run_complete_test() if __name__ == "__main__": main()