2
votes

I have the below configuration:

  1. One kafka topic with 2 partitions
  2. One zookeeper instance
  3. One kafka instance
  4. Two consumers with same group id

Flink job snippet:

speStream.addSink(new FlinkKafkaProducer011<>(kafkaTopicName, new SimpleStringSchema(), props));

Scenario 1:

I have written a Flink job (producer) in Eclipse that reads a file from a folder and puts the messages on a Kafka topic.

When I run this code from Eclipse, it works fine.

For example: if I place a file with 100 records, Flink sends some messages to partition 1 and some to partition 2, so both consumers receive messages.

Scenario 2: When I build a jar of the same code and run it on the Flink server, Flink sends all the messages to a single partition, so only one consumer receives all the messages.

I want the behavior of scenario 1 when running the jar from scenario 2.

Are you sure that you are not using the same key for the messages? Kafka will place messages with the same key in the same partition. – Giorgos Myrianthous
I am using the default partitioner, which puts the messages in round-robin fashion. As I said, it works fine if I run through Eclipse. – Ankit

2 Answers

1
votes

If you do not provide a FlinkKafkaPartitioner and do not explicitly opt into Kafka's own partitioner, a FlinkFixedPartitioner will be used, meaning that all events from one sink task end up in the same partition.

To use Kafka's partitioner, use this constructor (the Optional is a constructor argument of FlinkKafkaProducer011, not a second argument to addSink):

speStream.addSink(new FlinkKafkaProducer011<>(kafkaTopicName, new SimpleStringSchema(), props, Optional.empty()));

The difference between running from the IDE and from the Flink cluster is probably due to different parallelism or partitioning settings within Flink.

2
votes

For Flink-Kafka producers, pass null as the last (partitioner) parameter.

speStream.addSink(new FlinkKafkaProducer011<>(
    kafkaTopicName,
    new SimpleStringSchema(),
    props,
    (FlinkKafkaPartitioner<String>) null
));

The short explanation is that this stops Flink from using its default partitioner, FlinkFixedPartitioner. With the fixed partitioner disabled, Kafka distributes the data among its partitions as it sees fit (for unkeyed messages, that means spreading them across partitions). If it is NOT disabled, then each parallel sink subtask that uses the FlinkKafkaProducer writes to only one partition.
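To see why a parallelism-1 sink ends up writing everything to one partition, the mapping FlinkFixedPartitioner applies can be sketched as subtask index modulo partition count. This is a minimal, self-contained sketch of that rule; the class and helper names are mine, not Flink's:

```java
public class FixedPartitionerDemo {

    // Sketch of FlinkFixedPartitioner's rule: each parallel sink subtask
    // always writes to partitions[subtaskIndex % partitions.length].
    static int targetPartition(int subtaskIndex, int[] partitions) {
        return partitions[subtaskIndex % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1}; // the topic from the question has 2 partitions

        // With sink parallelism 1 (a common default when submitting a jar),
        // every record passes through subtask 0, so the whole stream lands
        // in a single Kafka partition:
        System.out.println(targetPartition(0, partitions)); // always partition 0

        // Only a second subtask (parallelism >= 2) would ever reach partition 1:
        System.out.println(targetPartition(1, partitions));
    }
}
```

This also explains the IDE-vs-cluster difference in the question: if the Eclipse run used a higher parallelism than the cluster submission, multiple subtasks covered multiple partitions even with the fixed partitioner in place.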