2 votes

I would like to convert Avro files to Parquet in NiFi. I know it's possible to convert to ORC via the ConvertAvroToORC processor, but I haven't found a solution for converting to Parquet.

I'm converting JSON to Avro via a ConvertRecord processor (JsonTreeReader and AvroRecordSetWriter). After that I would like to convert the Avro payload to Parquet before putting it into an S3 bucket. I don't want to store it in HDFS, so the PutParquet processor does not seem applicable.
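
For context, the AvroRecordSetWriter in that ConvertRecord step writes the records against an Avro schema roughly like this (a minimal, hypothetical example; the field names are made up):

    {
      "type": "record",
      "name": "Event",
      "fields": [
        { "name": "id", "type": "string" },
        { "name": "timestamp", "type": "long" },
        { "name": "payload", "type": ["null", "string"], "default": null }
      ]
    }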

I would need a processor such as: ConvertAvroToParquet


2 Answers

4 votes

@Martin, you can use a very handy processor, ConvertAvroToParquet, which I recently contributed to NiFi. It should be available in the latest version.

The purpose of this processor is exactly what you are looking for. For more details on this processor and why it was created, see NIFI-5706.

Code Link.
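
In a flow like the one described in the question, the processor would slot in roughly like this (a sketch; PutS3Object is assumed as the S3 writer):

    ConvertRecord (JsonTreeReader -> AvroRecordSetWriter)
        -> ConvertAvroToParquet
        -> PutS3Object   (writes the Parquet payload to the bucket)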

2 votes

Actually it is possible to use the PutParquet processor.

The following description is from a working flow in NiFi 1.8.

Place the following libs into a folder, e.g. /home/nifi/s3libs/ (one way to fetch them is sketched after this list):

  • aws-java-sdk-1.11.455.jar (+ Third-party libs)
  • hadoop-aws-3.0.0.jar

Create an XML file, e.g. /home/nifi/s3conf/core-site.xml. It might need some additional tweaking; use the right endpoint for your region.

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>s3a://BUCKET_NAME</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>ACCESS-KEY</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>SECRET-KEY</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3A</value>
    </property>
    <property>
        <name>fs.s3a.multipart.size</name>
        <value>104857600</value>
        <description>The parser could not handle "100M", so the value is given in bytes. Maybe not needed after testing.</description>
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>s3.eu-central-1.amazonaws.com</value> 
        <description>Frankfurt</description>
    </property>
    <property>
        <name>fs.s3a.fast.upload.active.blocks</name>
        <value>4</value>
        <description>
    Maximum number of blocks a single output stream can have
    active (uploading, or queued to the central FileSystem
    instance's pool of queued operations).

    This stops a single stream overloading the shared thread pool.
        </description>
    </property>
    <property>
        <name>fs.s3a.threads.max</name>
        <value>10</value>
        <description>The total number of threads available in the filesystem for data
    uploads *or any other queued filesystem operation*.</description>
    </property>

    <property>
        <name>fs.s3a.max.total.tasks</name>
        <value>5</value>
        <description>The number of operations which can be queued for execution</description>
    </property>

    <property>
        <name>fs.s3a.threads.keepalivetime</name>
        <value>60</value>
        <description>Number of seconds a thread can be idle before being terminated.</description>
    </property>
    <property>
        <name>fs.s3a.connection.maximum</name>
        <value>15</value>
    </property>
</configuration>

Usage

Create a PutParquet processor. Under Properties set

  • Hadoop Configuration Resources: /home/nifi/s3conf/core-site.xml
  • Additional Classpath Resources: /home/nifi/s3libs
  • Directory: s3a://BUCKET_NAME/folder/ (EL available)
  • Compression Type: tested with NONE, SNAPPY
  • Remove CRC: true

The FlowFile must contain a filename attribute (no fancy characters or slashes).
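
If needed, an UpdateAttribute processor upstream can set a safe filename. For example (a sketch using standard NiFi Expression Language; "filename" is the attribute PutParquet reads):

    filename  =  ${UUID()}.parquet

or, to sanitize an existing name:

    filename  =  ${filename:replaceAll('[^A-Za-z0-9._-]', '_')}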