bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool #11488
Conversation
Lib/multiprocessing/pool.py (outdated):

    while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
        pool._maintain_pool()
-       time.sleep(0.1)
+       pool._wait_for_updates(timeout=0.2)
The timeout is needed for detecting changes in thread._state
asyncio uses the "self-pipe" pattern to wake up itself when it gets an event from a different thread or when it gets a Unix signal. Would it be possible to use a self-pipe (or something else) to wake up the wait when thread._state changes?
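For reference, the self-pipe pattern mentioned here can be sketched as follows (a minimal illustration, not the asyncio or pool implementation; all names are made up): one thread blocks in select() on the read end of a socket pair, and any other thread wakes it up immediately by writing a byte to the write end.

```python
import select
import socket
import threading
import time

# The "self-pipe" trick: a socket pair whose read end is watched by the
# waiting thread and whose write end is poked by whoever wants to wake it.
r, w = socket.socketpair()
woken = []

def waiter():
    # Would block for up to 5 seconds if nobody writes to `w`.
    ready, _, _ = select.select([r], [], [], 5.0)
    woken.append(bool(ready))

t = threading.Thread(target=waiter)
t.start()
time.sleep(0.1)   # simulate the main thread doing other work
w.send(b"\0")     # wake up the waiter from another thread
t.join()
print(woken)
```

Writing a single byte is enough: the waiter only cares that the read end became readable, not about the payload.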
We would also need to handle the case when pool._cache is empty.
@vstinner I have used the self-pipe pattern to receive notifications of pool._cache and thread._state changes. I kept the 0.2 timeout to make sure the old behaviour is preserved if some change is not announced through the self._change_notifier queue (by mistake or for external reasons).
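The approach described here can be sketched roughly as follows (a simplified illustration with made-up signatures, not the PR's actual code): block on the sentinels, keep the timeout as a safety net, then drain the notifier queue so the next wait can block again.

```python
import queue
from multiprocessing.connection import Pipe, wait

def _wait_for_updates(sentinels, change_notifier, timeout=0.2):
    # Block until a sentinel fires or the timeout expires; the timeout is a
    # safety net in case a state change was not pushed to change_notifier.
    wait(sentinels, timeout=timeout)
    # Drain any pending wake-up tokens so a later wait() can block again.
    while not change_notifier.empty():
        change_notifier.get()

reader, writer = Pipe(duplex=False)  # stand-in for a worker sentinel
notifier = queue.Queue()
notifier.put(None)                   # a pending "something changed" token
_wait_for_updates([reader], notifier, timeout=0.05)
print(notifier.empty())
```

Since nothing is written to the pipe, the wait() call simply times out after 0.05 s and the queued token is drained.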
Why 0.2 and not 0.1 or 1.0? I understand that replacing 0.1 with 0.2 doubles the latency of the thread pool. Am I right?
I used 0.2 to maintain backwards compatibility with the old behaviour. See my other comment explaining why I maintained the timeout.
Lib/multiprocessing/pool.py (outdated):

    @classmethod
-   def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
+   def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, pool_notifier,
I would prefer to reuse the same variable name: pool_notifier => change_notifier.
Force-pushed from 7ba415b to fc3fa24
vstinner left a comment
Oh wow, multiprocessing is so complex... I'm happy that @pablogsal handles it instead of me :-D
Lib/multiprocessing/pool.py (outdated):

                 *self_notifier_sentinels]
    wait(sentinels, timeout=timeout)
    while not self._change_notifier.empty():
        self._change_notifier.get()
I don't think this code is safe; it looks like a race condition: https://en.wikipedia.org/wiki/Time_of_check_to_time_of_use
I suggest calling get(block=False) in a loop until you get an Empty exception.
Note: the race condition is not really critical, since it's fine if we miss a few events.
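The drain loop suggested here can be illustrated with the standard-library queue module (a generic sketch, not the pool's actual code): pop non-blockingly until Empty is raised, instead of checking empty() before each get(), which opens a time-of-check-to-time-of-use window when other threads also pop.

```python
import queue

notifier = queue.Queue()
notifier.put(None)
notifier.put(None)

drained = 0
while True:
    try:
        notifier.get_nowait()  # equivalent to get(block=False)
        drained += 1
    except queue.Empty:
        # The queue is empty *at the moment get() ran*, which is the only
        # check that cannot race with a concurrent consumer.
        break
print(drained)
```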
There is no race condition as long as this is the only thread that pops from the queue.
Also, sadly multiprocessing's SimpleQueue has no block=False option (we could add one, but I think it is not needed).
    sentinels = [*task_queue_sentinels,
                 *worker_sentinels,
                 *self_notifier_sentinels]
    wait(sentinels, timeout=timeout)
Can you please add a comment on wait() explaining that it returns when at least one sentinel is ready, and that it's important not to wait until all sentinels are ready, but to exit frequently to refresh the pool.
This point is non-trivial and it surprised me when I wrote PR #11136, my comment #11136 (comment):
My change doesn't work: self._worker_state_event isn't set when a worker completes, whereas _maintain_pool() should be called frequently to check when a worker completed.
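The point being made can be demonstrated directly: multiprocessing.connection.wait() returns as soon as at least one of the passed objects is ready, not when all of them are. The handler loop relies on this to react promptly to the first worker that finishes while the others are still running. A small, deterministic illustration using pipes as stand-ins for worker sentinels:

```python
from multiprocessing.connection import Pipe, wait

r1, w1 = Pipe(duplex=False)
r2, w2 = Pipe(duplex=False)

w1.send("worker 1 finished")  # only the first "worker" reports anything

# wait() returns the *subset* of objects that are ready; it does not block
# until every object becomes ready.
ready = wait([r1, r2], timeout=1.0)
print(r1 in ready, r2 in ready)
```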
wait is already documented: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.wait
I'm not asking to document the behavior of wait(), but to make it more explicit that we stop as soon as the first event completes, on purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks obvious to me, especially as the function is named wait_for_updates, but I guess it doesn't hurt to add a comment.
    @@ -0,0 +1,3 @@
+   Use :func:`multiprocessing.connection.wait` instead of polling each 0.2
Oops, I clicked on the wrong button :-( Your NEWS entry still doesn't explain that only process pools are affected.
Another issue: "polling each 0.2 seconds": currently the code uses 0.1 seconds.
Sorry, I completely forgot about that while fighting Windows issues :/
I made some changes to eliminate some problems that I found on Windows. I have run all multiprocessing tests in a loop with this patch manually on almost all our Windows buildbots and it passes without problems, so I think the pipe/socket solution is also resilient on Windows.
Force-pushed from f271edb to 71d5fc9
@vstinner @pitrou I had to rebase, since after the changes in https://bugs.python.org/issue35378 the if blocks in https://github.com/python/cpython/pull/11488/files#diff-2d95253d6de7bbeebbeb131c5f3aecd9R213 are needed to keep the ThreadPool constructor from crashing.
To avoid hanging if the pool dies too quickly, I have changed the del to push a notification to unblock the worker thread: https://github.com/python/cpython/pull/11488/files#diff-2d95253d6de7bbeebbeb131c5f3aecd9R269
pitrou left a comment
A few minor points below.
Lib/multiprocessing/pool.py (outdated):

    else:
        self_notifier_sentinels = []

    sentinels = [*task_queue_sentinels, *self_notifier_sentinels]
How about a dedicated method that returns this list and that you can override in ThreadPool?
Lib/multiprocessing/pool.py (outdated):

    maxtasksperchild, wrap_exception)

    worker_sentinels = [worker.sentinel for worker in
                        pool if hasattr(worker, "sentinel")]
Same here: how about a dedicated method to get worker sentinels that you can override in ThreadPool?
The problem here is that we don't have a reference to the pool object (self) after https://bugs.python.org/issue35378, so we can only call class/static methods. The most we can do here is make a static/class method that takes the list of workers (called pool here) and returns the list of sentinels.
What do you think?
That sounds fine to me.
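The agreed-on shape could look roughly like this (a hypothetical sketch: the class name, method name, and the SimpleNamespace stand-ins are all illustrative). A static method takes the worker list explicitly, so it can be called without a reference to the pool object, and it filters out thread-based workers, which have no sentinel attribute.

```python
import types

class PoolSketch:
    @staticmethod
    def _get_worker_sentinels(workers):
        # Process workers expose a .sentinel file descriptor/handle;
        # threading.Thread workers do not, so they are skipped.
        return [w.sentinel for w in workers if hasattr(w, "sentinel")]

process_like = types.SimpleNamespace(sentinel=42)  # mimics a Process worker
thread_like = object()                             # mimics a Thread worker
print(PoolSketch._get_worker_sentinels([process_like, thread_like]))
```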
Lib/multiprocessing/pool.py (outdated):

    self._setup_queues()
    self._taskqueue = queue.SimpleQueue()
    self._cache = {}
+   # The _change_notifier queue exist to wake ip self._handle_workers()
"wake up"
Lib/multiprocessing/pool.py (outdated):

    self._taskqueue = queue.SimpleQueue()
    self._cache = {}
+   # The _change_notifier queue exist to wake ip self._handle_workers()
+   # when the cache (self._cache) is empty or when ther is a change in
"when there is"
    def __delitem__(self, item):
        super().__delitem__(item)
        if not self:
            self.notifier.put(None)
Can you add a comment explaining why it's important to wake up when the cache is emptied?
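The mechanism under discussion can be sketched as a self-contained example (a simplified illustration of the cache subclass shown in the diff above, using a plain queue.Queue as the notifier): when the last pending job is removed, a token is pushed so the handler thread wakes up immediately instead of blocking until its poll timeout expires, since an empty cache may mean the pool can be torn down.

```python
import queue

class _PoolCache(dict):
    """Dict of pending jobs that notifies a queue when it becomes empty."""

    def __init__(self, *args, notifier=None, **kwds):
        self.notifier = notifier
        super().__init__(*args, **kwds)

    def __delitem__(self, item):
        super().__delitem__(item)
        # The cache just became empty: wake up the handler thread so it
        # can re-check the pool state without waiting for its timeout.
        if not self:
            self.notifier.put(None)

notifier = queue.Queue()
cache = _PoolCache(notifier=notifier)
cache[1] = "pending job"
del cache[1]      # removes the last job, so a wake-up token is queued
print(notifier.qsize())
```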
I manually tested the example from https://bugs.python.org/issue35493#msg331797:
It's 34x faster, nice :-)
vstinner left a comment
LGTM, but I would prefer that @pitrou or @applio also review the change.
LGTM means that I reviewed the change and it seems like you covered any changes which can wake up _wait_for_updates().
It seems like @pitrou proposed a different implementation, but I don't care about this level of detail; I let you deal with that :-)
I manually tested that the PR fixes the bug that I reported: see my previous comment and the initial message at https://bugs.python.org/issue35493#msg331797.
Lib/multiprocessing/pool.py (outdated):

    if self._state == RUN:
        _warn(f"unclosed running multiprocessing pool {self!r}",
              ResourceWarning, source=self)
    if getattr(self, '_change_notifier') is not None:
Why not just self._change_notifier?
getattr(obj, attr) raises AttributeError if the attribute doesn't exist. Maybe you want to write getattr(self, '_change_notifier', None)?
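The distinction being pointed out is easy to verify: two-argument getattr() raises AttributeError for a missing attribute, while the three-argument form returns the given default instead.

```python
class Obj:
    pass

obj = Obj()

raised = False
try:
    getattr(obj, "_change_notifier")        # no default: raises
except AttributeError:
    raised = True

# Three-argument form: returns the default instead of raising.
print(raised, getattr(obj, "_change_notifier", None))
```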
    """
    def __init__(self, *args, notifier=None, **kwds):
        self.notifier = notifier
        super().__init__(*args, **kwds)
PEP 8, please add an empty line between methods.
…e cache class and fix typos
@pitrou Commit 41cf470 should address everything. Could you take a final look?
I added a minor comment, otherwise LGTM. Thanks @pablogsal!
@pablogsal: Please replace
Hmmm... weird. GitHub notified me that there was a problem merging the PR, and when I clicked "retry" it did not use the message I wrote for the commit. Anyway, thank you to everyone who participated in the review :)
@pablogsal this seems to have caused https://bugs.python.org/issue38501 (hangs on both macOS and Windows with Python 3.8). That bug shows up in multiple SciPy modules as hangs (e.g. scipy/scipy#11835). Could you please have a look at
This is a very simple fix for this problem using Process.sentinel. If you want a more sophisticated solution, please advise.

Before this PR:
After this PR:
https://bugs.python.org/issue35493