-1

I have a Java inputstream, that I skip 2 bytes every n bytes. Now the output of that are bytes that are clean after stripping the 2 delimiters every n bytes.

This output is a series of bytes where the first 4 bytes represent a length, so I need to get these calculate the length int and extract length + additional length bytes and write them to a file.

I use buffer to strip the 2 bytes but am not sure how to extract the length and length bytes. Basically, I need to gather/accumulate bytes for length and the additional length bytes that represent the message. Any help is appreciated.

Observable<Integer> byteObservable = Observable.create(emitter -> { try { while (true) { int b = fis.read(); if (b == -1) { // No more bytes to read emitter.onComplete(); } else { emitter.onNext(b); } } } catch (IOException e) { emitter.onError(e); } }); byteObservable .buffer(100, 102) // Buffer 100, skip 101/102 

As an example consider the stream to be

05AB@@CDE0@@7123@@4567@@10AB@@CDEF@@GHIJ 

Stripped of @@ at every 4 bytes

05ABCDE07123456710ABCDEFGHIJ 

This is made of sub messages

05 ABCDE 07 1234567 10 ABCDEFGHIJ 

Take each of those messages and write them to a file.

0

1 Answer 1

0

Here's one way to parse the messages using your simple example. These are my test results.

 5 ABCDE 7 1234567 10 ABCDEFGHIJ 

After discarding the skip bytes, we parse the input stream character by character. Either we are processing a length byte or a message byte. The int length field indicates whether we are processing a length byte or a message byte.

When the length is zero, we are processing a length byte. When we've processed two length bytes, we convert the length String to an int.

When the length is greater than zero, we are processing a message byte. When we've processed length message bytes, we output the message.

Here's the complete runnable code.

import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; public class ParseMessages { public static void main(String[] args) { String exampleString = "05AB@@CDE0@@7123@@4567@@10AB@@CDEF@@GHIJ"; InputStream is = new ByteArrayInputStream( exampleString.getBytes(StandardCharsets.UTF_8)); processMessages(is, 4, 2); try { is.close(); } catch (IOException e) { e.printStackTrace(); } } private static void processMessages(InputStream is, int count, int skip) { // Assuming all message lengths are two bytes int length = 0; int lengthMaximum = 2; byte[] text; String lengthString = ""; String message = ""; try { do { text = is.readNBytes(count); is.readNBytes(skip); for (int textIndex = 0; textIndex < text.length; textIndex++) { if (length == 0) { lengthString += Character.toString(text[textIndex]); if (lengthString.length() >= lengthMaximum) { length = Integer.valueOf(lengthString); lengthString = ""; } } else { message += Character.toString(text[textIndex]); if (message.length() >= length) { // Here's where you write the individual messages // to an output. System.out.println(String.format("%2d", length) + " " + message); message = ""; length = 0; } } } } while (text.length > 0); } catch (IOException e) { e.printStackTrace(); } } } 
Sign up to request clarification or add additional context in comments.

3 Comments

The sample provided was an example , it can be pretty huge and not something that can be done in memory. I am looking to do it using rxjava. Removing the delimiter and then parsing it using non rxjava is trivial. It needs to be done using rxjava stream handling , mot likely the solution will have flowable reduce and flatmates after the buffer that strips delimiters.
@chhil: The code I posted doesn't read the whole input into memory. It processes the bytes as they are read, assuming you output the messages to a file as they are created. Why use a library when plain Java code works?
I already have the behavior implemented using plain Java where one pass removes the delimiters and a second pass to create the messages. I am looking for a solution that does the stripping and message creation using bytes in memory efficiently using back pressure.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.