10

I'm trying to use Python's default logging module in a multiprocessing scenario. I've read:

  1. Python MultiProcess, Logging, Various Classes
  2. Logging using multiprocessing

and several other posts about multiprocessing, logging, Python classes and the like. After all this reading I've arrived at the following code, which uses the QueueHandler from Python's logutils package, but I cannot get it to run properly:

    import sys
    import logging
    from logging import INFO
    from multiprocessing import Process, Queue as mpQueue
    import threading
    import time

    from logutils.queue import QueueListener, QueueHandler


    class Worker(Process):

        def __init__(self, n, q):
            super(Worker, self).__init__()
            self.n = n
            self.queue = q

            self.qh = QueueHandler(self.queue)
            self.root = logging.getLogger()
            self.root.addHandler(self.qh)
            self.root.setLevel(logging.DEBUG)
            self.logger = logging.getLogger("W%i"%self.n)

        def run(self):
            self.logger.info("Worker %i Starting"%self.n)

            for i in xrange(10):
                self.logger.log(INFO, "testing %i"%i)

            self.logger.log(INFO, "Completed %i"%self.n)


    def listener_process(queue):
        while True:
            try:
                record = queue.get()
                if record is None:
                    break
                logger = logging.getLogger(record.name)
                logger.handle(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                import sys, traceback
                print >> sys.stderr, 'Whoops! Problem:'
                traceback.print_exc(file=sys.stderr)


    if __name__ == "__main__":

        mpq = mpQueue(-1)

        root = logging.getLogger()
        h = logging.StreamHandler()
        f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
        h.setFormatter(f)
        root.addHandler(h)

        l = logging.getLogger("Test")
        l.setLevel(logging.DEBUG)

        listener = Process(target=listener_process, args=(mpq,))
        listener.start()

        workers=[]
        for i in xrange(1):
            worker = Worker(i, mpq)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            worker.join()

        mpq.put_nowait(None)
        listener.join()

        for i in xrange(10):
            l.info("testing %i"%i)

        print "Finish"

If the code is executed, the output somehow repeats lines like:

    2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
    2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
    2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
    2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
    2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
    2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
    2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
    2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
    2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
    2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
    2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
    2013-12-02 16:44:46,005 MainProcess Test INFO testing 0
    2013-12-02 16:44:46,005 MainProcess Test INFO testing 1
    2013-12-02 16:44:46,005 MainProcess Test INFO testing 2
    2013-12-02 16:44:46,005 MainProcess Test INFO testing 3
    2013-12-02 16:44:46,005 MainProcess Test INFO testing 4
    2013-12-02 16:44:46,005 MainProcess Test INFO testing 5
    2013-12-02 16:44:46,006 MainProcess Test INFO testing 6
    2013-12-02 16:44:46,006 MainProcess Test INFO testing 7
    2013-12-02 16:44:46,006 MainProcess Test INFO testing 8
    2013-12-02 16:44:46,006 MainProcess Test INFO testing 9
    Finish

Other questions suggest that the handler gets added more than once but, as you can see, I only add the StreamHandler once, in the main block. I've already tried moving the main block into a class, with the same result.

EDIT: as @max suggested (or what I believe he said) I've modified the code of the worker class as:

    class Worker(Process):

        root = logging.getLogger()
        qh = None

        def __init__(self, n, q):
            super(Worker, self).__init__()
            self.n = n
            self.queue = q

            if not self.qh:
                Worker.qh = QueueHandler(self.queue)
                Worker.root.addHandler(self.qh)
                Worker.root.setLevel(logging.DEBUG)

            self.logger = logging.getLogger("W%i"%self.n)

            print self.root.handlers

        def run(self):
            self.logger.info("Worker %i Starting"%self.n)

            for i in xrange(10):
                self.logger.log(INFO, "testing %i"%i)

            self.logger.log(INFO, "Completed %i"%self.n)

The results are the same. The queue handler is no longer added again and again, but there are still duplicate log entries, even with just one worker.

EDIT2: I've changed the code a little. I replaced the listener process with a QueueListener (that's what I intended in the beginning anyway) and moved the main code into a class.

    import sys
    import logging
    from logging import INFO
    from multiprocessing import Process, Queue as mpQueue
    import threading
    import time

    from logutils.queue import QueueListener, QueueHandler

    root = logging.getLogger()
    added_qh = False


    class Worker(Process):

        def __init__(self, logconf, n, qh):
            super(Worker, self).__init__()
            self.n = n
            self.logconf = logconf

            # global root
            global added_qh

            if not added_qh:
                added_qh = True
                root.addHandler(qh)
                root.setLevel(logging.DEBUG)

            self.logger = logging.getLogger("W%i"%self.n)

            #print root.handlers

        def run(self):
            self.logger.info("Worker %i Starting"%self.n)

            for i in xrange(10):
                self.logger.log(INFO, "testing %i"%i)

            self.logger.log(INFO, "Completed %i"%self.n)


    class Main(object):

        def __init__(self):
            pass

        def start(self):
            mpq = mpQueue(-1)
            qh = QueueHandler(mpq)

            h = logging.StreamHandler()

            ql = QueueListener(mpq, h)

            #h.setFormatter(f)
            root.addHandler(qh)

            l = logging.getLogger("Test")
            l.setLevel(logging.DEBUG)

            workers=[]

            for i in xrange(15):
                worker = Worker(logconf, i, qh)
                worker.daemon = True
                worker.start()
                workers.append(worker)

            for worker in workers:
                print "joining worker: {}".format(worker)
                worker.join()

            mpq.put_nowait(None)

            ql.start()

            # listener.join()

            for i in xrange(10):
                l.info("testing %i"%i)


    if __name__ == "__main__":

        x = Main()
        x.start()

        time.sleep(10)

        print "Finish"

Now it mostly works until I reach a certain number of workers (~15), at which point the Main class blocks in the join for some reason and the rest of the workers do nothing.

2
  • Are you on windows or *nix? Commented Dec 2, 2013 at 15:58
  • I'm running this code in Linux with python 2.7. Commented Dec 2, 2013 at 16:00

3 Answers 3

8

I'm coming late, so you probably don't need the answer anymore. The problem is that you already have a handler set in your main process, and in your worker you add another one. This means that in your worker process two handlers are in fact handling each record: the one pushing the log record to the queue, and the one writing to the stream.

You can fix this simply by adding an extra line self.root.handlers = [] to your code. From your original code, the __init__ method of the worker would look like this:

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        self.qh = QueueHandler(self.queue)
        self.root = logging.getLogger()
        self.root.handlers = []
        self.root.addHandler(self.qh)
        self.root.setLevel(logging.DEBUG)
        self.logger = logging.getLogger("W%i"%self.n)

The output now looks like this:

    python workers.py
    2016-05-12 10:07:02,971 Worker-2 W0 INFO Worker 0 Starting
    2016-05-12 10:07:02,972 Worker-2 W0 INFO testing 0
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 1
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 2
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 3
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 4
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 5
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 6
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 7
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 8
    2016-05-12 10:07:02,973 Worker-2 W0 INFO testing 9
    2016-05-12 10:07:02,973 Worker-2 W0 INFO Completed 0
    Finish
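
For readers who want to see the two-handler effect in isolation, here is a minimal single-process sketch. It is not the code from the question: it assumes Python 3, where `QueueHandler` and `QueueListener` live in the standard library's `logging.handlers` (logutils provides the same classes for older versions), and it uses a plain `queue.Queue` in place of the multiprocessing queue. `ListHandler` and `emit_once` are hypothetical names made up for the illustration.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Hypothetical helper: stores messages in a list instead of printing them."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def emit(self, record):
        self.sink.append(record.getMessage())


def emit_once(clear_inherited):
    """Log one message and return every copy that reached the final output."""
    seen = []
    q = queue.Queue()            # stands in for the multiprocessing queue
    output = ListHandler(seen)   # stands in for the StreamHandler

    # State the worker would inherit: the output handler is already on root.
    root = logging.getLogger()
    saved_handlers, saved_level = root.handlers[:], root.level
    root.handlers = [output]
    root.setLevel(logging.DEBUG)

    # Listener side: drains the queue into the output handler.
    listener = logging.handlers.QueueListener(q, output)
    listener.start()

    # Worker side: optionally drop inherited handlers, then add the queue handler.
    if clear_inherited:
        root.handlers = []
    root.addHandler(logging.handlers.QueueHandler(q))

    logging.getLogger("W0").info("testing 0")

    listener.stop()  # returns after the queued record has been handled
    root.handlers = saved_handlers
    root.setLevel(saved_level)
    return seen


if __name__ == "__main__":
    print(emit_once(clear_inherited=False))  # two copies of the message
    print(emit_once(clear_inherited=True))   # one copy
```

One design note, offered as a suggestion rather than as part of the answer above: `__init__` of a `Process` subclass runs in the parent, while `run()` executes in the child. Resetting the handlers at the top of `run()` would therefore leave the main process's own StreamHandler in place.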

1 Comment

Late but nevertheless very pertinent :) Thanks :)
3

I figured out a pretty simple workaround using monkeypatching. It probably isn't robust and I am not an expert with the logging module, but it seemed like the best solution for my situation. After trying some code changes (to enable passing in an existing logger from multiprocessing.get_logger()), I didn't like how much the code was changing and came up with a quick (well, it would have been, had I done this in the first place), easy-to-read hack/workaround:

(working example, complete with multiprocessing pool)

    import logging
    import multiprocessing

    class FakeLogger(object):
        def __init__(self, q):
            self.q = q
        def info(self, item):
            self.q.put('INFO - {}'.format(item))
        def debug(self, item):
            self.q.put('DEBUG - {}'.format(item))
        def critical(self, item):
            self.q.put('CRITICAL - {}'.format(item))
        def warning(self, item):
            self.q.put('WARNING - {}'.format(item))

    def some_other_func_that_gets_logger_and_logs(num):
        # notice the name get's discarded
        # of course you can easily add this to your FakeLogger class
        local_logger = logging.getLogger('local')
        local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
        local_logger.debug('hmm, something may need debugging here')
        return num*2

    def func_to_parallelize(data_chunk):
        # unpack our args
        the_num, logger_q = data_chunk
        # since we're now in a new process, let's monkeypatch the logging module
        logging.getLogger = lambda name=None: FakeLogger(logger_q)
        # now do the actual work that happens to log stuff too
        new_num = some_other_func_that_gets_logger_and_logs(the_num)
        return (the_num, new_num)

    if __name__ == '__main__':
        multiprocessing.freeze_support()
        m = multiprocessing.Manager()
        logger_q = m.Queue()
        # we have to pass our data to be parallel-processed
        # we also need to pass the Queue object so we can retrieve the logs
        parallelable_data = [(1, logger_q), (2, logger_q)]
        # set up a pool of processes so we can take advantage of multiple CPU cores
        pool_size = multiprocessing.cpu_count() * 2
        pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
        worker_output = pool.map(func_to_parallelize, parallelable_data)
        pool.close() # no more tasks
        pool.join()  # wrap up current tasks
        # get the contents of our FakeLogger object
        while not logger_q.empty():
            print logger_q.get()
        print 'worker output contained: {}'.format(worker_output)

Of course this is probably not going to cover the whole gamut of logging usage, but I think the concept is simple enough here to get working quickly and relatively painlessly. And it should be easy to modify (for example the lambda func discards a possible prefix that can be passed into getLogger).
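
As an illustration of the modification mentioned above (keeping the name passed to `getLogger` instead of discarding it), here is a small sketch. It is an assumption about how one might extend the idea, written for Python 3 and using a plain `queue.Queue` so it runs without a pool; `NamedFakeLogger` and `patch_get_logger` are hypothetical names, not part of the logging module.

```python
import logging
import queue


class NamedFakeLogger(object):
    """Hypothetical variant of FakeLogger that remembers the logger name."""

    def __init__(self, q, name=None):
        self.q = q
        self.name = name or 'root'

    def _put(self, level, item):
        # Prefix each queued line with the level and the logger name.
        self.q.put('{} - {} - {}'.format(level, self.name, item))

    def info(self, item):
        self._put('INFO', item)

    def debug(self, item):
        self._put('DEBUG', item)

    def critical(self, item):
        self._put('CRITICAL', item)

    def warning(self, item):
        self._put('WARNING', item)


def patch_get_logger(q):
    """Replace logging.getLogger and return the original so it can be restored."""
    original = logging.getLogger
    logging.getLogger = lambda name=None: NamedFakeLogger(q, name)
    return original


if __name__ == '__main__':
    q = queue.Queue()
    original = patch_get_logger(q)
    try:
        logging.getLogger('local').info('hello')
    finally:
        logging.getLogger = original  # undo the monkeypatch
    print(q.get())
```

Returning the original function makes it possible to undo the patch, which matters mostly when the patched code runs in a long-lived process rather than a short-lived pool worker.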


1

All your Workers share the same root logger object (obtained in Worker.__init__ -- the getLogger call always returns the same logger). However, every time you create a Worker, you add a handler (QueueHandler) to that logger.

So if you create 10 Workers, you will have 10 (identical) handlers on your root logger, which means output gets repeated 10 times.

Instead, you should make the logger a module attribute rather than an instance attribute, and configure it once at the module level -- not at the class level.

(actually, loggers should be configured once at the program level)
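
The multiplication effect described in this answer can be reproduced in a single process. The sketch below is only an illustration under assumptions: it targets Python 3, uses a dedicated logger name (`demo.workers`) so the root logger is left alone, and the names `ListHandler` and `copies_logged` are hypothetical.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical helper: collects messages in a list instead of printing."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def emit(self, record):
        self.sink.append(record.getMessage())


def copies_logged(n_workers, configure_per_worker):
    """Return how many copies of one log call reach the output."""
    seen = []
    logger = logging.getLogger("demo.workers")
    logger.propagate = False   # keep the demo isolated from the root logger
    logger.setLevel(logging.DEBUG)
    logger.handlers = []       # start from a clean state

    if not configure_per_worker:
        # Configure once, up front, as the answer recommends.
        logger.addHandler(ListHandler(seen))

    for _ in range(n_workers):
        if configure_per_worker:
            # A new handler per worker instance, as a per-instance __init__ would do.
            logger.addHandler(ListHandler(seen))

    logger.info("hello")
    logger.handlers = []
    return len(seen)


if __name__ == "__main__":
    print(copies_logged(10, configure_per_worker=True))
    print(copies_logged(10, configure_per_worker=False))
```

Note that `addHandler` skips a handler object that is already attached, so the multiplication only appears when each worker creates a new handler instance, which is what constructing a `QueueHandler` inside `__init__` does.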

6 Comments

Can you give a simple snippet? I don't really understand when you say "module attribute"
Almost there... Now your root logger is a class attribute. Again, move it to the module (toplevel) scope -- it has nothing to do in a class. Also, your listener_process function is probably guilty of some repeating as well. I don't know what your program is supposed to do, but I get the feeling that you're trying to shoehorn too much stuff into it, especially as far as logging is concerned... What happens when you replace every logging call with a print? Can you understand why?
I'm not trying to build anything useful, just an example of a multiprocessing scenario with independent workers logging to the same handler. E.g.: a master class spawns 10 workers which log to a file.
Ok, this is driving me completely crazy. I've moved root = logging.getLogger() to the top; I've tried using a QueueListener instead of the listener process; all in all with the same results. Any suggestion will be gladly accepted.
Me too... any Python geek to the rescue?
