I'm trying to build a flow in NiFi (1.11.4) that reads Avro messages from AMQ, accumulates them with the MergeRecord processor, and then writes the merged Parquet file to HDFS.

The issue: when I use ParquetRecordSetWriter in the MergeRecord processor (in conjunction with AvroReader), merged content is never emitted based on the Minimum Bin Size threshold. I tried setting very low values and it still never fires. At the same time, the Max Bin Age threshold works just fine.
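For reference, the relevant MergeRecord settings look roughly like this (the threshold values are illustrative; I tried several, all deliberately low):

```
MergeRecord
  Record Reader     = AvroReader
  Record Writer     = ParquetRecordSetWriter
  Merge Strategy    = Bin-Packing Algorithm
  Minimum Bin Size  = 1 KB      <- never triggers a merge
  Maximum Bin Size  = 256 MB
  Max Bin Age       = 5 min     <- the only threshold that fires
```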

Also, if I use AvroRecordSetWriter, the minimum size threshold works just fine. So I tried AvroRecordSetWriter followed by PutParquet (or ConvertAvroToParquet) and hit another issue: if I set the Row Group Size for the Parquet file (e.g. 128 MB), small files are never written.

It looks like it buffers content in memory, but should it really do so? In a simple Java program I wrote to test AvroParquetWriter (essentially the same writer NiFi uses), I was able to write a small file even with a huge row group size set.
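For what it's worth, my standalone test looked roughly like this (the schema, record count, and output path are illustrative, not my real data; it uses parquet-avro's AvroParquetWriter directly):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class SmallParquetFileTest {
    public static void main(String[] args) throws Exception {
        // Toy schema standing in for my real Avro message schema
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Msg\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"long\"},"
          + "{\"name\":\"body\",\"type\":\"string\"}]}");

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path("/tmp/small-test.parquet"))
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withRowGroupSize(128 * 1024 * 1024) // huge row group vs. tiny data
                .build()) {
            for (long i = 0; i < 10; i++) {
                GenericRecord rec = new GenericData.Record(schema);
                rec.put("id", i);
                rec.put("body", "test-" + i);
                writer.write(rec);
            }
        } // close() flushes the buffered row group and writes the footer
    }
}
```

This happily produces a tiny file, because close() flushes whatever is buffered regardless of whether the row group ever filled up.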

Are there any known issues related to Parquet writing in NiFi? I'm very confused by the described behavior. Any assistance is appreciated.

Thanks in advance.

1 Answer

I'm running a similar flow on 1.12.0, but I'm emitting on record count and max bin age, not minimum bin size, and it works just fine. One thing that might be at play here: I've noticed that the Parquet output is consistently about 50% the size of the Avro binary in our data set. So if you're estimating the bin size based on Avro, it's probably going to be wrong with Parquet.
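Roughly the MergeRecord settings that work for us (the values are illustrative, not tuned recommendations):

```
MergeRecord
  Record Reader             = AvroReader
  Record Writer             = ParquetRecordSetWriter
  Merge Strategy            = Bin-Packing Algorithm
  Minimum Number of Records = 10000
  Maximum Number of Records = 10000
  Max Bin Age               = 5 min   <- safety valve for quiet periods
```

Record count sidesteps the size-accounting question entirely, and Max Bin Age guarantees a bin still gets flushed when traffic is slow.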