I have a Java InputStream from which I skip 2 bytes after every n bytes. The result is a clean byte stream with the 2 delimiter bytes stripped after every n bytes.

This output is a series of bytes where the first 4 bytes represent a length, so I need to read them, compute the length as an int, then extract the length bytes plus the following length bytes of message and write them to a file.

I use buffer to strip the 2 bytes, but I am not sure how to extract the length and the message bytes. Basically, I need to accumulate the bytes for the length and then the additional bytes that make up the message. Any help is appreciated.

Observable<Integer> byteObservable = Observable.create(emitter -> {
    try {
        while (true) {
            int b = fis.read();
            if (b == -1) {
                // No more bytes to read: complete and leave the loop
                emitter.onComplete();
                return;
            }
            emitter.onNext(b);
        }
    } catch (IOException e) {
        emitter.onError(e);
    }
});

byteObservable
    .buffer(100, 102) // Buffer 100, skip 101/102

As an example consider the stream to be

05AB@@CDE0@@7123@@4567@@10AB@@CDEF@@GHIJ

Stripped of the @@ after every 4 bytes, it becomes

05ABCDE07123456710ABCDEFGHIJ
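The stripping step on its own can be sketched in plain Java, without RxJava, to pin down the intended result. This is a minimal sketch: the class name StripDelimiters and the method strip are made up for illustration, and it works on a whole byte array only to keep the example short.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class StripDelimiters {

    // Keeps `count` bytes, then drops `skip` bytes, repeating to the end.
    static byte[] strip(byte[] raw, int count, int skip) {
        ByteArrayOutputStream clean = new ByteArrayOutputStream(raw.length);
        int period = count + skip;
        for (int i = 0; i < raw.length; i++) {
            // Positions 0..count-1 of each period are payload, the rest are delimiters.
            if (i % period < count) {
                clean.write(raw[i]);
            }
        }
        return clean.toByteArray();
    }

    public static void main(String[] args) {
        byte[] raw = "05AB@@CDE0@@7123@@4567@@10AB@@CDEF@@GHIJ"
                .getBytes(StandardCharsets.US_ASCII);
        System.out.println(new String(strip(raw, 4, 2), StandardCharsets.US_ASCII));
    }
}
```

Run against the sample stream with count 4 and skip 2, this prints the stripped string shown above.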

This is made of sub messages

05 ABCDE
07 1234567
10 ABCDEFGHIJ

Take each of those messages and write them to a file.
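For reference, the framing and writing step might look like the following in plain Java once the delimiters are gone. It is only a sketch under assumptions that are not settled above: the length header is 2 ASCII digits as in the example (not 4 bytes), each message body is written on its own line, and a ByteArrayOutputStream stands in for the real file. The names MessageWriter and writeMessages are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class MessageWriter {

    // Assumption taken from the example: the header is 2 ASCII digits.
    static final int HEADER_SIZE = 2;

    // Copies every message body to `out`, one per line; returns the message count.
    static int writeMessages(InputStream clean, OutputStream out) throws IOException {
        int written = 0;
        while (true) {
            byte[] header = clean.readNBytes(HEADER_SIZE);
            if (header.length == 0) {
                return written; // clean end of stream
            }
            if (header.length < HEADER_SIZE) {
                throw new EOFException("truncated length header");
            }
            int length = Integer.parseInt(new String(header, StandardCharsets.US_ASCII));
            byte[] body = clean.readNBytes(length);
            if (body.length < length) {
                throw new EOFException("truncated message body");
            }
            out.write(body);
            out.write('\n');
            written++;
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream clean = new ByteArrayInputStream(
                "05ABCDE07123456710ABCDEFGHIJ".getBytes(StandardCharsets.US_ASCII));
        // Stand-in for a FileOutputStream so the sketch has no side effects.
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        int count = writeMessages(clean, file);
        System.out.print(new String(file.toByteArray(), StandardCharsets.US_ASCII));
        System.out.println(count + " messages");
    }
}
```

Only one message is held in memory at a time, so the size of the whole stream does not matter, only the size of the largest message.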

  • To the person who downvoted, do you even understand what the question is? Maybe share a reason for the downvote.
    – chhil
    Commented Jul 7 at 11:36

1 Answer

Here's one way to parse the messages using your simple example. These are my test results.

 5 ABCDE
 7 1234567
10 ABCDEFGHIJ

After discarding the skip bytes, we parse the input stream character by character. Either we are processing a length byte or a message byte. The int length field indicates whether we are processing a length byte or a message byte.

When the length is zero, we are processing a length byte. When we've processed two length bytes, we convert the length String to an int.

When the length is greater than zero, we are processing a message byte. When we've processed length message bytes, we output the message.

Here's the complete runnable code.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ParseMessages {

    public static void main(String[] args) {
        String exampleString = "05AB@@CDE0@@7123@@4567@@10AB@@CDEF@@GHIJ";

        InputStream is = new ByteArrayInputStream(
                exampleString.getBytes(StandardCharsets.UTF_8));
        processMessages(is, 4, 2);

        try {
            is.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void processMessages(InputStream is, int count, int skip) {
        // Assuming all message lengths are two bytes
        int length = 0;
        int lengthMaximum = 2;
        byte[] text;
        String lengthString = "";
        String message = "";

        try {
            do {
                text = is.readNBytes(count);
                is.readNBytes(skip);
                for (int textIndex = 0; textIndex < text.length; textIndex++) {
                    if (length == 0) {
                        lengthString += Character.toString(text[textIndex]);
                        if (lengthString.length() >= lengthMaximum) {
                            length = Integer.valueOf(lengthString);
                            lengthString = "";
                        }
                    } else {
                        message += Character.toString(text[textIndex]);
                        if (message.length() >= length) {
                            // Here's where you write the individual messages
                            // to an output.
                            System.out.println(String.format("%2d", length)
                                    + " " + message);
                            message = "";
                            length = 0;
                        }
                    }
                }
            } while (text.length > 0);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
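The same state machine can also be written in a push style, where the caller hands over one delimiter-free chunk at a time and gets back whatever messages that chunk completed. The sketch below is a variation on the code above, not a tested RxJava solution: the class name MessageFramer and the method accept are hypothetical, and the 2-digit ASCII length header is an assumption taken from the example. It tracks the header and body in separate byte arrays instead of using length == 0 as the state flag, so a zero-length message is handled as well.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class MessageFramer {

    // Assumption: the length header is 2 ASCII digits, as in the example.
    private static final int HEADER_SIZE = 2;

    private final byte[] header = new byte[HEADER_SIZE];
    private int headerFilled = 0;
    private byte[] body = null; // null while the header is still being read
    private int bodyFilled = 0;

    // Feeds one delimiter-free chunk; returns the messages completed by it.
    List<String> accept(byte[] chunk) {
        List<String> done = new ArrayList<>();
        for (byte b : chunk) {
            if (body == null) {
                header[headerFilled++] = b;
                if (headerFilled == HEADER_SIZE) {
                    int length = Integer.parseInt(
                            new String(header, StandardCharsets.US_ASCII));
                    body = new byte[length];
                    headerFilled = 0;
                    bodyFilled = 0;
                }
            } else {
                body[bodyFilled++] = b;
            }
            // A body is complete once it is full (immediately, if its length is 0).
            if (body != null && bodyFilled == body.length) {
                done.add(new String(body, StandardCharsets.US_ASCII));
                body = null;
            }
        }
        return done;
    }

    public static void main(String[] args) {
        MessageFramer framer = new MessageFramer();
        String[] chunks = {"05AB", "CDE0", "7123", "4567", "10AB", "CDEF", "GHIJ"};
        for (String chunk : chunks) {
            for (String message : framer.accept(chunk.getBytes(StandardCharsets.US_ASCII))) {
                System.out.println(message.length() + " " + message);
            }
        }
    }
}
```

Because accept keeps its state between calls, the chunk boundaries do not need to line up with message boundaries. In a reactive pipeline, one framer instance per stream could presumably be called from a mapping step after the buffer that strips the delimiters, with the returned list flattened downstream; that wiring is not shown or verified here.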
  • The sample provided was only an example; the real input can be huge and cannot be processed in memory. I am looking to do it using RxJava. Removing the delimiter and then parsing it without RxJava is trivial. It needs to be done using RxJava stream handling; most likely the solution will use Flowable, reduce, and flatMap after the buffer that strips the delimiters.
    – chhil
    Commented Jul 7 at 11:32
  • @chhil: The code I posted doesn't read the whole input into memory. It processes the bytes as they are read, assuming you output the messages to a file as they are created. Why use a library when plain Java code works?
    Commented Jul 7 at 12:05
  • I already have the behavior implemented in plain Java, where one pass removes the delimiters and a second pass creates the messages. I am looking for a solution that does the stripping and message creation on bytes in memory, efficiently, using backpressure.
    – chhil
    Commented Jul 7 at 13:16