fix spelling
bstpierre

Incremental parsing of Popen's stdout is really not a problem. Just push a pipe into a thread and have it scrub through the output, looking for delimiters. Depending on your preference, it can pipe the chunks into another pipe / file-like, or put the parsed "chunks" on a "stack" in asynchronous mode. Here is an example of asynchronous "chunking" of stdout based on a custom delimiter:

import cStringIO
import uuid
import threading
import os

class InputStreamChunker(threading.Thread):
    '''
    Threaded object / code that mediates reading output from a stream,
    detects "separation markers" in the stream and spits out chunks
    of original stream, split when ends of chunk are encountered.

    Results are made available as a list of filled file-like objects
    (your choice). Results are accessible either "asynchronously"
    (you can poll at will for results in a non-blocking way) or
    "synchronously" by exposing a "subscribe and wait" system based
    on threading.Event flags.

    Usage:
    - instantiate this object
    - give our input pipe as "stdout" to other subprocess and start it:
        Popen(..., stdout = th.input, ...)
    - (optional) subscribe to data_available event
    - pull resulting file-like objects off .data
      (if you are "messing" with .data from outside of the thread,
      be courteous and wrap the thread-unsafe manipulations between:
        obj.data_unoccupied.clear()
        ... mess with .data
        obj.data_unoccupied.set()
      The thread will not touch obj.data for the duration and will
      block reading.)

    License: Public domain
    Absolutely no warranty provided
    '''
    def __init__(self, delimiter = None, outputObjConstructor = None):
        '''
        delimiter - the string that will be considered a delimiter for the stream
        outputObjConstructor - instances of these will be attached to self.data
            array (instantiator_pointer, args, kw)
        '''
        super(InputStreamChunker, self).__init__()

        self._data_available = threading.Event()
        self._data_available.clear()  # parent will .wait() on this for results.
        self._data = []
        self._data_unoccupied = threading.Event()
        self._data_unoccupied.set()  # parent will set this to true when
                                     # self.data is being changed from outside
        self._r, self._w = os.pipe()  # takes all inputs. self.input = public pipe in.
        self._stop = False
        if not delimiter:
            delimiter = str(uuid.uuid1())
        self._stream_delimiter = [l for l in delimiter]
        self._stream_roll_back_len = (len(delimiter) - 1) * -1
        if not outputObjConstructor:
            self._obj = (cStringIO.StringIO, (), {})
        else:
            self._obj = outputObjConstructor

    @property
    def data_available(self):
        '''returns a threading.Event instance pointer that is
        True (and non-blocking to .wait()) when we attached a
        new IO obj to the .data array.
        Code consuming the array may decide to set it back to False
        if it's done with all chunks and wants to be blocked on .wait()'''
        return self._data_available

    @property
    def data_unoccupied(self):
        '''returns a threading.Event instance pointer that is
        normally True (and non-blocking to .wait()).
        Set it to False with .clear() before you start non-thread-safe
        manipulations of (changes to) the .data array. Set it back to
        True with .set() when you are done'''
        return self._data_unoccupied

    @property
    def data(self):
        '''returns a list of input chunks (file-like objects) captured
        so far. This is a "stack" of sorts. Code consuming the chunks
        is responsible for disposing of the file-like objects.
        By default, the file-like objects are instances of cStringIO'''
        return self._data

    @property
    def input(self):
        '''This is a file descriptor (not a file-like).
        It's the input end of our pipe, which you give to the other
        process to be used as the stdout pipe for that process'''
        return self._w

    def flush(self):
        '''Normally a read on a pipe is blocking.
        To get things moving (make the subprocess yield the buffer),
        we inject our chunk delimiter into self.input

        This is useful when the primary subprocess does not write anything
        to our in pipe, but we need to make the internal pipe reader let
        go of the pipe and move on with things.
        '''
        os.write(self._w, ''.join(self._stream_delimiter))

    def stop(self):
        self._stop = True
        self.flush()  # reader has its teeth on the pipe. This makes it let go for a sec.
        os.close(self._w)
        self._data_available.set()

    def __del__(self):
        try:
            self.stop()
        except:
            pass
        try:
            del self._w
            del self._r
            del self._data
        except:
            pass

    def run(self):
        '''
        Plan:
        - We read into a fresh instance of IO obj until marker encountered.
        - When marker is detected, we attach that IO obj to the "results"
          array and signal the calling code (through a threading.Event flag)
          that results are available
        - repeat until .stop() was called on the thread.
        '''
        marker = ['' for l in self._stream_delimiter]  # '' is there on purpose
        tf = self._obj[0](*self._obj[1], **self._obj[2])
        while not self._stop:
            l = os.read(self._r, 1)
            print('Thread talking: Ordinal of char is:%s' % ord(l))
            trash_str = marker.pop(0)
            marker.append(l)
            if marker != self._stream_delimiter:
                tf.write(l)
            else:
                # chopping off the marker first
                tf.seek(self._stream_roll_back_len, 2)
                tf.truncate()
                tf.seek(0)
                self._data_unoccupied.wait(5)  # seriously, how much time is needed
                                               # to get your items off the stack?
                self._data.append(tf)
                self._data_available.set()
                tf = self._obj[0](*self._obj[1], **self._obj[2])
        os.close(self._r)
        tf.close()
        del tf

def waitforresults(ch, answers, expect):
    while len(answers) < expect:
        ch.data_available.wait(0.5)
        ch.data_unoccupied.clear()
        while ch.data:
            answers.append(ch.data.pop(0))
        ch.data_available.clear()
        ch.data_unoccupied.set()
        print('Main talking: %s answers received, expecting %s\n' % (
            len(answers), expect))

def test():
    '''
    - set up chunker
    - set up Popen with chunker's output stream
    - push some data into proc.stdin
    - get results
    - cleanup
    '''
    import subprocess

    ch = InputStreamChunker('\n')
    ch.daemon = True
    ch.start()

    print('starting the subprocess\n')
    p = subprocess.Popen(
        ['cat'],
        stdin = subprocess.PIPE,
        stdout = ch.input,
        stderr = subprocess.PIPE)

    answers = []
    i = p.stdin
    i.write('line1 qwer\n')     # will be in results
    i.write('line2 qwer\n')     # will be in results
    i.write('line3 zxcv asdf')  # will be in results only after a ch.flush(),
                                # prepended to other line, or when the pipe is closed
    waitforresults(ch, answers, expect = 2)

    i.write('line4 tyui\n')     # will be in results
    i.write('line5 hjkl\n')     # will be in results
    i.write('line6 mnbv')       # will be in results only after a ch.flush(),
                                # prepended to other line, or when the pipe is closed
    waitforresults(ch, answers, expect = 4)

    ## now we will flush the rest of the input (that last line did not have a delimiter)
    i.close()
    ch.flush()
    waitforresults(ch, answers, expect = 5)

    should_be = ['line1 qwer', 'line2 qwer',
                 'line3 zxcv asdfline4 tyui', 'line5 hjkl', 'line6 mnbv']
    assert should_be == [i.read() for i in answers]

    # don't forget to stop the chunker. It closes the pipes
    p.terminate()
    ch.stop()
    del p, ch

if __name__ == '__main__':
    test()
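On Python 3 the same idea (a thread scrubbing Popen's stdout for delimiters and stacking the chunks) can be sketched with the stdlib queue module. This is only a sketch of the pattern, not the answer's exact class; the one-byte-at-a-time read mirrors the thread above, and the child command is a stand-in that just prints three delimited chunks:

```python
import subprocess
import sys
import threading
import queue

def chunker(stream, delimiter, out_queue):
    """Read a byte stream, split it on `delimiter`, and queue each chunk."""
    buf = b''
    while True:
        b = stream.read(1)          # one byte at a time, like the thread above
        if not b:                   # EOF: flush whatever is left and stop
            if buf:
                out_queue.put(buf)
            break
        buf += b
        if buf.endswith(delimiter):
            out_queue.put(buf[:-len(delimiter)])  # chop the marker off
            buf = b''

# Stand-in child process that writes three '|'-delimited chunks to stdout.
child = [sys.executable, '-c', "import sys; sys.stdout.write('a|b|c|')"]
p = subprocess.Popen(child, stdout=subprocess.PIPE)

chunks = queue.Queue()
t = threading.Thread(target=chunker, args=(p.stdout, b'|', chunks), daemon=True)
t.start()
p.wait()
t.join()

results = []
while not chunks.empty():
    results.append(chunks.get())
print(results)   # [b'a', b'b', b'c']
```

The queue plays the role of the .data "stack", and queue.Queue is already thread-safe, so no Event-based handshake is needed for this simple case.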


Bounty: 50 reputation awarded by Mike Pennington
added 154 characters in body
ddotsenko

It works: it allows you to detect custom-defined "markers" in the output of a subprocess in "real time".

Your problem is not really reading the live / delayed output. Your problem will actually be pushing delayed input into subprocess.Popen's object. It does NOT allow interrupted streams. Let me explain:

p = subprocess.Popen(..., stdin = a, ...) 

That "a" can be: None, subprocess's PIPE, your own pipe, or your own file-like. In the "pipe" case, p will not process the input UNTIL YOU CLOSE THE PIPE. In the "file-like" case you can only feed data into it (and rewind it with a.seek(0)) BEFORE you initiate p.
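For the "file-like" case, note that Popen needs a real file descriptor (an object with a .fileno()), so an in-memory StringIO will not work there. A sketch of pre-populating and rewinding a temp file before starting p (Python 3 here; the echoing child command is a stand-in):

```python
import subprocess
import sys
import tempfile

# Pre-populate a real file (Popen needs .fileno(), so StringIO won't do),
# then rewind it so the child reads from the start.
a = tempfile.TemporaryFile()
a.write(b'hello from a file-like\n')
a.seek(0)

# Stand-in child that simply echoes its stdin back to stdout.
child = [sys.executable, '-c', 'import sys; sys.stdout.write(sys.stdin.read())']
p = subprocess.Popen(child, stdin=a, stdout=subprocess.PIPE)
out, _ = p.communicate()
print(out)   # b'hello from a file-like\n'
```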

Edit: removed the erroneous verbiage about "writing to proc's stdin is a one-time-thing".
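Indeed, feeding stdin in installments does work, as long as you flush after each write and the child flushes its own output. A minimal Python 3 sketch (the upper-casing child script is just a stand-in):

```python
import subprocess
import sys

# Stand-in child: reads lines from stdin, echoes them upper-cased, flushing each time.
child_src = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.upper())\n"
    "    sys.stdout.flush()\n"
)
p = subprocess.Popen([sys.executable, '-c', child_src],
                     stdin=subprocess.PIPE, stdout=subprocess.PIPE)

replies = []
for msg in (b'one\n', b'two\n'):
    p.stdin.write(msg)     # feed input in several installments...
    p.stdin.flush()        # ...flushing so the child actually sees each one
    replies.append(p.stdout.readline())

p.stdin.close()            # EOF only at the very end
p.wait()
print(replies)   # [b'ONE\n', b'TWO\n']
```

Each write is answered before the pipe is closed, so stdin is clearly not a one-shot affair.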

The original revision used the same imports and InputStreamChunker class as above, with this test harness:

def testth():
    '''
    - set up chunker
    - set up Popen with chunker's output stream
    - push some data into proc.stdin
    - close proc.stdin
    - see if proc is alive
    - see results of chunker
    '''
    import subprocess

    ch = InputStreamChunker('\n')
    ch.daemon = True
    ch.start()

    p = subprocess.Popen(['grep', 'asdf'],
                         stdin = subprocess.PIPE,
                         stdout = ch.input,
                         stderr = subprocess.PIPE)

    i = p.stdin
    i.write('line1 asdf\n')     # will be displayed
    i.write('line2 asdf')       # will be appended to next line
    i.write('line3 qwer\n')     # will be displayed only because the prior line
                                # did not have a delimiter in it.
    i.write('line4 qwer\n')     # will NOT be displayed
    i.write('line5 zxcv asdf')  # will be displayed just because it's the last one
                                # in the pipe, even without a delimiter.

    ## until now, because p is waiting for stdin's EOF (needs to be closed)
    ## there will be no results in ch.data
    assert ch.data == []

    ## this will flush out the input.
    i.close()

    ## because the last write does not end with a delimiter, you would expect it
    ## to be still in the output chunker's buffer... wrong. While you cannot
    ## manually send EOF to a python pipe, python CAN do that, and it did,
    ## immediately after being done reading from i
    while len(ch.data) < 3:
        print('Main talking: Data is %s long, should be 3\n' % len(ch.data))
        ch.data_available.wait(0.2)
    ch.data_available.clear()

    ## by this time, because the stdin was closed and we have all expected
    ## output from p, we expect p to be done running.
    print('return code is %s' % p.returncode)

    ## now we will flush the rest of the input and stop the chunker
    ## this will flush out an empty string into the results.
    p.terminate()
    ch.stop()
    print('process return code is %s' % p.returncode)
    while len(ch.data) < 4:
        print('Main talking: Data is %s long, should be 4\n' % len(ch.data))
        ch.data_available.wait(1)
    assert len(ch.data) == 4

    a = ch.data
    del ch, p
    should_be = ['line1 asdf', 'line2 asdfline3 qwer', 'line5 zxcv asdf', '']
    assert should_be == [i.read() for i in a]

if __name__ == '__main__':
    testth()

It works: it allows you to detect custom-defined "markers" in the output of a subprocess in "real time".
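The chunking idea itself can be boiled down further. Below is a minimal, self-contained Python 3 sketch of the same mechanism (a hypothetical `chunk_stream` helper, not part of the class above): one thread writes into an `os.pipe`, and the reader splits on a delimiter and hands each chunk over as soon as it is complete:

```python
import os
import threading

def chunk_stream(fd, delimiter=b"\n"):
    """Read from a raw pipe fd and yield delimiter-separated chunks.

    Reads one byte at a time, just like the thread above, so a chunk
    becomes available the moment its closing delimiter arrives.
    """
    buf = b""
    while True:
        b = os.read(fd, 1)       # blocks until the writer produces a byte
        if not b:                # empty read == writer closed the pipe (EOF)
            if buf:
                yield buf        # hand over the trailing, delimiter-less chunk
            return
        buf += b
        if buf.endswith(delimiter):
            yield buf[:-len(delimiter)]
            buf = b""

# Demo: a writer thread feeds the pipe while the main thread consumes chunks.
r, w = os.pipe()

def writer():
    os.write(w, b"first\nsecond\ntail")
    os.close(w)                  # closing the write end signals EOF

t = threading.Thread(target=writer)
t.start()
chunks = list(chunk_stream(r))
t.join()
os.close(r)
print(chunks)                    # → [b'first', b'second', b'tail']
```

In a real setup the write end `w` would be handed to `Popen(..., stdout=w)` instead of a writer thread.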

Your problem is not really reading the live / delayed output. Your problem will actually be pushing delayed input into subprocess.Popen's object. It does NOT allow interrupted streams. Let me explain:

p = subprocess.Popen(..., stdin = a, ...) 

That "a" can be: None, subprocess's PIPE, your own pipe, or your own file-like. In the case of a "pipe", p will not process the input UNTIL YOU CLOSE THE PIPE. In the case of a "file-like", you can only feed data into it (and rewind it with a.seek(0)) BEFORE you initiate p.
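For the "file-like" case, that means populating and rewinding before launch. A minimal Python 3 sketch (using a temporary file, and a small Python one-liner as a portable stand-in for `cat`):

```python
import subprocess
import sys
import tempfile

# Pre-populate a real file, rewind it, and only then hand it to Popen
# as stdin -- the child reads whatever is in the file at launch time.
src = tempfile.TemporaryFile()
src.write(b"hello from a pre-populated file\n")
src.seek(0)                      # rewind BEFORE starting the child

# Portable stand-in for `cat`: echo stdin back to stdout.
p = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"],
    stdin=src,
    stdout=subprocess.PIPE,
)
out, _ = p.communicate()
src.close()
print(out)                       # → b'hello from a pre-populated file\n'
```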

In both cases (I tried on Windows and Linux, with CPython 2.6) it seems you only have ONE shot at giving input to p. Once you are done writing (i.e. closed the pipe, or p read your pre-populated and rewound file-like), there is no way you can feed more input to p with the subprocess module.

If you need to "stream" input to a process at irregular intervals, you need to look elsewhere.
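In fairness, whether incremental writes get through depends mostly on the child's own buffering, not on the subprocess module itself. A minimal Python 3 sketch, using an explicitly flushing echo child as a stand-in for any line-oriented tool, shows two separate, delayed writes each producing a reply:

```python
import subprocess
import sys

# A child that echoes each stdin line back immediately (explicit flush).
child_code = (
    "import sys\n"
    "while True:\n"
    "    line = sys.stdin.buffer.readline()\n"
    "    if not line:\n"
    "        break\n"
    "    sys.stdout.buffer.write(b'got ' + line)\n"
    "    sys.stdout.buffer.flush()\n"
)
p = subprocess.Popen(
    [sys.executable, "-u", "-c", child_code],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    bufsize=0,                   # unbuffered pipes on our side
)

replies = []
for msg in (b"one\n", b"two\n"): # two separate, delayed writes
    p.stdin.write(msg)
    p.stdin.flush()              # push the bytes through the pipe now
    replies.append(p.stdout.readline())

p.stdin.close()                  # EOF lets the child exit
p.wait()
print(replies)                   # → [b'got one\n', b'got two\n']
```

If the child buffers its own output (as most tools do when stdout is a pipe), this pattern stalls, and then something like a pseudo-terminal is the way out.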

import cStringIO
import uuid
import threading
import os

class InputStreamChunker(threading.Thread):
    '''
    Threaded object / code that mediates reading output from a stream,
    detects "separation markers" in the stream and spits out chunks
    of original stream, split when ends of chunk are encountered.

    Results are made available as a list of filled file-like objects
    (your choice). Results are accessible either "asynchronously"
    (you can poll at will for results in a non-blocking way) or
    "synchronously" by exposing a "subscribe and wait" system based
    on threading.Event flags.

    Usage:
    - instantiate this object
    - give our input pipe as "stdout" to other subprocess and start it:
        Popen(..., stdout = th.input, ...)
    - (optional) subscribe to data_available event
    - pull resulting file-like objects off .data
      (if you are "messing" with .data from outside of the thread,
      be courteous and wrap the thread-unsafe manipulations between:
          obj.data_unoccupied.clear()
          ... mess with .data
          obj.data_unoccupied.set()
      The thread will not touch obj.data for the duration and will
      block reading.)

    License: Public domain
    Absolutely no warranty provided
    '''
    def __init__(self, delimiter = None, outputObjConstructor = None):
        '''
        delimiter - the string that will be considered a delimiter for the stream
        outputObjConstructor - instances of these will be attached to self.data
            array (instantiator_pointer, args, kw)
        '''
        super(InputStreamChunker, self).__init__()

        self._data_available = threading.Event()
        self._data_available.clear() # parent will .wait() on this for results.
        self._data = []
        self._data_unoccupied = threading.Event()
        self._data_unoccupied.set() # parent will set this to true when self._data is being changed from outside
        self._r, self._w = os.pipe() # takes all inputs. self.input = public pipe in.
        self._stop = False
        if not delimiter:
            delimiter = str(uuid.uuid1())
        self._stream_delimiter = [l for l in delimiter]
        self._stream_roll_back_len = (len(delimiter) - 1) * -1
        if not outputObjConstructor:
            self._obj = (cStringIO.StringIO, (), {})
        else:
            self._obj = outputObjConstructor

    @property
    def data_available(self):
        '''returns a threading.Event instance pointer that is
        True (and non-blocking to .wait() ) when we attached a
        new IO obj to the .data array.
        Code consuming the array may decide to set it back to False
        if it's done with all chunks and wants to be blocked on .wait()'''
        return self._data_available

    @property
    def data_unoccupied(self):
        '''returns a threading.Event instance pointer that is normally
        True (and non-blocking to .wait() )
        Set it to False with .clear() before you start non-thread-safe
        manipulations of (i.e. changes to) the .data array. Set it back
        to True with .set() when you are done'''
        return self._data_unoccupied

    @property
    def data(self):
        '''returns a list of input chunks (file-like objects) captured
        so far. This is a "stack" of sorts. Code consuming the chunks
        would be responsible for disposing of the file-like objects.
        By default, the file-like objects are instances of cStringIO'''
        return self._data

    @property
    def input(self):
        '''This is a file descriptor (not a file-like).
        It's the input end of our pipe which you give to the other process
        to be used as the stdout pipe for that process'''
        return self._w

    def flush(self):
        '''Normally a read on a pipe is blocking.
        To get things moving (make the subprocess yield the buffer)
        we inject our chunk delimiter into self.input

        This is useful when the primary subprocess does not write anything
        to our in pipe, but we need to make the internal pipe reader let
        go of the pipe and move on with things.
        '''
        os.write(self._w, ''.join(self._stream_delimiter))

    def stop(self):
        self._stop = True
        self.flush() # reader has its teeth on the pipe. This makes it let go for a sec.
        os.close(self._w)
        self._data_available.set()

    def __del__(self):
        try:
            self.stop()
        except:
            pass
        try:
            del self._w
            del self._r
            del self._data
        except:
            pass

    def run(self):
        ''' Plan:
        - We read into a fresh instance of IO obj until marker encountered.
        - When marker is detected, we attach that IO obj to the "results"
          array and signal the calling code (through threading.Event flag)
          that results are available
        - repeat until .stop() was called on the thread.
        '''
        marker = ['' for l in self._stream_delimiter] # '' is there on purpose
        tf = self._obj[0](*self._obj[1], **self._obj[2])
        while not self._stop:
            l = os.read(self._r, 1)
            print('Thread talking: Ordinal of char is: %s' % ord(l))
            trash_str = marker.pop(0)
            marker.append(l)
            if marker != self._stream_delimiter:
                tf.write(l)
            else:
                # chopping off the marker first
                tf.seek(self._stream_roll_back_len, 2)
                tf.truncate()
                tf.seek(0)
                self._data_unoccupied.wait(5) # seriously, how much time is needed to get your items off the stack?
                self._data.append(tf)
                self._data_available.set()
                tf = self._obj[0](*self._obj[1], **self._obj[2])
        os.close(self._r)
        tf.close()
        del tf

def waitforresults(ch, answers, expect):
    while len(answers) < expect:
        ch.data_available.wait(0.5)
        ch.data_unoccupied.clear()
        while ch.data:
            answers.append(ch.data.pop(0))
        ch.data_available.clear()
        ch.data_unoccupied.set()
        print('Main talking: %s answers received, expecting %s\n' % (len(answers), expect))

def test():
    '''
    - set up chunker
    - set up Popen with chunker's output stream
    - push some data into proc.stdin
    - get results
    - cleanup
    '''
    import subprocess

    ch = InputStreamChunker('\n')
    ch.daemon = True
    ch.start()

    print('starting the subprocess\n')
    p = subprocess.Popen(
        ['cat'],
        stdin = subprocess.PIPE,
        stdout = ch.input,
        stderr = subprocess.PIPE)

    answers = []

    i = p.stdin
    i.write('line1 qwer\n') # will be in results
    i.write('line2 qwer\n') # will be in results
    i.write('line3 zxcv asdf') # will be in results only after a ch.flush(),
                               # prepended to other line, or when the pipe
                               # is closed
    waitforresults(ch, answers, expect = 2)

    i.write('line4 tyui\n') # will be in results
    i.write('line5 hjkl\n') # will be in results
    i.write('line6 mnbv') # will be in results only after a ch.flush(),
                          # prepended to other line, or when the pipe is closed
    waitforresults(ch, answers, expect = 4)

    ## now we will flush the rest of the input (that last line did not have a delimiter)
    i.close()
    ch.flush()
    waitforresults(ch, answers, expect = 5)

    should_be = ['line1 qwer', 'line2 qwer',
                 'line3 zxcv asdfline4 tyui', 'line5 hjkl', 'line6 mnbv']
    assert should_be == [i.read() for i in answers]

    # don't forget to stop the chunker. It closes the pipes
    p.terminate()
    ch.stop()
    del p, ch

if __name__ == '__main__':
    test()

Edit: removed the erroneous verbiage about "writing to proc's stdin is a one-time-thing"

Source Link
ddotsenko
