
The following process runs for ten seconds. I want to kill it after five seconds.

import time

def hello():
    for _ in range(10):
        print("hello")
        time.sleep(1)

hello()

The solution could involve threads, multiprocessing or decorators. Similar questions have been asked before (sorry in advance), but the solutions have all been highly complex. Something as basic-sounding as this should be achievable with just a few lines of code.

  • Does it have to be Windows? For example, if you run the script in Docker or Cygwin, then you'd be able to use signal.alarm, which is only available on Unix platforms (a sketch follows these comments). Commented Jun 27, 2021 at 17:08
  • I'm using Windows unfortunately. Commented Jun 27, 2021 at 17:08
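For reference, a minimal sketch of the Unix-only signal.alarm approach mentioned in the comments, assuming the hello from the question (the on_alarm handler name is just for illustration; this does not work on Windows):

import signal
import time

def hello():
    for _ in range(10):
        print("hello")
        time.sleep(1)

def on_alarm(signum, frame):
    # Raising here interrupts whatever the main thread is doing
    raise TimeoutError("hello() took too long")

signal.signal(signal.SIGALRM, on_alarm)  # install the SIGALRM handler
signal.alarm(5)                          # ask the OS to send SIGALRM in 5 seconds
try:
    hello()
except TimeoutError:
    print("timed out")
finally:
    signal.alarm(0)                      # cancel the alarm if hello() finished early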

2 Answers


It depends on the use case ... for this very specific example,

import time

def hello(timeout=5):
    start_time = time.time()
    for _ in range(10):
        if time.time() - start_time > timeout:
            break
        print("hello")
        time.sleep(1)

hello()

is one way you could do it

Alternatively, you could use multiprocessing:

import multiprocessing
...

if __name__ == "__main__":
    proc = multiprocessing.Process(target=hello)
    proc.start()
    time.sleep(5)
    proc.terminate()

7 Comments

Joran, thanks so much for the response. But your solution is generating the following: "RuntimeError: Attempt to start a new process before the current process has finished its bootstrapping phase. This probably means that you are on Windows and you have forgotten to use the proper idiom in the main module:"
@NedHulton did you guard creating the process inside if __name__ == "__main__"?
just put the proc stuff behind an if __name__ == "__main__": (edited example)
also if you are going to bundle it into an exe you may have to add multiprocessing.freeze_support() as the first line inside the main guard, since Windows handles forking/spawning quite differently than Linux (a sketch follows these comments)
Just added the if __name__ == "__main__": line now. Seems to work. Thanks.
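Pulling the comments together, a minimal sketch of the guarded version, assuming the hello from the question; freeze_support() is a no-op unless the script has been frozen into an executable, and join(timeout=...) plus is_alive() is an alternative to a bare time.sleep(5):

import multiprocessing
import time

def hello():
    for _ in range(10):
        print("hello")
        time.sleep(1)

if __name__ == "__main__":
    multiprocessing.freeze_support()           # only matters for frozen .exe builds
    proc = multiprocessing.Process(target=hello)
    proc.start()
    proc.join(timeout=5)                       # wait at most 5 seconds
    if proc.is_alive():                        # still running -> kill it
        proc.terminate()
        proc.join()                            # reap the terminated process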

You can do this with multiprocessing.pool.ThreadPool. This will work both on Windows and Linux.

Basic solution

import multiprocessing.pool

try:
    with multiprocessing.pool.ThreadPool() as pool:
        pool.apply_async(hello).get(timeout=5)
except multiprocessing.TimeoutError:
    pass  # do something if timeout

This will run hello in a separate thread. If hello has not terminated after 5 seconds, the separate thread is killed.

  • get will raise multiprocessing.TimeoutError if the separate thread has not completed after timeout seconds.
  • This will immediately kill the separate thread:
    • Exiting the with block triggers the execution of pool.__exit__()
    • I did not find an explanation in the documentation of what pool.__exit__() does, but we can see in its source code that it calls pool.terminate()
    • According to its documentation, pool.terminate() "stops the worker processes immediately without completing outstanding work". (Note that the documentation talks about worker processes here because it is the documentation for multiprocessing.pool.Pool, which uses processes. ThreadPool provides the same API as Pool but uses threads.)
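For completeness, a runnable version of the basic solution with the hello from the question might look like this (the printed message is only illustrative):

import multiprocessing.pool
import time

def hello():
    for _ in range(10):
        print("hello")
        time.sleep(1)

try:
    with multiprocessing.pool.ThreadPool() as pool:
        # get() waits at most 5 seconds for hello() to finish
        pool.apply_async(hello).get(timeout=5)
except multiprocessing.TimeoutError:
    print("hello() did not finish within 5 seconds")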

Arguments

You can pass arguments to your function:

try:
    with multiprocessing.pool.ThreadPool() as pool:
        pool.apply_async(
            hello,
            (positional_argument, other_positional_argument),
            {"keyword_argument": 42, "other_keyword_argument": 1337},
        ).get(timeout=5)
except multiprocessing.TimeoutError:
    pass  # do something if timeout

Return value

You can get the return value of your function:

try:
    with multiprocessing.pool.ThreadPool() as pool:
        return_value = pool.apply_async(hello).get(timeout=5)
except multiprocessing.TimeoutError:
    pass  # do something if timeout
else:
    pass  # do something with return_value

Exceptions

If your function raises an exception, it will be transparently re-raised:

try:
    with multiprocessing.pool.ThreadPool() as pool:
        pool.apply_async(hello).get(timeout=5)
except multiprocessing.TimeoutError:
    pass  # do something if timeout
except SomeExceptionRaisedByHello as e:
    pass  # do something with e

Use a process instead of a thread

If you want to run your function in a separate process instead of a separate thread, just use multiprocessing.pool.Pool. It has the same API as multiprocessing.pool.ThreadPool.

Note that in this case you have to use if __name__ == '__main__':

if __name__ == "__main__":
    try:
        with multiprocessing.pool.Pool() as pool:
            pool.apply_async(hello).get(timeout=5)
    except multiprocessing.TimeoutError:
        pass  # do something if timeout

Note about concurrent.futures.Executor

The documentation says the following about ThreadPool:

Users should generally prefer to use concurrent.futures.ThreadPoolExecutor, which has a simpler interface that was designed around threads from the start, and which returns concurrent.futures.Future instances that are compatible with many other libraries, including asyncio.

concurrent.futures has an Executor class that offers similar functionality to multiprocessing's pools. However, using an Executor instance to time out a function like we did with multiprocessing's pool poses the following problem:

With an Executor, it is impossible to kill the separate thread or process when the timeout occurs (see this question). You can wait on your separate thread or process until a timeout occurs and then continue the execution of your main program, but the separate thread or process will still run in parallel until the target function terminates.

This is especially problematic if the function you want to time out may never terminate or may take a very long time to do so, because its execution will continue even after your main program has finished its own work. In this scenario, instead of terminating, your Python script will hang forever at the end of its execution, waiting on the separate thread or process executing your target function.
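As an illustration of that limitation, a sketch using concurrent.futures.ThreadPoolExecutor with the hello from the question; the timeout is detected, but the with block still waits for hello to finish, because exiting it calls shutdown(wait=True), which joins the worker threads:

import concurrent.futures
import time

def hello():
    for _ in range(10):
        print("hello")
        time.sleep(1)

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(hello)
    try:
        future.result(timeout=5)  # stop waiting after 5 seconds
    except concurrent.futures.TimeoutError:
        print("timed out, but hello() keeps running")
    # exiting the with block calls shutdown(wait=True), so execution
    # only continues once hello() has finished on its own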

