I am setting up a minimal example here, where I have N streams (100 in the example below) from N Kafka topics.
I want to finish each stream when it sees an "EndofStream" message.
When all streams are finished, I expect the Flink program to finish gracefully.
This works when the parallelism is set to 1, but not in general.
From another question, it seems that not all threads of the Kafka consumer group end.
Others have suggested throwing an exception. However, the program would then terminate at the first exception and would not wait for all the streams to finish.
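To illustrate what I suspect is happening, here is a toy model in plain Java (no Flink or Kafka involved). It rests on assumptions I have not verified: each topic has a single partition, partitions are handed to source subtasks round-robin, and a subtask that owns no partition never reads a record, so `isEndOfStream` is never evaluated for it. The class and method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class EndOfStreamSketch {

    // Hypothetical model, not Flink's actual assignment logic: returns the
    // indices of source subtasks that are never given a partition and
    // therefore never get to read the "EndofStream" record.
    static List<Integer> idleSubtasks(int partitions, int parallelism) {
        boolean[] hasPartition = new boolean[parallelism];
        for (int p = 0; p < partitions; p++) {
            hasPartition[p % parallelism] = true; // assumed round-robin assignment
        }
        List<Integer> idle = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            if (!hasPartition[s]) {
                idle.add(s);
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // One partition per topic is an assumption about my test setup.
        System.out.println("parallelism 1: idle subtasks = " + idleSubtasks(1, 1));
        System.out.println("parallelism 2: idle subtasks = " + idleSubtasks(1, 2));
    }
}
```

Under these assumptions, parallelism 1 leaves no idle subtask, while parallelism 2 leaves one subtask per source that never sees the end marker, which would match the behaviour I observe.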
I am also adding a minimal Python program that writes messages to the Kafka topics, for reproducibility. Please fill in the <IP>:<PORT> in each program.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String outputPath = "file://" + System.getProperty("user.dir") + "/out/output";

    Properties kafkaProps = new Properties();
    String brokers = "<IP>:<PORT>";
    kafkaProps.setProperty("bootstrap.servers", brokers);
    kafkaProps.setProperty("auto.offset.reset", "earliest");

    ArrayList<FlinkKafkaConsumer<String>> consumersList = new ArrayList<FlinkKafkaConsumer<String>>();
    ArrayList<DataStream<String>> streamList = new ArrayList<DataStream<String>>();

    for (int i = 0; i < 100; i++) {
        // One consumer per topic; topics are named "0" .. "99".
        consumersList.add(new FlinkKafkaConsumer<String>(Integer.toString(i),
                new SimpleStringSchema() {
                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        // throw new RuntimeException("End of Stream");
                        return nextElement.contains("EndofStream");
                    }
                },
                kafkaProps));
        consumersList.get(i).setStartFromEarliest();
        streamList.add(env.addSource(consumersList.get(i)));
        streamList.get(i).writeAsText(outputPath + Integer.toString(i), WriteMode.OVERWRITE);
    }

    // execute program
    env.execute("Flink Streaming Java API Skeleton");
Python 3 Program
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='<IP>:<PORT>')
    for i in range(100):  # Channel Number
        for j in range(100):  # Message Number
            message = "Message: " + str(j) + " going on channel: " + str(i)
            producer.send(str(i), str.encode(message))
        # After the regular messages, send the end marker for this channel.
        message = "EndofStream on channel: " + str(i)
        producer.send(str(i), str.encode(message))
    producer.flush()
Changing this line: streamList.add(env.addSource(consumersList.get(i)));
to streamList.add(env.addSource(consumersList.get(i)).setParallelism(1));
also does the job, but then Flink places all the consumers on the same physical machine.
I would like the consumers to be distributed as well.
flink-conf.yaml
parallelism.default: 2
cluster.evenly-spread-out-slots: true
As a last resort, I could write each topic to a separate file and use the files as sources instead of Kafka consumers.
The end goal is to measure how much time Flink takes to process certain workloads for certain programs.