
I'm trying to run a lengthy command within Python that outputs to both stdout and stderr. I'd like to poll the subprocess and write the output to separate files.

I tried the following, based on this answer: Non-blocking read on a subprocess.PIPE in python.

    import os
    import shlex
    import subprocess
    from Queue import Queue, Empty
    from threading import Thread

    def send_cmd(cmd, shell=False):
        """ Send cmd to the shell """
        if not isinstance(cmd, list):
            cmd = shlex.split(cmd)
        params = {'args': cmd,
                  'stdout': subprocess.PIPE,
                  'stderr': subprocess.PIPE,
                  'shell': shell}
        proc = subprocess.Popen(**params)
        return proc

    def monitor_command(process, stdout_log=os.devnull, stderr_log=os.devnull):
        """ Monitor the process that is running, and log it if desired """
        def enqueue_output(out, queue):
            for line in iter(out.readline, b''):
                queue.put(line)

        def setup_process(log_name, proc):
            FID = open(log_name, 'w')
            queue = Queue()
            thread = Thread(target=enqueue_output, args=(proc, queue))
            thread.daemon = True  # thread dies with the program
            thread.start()
            return (queue, FID)

        def check_queues(queue_list, errors):
            for queue, FID in queue_list:
                try:
                    line = queue.get_nowait()
                    if 'error' in line.lower() or 'failed' in line.lower():
                        errors.append(line)
                except Empty:
                    pass
                else:
                    FID.write(line)

        errors = []
        queue_list = []
        for log, proc in [(stdout_log, process.stdout), (stderr_log, process.stderr)]:
            queue_list.append(setup_process(log, proc))

        while process.poll() is None:
            check_queues(queue_list, errors)

        while not queue_list[0][0].empty() or not queue_list[1][0].empty():
            check_queues(queue_list, errors)

        for queue, FID in queue_list:
            FID.close()

        return errors

    process = send_cmd('long_program.exe')
    errors = monitor_command(process, stdout_log='stdout.log', stderr_log='stderr.log')

But the output file for stdout is empty, and the output file for stderr is only a few lines long, whereas both should be quite large.

What am I missing?

2 Answers


I did that once; here is some old code I wrote:

    import Queue
    import threading

    class Process_Communicator():
        '''This is a simple class that collects and aggregates the output
        from a subprocess so that you can more reliably use the class
        without having to block on subprocess.communicate.'''

        def __init__(self, subp):
            self.p = subp
            self.unblocked_out = ""
            self.unblocked_err = ""
            self.running = True

            self.qo = Queue.Queue()
            self.to = threading.Thread(name="out_read", target=self.enqueue_output)
            self.to.daemon = True  # thread dies with the program
            self.to.start()

            self.qe = Queue.Queue()
            self.te = threading.Thread(name="err_read", target=self.enqueue_err)
            self.te.daemon = True  # thread dies with the program
            self.te.start()

            self.stdin_queue = Queue.Queue()
            self.ti = threading.Thread(name="in_write", target=self.enqueue_in)
            self.ti.daemon = True  # thread dies with the program
            self.ti.start()

            self.aggregator = threading.Thread(name="aggregate", target=self.aggregate)
            self.aggregator.daemon = True  # thread dies with the program
            self.aggregator.start()

        def join(self):
            self.te.join()
            self.to.join()
            self.running = False
            self.aggregator.join()
            self.ti.join()

        def enqueue_in(self):
            # Feed queued input lines to the subprocess's stdin.
            while self.running and self.p.stdin is not None:
                while not self.stdin_queue.empty():
                    s = self.stdin_queue.get()
                    self.p.stdin.write(str(s) + '\n\r')

        def enqueue_output(self):
            if not self.p.stdout or self.p.stdout.closed:
                return
            out = self.p.stdout
            for line in iter(out.readline, b''):
                self.qo.put(line)

        def enqueue_err(self):
            if not self.p.stderr or self.p.stderr.closed:
                return
            err = self.p.stderr
            for line in iter(err.readline, b''):
                self.qe.put(line)

        def aggregate(self):
            while self.running:
                self.update()
            self.update()  # one last drain after the process stops

        def update(self):
            # Drain both queues into string buffers; get_nowait() raises
            # Queue.Empty once a queue is exhausted.
            try:
                while True:
                    self.unblocked_err += self.qe.get_nowait()
            except Queue.Empty:
                pass
            try:
                while True:
                    self.unblocked_out += self.qo.get_nowait()
            except Queue.Empty:
                pass

        def get_stdout(self, clear=True):
            ret = self.unblocked_out
            if clear:
                self.unblocked_out = ""
            return ret

        def has_stdout(self):
            ret = self.get_stdout(False)
            return ret if ret != '' else None

        def get_stderr(self, clear=True):
            ret = self.unblocked_err
            if clear:
                self.unblocked_err = ""
            return ret

        def has_stderr(self):
            ret = self.get_stderr(False)
            return ret if ret != '' else None

You may not need the whole example, but feel free to cut, copy, and paste what you need. The main point is to show how I did the threading.




The code looks more complicated than the task requires. I don't see why you need to call process.poll() or queue.get_nowait() here. To deliver a subprocess's stdout/stderr to several sinks, you could start with teed_call(), which accepts arbitrary file-like objects: you could pass log files, plus special file-like objects that accumulate errors in their .write() methods.
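A minimal sketch of that idea (this is not the original teed_call recipe; the function body, the ErrorCollector class, and the parameter names here are illustrative, written for Python 3 where pipe output is bytes):

```python
import subprocess
from threading import Thread

class ErrorCollector:
    """A minimal file-like sink: its write() just remembers error-looking lines."""
    def __init__(self):
        self.errors = []
    def write(self, line):
        if b'error' in line.lower() or b'failed' in line.lower():
            self.errors.append(line)

def teed_call(cmd, stdout_sinks=(), stderr_sinks=()):
    """Copy the subprocess's stdout/stderr, line by line, to several sinks."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    def tee(pipe, sinks):
        # Read until EOF, fanning each line out to every sink.
        for line in iter(pipe.readline, b''):
            for sink in sinks:
                sink.write(line)
        pipe.close()

    threads = [Thread(target=tee, args=(proc.stdout, stdout_sinks)),
               Thread(target=tee, args=(proc.stderr, stderr_sinks))]
    for t in threads:
        t.start()
    proc.wait()
    for t in threads:
        t.join()  # make sure both streams are fully copied before returning
    return proc.returncode
```

A log file opened in binary mode and an ErrorCollector can both be passed as sinks for the same stream, since each only needs a .write() method.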

To fix your code with minimal changes, you should call .join() on the reader threads: even if process.poll() is not None, i.e., the subprocess has exited, there could still be pending output. Joining the reader threads ensures that all output has been read.
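The fix can be sketched as follows (the run_logged name is illustrative, and this version uses Python 3's queue module rather than the question's Python 2 Queue): wait for the process, join the reader threads, and only then drain the queues, at which point .empty() is reliable because nothing is writing anymore.

```python
import subprocess
from queue import Queue
from threading import Thread

def run_logged(cmd, stdout_path, stderr_path):
    """Run cmd, logging stdout/stderr to separate files; return error-looking lines."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    queues, threads = [], []

    def reader(pipe, q):
        # Read until EOF; the thread exits once the pipe closes.
        for line in iter(pipe.readline, b''):
            q.put(line)
        pipe.close()

    for pipe in (proc.stdout, proc.stderr):
        q = Queue()
        t = Thread(target=reader, args=(pipe, q))
        t.start()
        queues.append(q)
        threads.append(t)

    proc.wait()
    for t in threads:
        t.join()  # readers have seen EOF: every line is now in its queue

    errors = []
    for q, path in zip(queues, (stdout_path, stderr_path)):
        with open(path, 'wb') as f:
            while not q.empty():  # safe now: no thread is still filling the queue
                line = q.get_nowait()
                if b'error' in line.lower() or b'failed' in line.lower():
                    errors.append(line)
                f.write(line)
    return errors
```

Because the threads are joined before the drain loop, there is no race between "queue looks empty" and "reader hasn't enqueued the last lines yet", which is exactly what truncated your log files.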

