Skip to content

Commit e1c71fd

Browse files
authored
Merge pull request #4 from robertwb/master
Eagerly shut down workers on workerpool shutdown when work queue is empty.
2 parents 4c647bf + 99cc024 commit e1c71fd

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

collapsing_thread_pool_executor/collapsing_thread_pool_executor.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def run(self):
125125
# - The interpreter is shutting down OR
126126
# - The executor that owns the worker has been collected OR
127127
# - The executor that owns the worker has been shutdown.
128-
if _shutdown or executor is None:
128+
if _shutdown or executor is None or executor._shutdown:
129129
return
130130

131131
del executor
@@ -171,11 +171,16 @@ def __init__(self, max_workers=None, thread_name_prefix=None,
171171
)
172172
self._work_queue_thread.daemon = True
173173
self._work_queue_thread.start()
174+
self._work_queue_finished = False
174175

175176
_thread_pools.add(self)
176177

177178
def _worker_available(self, worker):
178-
self._available_workers_queue.put(worker)
179+
if self._work_queue_finished:
180+
# wake the worker to exit right away
181+
worker.work_item_available_event.set()
182+
else:
183+
self._available_workers_queue.put(worker)
179184

180185
def _cleanup_threads(self):
181186
last_num_workers = -1
@@ -225,6 +230,15 @@ def _handle_work_queue(self):
225230
try:
226231
work_item = self._work_queue.get(timeout=5)
227232
if work_item is None: # shutdown commanded
233+
# wake all the workers so they exit quickly
234+
self._work_queue_finished = True
235+
try:
236+
while True:
237+
w = self._available_workers_queue.get_nowait()
238+
if w:
239+
w.work_item_available_event.set()
240+
except queue.Empty:
241+
pass
228242
return
229243
except queue.Empty:
230244
continue

0 commit comments

Comments
 (0)