Actually, it is possible to use the PutParquet processor. The following description is from a working flow in NiFi 1.8.
Place the following libs into a folder, e.g. /home/nifi/s3libs/:
- aws-java-sdk-1.11.455.jar (+ Third-party libs)
- hadoop-aws-3.0.0.jar
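
If you would rather resolve these jars from Maven Central than collect them by hand, coordinates along these lines should match the versions above (a sketch; using the aws-java-sdk-bundle artifact, which packages the SDK together with its third-party dependencies in a single jar, is my assumption, not part of the original flow):

    <!-- Sketch: aws-java-sdk-bundle covers the SDK plus its third-party libs -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-bundle</artifactId>
        <version>1.11.455</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.0.0</version>
    </dependency>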
Create an XML file, e.g. /home/nifi/s3conf/core-site.xml. It might need some additional tweaking; use the right endpoint for your region.
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>s3a://BUCKET_NAME</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>ACCESS-KEY</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>SECRET-KEY</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3A</value>
    </property>
    <property>
        <name>fs.s3a.multipart.size</name>
        <value>104857600</value>
        <description>100 MB, written out in bytes because the configuration parser
        did not accept the "100M" shorthand. This override may turn out to be
        unnecessary after testing.</description>
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>s3.eu-central-1.amazonaws.com</value>
        <description>Frankfurt</description>
    </property>
    <property>
        <name>fs.s3a.fast.upload.active.blocks</name>
        <value>4</value>
        <description>
            Maximum number of blocks a single output stream can have
            active (uploading, or queued to the central FileSystem
            instance's pool of queued operations).
            This stops a single stream overloading the shared thread pool.
        </description>
    </property>
    <property>
        <name>fs.s3a.threads.max</name>
        <value>10</value>
        <description>The total number of threads available in the filesystem for data
        uploads *or any other queued filesystem operation*.</description>
    </property>
    <property>
        <name>fs.s3a.max.total.tasks</name>
        <value>5</value>
        <description>The number of operations which can be queued for execution.</description>
    </property>
    <property>
        <name>fs.s3a.threads.keepalivetime</name>
        <value>60</value>
        <description>Number of seconds a thread can be idle before being terminated.</description>
    </property>
    <property>
        <name>fs.s3a.connection.maximum</name>
        <value>15</value>
    </property>
</configuration>
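
A note on credentials: instead of hardcoding keys in core-site.xml, hadoop-aws also supports pluggable credential providers. A hedged sketch of an alternative property (added inside the <configuration> element; the default chain picks up environment variables, instance profiles, etc.):

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>com.amazonaws.auth.DefaultAWSCredentialsProviderChain</value>
    </property>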
Usage

Create a PutParquet processor. Under Properties, set:
- Hadoop Configuration Resources: /home/nifi/s3conf/core-site.xml
- Additional Classpath Resources: /home/nifi/s3libs
- Directory: s3a://BUCKET_NAME/folder/ (Expression Language available)
- Compression Type: tested with NONE and SNAPPY
- Remove CRC: true
The flow file must contain a filename attribute, with no fancy characters or slashes.
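
For example, an UpdateAttribute processor in front of PutParquet can generate a safe filename (a sketch; the attribute name filename is what PutParquet reads, while the timestamp/UUID pattern is only an illustration):

    filename = ${now():format('yyyyMMddHHmmss')}-${UUID()}.parquet

Since Directory accepts Expression Language as well, a dated layout such as s3a://BUCKET_NAME/folder/${now():format('yyyy/MM/dd')} should work the same way.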