test_异步方法阻塞问题测试.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :test_async_blocking_demo.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/12/26
  9. '''
  10. import asyncio
  11. import time
  12. from typing import List, Tuple
  13. # ==================== 模拟任务函数 ====================
  14. async def normal_task(name: str, delay: float) -> str:
  15. """
  16. 正常任务,会在指定延迟后完成
  17. """
  18. print(f"[{time.strftime('%H:%M:%S')}] [成功] {name} 开始执行(预计{delay}秒后完成)")
  19. await asyncio.sleep(delay)
  20. print(f"[{time.strftime('%H:%M:%S')}] [成功] {name} 完成")
  21. return f"{name}的结果"
  22. async def hanging_task(name: str) -> str:
  23. """
  24. 会卡住的任务,模拟无响应的情况
  25. """
  26. print(f"[{time.strftime('%H:%M:%S')}] [卡住] {name} 开始执行(会卡住)")
  27. await asyncio.sleep(100) # 模拟卡住
  28. return f"{name}的结果"
  29. async def timeout_task(name: str, delay: float) -> str:
  30. """
  31. 会超时的任务
  32. """
  33. print(f"[{time.strftime('%H:%M:%S')}] [超时] {name} 开始执行(预计{delay}秒,超过2秒会超时)")
  34. await asyncio.sleep(delay)
  35. return f"{name}的结果"
  36. # ==================== 测试场景 ====================
  37. async def test_scenario_1_gather_without_protection():
  38. """
  39. 场景1: 使用 gather 但没有超时保护 - 会导致永久阻塞
  40. """
  41. print("\n" + "="*80)
  42. print("场景1: gather + 无超时保护 (会永久阻塞)")
  43. print("="*80)
  44. start = time.time()
  45. try:
  46. # 添加整体超时保护,否则会永久卡住
  47. results = await asyncio.wait_for(
  48. asyncio.gather(
  49. normal_task("任务1", 1),
  50. normal_task("任务2", 0.5),
  51. hanging_task("任务3"), # 这个会卡住
  52. ),
  53. timeout=5 # 5秒后强制超时
  54. )
  55. except asyncio.TimeoutError:
  56. print(f"[{time.strftime('%H:%M:%S')}] [警告] 整体超时!被任务3阻塞了")
  57. elapsed = time.time() - start
  58. print(f"\n[统计] 总耗时: {elapsed:.1f}秒")
  59. print("[问题] 任务1和任务2早就完成了,但要等任务3超时才结束")
  60. async def test_scenario_2_gather_with_wait_for():
  61. """
  62. 场景2: gather + wait_for (每个任务有独立超时)
  63. """
  64. print("\n" + "="*80)
  65. print("场景2: gather + wait_for (每个任务独立超时)")
  66. print("="*80)
  67. start = time.time()
  68. try:
  69. results = await asyncio.gather(
  70. asyncio.wait_for(normal_task("任务1", 1), timeout=3),
  71. asyncio.wait_for(timeout_task("任务2", 5), timeout=2), # 会超时
  72. asyncio.wait_for(normal_task("任务3", 0.5), timeout=3),
  73. )
  74. print(f"\n[统计] 所有任务结果: {results}")
  75. except asyncio.TimeoutError as e:
  76. print(f"[{time.strftime('%H:%M:%S')}] [警告] 某个任务超时: {e}")
  77. print("[问题] 任务2超时导致整个gather失败,任务1和任务3的结果也丢失了")
  78. elapsed = time.time() - start
  79. print(f"[统计] 总耗时: {elapsed:.1f}秒")
  80. async def test_scenario_3_gather_with_return_exceptions():
  81. """
  82. 场景3: gather + return_exceptions=True (推荐方案1)
  83. """
  84. print("\n" + "="*80)
  85. print("场景3: gather + wait_for + return_exceptions=True [推荐]")
  86. print("="*80)
  87. start = time.time()
  88. results = await asyncio.gather(
  89. asyncio.wait_for(normal_task("任务1", 1), timeout=3),
  90. asyncio.wait_for(timeout_task("任务2", 5), timeout=2), # 会超时
  91. asyncio.wait_for(normal_task("任务3", 0.5), timeout=3),
  92. return_exceptions=True # 关键:超时返回异常而不是抛出
  93. )
  94. print(f"\n[统计] 所有任务结果:")
  95. for i, result in enumerate(results, 1):
  96. if isinstance(result, Exception):
  97. print(f" 任务{i}: [失败] {type(result).__name__}")
  98. else:
  99. print(f" 任务{i}: [成功] {result}")
  100. elapsed = time.time() - start
  101. print(f"\n[统计] 总耗时: {elapsed:.1f}秒")
  102. print("[优点] 任务1和任务3成功,任务2超时但不影响其他任务")
  103. async def test_scenario_4_wait_with_timeout():
  104. """
  105. 场景4: asyncio.wait + 超时控制 (推荐方案2)
  106. """
  107. print("\n" + "="*80)
  108. print("场景4: asyncio.wait + 整体超时 + 自动取消未完成任务 [推荐]")
  109. print("="*80)
  110. start = time.time()
  111. # 创建任务
  112. tasks = [
  113. asyncio.create_task(asyncio.wait_for(normal_task("任务1", 1), timeout=3)),
  114. asyncio.create_task(asyncio.wait_for(timeout_task("任务2", 5), timeout=2)),
  115. asyncio.create_task(asyncio.wait_for(normal_task("任务3", 0.5), timeout=3)),
  116. asyncio.create_task(hanging_task("任务4")), # 会卡住
  117. ]
  118. # 等待任务完成(整体超时)
  119. done, pending = await asyncio.wait(tasks, timeout=5)
  120. print(f"\n[{time.strftime('%H:%M:%S')}] [统计] 完成状态: 已完成={len(done)}, 未完成={len(pending)}")
  121. # 取消未完成的任务
  122. for task in pending:
  123. task.cancel()
  124. print(f"[{time.strftime('%H:%M:%S')}] [取消] 取消未完成任务")
  125. # 收集结果
  126. print(f"\n[统计] 任务执行结果:")
  127. for i, task in enumerate(tasks, 1):
  128. try:
  129. if task.done():
  130. result = task.result()
  131. print(f" 任务{i}: [成功] {result}")
  132. else:
  133. print(f" 任务{i}: [未完成] (已取消)")
  134. except Exception as e:
  135. print(f" 任务{i}: [失败] {type(e).__name__}")
  136. elapsed = time.time() - start
  137. print(f"\n[统计] 总耗时: {elapsed:.1f}秒")
  138. print("[优点] 可以精确控制哪些任务完成,哪些未完成,并自动取消未完成任务")
  139. async def test_scenario_5_real_world_simulation():
  140. """
  141. 场景5: 模拟真实场景 - 多个审查任务并发执行
  142. """
  143. print("\n" + "="*80)
  144. print("场景5: 真实场景模拟 - 并发审查多个文档")
  145. print("="*80)
  146. # 模拟审查任务
  147. async def review_document(doc_id: str, processing_time: float) -> dict:
  148. print(f"[{time.strftime('%H:%M:%S')}] [文档] 开始审查文档 {doc_id}")
  149. await asyncio.sleep(processing_time)
  150. return {"doc_id": doc_id, "status": "通过", "issues": []}
  151. # 创建多个审查任务
  152. review_tasks = {
  153. "doc_001": asyncio.create_task(review_document("doc_001", 0.8)),
  154. "doc_002": asyncio.create_task(review_document("doc_002", 1.2)),
  155. "doc_003": asyncio.create_task(review_document("doc_003", 5.0)), # 会超时
  156. "doc_004": asyncio.create_task(review_document("doc_004", 0.5)),
  157. }
  158. print(f"\n[{time.strftime('%H:%M:%S')}] [开始] 并发启动 {len(review_tasks)} 个审查任务")
  159. start = time.time()
  160. # 方案A: 使用 gather + return_exceptions (推荐)
  161. print("\n--- 使用 gather + return_exceptions ---")
  162. results = await asyncio.gather(
  163. *[asyncio.wait_for(task, timeout=2) for task in review_tasks.values()],
  164. return_exceptions=True
  165. )
  166. # 统计结果
  167. success_count = sum(1 for r in results if not isinstance(r, Exception))
  168. timeout_count = sum(1 for r in results if isinstance(r, (asyncio.TimeoutError, TimeoutError)))
  169. print(f"\n[统计] 审查结果统计:")
  170. print(f" [成功] {success_count} 个文档")
  171. print(f" [超时] {timeout_count} 个文档")
  172. print(f" [成功率] {success_count/len(results)*100:.1f}%")
  173. elapsed = time.time() - start
  174. print(f"\n[统计] 总耗时: {elapsed:.1f}秒")
  175. print("[说明] 虽然doc_003超时,但其他文档都成功审查,整体流程未受阻")
  176. # ==================== 主函数 ====================
  177. async def main():
  178. """
  179. 运行所有测试场景
  180. """
  181. print("="*80)
  182. print("异步方法阻塞问题演示")
  183. print("="*80)
  184. print("\n本演示将展示不同的异步并发模式及其对阻塞问题的处理方式")
  185. print("请观察每种场景的耗时和任务完成情况\n")
  186. # 运行所有测试
  187. await test_scenario_1_gather_without_protection()
  188. await test_scenario_2_gather_with_wait_for()
  189. await test_scenario_3_gather_with_return_exceptions()
  190. await test_scenario_4_wait_with_timeout()
  191. await test_scenario_5_real_world_simulation()
  192. print("\n" + "="*80)
  193. print("演示完成!")
  194. print("="*80)
  195. print("\n[总结] 建议:")
  196. print("1. [避免] gather + 无超时保护 → 会永久阻塞")
  197. print("2. [谨慎] gather + wait_for → 单个超时会丢失所有结果")
  198. print("3. [推荐] gather + wait_for + return_exceptions=True")
  199. print("4. [推荐] asyncio.wait + 超时 + 手动取消未完成任务")
  200. print("5. [提示] 根据场景选择: 需要所有结果用方案3,需要精细控制用方案4")
  201. if __name__ == "__main__":
  202. # 运行演示
  203. asyncio.run(main())