I am trying to run a Flink job, as below, to read data from Apache Kafka and print it:
Java Program
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "test.net:9092");
properties.setProperty("group.id", "flink_consumer");
properties.setProperty("zookeeper.connect", "dev.com:2181,dev2.com:2181,dev.com:2181/dev2");
properties.setProperty("topic", "topic_name");

DataStream<String> messageStream = env.addSource(
        new FlinkKafkaConsumer082<>("topic_name", new SimpleStringSchema(), properties));

messageStream.rebalance().map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -6867736771747690202L;
    @Override
    public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
    }
}).print();

env.execute();
Scala Code
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val properties = new Properties()
properties.setProperty("bootstrap.servers", "msg01.staging.bigdata.sv2.247-inc.net:9092")
properties.setProperty("group.id", "flink_consumer")
properties.setProperty("zookeeper.connect", "host33.dev.swamp.sv2.tellme.com:2181,host37.dev.swamp.sv2.tellme.com:2181,host38.dev.swamp.sv2.tellme.com:2181/staging_sv2")
properties.setProperty("topic", "sv2.staging.rtdp.idm.events.omnichannel")

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer082[String]("sv2.staging.rtdp.idm.events.omnichannel", new SimpleStringSchema(), properties))

stream.print()

env.execute()
Whenever I run this app in Eclipse, I see the output below to start with:
03/27/2017 20:06:19 Job execution switched to status RUNNING.
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(1/4) switched to SCHEDULED
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(1/4) switched to DEPLOYING
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(2/4) switched to SCHEDULED
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(2/4) switched to DEPLOYING
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(3/4) switched to SCHEDULED
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(3/4) switched to DEPLOYING
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(4/4) switched to SCHEDULED
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(4/4) switched to DEPLOYING
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(4/4) switched to RUNNING
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(2/4) switched to RUNNING
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(1/4) switched to RUNNING
03/27/2017 20:06:19 Source: Custom Source -> Sink: Unnamed(3/4) switched to RUNNING
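The (1/4) through (4/4) suffixes in the log appear to indicate four parallel subtasks of the same operator chain. A minimal sketch (not from my actual code; just the standard StreamExecutionEnvironment API) of how the environment's parallelism can be inspected:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// In a local environment the default parallelism is typically the number of
// CPU cores; on a 4-core machine this would print 4, matching the four
// subtask instances shown in the log above.
System.out.println("parallelism: " + env.getParallelism());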
The questions I have are:
1) Why am I seeing 4 instances of the sink in all the states (SCHEDULED, DEPLOYING, and RUNNING)?
2) Why is every line received from Apache Kafka printed here multiple times, mostly 4 times? What's the reason?
Ideally I want to read each line only once and do further processing on it. Any input/help will be appreciated!
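For reference, a minimal diagnostic sketch of what could be tried, assuming (not confirmed) that the default parallelism is behind both observations. setParallelism on the environment and on the sink returned by print() are standard DataStream API calls; variable names refer to the Java program above:

// Option A (hypothetical diagnostic): pin the whole job to a single parallel
// instance; the (n/4) subtasks in the log should then become (1/1).
env.setParallelism(1);

// Option B (hypothetical diagnostic): leave the job parallel but print through
// a single sink instance, so each record is printed by exactly one subtask.
messageStream.print().setParallelism(1);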