Conversation

@pablogsal (Member) commented Jan 9, 2019

This is a very simple fix for this problem using Process.sentinel. If you want a more sophisticated solution, please advise.

import multiprocessing
import time

CONCURRENCY = 1
NTASK = 100

def noop():
    pass

with multiprocessing.Pool(CONCURRENCY, maxtasksperchild=1) as pool:
    start_time = time.monotonic()
    results = [pool.apply_async(noop, ()) for _ in range(NTASK)]
    for result in results:
        result.get()
    dt = time.monotonic() - start_time
    pool.terminate()
    pool.join()

print("Total: %.1f sec" % dt)

Before this PR:

Total: 10.2 sec 

After this PR:

Total: 0.5 sec 

https://bugs.python.org/issue35493

while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
    pool._maintain_pool()
    time.sleep(0.1)
    pool._wait_for_updates(timeout=0.2)
@pablogsal (Member Author) Jan 9, 2019

The timeout is needed for detecting changes in thread._state

Member

asyncio uses the "self-pipe" pattern to wake up itself when it gets an event from a different thread or when it gets a Unix signal. Would it be possible to use a self-pipe (or something else) to wake up the wait when thread._state changes?
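For readers unfamiliar with the pattern, here is a minimal sketch of a self-pipe (illustrative only, not code from asyncio or this PR): a waiter blocks in select() on the read end of a pipe, and any other thread wakes it immediately by writing a byte to the write end.

```python
import os
import select

# Minimal self-pipe sketch: the waiter blocks in select() on the read
# end; any other thread (or a signal handler) writes one byte to the
# write end to wake it up immediately.
r, w = os.pipe()

def wake():
    os.write(w, b"\0")

def wait_for_event(timeout):
    ready, _, _ = select.select([r], [], [], timeout)
    if ready:
        os.read(r, 512)  # drain pending wake-up bytes
        return True
    return False  # timed out with no wake-up

wake()
print(wait_for_event(0.0))  # True: the pending byte wakes us without blocking
```

The same idea works with any waitable object; the PR under discussion ends up waiting on Connection/Process sentinels instead of a raw pipe.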

Member Author

We would also need to handle the case when pool._cache is empty.

Member

> @vstinner I have used the self-pipe pattern to receive notifications on pool._cache and thread._state changes. I maintained the 0.2 timeout for making sure the old behaviour is maintained if some change is not notified using the self._change_notifier queue (by mistake or because of external reasons).

Why 0.2 and not 0.1 or 1.0? I understand that replacing 0.1 with 0.2 doubles the latency of the thread pool. Am I right?

Member Author

I used 0.2 to maintain backwards compatibility with the old behaviour. See my other comment explaining why I maintained the timeout.

@pablogsal (Member Author) commented Jan 12, 2019

@vstinner I have used the self-pipe pattern to receive notifications on pool._cache and thread._state changes. I maintained the 0.2 timeout for making sure the old behaviour is maintained if some change is not notified using the self._change_notifier queue (by mistake or because of external reasons).



@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, pool_notifier,
Member

I would prefer to reuse the same variable name: pool_notifier => change_notifier.

@bedevere-bot
When you're done making the requested changes, leave the comment: I have made the requested changes; please review again.

@pablogsal force-pushed the bpo35493 branch 8 times, most recently from 7ba415b to fc3fa24 (January 14, 2019 22:09)
@vstinner (Member) left a comment

Oh wow, multiprocessing is so complex... I'm happy that @pablogsal handles it instead of me :-D

*self_notifier_sentinels]
wait(sentinels, timeout=timeout)
while not self._change_notifier.empty():
    self._change_notifier.get()
Member

I don't think that this code is safe, it looks like a race condition: https://en.wikipedia.org/wiki/Time_of_check_to_time_of_use

I suggest calling get(block=False) in a loop until you get an Empty exception.

Note: the race condition is not really critical, since it's fine if we miss a few events.

Member

There is no race condition as long as this is the only thread that pops from the queue.

Member Author

Also, sadly queue.SimpleQueue has no block=False option (we could add one, but I think it is not needed).
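For reference, the drain pattern under discussion can be sketched like this (a minimal illustration assuming a queue.SimpleQueue notifier; it is safe with a single consumer, and a missed wake-up only costs one extra timeout cycle):

```python
import queue

notifier = queue.SimpleQueue()

def drain(q):
    # Drain pending wake-up tokens. This is safe as long as this is the
    # only consumer: empty() can only flip to True under us if someone
    # else calls get(), which never happens in the single-consumer case.
    count = 0
    while not q.empty():
        q.get()
        count += 1
    return count

for _ in range(3):
    notifier.put(None)
print(drain(notifier))  # 3
```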

sentinels = [*task_queue_sentinels,
             *worker_sentinels,
             *self_notifier_sentinels]
wait(sentinels, timeout=timeout)
Member

Can you please add a comment on wait() to explain that it completes when at least one sentinel is set, and that it's important not to wait until all sentinels complete but to exit frequently to refresh the pool?

This point is non-trivial and it surprised me when I wrote PR #11136, my comment #11136 (comment):

> My change doesn't work: self._worker_state_event isn't set when a worker completes, whereas _maintain_pool() should be called frequently to check when a worker completed.
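The point about wait() returning on the *first* ready sentinel can be illustrated with a small sketch (reap_one is a hypothetical helper, not code from the PR):

```python
import multiprocessing
import time
from multiprocessing.connection import wait

# Illustrative sketch: multiprocessing.connection.wait() returns as soon
# as *any* sentinel is ready, not when all of them are. A maintenance
# loop therefore wakes on each completion and can refresh the pool
# (e.g. reap finished workers) instead of sleeping until everyone exits.
def reap_one(procs, timeout=30.0):
    sentinels = [p.sentinel for p in procs]
    ready = wait(sentinels, timeout=timeout)  # first completion wakes us
    return [p for p in procs if p.sentinel in ready]

if __name__ == "__main__":
    fast = multiprocessing.Process(target=time.sleep, args=(0.1,))
    slow = multiprocessing.Process(target=time.sleep, args=(30.0,))
    fast.start()
    slow.start()
    done = reap_one([fast, slow])  # returns once `fast` exits
    print(done == [fast])
    slow.terminate()
    fast.join()
    slow.join()
```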

Member

I'm not asking to document the behavior of wait(), but to be more explicit that we stop as soon as the first event completes, on purpose.

Member

It looks obvious to me, especially as the function is named wait_for_updates, but I guess it doesn't hurt to add a comment.

@@ -0,0 +1,3 @@
Use :func:`multiprocessing.connection.wait` instead of polling each 0.2
Member

Oops, I clicked on the wrong button :-( Your NEWS entry still doesn't explain that only process pools are affected.

Another issue: "polling each 0.2 seconds": currently the code uses 0.1 seconds.

Member Author

Sorry, I completely forgot about that while fighting Windows issues :/

@pablogsal (Member Author)

I made some changes to eliminate some problems that I found on Windows. I have run all multiprocessing tests in a loop with this patch manually on almost all our Windows buildbots and they pass without problems, so I think the pipe/socket solution is also resilient on Windows.

@pablogsal (Member Author)

@vstinner @pitrou I think I have addressed all the review comments. Could you review again? Thanks!

@pablogsal force-pushed the bpo35493 branch 2 times, most recently from f271edb to 71d5fc9 (February 12, 2019 20:11)
@pablogsal (Member Author) commented Feb 12, 2019

@vstinner @pitrou I had to rebase since the changes in https://bugs.python.org/issue35378 make handle_workers independent of the pool itself, so we cannot rely on the thread keeping the pool alive or on using self in the _wait_for_updates function. Some new changes are needed; for example, gathering the sentinels needs to be done in the constructor to avoid references to self:

https://github.com/python/cpython/pull/11488/files#diff-2d95253d6de7bbeebbeb131c5f3aecd9R213

The if blocks are needed so that the ThreadPool constructor does not crash.

Also, the minimal timeout still needs to exist because, now that the thread does not keep the pool alive, it needs a way to exit from _wait_for_updates if the pool does not unblock the sentinels because it is dead.

To avoid hanging if the pool dies too quickly, I have changed __del__ to push a notification to unblock the worker thread:

https://github.com/python/cpython/pull/11488/files#diff-2d95253d6de7bbeebbeb131c5f3aecd9R269

@pitrou (Member) left a comment

A few minor points below.

else:
    self_notifier_sentinels = []

sentinels = [*task_queue_sentinels, *self_notifier_sentinels]
Member

How about a dedicated method that returns this list and that you can override in ThreadPool?

maxtasksperchild, wrap_exception)

worker_sentinels = [worker.sentinel for worker in
                    pool if hasattr(worker, "sentinel")]
Member

Same here: how about a dedicated method to get worker sentinels that you can override in ThreadPool?

Member Author

The problem here is that we don't have a reference to the pool object (self) after https://bugs.python.org/issue35378, so we can only call class/static methods. The most we can do here is add a static/class method that takes the list of workers (called pool here) and returns the list of sentinels.

What do you think?

Member

That sounds fine to me.

self._setup_queues()
self._taskqueue = queue.SimpleQueue()
self._cache = {}
# The _change_notifier queue exist to wake ip self._handle_workers()
Member

"wake up"

self._taskqueue = queue.SimpleQueue()
self._cache = {}
# The _change_notifier queue exist to wake ip self._handle_workers()
# when the cache (self._cache) is empty or when ther is a change in
Member

"when there is"

def __delitem__(self, item):
    super().__delitem__(item)
    if not self:
        self.notifier.put(None)
Member

Can you add a comment explaining why it's important to wake up when the cache is emptied?

@vstinner (Member)

I tested manually example from https://bugs.python.org/issue35493#msg331797:

  • Without the change: 10.2 sec
  • With the change: 0.3 sec

It's 34x faster, nice :-)

@vstinner (Member) left a comment

LGTM, but I would prefer that @pitrou or @applio also review the change.

LGTM means that I reviewed the change and it seems like you covered all the changes which can wake up _wait_for_updates().

It seems like @pitrou proposed a different implementation, but I don't really care about this level of detail. I let you deal with that :-)

I tested manually that the PR fixes the bug that I reported: see my previous comment and the initial message at https://bugs.python.org/issue35493#msg331797.

if self._state == RUN:
    _warn(f"unclosed running multiprocessing pool {self!r}",
          ResourceWarning, source=self)
if getattr(self, '_change_notifier') is not None:
Member

Why not just self._change_notifier?

getattr(obj, attr) raises AttributeError if the attribute doesn't exist. Maybe you want to write getattr(self, '_change_notifier', None)?
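The difference between the two forms in a nutshell (a plain illustration of built-in getattr behavior, not code from the PR):

```python
class Pool:
    pass

p = Pool()

# Two-argument getattr raises AttributeError for a missing attribute,
# so it is no safer than plain attribute access here.
try:
    getattr(p, "_change_notifier")
except AttributeError:
    print("raised")

# The three-argument form returns the default instead of raising, which
# is what you want in __del__, where __init__ may not have completed.
print(getattr(p, "_change_notifier", None))  # None
```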

"""
def __init__(self, *args, notifier=None, **kwds):
    self.notifier = notifier
    super().__init__(*args, **kwds)
Member

PEP 8, please add an empty line between methods.

@pablogsal (Member Author)

@pitrou Commit 41cf470 should address everything. Could you take a final look?

@pablogsal pablogsal requested a review from pitrou March 10, 2019 22:17
@pitrou (Member) commented Mar 16, 2019

I added a minor comment, otherwise LGTM. Thanks @pablogsal !

@pablogsal pablogsal merged commit 7c99454 into python:master Mar 16, 2019
@bedevere-bot

@pablogsal: Please replace # with GH- in the commit message next time. Thanks!

@pablogsal pablogsal deleted the bpo35493 branch March 16, 2019 22:34
@pablogsal (Member Author)

Hummm... weird. GitHub notified me that there was a problem merging the PR, and when I clicked "retry" it did not use the message I wrote for the commit.

Anyway, thank you to everyone who participated in the review :)

@rgommers (Contributor) commented Apr 10, 2020

@pablogsal this seems to have caused https://bugs.python.org/issue38501 (hangs on both macOS and Windows with Python 3.8). That bug shows up in multiple SciPy modules as hangs (e.g. scipy/scipy#11835). Could you please have a look at [bpo-38501](https://bugs.python.org/issue38501)?
