I am testing how quickly Apache Flink (v1.8.2) can read messages from a Kinesis Data Stream. The stream has only one shard and holds 40,000 messages, each under 5 KB.
I am reading the stream from the oldest message using TRIM_HORIZON. I expected the app to read all the messages quickly, since each shard supports a total read rate of up to 2 MB per second via GetRecords. With my connector configuration (SHARD_GETRECORDS_MAX=400, SHARD_GETRECORDS_INTERVAL_MILLIS=1000), the app should finish reading all messages within a few minutes (40,000 messages at up to 400 per one-second poll is about 100 seconds). But for some reason it is taking much longer than that.
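To make the expectation concrete, here is a rough back-of-the-envelope sketch of that arithmetic. The class and method names are hypothetical, and it assumes every poll returns the full 400 records and that each record is the worst-case 5 KB (taken here as 5 × 1024 bytes), which may not match what Kinesis actually returns per GetRecords call.

```java
// Hypothetical helper, not part of Flink or the AWS SDK.
class ReadTimeEstimate {

    /** Seconds needed to drain a backlog if every poll returns maxRecordsPerPoll records. */
    public static long secondsToDrain(long backlog, long maxRecordsPerPoll, long pollIntervalMillis) {
        long polls = (backlog + maxRecordsPerPoll - 1) / maxRecordsPerPoll; // ceiling division
        return polls * pollIntervalMillis / 1000;
    }

    /** Upper bound on bytes requested per second for a given record size (assumed worst case). */
    public static long maxBytesPerSecond(long maxRecordsPerPoll, long recordBytes, long pollIntervalMillis) {
        return maxRecordsPerPoll * recordBytes * 1000 / pollIntervalMillis;
    }

    public static void main(String[] args) {
        // 40,000 messages, SHARD_GETRECORDS_MAX=400, SHARD_GETRECORDS_INTERVAL_MILLIS=1000
        System.out.println(secondsToDrain(40_000, 400, 1_000));      // prints 100
        // 400 records of at most 5 KB each, once per second
        System.out.println(maxBytesPerSecond(400, 5 * 1024, 1_000)); // prints 2048000
    }
}
```

So under these assumptions the configuration asks for roughly 2 MB per second, and the whole backlog should drain in about 100 seconds.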
Could you check what is wrong with my connector config? I appreciate your help.
public static DataStream<ObjectNode> createKinesisStream(
        StreamExecutionEnvironment env) throws IOException {
    Properties properties = new Properties();
    properties.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    properties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
    properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "400");
    properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");
    properties.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
    properties.put(ConsumerConfigConstants.AWS_PROFILE_NAME, "d");
    return env.addSource(new FlinkKinesisConsumer<>(
            "stream1", new JsonNodeDeserializationSchema(), properties));
}
main() code:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10000L);
DataStream<ObjectNode> source = AppConfig.createKinesisStream(env);
DataStream<ObjectNode> filteredStream = source
        .map(new CustomMap());
I have put a counter in the discarding sink. In one fetch it read only 27 messages (counter values 829 to 855):
24 Mar 2020 08:11:50,519 INFO DiscardingSink:15 - 827
24 Mar 2020 08:11:50,519 INFO DiscardingSink:15 - 828
24 Mar 2020 08:11:51,631 INFO DiscardingSink:15 - 829
24 Mar 2020 08:11:51,631 INFO DiscardingSink:15 - 830
.
.
24 Mar 2020 08:11:51,639 INFO DiscardingSink:15 - 854
24 Mar 2020 08:11:51,639 INFO DiscardingSink:15 - 855
24 Mar 2020 08:11:52,749 INFO DiscardingSink:15 - 856
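For comparison, the observed rate can be estimated from the log excerpt above. This is only a sketch: the class and method names are hypothetical, and it assumes the single visible batch (27 records, with the next batch starting about 1.1 seconds later) is representative of the whole run, which the excerpt alone cannot confirm.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper for reading the log excerpt; not part of Flink.
class ObservedRate {

    // Matches timestamps such as "24 Mar 2020 08:11:51,631"
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("dd MMM yyyy HH:mm:ss,SSS", Locale.ENGLISH);

    /** Milliseconds between the first record of one batch and the first record of the next. */
    public static long batchIntervalMillis(String batchStart, String nextBatchStart) {
        LocalDateTime start = LocalDateTime.parse(batchStart, LOG_TIME);
        LocalDateTime next = LocalDateTime.parse(nextBatchStart, LOG_TIME);
        return Duration.between(start, next).toMillis();
    }

    /** Seconds to drain the backlog if every batch looks like the observed one. */
    public static long secondsToDrain(long backlog, long recordsPerBatch, long batchIntervalMillis) {
        return backlog * batchIntervalMillis / recordsPerBatch / 1000;
    }

    public static void main(String[] args) {
        // Counter 829 is the first record of the batch, 856 the first record of the next one.
        long interval = batchIntervalMillis("24 Mar 2020 08:11:51,631", "24 Mar 2020 08:11:52,749");
        System.out.println(interval);                             // prints 1118
        System.out.println(secondsToDrain(40_000, 27, interval)); // prints 1656
    }
}
```

At 27 records roughly every 1.1 seconds, reading 40,000 messages would take about 1,656 seconds (around 27 minutes), compared with the roughly 100 seconds the configuration suggests.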