0

I have a thread that start (and eventually stops) a asyncio loop like so:

class Ook(Thread): […] def run(self): try: self._logger.debug("Asyncio loop runs forever.") self.loop.run_forever() except Exception as ex: # We do want to see ALL unhandled exceptions here. self._logger.error("Exception raised: %s", ex) self._logger.exception(ex) finally: # Stop the loop! self._logger.warn('Closing asyncio event loop.') self.loop.run_until_complete(self.loop.shutdown_asyncgens()) self.loop.close() def stop(self): self._logger.info("Thread has been asked to stop!") if self.loop.is_running(): self._logger.debug("Asked running asyncio loop to stop.") for task in asyncio.Task.all_tasks(): self.loop.call_soon_threadsafe(task.cancel) self.loop.call_soon_threadsafe(self.loop.stop) 

A silly (?) unit test to check that works is

@pytest.mark.asyncio async def test_start_and_stop_thread(): sut = Ook() sut.start() if sut.isAlive(): sut.stop() sut.join() assert not sut.isAlive() assert not sut.loop.is_running() 

This does not work because of raised asyncio.CancelledError… Catching those anywhere in the stop method does not seem to help.

If I run the test code not marked with @pytest.mark.asyncio, I get a message saying Task was destroyed but it is pending!.

What am I doing wrong?

4
  • Try to wait a few seconds before check isAlive? Commented Nov 6, 2017 at 9:39
  • @Sraw Why would that help? In any case, I tried it and it does not. Commented Nov 6, 2017 at 9:41
  • 1
    Because I think it may caused by calling stop before run completes. Unfortunately it seems not. Commented Nov 6, 2017 at 9:44
  • @Sraw That was a fair point. Commented Nov 6, 2017 at 9:56

1 Answer 1

2

We have several issues here.

  1. the Task.cancel() raises a asyncio.CancelledError() inside the couroutine. You should add a "try/exec CancelledError" in your coroutines to handle that exception.

  2. Another way could be suppressing the CancelledError exception in the def stop:

    from asyncio import CancelledError from contextlib import suppress def stop(self): self._logger.info("Thread has been asked to stop!") if self.loop.is_running(): self._logger.debug("Asked running asyncio loop to stop.") self.loop.call_soon_threadsafe(self.loop.stop) for task in asyncio.Task.all_tasks(): task.cancel() with suppress(CancelledError): loop.run_until_complete(task) 
  3. remember to close also all asynchronous generators with

    loop.run_until_complete(loop.shutdown_asyncgens()) 
Sign up to request clarification or add additional context in comments.

2 Comments

Looks like it should be for task in asyncio.Task.all_tasks(self.loop):. Without the param the array returned is empty.
Now asyncio.all_tasks(loop=self.loop)

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.