I am writing a Flink job that accumulates data in a partitioned window for some time and should then sink it to a file with a specific name under a directory. A lot of data is pushed to this topic; the job partitions it on a 'type' field and writes each partition to a file.
I tried StreamingFileSink, but it doesn't seem to work. Is there any other built-in API?
1 Answer
The StreamingFileSink is the best way to do this. It does have a couple of minor restrictions, which perhaps explain why it didn't work for you. To use it, you must have checkpointing enabled, and for S3 the StreamingFileSink supports only the Hadoop-based FileSystem implementation, not the implementation based on Presto.
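Enabling checkpointing is a single call on the execution environment; a minimal sketch (the one-minute interval is an arbitrary example value, pick one that suits your latency needs):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Without checkpointing the sink's part files remain in-progress and are never finalized.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds (example interval)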
Here's an example:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;

DefaultRollingPolicy.PolicyBuilder policyBuilder = DefaultRollingPolicy.create();

// T is your event type.
StreamingFileSink<T> sink = StreamingFileSink
    .forRowFormat(
        new Path("file:///tmp/HourlyBuckets"),  // base output directory
        new SimpleStringEncoder<T>())           // writes each record via toString()
    .withBucketAssigner(new EventTimeBuckets()) // custom bucket assigner, shown below
    .withRollingPolicy(policyBuilder
        .withRolloverInterval(TimeUnit.HOURS.toMillis(1)) // roll part files hourly
        .build())
    .build();
where the custom bucket assigner groups the events into hourly buckets based on the timestamps in the events:
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// T is assumed to expose getEventTime(), returning the event time in epoch millis.
public static class EventTimeBuckets implements BucketAssigner<T, String> {

    // DateTimeFormatter isn't serializable, so it is created lazily rather than in the constructor.
    private transient DateTimeFormatter dateTimeFormatter;

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String getBucketId(T event, BucketAssigner.Context context) {
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneId.of("UTC"));
        }
        // The bucket id becomes the subdirectory name, e.g. "2019-07-15--09".
        return dateTimeFormatter.format(Instant.ofEpochMilli(event.getEventTime()));
    }
}
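For completeness, wiring the sink into a job might look like the sketch below. The stream name events and the getType() accessor are assumptions for illustration (neither appears in the question); adjust them to your actual pipeline.

// Hypothetical wiring: assumes a DataStream<T> named "events" whose elements
// expose getType(), matching the 'type' partitioning the question describes.
events
    .keyBy(event -> event.getType()) // partition the stream on 'type'
    .addSink(sink);                  // attach the StreamingFileSink built above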