3
votes

I'm using Apache Nifi 1.9.2 to load data from a relational database into Google Cloud Storage. The purpose is to write the outcome into Parquet files as it stores data in columnar way. To achieve this I make use of the ConvertAvroToParquet (default settings) processor in Nifi (followed by the PutGCSObject processor). The problem with these resulting files is that I cannot read Decimal typed columns when consuming the files in Spark 2.4.0 (scala 2.11.12): Parquet column cannot be converted ... Column: [ARHG3A], Expected: decimal(2,0), Found: BINARY

Links to parquet/avro example files: https://drive.google.com/file/d/1PmaP1qanIZjKTAOnNehw3XKD6-JuDiwC/view?usp=sharing https://drive.google.com/file/d/138BEZROzHKwmSo_Y-SNPMLNp0rj9ci7q/view?usp=sharing

As I know that Nifi works with the Avro format in between processors within the flowfile, I have also written the avro file (like it is just before the ConvertAvroToParquet processor) and this I can read in Spark. It is also possible to not use logical types in Avro, but then I lose the column types in the end and all columns are Strings (not preferred). I have also experimented with the PutParquet processor without success.

val arhg_parquet = spark.read.format("parquet").load("ARHG.parquet")
arhg_parquet.printSchema()
arhg_parquet.show(10,false)

printSchema() gives proper result, indicating ARHG3A is a decimal(2,0) Executing the show(10,false) results in an ERROR: Parquet column cannot be converted in file file:///C:/ARHG.parquet. Column: [ARHG3A], Expected: decimal(2,0), Found: BINARY

1
This question touched upon multiple tools (nifi, spark, even s3) and within that there are several steps. Please try to narrow it down as much as possible (and ideally provide a reprodible example) so others may help more easily - Dennis Jaheruddin

1 Answers

0
votes

To achieve this I make use of the ConvertAvroToParquet (default settings) processor in Nifi (followed by the PutGCSObject processor)

Try upgrading to NiFi 1.12.1, our latest release. Some improvements were made to handling decimals that might be applicable here. Also, you can use the Parquet reader and writer services to convert from Avro to Parquet now as of ~1.10.0. If that doesn't work, it may be a bug that should have a Jira ticket filed against it.