My objective is to set up a high-throughput cluster using Kafka as the source and Flink as the stream processing engine. Here's what I have done.
I have set up a 2-node cluster with the following configuration on the master and the workers.
Master flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Worker flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
The slaves file on the Master node looks like this:
<WORKER_IP_ADDR>
localhost
The Flink setup on both nodes lives in a folder with the same name. I start the cluster on the master by running
bin/start-cluster-streaming.sh
This starts up the task manager on the Worker node.
My input source is Kafka. Here is the snippet:
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> stream = env.addSource(
        new KafkaSource<String>(kafkaUrl, kafkaTopic, new SimpleStringSchema()));

stream.addSink(stringSinkFunction);

env.execute("Kafka stream");
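For completeness, the job-wide parallelism can also be set programmatically on the environment instead of relying on parallelism.default from flink-conf.yaml. A minimal sketch, assuming the Flink 0.9 API:

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// With 2 TaskManagers x 50 slots each, 100 slots are available,
// which matches parallelism.default: 100 in flink-conf.yaml.
env.setParallelism(100);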
Here is my sink function:
public class MySink implements SinkFunction<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(String value) throws Exception {
        processMessage(value);
        System.out.println("Processed Message");
    }
}
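To check which machine each record is actually processed on, a variant of the sink could log the host name. This is only a debugging sketch: the InetAddress lookup is my addition, the import paths assume Flink 0.9, and processMessage stands in for the poster's own handler.

import java.net.InetAddress;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class HostLoggingSink implements SinkFunction<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(String value) throws Exception {
        // Tag every record with the host that processed it, to see
        // whether the Worker node receives any data at all.
        String host = InetAddress.getLocalHost().getHostName();
        System.out.println("Processed on " + host + ": " + value);
    }
}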
Here are the Flink dependencies in my pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.0</version>
</dependency>
Then I run the packaged jar with this command on the master:
bin/flink run flink-test-jar-with-dependencies.jar
However, when I insert messages into the Kafka topic, every message is processed on the Master node alone (I can account for all of them via the debug output in the invoke method of my SinkFunction implementation).
In the JobManager UI I can see 2 TaskManagers, and the dashboard shows the running job (screenshots omitted).

Questions:
- Why are the worker nodes not getting the tasks?
- Am I missing some configuration?
Comments:
- What does your stringSinkFunction look like? (Is it just printing to standard out?) – Robert Metzger
- The dashboard shows Custom Source -> Stream Sink (x/100) deployed. Therefore, I'm wondering what's not working for you. Can it be that your topic has fewer than 100 partitions? Since Flink creates a mapping between Kafka partitions and source tasks, there would be tasks which don't receive any input. – Till Rohrmann
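To illustrate the point in the last comment: if the topic has fewer partitions than the source parallelism, the surplus source subtasks never receive input. A minimal sketch, assuming a hypothetical topic with only 2 partitions and that setParallelism is available on the source as in the Flink 0.9 snippet above:

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical: the topic was created with 2 partitions.
int numKafkaPartitions = 2;

env.addSource(new KafkaSource<String>(kafkaUrl, kafkaTopic, new SimpleStringSchema()))
   // One source subtask per Kafka partition; any subtasks beyond the
   // partition count would sit idle and produce nothing downstream.
   .setParallelism(numKafkaPartitions)
   .addSink(stringSinkFunction);

env.execute("Kafka stream");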