
I am writing a Flink job that accumulates data in a partitioned window for some time and should then sink it to a file with a specific name under a directory. A lot of data is pushed to this topic; the job partitions it on a 'type' field and writes each partition to its own file. I tried StreamingFileSink, but it doesn't seem to work. Is there another built-in API?


1 Answer


The StreamingFileSink is the best way to do this. It does have a couple of minor restrictions, which perhaps explain why it didn't work for you: checkpointing must be enabled, and for S3 the StreamingFileSink supports only the Hadoop-based FileSystem implementation, not the Presto-based one.
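Enabling checkpointing is a one-liner on the execution environment; the 60-second interval below is just an illustrative choice:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The StreamingFileSink only finalizes in-progress part files on checkpoints,
// so checkpointing must be enabled; the interval here is just an example.
env.enableCheckpointing(60_000);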

Here's an example:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

DefaultRollingPolicy.PolicyBuilder policyBuilder = DefaultRollingPolicy.create();

// T is the type of the stream's events.
StreamingFileSink<T> sink = StreamingFileSink
        .forRowFormat(
                new Path("file:///tmp/HourlyBuckets"),
                new SimpleStringEncoder<T>())
        // Decides which bucket (sub-directory) each event is written to.
        .withBucketAssigner(new EventTimeBuckets())
        // Rolls over to a new part file at least once per hour.
        .withRollingPolicy(policyBuilder
                .withRolloverInterval(TimeUnit.HOURS.toMillis(1))
                .build())
        .build();
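
The sink is then attached to the stream in the usual way (here events is assumed to be the DataStream<T> being written):

// 'events' is assumed to be a DataStream<T>.
events.addSink(sink);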

where the custom bucket assigner groups the events into hourly buckets, based on the timestamps in the events:

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public static class EventTimeBuckets implements BucketAssigner<T, String> {
    // DateTimeFormatter is not serializable, so it is marked transient
    // and created lazily rather than in the constructor.
    private transient DateTimeFormatter dateTimeFormatter;

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String getBucketId(T event, BucketAssigner.Context context) {
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneId.of("UTC"));
        }
        // The bucket id becomes the sub-directory name, e.g. "2019-07-15--09".
        return dateTimeFormatter.format(Instant.ofEpochMilli(event.getEventTime()));
    }
}
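
Since the question mentions partitioning on a 'type' field, the same mechanism covers that: return the type as the bucket id instead of (or combined with) the timestamp. A minimal sketch, assuming a hypothetical getType() accessor on the event:

// Minimal sketch: buckets events by a hypothetical getType() field,
// so each type is written under its own sub-directory.
public static class TypeBuckets implements BucketAssigner<T, String> {

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String getBucketId(T event, BucketAssigner.Context context) {
        return event.getType();
    }
}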