Skip to content

Commit 7c99454

Browse files
authored
bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool (#11488)
* bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool * Use self-pipe pattern to avoid polling for changes * Refactor some variable names and add comments * Restore timeout and poll * Use reader object only on wait() * Recompute worker sentinels every time * Remove timeout and use change notifier * Refactor some methods to be overloaded by the ThreadPool, document the cache class and fix typos
1 parent 962bdea commit 7c99454

File tree

2 files changed

+80
-11
lines changed

2 files changed

+80
-11
lines changed

Lib/multiprocessing/pool.py

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import time
2222
import traceback
2323
import warnings
24+
from queue import Empty
2425

2526
# If threading is available then ThreadPool should be provided. Therefore
2627
# we avoid top-level imports which are liable to fail on some systems.
2728
from . import util
2829
from . import get_context, TimeoutError
30+
from .connection import wait
2931

3032
#
3133
# Constants representing the state of a pool
@@ -145,6 +147,29 @@ def _helper_reraises_exception(ex):
145147
# Class representing a process pool
146148
#
147149

150+
class _PoolCache(dict):
151+
"""
152+
Class that implements a cache for the Pool class that will notify
153+
the pool management threads every time the cache is emptied. The
154+
notification is done by the use of a queue that is provided when
155+
instantiating the cache.
156+
"""
157+
def __init__(self, *args, notifier=None, **kwds):
158+
self.notifier = notifier
159+
super().__init__(*args, **kwds)
160+
161+
def __delitem__(self, item):
162+
super().__delitem__(item)
163+
164+
# Notify that the cache is empty. This is important because the
165+
# pool keeps maintaining workers until the cache gets drained. This
166+
# eliminates a race condition in which a task is finished after the
167+
# the pool's _handle_workers method has enter another iteration of the
168+
# loop. In this situation, the only event that can wake up the pool
169+
# is the cache to be emptied (no more tasks available).
170+
if not self:
171+
self.notifier.put(None)
172+
148173
class Pool(object):
149174
'''
150175
Class which supports an async version of applying functions to arguments.
@@ -165,7 +190,11 @@ def __init__(self, processes=None, initializer=None, initargs=(),
165190
self._ctx = context or get_context()
166191
self._setup_queues()
167192
self._taskqueue = queue.SimpleQueue()
168-
self._cache = {}
193+
# The _change_notifier queue exist to wake up self._handle_workers()
194+
# when the cache (self._cache) is empty or when there is a change in
195+
# the _state variable of the thread that runs _handle_workers.
196+
self._change_notifier = self._ctx.SimpleQueue()
197+
self._cache = _PoolCache(notifier=self._change_notifier)
169198
self._maxtasksperchild = maxtasksperchild
170199
self._initializer = initializer
171200
self._initargs = initargs
@@ -189,12 +218,14 @@ def __init__(self, processes=None, initializer=None, initargs=(),
189218
p.join()
190219
raise
191220

221+
sentinels = self._get_sentinels()
222+
192223
self._worker_handler = threading.Thread(
193224
target=Pool._handle_workers,
194225
args=(self._cache, self._taskqueue, self._ctx, self.Process,
195226
self._processes, self._pool, self._inqueue, self._outqueue,
196227
self._initializer, self._initargs, self._maxtasksperchild,
197-
self._wrap_exception)
228+
self._wrap_exception, sentinels, self._change_notifier)
198229
)
199230
self._worker_handler.daemon = True
200231
self._worker_handler._state = RUN
@@ -221,7 +252,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
221252
self._terminate = util.Finalize(
222253
self, self._terminate_pool,
223254
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
224-
self._worker_handler, self._task_handler,
255+
self._change_notifier, self._worker_handler, self._task_handler,
225256
self._result_handler, self._cache),
226257
exitpriority=15
227258
)
@@ -233,13 +264,25 @@ def __del__(self, _warn=warnings.warn, RUN=RUN):
233264
if self._state == RUN:
234265
_warn(f"unclosed running multiprocessing pool {self!r}",
235266
ResourceWarning, source=self)
267+
if getattr(self, '_change_notifier', None) is not None:
268+
self._change_notifier.put(None)
236269

237270
def __repr__(self):
238271
cls = self.__class__
239272
return (f'<{cls.__module__}.{cls.__qualname__} '
240273
f'state={self._state} '
241274
f'pool_size={len(self._pool)}>')
242275

276+
def _get_sentinels(self):
277+
task_queue_sentinels = [self._outqueue._reader]
278+
self_notifier_sentinels = [self._change_notifier._reader]
279+
return [*task_queue_sentinels, *self_notifier_sentinels]
280+
281+
@staticmethod
282+
def _get_worker_sentinels(workers):
283+
return [worker.sentinel for worker in
284+
workers if hasattr(worker, "sentinel")]
285+
243286
@staticmethod
244287
def _join_exited_workers(pool):
245288
"""Cleanup after any worker processes which have exited due to reaching
@@ -452,18 +495,28 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
452495
return result
453496

454497
@staticmethod
455-
def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
456-
inqueue, outqueue, initializer, initargs,
457-
maxtasksperchild, wrap_exception):
498+
def _wait_for_updates(sentinels, change_notifier, timeout=None):
499+
wait(sentinels, timeout=timeout)
500+
while not change_notifier.empty():
501+
change_notifier.get()
502+
503+
@classmethod
504+
def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
505+
pool, inqueue, outqueue, initializer, initargs,
506+
maxtasksperchild, wrap_exception, sentinels,
507+
change_notifier):
458508
thread = threading.current_thread()
459509

460510
# Keep maintaining workers until the cache gets drained, unless the pool
461511
# is terminated.
462512
while thread._state == RUN or (cache and thread._state != TERMINATE):
463-
Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
464-
outqueue, initializer, initargs,
465-
maxtasksperchild, wrap_exception)
466-
time.sleep(0.1)
513+
cls._maintain_pool(ctx, Process, processes, pool, inqueue,
514+
outqueue, initializer, initargs,
515+
maxtasksperchild, wrap_exception)
516+
517+
current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
518+
519+
cls._wait_for_updates(current_sentinels, change_notifier)
467520
# send sentinel to stop workers
468521
taskqueue.put(None)
469522
util.debug('worker handler exiting')
@@ -593,11 +646,13 @@ def close(self):
593646
if self._state == RUN:
594647
self._state = CLOSE
595648
self._worker_handler._state = CLOSE
649+
self._change_notifier.put(None)
596650

597651
def terminate(self):
598652
util.debug('terminating pool')
599653
self._state = TERMINATE
600654
self._worker_handler._state = TERMINATE
655+
self._change_notifier.put(None)
601656
self._terminate()
602657

603658
def join(self):
@@ -622,7 +677,7 @@ def _help_stuff_finish(inqueue, task_handler, size):
622677
time.sleep(0)
623678

624679
@classmethod
625-
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
680+
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
626681
worker_handler, task_handler, result_handler, cache):
627682
# this is guaranteed to only be called once
628683
util.debug('finalizing pool')
@@ -638,6 +693,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
638693
"Cannot have cache with result_hander not alive")
639694

640695
result_handler._state = TERMINATE
696+
change_notifier.put(None)
641697
outqueue.put(None) # sentinel
642698

643699
# We must wait for the worker handler to exit before terminating
@@ -871,6 +927,13 @@ def _setup_queues(self):
871927
self._quick_put = self._inqueue.put
872928
self._quick_get = self._outqueue.get
873929

930+
def _get_sentinels(self):
931+
return [self._change_notifier._reader]
932+
933+
@staticmethod
934+
def _get_worker_sentinels(workers):
935+
return []
936+
874937
@staticmethod
875938
def _help_stuff_finish(inqueue, task_handler, size):
876939
# drain inqueue, and put sentinels at its head to make workers finish
@@ -881,3 +944,6 @@ def _help_stuff_finish(inqueue, task_handler, size):
881944
pass
882945
for i in range(size):
883946
inqueue.put(None)
947+
948+
def _wait_for_updates(self, sentinels, change_notifier, timeout):
949+
time.sleep(timeout)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Use :func:`multiprocessing.connection.wait` instead of polling each 0.2
2+
seconds for worker updates in :class:`multiprocessing.Pool`. Patch by Pablo
3+
Galindo.

0 commit comments

Comments
 (0)