
Is it possible to modify the code below so that the output from stdout and stderr is:

  • printed on the terminal (in real time),
  • and finally stored in the outs and errs variables?

The code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import subprocess


def run_cmd(command, cwd=None):
    p = subprocess.Popen(command, cwd=cwd, shell=False,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    outs, errs = p.communicate()
    rc = p.returncode
    outs = outs.decode('utf-8')
    errs = errs.decode('utf-8')

    return (rc, (outs, errs))

Thanks to @unutbu, and special thanks to @j-f-sebastian; here is the final function:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


def read_output(pipe, funcs):
    for line in iter(pipe.readline, b''):
        for func in funcs:
            func(line.decode('utf-8'))
    pipe.close()


def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)


def run_cmd(command, cwd=None, passthrough=True):
    outs, errs = None, None

    proc = Popen(
        command,
        cwd=cwd,
        shell=False,
        close_fds=True,
        stdout=PIPE,
        stderr=PIPE,
        bufsize=1
    )

    if passthrough:
        outs, errs = [], []
        q = Queue()

        stdout_thread = Thread(
            target=read_output, args=(proc.stdout, [q.put, outs.append])
        )
        stderr_thread = Thread(
            target=read_output, args=(proc.stderr, [q.put, errs.append])
        )
        writer_thread = Thread(
            target=write_output, args=(q.get,)
        )

        for t in (stdout_thread, stderr_thread, writer_thread):
            t.daemon = True
            t.start()

        proc.wait()

        for t in (stdout_thread, stderr_thread):
            t.join()

        q.put(None)

        outs = ' '.join(outs)
        errs = ' '.join(errs)
    else:
        outs, errs = proc.communicate()
        # decode only the communicate() result; the passthrough path already
        # decoded line by line in read_output()
        outs = '' if outs is None else outs.decode('utf-8')
        errs = '' if errs is None else errs.decode('utf-8')

    rc = proc.returncode

    return (rc, (outs, errs))
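A minimal usage sketch of the function above; the command here is just a placeholder, any argument list accepted by Popen works:

# Hypothetical example command; output is echoed live by the writer thread
# and also returned in outs / errs.
rc, (outs, errs) = run_cmd(['ping', '-c', '3', 'localhost'])
print('return code:', rc)
print('captured stdout:', outs)
print('captured stderr:', errs)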
5 Comments
  • The code example does store outs and errs and returns them... To print to the terminal, simply: if outs: print outs and if errs: print errs. Commented Jun 19, 2013 at 11:43
  • @bnlucas Thanks, but as I stated in the first point: the output should be printed to the terminal in REAL TIME, as if there were no piping. Commented Jun 19, 2013 at 12:00
  • If you need Python 3 code, add the python-3.x tag (I see python3 in the shebang). Your code as written will leave the reading threads hanging: in Python 3, '' is a Unicode literal, but pipe.readline() returns bytes by default ('' != b"" on Python 3). If you fix that, the writer thread still won't end, because nothing puts "" into the queue. Commented Jun 19, 2013 at 16:02
  • related: Displaying subprocess output to stdout and redirecting it Commented Sep 21, 2014 at 15:50
  • Does this answer your question? Wrap subprocess' stdout/stderr Commented Feb 20, 2024 at 21:32

4 Answers


To capture and display stdout and stderr from a child process at the same time, line by line, using a single thread, you could use asynchronous I/O:

#!/usr/bin/env python3
import asyncio
import os
import sys
from asyncio.subprocess import PIPE


@asyncio.coroutine
def read_stream_and_display(stream, display):
    """Read from stream line by line until EOF, display, and capture the lines."""
    output = []
    while True:
        line = yield from stream.readline()
        if not line:
            break
        output.append(line)
        display(line)  # assume it doesn't block
    return b''.join(output)


@asyncio.coroutine
def read_and_display(*cmd):
    """Capture cmd's stdout, stderr while displaying them as they arrive (line by line)."""
    # start process
    process = yield from asyncio.create_subprocess_exec(*cmd,
                                                        stdout=PIPE, stderr=PIPE)

    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = yield from asyncio.gather(
            read_stream_and_display(process.stdout, sys.stdout.buffer.write),
            read_stream_and_display(process.stderr, sys.stderr.buffer.write))
    except Exception:
        process.kill()
        raise
    finally:
        # wait for the process to exit
        rc = yield from process.wait()
    return rc, stdout, stderr


# run the event loop
if os.name == 'nt':
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
# Note: cmd must already be defined here, e.g. cmd = ['some_command', 'arg'] (placeholder)
rc, *output = loop.run_until_complete(read_and_display(*cmd))
loop.close()
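On Python 3.8+ the @asyncio.coroutine decorator is deprecated (and removed in 3.11), so the same approach is usually written with async def/await and asyncio.run(); a minimal sketch of that translation, with the command as a placeholder:

import asyncio
import sys
from asyncio.subprocess import PIPE


async def read_stream_and_display(stream, display):
    # Read line by line until EOF, forwarding each line to `display` while capturing it.
    output = []
    while True:
        line = await stream.readline()
        if not line:
            break
        output.append(line)
        display(line)
    return b''.join(output)


async def read_and_display(*cmd):
    process = await asyncio.create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
    try:
        stdout, stderr = await asyncio.gather(
            read_stream_and_display(process.stdout, sys.stdout.buffer.write),
            read_stream_and_display(process.stderr, sys.stderr.buffer.write))
    except Exception:
        process.kill()
        raise
    finally:
        rc = await process.wait()
    return rc, stdout, stderr


# asyncio.run() creates and closes the event loop; since Python 3.8 the
# Proactor loop is already the default on Windows, so no special-casing is needed.
rc, stdout, stderr = asyncio.run(read_and_display('ls', '-la'))  # placeholder command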

7 Comments

This code looks good, could you add a version for Python 2.7?
@kinORnirvana: asyncio works only on Python 3.3+. There is trollius, a Python 2 clone, but it is deprecated.
Note that once the loop is closed, calling get_event_loop will give you the same closed loop, which cannot be re-used as is ("event loop is closed" message). I ended up doing asyncio.set_event_loop(asyncio.new_event_loop()) to get a fresh event loop.
I was running this code in a Jupyter notebook. I was getting an AttributeError because sys.stdout.buffer no longer existed. This helped clear it up: docs.python.org/3/library/sys.html#sys.stderr When in a Jupyter notebook I used sys.stdout.write in lieu of sys.stdout.buffer.write and the output appeared in the notebook logging output.
I found that this displays all the output from stderr, then all the output from stdout.

You could spawn threads to read the stdout and stderr pipes, write to a common queue, and append to lists. Then use a third thread to print items from the queue.

import time
import Queue
import sys
import threading
import subprocess

PIPE = subprocess.PIPE


def read_output(pipe, funcs):
    for line in iter(pipe.readline, ''):
        for func in funcs:
            func(line)
            # time.sleep(1)
    pipe.close()


def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)


process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)

q = Queue.Queue()
out, err = [], []

tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append]))
twrite = threading.Thread(target=write_output, args=(q.get,))

for t in (tout, terr, twrite):
    t.daemon = True
    t.start()

process.wait()
for t in (tout, terr):
    t.join()
q.put(None)

print(out)
print(err)

The reason for using a third thread, instead of letting the first two threads print directly to the terminal, is to prevent both print statements from running concurrently, which can sometimes result in garbled text.


The above calls random_print.py, which prints to stdout and stderr at random:

import sys
import time
import random

for i in range(50):
    f = random.choice([sys.stdout, sys.stderr])
    f.write(str(i) + '\n')
    f.flush()
    time.sleep(0.1)

This solution borrows code and ideas from J. F. Sebastian, here.


Here is an alternative solution for Unix-like systems, using select.select:

import collections
import errno
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess

PIPE = subprocess.PIPE


def make_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''add the O_NONBLOCK flag to a file descriptor'''
    fcntl.fcntl(
        fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)


def read_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    # time.sleep(1)
    try:
        return fd.read()
    except IOError, e:
        if e.errno != errno.EAGAIN:
            raise e
        else:
            return ''


def write_output(fds, outmap):
    for fd in fds:
        line = read_async(fd)
        sys.stdout.write(line)
        outmap[fd.fileno()].append(line)


process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)

make_async(process.stdout)
make_async(process.stderr)

outmap = collections.defaultdict(list)
while True:
    rlist, wlist, xlist = select.select(
        [process.stdout, process.stderr], [], [])
    write_output(rlist, outmap)
    if process.poll() is not None:
        write_output([process.stdout, process.stderr], outmap)
        break

fileno = {'stdout': process.stdout.fileno(),
          'stderr': process.stderr.fileno()}
print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])

This solution uses code and ideas from Adam Rosenfield's post, here.
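On Python 3, the same Unix-style idea can also be written with the higher-level selectors module instead of raw select.select and fcntl; a rough sketch, assuming the child writes whole, flushed lines (as random_print.py does) and using ./random_print.py as a placeholder command:

import selectors
import subprocess
import sys

# Placeholder command; substitute whatever you actually want to run.
process = subprocess.Popen(['./random_print.py'],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)

sel = selectors.DefaultSelector()
sel.register(process.stdout, selectors.EVENT_READ, data='stdout')
sel.register(process.stderr, selectors.EVENT_READ, data='stderr')

captured = {'stdout': [], 'stderr': []}
while sel.get_map():                      # loop until both pipes reach EOF
    for key, _ in sel.select():
        line = key.fileobj.readline()     # bytes; empty at EOF
        if not line:
            sel.unregister(key.fileobj)   # stop watching a closed pipe
            continue
        sys.stdout.buffer.write(line)     # echo to the terminal
        sys.stdout.flush()
        captured[key.data].append(line)   # capture per stream

process.wait()
print(captured['stdout'])
print(captured['stderr'])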

3 Comments

you could add q.put(None) after process.wait() and exit the third thread on None, e.g., for line in iter(get, None). Also, pipe.close() is missing.
@J.F.Sebastian: Thanks for the corrections. Suppose read_output for some reason does not keep pace with the output being written to pipe. (I try to simulate that with a time.sleep(1) above). When the time.sleep(1) is uncommented, out and err fail to collect all the output before process.wait() completes. Do you know a way to guarantee that out and err get all the output?
t{err,out}.join() before put(None). btw, to get lines in "real time", bufsize=1 might help (ignoring the block-buffering issue)

Here is another version of your run_cmd function, using asyncio on Python 3.11:

import asyncio
import io
import sys
from subprocess import SubprocessError

# Maximum number of bytes to read at once from the 'asyncio.subprocess.PIPE'
_MAX_BUFFER_CHUNK_SIZE = 1024


async def run_cmd_async(command, cwd=None, check=False):
    stdout_buffer = io.BytesIO()
    stderr_buffer = io.BytesIO()

    process = await asyncio.subprocess.create_subprocess_exec(
        *command,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    async def write_stdout() -> None:
        assert process.stdout is not None
        while chunk := await process.stdout.read(_MAX_BUFFER_CHUNK_SIZE):
            stdout_buffer.write(chunk)
            print(chunk.decode(), end="", flush=True)

    async def write_stderr() -> None:
        assert process.stderr is not None
        while chunk := await process.stderr.read(_MAX_BUFFER_CHUNK_SIZE):
            stderr_buffer.write(chunk)
            print(chunk.decode(), file=sys.stderr, end="", flush=True)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(write_stdout())
        task_group.create_task(write_stderr())
        exit_code = await process.wait()

    if check and exit_code != 0:
        raise SubprocessError(
            f"Command '{command}' returned non-zero exit status {exit_code}."
        )

    return exit_code, (stdout_buffer.getvalue().decode(),
                       stderr_buffer.getvalue().decode())


def run_cmd(command, cwd=None, check=False):
    return asyncio.run(run_cmd_async(command, cwd=cwd, check=check))
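A usage sketch for the run_cmd defined above; the command is just a placeholder that writes one line to each stream:

# Output is streamed live via print(), then the captured copies are returned.
rc, (outs, errs) = run_cmd(
    ['python3', '-c', 'print("hello"); import sys; print("oops", file=sys.stderr)'])
print('rc =', rc)
print('outs =', repr(outs))
print('errs =', repr(errs))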



To stream live output (stdout and stderr) of a subprocess to the terminal, as well as to variables, you can spawn two threads to handle the streams concurrently.

Adapted from my more detailed answer:

import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen


def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time."""
    with Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
        with ThreadPoolExecutor(2) as pool:  # two threads to handle the streams
            exhaust = partial(pool.submit, partial(deque, maxlen=0))
            exhaust(stdout_handler(line[:-1]) for line in process.stdout)
            exhaust(stderr_handler(line[:-1]) for line in process.stderr)
    retcode = process.poll()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)

Call with custom handlers:

outs, errs = [], []

def stdout_handler(line):
    outs.append(line)
    print(line)

def stderr_handler(line):
    errs.append(line)
    print(line)

stream_command(
    ["echo", "test"],
    stdout_handler=stdout_handler,
    stderr_handler=stderr_handler,
)
# test
print(outs)  # ['test']
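With the default handlers, each line is routed through the logging module instead; a small sketch assuming the stream_command above and a bash placeholder command:

import logging

# stdout lines go to logging.info and stderr lines to logging.error (the defaults).
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
stream_command(["bash", "-c", "echo out; echo err >&2"])
# INFO out
# ERROR err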

