
I need a way to either read all currently available characters in a stream created by Popen, or to find out how many characters are left in the buffer.

Background: I want to remote-control an interactive application in Python. So far I have used Popen to create a new subprocess:

process = subprocess.Popen(["python"], shell=True,
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE, cwd=workingDir)

(I'm not really starting python, but the actual interactive interface is similar.) At the moment I read one byte at a time until I detect that the process has reached the command prompt:

output = ""
while output[-6:] != "SCIP> ":
    output += process.stdout.read(1)
    sys.stdout.write(output[-1])
return output

Then I start a lengthy computation via process.stdin.write("command\n"). My problem is that I cannot check whether the computation has finished, because I cannot check whether the last characters in the stream are the prompt. read() or read(n) blocks my thread until it reaches EOF, which it never will, because the interactive program will not end until it is told to. Looking for the prompt the way the above loop does won't work either, because the prompt will only appear after the computation.

The ideal solution would let me read all available characters from the stream and immediately return an empty string if there is nothing to read.
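For what it's worth, on Unix this exact behaviour can be approximated with the standard select module alone: a zero timeout turns select() into a pure poll, so a helper can drain whatever is currently buffered and return an empty result otherwise. A minimal sketch (Unix only, since select() does not support pipes on Windows; read_available is a name chosen here for illustration):

```python
import os
import select

def read_available(stream):
    """Return whatever is currently in the pipe, or b'' if nothing is there."""
    chunks = []
    # A zero timeout makes select() a pure poll: it reports readiness
    # without waiting.
    while select.select([stream], [], [], 0)[0]:
        chunk = os.read(stream.fileno(), 4096)
        if not chunk:          # EOF: the other end closed the pipe
            break
        chunks.append(chunk)
    return b"".join(chunks)
```

A call returns immediately in both cases: with the buffered bytes if the child has written something, with b'' if it has not.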

  • Might pexpect (noah.org/wiki/Pexpect) do what you need? Commented Jun 19, 2010 at 17:45
  • I've looked into that one already, and yes it might. But if possible I would like a solution that works without external modules. Commented Jun 19, 2010 at 17:57
  • I'm not sure you can, given python.org/dev/peps/pep-3145 Commented Jun 19, 2010 at 18:18

6 Answers


Incremental parsing of Popen's stdout is not really a problem. Just feed the pipe into a thread and have it scrub through the output, looking for delimiters. Depending on your preference, it can pipe the result into another pipe / file-like object, or put the parsed "chunks" on a "stack" in asynchronous mode. Here is an example of asynchronous "chunking" of stdout based on a custom delimiter:

import cStringIO
import uuid
import threading
import os

class InputStreamChunker(threading.Thread):
    '''
    Threaded object / code that mediates reading output from a stream,
    detects "separation markers" in the stream and spits out chunks
    of the original stream, split where ends of chunk are encountered.

    Results are made available as a list of filled file-like objects
    (your choice). Results are accessible either "asynchronously"
    (you can poll at will for results in a non-blocking way) or
    "synchronously" by exposing a "subscribe and wait" system based
    on threading.Event flags.

    Usage:
    - instantiate this object
    - give our input pipe as "stdout" to the other subprocess and start it:
        Popen(..., stdout = th.input, ...)
    - (optional) subscribe to the data_available event
    - pull resulting file-like objects off .data
      (if you are "messing" with .data from outside of the thread,
      be courteous and wrap the thread-unsafe manipulations between:
          obj.data_unoccupied.clear()
          ... mess with .data
          obj.data_unoccupied.set()
      The thread will not touch obj.data for the duration and
      will block reading.)

    License: Public domain
    Absolutely no warranty provided
    '''
    def __init__(self, delimiter = None, outputObjConstructor = None):
        '''
        delimiter - the string that will be considered a delimiter for the stream
        outputObjConstructor - instances of these will be attached to
            the self.data array (instantiator_pointer, args, kw)
        '''
        super(InputStreamChunker, self).__init__()

        self._data_available = threading.Event()
        self._data_available.clear()  # parent will .wait() on this for results.
        self._data = []
        self._data_unoccupied = threading.Event()
        self._data_unoccupied.set()  # parent will set this to true when
                                     # self.results is being changed from outside
        self._r, self._w = os.pipe()  # takes all inputs. self.input = public pipe in.
        self._stop = False
        if not delimiter:
            delimiter = str(uuid.uuid1())
        self._stream_delimiter = [l for l in delimiter]
        self._stream_roll_back_len = ( len(delimiter)-1 ) * -1
        if not outputObjConstructor:
            self._obj = (cStringIO.StringIO, (), {})
        else:
            self._obj = outputObjConstructor

    @property
    def data_available(self):
        '''returns a threading.Event instance pointer that is
        True (and non-blocking to .wait() ) when we attached a
        new IO obj to the .data array.
        Code consuming the array may decide to set it back to False
        if it's done with all chunks and wants to be blocked on .wait()'''
        return self._data_available

    @property
    def data_unoccupied(self):
        '''returns a threading.Event instance pointer that is normally
        True (and non-blocking to .wait() )
        Set it to False with .clear() before you start non-thread-safe
        manipulations (changing) of the .data array. Set it back to True
        with .set() when you are done'''
        return self._data_unoccupied

    @property
    def data(self):
        '''returns a list of input chunks (file-like objects) captured
        so far. This is a "stack" of sorts. Code consuming the chunks
        would be responsible for disposing of the file-like objects.
        By default, the file-like objects are instances of cStringIO'''
        return self._data

    @property
    def input(self):
        '''This is a file descriptor (not a file-like).
        It's the input end of our pipe which you give to the other process
        to be used as the stdout pipe for that process'''
        return self._w

    def flush(self):
        '''Normally a read on a pipe is blocking.
        To get things moving (make the subprocess yield the buffer)
        we inject our chunk delimiter into self.input

        This is useful when the primary subprocess does not write anything
        to our in pipe, but we need to make the internal pipe reader let go
        of the pipe and move on with things.
        '''
        os.write(self._w, ''.join(self._stream_delimiter))

    def stop(self):
        self._stop = True
        self.flush()  # reader has its teeth on the pipe. This makes it let go for a sec.
        os.close(self._w)
        self._data_available.set()

    def __del__(self):
        try:
            self.stop()
        except:
            pass
        try:
            del self._w
            del self._r
            del self._data
        except:
            pass

    def run(self):
        '''
        Plan:
        - We read into a fresh instance of IO obj until a marker is encountered.
        - When a marker is detected, we attach that IO obj to the "results"
          array and signal the calling code (through a threading.Event flag)
          that results are available
        - repeat until .stop() was called on the thread.
        '''
        marker = ['' for l in self._stream_delimiter]  # '' is there on purpose
        tf = self._obj[0](*self._obj[1], **self._obj[2])
        while not self._stop:
            l = os.read(self._r, 1)
            print('Thread talking: Ordinal of char is:%s' % ord(l))
            trash_str = marker.pop(0)
            marker.append(l)
            if marker != self._stream_delimiter:
                tf.write(l)
            else:
                # chopping off the marker first
                tf.seek(self._stream_roll_back_len, 2)
                tf.truncate()
                tf.seek(0)
                self._data_unoccupied.wait(5)  # seriously, how much time is needed
                                               # to get your items off the stack?
                self._data.append(tf)
                self._data_available.set()
                tf = self._obj[0](*self._obj[1], **self._obj[2])
        os.close(self._r)
        tf.close()
        del tf

def waitforresults(ch, answers, expect):
    while len(answers) < expect:
        ch.data_available.wait(0.5); ch.data_unoccupied.clear()
        while ch.data:
            answers.append(ch.data.pop(0))
        ch.data_available.clear(); ch.data_unoccupied.set()
    print('Main talking: %s answers received, expecting %s\n' % ( len(answers), expect) )

def test():
    '''
    - set up chunker
    - set up Popen with chunker's output stream
    - push some data into proc.stdin
    - get results
    - cleanup
    '''
    import subprocess

    ch = InputStreamChunker('\n')
    ch.daemon = True
    ch.start()

    print('starting the subprocess\n')
    p = subprocess.Popen(
        ['cat'],
        stdin = subprocess.PIPE,
        stdout = ch.input,
        stderr = subprocess.PIPE)

    answers = []

    i = p.stdin
    i.write('line1 qwer\n')    # will be in results
    i.write('line2 qwer\n')    # will be in results
    i.write('line3 zxcv asdf') # will be in results only after a ch.flush(),
                               # prepended to another line, or when the pipe is closed
    waitforresults(ch, answers, expect = 2)

    i.write('line4 tyui\n')    # will be in results
    i.write('line5 hjkl\n')    # will be in results
    i.write('line6 mnbv')      # will be in results only after a ch.flush(),
                               # prepended to another line, or when the pipe is closed
    waitforresults(ch, answers, expect = 4)

    ## now we will flush the rest of the input
    ## (that last line did not have a delimiter)
    i.close()
    ch.flush()
    waitforresults(ch, answers, expect = 5)

    should_be = ['line1 qwer', 'line2 qwer',
        'line3 zxcv asdfline4 tyui', 'line5 hjkl', 'line6 mnbv']
    assert should_be == [i.read() for i in answers]

    # don't forget to stop the chunker. It closes the pipes
    p.terminate()
    ch.stop()
    del p, ch

if __name__ == '__main__':
    test()

Edit: removed the erroneous verbiage about "writing to proc's stdin is a one-time-thing"


3 Comments

I believe I like your version better than the one I actually implemented (see Brian's answer). Although you also read only one character at a time, the multithreaded approach is cleaner. Yet I disagree with your conclusion about the input streams: at least if you flush the stream and give the process enough time to read the data, its result will get back to you over the output stream. See codepad.org/Yu7SoORS lines 172-188 as an example (I have replaced grep with cat). Otherwise the program I need it for would fail miserably.
@Perseids Very cool. I stand corrected on my hastily-drawn assumptions about subprocess.Popen's STDIN. Will adjust the code example
cStringIO was removed in Python 3; io.BytesIO / io.StringIO replace it.

Poking around I found this really nice solution

Persistent python subprocess

which avoids the blocking issue altogether by using fcntl to set file attributes on the subprocess pipes to non-blocking mode; no auxiliary threads or polling required. I could be missing something, but it has solved my interactive process control problem.
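In current Python, the approach from the linked answer looks roughly like this: the pipe's descriptor is switched to non-blocking mode with fcntl, after which os.read() raises BlockingIOError instead of blocking when nothing is available. A minimal sketch (Unix only; set_nonblocking is a helper name chosen here, and the "SCIP> " prompt is borrowed from the question for illustration):

```python
import fcntl
import os
import subprocess
import sys
import time

def set_nonblocking(stream):
    """Put a pipe's file descriptor into non-blocking mode (Unix only)."""
    fd = stream.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

# Stand-in for the interactive program: prints a prompt and exits.
proc = subprocess.Popen(
    [sys.executable, "-u", "-c", "print('SCIP> ', end='')"],
    stdout=subprocess.PIPE)
set_nonblocking(proc.stdout)

buf = b""
while not buf.endswith(b"SCIP> "):
    try:
        buf += os.read(proc.stdout.fileno(), 4096)
    except BlockingIOError:   # nothing available right now...
        time.sleep(0.05)      # ...so do other work, then poll again
```

The try/except is the key difference from the blocking loop in the question: the parent regains control immediately whenever the child has produced no output yet.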

3 Comments

fcntl is Unix systems only. No worky on Windows == not a "Python" solution. :)
I have very little experience with windows programming but this link appears to suggest that there is an equivalent of non-blocking I/O in Windows as well.
And this link shows how win32file Python module can be used to create pipes in overlapped (Windows slang for non-blocking) mode.

I've tried a lot of approaches like making a non-blocking stdout by the following:

fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

But the only working solution is described here:

master, slave = pty.openpty()
proc = subprocess.Popen(
    shlex.split(command),
    stdout=slave,
    stderr=slave,
    close_fds=True,
    bufsize=0
)
stdout = os.fdopen(master)

And then:

while True:
    out = stdout.readline()
    output_result = proc.poll()
    if out == '' and output_result is not None:
        break
    if out != '':
        print(out)

Comments


There's another possible solution, but it may require that you rearrange your program a bit.

If you have multiple sources of I/O (file descriptors, sockets, etc.), and you want to wait on all of them at once, use the Python select module. You could (for instance) put standard input (for reading from the terminal) and the pipe (from the subprocess) in a list, and wait for input to be ready on either one of them. select blocks until I/O is available on any of the descriptors in the list. You then scan the list, looking for the ones that have available data.

This approach turns out to be quite efficient, much more so than repeatedly polling a file descriptor to see if there's any data. It also has the virtue of simplicity: you can accomplish what you want with a minimum of code, and simpler code means fewer opportunities for bugs.
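A minimal sketch of that pattern, multiplexing a subprocess's stdout and stderr with select (Unix pipes assumed; the child command is just an illustration):

```python
import os
import select
import subprocess
import sys

# A child that writes to both of its output streams.
proc = subprocess.Popen(
    [sys.executable, "-u", "-c",
     "import sys; sys.stdout.write('to stdout'); sys.stderr.write('to stderr')"],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

streams = [proc.stdout, proc.stderr]
captured = {proc.stdout: b"", proc.stderr: b""}
while streams:
    # Block until at least one pipe has data (or has reached EOF).
    readable, _, _ = select.select(streams, [], [])
    for stream in readable:
        chunk = os.read(stream.fileno(), 4096)
        if chunk:
            captured[stream] += chunk
        else:                      # EOF on this pipe: stop watching it
            streams.remove(stream)
```

Passing a nonzero timeout as the fourth argument to select() would additionally let the loop do other work whenever neither pipe is ready.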

1 Comment

I got it to work using poll from the select module. Thx. My readAllAvailableData() now looks like this: codepad.org/ArYdEc3s . The implementation isn't efficient at all, but it is fast enough for my purpose. I guess the most elegant solution would have been using Pexpect as Mark has suggested (if you can use external modules).

It is not correct that read() blocks until EOF: it blocks until it gets as much data as it asked for. On the other side, it is possible that some data is kept in the child's buffers (it is not flushed just because a print ended with a newline).

Why not have the child print something like "### OVER ###\n" and then call stdout.flush()? On the parent side, collect until you see the OVER token, say with ''.join(i for i in iter(process.stdout.readline, '### OVER ###\n'))

1 Comment

This wouldn't help, because I want to do the opposite: not wait until it is ready, but find out that it is not. Reading the prompt once it's there works pretty reliably. (On a side note: the program I load into the subprocess is a precompiled binary, and the reason I use the interactive interface is mostly that I want to spare myself the trouble of using the C++ interface. Therefore I cannot change the output.)

I don't think that readline() will block your process.

line = process.stdout.readline() 

Earlier I tried to use

for line in process.stdout:
    print(line)

but that seems to hang until the process terminates.

1 Comment

Unfortunately readline() does block the process. Furthermore it wouldn't read the prompt, because there is no newline at the end.
