How do I get RxJava to work inside NiFi? Or rather, how do I get NiFi and RxJava to play nicely together? They seem like such a perfect complement to one another.
I've run into a problem that I can't figure out how to solve. NiFi keeps complaining with either an IllegalStateException or a FlowFileHandlingException, depending on where and how I read from the FlowFile input stream.
I'm learning about Apache NiFi and RxJava 2 (i.e. Flowables). I want to create an Apache NiFi Processor that operates similarly to the existing SplitText processor, only simpler: no header processing, no fragment size processing, just pull out each line of data. I call it SplitLine.
There is no fancy threading going on here -- meaning I'm not trying to do anything with Flowable.observeOn() or Flowable.subscribeOn(). Everything should be done on one thread...the current thread.
I thought I would solve this by using RxJava. I would read characters from the FlowFile and publish them using a shifted buffer; for example...
    Flowable<Tuple<Long, Integer>> chars =
        Flowable.generate(
            () -> 0L,
            (cnt, emitter) -> {
                int ch = flowStream.read();
                emitter.onNext(new Tuple<>(cnt, ch));
                if (ch == -1) emitter.onComplete();
                return cnt + 1; // next state: position of the following character
            });
    return chars.buffer(2, 1).publish().autoConnect();
I also tried the equivalent using Flowable.create...
    Flowable<Tuple<Long, Integer>> chars =
        Flowable.create(emitter -> {
            try {
                int ch;
                long cnt = 0;
                while ((ch = flowStream.read()) != -1) {
                    emitter.onNext(new Tuple<>(cnt, ch));
                    cnt++;
                }
                emitter.onComplete();
            } catch (IOException ex) {
                ex.printStackTrace();
                emitter.onError(ex);
            } finally {
                flowStream.close();
            }
        }, BackpressureStrategy.BUFFER);
    return chars.buffer(2, 1).publish().autoConnect();
In the cases above, I am passing in the InputStream from the NiFi ProcessSession in the overridden onTrigger method of my Processor class.
    InputStream stream = session.read(flowFile);
    RxLineSplitter splitter = new RxLineSplitter(stream);
I have also tried using the callback version but, unsurprisingly, received an exception because the stream was accessed from a callback other than the read callback. That is...
    session.read(flowFile, stream -> {
        RxLineSplitter splitter = new RxLineSplitter(stream);
        // RxLineSplitter contains the code above, which is the other callback it is complaining about...
    });
Why am I publishing the char stream? And why in pairs of chars? I have two subscribers on the char stream. One looks for the start of a line, the other looks for the end of a line. Because of Windows, I need to look for any one of [\r, \n, or \r\n]. Basically, the second char in the pair is a lookahead.
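To make the lookahead idea concrete, here is a minimal plain-Java sketch of the line-ending rule, deliberately without RxJava or NiFi so it runs on its own. The names LineEndingSketch and splitLines are made up for illustration and are not part of the processor; the sketch only assumes that \r, \n, and \r\n each count as a single terminator and that a missing lookahead (end of input) is represented by -1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: not the RxJava implementation described in this post.
final class LineEndingSketch {

    /**
     * Splits text on \r, \n, or \r\n and returns one {start, length} pair per line.
     * The terminator characters themselves are not included in any line.
     */
    static List<int[]> splitLines(String text) {
        List<int[]> lines = new ArrayList<>();
        int start = 0; // offset of the first character of the current line
        int i = 0;
        while (i < text.length()) {
            char cur = text.charAt(i);
            // Lookahead: -1 means "no next character", like a final one-element buffer.
            int next = (i + 1 < text.length()) ? text.charAt(i + 1) : -1;
            if (cur == '\r' || cur == '\n') {
                lines.add(new int[] {start, i - start});
                // \r\n is a single terminator, so consume both characters at once.
                i += (cur == '\r' && next == '\n') ? 2 : 1;
                start = i;
            } else {
                i++;
            }
        }
        // Trailing text without a terminator still counts as a line.
        if (start < text.length()) {
            lines.add(new int[] {start, text.length() - start});
        }
        return lines;
    }
}
```

For example, LineEndingSketch.splitLines("a\r\nbb\rc\n\nd") yields five entries, including a zero-length entry for the empty line between the two \n characters. That zero-length entry is the kind of split a removeEmptyLines-style filter would drop.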
In case you're interested, the crux of my RxSplitLine looks like...
    Flowable<Tuple<Long, Integer>> findLineMarkers(
            Flowable<List<Tuple<Long, Integer>>> charPairs,
            BiFunction<Tuple<Long, Integer>, Optional<Tuple<Long, Integer>>, Optional<Tuple<Long, Integer>>> strategy) {
        return charPairs.map(pair -> {
            Tuple<Long, Integer> fst = pair.get(0);
            // The last buffer holds a single element, so the lookahead may be absent.
            Optional<Tuple<Long, Integer>> snd = pair.size() > 1 ? Optional.of(pair.get(1)) : Optional.empty();
            return strategy.apply(fst, snd);
        }).filter(Optional::isPresent).map(Optional::get);
    }
    Flowable<SplitInfo> split(InputStream stream) throws IOException {
        return findLineMarkers(stream, startingPositionStrategy)
            .zipWith(findLineMarkers(stream, endingPositionStrategy),
                (s, e) -> new Split(s.item1, e.item1 - s.item1))
            .filter(split -> !removeEmptyLines || split.length > 0)
            .zipWith(counter(), Tuple::new)
            .timeInterval(TimeUnit.MILLISECONDS)
            .map(x -> new SplitInfo(x.value().item1.start,
                x.value().item1.length,
                x.value().item2,
                x.time(), x.unit()));
    }
Enough rambling... I'd be grateful for any help or pointers in getting NiFi and RxJava 2 to play nice with one another.