I haven't used Python Watchdog, but from a generic real-time systems perspective:
- processing XML with `_validate_xml` can be slow and can make you miss events;
- an event is similar to an interrupt: handling it should be as fast as possible.
The more you do while handling an event, the less "real-time" your system becomes. What you can do is offload the XML validity check to another process and send it the paths you have seen moving through a `Queue` (each message would be `event.dest_path`). Your event handling then becomes as simple as putting messages on a queue, and the files can be processed in batches by the consumer of the queue.
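A minimal sketch of the batching side, using only the standard library's `queue.Queue` (the `drain_batch` name and its parameters are hypothetical, not part of watchdog). The consumer blocks for the first path, then takes whatever else is already queued, so a burst of move events is handled as one batch; a real consumer would call it in a loop and validate each batch.

```python
import queue


def drain_batch(q, max_items=50, first_timeout=1.0):
    """Collect up to max_items paths from q.

    Blocks up to first_timeout seconds for the first path, then takes only what
    is already queued, so one burst of move events becomes one batch.
    Returns an empty list if nothing arrived in time.
    """
    batch = []
    try:
        batch.append(q.get(timeout=first_timeout))
    except queue.Empty:
        return batch
    while len(batch) < max_items:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break
    return batch


# Producer side: what an on_moved handler would do with event.dest_path.
paths = queue.Queue()
for p in ["a.xml", "b.xml", "c.xml"]:
    paths.put(p)

print(drain_batch(paths, max_items=2))  # ['a.xml', 'b.xml']
print(drain_batch(paths, max_items=2))  # ['c.xml']
```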
In short:
- instantiate a `Queue`;
- `fork()` a process;
- in the `on_moved` handler, put messages on the queue;
- in the forked process, pop messages from the queue and call `_validate_xml`;
- optionally, leverage `multiprocessing.Pool` to validate XML files in parallel.
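The steps above can be sketched as follows. This is a minimal sketch under several assumptions: `validate_xml` is a hypothetical stand-in for your `_validate_xml` (it only checks that the file parses with `xml.etree.ElementTree`), `QueueingHandler`, `start_validator` and `run_demo` are hypothetical names, the handler is duck-typed rather than subclassing watchdog's `FileSystemEventHandler` so the snippet runs without watchdog installed, and the explicit `"fork"` start method is POSIX-only. In real code you would subclass `FileSystemEventHandler` and pass the handler to `observer.schedule(...)`.

```python
import multiprocessing as mp
import os
import tempfile
import xml.etree.ElementTree as ET
from types import SimpleNamespace


def validate_xml(path):
    """Hypothetical stand-in for _validate_xml: True if the file parses as XML."""
    try:
        ET.parse(path)
        return True
    except (ET.ParseError, OSError):
        return False


def consumer(paths, results):
    """Runs in the forked child: validate each path until the None sentinel arrives."""
    for path in iter(paths.get, None):
        results.put((path, validate_xml(path)))


class QueueingHandler:
    """Duck-typed like watchdog's FileSystemEventHandler; on_moved only enqueues."""

    def __init__(self, paths):
        self.paths = paths

    def on_moved(self, event):
        self.paths.put(event.dest_path)  # cheap: no XML parsing in the event thread


def start_validator():
    """Create both queues and fork the consumer process (POSIX only)."""
    ctx = mp.get_context("fork")
    paths, results = ctx.Queue(), ctx.Queue()
    worker = ctx.Process(target=consumer, args=(paths, results))
    worker.start()
    return QueueingHandler(paths), paths, results, worker


def run_demo():
    """Push two fake move events through the pipeline; return sorted (name, valid) pairs."""
    files = {"good.xml": "<root><a/></root>", "bad.xml": "<root>"}
    with tempfile.TemporaryDirectory() as tmp:
        for name, text in files.items():
            with open(os.path.join(tmp, name), "w") as fh:
                fh.write(text)
        handler, paths, results, worker = start_validator()
        for name in files:
            handler.on_moved(SimpleNamespace(dest_path=os.path.join(tmp, name)))
        paths.put(None)  # sentinel: tells the consumer to exit
        out = [results.get(timeout=10) for _ in files]  # read before join()
        worker.join()
    return sorted((os.path.basename(path), ok) for path, ok in out)


if __name__ == "__main__":
    print(run_demo())
```

`run_demo` should return `[('bad.xml', False), ('good.xml', True)]`. Results are read before `worker.join()` because a process that has put items on a `multiprocessing.Queue` waits until its buffered items are flushed to the pipe before exiting, which can block if nobody reads them. For parallel validation, the consumer could collect a batch of paths and hand it to `multiprocessing.Pool.map`; that is left out to keep the sketch short.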
Good luck.
EDIT: I tested this on my system, and most of the comments above don't seem to apply: watchdog's code appears to handle threading just fine.
```python
#!/usr/bin/env python
import logging
import time

from watchdog.events import FileMovedEvent, FileSystemEventHandler
from watchdog.observers import Observer


def counter_gen():
    # Infinite counter used to number the events each handler sees.
    count = 0
    while True:
        count += 1
        yield count


class XmlValidatorHandler(FileSystemEventHandler):
    sleep_time = 0.1  # simulated validation cost, in seconds
    COUNTER = counter_gen()

    def on_moved(self, event):
        if isinstance(event, FileMovedEvent):
            print('%s - event %d; validate: %s' % (
                type(self).__name__, next(self.COUNTER), event.dest_path))
            time.sleep(self.sleep_time)  # stand-in for _validate_xml


class SlowXmlValidatorHandler(XmlValidatorHandler):
    sleep_time = 2  # much slower simulated validation
    COUNTER = counter_gen()


def get_observer(handler):
    observer = Observer(timeout=0.5)
    observer.event_queue.maxsize = 10  # deliberately small event queue
    observer.schedule(handler, path='.', recursive=True)
    return observer


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    observer1 = get_observer(XmlValidatorHandler())
    observer2 = get_observer(SlowXmlValidatorHandler())
    observer1.start()
    observer2.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer1.stop()
        observer2.stop()
    observer1.join()
    observer2.join()
```
I wasn't able to reproduce your issue. Some pointers:
- check the queue's `maxsize`: if it already holds items and they don't get handled in a timely fashion, my guess is that the timeout kicks in and the event is lost. You may want to resize the queue in that case.
- check the `timeout`: if it is configured, you may want to tune that parameter.
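To see the failure mode the first pointer guesses at, here is how a plain standard-library `queue.Queue` behaves when it is bounded and the producer uses a put timeout. This only illustrates `queue.Queue` semantics; whether watchdog's emitters put events with a timeout is an assumption here, not something confirmed for watchdog's internals, and `offer` is a hypothetical helper name.

```python
import queue


def offer(q, item, timeout):
    """Try to enqueue item; return False (item dropped) if q stays full for timeout seconds."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


events = queue.Queue(maxsize=2)  # tiny bound so the effect is visible; nothing consumes it
accepted = [offer(events, "file%d.xml" % i, timeout=0.1) for i in range(4)]
print(accepted)  # [True, True, False, False]
```

With no consumer draining the queue, only the first two items fit and the rest are dropped after waiting `timeout` seconds each; a larger `maxsize` or a faster consumer avoids the loss.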
Maybe a more complete snippet would help us help you.