7 votes

My objective is to set up a high-throughput cluster using Kafka as the source and Flink as the stream processing engine. Here is what I have done.

I have set up a 2-node cluster with the following configuration on the master and the worker.

Master flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 256

taskmanager.heap.mb: 512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

Worker flink-conf.yaml

jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost

jobmanager.rpc.port: 6123

jobmanager.heap.mb: 512 #256

taskmanager.heap.mb: 1024 #512

taskmanager.numberOfTaskSlots: 50

parallelism.default: 100

The slaves file on the Master node looks like this:

<WORKER_IP_ADDR>
localhost

The Flink installation on both nodes is in a folder with the same name. I start up the cluster on the master by running

bin/start-cluster-streaming.sh

This starts up the task manager on the Worker node.
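
As a quick sanity check (assuming the standard JDK jps tool; the process names are what the Flink start scripts appear to launch), I also verify the daemons on each node:

jps   # on the master: should list JobManager (and a TaskManager, since localhost is also in slaves)
jps   # on the worker: should list TaskManager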

My input source is Kafka. Here is the snippet.

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream =
    env.addSource(
        new KafkaSource<String>(kafkaUrl, kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);

env.execute("Kafka stream");

Here is my sink function:

public class MySink implements SinkFunction<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(String arg0) throws Exception {
        processMessage(arg0);
        System.out.println("Processed Message");
    }
}

Here are the Flink dependencies in my pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>

Then I run the packaged JAR with this command on the master:

bin/flink run flink-test-jar-with-dependencies.jar
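
(As a side note, as far as I know the CLI also accepts a parallelism flag at submission time, though I have not used it here; a sketch:)

bin/flink run -p 100 flink-test-jar-with-dependencies.jar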

However, when I insert messages into the Kafka topic, I can account for all of the incoming messages (via debug messages in the invoke method of my SinkFunction implementation) on the master node alone.

In the JobManager UI I can see 2 task managers, as below:

[Screenshot: Job Manager dashboard - task managers]

The dashboard itself looks like this:

[Screenshot: Job Manager dashboard]

Questions:

  1. Why are the worker nodes not getting the tasks?
  2. Am I missing some configuration?
Thank you for posting such a well-written question! How do you get these debug messages on the master? It's not really possible to execute user code on the master (JobManager). Are you using Flink 0.9.0 or 0.10-SNAPSHOT? What does your stringSinkFunction look like? (Is it just printing to standard out?) – Robert Metzger
@rmetzger You're welcome. I have updated the question. Any help would be appreciated. – ss_everywhere
@SudarshanShubakar From the screenshots it looks like you have 2 TMs registered with 50 slots each, which matches your configuration. Moreover, it looks as if your job is executed properly: in each of the 100 slots there is a Custom Source -> Stream Sink (x/100) task deployed. So I'm wondering what is not working for you. Could it be that your topic has fewer than 100 partitions? Since Flink creates a mapping between Kafka partitions and source tasks, there would then be tasks which don't receive any input. – Till Rohrmann
OK @TillRohrmann, this may be the issue. I believe there are fewer than 100 partitions on the Kafka topic. Let me report back after changing this. – ss_everywhere
Alright, @TillRohrmann's comment solved the problem. Can you convert your comment into an answer? – ss_everywhere

1 Answer

14 votes

When reading from a Kafka source in Flink, the maximum degree of parallelism for the source task is limited by the number of partitions of a given Kafka topic. A Kafka partition is the smallest unit which can be consumed by a source task in Flink. If there are more partitions than source tasks, then some tasks will consume multiple partitions.

Consequently, in order to supply input to all of your 100 source tasks, you should ensure that your Kafka topic has at least 100 partitions.
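
For example, the partition count can be checked and, if needed, increased with the standard Kafka topic tool (a sketch; the ZooKeeper address and topic name are placeholders for your own values):

bin/kafka-topics.sh --zookeeper <ZK_HOST>:2181 --describe --topic <KAFKA_TOPIC>
bin/kafka-topics.sh --zookeeper <ZK_HOST>:2181 --alter --topic <KAFKA_TOPIC> --partitions 100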

If you cannot change the number of partitions of your topic, then it is also possible to initially read from Kafka with a lower degree of parallelism, set via the setParallelism method. Alternatively, you can use the rebalance method, which will shuffle your data across all available tasks of the succeeding operation.
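
Combining both suggestions, a minimal sketch (reusing kafkaUrl, kafkaTopic and stringSinkFunction from the snippet in the question, with the same imports; the source parallelism of 10 is just an assumed partition count, and the code is untested against 0.9.0):

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Source parallelism matches the (lower) number of Kafka partitions, e.g. 10.
DataStreamSource<String> stream =
    env.addSource(new KafkaSource<String>(kafkaUrl, kafkaTopic, new SimpleStringSchema()));

stream.setParallelism(10)   // assumed partition count; adjust to your topic
      .rebalance()          // spread the data across all tasks of the sink
      .addSink(stringSinkFunction);

env.execute("Kafka stream");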