6
 from twisted.internet import reactor from twisted.internet import threads from twisted.internet import defer import time def worker(arg): print 'Hello world' time.sleep(10) return 1 def run(): print 'Starting workers' l = [] for x in range(2): l.append(threads.deferToThread(worker, x)) return defer.DeferredList(l) def res(results): print results reactor.stop() d = run() d.addCallback(res) reactor.run() 

How to stop workers by timeout ?

0

4 Answers 4

5

Threads cannot be interrupted unless they cooperate with you. time.sleep(10) is not going to cooperate, so I don't think you can interrupt this worker. If you have another kind of worker that has several discrete phases, or operates in a loop over some tasks, then you can do something like this:

def worker(stop, jobs): for j in jobs: if stop: break j.do() stop = [] d = deferToThread(worker) # This will make the list eval to true and break out of the loop. stop.append(None) 

This isn't Twisted specific, either. This is just how threads work in Python.

Sign up to request clarification or add additional context in comments.

4 Comments

As of 2019, this response should no longer be the accepted answer as there is a twisted call specifically made for this use case. twistedmatrix.com/documents/current/api/…
Not so. Deferreds can have a timeout added but threads still cannot be interrupted without cooperation.
The title is unambiguous: "How to add timeout to twisted defer". The original user may have been confused about the cooperative nature of threads, but that's kind of aside the point. Right now, that specific search yields this SO answer as the accepted one.
Fixed the title for you
3

While it may not be possible to interrupt the threads, the Deferred can be stopped via the cancel function, which I think is available in Twisted 10.1.0 and later.

I've used the following class to make Deferreds that callback a particular function if the Deferred hasn't fired after some time. It might be useful for someone that has the same question as that posed in the subject of the OP.

EDIT: As suggested by the comments below, it's best not to inherit from defer.Deferred. Therefore, I've changed the code to use a wrapper that achieves the same effect.

class DeferredWrapperWithTimeout(object): ''' Holds a deferred that allows a specified function to be called-back if the deferred does not fire before some specified timeout. ''' def __init__(self, canceller=None): self._def = defer.Deferred(canceller) def _finish(self, r, t): ''' Function to be called (internally) after the Deferred has fired, in order to cancel the timeout. ''' if ( (t!=None) and (t.active()) ): t.cancel() return r def getDeferred(self): return self._def def addTimeoutCallback(self, reactr, timeout, callUponTimeout, *args, **kw): ''' The function 'callUponTimeout' (with optional args or keywords) will be called after 'timeout' seconds, unless the Deferred fires. ''' def timeoutCallback(): self._def.cancel() callUponTimeout(*args, **kw) toc = reactr.callLater(timeout, timeoutCallback) return self._def.addCallback(self._finish, toc) 

Example callback before timeout:

from twisted.internet import reactor from DeferredWithTimeout import * dw = DeferredWrapperWithTimeout() d = dw.getDeferred() def testCallback(x=None): print "called" def testTimeout(x=None): print "timedout" d.addCallback(testCallback) dw.addTimeoutCallback(reactor, 20, testTimeout, "to") reactor.callLater(2, d.callback, "cb") reactor.run() 

Prints "called" and nothing else.

Example timeout before callback:

from twisted.internet import reactor from DeferredWithTimeout import * dw = DeferredWrapperWithTimeout() d = dw.getDeferred() def testCallback(x=None): print "called" def testTimeout(x=None): print "timedout" d.addCallback(testCallback) dw.addTimeoutCallback(reactor, 20, testTimeout, "to") reactor.run() 

Prints "timedout" after 20 seconds, and nothing else.

4 Comments

You really shouldn't subclass Deferred. Implement this functionality as a separate helper, not a subclass. pyvideo.org/video/1684/…
In addition to the usual warnings about not subclassing things, Deferred is a particularly bad thing to subclass, because its behavior assumes very specific things about its own implementation, and will not react well to having certain methods overridden.
Thanks for the link to that video! It has totally changed the way I design code.
This answer is outdated as of 2019. Use the official API: twistedmatrix.com/documents/current/api/…
1

We do it like this using a decorator. This method has the advantage that the deferred is cancelled when the timeout is reached. This should somehow become part of the Twisted library imho

from twisted.internet import defer, reactor def timeout(secs): """Decorator to add timeout to Deferred calls""" def wrap(func): @defer.inlineCallbacks def _timeout(*args, **kwargs): raw_d = func(*args, **kwargs) if not isinstance(raw_d, defer.Deferred): defer.returnValue(raw_d) timeout_d = defer.Deferred() times_up = reactor.callLater(secs, timeout_d.callback, None) try: raw_result, timeout_result = yield defer.DeferredList( [raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True, consumeErrors=True) except defer.FirstError as e: # Only raw_d should raise an exception assert e.index == 0 times_up.cancel() e.subFailure.raiseException() else: # timeout if timeout_d.called: raw_d.cancel() raise Exception("%s secs have expired" % secs) # no timeout times_up.cancel() defer.returnValue(raw_result) return _timeout return wrap 

Comments

0

Well, my answer is not about threads, but as it was said, you can implement timeout functionality as a separate helper:

from twisted.internet import defer def add_watchdog(deferred, timeout=0.05): def callback(value): if not watchdog.called: watchdog.cancel() return value deferred.addBoth(callback) from twisted.internet import reactor watchdog = reactor.callLater(timeout, defer.timeout, deferred) d = defer.Deferred() add_watchdog(d) 

Then you can trap defer.TimeoutError in deferred's errback if you need.

2 Comments

hmmm seems like cancellation is missing
@CarlD'Halluin , care to elaborate, or suggest an edit?

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.