0 votes

I have a collection that represents a data stream, and I am testing StreamingFileSink to write the stream to S3. The program runs successfully, but there is no data at the given S3 path.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class S3Sink {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
            see.enableCheckpointing(100);

            List<String> input = new ArrayList<>();
            input.add("test");

            DataStream<String> inputStream = see.fromCollection(input);

            RollingPolicy<String, String> rollingPolicy = new CustomRollingPolicy();

            StreamingFileSink<String> s3Sink = StreamingFileSink
                    .forRowFormat(new Path("<S3 Path>"), new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(rollingPolicy)
                    .build();

            inputStream.addSink(s3Sink);

            see.execute();
        }
    }

Checkpointing is enabled as well. Any thoughts on why the sink is not working as expected?

UPDATE: Based on David's answer, I created a custom source which generates a random string continuously, and I expect checkpointing to trigger after the configured interval and write the data to S3.

    import java.util.Random;

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

    public class S3SinkCustom {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
            see.enableCheckpointing(1000);

            DataStream<String> inputStream = see.addSource(new CustomSource());

            // Instantiated but intentionally not applied to the sink below.
            RollingPolicy<String, String> rollingPolicy = new CustomRollingPolicy();

            StreamingFileSink<String> s3Sink = StreamingFileSink
                    .forRowFormat(new Path("s3://mybucket/data/"), new SimpleStringEncoder<String>("UTF-8"))
                    .build();

            //inputStream.print();

            inputStream.addSink(s3Sink);

            see.execute();
        }

        static class CustomSource extends RichSourceFunction<String> {

            private volatile boolean running = false;

            private final String[] strings = {"ABC", "XYZ", "DEF"};

            private final Random random = new Random();

            @Override
            public void open(Configuration parameters) {
                running = true;
            }

            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                while (running) {
                    int index = random.nextInt(strings.length);
                    sourceContext.collect(strings[index]);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }

Still, there is no data in S3, and the Flink process is not even validating whether the given S3 bucket is valid, but the process runs without any issues.

Update:

Below are the custom rolling policy details:

    import java.io.IOException;

    import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
    import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

    public class CustomRollingPolicy implements RollingPolicy<String, String> {

        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileInfo) throws IOException {
            // Roll on checkpoint once the part file holds more than one byte.
            return partFileInfo.getSize() > 1;
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String element) throws IOException {
            return true;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long currentTime) throws IOException {
            return true;
        }
    }
What does the custom rolling policy look like? – David Anderson

I tried without the custom rolling policy as well; same behavior. Updated the question with the custom rolling policy details. – Ramana

Which S3 library are you using? The StreamingFileSink only works with the Hadoop-based S3 filesystem library (and not with the one from Presto). – David Anderson

Have you checked if checkpoints are completing? You can see this in the web UI. – David Anderson

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>1.8.1</version> </dependency> – Ramana

2 Answers

0 votes

I believe the issue is that the job you've written isn't going to run long enough to actually checkpoint, so the output is never finalized. The StreamingFileSink only finalizes (commits) its part files when a checkpoint completes, and a job that reads a one-element collection finishes before the first checkpoint can fire.
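As a minimal sketch of that idea (the class name and timing values here are illustrative assumptions, not from the question): emit the test element, then keep the source alive past at least one checkpoint interval so the sink gets a chance to finalize its part files.

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Hypothetical source: emits one element, then stays alive long enough
    // for at least one checkpoint to complete before the job finishes.
    public class ShortLivedSource implements SourceFunction<String> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect("test");
            // Sleep well past the 100 ms checkpoint interval before returning.
            long deadline = System.currentTimeMillis() + 5_000;
            while (running && System.currentTimeMillis() < deadline) {
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }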

Another potential issue is that the StreamingFileSink only works with the Hadoop-based S3 filesystem (and not the one from Presto).
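For reference, the Hadoop-based S3 filesystem is the one brought in by the dependency mentioned in the comments above:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-s3-fs-hadoop</artifactId>
        <version>1.8.1</version>
    </dependency>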

0 votes

The above issue was resolved after setting up flink-conf.yaml with the required s3a properties, such as fs.s3a.access.key and fs.s3a.secret.key.
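As a sketch, the relevant flink-conf.yaml entries look like this (the values are placeholders):

    fs.s3a.access.key: <access-key>
    fs.s3a.secret.key: <secret-key>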

We need to let Flink know about the config location as well:

    FileSystem.initialize(GlobalConfiguration.loadConfiguration(""));
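If the configuration does not live in the default location, loadConfiguration can also be pointed at an explicit directory (the path below is a placeholder):

    FileSystem.initialize(GlobalConfiguration.loadConfiguration("/path/to/flink/conf"));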

With these changes, I was able to run the S3 sink locally, and messages were persisted to S3 without any issues.