-
- Notifications
You must be signed in to change notification settings - Fork 33.5k
gh-96471: Add asyncio queue shutdown #104228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
440a702 fb458db e5951ac a72aedd d5e925d f3517fb bd2a7c3 e9ac8de 1e7813a 1275bb6 eec29bb 2c6156f 17f1f32 a233830 420a247 25ad2ac f3321b4 6d9edd6 1135d85 ddc6ad6 2fa1bd9 aef4063 d49c6dd 5a435a6 c8db40e ca01ee1 b02c4dd 8deca77 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -62,6 +62,9 @@ Queue | |
| Remove and return an item from the queue. If queue is empty, | ||
| wait until an item is available. | ||
| | ||
| Raises :exc:`QueueShutDown` if the queue has been shut down and | ||
| is empty, or if the queue has been shut down immediately. | ||
| | ||
| .. method:: get_nowait() | ||
| | ||
| Return an item if one is immediately available, else raise | ||
| | @@ -77,11 +80,16 @@ Queue | |
| work on it is complete. When the count of unfinished tasks drops | ||
| to zero, :meth:`join` unblocks. | ||
| | ||
| Raises :exc:`QueueShutDown` if the queue has been shut down | ||
| immediately. | ||
| | ||
| .. coroutinemethod:: put(item) | ||
| | ||
| Put an item into the queue. If the queue is full, wait until a | ||
| free slot is available before adding the item. | ||
| | ||
| Raises :exc:`QueueShutDown` if the queue has been shut down. | ||
| | ||
| .. method:: put_nowait(item) | ||
| | ||
| Put an item into the queue without blocking. | ||
| | @@ -92,6 +100,21 @@ Queue | |
| | ||
| Return the number of items in the queue. | ||
| | ||
| .. method:: shutdown(immediate=False) | ||
| | ||
| Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` | ||
| raise :exc:`QueueShutDown`. | ||
| | ||
| By default, :meth:`~Queue.get` on a shut down queue will only raise once | ||
| the queue is empty. Set *immediate* to true to make gets raise | ||
| immediately instead. | ||
| | ||
| All blocked callers of :meth:`~Queue.put` will be unblocked. If | ||
| *immediate* is true, also unblock callers of :meth:`~Queue.get` and | ||
| :meth:`~Queue.join`. | ||
| | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry but I have a doubt, shouldn't this documentation block be rather: In event of change, the docstring of the Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Also, I think the threading queue docs are the same. Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It's very precise, better.
Yes, I commented here so as not to forget (see #117532 (comment)). English is your native language, I think it'is best if you update documentations and docstrings. Update: but I can create the follow-up PR. Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've made a PR: #117621 | ||
| .. versionadded:: 3.13 | ||
| | ||
| .. method:: task_done() | ||
| | ||
| Indicate that a formerly enqueued task is complete. | ||
| | @@ -108,6 +131,9 @@ Queue | |
| Raises :exc:`ValueError` if called more times than there were | ||
| items placed in the queue. | ||
| | ||
| Raises :exc:`QueueShutDown` if the queue has been shut down | ||
| immediately. | ||
| | ||
| | ||
| Priority Queue | ||
| ============== | ||
| | @@ -145,6 +171,14 @@ Exceptions | |
| on a queue that has reached its *maxsize*. | ||
| | ||
| | ||
| .. exception:: QueueShutDown | ||
| | ||
| Exception raised when getting an item from or putting an item onto a | ||
| queue which has been shut down. | ||
| | ||
| .. versionadded:: 3.13 | ||
| | ||
| | ||
| Examples | ||
| ======== | ||
| | ||
| | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,14 @@ | ||
| __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') | ||
| __all__ = ( | ||
| 'Queue', | ||
| 'PriorityQueue', | ||
| 'LifoQueue', | ||
| 'QueueFull', | ||
| 'QueueEmpty', | ||
| 'QueueShutDown', | ||
| ) | ||
| | ||
| import collections | ||
| import enum | ||
| import heapq | ||
| from types import GenericAlias | ||
| | ||
| | @@ -18,6 +26,17 @@ class QueueFull(Exception): | |
| pass | ||
| | ||
| | ||
| class QueueShutDown(Exception): | ||
| """Raised when putting on to or getting from a shut-down Queue.""" | ||
| pass | ||
| | ||
| | ||
| class _QueueState(enum.Enum): | ||
| ALIVE = "alive" | ||
| SHUTDOWN = "shutdown" | ||
| SHUTDOWN_IMMEDIATE = "shutdown-immediate" | ||
| | ||
| | ||
| class Queue(mixins._LoopBoundMixin): | ||
| """A queue, useful for coordinating producer and consumer coroutines. | ||
| | ||
| | @@ -41,6 +60,7 @@ def __init__(self, maxsize=0): | |
| self._finished = locks.Event() | ||
| self._finished.set() | ||
| self._init(maxsize) | ||
| self._shutdown_state = _QueueState.ALIVE | ||
| | ||
| # These three are overridable in subclasses. | ||
| | ||
| | @@ -81,6 +101,8 @@ def _format(self): | |
| result += f' _putters[{len(self._putters)}]' | ||
| if self._unfinished_tasks: | ||
| result += f' tasks={self._unfinished_tasks}' | ||
| if not self._is_alive(): | ||
| result += f' state={self._shutdown_state.value}' | ||
| return result | ||
| | ||
| def qsize(self): | ||
| | @@ -112,7 +134,11 @@ async def put(self, item): | |
| | ||
| Put an item into the queue. If the queue is full, wait until a free | ||
| slot is available before adding item. | ||
| | ||
| Raises QueueShutDown if the queue has been shut down. | ||
| """ | ||
| if not self._is_alive(): | ||
| raise QueueShutDown | ||
| while self.full(): | ||
| putter = self._get_loop().create_future() | ||
| self._putters.append(putter) | ||
| | @@ -125,20 +151,26 @@ async def put(self, item): | |
| self._putters.remove(putter) | ||
| except ValueError: | ||
| # The putter could be removed from self._putters by a | ||
| # previous get_nowait call. | ||
| # previous get_nowait call or a shutdown call. | ||
| pass | ||
| if not self.full() and not putter.cancelled(): | ||
| # We were woken up by get_nowait(), but can't take | ||
| # the call. Wake up the next in line. | ||
| self._wakeup_next(self._putters) | ||
| raise | ||
| if not self._is_alive(): | ||
| raise QueueShutDown | ||
| return self.put_nowait(item) | ||
| | ||
| def put_nowait(self, item): | ||
| """Put an item into the queue without blocking. | ||
| | ||
| If no free slot is immediately available, raise QueueFull. | ||
| | ||
| Raises QueueShutDown if the queue has been shut down. | ||
| """ | ||
| if not self._is_alive(): | ||
| raise QueueShutDown | ||
| if self.full(): | ||
| raise QueueFull | ||
| self._put(item) | ||
| | @@ -150,8 +182,15 @@ async def get(self): | |
| """Remove and return an item from the queue. | ||
| | ||
| If queue is empty, wait until an item is available. | ||
| | ||
| Raises QueueShutDown if the queue has been shut down and is empty, or | ||
| if the queue has been shut down immediately. | ||
| """ | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| while self.empty(): | ||
| if self._is_shutdown(): | ||
| raise QueueShutDown | ||
| getter = self._get_loop().create_future() | ||
| self._getters.append(getter) | ||
| try: | ||
| | @@ -163,21 +202,31 @@ async def get(self): | |
| self._getters.remove(getter) | ||
| except ValueError: | ||
| # The getter could be removed from self._getters by a | ||
| # previous put_nowait call. | ||
| # previous put_nowait call, | ||
| # or a shutdown call. | ||
| pass | ||
| if not self.empty() and not getter.cancelled(): | ||
| # We were woken up by put_nowait(), but can't take | ||
| # the call. Wake up the next in line. | ||
| self._wakeup_next(self._getters) | ||
| raise | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| return self.get_nowait() | ||
| | ||
| def get_nowait(self): | ||
| """Remove and return an item from the queue. | ||
| | ||
| Return an item if one is immediately available, else raise QueueEmpty. | ||
| | ||
| Raises QueueShutDown if the queue has been shut down and is empty, or | ||
| if the queue has been shut down immediately. | ||
| """ | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| if self.empty(): | ||
EpicWink marked this conversation as resolved. Show resolved Hide resolved | ||
| if self._is_shutdown(): | ||
| raise QueueShutDown | ||
| raise QueueEmpty | ||
| item = self._get() | ||
| self._wakeup_next(self._putters) | ||
| | @@ -196,7 +245,11 @@ def task_done(self): | |
| | ||
| Raises ValueError if called more times than there were items placed in | ||
| the queue. | ||
| | ||
| Raises QueueShutDown if the queue has been shut down immediately. | ||
| """ | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| if self._unfinished_tasks <= 0: | ||
| raise ValueError('task_done() called too many times') | ||
| self._unfinished_tasks -= 1 | ||
| | @@ -210,9 +263,57 @@ async def join(self): | |
| queue. The count goes down whenever a consumer calls task_done() to | ||
| indicate that the item was retrieved and all work on it is complete. | ||
| When the count of unfinished tasks drops to zero, join() unblocks. | ||
| | ||
| Raises QueueShutDown if the queue has been shut down immediately. | ||
| """ | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| if self._unfinished_tasks > 0: | ||
| await self._finished.wait() | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| | ||
| def shutdown(self, immediate=False): | ||
| """Shut-down the queue, making queue gets and puts raise. | ||
| | ||
| By default, gets will only raise once the queue is empty. Set | ||
| 'immediate' to True to make gets raise immediately instead. | ||
| | ||
| All blocked callers of put() will be unblocked, and also get() | ||
| and join() if 'immediate'. The QueueShutDown exception is raised. | ||
| """ | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docstring to modify depending of agree/disagree on my first remark about `blocked callers'. | ||
| if self._is_shutdown_immediate(): | ||
| return | ||
| # here _shutdown_state is ALIVE or SHUTDOWN | ||
| if immediate: | ||
| self._set_shutdown_immediate() | ||
| while self._getters: | ||
| getter = self._getters.popleft() | ||
| if not getter.done(): | ||
| getter.set_result(None) | ||
| # Release all 'blocked' tasks/coros in `join()` | ||
| self._finished.set() | ||
| else: | ||
| self._set_shutdown() | ||
| while self._putters: | ||
| putter = self._putters.popleft() | ||
| if not putter.done(): | ||
| putter.set_result(None) | ||
| | ||
| def _is_alive(self): | ||
| return self._shutdown_state is _QueueState.ALIVE | ||
| | ||
| def _is_shutdown(self): | ||
| return self._shutdown_state is _QueueState.SHUTDOWN | ||
| | ||
| def _is_shutdown_immediate(self): | ||
| return self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE | ||
| | ||
| def _set_shutdown(self): | ||
| self._shutdown_state = _QueueState.SHUTDOWN | ||
| | ||
| def _set_shutdown_immediate(self): | ||
| self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE | ||
| | ||
| | ||
| class PriorityQueue(Queue): | ||
| | ||
Uh oh!
There was an error while loading. Please reload this page.