
I have a python subprocess that I'm trying to read output and error streams from. Currently I have it working, but I'm only able to read from stderr after I've finished reading from stdout. Here's what it looks like:

process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout_iterator = iter(process.stdout.readline, b"")
stderr_iterator = iter(process.stderr.readline, b"")

for line in stdout_iterator:
    # Do stuff with line
    print line

for line in stderr_iterator:
    # Do stuff with line
    print line

As you can see, the stderr for loop can't start until the stdout loop completes. How can I modify this to be able to read from both in the correct order the lines come in?

To clarify: I still need to be able to tell whether a line came from stdout or stderr because they will be treated differently in my code.


9 Answers


The code in your question may deadlock if the child process produces enough output on stderr (~100KB on my Linux machine).
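A quick way to see that deadlock (my own illustration, not from the question): run a child that writes well past the pipe buffer size to stderr before producing any stdout. The child blocks writing into the full stderr pipe while the parent blocks reading stdout, and neither can make progress.

import subprocess

# Deliberately hangs: the child fills the stderr pipe buffer and blocks,
# while the parent is still blocked waiting for a stdout line.
command = ["python3", "-c",
           "import sys; sys.stderr.write('x' * 1000000); print('done')"]
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
for line in iter(process.stdout.readline, b""):
    print(line)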

There is a communicate() method that allows reading from both stdout and stderr separately:

from subprocess import Popen, PIPE

process = Popen(command, stdout=PIPE, stderr=PIPE)
output, err = process.communicate()
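If you still want to handle the result line by line after the child has exited, you can split the captured blobs yourself (a small sketch of my own; output and err are bytes objects here):

for line in output.splitlines():
    print("stdout:", line)
for line in err.splitlines():
    print("stderr:", line)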

If you need to read the streams while the child process is still running, then the portable solution is to use threads (not tested):

from subprocess import Popen, PIPE
from threading import Thread
from Queue import Queue  # Python 2

def reader(pipe, queue):
    try:
        with pipe:
            for line in iter(pipe.readline, b''):
                queue.put((pipe, line))
    finally:
        queue.put(None)

process = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=1)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
for _ in range(2):
    for source, line in iter(q.get, None):
        print "%s: %s" % (source, line),
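On Python 3 the same pattern works with the queue module and the print() function; here is a rough sketch of my own (also untested, and it assumes command is defined just like above):

from subprocess import Popen, PIPE
from threading import Thread
from queue import Queue  # Python 3

def reader(pipe, q):
    try:
        with pipe:
            for line in iter(pipe.readline, b''):
                q.put((pipe, line))
    finally:
        q.put(None)

process = Popen(command, stdout=PIPE, stderr=PIPE)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
for _ in range(2):
    for source, line in iter(q.get, None):
        name = "stdout" if source is process.stdout else "stderr"
        print("%s: %s" % (name, line.decode()), end="")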



9 Comments

Unfortunately this answer doesn't preserve the order that the lines come in from stdout and stderr. It is very close to what I need though! It's just important for me to know when an stderr line is piped relative to an stdout line.
@LukeSapan: I don't see any way to preserve the order and to capture stdout/stderr separately. You can get one or the other easily. On Unix you could try a select loop that can make the effect less apparent. It is starting to look like XY problem: edit your question and provide some context on what you are trying to do.
@LukeSapan As both FDs are independent of each other, a message coming through one may be delayed, so there is no concept of "before" and "after" in this case...
@LukeSapan why preserve the order? Just add timestamps and sort at the end.
Is there any way to interrupt the process while queue.get blocks?

Here's a solution based on selectors that preserves order and streams variable-length chunks (even single characters).

The trick is to use read1() instead of read().
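The difference is that, on a buffered pipe, read(n) keeps blocking until it has collected n bytes (or hits EOF), whereas read1(n) performs at most one underlying read and returns as soon as any data is available, so the loop below wakes up per write rather than per full buffer. Roughly (the size here is arbitrary, just for illustration):

chunk = p.stdout.read(4096)    # may block until 4096 bytes have arrived or EOF
chunk = p.stdout.read1(4096)   # returns as soon as any bytes are available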

import selectors
import subprocess
import sys

p = subprocess.Popen(
    ["python", "random_out.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)

while True:
    for key, _ in sel.select():
        data = key.fileobj.read1().decode()
        if not data:
            exit()
        if key.fileobj is p.stdout:
            print(data, end="")
        else:
            print(data, end="", file=sys.stderr)

If you want a test program, use this.

import sys
from time import sleep

for i in range(10):
    print(f" x{i} ", file=sys.stderr, end="")
    sleep(0.1)
    print(f" y{i} ", end="")
    sleep(0.1)

9 Comments

Looks like this is the culprit - stackoverflow.com/questions/375427/…. selectors don't work on windows for pipes :(
As an obvious and trivial improvement, get rid of the shell=True
Note: (1) it doesn't work on Windows; (2) it won't preserve the order (it just makes it less likely that you notice the order is wrong). See the related comments under my answer.
Is there a reproducible way to get the wrong order? Maybe some sort of fuzzing?
@shouldsee, Does an arbitrary size of 1024 work with python 3.5?

This works for Python 3 (3.6+):

import selectors
import subprocess
import sys

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     universal_newlines=True)

# Read both stdout and stderr simultaneously
sel = selectors.DefaultSelector()
sel.register(p.stdout, selectors.EVENT_READ)
sel.register(p.stderr, selectors.EVENT_READ)

ok = True
while ok:
    for key, val1 in sel.select():
        line = key.fileobj.readline()
        if not line:
            ok = False
            break
        if key.fileobj is p.stdout:
            print(f"STDOUT: {line}", end="")
        else:
            print(f"STDERR: {line}", end="", file=sys.stderr)
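Here cmd is whatever argument list you would normally pass to Popen; for a quick self-contained test it could be something like this (my own example, not part of the original answer):

cmd = ["bash", "-c", "for i in 1 2 3; do echo out $i; echo err $i >&2; done"]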



The order in which a process writes data to different pipes is lost once the data has been written.

There is no way you can tell if stdout has been written before stderr.

You can try to read data simultaneously from multiple file descriptors in a non-blocking way as soon as data is available, but this would only minimize the probability that the order is incorrect.

The following program demonstrates this:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import select
import subprocess

testapps = {
    'slow': '''
import os
import time
os.write(1, 'aaa')
time.sleep(0.01)
os.write(2, 'bbb')
time.sleep(0.01)
os.write(1, 'ccc')
''',
    'fast': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbb')
os.write(1, 'ccc')
''',
    'fast2': '''
import os
os.write(1, 'aaa')
os.write(2, 'bbbbbbbbbbbbbbb')
os.write(1, 'ccc')
'''
}

def readfds(fds, maxread):
    while True:
        fdsin, _, _ = select.select(fds, [], [])
        for fd in fdsin:
            s = os.read(fd, maxread)
            if len(s) == 0:
                fds.remove(fd)
                continue
            yield fd, s
        if fds == []:
            break

def readfromapp(app, rounds=10, maxread=1024):
    f = open('testapp.py', 'w')
    f.write(testapps[app])
    f.close()

    results = {}
    for i in range(0, rounds):
        p = subprocess.Popen(['python', 'testapp.py'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        data = ''
        for (fd, s) in readfds([p.stdout.fileno(), p.stderr.fileno()], maxread):
            data = data + s
        results[data] = results[data] + 1 if data in results else 1

    print 'running %i rounds %s with maxread=%i' % (rounds, app, maxread)
    results = sorted(results.items(), key=lambda (k, v): k, reverse=False)
    for data, count in results:
        print '%03i x %s' % (count, data)

print
print "=> if output is produced slowly this should work as wished"
print "   and should return: aaabbbccc"
readfromapp('slow', rounds=100, maxread=1024)

print
print "=> now mostly aaacccbbb is returned, not as it should be"
readfromapp('fast', rounds=100, maxread=1024)

print
print "=> you could try to read data one by one, and return"
print "   e.g. a whole line only when LF is read"
print "   (b's should be finished before c's)"
readfromapp('fast', rounds=100, maxread=1)

print
print "=> but even this won't work ..."
readfromapp('fast2', rounds=100, maxread=1)

and outputs something like this:

=> if output is produced slowly this should work as wished
   and should return: aaabbbccc
running 100 rounds slow with maxread=1024
100 x aaabbbccc

=> now mostly aaacccbbb is returned, not as it should be
running 100 rounds fast with maxread=1024
006 x aaabbbccc
094 x aaacccbbb

=> you could try to read data one by one, and return
   e.g. a whole line only when LF is read
   (b's should be finished before c's)
running 100 rounds fast with maxread=1
003 x aaabbbccc
003 x aababcbcc
094 x abababccc

=> but even this won't work ...
running 100 rounds fast2 with maxread=1
003 x aaabbbbbbbbbbbbbbbccc
001 x aaacbcbcbbbbbbbbbbbbb
008 x aababcbcbcbbbbbbbbbbb
088 x abababcbcbcbbbbbbbbbb

5 Comments

unrelated: use if not s: instead of if len(s) == 0: here. Use while fds: instead of while True: ... if fds == []: break. Use results = collections.defaultdict(int); ...; results[data]+=1 instead of results = {}; ...; results[data] = results[data] + 1 if data in results else 1
Or use results = collections.Counter(); ...; results[data]+=1; ...; for data, count in results.most_common():
you could use data = b''.join([s for _, s in readfds(...)])
you should close the pipes to avoid relying on garbage collection to free the file descriptors in the parent, and call p.wait() to reap the child process explicitly.

From https://docs.python.org/3/library/subprocess.html#using-the-subprocess-module:

If you wish to capture and combine both streams into one, use stdout=PIPE and stderr=STDOUT instead of capture_output.

So the easiest solution would be:

process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout_iterator = iter(process.stdout.readline, b"")

for line in stdout_iterator:
    # Do stuff with line
    print line
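On Python 3, process.stdout yields bytes and print is a function, so the same idea is usually written with text=True; this is my own adaptation, assuming command is defined:

process = subprocess.Popen(command, stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT, text=True)
for line in process.stdout:
    # Do stuff with line (stdout and stderr lines arrive interleaved)
    print(line, end="")
process.wait()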

1 Comment

This doesn't read the streams separately, but merges stderr into stdout.

I know this question is very old, but this answer may help others who stumble upon this page while researching a solution to a similar situation, so I'm posting it anyway.

I've built a simple Python snippet that will merge any number of pipes into a single one. Of course, as stated above, the order cannot be guaranteed, but this is as close as I think you can get in Python.

It spawns a thread for each of the pipes, reads them line by line and puts them into a Queue (which is FIFO). The main thread loops through the queue, yielding each line.

import threading, queue

def merge_pipes(**named_pipes):
    r'''
    Merges multiple pipes from subprocess.Popen (maybe other sources as well).
    The keyword argument keys will be used in the output to identify the source
    of the line.

    Example:
    p = subprocess.Popen(['some', 'call'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    outputs = {'out': log.info, 'err': log.warn}
    for name, line in merge_pipes(out=p.stdout, err=p.stderr):
        outputs[name](line)

    This will output stdout to the info logger, and stderr to the warning logger
    '''

    # Constants. Could also be placed outside of the method. I just put them here
    # so the method is fully self-contained
    PIPE_OPENED = 1
    PIPE_OUTPUT = 2
    PIPE_CLOSED = 3

    # Create a queue where the pipes will be read into
    output = queue.Queue()

    # This method is the run body for the threads that are instantiated below.
    # This could be easily rewritten to be outside of the merge_pipes method,
    # but to make it fully self-contained I put it here
    def pipe_reader(name, pipe):
        r"""
        reads a single pipe into the queue
        """
        output.put((PIPE_OPENED, name,))
        try:
            for line in iter(pipe.readline, ''):
                output.put((PIPE_OUTPUT, name, line.rstrip(),))
        finally:
            output.put((PIPE_CLOSED, name,))

    # Start a reader for each pipe
    for name, pipe in named_pipes.items():
        t = threading.Thread(target=pipe_reader, args=(name, pipe,))
        t.daemon = True
        t.start()

    # Use a counter to determine how many pipes are left open.
    # If all are closed, we can return
    pipe_count = 0

    # Read the queue in order, blocking if there's no data
    for data in iter(output.get, ''):
        code = data[0]
        if code == PIPE_OPENED:
            pipe_count += 1
        elif code == PIPE_CLOSED:
            pipe_count -= 1
        elif code == PIPE_OUTPUT:
            yield data[1:]
        if pipe_count == 0:
            return



This works for me (on Windows): https://github.com/waszil/subpiper

from subpiper import subpiper

def my_stdout_callback(line: str):
    print(f'STDOUT: {line}')

def my_stderr_callback(line: str):
    print(f'STDERR: {line}')

my_additional_path_list = [r'c:\important_location']

retcode = subpiper(cmd='echo magic',
                   stdout_callback=my_stdout_callback,
                   stderr_callback=my_stderr_callback,
                   add_path_list=my_additional_path_list)



The asynchronous nature of the I/O streams prevents preserving the exact chronology without sorting by timestamps.

For cases where no timestamps are provided, this solution:

  • Prints from stdout and stderr of subprocess to stdout and stderr of current process in real time, flushing properly.
  • Stops the loop only when all the streams are closed.
  • Uses selectors recommended for "high-level and efficient I/O multiplexing" on Unix-like OS.
  • Passes typing and other linters.
#!/usr/bin/env python3
import selectors
import subprocess
import sys
from typing import IO, Dict, cast

sub = subprocess.Popen(
    ["bash", "-c", "for i in $(seq 5); do echo out $i && echo err $i >&2; done"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)
assert sub.stdout and sub.stderr

streams: Dict[IO[str], IO[str]] = {
    sub.stdout: sys.stdout,
    sub.stderr: sys.stderr,
}

with selectors.DefaultSelector() as selector:
    for sub_stream, sys_stream in streams.items():
        selector.register(sub_stream, selectors.EVENT_READ, sys_stream)

    while streams:
        for selected, _ in selector.select():
            sub_stream = cast(IO[str], selected.fileobj)
            if sub_stream not in streams:
                continue
            line = sub_stream.readline()
            if not line:
                streams.pop(sub_stream)
                continue
            sys_stream = selected.data
            sys_stream.write(line)
            sys_stream.flush()
            if sys_stream is sys.stdout:
                pass  # Process stdout line.
            else:
                pass  # Process stderr line.

exit_code = sub.wait()
print(exit_code)

Output sample:

out 1
err 1
err 2
out 2
err 3
out 3
err 4
out 4
err 5
out 5
0



Sorry for the slightly un-Pythonic, opinionated proposal...

Case 1: get the stdout string of a well-behaved command

from jjcli import qx

biggest_file = qx("ls -S | head -1")   #=> biggest_file = "f.html"

Case 2: lines

for f in qx("ls *jpg").splitlines():   ## or qxlines("ls *.jpg")
    ...

Case 3: get the stdout and stderr strings of a not-so-well-behaved command

from jjcli import qxerr

output, errors = qxerr("ls -d * XXXX")
#=> output = "f1.txt\nf2.txt"
#   errors = "ls: cannot access 'XXXX' ..."

