
I am testing how quickly Apache Flink (v1.8.2) can read messages from a Kinesis Data Stream. The stream contains a single shard holding 40,000 messages, each under 5 KB.

Reading the stream from the oldest message with TRIM_HORIZON, I expect the app to read all messages quickly, since each shard supports a total read rate of up to 2 MB per second via GetRecords. With the connector configured as SHARD_GETRECORDS_MAX=400 and SHARD_GETRECORDS_INTERVAL_MILLIS=1000, the app should finish reading all messages within a few minutes. But for some reason it is taking much longer than that.
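For reference, my expectation comes from this back-of-the-envelope calculation (the helper method is my own, just to make the arithmetic explicit; it is not part of the Flink API):

```java
// Expected drain time implied by the connector settings above:
// 40,000 records, at most 400 records per GetRecords call,
// one call per shard per second.
public class DrainEstimate {

    /** Seconds needed to read totalRecords at recordsPerFetch per interval. */
    public static double expectedDrainSeconds(int totalRecords,
                                              int recordsPerFetch,
                                              double fetchIntervalSec) {
        return Math.ceil((double) totalRecords / recordsPerFetch) * fetchIntervalSec;
    }

    public static void main(String[] args) {
        // 40,000 / 400 fetches, one per second -> about 100 seconds.
        System.out.println(expectedDrainSeconds(40_000, 400, 1.0) + " s");
    }
}
```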

Would you mind checking what is wrong in my connector config? Appreciate your help.

    public static DataStream<ObjectNode> createKinesisStream(
            StreamExecutionEnvironment env) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        properties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "400");
        properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

        properties.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
        properties.put(ConsumerConfigConstants.AWS_PROFILE_NAME, "d");

        return env.addSource(new FlinkKinesisConsumer<>(
                    "stream1", new JsonNodeDeserializationSchema(), properties));
    }

main() code:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(10000L);

    DataStream<ObjectNode> source = AppConfig.createKinesisStream(env);

    DataStream<ObjectNode> filteredStream = source
                .map(new CustomMap());
I put a counter in a discarding sink; in one fetch it read only 27 messages (counters 829-855):

    24 Mar 2020 08:11:50,519 INFO  DiscardingSink:15 - 827
    24 Mar 2020 08:11:50,519 INFO  DiscardingSink:15 - 828
    24 Mar 2020 08:11:51,631 INFO  DiscardingSink:15 - 829
    24 Mar 2020 08:11:51,631 INFO  DiscardingSink:15 - 830
    .
    .
    24 Mar 2020 08:11:51,639 INFO  DiscardingSink:15 - 854
    24 Mar 2020 08:11:51,639 INFO  DiscardingSink:15 - 855
    24 Mar 2020 08:11:52,749 INFO  DiscardingSink:15 - 856
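Doing the arithmetic on these timestamps (my own rough projection, not a Flink metric): about 27 records per fetch, with fetches roughly 1.11 s apart, works out to a very long drain time.

```java
// Projection from the log timestamps above: ~27 records per fetch,
// fetches ~1.11 s apart (08:11:51,631 -> 08:11:52,749).
public class ObservedRate {
    public static void main(String[] args) {
        int totalRecords = 40_000;
        int recordsPerFetch = 27;        // counters 829..855 in one fetch
        double fetchIntervalSec = 1.11;  // gap between consecutive fetches
        double projectedSec = totalRecords / (double) recordsPerFetch * fetchIntervalSec;
        // Roughly 27 minutes, far from the ~100 s the settings should allow.
        System.out.printf("~%.0f s (~%.0f min) to drain the stream%n",
                projectedSec, projectedSec / 60);
    }
}
```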
Adding more: no other consumer exists, and ReadProvisionedThroughputExceeded is zero on the Kinesis stream.

1 Answer


One possible explanation is that something in your pipeline is exerting backpressure on the source. To measure the capacity of the source alone, you could simplify the job down to this:

source.addSink(new DiscardingSink<>());

where DiscardingSink is

public static class DiscardingSink<OUT> implements SinkFunction<OUT> {

    @Override
    public void invoke(OUT value, Context context) throws Exception {
    }
}
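To put a number on the result, here is a minimal, Flink-independent sketch of a counting helper (class and method names are my own, not Flink API); a sink's invoke() could delegate to it once per element and periodically log the rate:

```java
// Illustrative throughput counter, no Flink dependency.
// A SinkFunction's invoke() could call record() for each element
// and occasionally log recordsPerSecond().
public class ThroughputCounter {
    private long count;
    private long startNanos = -1;

    /** Count one element; returns the running total. */
    public long record() {
        if (startNanos < 0) {
            startNanos = System.nanoTime();  // start timing at first record
        }
        return ++count;
    }

    /** Average records per second since the first record. */
    public double recordsPerSecond() {
        double elapsedSec = (System.nanoTime() - startNanos) / 1e9;
        return elapsedSec > 0 ? count / elapsedSec : 0.0;
    }
}
```

With the DiscardingSink job above, comparing this measured rate against the rate of the full pipeline would show how much throughput the map/filter stages are costing.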