
I'm investigating the performance of a Flink job that transports data from Kafka to an S3 sink. We are using a BucketingSink to write Parquet files. The bucketing logic divides the messages into a folder per type of data, tenant (customer), date-time, extraction ID, etc. As a result, each file is stored in a folder structure composed of 9-10 layers (s3_bucket:/1/2/3/4/5/6/7/8/9/myFile...).
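To give an idea of the layout, a hypothetical Bucketer along these lines produces that kind of nesting (MyRecord and its getters are made up for illustration, not our actual classes):

    import org.apache.flink.streaming.connectors.fs.Clock;
    import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
    import org.apache.hadoop.fs.Path;

    // MyRecord and its getters are hypothetical; the point is the depth of the path.
    public class TenantTypeBucketer implements Bucketer<MyRecord> {
        @Override
        public Path getBucketPath(Clock clock, Path basePath, MyRecord element) {
            // e.g. s3_bucket:/dataType/tenant/yyyy/MM/dd/HH/extractionId/...
            return new Path(basePath,
                    element.getDataType() + "/" + element.getTenant() + "/"
                            + element.getDateTimePath() + "/" + element.getExtractionId());
        }
    }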

If the data arrives as bursts of messages per tenant and type we see good write performance, but when the data is closer to a white-noise distribution over thousands of tenants, dozens of data types and multiple extraction IDs, we see a dramatic loss of performance (on the order of 300x).

Attaching a debugger and profiling the execution, the issue seems connected to the number of handles open on S3 at the same time to write data.

Researching the Hadoop libraries used to write to S3, I found some settings that might help:

      <name>fs.s3a.connection.maximum</name>
      <name>fs.s3a.threads.max</name>
      <name>fs.s3a.threads.core</name>
      <name>fs.s3a.max.total.tasks</name>

But none of these made a big difference in throughput. I also tried flattening the folder structure so that each file is written under a single key (e.g. 1_2_3_...), but this didn't bring any improvement either.
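For reference, these S3A settings are normally raised in core-site.xml along these lines (the values below are purely illustrative, not the ones we tested):

    <property>
      <name>fs.s3a.connection.maximum</name>
      <value>200</value>
    </property>
    <property>
      <name>fs.s3a.threads.max</name>
      <value>100</value>
    </property>
    <property>
      <name>fs.s3a.threads.core</name>
      <value>50</value>
    </property>
    <property>
      <name>fs.s3a.max.total.tasks</name>
      <value>100</value>
    </property>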

Note: the tests were done on Flink 1.8 with the Hadoop FileSystem (BucketingSink), writing to S3 using the hadoop-fs 2.6.x libraries (we use Cloudera CDH 5.x for savepoints), so we can't switch to StreamingFileSink.


1 Answer


Following the suggestion from Kostas in https://lists.apache.org/thread.html/50ef4d26a1af408df8d9abb70589699cb6b26b2600ab6f4464e86ea4%40%3Cdev.flink.apache.org%3E:

The culprit of the slow-down is this piece of code: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L543-L551

This alone takes around 4-5 secs, with a total of 6 secs to open the file. Logs from an instrumented call:

2020-02-07 08:51:05,825 INFO  BucketingSink  - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO  BucketingSink  - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO  BucketingSink  - openNewPartFile FS - completed partPath = s3a://....

This, together with the bucketing sink's default 60-second inactivity rollover (https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L195), means that with more than 10 parallel buckets on a slot, by the time we finish creating the last bucket the first one has already become stale (10 buckets x ~6 s per open reaches the 60 s threshold), so it has to be rotated, creating a blocking situation.

We solved this by patching BucketingSink.java and removing the FS check mentioned above:

        LOG.debug("Opening new part file FS verification");
        if (!fs.exists(bucketPath)) {
            try {
                if (fs.mkdirs(bucketPath)) {
                    LOG.debug("Created new bucket directory: {}", bucketPath);
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Could not create new bucket path.", e);
            }
        }
        LOG.debug("Opening new part file FS verification - done");

The sink works fine without it, and opening a file now takes ~1.2 seconds.

Moreover, we raised the default inactive bucket threshold to 5 minutes. With these changes we can easily handle more than 200 buckets per slot (once the job picks up speed it ingests on all the slots, postponing the inactive timeout).
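For reference, a minimal sketch of how that threshold can be raised on the sink (the path, bucketer and writer here are placeholders, not our actual setup):

    import org.apache.flink.streaming.connectors.fs.StringWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

    // Placeholder sink wiring; only the inactivity threshold is the point here.
    BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/base/path");
    sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
    sink.setWriter(new StringWriter<>());
    // Raise the inactive-bucket rollover from the 60 s default to 5 minutes so that
    // slowly-filling buckets are not rotated while other buckets are still being opened.
    sink.setInactiveBucketThreshold(5 * 60 * 1000L);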