
How do I get RxJava to work inside NiFi? Or rather, how do I get NiFi and RxJava to play nicely together? They seem like a perfect complement to one another.

I've run into a problem I can't figure out how to solve: NiFi keeps complaining with an IllegalStateException or a FlowFileHandlingException, depending on where and how I read from the FlowFile input stream.

I'm learning Apache NiFi and RxJava 2 (i.e. Flowables). I want to create an Apache NiFi Processor that works like the existing SplitText processor, only simpler: no header processing and no fragment-size processing, just pull out each line of data. I call it SplitLine.

There is no fancy threading going on here, meaning I'm not trying to do anything with Flowable.observeOn() or Flowable.subscribeOn(). Everything should run on one thread: the current thread.

I thought I could solve this with RxJava: read characters from the FlowFile and publish them through a shifted (sliding) buffer; for example...

Flowable<Tuple<Long, Integer>> chars = 
    Flowable.generate(
        () -> 0L,
        (cnt, emitter) -> { 
            int ch = flowStream.read();
            emitter.onNext(new Tuple<>(cnt, ch));   // also emits (cnt, -1) at end of stream
            if (ch == -1) emitter.onComplete();
            return cnt + 1;                          // next state; `return cnt++` would return the old value
        });

return chars.buffer(2, 1).publish().autoConnect();

I also tried the equivalent using Flowable.create...

Flowable<Tuple<Long, Integer>> chars = 
    Flowable.create(emitter -> { 
        try {
            int ch;
            long cnt = 0;
            while ((ch = flowStream.read()) != -1) {
                emitter.onNext(new Tuple<>(cnt, ch));
                cnt++;
            }
            emitter.onComplete();
        } catch (IOException ex) { 
            ex.printStackTrace();
            emitter.onError(ex);
        } finally { 
            flowStream.close();
        }
    }, BackpressureStrategy.BUFFER);

return chars.buffer(2, 1).publish().autoConnect();
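Both attempts end in buffer(2, 1), which opens a new window at every item and closes each window after two items; when the source completes, the window still open (the last char on its own) is emitted as well. A JDK-only sketch that builds the same windows from a list, so the shape is easy to see (SlidingPairs is a hypothetical helper, not an RxJava API). That trailing single-item window is why findLineMarkers further down checks pair.size() > 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SlidingPairs {
    // Reproduces the windows Flowable.buffer(2, 1) emits for a finite source:
    // a new window opens at every item, each window closes after two items,
    // and the window still open at completion (the last item alone) is emitted too.
    static <T> List<List<T>> pairs(List<T> items) {
        List<List<T>> windows = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            int end = Math.min(i + 2, items.size());
            windows.add(new ArrayList<>(items.subList(i, end)));
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(pairs(Arrays.asList('a', 'b', 'c')));
        // prints [[a, b], [b, c], [c]]
    }
}
```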

In the cases above, I am passing in the InputStream from the NiFi ProcessSession in the overridden onTrigger method of my Processor class.

InputStream stream = session.read(flowFile);
RxLineSplitter splitter = new RxLineSplitter(stream);

I have also tried the callback version, but unsurprisingly I received an exception because the stream was accessed from a callback other than the read callback. That is...

session.read(flowFile, stream -> { 
    RxLineSplitter splitter = new RxLineSplitter(stream);

    // RxLineSplitter contains the code above; its Flowable callback is the
    // "other" callback NiFi is complaining about...
});

Why am I publishing the char stream, and why in pairs of chars? I have two subscribers on the char stream: one looks for the start of a line, the other looks for the end of a line. Because of Windows, I need to look for any of \r, \n, or \r\n, so the second char in each pair is a lookahead.
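For comparison, the same terminator rules can be written as a plain loop with one character of lookahead. This is a JDK-only sketch over a String rather than the RxJava pipeline in the question; LineSplitSketch, Split and split are hypothetical names, and a line's length here excludes its terminator, mirroring the e.item1 - s.item1 length in the question's split().

```java
import java.util.ArrayList;
import java.util.List;

final class LineSplitSketch {
    /** Position of one line: offset of its first char and its length, terminator excluded. */
    static final class Split {
        final long start;
        final long length;

        Split(long start, long length) {
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return "(" + start + "," + length + ")";
        }
    }

    // Splits on \r, \n or \r\n. The one-character lookahead at a \r is what
    // lets \r\n count as a single terminator instead of producing an empty line.
    static List<Split> split(String text, boolean removeEmptyLines) {
        List<Split> splits = new ArrayList<>();
        int start = 0;
        int i = 0;
        while (i < text.length()) {
            char ch = text.charAt(i);
            if (ch != '\r' && ch != '\n') {
                i++;
                continue;
            }
            addSplit(splits, start, i - start, removeEmptyLines);
            boolean crlf = ch == '\r' && i + 1 < text.length() && text.charAt(i + 1) == '\n';
            i += crlf ? 2 : 1;   // skip the whole terminator
            start = i;           // the next line begins right after it
        }
        // A last line without a trailing terminator is still a line.
        if (start < text.length()) {
            addSplit(splits, start, text.length() - start, removeEmptyLines);
        }
        return splits;
    }

    private static void addSplit(List<Split> splits, int start, int length, boolean removeEmptyLines) {
        if (!removeEmptyLines || length > 0) {
            splits.add(new Split(start, length));
        }
    }

    public static void main(String[] args) {
        System.out.println(split("ab\r\ncd\n\nef", false));
        // prints [(0,2), (4,2), (7,0), (8,2)]
    }
}
```

The sequential version needs no shared stream or second subscriber; the lookahead is just a peek at index i + 1.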

In case you're interested, the crux of my RxSplitLine looks like...

Flowable<Tuple<Long, Integer>> findLineMarkers(
    Flowable<List<Tuple<Long, Integer>>> charPairs, 
    BiFunction<Tuple<Long, Integer>, Optional<Tuple<Long, Integer>>, Optional<Tuple<Long, Integer>>> strategy) { 

    return charPairs.map(pair -> {
            Tuple<Long, Integer> fst = pair.get(0);
            Optional<Tuple<Long, Integer>> snd = pair.size() > 1 ? Optional.of(pair.get(1)) : Optional.empty();

            return strategy.apply(fst, snd);
    }).filter(Optional::isPresent).map(Optional::get);
}

Flowable<SplitInfo> split(Flowable<List<Tuple<Long, Integer>>> charPairs) {

    return findLineMarkers(charPairs, startingPositionStrategy)
               .zipWith(findLineMarkers(charPairs, endingPositionStrategy), 
                        (s, e) -> new Split(s.item1, e.item1 - s.item1))
               .filter(split -> !removeEmptyLines || split.length > 0)
               .zipWith(counter(), Tuple::new)
               .timeInterval(TimeUnit.MILLISECONDS)
               .map(x -> new SplitInfo(x.value().item1.start,
                                       x.value().item1.length, 
                                       x.value().item2,
                                       x.time(), x.unit()));
}

Enough rambling... I'd be grateful for any help or pointers in getting NiFi and RxJava 2 to play nice with one another.

Can you post the stack traces detailing the exceptions you noted above? - Andy
Thanks for the interest, Andy. I used a gist to copy a portion of the nifi-app.log and the onTrigger() of my SplitLine processor: gist.github.com/rkayman/60faa63723c9f54b20f619fb131ccb66 - Rob

1 Answer


I believe I have found an answer... at least my SplitLine processor shows it has received the flow file, and the bytes-read count is accurate too!

If you plan to read or otherwise use the input stream outside the normal InputStreamCallback, the NiFi docs instruct you to use one of the other overloads of ProcessSession.read, specifically InputStream input = session.read(flowFile). The docs also state that you are responsible for properly closing the stream. For those trying this as well, I would add: close the stream as quickly and as eagerly as possible.
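Whoever ends up owning the stream, the close-it-yourself contract is the same one try-with-resources handles in plain Java. A minimal JDK-only sketch; OwnedStreamSketch and countBytes are hypothetical names, and in a processor the argument would be the stream returned by session.read(flowFile):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class OwnedStreamSketch {
    // Reads a stream it has been handed to the end and closes it before returning,
    // so nothing downstream ever holds an open stream. try-with-resources also
    // closes it if read() throws; the IOException is rethrown unchecked.
    static long countBytes(InputStream handedOver) {
        try (InputStream in = handedOver) {
            long count = 0;
            while (in.read() != -1) {
                count++;
            }
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The stream is closed as soon as the try block exits, right after the last read, which is the "close as eagerly as possible" advice in its simplest form.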

In RxJava 2 this means my Flowable.create approach was close, but not enough: you need to wrap your Flowable.create in a Flowable.using. Below are my modified constructor and the method that worked...

A couple of highlights to note:

  • You might be tempted to pass the ProcessSession around and use it for the resourceSupplier in Flowable.using... that caused numerous headaches for me. YMMV, but I don't recommend it (if you find a way, please let me know).

  • I used the Flowable.using overload that takes an eager parameter and set it to true, so the resource (the InputStream) is closed/disposed eagerly, just before the terminal event is passed downstream.
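The eager flag only changes when the disposer runs relative to the terminal event: with eager = true, RxJava 2 disposes the resource just before forwarding onComplete or onError downstream; with eager = false, it disposes afterwards. The following is not RxJava's implementation, just a hypothetical, synchronous JDK-only model (UsingOrderSketch) that reproduces that ordering so it can be observed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class UsingOrderSketch {
    // Simplified, synchronous model of Flowable.using(resourceSupplier, sourceSupplier,
    // disposer, eager): acquire the resource, produce a result from it, then dispose
    // before (eager) or after (non-eager) the completion signal is delivered.
    // Cancellation is not modelled; on failure the resource is disposed and the error rethrown.
    static <R, T> void using(Supplier<R> resourceSupplier,
                             Function<R, T> body,
                             Consumer<R> disposer,
                             boolean eager,
                             Consumer<T> onComplete) {
        R resource = resourceSupplier.get();
        T result;
        try {
            result = body.apply(resource);
        } catch (RuntimeException e) {
            disposer.accept(resource);
            throw e;
        }
        if (eager) {
            disposer.accept(resource);
            onComplete.accept(result);
        } else {
            onComplete.accept(result);
            disposer.accept(resource);
        }
    }

    // Records the order of events for one run.
    static List<String> trace(boolean eager) {
        List<String> events = new ArrayList<>();
        using(() -> "stream",
              r -> { events.add("read " + r); return 3; },
              r -> events.add("close " + r),
              eager,
              n -> events.add("complete " + n));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));   // prints [read stream, close stream, complete 3]
        System.out.println(trace(false));  // prints [read stream, complete 3, close stream]
    }
}
```

With eager = true the InputStream is already closed by the time anything downstream reacts to completion, which is the behaviour the answer relies on.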

RxLineSplitter(InputStream input, boolean removeEmptyLines) {

    this.inputStream = input;
    this.removeEmptyLines = removeEmptyLines;
}

private Flowable<List<Tuple<Long, Integer>>> getCharacters() {

    Flowable<Tuple<Long, Integer>> chars =
        Flowable.using(
            () -> this.inputStream,
            input -> Flowable.create(emitter -> {

                try {
                    long cnt = 0;
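                    while (!emitter.isCancelled()) {     // stop reading if downstream cancels
                        int ch = input.read();
                        if (isEOF.test(ch)) break;       // isEOF: a predicate defined elsewhere (not shown)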
                        emitter.onNext(new Tuple<>(cnt, ch));
                        ++cnt;
                    }
                    emitter.onComplete();

                } catch (Exception ex) {
                    emitter.onError(ex);
                }

            }, BackpressureStrategy.BUFFER),
            InputStream::close,
            true);

    return chars.buffer(2, 1);
}

Final thoughts:

  • I like that the supporting RxLineSplitter class has no dependencies on NiFi, which reduces coupling.

  • I don't like that the NiFi Processor.onTrigger method obtains the InputStream but relies on RxLineSplitter to close and dispose of it. This is discussed a bit in the documentation, but it feels dirty and error-prone to me. Mitigating this, the InputStream is used in one method only and is cleaned up in a pretty obvious and clear way with Flowable.using.

Hope this helps someone else... time to see what other [learning] hurdles I encounter with NiFi and Rx.