#!/usr/bin/env python # -*- coding: utf-8 -*- ''' @Project : lq-agent-api @File :test_async_blocking_demo.py @IDE :PyCharm @Author : @Date :2025/12/26 ''' import asyncio import time from typing import List, Tuple # ==================== 模拟任务函数 ==================== async def normal_task(name: str, delay: float) -> str: """ 正常任务,会在指定延迟后完成 """ print(f"[{time.strftime('%H:%M:%S')}] [成功] {name} 开始执行(预计{delay}秒后完成)") await asyncio.sleep(delay) print(f"[{time.strftime('%H:%M:%S')}] [成功] {name} 完成") return f"{name}的结果" async def hanging_task(name: str) -> str: """ 会卡住的任务,模拟无响应的情况 """ print(f"[{time.strftime('%H:%M:%S')}] [卡住] {name} 开始执行(会卡住)") await asyncio.sleep(100) # 模拟卡住 return f"{name}的结果" async def timeout_task(name: str, delay: float) -> str: """ 会超时的任务 """ print(f"[{time.strftime('%H:%M:%S')}] [超时] {name} 开始执行(预计{delay}秒,超过2秒会超时)") await asyncio.sleep(delay) return f"{name}的结果" # ==================== 测试场景 ==================== async def test_scenario_1_gather_without_protection(): """ 场景1: 使用 gather 但没有超时保护 - 会导致永久阻塞 """ print("\n" + "="*80) print("场景1: gather + 无超时保护 (会永久阻塞)") print("="*80) start = time.time() try: # 添加整体超时保护,否则会永久卡住 results = await asyncio.wait_for( asyncio.gather( normal_task("任务1", 1), normal_task("任务2", 0.5), hanging_task("任务3"), # 这个会卡住 ), timeout=5 # 5秒后强制超时 ) except asyncio.TimeoutError: print(f"[{time.strftime('%H:%M:%S')}] [警告] 整体超时!被任务3阻塞了") elapsed = time.time() - start print(f"\n[统计] 总耗时: {elapsed:.1f}秒") print("[问题] 任务1和任务2早就完成了,但要等任务3超时才结束") async def test_scenario_2_gather_with_wait_for(): """ 场景2: gather + wait_for (每个任务有独立超时) """ print("\n" + "="*80) print("场景2: gather + wait_for (每个任务独立超时)") print("="*80) start = time.time() try: results = await asyncio.gather( asyncio.wait_for(normal_task("任务1", 1), timeout=3), asyncio.wait_for(timeout_task("任务2", 5), timeout=2), # 会超时 asyncio.wait_for(normal_task("任务3", 0.5), timeout=3), ) print(f"\n[统计] 所有任务结果: {results}") except asyncio.TimeoutError as e: print(f"[{time.strftime('%H:%M:%S')}] [警告] 某个任务超时: {e}") print("[问题] 任务2超时导致整个gather失败,任务1和任务3的结果也丢失了") elapsed = time.time() - start print(f"[统计] 总耗时: {elapsed:.1f}秒") async def test_scenario_3_gather_with_return_exceptions(): """ 场景3: gather + return_exceptions=True (推荐方案1) """ print("\n" + "="*80) print("场景3: gather + wait_for + return_exceptions=True [推荐]") print("="*80) start = time.time() results = await asyncio.gather( asyncio.wait_for(normal_task("任务1", 1), timeout=3), asyncio.wait_for(timeout_task("任务2", 5), timeout=2), # 会超时 asyncio.wait_for(normal_task("任务3", 0.5), timeout=3), return_exceptions=True # 关键:超时返回异常而不是抛出 ) print(f"\n[统计] 所有任务结果:") for i, result in enumerate(results, 1): if isinstance(result, Exception): print(f" 任务{i}: [失败] {type(result).__name__}") else: print(f" 任务{i}: [成功] {result}") elapsed = time.time() - start print(f"\n[统计] 总耗时: {elapsed:.1f}秒") print("[优点] 任务1和任务3成功,任务2超时但不影响其他任务") async def test_scenario_4_wait_with_timeout(): """ 场景4: asyncio.wait + 超时控制 (推荐方案2) """ print("\n" + "="*80) print("场景4: asyncio.wait + 整体超时 + 自动取消未完成任务 [推荐]") print("="*80) start = time.time() # 创建任务 tasks = [ asyncio.create_task(asyncio.wait_for(normal_task("任务1", 1), timeout=3)), asyncio.create_task(asyncio.wait_for(timeout_task("任务2", 5), timeout=2)), asyncio.create_task(asyncio.wait_for(normal_task("任务3", 0.5), timeout=3)), asyncio.create_task(hanging_task("任务4")), # 会卡住 ] # 等待任务完成(整体超时) done, pending = await asyncio.wait(tasks, timeout=5) print(f"\n[{time.strftime('%H:%M:%S')}] [统计] 完成状态: 已完成={len(done)}, 未完成={len(pending)}") # 取消未完成的任务 for task in pending: task.cancel() print(f"[{time.strftime('%H:%M:%S')}] [取消] 取消未完成任务") # 收集结果 print(f"\n[统计] 任务执行结果:") for i, task in enumerate(tasks, 1): try: if task.done(): result = task.result() print(f" 任务{i}: [成功] {result}") else: print(f" 任务{i}: [未完成] (已取消)") except Exception as e: print(f" 任务{i}: [失败] {type(e).__name__}") elapsed = time.time() - start print(f"\n[统计] 总耗时: {elapsed:.1f}秒") print("[优点] 可以精确控制哪些任务完成,哪些未完成,并自动取消未完成任务") async def test_scenario_5_real_world_simulation(): """ 场景5: 模拟真实场景 - 多个审查任务并发执行 """ print("\n" + "="*80) print("场景5: 真实场景模拟 - 并发审查多个文档") print("="*80) # 模拟审查任务 async def review_document(doc_id: str, processing_time: float) -> dict: print(f"[{time.strftime('%H:%M:%S')}] [文档] 开始审查文档 {doc_id}") await asyncio.sleep(processing_time) return {"doc_id": doc_id, "status": "通过", "issues": []} # 创建多个审查任务 review_tasks = { "doc_001": asyncio.create_task(review_document("doc_001", 0.8)), "doc_002": asyncio.create_task(review_document("doc_002", 1.2)), "doc_003": asyncio.create_task(review_document("doc_003", 5.0)), # 会超时 "doc_004": asyncio.create_task(review_document("doc_004", 0.5)), } print(f"\n[{time.strftime('%H:%M:%S')}] [开始] 并发启动 {len(review_tasks)} 个审查任务") start = time.time() # 方案A: 使用 gather + return_exceptions (推荐) print("\n--- 使用 gather + return_exceptions ---") results = await asyncio.gather( *[asyncio.wait_for(task, timeout=2) for task in review_tasks.values()], return_exceptions=True ) # 统计结果 success_count = sum(1 for r in results if not isinstance(r, Exception)) timeout_count = sum(1 for r in results if isinstance(r, (asyncio.TimeoutError, TimeoutError))) print(f"\n[统计] 审查结果统计:") print(f" [成功] {success_count} 个文档") print(f" [超时] {timeout_count} 个文档") print(f" [成功率] {success_count/len(results)*100:.1f}%") elapsed = time.time() - start print(f"\n[统计] 总耗时: {elapsed:.1f}秒") print("[说明] 虽然doc_003超时,但其他文档都成功审查,整体流程未受阻") # ==================== 主函数 ==================== async def main(): """ 运行所有测试场景 """ print("="*80) print("异步方法阻塞问题演示") print("="*80) print("\n本演示将展示不同的异步并发模式及其对阻塞问题的处理方式") print("请观察每种场景的耗时和任务完成情况\n") # 运行所有测试 await test_scenario_1_gather_without_protection() await test_scenario_2_gather_with_wait_for() await test_scenario_3_gather_with_return_exceptions() await test_scenario_4_wait_with_timeout() await test_scenario_5_real_world_simulation() print("\n" + "="*80) print("演示完成!") print("="*80) print("\n[总结] 建议:") print("1. [避免] gather + 无超时保护 → 会永久阻塞") print("2. [谨慎] gather + wait_for → 单个超时会丢失所有结果") print("3. [推荐] gather + wait_for + return_exceptions=True") print("4. [推荐] asyncio.wait + 超时 + 手动取消未完成任务") print("5. [提示] 根据场景选择: 需要所有结果用方案3,需要精细控制用方案4") if __name__ == "__main__": # 运行演示 asyncio.run(main())