
I'm consuming a stream from a Kafka source in my Flink job, reading from 50 topics at once like this:

FlinkKafkaConsumer<GenericRecord> kafkaConsumer = new FlinkKafkaConsumer<>(
        Pattern.compile("TOPIC_NAME[1-50].stream"), // subscribe to all topics matching the pattern
        <DeserializationSchema>,                    // Avro deserialization schema
        properties);                                // auto.commit.interval.ms=1000 ...

After the source there are some operators: filter -> map -> keyBy -> window -> aggregate -> sink

The maximum throughput I'm able to get is 10k to 20k records per second, which is pretty low considering that the source publishes hundreds of thousands of events, and I can clearly see the consumer lagging behind the producer. I've even tried removing the sink and the other operators to make sure there's no back pressure, but the throughput stays the same. I'm deploying my application to Amazon Kinesis Data Analytics and have tried several parallelism settings, but none of them seem to improve the throughput.

Anything I'm missing?


1 Answer


Several things can significantly impact throughput.

Inefficient serialization is often a major factor leading to poor throughput. See "Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can" for insight on this topic. The Avro generic record serializer isn't too bad, but are you carrying around data that you don't actually need?

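Are you changing the parallelism anywhere in the pipeline? That's expensive, because it breaks operator chaining and forces records to be serialized and redistributed between subtasks.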

With Kinesis Data Analytics you must use the RocksDB state backend, which has much lower throughput than the heap-based state backend. But having the right configuration can help quite a bit. You should be using the fastest available local disk for the RocksDB working directory (SSD, or in extreme cases, a RAM disk may be warranted). Make sure that your instance type offers sufficient IOPS. Give RocksDB enough memory. Bloom filters are worth enabling if you do lots of reads. See "The Impact of Disks on RocksDB State Backend in Flink: A Case Study" for more insight on working with RocksDB.

You might try disabling checkpointing as an experiment. If that helps, that'll provide some clues.

Some of the network settings affect throughput. The defaults generally provide decent performance, but if you've modified them that's worth investigating.
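
Separately, it may be worth double-checking the topic pattern in the question. In Java regular expressions, `[1-50]` is a character class that matches a single character (`1` through `5`, or `0`), not the numbers 1 to 50, and the unescaped `.` matches any character. The sketch below is plain Java with no Flink dependency; it assumes the topics are literally named `TOPIC_NAME1.stream` through `TOPIC_NAME50.stream` and that the consumer requires the whole topic name to match. The class name and the stricter pattern are hypothetical, shown only for comparison.

```java
import java.util.regex.Pattern;

class TopicPatternCheck {
    // The pattern exactly as written in the question.
    static final Pattern ORIGINAL = Pattern.compile("TOPIC_NAME[1-50].stream");

    // A hypothetical stricter alternative: the numbers 1..50 and a literal dot.
    static final Pattern STRICT =
            Pattern.compile("TOPIC_NAME([1-9]|[1-4][0-9]|50)\\.stream");

    // Counts how many of the assumed topic names TOPIC_NAME1.stream ..
    // TOPIC_NAME50.stream are fully matched by the given pattern.
    static int countMatches(Pattern pattern) {
        int matched = 0;
        for (int i = 1; i <= 50; i++) {
            String topic = "TOPIC_NAME" + i + ".stream";
            if (pattern.matcher(topic).matches()) {
                matched++;
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println("original pattern matches: " + countMatches(ORIGINAL));
        System.out.println("strict pattern matches:   " + countMatches(STRICT));
    }
}
```

Under those naming assumptions the original pattern fully matches only the single-digit topics 1 through 5, so it is worth confirming which topics the job actually subscribes to before tuning anything else.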