| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- """
- 施工方案审查API测试客户端
- 用于测试Mock接口和前端联调
- """
- import requests
- import json
- import time
- import uuid
- from pathlib import Path
- from typing import Optional, Dict, Any
- class ConstructionReviewAPIClient:
- """施工方案审查API客户端"""
- def __init__(self, base_url: str = "http://127.0.0.1:8034", api_key: Optional[str] = None):
- self.base_url = base_url.rstrip('/')
- self.api_key = api_key
- self.session = requests.Session()
- if api_key:
- self.session.headers.update({
- 'Authorization': f'Bearer {api_key}'
- })
- def upload_file(self, file_path: str, project_plan_type: str, user: str,
- callback_url: Optional[str] = None) -> Dict[str, Any]:
- """
- 上传文件
- Args:
- file_path: 文件路径
- project_plan_type: 工程方案类型
- user: 用户标识
- callback_url: 回调URL(可选)
- Returns:
- 上传响应结果
- """
- url = f"{self.base_url}/sgsc/file_upload"
- if not Path(file_path).exists():
- raise FileNotFoundError(f"文件不存在: {file_path}")
- with open(file_path, 'rb') as f:
- files = {'file': f}
- data = {
- 'project_plan_type': project_plan_type,
- 'user': user
- }
- if callback_url:
- data['callback_url'] = callback_url
- response = self.session.post(url, files=files, data=data)
- response.raise_for_status()
- return response.json()
- def get_task_progress(self, callback_task_id: str, user: str) -> Dict[str, Any]:
- """
- 查询任务进度
- Args:
- callback_task_id: 任务ID
- user: 用户标识
- Returns:
- 进度查询结果
- """
- url = f"{self.base_url}/sgsc/task_progress/{callback_task_id}"
- params = {'user': user}
- response = self.session.get(url, params=params)
- response.raise_for_status()
- return response.json()
- def get_review_results(self, file_id: str, user: str, result_type: str) -> Dict[str, Any]:
- """
- 获取审查结果
- Args:
- file_id: 文件ID
- user: 用户标识
- result_type: 结果类型 ("summary" 或 "issues")
- Returns:
- 审查结果
- """
- url = f"{self.base_url}/sgsc/review_results"
- data = {
- 'id': file_id,
- 'user': user,
- 'type': result_type
- }
- response = self.session.post(url, json=data)
- response.raise_for_status()
- return response.json()
- def wait_for_completion(self, callback_task_id: str, user: str,
- timeout: int = 1800, interval: int = 10) -> Dict[str, Any]:
- """
- 等待任务完成
- Args:
- callback_task_id: 任务ID
- user: 用户标识
- timeout: 超时时间(秒)
- interval: 轮询间隔(秒)
- Returns:
- 最终任务状态
- """
- start_time = time.time()
- while time.time() - start_time < timeout:
- try:
- result = self.get_task_progress(callback_task_id, user)
- if result['data']['review_task_status'] == 'completed':
- print(f"任务完成! 总进度: {result['data']['overall_progress']}%")
- return result
- else:
- progress = result['data']['overall_progress']
- print(f"任务进行中... 进度: {progress}%")
- time.sleep(interval)
- except Exception as e:
- print(f"查询进度失败: {e}")
- time.sleep(interval)
- raise TimeoutError(f"任务超时,等待时间超过 {timeout} 秒")
- class MockTestRunner:
- """Mock测试运行器"""
- def __init__(self, client: ConstructionReviewAPIClient):
- self.client = client
- def test_file_upload(self, file_path: str = None) -> Dict[str, Any]:
- """测试文件上传"""
- print("=== 测试文件上传 ===")
- # 创建测试文件(如果没有提供文件路径)
- if not file_path:
- test_file = Path(r"D:\wx_work\sichuan_luqiao\LQAgentPlatform\data_pipeline\test_rawdata\1f3e1d98-5b4a-4a06-87b3-c7f0413b901a.pdf")
- if not test_file.exists():
- # 创建一个简单的测试PDF文件内容
- test_file.write_bytes(b"%PDF-1.4\n%Mock PDF for testing\n")
- file_path = str(test_file)
- try:
- result = self.client.upload_file(
- file_path=file_path,
- project_plan_type="bridge_up_part",
- user=str(uuid.uuid4()),
- callback_url="https://client.example.com/callback"
- )
- print(f"✅ 文件上传成功")
- print(f"文件ID: {result['data']['id']}")
- print(f"任务ID: {result['data']['callback_task_id']}")
- return result
- except Exception as e:
- print(f"❌ 文件上传失败: {e}")
- raise
- def test_progress_query(self, callback_task_id: str, user: str) -> None:
- """测试进度查询"""
- print("\n=== 测试进度查询 ===")
- try:
- result = self.client.get_task_progress(callback_task_id, user)
- print(f"✅ 进度查询成功")
- print(f"任务状态: {result['data']['review_task_status']}")
- print(f"总进度: {result['data']['overall_progress']}%")
- for stage in result['data']['stages']:
- print(f" - {stage['stage_name']}: {stage['progress']}% ({stage['stage_status']})")
- except Exception as e:
- print(f"❌ 进度查询失败: {e}")
- raise
- def test_review_results(self, file_id: str, user: str) -> None:
- """测试审查结果获取"""
- print("\n=== 测试审查结果获取 ===")
- # 测试获取总结报告
- try:
- result = self.client.get_review_results(file_id, user, "summary")
- print(f"✅ 总结报告获取成功")
- print(f"风险统计: {result['data']['risk_stats']}")
- print(f"四维评分: {result['data']['dimension_scores']}")
- print(f"总结报告: {result['data']['summary_report']}")
- except Exception as e:
- print(f"❌ 总结报告获取失败: {e}")
- # 测试获取问题条文
- try:
- result = self.client.get_review_results(file_id, user, "issues")
- print(f"\n✅ 问题条文获取成功")
- issues = result['data']['issues']
- print(f"发现问题数量: {len(issues)}")
- for i, issue in enumerate(issues):
- print(f"\n问题 {i+1}:")
- print(f" ID: {issue['issue_id']}")
- print(f" 页码: {issue['metadata']['page']}")
- print(f" 章节: {issue['metadata']['chapter']}")
- print(f" 风险等级: {issue['risk_summary']['max_risk_level']}")
- print(f" 检查项数量: {len(issue['review_lists'])}")
- except Exception as e:
- print(f"❌ 问题条文获取失败: {e}")
- def run_complete_test(self) -> None:
- """运行完整测试流程"""
- print("开始施工方案审查API完整测试...")
- try:
- # 1. 上传文件
- upload_result = self.test_file_upload()
- file_id = upload_result['data']['id']
- callback_task_id = upload_result['data']['callback_task_id']
- user = str(uuid.uuid4()) # 实际应该从上传响应中获取,这里简化
- # 2. 查询进度(等待一段时间让任务完成)
- print("\n等待任务完成...")
- time.sleep(2) # 短暂等待
- # 先测试进度查询
- self.test_progress_query(callback_task_id, user)
- # 3. 获取审查结果(可能需要等待任务完成)
- print("\n获取审查结果...")
- # 如果任务还未完成,直接标记完成(仅用于Mock测试)
- try:
- self.test_review_results(file_id, user)
- except Exception as e:
- print(f"审查结果获取失败,尝试完成任务: {e}")
- # 完成任务(Mock功能)
- response = requests.post(f"{self.client.base_url}/sgsc/mock/complete_task",
- data={"callback_task_id": callback_task_id})
- print("任务已强制完成,重新获取结果...")
- self.test_review_results(file_id, user)
- print("\n🎉 完整测试流程执行成功!")
- except Exception as e:
- print(f"\n❌ 测试失败: {e}")
- raise
- def main():
- """主函数 - 运行测试"""
- print("施工方案审查API Mock测试客户端")
- print("=" * 50)
- # 创建客户端
- client = ConstructionReviewAPIClient(
- base_url="http://127.0.0.1:8034",
- api_key="mock-api-key-12345"
- )
- # 创建测试运行器
- test_runner = MockTestRunner(client)
- # 运行完整测试
- test_runner.run_complete_test()
- if __name__ == "__main__":
- main()
|