I am trying to run a Flink streaming job and want to measure its throughput and latency. I have started the Kafka broker, and messages are arriving from Kafka. How do I count messages per second (throughput)? Is there a method similar to rdd.count that returns the number of incoming messages?
(Complete scenario: I send each message from the producer as a JSON object. The object carries some information, such as a name (a string) and the value of System.currentTimeMillis(). During streaming, how do I get the sent JSON object back out of messageStream (a DataStream)?)
Thanks in advance.
Code:
/**
 * Read Strings from Kafka and print them to standard out.
 */
public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir", "c:/winutils/");

    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    if (parameterTool.getNumberOfParameters() < 4) {
        System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
                "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
        return;
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
    env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

    DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer010<>(
                    parameterTool.getRequired("topic"),
                    new SimpleStringSchema(),
                    parameterTool.getProperties()));

    messageStream.print();
    env.execute();
}
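
To make the goal concrete, here is a rough plain-Java sketch (no Flink involved) of the two numbers I am after: a count of messages per one-second bucket, and the average latency computed from the timestamp embedded in each JSON message. The class name ThroughputSketch and the JSON field name "timestamp" are placeholders made up for illustration, and the regex stands in for a real JSON parser. What I am missing is how to do the equivalent on messageStream.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Plain-Java sketch: per-second message counts and average latency. Not Flink code. */
class ThroughputSketch {

    // Hypothetical field name: the producer is assumed to store
    // System.currentTimeMillis() under "timestamp" in each JSON message.
    private static final Pattern SENT_AT =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    // Message counts keyed by the receive time in whole epoch seconds.
    private final Map<Long, Long> countsPerSecond = new TreeMap<>();
    private long totalLatencyMillis = 0;
    private long timedMessages = 0;

    /** Returns the embedded send time in milliseconds, or -1 if the field is absent. */
    static long extractSentAt(String json) {
        Matcher m = SENT_AT.matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    /** Records one message that arrived at receivedAtMillis. */
    void record(String json, long receivedAtMillis) {
        // Throughput: bump the counter of the one-second bucket the message fell into.
        countsPerSecond.merge(receivedAtMillis / 1000, 1L, Long::sum);

        // Latency: receive time minus the send time carried in the message.
        long sentAt = extractSentAt(json);
        if (sentAt >= 0) {
            totalLatencyMillis += receivedAtMillis - sentAt;
            timedMessages++;
        }
    }

    Map<Long, Long> countsPerSecond() {
        return countsPerSecond;
    }

    /** Average latency over all messages that carried a timestamp; 0.0 if there were none. */
    double averageLatencyMillis() {
        return timedMessages == 0 ? 0.0 : (double) totalLatencyMillis / timedMessages;
    }

    public static void main(String[] args) {
        ThroughputSketch stats = new ThroughputSketch();
        stats.record("{\"name\":\"a\",\"timestamp\":1000}", 1040);
        stats.record("{\"name\":\"b\",\"timestamp\":1500}", 1560);
        stats.record("{\"name\":\"c\",\"timestamp\":2100}", 2150);
        System.out.println("messages per second: " + stats.countsPerSecond());
        System.out.println("average latency (ms): " + stats.averageLatencyMillis());
    }
}
```

Note that the latency figure only makes sense if the producer and the consumer clocks are synchronized, since it subtracts a timestamp taken on one machine from a timestamp taken on another.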