| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :test_async_blocking_demo.py
- @IDE :PyCharm
- @Author :
- @Date :2025/12/26
- '''
- import asyncio
- import time
- from typing import List, Tuple
- # ==================== 模拟任务函数 ====================
- async def normal_task(name: str, delay: float) -> str:
- """
- 正常任务,会在指定延迟后完成
- """
- print(f"[{time.strftime('%H:%M:%S')}] [成功] {name} 开始执行(预计{delay}秒后完成)")
- await asyncio.sleep(delay)
- print(f"[{time.strftime('%H:%M:%S')}] [成功] {name} 完成")
- return f"{name}的结果"
- async def hanging_task(name: str) -> str:
- """
- 会卡住的任务,模拟无响应的情况
- """
- print(f"[{time.strftime('%H:%M:%S')}] [卡住] {name} 开始执行(会卡住)")
- await asyncio.sleep(100) # 模拟卡住
- return f"{name}的结果"
- async def timeout_task(name: str, delay: float) -> str:
- """
- 会超时的任务
- """
- print(f"[{time.strftime('%H:%M:%S')}] [超时] {name} 开始执行(预计{delay}秒,超过2秒会超时)")
- await asyncio.sleep(delay)
- return f"{name}的结果"
- # ==================== 测试场景 ====================
- async def test_scenario_1_gather_without_protection():
- """
- 场景1: 使用 gather 但没有超时保护 - 会导致永久阻塞
- """
- print("\n" + "="*80)
- print("场景1: gather + 无超时保护 (会永久阻塞)")
- print("="*80)
- start = time.time()
- try:
- # 添加整体超时保护,否则会永久卡住
- results = await asyncio.wait_for(
- asyncio.gather(
- normal_task("任务1", 1),
- normal_task("任务2", 0.5),
- hanging_task("任务3"), # 这个会卡住
- ),
- timeout=5 # 5秒后强制超时
- )
- except asyncio.TimeoutError:
- print(f"[{time.strftime('%H:%M:%S')}] [警告] 整体超时!被任务3阻塞了")
- elapsed = time.time() - start
- print(f"\n[统计] 总耗时: {elapsed:.1f}秒")
- print("[问题] 任务1和任务2早就完成了,但要等任务3超时才结束")
- async def test_scenario_2_gather_with_wait_for():
- """
- 场景2: gather + wait_for (每个任务有独立超时)
- """
- print("\n" + "="*80)
- print("场景2: gather + wait_for (每个任务独立超时)")
- print("="*80)
- start = time.time()
- try:
- results = await asyncio.gather(
- asyncio.wait_for(normal_task("任务1", 1), timeout=3),
- asyncio.wait_for(timeout_task("任务2", 5), timeout=2), # 会超时
- asyncio.wait_for(normal_task("任务3", 0.5), timeout=3),
- )
- print(f"\n[统计] 所有任务结果: {results}")
- except asyncio.TimeoutError as e:
- print(f"[{time.strftime('%H:%M:%S')}] [警告] 某个任务超时: {e}")
- print("[问题] 任务2超时导致整个gather失败,任务1和任务3的结果也丢失了")
- elapsed = time.time() - start
- print(f"[统计] 总耗时: {elapsed:.1f}秒")
- async def test_scenario_3_gather_with_return_exceptions():
- """
- 场景3: gather + return_exceptions=True (推荐方案1)
- """
- print("\n" + "="*80)
- print("场景3: gather + wait_for + return_exceptions=True [推荐]")
- print("="*80)
- start = time.time()
- results = await asyncio.gather(
- asyncio.wait_for(normal_task("任务1", 1), timeout=3),
- asyncio.wait_for(timeout_task("任务2", 5), timeout=2), # 会超时
- asyncio.wait_for(normal_task("任务3", 0.5), timeout=3),
- return_exceptions=True # 关键:超时返回异常而不是抛出
- )
- print(f"\n[统计] 所有任务结果:")
- for i, result in enumerate(results, 1):
- if isinstance(result, Exception):
- print(f" 任务{i}: [失败] {type(result).__name__}")
- else:
- print(f" 任务{i}: [成功] {result}")
- elapsed = time.time() - start
- print(f"\n[统计] 总耗时: {elapsed:.1f}秒")
- print("[优点] 任务1和任务3成功,任务2超时但不影响其他任务")
- async def test_scenario_4_wait_with_timeout():
- """
- 场景4: asyncio.wait + 超时控制 (推荐方案2)
- """
- print("\n" + "="*80)
- print("场景4: asyncio.wait + 整体超时 + 自动取消未完成任务 [推荐]")
- print("="*80)
- start = time.time()
- # 创建任务
- tasks = [
- asyncio.create_task(asyncio.wait_for(normal_task("任务1", 1), timeout=3)),
- asyncio.create_task(asyncio.wait_for(timeout_task("任务2", 5), timeout=2)),
- asyncio.create_task(asyncio.wait_for(normal_task("任务3", 0.5), timeout=3)),
- asyncio.create_task(hanging_task("任务4")), # 会卡住
- ]
- # 等待任务完成(整体超时)
- done, pending = await asyncio.wait(tasks, timeout=5)
- print(f"\n[{time.strftime('%H:%M:%S')}] [统计] 完成状态: 已完成={len(done)}, 未完成={len(pending)}")
- # 取消未完成的任务
- for task in pending:
- task.cancel()
- print(f"[{time.strftime('%H:%M:%S')}] [取消] 取消未完成任务")
- # 收集结果
- print(f"\n[统计] 任务执行结果:")
- for i, task in enumerate(tasks, 1):
- try:
- if task.done():
- result = task.result()
- print(f" 任务{i}: [成功] {result}")
- else:
- print(f" 任务{i}: [未完成] (已取消)")
- except Exception as e:
- print(f" 任务{i}: [失败] {type(e).__name__}")
- elapsed = time.time() - start
- print(f"\n[统计] 总耗时: {elapsed:.1f}秒")
- print("[优点] 可以精确控制哪些任务完成,哪些未完成,并自动取消未完成任务")
- async def test_scenario_5_real_world_simulation():
- """
- 场景5: 模拟真实场景 - 多个审查任务并发执行
- """
- print("\n" + "="*80)
- print("场景5: 真实场景模拟 - 并发审查多个文档")
- print("="*80)
- # 模拟审查任务
- async def review_document(doc_id: str, processing_time: float) -> dict:
- print(f"[{time.strftime('%H:%M:%S')}] [文档] 开始审查文档 {doc_id}")
- await asyncio.sleep(processing_time)
- return {"doc_id": doc_id, "status": "通过", "issues": []}
- # 创建多个审查任务
- review_tasks = {
- "doc_001": asyncio.create_task(review_document("doc_001", 0.8)),
- "doc_002": asyncio.create_task(review_document("doc_002", 1.2)),
- "doc_003": asyncio.create_task(review_document("doc_003", 5.0)), # 会超时
- "doc_004": asyncio.create_task(review_document("doc_004", 0.5)),
- }
- print(f"\n[{time.strftime('%H:%M:%S')}] [开始] 并发启动 {len(review_tasks)} 个审查任务")
- start = time.time()
- # 方案A: 使用 gather + return_exceptions (推荐)
- print("\n--- 使用 gather + return_exceptions ---")
- results = await asyncio.gather(
- *[asyncio.wait_for(task, timeout=2) for task in review_tasks.values()],
- return_exceptions=True
- )
- # 统计结果
- success_count = sum(1 for r in results if not isinstance(r, Exception))
- timeout_count = sum(1 for r in results if isinstance(r, (asyncio.TimeoutError, TimeoutError)))
- print(f"\n[统计] 审查结果统计:")
- print(f" [成功] {success_count} 个文档")
- print(f" [超时] {timeout_count} 个文档")
- print(f" [成功率] {success_count/len(results)*100:.1f}%")
- elapsed = time.time() - start
- print(f"\n[统计] 总耗时: {elapsed:.1f}秒")
- print("[说明] 虽然doc_003超时,但其他文档都成功审查,整体流程未受阻")
- # ==================== 主函数 ====================
- async def main():
- """
- 运行所有测试场景
- """
- print("="*80)
- print("异步方法阻塞问题演示")
- print("="*80)
- print("\n本演示将展示不同的异步并发模式及其对阻塞问题的处理方式")
- print("请观察每种场景的耗时和任务完成情况\n")
- # 运行所有测试
- await test_scenario_1_gather_without_protection()
- await test_scenario_2_gather_with_wait_for()
- await test_scenario_3_gather_with_return_exceptions()
- await test_scenario_4_wait_with_timeout()
- await test_scenario_5_real_world_simulation()
- print("\n" + "="*80)
- print("演示完成!")
- print("="*80)
- print("\n[总结] 建议:")
- print("1. [避免] gather + 无超时保护 → 会永久阻塞")
- print("2. [谨慎] gather + wait_for → 单个超时会丢失所有结果")
- print("3. [推荐] gather + wait_for + return_exceptions=True")
- print("4. [推荐] asyncio.wait + 超时 + 手动取消未完成任务")
- print("5. [提示] 根据场景选择: 需要所有结果用方案3,需要精细控制用方案4")
- if __name__ == "__main__":
- # 运行演示
- asyncio.run(main())
|