56

Are there any exemplary examples of the GoF Observer implemented in Python? I have a bit of code which currently has bits of debugging code laced through the key class (currently generating messages to stderr if a magic env is set). Additionally, the class has an interface for incrementally returning results as well as storing them (in memory) for post-processing. (The class itself is a job manager for concurrently executing commands on remote machines over ssh.)

Currently the usage of the class looks something like:

    job = SSHJobMan(hostlist, cmd)
    job.start()
    while not job.done():
        for each in job.poll():
            incrementally_process(job.results[each])
        time.sleep(0.2)  # or other more useful work
    post_process(job.results)

An alternative usage model is:

    job = SSHJobMan(hostlist, cmd)
    job.wait()  # implicitly performs a start()
    process(job.results)

This all works fine for the current utility. However it does lack flexibility. For example, I currently support a brief output format or a progress bar as incremental results; I also support brief, complete, and "merged message" outputs for the post_process() function.

However, I'd like to support multiple results/output streams (progress bar to the terminal, debugging and warnings to a log file, outputs from successful jobs to one file/directory, error messages and other results from non-successful jobs to another, etc).

This sounds like a situation that calls for Observer ... have instances of my class accept registration from other objects and call them back with specific types of events as they occur.

I'm looking at PyPubSub since I saw several references to that in SO related questions. I'm not sure I'm ready to add the external dependency to my utility but I could see value in using their interface as a model for mine if that's going to make it easier for others to use. (The project is intended as both a standalone command line utility and a class for writing other scripts/utilities).

In short I know how to do what I want ... but there are numerous ways to accomplish it. I want suggestions on what's most likely to work for other users of the code in the long run.

The code itself is at: classh.

9 Answers

62

However it does lack flexibility.

Well... actually, this looks like a good design to me if an asynchronous API is what you want. It usually is. Maybe all you need is to switch from stderr to Python's logging module, which has a sort of publish/subscribe model of its own, what with Logger.addHandler() and so on.

If you do want to support observers, my advice is to keep it simple. You really only need a few lines of code.

    class Event(object):
        pass

    class Observable(object):
        def __init__(self):
            self.callbacks = []

        def subscribe(self, callback):
            self.callbacks.append(callback)

        def fire(self, **attrs):
            e = Event()
            e.source = self
            for k, v in attrs.items():
                setattr(e, k, v)
            for fn in self.callbacks:
                fn(e)

Your Job class can subclass Observable. When something of interest happens, call self.fire(type="progress", percent=50) or the like.
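
As a rough sketch of that suggestion: the `Job` class, its `run` method, and the `steps` parameter below are illustrative assumptions, not part of the questioner's code. One detail that is easy to miss is that the subclass has to call `Observable.__init__`, otherwise `self.callbacks` is never created.

```python
class Event(object):
    pass


class Observable(object):
    def __init__(self):
        self.callbacks = []

    def subscribe(self, callback):
        self.callbacks.append(callback)

    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.items():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)


class Job(Observable):  # hypothetical stand-in for a real job manager
    def __init__(self, steps):
        Observable.__init__(self)  # creates self.callbacks
        self.steps = steps

    def run(self):
        for i in range(1, self.steps + 1):
            self.fire(type="progress", percent=100.0 * i / self.steps)
        self.fire(type="done")


seen = []  # collects (type, percent) pairs delivered to the observer
job = Job(steps=4)
job.subscribe(lambda e: seen.append((e.type, getattr(e, "percent", None))))
job.run()
```

Each observer receives every event and can filter on `e.type` itself, which keeps the producer side free of per-event bookkeeping.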


6 Comments

Heh - I was going to remove the comment about logging since I've added another answer with sample code. But now that this answer has the check mark, maybe I will leave it alone. :)
What if I want to observe a bound method of an object? I really like this implementation but I can't see how to apply it to less specific use cases. I posted my implementation below. It is more complex, but it covers observing functions and bound methods with a pretty simple interface.
@DanielSank you could always wrap it in a decorator for a method and it should work the same
Python3: use items instead of iteritems, stackoverflow.com/questions/30418481/…
AttributeError: 'Child' object has no attribute 'callbacks'
41

I think people in the other answers overdo it. You can easily achieve events in Python with less than 15 lines of code.

You simply have two classes: Event and Observer. Any class that wants to listen for an event needs to inherit from Observer and register to listen (observe) for a specific event. When an Event is instantiated and fired, all observers listening for that event will run the specified callback functions.

    class Observer():
        _observers = []

        def __init__(self):
            self._observers.append(self)
            self._observables = {}

        def observe(self, event_name, callback):
            self._observables[event_name] = callback


    class Event():
        def __init__(self, name, data, autofire = True):
            self.name = name
            self.data = data
            if autofire:
                self.fire()

        def fire(self):
            for observer in Observer._observers:
                if self.name in observer._observables:
                    observer._observables[self.name](self.data)

Example:

    class Room(Observer):
        def __init__(self):
            print("Room is ready.")
            Observer.__init__(self)  # Observer's init needs to be called

        def someone_arrived(self, who):
            print(who + " has arrived!")

    room = Room()
    room.observe('someone arrived', room.someone_arrived)

    Event('someone arrived', 'Lenard')

Output:

    Room is ready.
    Lenard has arrived!


16

A few more approaches...

Example: the logging module

Maybe all you need is to switch from stderr to Python's logging module, which has a powerful publish/subscribe model.

It's easy to get started producing log records.

    # producer
    import logging

    log = logging.getLogger("myjobs")  # that's all the setup you need

    class MyJob(object):
        def run(self):
            log.info("starting job")
            n = 10
            for i in range(n):
                log.info("%.1f%% done" % (100.0 * i / n))
            log.info("work complete")

On the consumer side there's a bit more work. Unfortunately configuring logger output takes, like, 7 whole lines of code to do. ;)

    # consumer
    import myjobs, sys, logging

    if user_wants_log_output:
        ch = logging.StreamHandler(sys.stderr)
        ch.setLevel(logging.INFO)
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        ch.setFormatter(formatter)
        myjobs.log.addHandler(ch)
        myjobs.log.setLevel(logging.INFO)

    myjobs.MyJob().run()

On the other hand there's an amazing amount of stuff in the logging package. If you ever need to send log data to a rotating set of files, an email address, and the Windows Event Log, you're covered.
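
To make the "multiple output streams" case from the question concrete, here is a minimal sketch that sends warnings to the terminal and everything, including debug output, to a rotating log file. The logger name `myjobs.demo`, the temporary log path, and the size limits are assumptions chosen only for illustration.

```python
import logging
import logging.handlers
import os
import sys
import tempfile

log = logging.getLogger("myjobs.demo")  # hypothetical logger name
log.setLevel(logging.DEBUG)             # let each handler do its own filtering
log.propagate = False                   # keep the demo independent of root config

# Warnings and above go to the terminal.
console = logging.StreamHandler(sys.stderr)
console.setLevel(logging.WARNING)
log.addHandler(console)

# Everything, including debug output, goes to a rotating log file.
log_path = os.path.join(tempfile.mkdtemp(), "jobs.log")  # hypothetical path
logfile = logging.handlers.RotatingFileHandler(
    log_path, maxBytes=1000000, backupCount=3)
logfile.setLevel(logging.DEBUG)
logfile.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
log.addHandler(logfile)

log.debug("connecting to host1")   # written to the file only
log.warning("host2 timed out")     # written to the file and the terminal
```

The producer code does not change at all; which records end up where is decided entirely by the handlers the consumer attaches.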

Example: simplest possible observer

But you don't need to use any library at all. An extremely simple way to support observers is to call a method that does nothing.

    # producer
    class MyJob(object):
        def on_progress(self, pct):
            """Called when progress is made. pct is the percent complete.
            By default this does nothing. The user may override this method
            or even just assign to it."""
            pass

        def run(self):
            n = 10
            for i in range(n):
                self.on_progress(100.0 * i / n)
            self.on_progress(100.0)

    # consumer
    import sys, myjobs
    job = myjobs.MyJob()
    job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
    job.run()

Sometimes instead of writing a lambda, you can just say job.on_progress = progressBar.update, which is nice.

This is about as simple as it gets. One drawback is that it doesn't naturally support multiple listeners subscribing to the same events.
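
If you like the hook style but occasionally need more than one listener, one possible workaround is a small helper that bundles several callables into one. The `fan_out` helper below is a hypothetical name introduced for this sketch, and the two lists merely stand in for a progress bar and a log file.

```python
class MyJob(object):
    def on_progress(self, pct):
        """Hook: does nothing by default."""
        pass

    def run(self):
        n = 4
        for i in range(n + 1):
            self.on_progress(100.0 * i / n)


def fan_out(*listeners):
    """Return one callable that forwards its arguments to every listener."""
    def call_all(*args, **kwargs):
        for listener in listeners:
            listener(*args, **kwargs)
    return call_all


bar_updates = []  # stands in for a progress bar
log_lines = []    # stands in for a log file

job = MyJob()
job.on_progress = fan_out(
    bar_updates.append,
    lambda pct: log_lines.append("%.1f%% done" % pct))
job.run()
```

The listener set is fixed at the moment `fan_out` is called, so this suits simple scripts better than code that subscribes and unsubscribes at run time.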

Example: C#-like events

With a bit of support code, you can get C#-like events in Python. Here's the code:

    # glue code
    class event(object):
        def __init__(self, func):
            self.__doc__ = func.__doc__
            self._key = ' ' + func.__name__

        def __get__(self, obj, cls):
            try:
                return obj.__dict__[self._key]
            except KeyError:
                # First access on this instance: create its event object
                be = obj.__dict__[self._key] = boundevent()
                return be

    class boundevent(object):
        def __init__(self):
            self._fns = []

        def __iadd__(self, fn):
            self._fns.append(fn)
            return self

        def __isub__(self, fn):
            self._fns.remove(fn)
            return self

        def __call__(self, *args, **kwargs):
            # Iterate over a copy so listeners may unsubscribe while firing
            for f in self._fns[:]:
                f(*args, **kwargs)

The producer declares the event using a decorator:

    # producer
    class MyJob(object):
        @event
        def progress(pct):
            """Called when progress is made.

            pct is the percent complete."""

        def run(self):
            n = 10
            for i in range(n+1):
                self.progress(100.0 * i / n)

    #consumer
    import sys, myjobs
    job = myjobs.MyJob()
    job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
    job.run()

This works exactly like the "simple observer" code above, but you can add as many listeners as you like using +=. (Unlike C#, there are no event handler types, you don't have to new EventHandler(foo.bar) when subscribing to an event, and you don't have to check for null before firing the event. Like C#, events do not squelch exceptions.)
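
The remark about exceptions is worth seeing in action. The sketch below repeats the glue code so that it runs on its own, then shows that a failing listener's exception reaches the caller and prevents later listeners from running, and that `-=` removes it again. The listener names `first`, `broken`, and `last` are made up for the demonstration.

```python
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__

    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError:
            be = obj.__dict__[self._key] = boundevent()
            return be


class boundevent(object):
    def __init__(self):
        self._fns = []

    def __iadd__(self, fn):
        self._fns.append(fn)
        return self

    def __isub__(self, fn):
        self._fns.remove(fn)
        return self

    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)


class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made."""


calls = []

def first(pct):
    calls.append(("first", pct))

def broken(pct):
    raise ValueError("listener failed")

def last(pct):
    calls.append(("last", pct))

job = MyJob()
job.progress += first
job.progress += broken
job.progress += last

try:
    job.progress(50.0)
except ValueError:
    calls.append(("error", 50.0))  # the exception reached the caller

job.progress -= broken             # unsubscribe the faulty listener
job.progress(100.0)
```

If one misbehaving observer must not starve the others, the loop in `__call__` would need its own try/except; whether to do that is a design decision rather than a given.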

How to choose

If logging does everything you need, use that. Otherwise do the simplest thing that works for you. The key thing to note is that you don't need to take on a big external dependency.

3 Comments

In the Python code, why is @event a descriptor? Seems like it could just as well be a simple decorator, no?
@miracle2k I don't see how. The point of the descriptor is to support the C# += syntax.
I like the C#-like syntax for events, plus a nice decorator example.
11

How about an implementation where objects aren't kept alive just because they're observing something? Below please find an implementation of the observer pattern with the following features:

  1. Usage is pythonic. To add an observer to a bound method .bar of instance foo, just do foo.bar.addObserver(observer).
  2. Observers are not kept alive by virtue of being observers. In other words, the observer code uses no strong references.
  3. No sub-classing necessary (descriptors ftw).
  4. Can be used with unhashable types.
  5. Can be used as many times as you want in a single class.
  6. (bonus) As of today the code exists in a proper downloadable, installable package on github.
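
To illustrate the second point in isolation, here is a much smaller sketch of the same idea, not the answer's own implementation. It assumes Python 3.4 or later for `weakref.WeakMethod`, and the `Publisher` and `Listener` classes are hypothetical names.

```python
import gc
import weakref


class Publisher(object):
    def __init__(self):
        self._observers = []  # list of weakref.WeakMethod objects

    def add_observer(self, bound_method):
        # WeakMethod does not keep the method's owner alive
        self._observers.append(weakref.WeakMethod(bound_method))

    def notify(self, *args):
        live = []
        for ref in self._observers:
            method = ref()  # None once the owner has been collected
            if method is not None:
                method(*args)
                live.append(ref)
        self._observers = live  # drop dead references


class Listener(object):
    def __init__(self, log):
        self.log = log

    def on_event(self, value):
        self.log.append(value)


log = []
pub = Publisher()
listener = Listener(log)
pub.add_observer(listener.on_event)

pub.notify(1)   # delivered to the listener
del listener    # the publisher alone does not keep it alive
gc.collect()
pub.notify(2)   # nobody left to receive this
```

A plain `weakref.ref` to a bound method would die immediately, because bound method objects are created afresh on each attribute access; that is why `WeakMethod` is used here.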

Here's the code (the github package or PyPI package have the most up to date implementation):

    import weakref
    import functools


    class ObservableMethod(object):
        """
        A proxy for a bound method which can be observed.

        I behave like a bound method, but other bound methods can subscribe to be
        called whenever I am called.
        """

        def __init__(self, obj, func):
            self.func = func
            functools.update_wrapper(self, func)
            self.objectWeakRef = weakref.ref(obj)
            self.callbacks = {}  # observing object ID -> weak ref, methodNames

        def addObserver(self, boundMethod):
            """
            Register a bound method to observe this ObservableMethod.

            The observing method will be called whenever this ObservableMethod is
            called, and with the same arguments and keyword arguments. If a
            boundMethod has already been registered as a callback, trying to add
            it again does nothing. In other words, there is no way to sign up an
            observer to be called back multiple times.
            """
            obj = boundMethod.__self__
            ID = id(obj)
            if ID in self.callbacks:
                s = self.callbacks[ID][1]
            else:
                wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
                s = set()
                self.callbacks[ID] = (wr, s)
            s.add(boundMethod.__name__)

        def discardObserver(self, boundMethod):
            """
            Un-register a bound method.
            """
            obj = boundMethod.__self__
            if id(obj) in self.callbacks:
                self.callbacks[id(obj)][1].discard(boundMethod.__name__)

        def __call__(self, *arg, **kw):
            """
            Invoke the method which I proxy, and all of its callbacks.

            The callbacks are called with the same *args and **kw as the main
            method.
            """
            result = self.func(self.objectWeakRef(), *arg, **kw)
            for ID in self.callbacks:
                wr, methodNames = self.callbacks[ID]
                obj = wr()
                for methodName in methodNames:
                    getattr(obj, methodName)(*arg, **kw)
            return result

        @property
        def __self__(self):
            """
            Get a strong reference to the object owning this ObservableMethod.

            This is needed so that ObservableMethod instances can observe other
            ObservableMethod instances.
            """
            return self.objectWeakRef()


    class ObservableMethodDescriptor(object):

        def __init__(self, func):
            """
            To each instance of the class using this descriptor, I associate an
            ObservableMethod.
            """
            self.instances = {}  # Instance id -> (weak ref, ObservableMethod)
            self._func = func

        def __get__(self, inst, cls):
            if inst is None:
                return self
            ID = id(inst)
            if ID in self.instances:
                wr, om = self.instances[ID]
                if not wr():
                    msg = "Object id %d should have been cleaned up" % (ID,)
                    raise RuntimeError(msg)
            else:
                wr = weakref.ref(inst, Cleanup(ID, self.instances))
                om = ObservableMethod(inst, self._func)
                self.instances[ID] = (wr, om)
            return om

        def __set__(self, inst, val):
            raise RuntimeError("Assigning to ObservableMethod not supported")


    def event(func):
        return ObservableMethodDescriptor(func)


    class Cleanup(object):
        """
        I remove elements from a dict whenever I'm called.

        Use me as a weakref.ref callback to remove an object's id from a dict
        when that object is garbage collected.
        """

        def __init__(self, key, d):
            self.key = key
            self.d = d

        def __call__(self, wr):
            del self.d[self.key]

To use this we just decorate methods we want to make observable with @event. Here's an example

    class Foo(object):
        def __init__(self, name):
            self.name = name

        @event
        def bar(self):
            print("%s called bar" % (self.name,))

        def baz(self):
            print("%s called baz" % (self.name,))

    a = Foo('a')
    b = Foo('b')
    a.bar.addObserver(b.bar)
    a.bar()

1 Comment

I think (in your example) you should call a.bar.addObserver(b.baz) otherwise your method baz would be useless.
5

From wikipedia:

    from collections import defaultdict

    class Observable(defaultdict):

        def __init__(self):
            defaultdict.__init__(self, object)

        def emit(self, *args):
            '''Pass parameters to all observers and update states.'''
            for subscriber in self:
                response = subscriber(*args)
                self[subscriber] = response

        def subscribe(self, subscriber):
            '''Add a new subscriber to self.'''
            self[subscriber]

        def stat(self):
            '''Return a tuple containing the state of each observer.'''
            return tuple(self.values())

The Observable is used like this.

    myObservable = Observable()

    # subscribe some inlined functions.
    # myObservable[lambda x, y: x * y] would also work here.
    myObservable.subscribe(lambda x, y: x * y)
    myObservable.subscribe(lambda x, y: float(x) / y)
    myObservable.subscribe(lambda x, y: x + y)
    myObservable.subscribe(lambda x, y: x - y)

    # emit parameters to each observer
    myObservable.emit(6, 2)

    # get updated values
    myObservable.stat()
    # returns the values 12, 3.0, 8 and 4 as a tuple; on Python 3.7+ they
    # come back in subscription order, (12, 3.0, 8, 4), while older versions
    # may return them in any order, e.g. (8, 3.0, 4, 12)

4 Comments

lambda before_emit=object(): before_emit might be a better default than object for defaultdict.__init__(self, default). Thus all values before emit() is called for the first time are the same and they are distinct from any possible response from subscribers.
@JFS I'm sure they let you improve the wikipedia examples.
I read the Wikipedia example before I posted. It didn't seem applicable to my needs nor does it seem to be describing some common or best practice from the Python community. Also I think the use of defaultdict and self[subscriber] is inscrutable as well as precluded by my choice to be compatible with Python 2.4.x (as commonly shipped with Linux distributions --- my primary intended audience).
@Jim Denis: Please update your question with this comment. This is information that changes the answers.
4

Based on Jason's answer, I implemented the C#-like events example as a fully-fledged python module including documentation and tests. I love fancy pythonic stuff :)

So, if you want some ready-to-use solution, you can just use the code on github.


4

OP asks "Are there any exemplary examples of the GoF Observer implemented in Python?" This is an example in Python 3.7. This Observable class meets the requirement of creating a relationship between one observable and many observers while remaining independent of their structure.

    from functools import partial
    from dataclasses import dataclass, field
    import sys
    from typing import List, Callable


    @dataclass
    class Observable:
        observers: List[Callable] = field(default_factory=list)

        def register(self, observer: Callable):
            self.observers.append(observer)

        def deregister(self, observer: Callable):
            self.observers.remove(observer)

        def notify(self, *args, **kwargs):
            for observer in self.observers:
                observer(*args, **kwargs)


    def usage_demo():
        observable = Observable()

        # Register two anonymous observers using lambda.
        observable.register(
            lambda *args, **kwargs: print(f'Observer 1 called with args={args}, kwargs={kwargs}'))
        observable.register(
            lambda *args, **kwargs: print(f'Observer 2 called with args={args}, kwargs={kwargs}'))

        # Create an observer function, register it, then deregister it.
        def callable_3():
            print('Observer 3 NOT called.')

        observable.register(callable_3)
        observable.deregister(callable_3)

        # Create a general purpose observer function and register four observers.
        def callable_x(*args, **kwargs):
            print(f'{args[0]} observer called with args={args}, kwargs={kwargs}')

        for gui_field in ['Form field 4', 'Form field 5', 'Form field 6', 'Form field 7']:
            observable.register(partial(callable_x, gui_field))

        observable.notify('test')


    if __name__ == '__main__':
        sys.exit(usage_demo())

1 Comment

This is a beautiful implementation of the pattern. Thank you.
2

Example: twisted log observers

To register an observer yourCallable() (a callable that accepts a dictionary) to receive all log events (in addition to any other observers):

twisted.python.log.addObserver(yourCallable) 

Example: complete producer/consumer example

From Twisted-Python mailing list:

    #!/usr/bin/env python
    """Serve as a sample implementation of a twisted producer/consumer
    system, with a simple TCP server which asks the user how many random
    integers they want, and it sends the result set back to the user, one
    result per line."""

    import random

    from zope.interface import implementer
    from twisted.internet import interfaces, reactor
    from twisted.internet.protocol import Factory
    from twisted.protocols.basic import LineReceiver


    # The class decorator form works on both Python 2 and Python 3.
    @implementer(interfaces.IPushProducer)
    class Producer:
        """Send back the requested number of random integers to the client."""

        def __init__(self, proto, cnt):
            self._proto = proto
            self._goal = cnt
            self._produced = 0
            self._paused = False

        def pauseProducing(self):
            """When we've produced data too fast, pauseProducing() will be
            called (reentrantly from within resumeProducing's transport.write
            method, most likely), so set a flag that causes production to pause
            temporarily."""
            self._paused = True
            print('pausing connection from %s' % (self._proto.transport.getPeer()))

        def resumeProducing(self):
            self._paused = False
            while not self._paused and self._produced < self._goal:
                next_int = random.randint(0, 10000)
                # Transports expect bytes, so encode before writing
                self._proto.transport.write(('%d\r\n' % (next_int)).encode('ascii'))
                self._produced += 1

            if self._produced == self._goal:
                self._proto.transport.unregisterProducer()
                self._proto.transport.loseConnection()

        def stopProducing(self):
            pass


    class ServeRandom(LineReceiver):
        """Serve up random data."""

        def connectionMade(self):
            print('connection made from %s' % (self.transport.getPeer()))
            self.transport.write(b'how many random integers do you want?\r\n')

        def lineReceived(self, line):
            cnt = int(line.strip())
            producer = Producer(self, cnt)
            self.transport.registerProducer(producer, True)
            producer.resumeProducing()

        def connectionLost(self, reason):
            print('connection lost from %s' % (self.transport.getPeer()))


    factory = Factory()
    factory.protocol = ServeRandom
    reactor.listenTCP(1234, factory)
    print('listening on 1234...')
    reactor.run()

2 Comments

I like how they document the restrictions on observers (and how they will remove any exception-raising callables from the subscriber list). I'm not sure that "addObservable()" is my preferred name for this, but I'll defer to the community on that if I hear a call for it.
@Jim Dennis: Observer and Observable are different beasts. The former observes the latter one.
1

A functional approach to observer design:

    from functools import wraps


    def add_listener(obj, method_name, listener):
        # Get any existing listeners
        listener_attr = method_name + '_listeners'
        listeners = getattr(obj, listener_attr, None)

        # If this is the first listener, then set up the method wrapper
        if not listeners:
            listeners = [listener]
            setattr(obj, listener_attr, listeners)

            # Get the object's method
            method = getattr(obj, method_name)

            @wraps(method)
            def method_wrapper(*args, **kwargs):
                result = method(*args, **kwargs)
                for l in listeners:
                    l(obj, *args, **kwargs)  # Listener also has object argument
                return result  # Preserve the wrapped method's return value

            # Replace the original method with the wrapper
            setattr(obj, method_name, method_wrapper)
        else:
            # Event is already set up, so just add another listener
            listeners.append(listener)


    def remove_listener(obj, method_name, listener):
        # Get any existing listeners
        listener_attr = method_name + '_listeners'
        listeners = getattr(obj, listener_attr, None)

        if listeners:
            # Remove the listener
            next((listeners.pop(i)
                  for i, l in enumerate(listeners)
                  if l == listener),
                 None)

            # If this was the last listener, then remove the method wrapper
            if not listeners:
                method = getattr(obj, method_name)
                delattr(obj, listener_attr)
                setattr(obj, method_name, method.__wrapped__)

These functions can then be used to add a listener to any method of an object. For example:

    class MyClass(object):
        def __init__(self, prop):
            self.prop = prop

        def some_method(self, num, string):
            print('method:', num, string)

    def listener_method(obj, num, string):
        print('listener:', num, string, obj.prop)

    my = MyClass('my_prop')

    add_listener(my, 'some_method', listener_method)
    my.some_method(42, 'with listener')

    remove_listener(my, 'some_method', listener_method)
    my.some_method(42, 'without listener')

And the output is:

    method: 42 with listener
    listener: 42 with listener my_prop
    method: 42 without listener
