I'm working with data from spinn3r, which consists of multiple different protobuf messages serialized into a byte stream:

http://code.google.com/p/spinn3r-client/wiki/Protostream

"A protostream is a stream of protocol buffer messages, encoded on the wire as length prefixed varints according to the Google protocol buffer specification. The stream has three parts: a header, the payload, and a tail marker."

This seems like a pretty standard use case for protobufs. In fact, the protobuf core distribution provides CodedInputStream for both C++ and Java. But it appears that protobuf does not provide such a tool for Python -- the 'internal' tools are not set up for this kind of external use:
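For readers who want to see what such a tool has to do, here is a minimal pure-Python sketch of the base-128 varint encoding used for the length prefixes. The function names `encode_varint`, `decode_varint` and `frame` are illustrative assumptions, not part of protobuf or the spinn3r client, and the sketch handles only non-negative values such as message lengths.

```python
def encode_varint(value):
    """Encode a non-negative integer as a base-128 varint (hypothetical helper)."""
    if value < 0:
        raise ValueError("length prefixes must be non-negative")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # more bytes follow: set the continuation bit
        else:
            out.append(low7)         # last byte: continuation bit is clear
            return bytes(out)


def decode_varint(buf, pos=0):
    """Decode one varint from buf at pos; return (value, position after it)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]              # indexing bytes yields an int on Python 3
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7
        if shift >= 64:
            raise ValueError("varint is longer than 10 bytes")


def frame(payload):
    """Prefix an already-serialized message with its length as a varint."""
    return encode_varint(len(payload)) + payload
```

With helpers like these, a stream is just the concatenation of `frame(msg.SerializeToString())` for each message, and reading it back means alternating `decode_varint` with a slice of the decoded length.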

https://groups.google.com/forum/?fromgroups#!topic/protobuf/xgmUqXVsK-o

So... before I go and cobble together a Python varint parser and tools for parsing a stream of different message types: does anyone know of any tools for this?

Why is it missing from protobuf? (Or am I just failing to find it?)

This seems like a big gap for protobuf, especially when compared to thrift's equivalent tools for both 'transport' and 'protocol'. Am I viewing that correctly?

3 Answers

16

It looks like the code in the other answer is potentially lifted from here. Check the licence before using this file, but I managed to get it to read varint32s using code such as this:

    import myprotocol_pb2 as proto
    import varint  # (this is the varint.py file)

    # read the whole file as bytes
    with open("filename.bin", "rb") as f:
        data = f.read()

    decoder = varint.decodeVarint32  # get a varint32 decoder
                                     # others are available in varint.py

    pos = 0
    while pos < len(data):
        msg = proto.Msg()  # your message type
        # size of the next message, and the position just past the varint
        size, pos = decoder(data, pos)
        msg.ParseFromString(data[pos:pos + size])

        # use parsed message

        pos += size
    print("done!")

This is very simple code designed to load messages of a single type delimited by varint32s which describe the next message's size.
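If the file is too large to read into memory in one go, the same idea can be applied incrementally. The following is a sketch under assumptions, not code from the answer: `read_varint` and `iter_delimited` are hypothetical names, the stream is assumed to be a buffered binary file object (so that `read(n)` returns fewer than `n` bytes only at end of file), and the payloads are yielded as raw bytes for the caller to parse.

```python
import io


def read_varint(stream):
    """Read one varint from a binary file object; return None at a clean EOF."""
    result = 0
    shift = 0
    while True:
        raw = stream.read(1)
        if not raw:
            if shift == 0:
                return None  # EOF exactly on a message boundary
            raise EOFError("stream ended inside a varint")
        byte = raw[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7
        if shift >= 64:
            raise ValueError("varint is longer than 10 bytes")


def iter_delimited(stream):
    """Yield the raw bytes of each length-prefixed message in the stream."""
    while True:
        size = read_varint(stream)
        if size is None:
            return
        payload = stream.read(size)
        if len(payload) != size:
            raise EOFError("stream ended inside a message body")
        yield payload


# Small in-memory demonstration: two framed payloads, b"abc" and b"hi".
demo = io.BytesIO(b"\x03abc\x02hi")
payloads = list(iter_delimited(demo))
```

Each yielded payload can then be passed to `ParseFromString` on a fresh message object, exactly as in the loop above.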


Update: It may also be possible to import the decoder directly from the protobuf library by using:

    from google.protobuf.internal.decoder import _DecodeVarint32

5 Comments

For newer stuff, I get the decoder via from google.protobuf.internal.decoder import _DecodeVarint32
I've had to do this a number of times recently. I wasn't able to figure out the msg.type == proto.Msg.END condition, but simply doing while pos < len(data): works great for me.
Of course, makes sense, answer updated. Thanks @Moodragonx
Just curious: this looks to be creating a new object of type Msg on each loop iteration. What was Google's rationale for this? Is this overhead expensive?
@Tommy To clarify, this isn't Google code; this is my code as an example of how one might solve this problem in Python. I am not a Python expert, but even if you clear the Message object to reuse it, its binary data will still have to be copied from the stream anyway. If you want a no-copy solution, perhaps Python isn't the way to go; I primarily use protobufs in C++.
8

I've implemented a small Python package to serialize multiple protobuf messages into a stream and deserialize them from a stream. You can install it with pip:

    pip install pystream-protobuf

Here's sample code that writes two lists of protobuf messages to a file:

    import stream

    with stream.open("test.gam", "wb") as ostream:
        ostream.write(*objects_list)
        ostream.write(*another_objects_list)

and then reading the same messages (e.g. Alignment messages defined in vg_pb2.py) from the stream:

    import stream
    import vg_pb2

    alns_list = []
    with stream.open("test.gam", "rb") as istream:
        for data in istream:
            aln = vg_pb2.Alignment()
            aln.ParseFromString(data)
            alns_list.append(aln)

1 Comment

This package offers builtin gzip compression.
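To illustrate how gzip compression and length-prefixed framing can be combined using only the standard library, here is a rough sketch. It is not the package's implementation and makes no claim to match its on-disk format; `write_gzip_delimited` and `read_gzip_delimited` are hypothetical names, and the payloads are assumed to be already-serialized messages.

```python
import gzip
import io


def _encode_varint(value):
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def _read_varint(stream):
    """Read one varint; return None if the stream is already at EOF."""
    result, shift = 0, 0
    while True:
        raw = stream.read(1)
        if not raw:
            if shift == 0:
                return None
            raise EOFError("stream ended inside a varint")
        result |= (raw[0] & 0x7F) << shift
        if not raw[0] & 0x80:
            return result
        shift += 7
        if shift >= 64:
            raise ValueError("varint is longer than 10 bytes")


def write_gzip_delimited(fileobj, payloads):
    """Write each payload as varint length + bytes, gzip-compressed."""
    with gzip.GzipFile(fileobj=fileobj, mode="wb") as out:
        for payload in payloads:
            out.write(_encode_varint(len(payload)))
            out.write(payload)


def read_gzip_delimited(fileobj):
    """Yield each payload from a stream written by write_gzip_delimited."""
    with gzip.GzipFile(fileobj=fileobj, mode="rb") as stream:
        while True:
            size = _read_varint(stream)
            if size is None:
                return
            payload = stream.read(size)
            if len(payload) != size:
                raise EOFError("stream ended inside a message body")
            yield payload


# Round trip through an in-memory buffer.
buf = io.BytesIO()
write_gzip_delimited(buf, [b"abc", b"hi"])
buf.seek(0)
roundtrip = list(read_gzip_delimited(buf))
```

Passing a file object rather than a path keeps the sketch usable with real files opened in binary mode as well as in-memory buffers.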
-3

This is simple enough that I can see why maybe nobody has bothered to make a reusable tool:

    '''
    Parses multiple protobuf messages from a stream of spinn3r data
    '''
    import sys
    sys.path.append('python_proto/src')
    import spinn3rApi_pb2
    import protoStream_pb2

    ## read the stream as bytes, not text
    with open('8mny44bs6tYqfnofg0ELPg.protostream', 'rb') as f:
        data = f.read()

    class _DecodeError(Exception):
        '''Raised when a varint is malformed; callers should catch it.'''

    def _VarintDecoder(mask):
        '''Returns a function that decodes a signed varint from a buffer.'''
        def DecodeVarint(buffer, pos):
            result = 0
            shift = 0
            while 1:
                b = buffer[pos]  ## indexing bytes yields an int on Python 3
                result |= ((b & 0x7f) << shift)
                pos += 1
                if not (b & 0x80):
                    if result > 0x7fffffffffffffff:
                        result -= (1 << 64)
                        result |= ~mask
                    else:
                        result &= mask
                    return (result, pos)
                shift += 7
                if shift >= 64:
                    raise _DecodeError('Too many bytes when decoding varint.')
        return DecodeVarint

    ## get a 64bit varint decoder
    decoder = _VarintDecoder((1 << 64) - 1)

    ## get the three types of protobuf messages we expect to see
    header = protoStream_pb2.ProtoStreamHeader()
    delimiter = protoStream_pb2.ProtoStreamDelimiter()
    entry = spinn3rApi_pb2.Entry()

    ## get the header; size is the length of the message that follows
    pos = 0
    size, pos = decoder(data, pos)
    header.ParseFromString(data[pos:pos + size])
    ## should check its contents

    while 1:
        ## each entry is preceded by a delimiter message
        pos += size
        size, pos = decoder(data, pos)
        delimiter.ParseFromString(data[pos:pos + size])

        if delimiter.delimiter_type == delimiter.END:
            break

        pos += size
        size, pos = decoder(data, pos)
        entry.ParseFromString(data[pos:pos + size])
        print(entry)

1 Comment

The line in DecodeVarint where the return is done: return (result, pos) is indented one too far. Edited to correct.
