Im trying to use python's default logging module in a multiprocessing scenario. I've read:
and other multiple posts about multiprocessing, logging, python classes and such. After all this reading I've came to this piece of code I cannot make it properly run which uses python's logutils QueueHandler:
import sys import logging from logging import INFO from multiprocessing import Process, Queue as mpQueue import threading import time from logutils.queue import QueueListener, QueueHandler class Worker(Process): def __init__(self, n, q): super(Worker, self).__init__() self.n = n self.queue = q self.qh = QueueHandler(self.queue) self.root = logging.getLogger() self.root.addHandler(self.qh) self.root.setLevel(logging.DEBUG) self.logger = logging.getLogger("W%i"%self.n) def run(self): self.logger.info("Worker %i Starting"%self.n) for i in xrange(10): self.logger.log(INFO, "testing %i"%i) self.logger.log(INFO, "Completed %i"%self.n) def listener_process(queue): while True: try: record = queue.get() if record is None: break logger = logging.getLogger(record.name) logger.handle(record) except (KeyboardInterrupt, SystemExit): raise except: import sys, traceback print >> sys.stderr, 'Whoops! Problem:' traceback.print_exc(file=sys.stderr) if __name__ == "__main__": mpq = mpQueue(-1) root = logging.getLogger() h = logging.StreamHandler() f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s') h.setFormatter(f) root.addHandler(h) l = logging.getLogger("Test") l.setLevel(logging.DEBUG) listener = Process(target=listener_process, args=(mpq,)) listener.start() workers=[] for i in xrange(1): worker = Worker(i, mpq) worker.daemon = True worker.start() workers.append(worker) for worker in workers: worker.join() mpq.put_nowait(None) listener.join() for i in xrange(10): l.info("testing %i"%i) print "Finish" If the code is executed, the output somehow repeats lines like:
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2 2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8 2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9 2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0 2013-12-02 16:44:46,005 MainProcess Test INFO testing 0 2013-12-02 16:44:46,005 MainProcess Test INFO testing 1 2013-12-02 16:44:46,005 MainProcess Test INFO testing 2 2013-12-02 16:44:46,005 MainProcess Test INFO testing 3 2013-12-02 16:44:46,005 MainProcess Test INFO testing 4 2013-12-02 16:44:46,005 MainProcess Test INFO testing 5 2013-12-02 16:44:46,006 MainProcess Test INFO testing 6 2013-12-02 16:44:46,006 MainProcess Test INFO testing 7 2013-12-02 16:44:46,006 MainProcess Test INFO testing 8 2013-12-02 16:44:46,006 MainProcess Test INFO testing 9 Finish In other questios it's suggested that the handler gets added more than once, but, as you can see, I only add the streamhanlder once in the main method. I've already tested embedding the main method into a class with the same result.
EDIT: as @max suggested (or what I believe he said) I've modified the code of the worker class as:
class Worker(Process): root = logging.getLogger() qh = None def __init__(self, n, q): super(Worker, self).__init__() self.n = n self.queue = q if not self.qh: Worker.qh = QueueHandler(self.queue) Worker.root.addHandler(self.qh) Worker.root.setLevel(logging.DEBUG) self.logger = logging.getLogger("W%i"%self.n) print self.root.handlers def run(self): self.logger.info("Worker %i Starting"%self.n) for i in xrange(10): self.logger.log(INFO, "testing %i"%i) self.logger.log(INFO, "Completed %i"%self.n) With the same results, Now the queue handler is not added again and again but still there are duplicate log entries, even with just one worker.
EDIT2: I've changed the code a little bit. I changed the listener process and now use a QueueListener (that's what I intended in the begining anyway), moved the main code to a class.
import sys import logging from logging import INFO from multiprocessing import Process, Queue as mpQueue import threading import time from logutils.queue import QueueListener, QueueHandler root = logging.getLogger() added_qh = False class Worker(Process): def __init__(self, logconf, n, qh): super(Worker, self).__init__() self.n = n self.logconf = logconf # global root global added_qh if not added_qh: added_qh = True root.addHandler(qh) root.setLevel(logging.DEBUG) self.logger = logging.getLogger("W%i"%self.n) #print root.handlers def run(self): self.logger.info("Worker %i Starting"%self.n) for i in xrange(10): self.logger.log(INFO, "testing %i"%i) self.logger.log(INFO, "Completed %i"%self.n) class Main(object): def __init__(self): pass def start(self): mpq = mpQueue(-1) qh = QueueHandler(mpq) h = logging.StreamHandler() ql = QueueListener(mpq, h) #h.setFormatter(f) root.addHandler(qh) l = logging.getLogger("Test") l.setLevel(logging.DEBUG) workers=[] for i in xrange(15): worker = Worker(logconf, i, qh) worker.daemon = True worker.start() workers.append(worker) for worker in workers: print "joining worker: {}".format(worker) worker.join() mpq.put_nowait(None) ql.start() # listener.join() for i in xrange(10): l.info("testing %i"%i) if __name__ == "__main__": x = Main() x.start() time.sleep(10) print "Finish" Now it mostly works until I reach a certain number of workers (~15) when for some reason the Main class get blocked in de join and the rest of the workers do nothing.