queue.py 807 B

12345678910111213141516171819202122232425262728293031323334
  1. import asyncio
  2. import logging
  3. logger = logging.getLogger(__name__)
  4. class AsyncUniqueQueue:
  5. def __init__(self):
  6. self.queue = asyncio.Queue()
  7. self.set = set()
  8. self.lock = asyncio.Lock()
  9. async def put(self, item):
  10. async with self.lock:
  11. if item not in self.set:
  12. self.set.add(item)
  13. await self.queue.put(item)
  14. else:
  15. logger.debug(f"Item {item.__hash__()} already in queue.")
  16. async def get(self):
  17. item = await self.queue.get()
  18. async with self.lock:
  19. self.set.remove(item)
  20. return item
  21. def qsize(self):
  22. return self.queue.qsize()
  23. async def empty(self):
  24. return self.queue.empty()
  25. def task_done(self):
  26. self.queue.task_done()