Skip to content
25 changes: 23 additions & 2 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,8 @@
free slot was available within that time. Otherwise (*block* is
``False``), put an item on the queue if a free slot is immediately
available, else raise the :exc:`queue.Full` exception (*timeout* is
ignored in that case).
ignored in that case). Raises the :exc:`queue.ShutDown` if the queue has
been shut down.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -890,7 +891,9 @@
it blocks at most *timeout* seconds and raises the :exc:`queue.Empty`
exception if no item was available within that time. Otherwise (block is
``False``), return an item if one is immediately available, else raise the
:exc:`queue.Empty` exception (*timeout* is ignored in that case).
:exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises
the :exc:`queue.ShutDown` exception if the queue has been shut down and
is empty, or if the queue has been shut down immediately.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -900,6 +903,21 @@

Equivalent to ``get(False)``.

.. method:: shutdown(immediate=False)

Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put`
raise :exc:`queue.ShutDown`.

By default, :meth:`~Queue.get` on a shut down queue will only raise once
the queue is empty. Set *immediate* to true to make :meth:`~Queue.get`
raise immediately instead.

All blocked callers of :meth:`~Queue.put` will be unblocked. If

Check warning on line 915 in Doc/library/multiprocessing.rst

View workflow job for this annotation

GitHub Actions / Docs / Docs

py:meth reference target not found: Queue.join [ref.meth]
*immediate* is true, also unblock callers of :meth:`~Queue.get` and
:meth:`~Queue.join`.

.. versionadded:: 3.13

:class:`multiprocessing.Queue` has a few additional methods not found in
:class:`queue.Queue`. These methods are usually unnecessary for most
code:
Expand Down Expand Up @@ -988,6 +1006,9 @@
items have been processed (meaning that a :meth:`task_done` call was
received for every item that had been :meth:`~Queue.put` into the queue).

``shutdown(immediate=True)`` calls :meth:`task_done` for each remaining
item in the queue.

Raises a :exc:`ValueError` if called more times than there were items
placed in the queue.

Expand Down
2 changes: 2 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ multiprocessing
``d |= {'b': 2}`` for proxies of :class:`dict`.

(Contributed by Roy Hyunjin Han for :gh:`103134`.)
* Add :meth:`multiprocessing.Queue.shutdown` for queue termination.
(Contributed by Laurie Opperman in :gh:`104230`.)


operator
Expand Down
58 changes: 55 additions & 3 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import weakref
import errno

from queue import Empty, Full
from queue import Empty, Full, ShutDown

from . import connection
from . import context
Expand Down Expand Up @@ -48,18 +48,21 @@ def __init__(self, maxsize=0, *, ctx):
# For use by concurrent.futures
self._ignore_epipe = False
self._reset()
self._is_shutdown = ctx.Value('B', False)

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._is_shutdown)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._is_shutdown) = state
self._reset()

def _after_fork(self):
Expand All @@ -84,10 +87,16 @@ def _reset(self, after_fork=False):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._is_shutdown.value:
raise ShutDown
if not self._sem.acquire(block, timeout):
if self._is_shutdown.value:
raise ShutDown
raise Full

with self._notempty:
if self._is_shutdown.value:
raise ShutDown
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
Expand All @@ -98,24 +107,34 @@ def get(self, block=True, timeout=None):
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
if self._is_shutdown.value and self.empty():
raise ShutDown
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
if self._is_shutdown.value and self.empty():
raise ShutDown
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
if self._is_shutdown.value:
raise ShutDown
raise Empty
elif not self._poll():
if self._is_shutdown.value:
raise ShutDown
raise Empty

res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()

# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

Expand All @@ -135,6 +154,25 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def _clear(self):
with self._rlock:
while self._poll():
self._recv_bytes()

def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
with self._is_shutdown.get_lock():
self._is_shutdown.value = True
if immediate:
self._clear()
# TODO: unblock all getters to check empty (then shutdown)
for _ in range(self._maxsize):
try:
self._sem.release()
except ValueError:
break

def close(self):
self._closed = True
close = self._close
Expand Down Expand Up @@ -328,10 +366,16 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._is_shutdown.value:
raise ShutDown
if not self._sem.acquire(block, timeout):
if self._is_shutdown.value:
raise ShutDown
raise Full

with self._notempty, self._cond:
if self._is_shutdown.value:
raise ShutDown
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
Expand All @@ -350,6 +394,14 @@ def join(self):
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

def _clear(self):
with self._rlock:
while self._poll():
self._recv_bytes()
self._unfinished_tasks.acquire(block=False)
with self._cond:
self._cond.notify_all()

#
# Simplified Queue type -- really just a locked pipe
#
Expand Down
Loading
Loading