
I am setting up a minimal example here, where I have N streams (100 in the example below) from N Kafka topics.

I want to finish each stream when it sees an "EndofStream" message. When all streams are finished, I expect the Flink program to finish gracefully.
This is true when parallelism is set to 1, but it does not happen in general.

From another question, it seems that not all threads of the Kafka consumer group end.

Others have suggested throwing an exception. However, the program would terminate at the first exception and would not wait for all the streams to finish.

I am also adding a minimal Python program that adds messages to the Kafka topics, for reproducibility. Please fill in <IP>:<PORT> in each program.

import java.util.ArrayList;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// inside main(String[] args) of the quickstart skeleton:

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String outputPath = "file://" + System.getProperty("user.dir") + "/out/output";

        Properties kafkaProps = new Properties();
        String brokers = "<IP>:<PORT>";
        kafkaProps.setProperty("bootstrap.servers", brokers);
        kafkaProps.setProperty("auto.offset.reset", "earliest");


        ArrayList<FlinkKafkaConsumer<String>> consumersList = new ArrayList<FlinkKafkaConsumer<String>>();
        ArrayList<DataStream<String>> streamList = new ArrayList<DataStream<String>>();

        for (int i = 0; i < 100; i++) {
            consumersList.add(new FlinkKafkaConsumer<String>(Integer.toString(i),
                        new SimpleStringSchema() {
                            @Override
                            public boolean isEndOfStream(String nextElement) {
                                // alternatively: throw new RuntimeException("End of Stream");
                                return nextElement.contains("EndofStream");
                            }
                        }
                        , kafkaProps));
            consumersList.get(i).setStartFromEarliest();
            streamList.add(env.addSource(consumersList.get(i)));
            streamList.get(i).writeAsText(outputPath + Integer.toString(i), WriteMode.OVERWRITE);
        }

        // execute program
        env.execute("Flink Streaming Java API Skeleton");

Python 3 Program

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='<IP>:<PORT>')

for i in range(100): # Channel Number
    for j in range(100): # Message Number
        message = "Message: " + str(j) + " going on channel: " + str(i)
        producer.send(str(i), str.encode(message))

    message = "EndofStream on channel: " + str(i)
    producer.send(str(i), str.encode(message))


producer.flush()

Changing this line: streamList.add(env.addSource(consumersList.get(i))); to streamList.add(env.addSource(consumersList.get(i)).setParallelism(1)); also does the job, but then Flink places all the consumers on the same physical machine.

I would like the consumers to be distributed as well; see the sketch after the config below.

flink-conf.yaml

parallelism.default: 2
cluster.evenly-spread-out-slots: true
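
One possible compromise, based on Flink's operator-level setParallelism and slotSharingGroup APIs, is to keep each source at parallelism 1 but put it in its own slot sharing group, so the scheduler can place the sources in separate slots and, together with cluster.evenly-spread-out-slots: true, spread those slots across task managers. This is only a sketch (the group names are arbitrary), and it needs at least as many free slots as there are sources:

        for (int i = 0; i < 100; i++) {
            DataStream<String> stream = env
                    .addSource(consumersList.get(i))
                    .setParallelism(1)
                    // one group per source so sources do not share a slot
                    // and can be spread over the task managers
                    .slotSharingGroup("source-" + i);
            stream.writeAsText(outputPath + Integer.toString(i), WriteMode.OVERWRITE);
        }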

As a last resort, I could write each topic to a separate file and use the files as sources instead of Kafka consumers, as sketched below.
The end goal is to measure how much time Flink takes to process certain workloads for certain programs.
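
For reference, the file-based fallback could look like this (a sketch; it assumes each topic was first dumped to a placeholder path such as /tmp/topics/topic-0.txt). Since readTextFile is a bounded source, the job finishes on its own once all files are read:

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        for (int i = 0; i < 100; i++) {
            // bounded source: reads the file once and then finishes
            DataStream<String> stream = env.readTextFile("file:///tmp/topics/topic-" + i + ".txt");
            stream.writeAsText(outputPath + Integer.toString(i), WriteMode.OVERWRITE);
        }

        env.execute("file-source variant");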

1 Answer


Use the cancel method from FlinkKafkaConsumerBase, which is the parent class of FlinkKafkaConsumer.

public void cancel()

Description copied from interface: SourceFunction

Cancels the source. Most sources will have a while loop inside the SourceFunction.run(SourceContext) method. The implementation needs to ensure that the source will break out of that loop after this method is called. A typical pattern is to have a "volatile boolean isRunning" flag that is set to false in this method. That flag is checked in the loop condition.

When a source is canceled, the executing thread will also be interrupted (via Thread.interrupt()). The interruption happens strictly after this method has been called, so any interruption handler can rely on the fact that this method has completed. It is good practice to make any flags altered by this method "volatile", in order to guarantee the visibility of the effects of this method to any interruption handler.

Specified by: cancel in interface SourceFunction
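
For illustration, the "volatile boolean isRunning" pattern that the Javadoc describes looks like this in a custom source (a minimal sketch of a generic SourceFunction, not the actual FlinkKafkaConsumer internals):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CancellableSource implements SourceFunction<String> {
    private static final long serialVersionUID = 1L;

    // volatile so the effect of cancel(), called from another thread,
    // is visible inside the run() loop
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // emit elements under the checkpoint lock; the loop exits
            // once cancel() flips the flag
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("element");
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}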


You are right: it is necessary to use the SimpleStringSchema (overriding isEndOfStream). This is based on this answer: https://stackoverflow.com/a/44247452/2096986. Take a look at this example. First I sent the string Flink code we saw also works in a cluster and the Kafka consumer consumed the message. Then I sent SHUTDOWNDDDDDDD, which also had no effect on finishing the stream. Finally, I sent SHUTDOWN and the stream job was finalized. See the logs below the program.

package org.sense.flink.examples.stream.kafka;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaConsumerQuery {

    public KafkaConsumerQuery() throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(java.util.regex.Pattern.compile("test"),
                new MySimpleStringSchema(), properties);

        DataStream<String> stream = env.addSource(myConsumer);
        stream.print();

        System.out.println("Execution plan >>>\n" + env.getExecutionPlan());
        env.execute(KafkaConsumerQuery.class.getSimpleName());
    }

    private static class MySimpleStringSchema extends SimpleStringSchema {
        private static final long serialVersionUID = 1L;
        private final String SHUTDOWN = "SHUTDOWN";

        @Override
        public boolean isEndOfStream(String nextElement) {
            // Signal end-of-stream as soon as the SHUTDOWN message arrives.
            if (SHUTDOWN.equalsIgnoreCase(nextElement)) {
                return true;
            }
            return super.isEndOfStream(nextElement);
        }
    }

    public static void main(String[] args) throws Exception {
        new KafkaConsumerQuery();
    }
}
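
For completeness, the SHUTDOWN message can be sent with the plain Kafka producer API (a sketch, assuming the kafka-clients dependency is on the classpath and the topic is named test to match the pattern above):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ShutdownProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        // isEndOfStream above matches case-insensitively, so this single
        // message finalizes the streaming job
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "SHUTDOWN"));
            producer.flush();
        }
    }
}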

Logs:

2020-07-02 16:39:59,025 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-8, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
3> Flink code we saw also works in a cluster. To run this code in a cluster
3> SHUTDOWNDDDDDDD
2020-07-02 16:40:27,973 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) switched from RUNNING to FINISHED.
2020-07-02 16:40:27,973 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e).
2020-07-02 16:40:27,974 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) [FINISHED]
2020-07-02 16:40:27,975 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out (3/4) 5f47c2b3f55c5eb558484d49fb1fcf0e.
2020-07-02 16:40:27,979 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Sink: Print to Std. Out (3/4) (5f47c2b3f55c5eb558484d49fb1fcf0e) switched from RUNNING to FINISHED.