4

Is there a way to write to several streams simultaneously in a custom NiFi processor? For instance, I have third-party libraries that do significant processing using APIs that work something like this:

public void process(InputStream in, OutputStream foo, OutputStream baa, List<String> args)
{
    ...
    foo.write(things);
    baa.write(stuff);
    ...
}

But the only examples I can find all just use one output stream:

FlowFile transform = session.write(original, new OutputStreamCallback() {
        @Override
        public void process(OutputStream out) throws IOException {
            // OutputStream.write takes bytes, not a String
            out.write("stuff".getBytes(StandardCharsets.UTF_8));
        }
    });

Processing is done in batches (due to its large scale), so it's not practical to perform all the processing and then write out the separate flows.

The only way I can come up with is to process the input multiple times :(

To clarify, I want to write to multiple FlowFiles using the session.write(flowfile, callback) method, so that the different streams can be sent and managed separately.

5
  • Is the use of TeeOutputStream out of the question? See: stackoverflow.com/questions/7987395/…
    – GPI
    Commented Jul 11, 2016 at 9:12
  • I don't think so. The only way I know of writing to a flowfile is the OutputStreamCallback, which has only one method (process), which takes only one argument (an OutputStream).
    – foobarking
    Commented Jul 11, 2016 at 10:32
  • Yes, but a TeeOutputStream gives you one stream that writes to two separate files. Isn't that enough?
    – GPI
    Commented Jul 11, 2016 at 10:50
  • I don't believe so. TeeOutputStream writes the same thing to both streams; my functions do not (see "things" and "stuff" in my example). Thanks.
    – foobarking
    Commented Jul 11, 2016 at 10:58
  • Also, I want to write to multiple flowfiles (this isn't clear, I'll update the question) using the session.write(flowfile, callback) method.
    – foobarking
    Commented Jul 11, 2016 at 11:04
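
To illustrate the point made in the comments above, here is a minimal sketch of what a tee-style stream does. The `Tee` class is a hand-rolled stand-in written for this example (it is not the Commons IO `TeeOutputStream` itself), and the class and method names are assumptions. It shows that every byte written to the tee ends up in both targets, so it cannot produce different content per stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class TeeSketch {

    /** Hypothetical minimal tee: forwards every byte to both targets. */
    static class Tee extends OutputStream {
        private final OutputStream first;
        private final OutputStream second;

        Tee(OutputStream first, OutputStream second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void write(int b) throws IOException {
            first.write(b);
            second.write(b);
        }
    }

    /** Writes "things" through the tee and returns what each target received. */
    static String[] viaTee() throws IOException {
        ByteArrayOutputStream left = new ByteArrayOutputStream();
        ByteArrayOutputStream right = new ByteArrayOutputStream();
        try (OutputStream tee = new Tee(left, right)) {
            tee.write("things".getBytes(StandardCharsets.UTF_8));
        }
        return new String[] {
            new String(left.toByteArray(), StandardCharsets.UTF_8),
            new String(right.toByteArray(), StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) throws IOException {
        String[] result = viaTee();
        // Both targets hold identical content.
        System.out.println(result[0] + " / " + result[1]);
    }
}
```

Since both targets always receive the same bytes, a tee only helps when the outputs are meant to be copies of each other.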

2 Answers

4

The NiFi API is based on acting upon one flow file at a time, but you should be able to do something like this:

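        FlowFile flowFile1 = session.create();
        // Hold the second flow file in a reference so the inner callback can update it
        final AtomicReference<FlowFile> holder = new AtomicReference<>(session.create());

        flowFile1 = session.write(flowFile1, new OutputStreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                // The first flow file's stream is open here; open the second inside it
                FlowFile flowFile2 = session.write(holder.get(), new OutputStreamCallback() {
                    @Override
                    public void process(OutputStream out) throws IOException {
                        // The second flow file's stream is open here (this 'out' shadows the outer one)
                    }
                });
                holder.set(flowFile2);
            }
        });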
1
  • Thanks, this works. My only change would be to give the first OutputStream a different name (and make it final) so you can write to both in the innermost function.
    – foobarking
    Commented Jul 13, 2016 at 0:13
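
As a rough illustration of the nested-callback pattern above, with the renaming suggested in the comment, here is a self-contained sketch that runs without NiFi. `StreamCallback`, `write`, and `thirdPartyProcess` are hypothetical stand-ins for `OutputStreamCallback`, `session.write`, and the third-party method from the question; they are assumptions made for this example and not the NiFi API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

class NestedWriteSketch {

    /** Hypothetical stand-in for NiFi's OutputStreamCallback. */
    interface StreamCallback {
        void process(OutputStream out) throws IOException;
    }

    /** Hypothetical stand-in for session.write: runs the callback against a fresh stream. */
    static byte[] write(StreamCallback callback) throws IOException {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            callback.process(buffer);
            return buffer.toByteArray();
        }
    }

    /** Stand-in for the third-party method that writes different data to each stream. */
    static void thirdPartyProcess(OutputStream foo, OutputStream baa) throws IOException {
        foo.write("things".getBytes(StandardCharsets.UTF_8));
        baa.write("stuff".getBytes(StandardCharsets.UTF_8));
    }

    /** Opens the second stream inside the first callback so both are live at once. */
    static String[] run() throws IOException {
        final AtomicReference<byte[]> second = new AtomicReference<>();
        byte[] first = write(new StreamCallback() {
            @Override
            public void process(final OutputStream fooOut) throws IOException {
                second.set(write(new StreamCallback() {
                    @Override
                    public void process(OutputStream baaOut) throws IOException {
                        // Distinct names let the innermost code reach both streams.
                        thirdPartyProcess(fooOut, baaOut);
                    }
                }));
            }
        });
        return new String[] {
            new String(first, StandardCharsets.UTF_8),
            new String(second.get(), StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) throws IOException {
        String[] results = run();
        System.out.println(results[0] + " / " + results[1]); // prints "things / stuff"
    }
}
```

The inner `write` completes before the outer callback returns, so both streams are open only for the duration of the innermost `process` call, which is where the batch processing would go.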
3

Since you're producing different outputs from the same input, you might also consider breaking these steps out into discrete processors that each focus on one specific function. Above you show "things" and "stuff", so for example I'm suggesting a 'DoThings' processor and a 'DoStuff' processor. In your flow you can send the same flowfile to both by simply using the source connection twice. This lets them run in parallel and have different runtimes, etc. NiFi will still maintain the provenance trail for you, and it won't actually copy the bytes at all but will instead pass a pointer to the original content.

2
  • I agree this has its advantages, but if there is a large amount of processing done per input stream, it feels wasteful to throw it all away and do it again (or twice in parallel). It's a trade-off. Thanks though.
    – foobarking
    Commented Jul 13, 2016 at 0:16
  • Could you do the intermediate (or "shared") processing in a single processor and then split to two subsequent processors where the actions differ? Nothing says that the flowfile content has to be in a certain state when processor A finishes, as long as B and B' can both accept the output of A as input.
    – Andy
    Commented Jul 16, 2016 at 3:59
