137

I want to fire off a function every 0.5 seconds and be able to start and stop and reset the timer. I'm not too knowledgeable of how Python threads work and am having difficulties with the python timer.

However, I keep getting RuntimeError: threads can only be started once when I call threading.Timer.start() twice on the same object. Is there a workaround for this? I tried calling threading.Timer.cancel() before each start.

Pseudo code:

    t = threading.Timer(0.5, function)
    while True:
        t.cancel()
        t.start()

16 Answers

152

The best way is to start the timer thread once. Inside your timer thread you'd code the following

    from threading import Thread

    class MyThread(Thread):
        def __init__(self, event):
            Thread.__init__(self)
            self.stopped = event

        def run(self):
            # wait() returns False on timeout and True once the event is set
            while not self.stopped.wait(0.5):
                print("my thread")
                # call a function

In the code that started the timer, you can then set the stopped event to stop the timer.

    from threading import Event

    stopFlag = Event()
    thread = MyThread(stopFlag)
    thread.start()
    # this will stop the timer
    stopFlag.set()
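The thread above can be stopped but not started again, because a Thread object can only be started once. One way to get start, stop and reset is to build a fresh thread on every start. Here is a minimal sketch of that idea; RestartableTimer and its method names are hypothetical, not part of the threading module:

```python
import threading


class RestartableTimer:
    """Call `function` every `interval` seconds; can be started, stopped and reset.

    Hypothetical helper for illustration, not part of the standard library.
    """

    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self._stop = None    # Event belonging to the current run
        self._thread = None  # Thread belonging to the current run

    def _run(self, stop):
        # wait() returns False on timeout and True once `stop` is set.
        while not stop.wait(self.interval):
            self.function(*self.args, **self.kwargs)

    def start(self):
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        # A Thread can only be started once, so create a new one each time.
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=self._run, args=(self._stop,), daemon=True
        )
        self._thread.start()

    def stop(self):
        if self._thread is None:
            return
        self._stop.set()
        # Skip join() when stop() is called from inside the callback itself.
        if self._thread is not threading.current_thread():
            self._thread.join()
        self._thread = None

    def reset(self):
        """Stop the current countdown and begin a new one."""
        self.stop()
        self.start()
```

Each run gets its own Event, so a run that has been stopped cannot be woken up again by a later start().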

9 Comments

Then it will finish its sleep and stop afterwards. There is no way to forcibly suspend a thread in Python. This is a design decision made by the Python developers. However, the net result will be the same. Your thread will still run (sleep) for a short while, but it will not perform your function.
Well, actually, if you want to be able to stop the timer thread immediately, just use a threading.Event and wait instead of sleep. Then, to wake it up, just set the event. You don't even need the self.stopped then because you just check the event flag.
The event would be used strictly to interrupt the timer thread. Normally, the event.wait would just timeout and act like a sleep, but if you wanted to stop (or otherwise interrupt the thread) you'd set the thread's event and it would wake up immediately.
I have updated my answer to use event.wait(). Thanks for the suggestions.
just a question, how can I restart the thread after that? calling thread.start() gives me threads can only be started once
76

Improving a little on Hans Then's answer, we can just subclass the Timer class. The following becomes our entire "repeat timer" code, and it can be used as a drop-in replacement for threading.Timer with all the same arguments:

    from threading import Timer

    class RepeatTimer(Timer):
        def run(self):
            while not self.finished.wait(self.interval):
                self.function(*self.args, **self.kwargs)

Usage example:

    import time

    def dummyfn(msg="foo"):
        print(msg)

    timer = RepeatTimer(1, dummyfn)
    timer.start()
    time.sleep(5)
    timer.cancel()

produces the following output:

    foo
    foo
    foo
    foo

and

    timer = RepeatTimer(1, dummyfn, args=("bar",))
    timer.start()
    time.sleep(5)
    timer.cancel()

produces

    bar
    bar
    bar
    bar

7 Comments

Will this approach allow me to start/cancel/start/cancel the timer thread?
No. While this approach allows you to do anything you would with an ordinary Timer, you can't do that with an ordinary Timer. Since start/cancel is related to the underlying thread, if you try to .start() a thread which has previously been .cancel()'ed then you will get an exception, RuntimeError: threads can only be started once.
Really elegant solution! Odd that they didn't just include a class that does this.
this solution is very impressive, but I struggled to understand how it was designed from simply reading the Python3 threading Timer interface documentation. The answer appears to build on knowing the implementation by going into the threading.py module itself.
Really nice solution! ...there's actually no need for dummyfn if you only want to call print... RepeatTimer(1, print, args=("my message",)) does the job equally well!
38

From Equivalent of setInterval in python:

    import threading

    def setInterval(interval):
        def decorator(function):
            def wrapper(*args, **kwargs):
                stopped = threading.Event()

                def loop():  # executed in another thread
                    while not stopped.wait(interval):  # until stopped
                        function(*args, **kwargs)

                t = threading.Thread(target=loop)
                t.daemon = True  # stop if the program exits
                t.start()
                return stopped
            return wrapper
        return decorator

Usage:

    @setInterval(.5)
    def function():
        "..."

    stop = function()  # start timer, the first call is in .5 seconds
    stop.set()  # stop the loop
    stop = function()  # start new timer
    # ...
    stop.set()

Or here's the same functionality but as a standalone function instead of a decorator:

    cancel_future_calls = call_repeatedly(60, print, "Hello, World")
    # ...
    cancel_future_calls()
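The definition of call_repeatedly is not shown above. A sketch that is consistent with the usage line could look like the following. It reuses the Event-based loop from the decorator, and the daemon=True flag is an assumption carried over from the decorator version rather than something the usage line requires:

```python
from threading import Event, Thread


def call_repeatedly(interval, func, *args):
    """Call func(*args) every `interval` seconds; return a function that cancels it."""
    stopped = Event()

    def loop():
        # The first call happens after `interval` seconds, not immediately.
        while not stopped.wait(interval):
            func(*args)

    # Assumption: a daemon thread, so the loop does not keep the program alive.
    Thread(target=loop, daemon=True).start()
    return stopped.set
```

Returning stopped.set means the caller gets a plain callable and never has to know that an Event is involved.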

Here's how to do it without using threads.

5 Comments

how would you change the interval when using a decorator? say i want to change .5s at runtime to 1 second or whatever?
@lightxx: just use @setInterval(1).
Hm. So either I'm a bit slow or you misunderstood me. I meant at runtime. I know I can change the decorator in the source code any time. What if, for example, I had three functions, each decorated with a @setInterval(n), and at runtime I want to change the interval of function 2 but leave functions 1 and 3 alone?
@lightxx: you could use a different interface, e.g., stop = repeat(every=second, call=your_function); ...; stop().
35

Using timer threads-

    from threading import Timer

    class perpetualTimer():
        def __init__(self, t, hFunction):
            self.t = t
            self.hFunction = hFunction
            self.thread = Timer(self.t, self.handle_function)

        def handle_function(self):
            self.hFunction()
            # Timer objects are one-shot, so schedule a fresh one after each call
            self.thread = Timer(self.t, self.handle_function)
            self.thread.start()

        def start(self):
            self.thread.start()

        def cancel(self):
            self.thread.cancel()

    def printer():
        print('ipsem lorem')

    t = perpetualTimer(5, printer)
    t.start()

This can be stopped by calling t.cancel().

3 Comments

I believe this code has a bug in the cancel method. When it is called, the thread is either 1) not running or 2) running. In 1) we are waiting to run the function, so cancel will work fine. In 2) we are currently running, so cancel will have no effect on the current execution. Furthermore, the current execution reschedules itself, so cancel will not have an effect in the future either.
This code creates a new thread every time the timer runs down. This is a colossal waste as compared to the accepted answer.
This solution should be avoided, for the reason mentioned above: it creates a new thread every time
19

In the interest of providing a correct answer using Timer as the OP requested, I'll improve upon swapnil jariwala's answer:

    from threading import Timer

    class InfiniteTimer():
        """A Timer class that does not stop, unless you want it to."""

        def __init__(self, seconds, target):
            self._should_continue = False
            self.is_running = False
            self.seconds = seconds
            self.target = target
            self.thread = None

        def _handle_target(self):
            self.is_running = True
            self.target()
            self.is_running = False
            self._start_timer()

        def _start_timer(self):
            if self._should_continue:  # Code could have been running when cancel was called.
                self.thread = Timer(self.seconds, self._handle_target)
                self.thread.start()

        def start(self):
            if not self._should_continue and not self.is_running:
                self._should_continue = True
                self._start_timer()
            else:
                print("Timer already started or running, please wait if you're restarting.")

        def cancel(self):
            if self.thread is not None:
                self._should_continue = False  # Just in case thread is running and cancel fails.
                self.thread.cancel()
            else:
                print("Timer never started or failed to initialize.")

    def tick():
        print('ipsem lorem')

    # Example Usage
    t = InfiniteTimer(0.5, tick)
    t.start()

1 Comment

This is a really good framework, but does result in an ever increasing number of _thread.lock objects for each iteration of _start_timer
5

I have changed swapnil-jariwala's code a little to make a small console clock.

    from threading import Timer, Thread, Event
    from datetime import datetime

    class PT():
        def __init__(self, t, hFunction):
            self.t = t
            self.hFunction = hFunction
            self.thread = Timer(self.t, self.handle_function)

        def handle_function(self):
            self.hFunction()
            self.thread = Timer(self.t, self.handle_function)
            self.thread.start()

        def start(self):
            self.thread.start()

    def printer():
        tempo = datetime.today()
        h, m, s = tempo.hour, tempo.minute, tempo.second
        print(f"{h}:{m}:{s}")

    t = PT(1, printer)
    t.start()

OUTPUT

    >>>
    11:39:11
    11:39:12
    11:39:13
    11:39:14
    11:39:15
    11:39:16
    ...

Timer with a tkinter Graphic interface

This code puts the clock timer in a little window with tkinter

    from threading import Timer, Thread, Event
    from datetime import datetime
    import tkinter as tk

    app = tk.Tk()
    lab = tk.Label(app, text="Timer will start in a sec")
    lab.pack()

    class perpetualTimer():
        def __init__(self, t, hFunction):
            self.t = t
            self.hFunction = hFunction
            self.thread = Timer(self.t, self.handle_function)

        def handle_function(self):
            self.hFunction()
            self.thread = Timer(self.t, self.handle_function)
            self.thread.start()

        def start(self):
            self.thread.start()

        def cancel(self):
            self.thread.cancel()

    def printer():
        tempo = datetime.today()
        clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
        try:
            lab['text'] = clock
        except RuntimeError:
            exit()

    t = perpetualTimer(1, printer)
    t.start()
    app.mainloop()

An example of flashcards game (sort of)

    from threading import Timer, Thread, Event
    from datetime import datetime

    class perpetualTimer():
        def __init__(self, t, hFunction):
            self.t = t
            self.hFunction = hFunction
            self.thread = Timer(self.t, self.handle_function)

        def handle_function(self):
            self.hFunction()
            self.thread = Timer(self.t, self.handle_function)
            self.thread.start()

        def start(self):
            self.thread.start()

        def cancel(self):
            self.thread.cancel()

    x = datetime.today()
    start = x.second

    def printer():
        global questions, counter, start
        x = datetime.today()
        tempo = x.second
        if tempo - 3 > start:
            show_ans()
        #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
        print()
        print("-" + questions[counter])
        counter += 1
        if counter == len(answers):
            counter = 0

    def show_ans():
        global answers, c2
        print("It is {}".format(answers[c2]))
        c2 += 1
        if c2 == len(answers):
            c2 = 0

    questions = ["What is the capital of Italy?",
                 "What is the capital of France?",
                 "What is the capital of England?",
                 "What is the capital of Spain?"]
    answers = "Rome", "Paris", "London", "Madrid"
    counter = 0
    c2 = 0

    print("Get ready to answer")
    t = perpetualTimer(3, printer)
    t.start()

output:

    Get ready to answer
    >>>
    -What is the capital of Italy?
    It is Rome
    -What is the capital of France?
    It is Paris
    -What is the capital of England?
    ...

1 Comment

if hFunction is blocking wouldn't this add some delay to subsequent starting times? Maybe you could swap the lines round so that handle_function starts timer first and then calls hFunction?
5

I'm a bit late but here are my two cents:

You can reuse the threading.Timer object by calling its .run() method repeatedly, like so:

    import threading

    class SomeClassThatNeedsATimer:
        def __init__(self, interval):
            self.timer = threading.Timer(interval, self.on_timer)
            self.timer.start()

        def on_timer(self):
            print('On timer')
            # run() waits out the interval again, then calls on_timer recursively
            self.timer.run()

2 Comments

This would have been nice, unfortunately if this runs fast, it fails with RecursionError: maximum recursion depth exceeded in comparison
@sdbbs you can modify the code to use a queue and enqueue the run outside the on_timer function to mitigate this recursion depth exception
3

I had to do this for a project. What I ended up doing was start a separate thread for the function

    t = threading.Thread(target=heartbeat, args=(worker,))
    t.start()

(heartbeat is my function; worker is one of its arguments.)

inside of my heartbeat function:

    def heartbeat(worker):
        while True:
            time.sleep(5)
            # all of my code

So when I start the thread the function will repeatedly wait 5 seconds, run all of my code, and do that indefinitely. If you want to kill the process just kill the thread.
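Python's threading module has no call for killing a thread from outside, so a `while True` loop like this usually needs a flag it can check. A minimal sketch of the same heartbeat with a stop Event follows. Here worker is assumed to be a callable standing in for "all of my code", and start_heartbeat, stop_event and interval are hypothetical names:

```python
import threading


def heartbeat(worker, stop_event, interval=5.0):
    """Run the periodic work until stop_event is set."""
    # wait() doubles as the sleep and returns True as soon as stop_event is set.
    while not stop_event.wait(interval):
        worker()  # placeholder for "all of my code"


def start_heartbeat(worker, interval=5.0):
    """Start the heartbeat thread and return it together with its stop Event."""
    stop_event = threading.Event()
    thread = threading.Thread(
        target=heartbeat, args=(worker, stop_event, interval)
    )
    thread.start()
    return thread, stop_event
```

To stop it, call stop_event.set() and then thread.join(). The loop exits at the next wait instead of sleeping out the full interval.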


2

In addition to the above great answers using Threads, in case you have to use your main thread or prefer an async approach - I wrapped a short class around aio_timers Timer class (to enable repeating)

    import asyncio
    from aio_timers import Timer

    class RepeatingAsyncTimer():
        def __init__(self, interval, cb, *args, **kwargs):
            self.interval = interval
            self.cb = cb
            self.args = args
            self.kwargs = kwargs
            self.aio_timer = None
            self.start_timer()

        def start_timer(self):
            self.aio_timer = Timer(delay=self.interval,
                                   callback=self.cb_wrapper,
                                   callback_args=self.args,
                                   callback_kwargs=self.kwargs)

        def cb_wrapper(self, *args, **kwargs):
            self.cb(*args, **kwargs)
            self.start_timer()

    from time import time

    def cb(timer_name):
        print(timer_name, time())

    print(f'clock starts at: {time()}')
    timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
    timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')

clock starts at: 1602438840.9690785

timer_1 1602438845.980087

timer_2 1602438850.9806316

timer_1 1602438850.9808934

timer_1 1602438855.9863033

timer_2 1602438860.9868324

timer_1 1602438860.9876585
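For comparison, here is a rough sketch of a repeating timer that uses only the standard asyncio module. It is not the aio_timers API: repeat and demo are hypothetical names, and the callback only fires while an event loop is running, which asyncio.run provides here:

```python
import asyncio


async def repeat(interval, callback, *args, **kwargs):
    """Call `callback` every `interval` seconds until the task is cancelled."""
    while True:
        await asyncio.sleep(interval)
        callback(*args, **kwargs)


async def demo(results, interval=1.0, run_for=3.5):
    """Run the repeating timer for `run_for` seconds, then cancel it."""
    # create_task schedules the coroutine on the running event loop.
    task = asyncio.create_task(repeat(interval, results.append, "tick"))
    await asyncio.sleep(run_for)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: cancelling the task is how the timer is stopped
    return results
```

It can be started with asyncio.run(demo([])). Calling asyncio.create_task again later gives a fresh timer, so start, cancel and restart are all available.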

4 Comments

splendid. :) better than all others. and precise.
@mork why did you import asyncio ?
@eranotzap I think it was needed at the time, for the underlying lib.
This is missing something. I think the event loop. It prints the first line (e.g. "clock starts at...") and exits.
1

I have implemented a class that works as a timer.

I leave the link here in case anyone needs it: https://github.com/ivanhalencp/python/tree/master/xTimer

1 Comment

Whilst this may theoretically answer the question, it would be preferable to include the essential parts of the answer here, and provide the link for reference.
1

    from threading import Timer

    def TaskManager():
        # do stuff
        t = Timer(1, TaskManager)
        t.start()

    TaskManager()

Here is a small sample that should help in understanding how it runs. At its end, the function taskManager() creates a delayed call to itself.

Try changing the "delay" variable and you will be able to see the difference.

    from threading import Timer
    from time import sleep

    # ------------------------------------------
    DATA = []
    delay = 0.25  # sec
    counter = 0
    allow_run = True
    FIFO = True

    def taskManager():
        global counter, DATA, delay, allow_run
        counter += 1
        if len(DATA) > 0:
            if FIFO:
                print("[" + str(counter) + "] new data: [" + str(DATA.pop(0)) + "]")
            else:
                print("[" + str(counter) + "] new data: [" + str(DATA.pop()) + "]")
        else:
            print("[" + str(counter) + "] no data")
        if allow_run:
            # delayed call of this function to itself
            t = Timer(delay, taskManager)
            t.start()
        else:
            print(" END task-manager: disabled")

    # ------------------------------------------
    def main():
        DATA.append("data from main(): 0")
        sleep(2)
        DATA.append("data from main(): 1")
        sleep(2)

    # ------------------------------------------
    print(" START task-manager:")
    taskManager()
    sleep(2)
    DATA.append("first data")
    sleep(2)
    DATA.append("second data")
    print(" START main():")
    main()
    print(" END main():")
    sleep(2)
    DATA.append("last data")
    allow_run = False

2 Comments

can you also tell a little more as to why this works?
your example was a little confusing, the first codeblock was all you needed to say.
1

I like right2clicky's answer, especially in that it doesn't require a Thread to be torn down and a new one created every time the Timer ticks. In addition, it's an easy override to create a class with a timer callback that gets called periodically. That's my normal use case:

    import time

    # RepeatTimer is assumed to be defined as in the answer referenced above
    class MyClass(RepeatTimer):
        def __init__(self, period):
            super().__init__(period, self.on_timer)

        def on_timer(self):
            print("Tick")

    if __name__ == "__main__":
        mc = MyClass(1)
        mc.start()
        time.sleep(5)
        mc.cancel()


1

This is an alternate implementation using function instead of class. Inspired by @Andrew Wilkins above.

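It uses wait rather than sleep because wait returns as soon as the event is set, so the loop can be stopped without sitting out the rest of the interval. Note that wait(1) still waits a full second after each call, so the function's runtime is added to the period: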

    import threading
    from time import sleep

    PING_ON = threading.Event()

    def ping():
        while not PING_ON.wait(1):
            print("my thread %s" % str(threading.current_thread().ident))

    t = threading.Thread(target=ping)
    t.start()
    sleep(5)
    PING_ON.set()
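If the calls need to stay on a fixed schedule regardless of how long the function takes, one option is to wait only for the time left until the next scheduled call. Here is a sketch of that idea using time.monotonic; run_every and its parameters are hypothetical names, not from the answer above:

```python
import threading
import time


def run_every(interval, func, stop_event):
    """Call func on a fixed schedule, subtracting its runtime from each wait."""
    next_call = time.monotonic() + interval
    # Wait only for the time remaining until the next scheduled call;
    # a timeout of 0 means the schedule has already been overrun.
    while not stop_event.wait(max(0.0, next_call - time.monotonic())):
        func()
        next_call += interval
```

It is started the same way as ping above, for example threading.Thread(target=run_every, args=(1, some_function, stop_event)).start(), and stopped by setting the event. If func regularly takes longer than the interval, the calls simply run back to back.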


1

I have come up with another solution using a singleton class. Please tell me if there is any memory leak here.

    import time, threading

    class Singleton:
        __instance = None
        sleepTime = 1
        executeThread = False

        def __init__(self):
            if Singleton.__instance != None:
                raise Exception("This class is a singleton!")
            else:
                Singleton.__instance = self

        @staticmethod
        def getInstance():
            if Singleton.__instance == None:
                Singleton()
            return Singleton.__instance

        def startThread(self):
            self.executeThread = True
            self.threadNew = threading.Thread(target=self.foo_target)
            self.threadNew.start()
            print('doing other things...')

        def stopThread(self):
            print("Killing Thread ")
            self.executeThread = False
            self.threadNew.join()
            print(self.threadNew)

        def foo(self):
            print("Hello in " + str(self.sleepTime) + " seconds")

        def foo_target(self):
            while self.executeThread:
                self.foo()
                print(self.threadNew)
                time.sleep(self.sleepTime)
                if not self.executeThread:
                    break

    sClass = Singleton()
    sClass.startThread()
    time.sleep(5)
    sClass.getInstance().stopThread()
    sClass.getInstance().sleepTime = 2
    sClass.startThread()


1

In the interest of providing a correct answer using Timer as the OP requested, I'll improve upon https://stackoverflow.com/a/41450617/20750754

    from threading import Timer

    class InfiniteTimer():
        """A Timer class that does not stop, unless you want it to."""

        def __init__(self, seconds, target, args=None, kwargs=None):
            self._should_continue = False
            self.is_running = False
            self.seconds = seconds
            self.target = target
            # None defaults avoid sharing one mutable list/dict between instances
            self.args = args if args is not None else []
            self.kwargs = kwargs if kwargs is not None else {}
            self.thread = None

        def _handle_target(self):
            self.is_running = True
            self.target(*self.args, **self.kwargs)
            self.is_running = False
            self._start_timer()

        def _start_timer(self):
            if self._should_continue:  # Code could have been running when cancel was called.
                self.thread = Timer(
                    self.seconds,
                    self._handle_target,
                )
                self.thread.start()

        def start(self):
            if not self._should_continue and not self.is_running:
                self._should_continue = True
                self._start_timer()
            else:
                print(
                    "Timer already started or running, please wait if you're restarting.")

        def cancel(self):
            if self.thread is not None:
                self._should_continue = False  # Just in case thread is running and cancel fails.
                self.thread.cancel()
            else:
                print("Timer never started or failed to initialize.")

    def tick(i):
        print('ipsem lorem', i)

    # Example Usage
    t = InfiniteTimer(0.5, tick, kwargs=dict(i="i"))
    t.start()


0

This is the sample code for running the timer continuously. Just create a new timer upon exhaustion and call the same function. Not the best way to do it but can be done like this as well.

    import threading
    import time

    class ContinousTimer():
        def __init__(self):
            self.timer = None

        def run(self, msg='abc'):
            print(msg)
            self.timer = threading.Timer(interval=2, function=self.run, args=(msg, ))
            self.timer.start()

    if __name__ == "__main__":
        t = ContinousTimer()
        try:
            t.run(msg="Hello")
            while True:
                time.sleep(0.1)
        except KeyboardInterrupt:
            # Cancel Timer
            t.timer.cancel()

