Skip to content

Commit 8d72dc8

Browse files
committed
Recompute worker sentinels every time
1 parent e1ee023 commit 8d72dc8

File tree

1 file changed

+11
-12
lines changed

1 file changed

+11
-12
lines changed

Lib/multiprocessing/pool.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -218,14 +218,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
218218
self_notifier_sentinels = [self._change_notifier._reader]
219219
else:
220220
self_notifier_sentinels = []
221-
try:
222-
worker_sentinels = [worker.sentinel for worker in self._pool]
223-
except AttributeError:
224-
worker_sentinels = []
225221

226-
sentinels = [*task_queue_sentinels,
227-
*worker_sentinels,
228-
*self_notifier_sentinels]
222+
sentinels = [*task_queue_sentinels, *self_notifier_sentinels]
229223

230224
self._worker_handler = threading.Thread(
231225
target=Pool._handle_workers,
@@ -490,7 +484,7 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
490484
return result
491485

492486
@staticmethod
493-
def _wait_for_updates(sentinels, change_notifier, timeout=0.1):
487+
def _wait_for_updates(sentinels, change_notifier, timeout=None):
494488
wait(sentinels, timeout=timeout)
495489
while not change_notifier.empty():
496490
change_notifier.get()
@@ -506,9 +500,14 @@ def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
506500
# is terminated.
507501
while thread._state == RUN or (cache and thread._state != TERMINATE):
508502
cls._maintain_pool(ctx, Process, processes, pool, inqueue,
509-
outqueue, initializer, initargs,
510-
maxtasksperchild, wrap_exception)
511-
cls._wait_for_updates(sentinels, change_notifier)
503+
outqueue, initializer, initargs,
504+
maxtasksperchild, wrap_exception)
505+
506+
worker_sentinels = [worker.sentinel for worker in
507+
pool if hasattr(worker, "sentinel")]
508+
current_sentinels = [*worker_sentinels, *sentinels]
509+
510+
cls._wait_for_updates(current_sentinels, change_notifier)
512511
# send sentinel to stop workers
513512
taskqueue.put(None)
514513
util.debug('worker handler exiting')
@@ -930,5 +929,5 @@ def _help_stuff_finish(inqueue, task_handler, size):
930929
for i in range(size):
931930
inqueue.put(None)
932931

933-
def _wait_for_updates(self, sentinels, change_notifier, timeout=0.2):
932+
def _wait_for_updates(self, sentinels, change_notifier, timeout):
934933
time.sleep(timeout)

0 commit comments

Comments
 (0)