
There is a batch topic (JSON content) in Kafka that has to be consumed through NiFi (version 1.8). I'm able to consume this topic using the ConsumeKafkaRecord processor, but I would like to filter based on an attribute value because I don't need all the records from that topic.

Can the filtering be done while consuming the Kafka topic, before the records even get into NiFi? What would be the best approach for this, e.g. which processors or scripts should I use?

I just want to filter out a huge number of records, based on one of the attribute values, because they are not needed.

When you say "attributes", do you mean the value of a field in each (JSON) record? - Bryan Bende
Yes, exactly, Bryan! - Leibnitz

Answer


There isn't a way that I know of to filter records inside ConsumeKafkaRecord itself, but you can easily do it right after that processor.

One option would be to connect ConsumeKafkaRecord to a QueryRecord processor and write a SQL statement that selects only the records you are interested in.
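As a rough illustration of the QueryRecord approach: in QueryRecord each user-defined property becomes an outgoing relationship whose value is a SQL statement, and the incoming records are exposed as a table called FLOWFILE. A property such as `interesting` (a hypothetical name) with the value `SELECT * FROM FLOWFILE WHERE field1 = 'A'` would therefore emit only the matching records. NiFi runs that SQL through Apache Calcite. The sketch below is only a plain-Python stand-in that loads hypothetical JSON records into an in-memory SQLite table of the same name to show which records survive the filter. The field name `field1`, the sample records and the helper `query_records` are assumptions for illustration, and SQLite's SQL dialect is not identical to Calcite's.

```python
import json
import sqlite3

# Hypothetical batch of JSON records, standing in for what ConsumeKafkaRecord reads.
# "field1" is an assumed name for the field you want to filter on.
RAW = """[
  {"id": 1, "field1": "A", "payload": "keep me"},
  {"id": 2, "field1": "B", "payload": "drop me"},
  {"id": 3, "field1": "A", "payload": "keep me too"}
]"""

# The kind of statement you might put in a QueryRecord user-defined property.
QUERY = "SELECT * FROM FLOWFILE WHERE field1 = 'A'"


def query_records(records, sql):
    """Load records into an in-memory table named FLOWFILE and run sql against it.

    Assumes every record has the same keys and that the keys are plain SQL identifiers.
    This only mimics QueryRecord's behavior; it is not how NiFi executes the query.
    """
    columns = list(records[0].keys())
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(f"CREATE TABLE FLOWFILE ({', '.join(columns)})")
        placeholders = ", ".join("?" for _ in columns)
        conn.executemany(
            f"INSERT INTO FLOWFILE ({', '.join(columns)}) VALUES ({placeholders})",
            [tuple(r[c] for c in columns) for r in records],
        )
        cur = conn.execute(sql)
        names = [d[0] for d in cur.description]
        # Turn each result row back into a dict, i.e. a JSON-style record.
        return [dict(zip(names, row)) for row in cur.fetchall()]
    finally:
        conn.close()


if __name__ == "__main__":
    kept = query_records(json.loads(RAW), QUERY)
    print(json.dumps(kept))
```

Only records whose `field1` equals `A` come back; the rest are never written to the output, which is the same effect the QueryRecord relationship would have.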

A second option would be to use PartitionRecord, which partitions records based on a RecordPath expression. For example, you can partition on field1: if it has two values, A and B, PartitionRecord produces two flow files, one containing all records with A and one containing all records with B. You then route the flow file you are interested in and send the other one to a dead end.
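The PartitionRecord route can be sketched in the same spirit. In NiFi you would add a user-defined property to PartitionRecord (for example `field1` with the RecordPath `/field1`), which also writes the partition value to a `field1` attribute on each outgoing flow file. You could then keep only the wanted partition with something like RouteOnAttribute and `${field1:equals('A')}`, auto-terminating the unmatched relationship. The plain-Python sketch below only mimics that grouping and routing; `partition_records`, `route` and the sample data are hypothetical.

```python
import json
from collections import defaultdict

# Hypothetical records; "field1" stands in for the field you partition on.
RAW = """[
  {"id": 1, "field1": "A"},
  {"id": 2, "field1": "B"},
  {"id": 3, "field1": "A"},
  {"id": 4, "field1": "B"}
]"""


def partition_records(records, field):
    """Group records by the value of `field`.

    Each group stands in for one flow file that PartitionRecord would emit;
    the dict key stands in for the attribute it writes on that flow file.
    """
    groups = defaultdict(list)
    for record in records:
        groups[record.get(field)].append(record)
    return dict(groups)


def route(partitions, wanted):
    """Keep the partition whose value equals `wanted` and discard the rest.

    Mimics routing the wanted flow file onward and dead-ending the others.
    """
    kept = partitions.get(wanted, [])
    discarded = [
        record
        for value, records in partitions.items()
        if value != wanted
        for record in records
    ]
    return kept, discarded


if __name__ == "__main__":
    partitions = partition_records(json.loads(RAW), "field1")
    kept, discarded = route(partitions, "A")
    print(f"kept {len(kept)} records, discarded {len(discarded)}")
```

One difference worth noting: PartitionRecord first splits the content into one flow file per distinct value and only then is anything dropped, whereas a QueryRecord filter writes out just the matching records.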