I have a collection that represents a data stream and testing StreamingFileSink to write the stream to S3. Program running successfully, but there is no data in the given S3 path.
public class S3Sink {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(100);
List<String> input = new ArrayList<>();
input.add("test");
DataStream<String> inputStream = see.fromCollection(input);
RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();
StreamingFileSink s3Sink = StreamingFileSink.
forRowFormat(new Path("<S3 Path>"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build();
inputStream.addSink(s3Sink);
see.execute();
}
}
Checkpointing enabled as well. Any thoughts on why Sink is not working as expected ?
UPDATE: Based on David's answer, created custom source which generates random string continuously and I am expecting Checkpointing to trigger after configured interval to write the data to S3.
public class S3SinkCustom {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(1000);
DataStream<String> inputStream = see.addSource(new CustomSource());
RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();
StreamingFileSink s3Sink = StreamingFileSink.
forRowFormat(new Path("s3://mybucket/data/"),
new SimpleStringEncoder<>("UTF-8"))
.build();
//inputStream.print();
inputStream.addSink(s3Sink);
see.execute();
}
static class CustomSource extends RichSourceFunction<String> {
private volatile boolean running = false;
final String[] strings = {"ABC", "XYZ", "DEF"};
@Override
public void open(Configuration parameters){
running = true;
}
@Override
public void run(SourceContext sourceContext) throws Exception {
while (running) {
Random random = new Random();
int index = random.nextInt(strings.length);
sourceContext.collect(strings[index]);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
}
Still, There is no data in s3 and Flink Process is not even validating given S3 bucket is valid or not, but the process running without any issues.
Update:
Below is the custom rolling policy details:
public class CustomRollingPolicy implements RollingPolicy<Object, String> {
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo partFileInfo) throws IOException {
return partFileInfo.getSize() > 1;
}
@Override
public boolean shouldRollOnEvent(PartFileInfo partFileInfo, Object o) throws IOException {
return true;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo partFileInfo, long l) throws IOException {
return true;
}
}