If you want to handle stdout and stderr separately (as opposed to redirecting stderr to stdout; see my simplified answer), you can spawn two threads that process them concurrently, live as the output is produced.
Adapted from my more detailed answer:
```python
import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen


def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,  # for a version that redirects stderr to stdout (without multithreading), see https://stackoverflow.com/a/76626021/5511061
    check=True,
    text=True,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    ref https://stackoverflow.com/a/76634163/5511061
    """
    with (
        Popen(args, text=text, stdout=PIPE, stderr=PIPE, **kwargs) as process,
        ThreadPoolExecutor(2) as pool,  # two threads to handle the (live) streams separately
    ):
        exhaust = partial(deque, maxlen=0)  # collections recipe: exhaust an iterable at C-speed
        exhaust_async = partial(pool.submit, exhaust)  # exhaust non-blocking in a background thread
        # exhaust both streams (stdout and stderr iterables) asynchronously, each in its own thread
        exhaust_async(stdout_handler(line[:-1]) for line in process.stdout)
        exhaust_async(stderr_handler(line[:-1]) for line in process.stderr)
    retcode = process.poll()  # the with block above blocks until both iterables are exhausted (process finished)
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)
```
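As with `subprocess.run`, `check=True` (the default here) makes a non-zero exit status raise `CalledProcessError`. A minimal sketch of that behavior; the `bash -c` command is only illustrative and assumes `bash` is on your `PATH`:

```python
from subprocess import CalledProcessError

try:
    # the command writes a line to stderr, then exits with status 3
    stream_command(["bash", "-c", "echo oops >&2; exit 3"])
except CalledProcessError as exc:
    print(exc.returncode)  # 3
```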
Call with simple print handlers:
stream_command(["echo", "test"], stdout_handler=print, stderr_handler=print) # test
Or with custom handlers:
```python
outs, errs = [], []

def stdout_handler(line):
    outs.append(line)
    print(line)

def stderr_handler(line):
    errs.append(line)
    print(line)

stream_command(
    ["echo", "test"],
    stdout_handler=stdout_handler,
    stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']
```
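To see the output arriving live, try a command that prints over time; each line should reach its handler as soon as it is produced, rather than after the process exits. Another sketch with an illustrative `bash` loop (timings are approximate):

```python
import time

def timestamped(line):
    # prefix each line with the wall-clock time it was received
    print(time.strftime("%H:%M:%S"), line)

stream_command(
    ["bash", "-c", "for i in 1 2 3; do echo line $i; sleep 1; done"],
    stdout_handler=timestamped,
    stderr_handler=timestamped,
)
# "line 1", "line 2", "line 3" appear roughly one second apart
```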