
I am trying to run a Flink job, as shown below, to read data from Apache Kafka and print it:

Java Program

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "test.net:9092");
    properties.setProperty("group.id", "flink_consumer");
    properties.setProperty("zookeeper.connect", "dev.com:2181,dev2.com:2181,dev.com:2181/dev2");
    properties.setProperty("topic", "topic_name");

    DataStream<String> messageStream = env.addSource(
            new FlinkKafkaConsumer082<>("topic_name", new SimpleStringSchema(), properties));

    messageStream.rebalance().map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Kafka and Flink says: " + value;
        }
    }).print();

    env.execute();

Scala Code

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "msg01.staging.bigdata.sv2.247-inc.net:9092")
    properties.setProperty("group.id", "flink_consumer")
    properties.setProperty("zookeeper.connect", "host33.dev.swamp.sv2.tellme.com:2181,host37.dev.swamp.sv2.tellme.com:2181,host38.dev.swamp.sv2.tellme.com:2181/staging_sv2")
    properties.setProperty("topic", "sv2.staging.rtdp.idm.events.omnichannel")

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val stream: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer082[String]("sv2.staging.rtdp.idm.events.omnichannel", new SimpleStringSchema(), properties))
    stream.print()
    env.execute()

Whenever I run this app in Eclipse, I see the following output to start with:

    03/27/2017 20:06:19 Job execution switched to status RUNNING.
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(1/4) switched to SCHEDULED
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(1/4) switched to DEPLOYING
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(2/4) switched to SCHEDULED
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(2/4) switched to DEPLOYING
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(3/4) switched to SCHEDULED
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(3/4) switched to DEPLOYING
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(4/4) switched to SCHEDULED
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(4/4) switched to DEPLOYING
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(4/4) switched to RUNNING
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(2/4) switched to RUNNING
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(1/4) switched to RUNNING
    03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(3/4) switched to RUNNING

Questions I have:

1) Why am I seeing 4 instances of the sink in all states (SCHEDULED, DEPLOYING, and RUNNING)?

2) Every line received from Apache Kafka is printed here multiple times, mostly 4 times. What is the reason?

Ideally I want to read each line only once and do further processing with it. Any input/help will be appreciated!

Comments:

- How many partitions does your topic have? - Paul Back
- The topic I am using has PartitionCount: 6 and ReplicationFactor: 2 - k_b

1 Answer


If you run the program in the LocalStreamEnvironment (which you get when you call StreamExecutionEnvironment.getExecutionEnvironment() in an IDE), the default parallelism of all operators is equal to the number of CPU cores.

So in your example, each operator is parallelized into four subtasks. In the log you see a message for each of these four subtasks (3/4 indicates that this is the third of in total four subtasks).

You can control the number of subtasks by calling StreamExecutionEnvironment.setParallelism(int) or by calling setParallelism(int) on each individual operator.
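A minimal sketch of both options, reusing the Java setup from the question (the properties, topic name, and FlinkKafkaConsumer082 version are the ones from your code; this is illustrative, not tested against your cluster):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Option 1: set the default parallelism for ALL operators in this job.
// With parallelism 1, the source and the print sink each run as a single subtask.
env.setParallelism(1);

DataStream<String> messageStream = env.addSource(
        new FlinkKafkaConsumer082<>("topic_name", new SimpleStringSchema(), properties));

// Option 2: override the parallelism of one individual operator.
// Here only the map runs with 2 subtasks; other operators keep the default.
messageStream.rebalance().map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
    }
}).setParallelism(2).print();

env.execute();
```

Note that the per-operator call applies to the operator it immediately follows, so you can mix a low-parallelism sink with a higher-parallelism map in the same job.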

Given your program, the Kafka records should not be replicated. Each record should only be printed once. However, since the records are printed in parallel, each line of output is prefixed by x>, where x indicates the ID of the parallel subtask that emitted the line.
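So with parallelism 4, the console output looks something like this (sample record values, assuming the subtask IDs happen to interleave this way):

    2> Kafka and Flink says: first message
    4> Kafka and Flink says: second message
    1> Kafka and Flink says: third message

If every record were genuinely printed four times, you would see the same record value behind four different x> prefixes; a different prefix on each distinct record just means the records are being spread across the four parallel subtasks.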