
I am writing a Flink job in which I read a file from the local file system and write it to a database using "writeUsingOutputFormat".

Now my requirement is to write to HDFS instead of the database.

Could you please help me with how to do this in Flink?

Note: HDFS is up and running on my local machine.


2 Answers


Flink provides an HDFS connector that can be used to write data to any file system supported by Hadoop's FileSystem abstraction.

The provided sink is a BucketingSink, which partitions the data stream into folders containing rolling files. The bucketing behavior, as well as the writing, can be configured with parameters such as batch size and batch rollover time interval.

The Flink documentation gives the following example:

import java.time.ZoneId;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

DataStream<Tuple2<IntWritable, Text>> input = ...;

// The sink's type parameter must match the element type of the stream.
BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<>("/base/path");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // roll to a new part file after 400 MB
sink.setBatchRolloverInterval(20 * 60 * 1000); // or after 20 minutes

input.addSink(sink);

The newer StreamingFileSink is probably a better choice than the BucketingSink at this point. This description is from the Flink 1.6 release notes (note that support for S3 was added in Flink 1.7):

The new StreamingFileSink is an exactly-once sink for writing to filesystems which capitalizes on the knowledge acquired from the previous BucketingSink. Exactly-once is supported through integration of the sink with Flink’s checkpointing mechanism. The new sink is built upon Flink’s own FileSystem abstraction and it supports local file system and HDFS, with plans for S3 support in the near future [now included in Flink 1.7]. It exposes pluggable file rolling and bucketing policies. Apart from row-wise encoding formats, the new StreamingFileSink comes with support for Parquet. Other bulk-encoding formats like ORC can be easily added using the exposed APIs.
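
For reference, here is a minimal sketch of a row-encoded StreamingFileSink writing plain text to HDFS. The hdfs://localhost:9000/flink/output path and the String element type are assumptions; adjust them to your stream and your NameNode address:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

DataStream<String> input = ...; // e.g. the lines read from your local file

// Writes each record as a UTF-8 line into rolling part files under the
// given base path (hdfs://localhost:9000/flink/output is an assumed address).
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs://localhost:9000/flink/output"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .build();

input.addSink(sink);

Note that the StreamingFileSink relies on Flink's checkpointing to finalize its part files, so checkpointing must be enabled on the job for the output to show up as finished files.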