
I know this is a classic problem in stream processing, but I don't know how to handle it in Python. I have a file handle that is being written to by an active process. I want to consume content from that file handle on a line-by-line basis, but I don't want to deadlock waiting to read. I will keep reading until EOF or 60 seconds of looped reading, whichever comes first. Advice on how to do this would be appreciated. My pseudo code description of this problem is below.

    proc = genprocess("command")
    found_a = False
    found_b = False
    start = time.time()
    while True:
        line = proc.readline()
        while line:
            if not found_a and grep(pattern_a, line):
                found_a = True
                print "Found A, now looking for B"
            elif not found_b and grep(pattern_b, line):
                found_b = True
                print "Found B, all done"
                break
        if time.time() - start > 60:
            break
        else:
            time.sleep(5)
    proc.kill()

The problem is that this only reads one line on each interval. Instead I want the inside of the loop to iterate as many times as possible, but not to block waiting for new content to be written to the file. Once it has read as much as is available, it should sleep for 5 seconds to allow more content to accumulate.

2 Answers


If you're running in a Unix environment, you could use Python's select module to wait for data on the file handle. You can also use Python's fcntl module to switch a file handle to non-blocking mode, as described in this question.

For example, assuming your proc variable is a regular file handle that supports fileno():

    import fcntl
    import os

    file_num = proc.fileno()
    old_flags = fcntl.fcntl(file_num, fcntl.F_GETFL)
    fcntl.fcntl(file_num, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)
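Once the descriptor is non-blocking, one way to "read as much as is available, then sleep" is to call os.read() until it reports that nothing is ready, and split the accumulated bytes into lines yourself. The following is only a sketch in Python 3, assuming the handle is a pipe on a Unix system (O_NONBLOCK has no useful effect on regular files); the helper names set_nonblocking and drain_lines are made up for illustration and are not part of any library.

```python
import fcntl
import os


def set_nonblocking(fd):
    """Switch a file descriptor to non-blocking mode (Unix only)."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def drain_lines(fd, pending=b""):
    """Read everything currently available from a non-blocking fd.

    Returns (complete_lines, leftover_bytes, eof_reached). The leftover
    bytes are an unterminated partial line to pass back in next time.
    """
    eof = False
    while True:
        try:
            chunk = os.read(fd, 4096)
        except BlockingIOError:
            break  # nothing more to read right now
        if not chunk:
            eof = True  # the writer closed its end of the pipe
            break
        pending += chunk
    # Everything before the last newline is complete; the tail is partial.
    *lines, pending = pending.split(b"\n")
    return lines, pending, eof
```

The caller would then loop: drain, check each returned line against its patterns, stop on EOF or once the 60-second budget is spent, and otherwise sleep for 5 seconds before draining again.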

1 Comment

Thanks. I actually figured this out myself after some more searching. I'll add an answer to my own question of how I did it.

The fcntl example linked to above is OK (except that it puts the process in a busy polling loop), but I ended up using select to achieve more or less the desired functionality.

    import re
    import sys
    import time
    from select import select

    started = False
    while True:
        if (time.time() - start > wait_for) or started:
            break
        (rlist, wlist, xlist) = select([proc.stdout], [], [], wait_interval)
        if len(rlist) > 0:
            line = rlist[0].readline()  # read one line (this blocks until '\n' is read)
        else:
            # nothing available to read from proc.stdout
            print ".",
            sys.stdout.flush()
            time.sleep(1)
            continue
        if re.search("daemon started", line):
            started = True
    if not started:
        proc.kill()  # don't leave the process running if it didn't start properly
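As the comment in the code notes, readline() can still block if the process has written only part of a line. A possible variation, sketched here in Python 3 and not what I actually ran, is to read raw bytes with os.read() after select reports the descriptor readable and to split lines by hand. The function name wait_for_pattern and its parameters are hypothetical, and the sketch assumes a Unix pipe file descriptor such as proc.stdout.fileno().

```python
import os
import select
import time


def wait_for_pattern(fd, needle, timeout, poll_interval=1.0):
    """Return True if `needle` (bytes) shows up in a line before `timeout` seconds."""
    deadline = time.monotonic() + timeout
    pending = b""
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False  # overall time budget used up
        readable, _, _ = select.select([fd], [], [], min(poll_interval, remaining))
        if not readable:
            continue  # nothing arrived in this interval; re-check the deadline
        # select reported data or EOF, so this read returns immediately.
        chunk = os.read(fd, 4096)
        if not chunk:
            return needle in pending  # EOF: check any unterminated final line
        pending += chunk
        # Keep the trailing partial line for the next round.
        *lines, pending = pending.split(b"\n")
        if any(needle in line for line in lines):
            return True
```

Because select itself sleeps for up to poll_interval when nothing is available, this version has no separate time.sleep() call.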

And if this is the sort of thing a user might interrupt with Ctrl-C, then wrapping the whole thing in a try/except block that catches KeyboardInterrupt allows proc.kill() to be called instead of leaving the process running in the background.
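That cleanup could look roughly like the following Python 3 sketch. The names run_guarded and watch are hypothetical: watch stands for whatever function does the reading loop and returns True once the process has started properly.

```python
import subprocess
import sys


def run_guarded(cmd, watch):
    """Start cmd, run watch(proc), and kill the child unless watch returned True."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    ok = False
    try:
        ok = bool(watch(proc))
    except KeyboardInterrupt:
        pass  # treat Ctrl-C the same as "did not start properly"
    finally:
        if not ok:
            proc.kill()  # don't leave the process running in the background
            proc.wait()  # reap the child so it doesn't linger as a zombie
    return proc, ok
```

Using finally instead of only an except clause means the child is also killed if the reading loop fails with some other exception, which then propagates to the caller as usual.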
