I have a thread that starts (and eventually stops) an asyncio loop like so:

    class Ook(Thread):

        […]

        def run(self):
            try:
                self._logger.debug("Asyncio loop runs forever.")
                self.loop.run_forever()
            except Exception as ex:
                # We do want to see ALL unhandled exceptions here.
                self._logger.error("Exception raised: %s", ex)
                self._logger.exception(ex)
            finally:
                # Stop the loop!
                self._logger.warn('Closing asyncio event loop.')
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
                self.loop.close()

        def stop(self):
            self._logger.info("Thread has been asked to stop!")
            if self.loop.is_running():
                self._logger.debug("Asked running asyncio loop to stop.")
                for task in asyncio.Task.all_tasks():
                    self.loop.call_soon_threadsafe(task.cancel)
                self.loop.call_soon_threadsafe(self.loop.stop)

A silly (?) unit test to check that works is:

    @pytest.mark.asyncio
    async def test_start_and_stop_thread():
        sut = Ook()
        sut.start()
        if sut.isAlive():
            sut.stop()
            sut.join()
        assert not sut.isAlive()
        assert not sut.loop.is_running()

This does not work because of raised asyncio.CancelledError… Catching those anywhere in the stop method does not seem to help.
If I run the test without the @pytest.mark.asyncio marker, I instead get the message "Task was destroyed but it is pending!".
What am I doing wrong?
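
For comparison, here is a minimal, self-contained sketch of the same start/stop pattern. It is not the original class: `LoopThread` is a hypothetical stand-in for `Ook`, the logging is left out, and it assumes Python 3.7+ (for `asyncio.all_tasks`). The difference from the code above is that `stop()` only asks the loop to stop, while the cancelling is done in `run()`, in the loop's own thread, and the loop is run once more so that every cancelled task can actually finish before the loop is closed.

```python
import asyncio
import threading


class LoopThread(threading.Thread):
    """Hypothetical stand-in for Ook: owns an event loop and runs it in a thread."""

    def __init__(self):
        super().__init__()
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_forever()
        finally:
            # Cancel whatever is still pending, from the loop's own thread.
            pending = asyncio.all_tasks(self.loop)
            for task in pending:
                task.cancel()
            # Run the loop again so each task can process its CancelledError;
            # return_exceptions=True collects those errors instead of raising.
            if pending:
                self.loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True)
                )
            self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            self.loop.close()

    def stop(self):
        # The only cross-thread call: ask the loop to stop.
        self.loop.call_soon_threadsafe(self.loop.stop)
```

With this sketch, a plain synchronous test can call `start()`, schedule work with `asyncio.run_coroutine_threadsafe(...)`, then call `stop()` and `join()`. Whether this matches what the elided parts of `Ook` need is an assumption.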