
I have a script that must execute some shell commands. However, if a command takes too long to complete, it must be forcibly killed. Consider the following code snippet:

    import asyncio, random

    q = asyncio.Queue()
    MAX_WAIT = 5

    @asyncio.coroutine
    def blocking_task(sec):
        print('This task will sleep {} sec.'.format(sec))
        create = asyncio.create_subprocess_shell(
            'sleep {s}; echo "Woke up after {s} sec." >> ./tst.log'.format(s=sec),
            stdout=asyncio.subprocess.PIPE)
        proc = yield from create
        yield from proc.wait()

    @asyncio.coroutine
    def produce():
        while True:
            q.put_nowait(random.randint(3,8))
            yield from asyncio.sleep(0.5 + random.random())

    @asyncio.coroutine
    def consume():
        while True:
            value = yield from q.get()
            try:
                yield from asyncio.wait_for(blocking_task(value), MAX_WAIT)
            except asyncio.TimeoutError:
                print('~/~ Job has been cancelled !!')
            else:
                print('=/= Job has been done :]')

    loop = asyncio.get_event_loop()
    asyncio.ensure_future(produce())
    asyncio.ensure_future(consume())
    loop.run_forever()

This code produces the following output:

    This task will sleep 4 sec.
    =/= Job has been done :]
    This task will sleep 8 sec.
    ~/~ Job has been cancelled !!
    This task will sleep 5 sec.
    ~/~ Job has been cancelled !!

So it seems to be working as expected: jobs are stopped if they take too long to finish. But when I check the log, I can see that the time-consuming tasks kept running and were not actually stopped, killed, or aborted:

    Woke up after 4 sec.
    Woke up after 8 sec.
    Woke up after 5 sec.

I would expect just one line in the log, since the other processes should have been aborted before they had a chance to finish:

    Woke up after 4 sec.

Is there a way to achieve what I want?

I'm not even sure I need asyncio here; perhaps concurrent.futures could be used too. Either way, the task is the same: terminate tasks that take too long to finish.

1 Answer

You could use Process.terminate:

    try:
        yield from proc.wait()
    except asyncio.CancelledError:
        proc.terminate()
        raise

Or:

    try:
        yield from proc.wait()
    finally:
        if proc.returncode is None:
            proc.terminate()
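For readers on newer Python versions, the second variant can be sketched with `async`/`await` as follows. This is only a rough illustration, not part of the original answer: the names `run_with_cleanup` and `main` are made up, and a Python child process that sleeps stands in for the `sleep ...; echo ...` shell command so the sketch does not depend on a particular shell. It also waits for the child after `terminate()` so the process is reaped, which the snippets above do not do.

```python
import asyncio
import sys


async def run_with_cleanup(seconds):
    # Hypothetical stand-in for the shell command: a child that just sleeps.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", f"import time; time.sleep({seconds})")
    try:
        return await proc.wait()
    finally:
        # Reached on normal exit and on cancellation alike. If the child
        # is still running, terminate it and wait so it gets reaped.
        if proc.returncode is None:
            proc.terminate()
            await proc.wait()


async def main(seconds, max_wait):
    try:
        await asyncio.wait_for(run_with_cleanup(seconds), max_wait)
    except asyncio.TimeoutError:
        return "cancelled"
    return "done"


if __name__ == "__main__":
    # The child would sleep for 30 s, but the timeout fires after 0.5 s.
    print(asyncio.run(main(30, 0.5)))
```

Note that `terminate()` asks the process to stop (SIGTERM on POSIX); if a command may ignore that request, `proc.kill()` is the more forceful alternative.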

EDIT

Why didn't I see asyncio.CancelledError raised in my code?

When asyncio.wait_for (or anything else) cancels a task, it throws a CancelledError in the corresponding coroutine. This allows the coroutine to perform some clean up if necessary (using a context manager or a try/finally clause for instance). This error does not need to be logged since it is the normal behavior of a canceled task. But try to await a task after cancelling it and a CancelledError will be raised.
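A minimal sketch of that behavior, assuming a modern Python with `async`/`await` (the names `worker` and `main` are illustrative only): the cancelled coroutine sees `CancelledError` and can clean up, nothing is logged, and the error only becomes visible to code that awaits the cancelled task.

```python
import asyncio


async def worker(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # The cancellation is thrown into the coroutine here.
        log.append("cleanup")
        raise  # re-raise so the task is actually marked as cancelled


async def main():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0)  # give the worker a chance to start
    task.cancel()
    try:
        await task  # awaiting a cancelled task raises CancelledError
    except asyncio.CancelledError:
        log.append("CancelledError seen by awaiter")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the question's code, `asyncio.wait_for` plays the role of the awaiter: it cancels the task and reports the outcome as `asyncio.TimeoutError`, which is why no `CancelledError` shows up in the output.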


3 Comments

That worked really beautifully, thank you! I just don't quite understand, why I didn't see asyncio.CancelledError raised in my code? The output was clean as shown, without any exceptions.
In your first example, what will raise CancelledError? It looks like proc.wait() will wait forever if the process doesn't die.
@Rugnar asyncio.wait_for calls task.cancel which causes the task to stop its current operation with a CancelledError.
