api_test_client.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. """
  2. 施工方案审查API测试客户端
  3. 用于测试Mock接口和前端联调
  4. """
  5. import requests
  6. import json
  7. import time
  8. import uuid
  9. from pathlib import Path
  10. from typing import Optional, Dict, Any
  11. class ConstructionReviewAPIClient:
  12. """施工方案审查API客户端"""
  13. def __init__(self, base_url: str = "http://127.0.0.1:8034", api_key: Optional[str] = None):
  14. self.base_url = base_url.rstrip('/')
  15. self.api_key = api_key
  16. self.session = requests.Session()
  17. if api_key:
  18. self.session.headers.update({
  19. 'Authorization': f'Bearer {api_key}'
  20. })
  21. def upload_file(self, file_path: str, project_plan_type: str, user: str,
  22. callback_url: Optional[str] = None) -> Dict[str, Any]:
  23. """
  24. 上传文件
  25. Args:
  26. file_path: 文件路径
  27. project_plan_type: 工程方案类型
  28. user: 用户标识
  29. callback_url: 回调URL(可选)
  30. Returns:
  31. 上传响应结果
  32. """
  33. url = f"{self.base_url}/sgsc/file_upload"
  34. if not Path(file_path).exists():
  35. raise FileNotFoundError(f"文件不存在: {file_path}")
  36. with open(file_path, 'rb') as f:
  37. files = {'file': f}
  38. data = {
  39. 'project_plan_type': project_plan_type,
  40. 'user': user
  41. }
  42. if callback_url:
  43. data['callback_url'] = callback_url
  44. response = self.session.post(url, files=files, data=data)
  45. response.raise_for_status()
  46. return response.json()
  47. def get_task_progress(self, callback_task_id: str, user: str) -> Dict[str, Any]:
  48. """
  49. 查询任务进度
  50. Args:
  51. callback_task_id: 任务ID
  52. user: 用户标识
  53. Returns:
  54. 进度查询结果
  55. """
  56. url = f"{self.base_url}/sgsc/task_progress/{callback_task_id}"
  57. params = {'user': user}
  58. response = self.session.get(url, params=params)
  59. response.raise_for_status()
  60. return response.json()
  61. def get_review_results(self, file_id: str, user: str, result_type: str) -> Dict[str, Any]:
  62. """
  63. 获取审查结果
  64. Args:
  65. file_id: 文件ID
  66. user: 用户标识
  67. result_type: 结果类型 ("summary" 或 "issues")
  68. Returns:
  69. 审查结果
  70. """
  71. url = f"{self.base_url}/sgsc/review_results"
  72. data = {
  73. 'id': file_id,
  74. 'user': user,
  75. 'type': result_type
  76. }
  77. response = self.session.post(url, json=data)
  78. response.raise_for_status()
  79. return response.json()
  80. def wait_for_completion(self, callback_task_id: str, user: str,
  81. timeout: int = 1800, interval: int = 10) -> Dict[str, Any]:
  82. """
  83. 等待任务完成
  84. Args:
  85. callback_task_id: 任务ID
  86. user: 用户标识
  87. timeout: 超时时间(秒)
  88. interval: 轮询间隔(秒)
  89. Returns:
  90. 最终任务状态
  91. """
  92. start_time = time.time()
  93. while time.time() - start_time < timeout:
  94. try:
  95. result = self.get_task_progress(callback_task_id, user)
  96. if result['data']['review_task_status'] == 'completed':
  97. print(f"任务完成! 总进度: {result['data']['overall_progress']}%")
  98. return result
  99. else:
  100. progress = result['data']['overall_progress']
  101. print(f"任务进行中... 进度: {progress}%")
  102. time.sleep(interval)
  103. except Exception as e:
  104. print(f"查询进度失败: {e}")
  105. time.sleep(interval)
  106. raise TimeoutError(f"任务超时,等待时间超过 {timeout} 秒")
  107. class MockTestRunner:
  108. """Mock测试运行器"""
  109. def __init__(self, client: ConstructionReviewAPIClient):
  110. self.client = client
  111. def test_file_upload(self, file_path: str = None) -> Dict[str, Any]:
  112. """测试文件上传"""
  113. print("=== 测试文件上传 ===")
  114. # 创建测试文件(如果没有提供文件路径)
  115. if not file_path:
  116. test_file = Path(r"D:\wx_work\sichuan_luqiao\LQAgentPlatform\data_pipeline\test_rawdata\1f3e1d98-5b4a-4a06-87b3-c7f0413b901a.pdf")
  117. if not test_file.exists():
  118. # 创建一个简单的测试PDF文件内容
  119. test_file.write_bytes(b"%PDF-1.4\n%Mock PDF for testing\n")
  120. file_path = str(test_file)
  121. try:
  122. result = self.client.upload_file(
  123. file_path=file_path,
  124. project_plan_type="bridge_up_part",
  125. user=str(uuid.uuid4()),
  126. callback_url="https://client.example.com/callback"
  127. )
  128. print(f"✅ 文件上传成功")
  129. print(f"文件ID: {result['data']['id']}")
  130. print(f"任务ID: {result['data']['callback_task_id']}")
  131. return result
  132. except Exception as e:
  133. print(f"❌ 文件上传失败: {e}")
  134. raise
  135. def test_progress_query(self, callback_task_id: str, user: str) -> None:
  136. """测试进度查询"""
  137. print("\n=== 测试进度查询 ===")
  138. try:
  139. result = self.client.get_task_progress(callback_task_id, user)
  140. print(f"✅ 进度查询成功")
  141. print(f"任务状态: {result['data']['review_task_status']}")
  142. print(f"总进度: {result['data']['overall_progress']}%")
  143. for stage in result['data']['stages']:
  144. print(f" - {stage['stage_name']}: {stage['progress']}% ({stage['stage_status']})")
  145. except Exception as e:
  146. print(f"❌ 进度查询失败: {e}")
  147. raise
  148. def test_review_results(self, file_id: str, user: str) -> None:
  149. """测试审查结果获取"""
  150. print("\n=== 测试审查结果获取 ===")
  151. # 测试获取总结报告
  152. try:
  153. result = self.client.get_review_results(file_id, user, "summary")
  154. print(f"✅ 总结报告获取成功")
  155. print(f"风险统计: {result['data']['risk_stats']}")
  156. print(f"四维评分: {result['data']['dimension_scores']}")
  157. print(f"总结报告: {result['data']['summary_report']}")
  158. except Exception as e:
  159. print(f"❌ 总结报告获取失败: {e}")
  160. # 测试获取问题条文
  161. try:
  162. result = self.client.get_review_results(file_id, user, "issues")
  163. print(f"\n✅ 问题条文获取成功")
  164. issues = result['data']['issues']
  165. print(f"发现问题数量: {len(issues)}")
  166. for i, issue in enumerate(issues):
  167. print(f"\n问题 {i+1}:")
  168. print(f" ID: {issue['issue_id']}")
  169. print(f" 页码: {issue['metadata']['page']}")
  170. print(f" 章节: {issue['metadata']['chapter']}")
  171. print(f" 风险等级: {issue['risk_summary']['max_risk_level']}")
  172. print(f" 检查项数量: {len(issue['review_lists'])}")
  173. except Exception as e:
  174. print(f"❌ 问题条文获取失败: {e}")
  175. def run_complete_test(self) -> None:
  176. """运行完整测试流程"""
  177. print("开始施工方案审查API完整测试...")
  178. try:
  179. # 1. 上传文件
  180. upload_result = self.test_file_upload()
  181. file_id = upload_result['data']['id']
  182. callback_task_id = upload_result['data']['callback_task_id']
  183. user = str(uuid.uuid4()) # 实际应该从上传响应中获取,这里简化
  184. # 2. 查询进度(等待一段时间让任务完成)
  185. print("\n等待任务完成...")
  186. time.sleep(2) # 短暂等待
  187. # 先测试进度查询
  188. self.test_progress_query(callback_task_id, user)
  189. # 3. 获取审查结果(可能需要等待任务完成)
  190. print("\n获取审查结果...")
  191. # 如果任务还未完成,直接标记完成(仅用于Mock测试)
  192. try:
  193. self.test_review_results(file_id, user)
  194. except Exception as e:
  195. print(f"审查结果获取失败,尝试完成任务: {e}")
  196. # 完成任务(Mock功能)
  197. response = requests.post(f"{self.client.base_url}/sgsc/mock/complete_task",
  198. data={"callback_task_id": callback_task_id})
  199. print("任务已强制完成,重新获取结果...")
  200. self.test_review_results(file_id, user)
  201. print("\n🎉 完整测试流程执行成功!")
  202. except Exception as e:
  203. print(f"\n❌ 测试失败: {e}")
  204. raise
  205. def main():
  206. """主函数 - 运行测试"""
  207. print("施工方案审查API Mock测试客户端")
  208. print("=" * 50)
  209. # 创建客户端
  210. client = ConstructionReviewAPIClient(
  211. base_url="http://127.0.0.1:8034",
  212. api_key="mock-api-key-12345"
  213. )
  214. # 创建测试运行器
  215. test_runner = MockTestRunner(client)
  216. # 运行完整测试
  217. test_runner.run_complete_test()
  218. if __name__ == "__main__":
  219. main()