
I am running a simple example to test windowing based on event time. I can generate output with processing time, but when I use EventTime, no output is produced. Please help me understand what I am doing wrong.

I am creating a sliding window of size 10 seconds that slides every 5 seconds. At the end of each window, the job should emit the number of messages received during that window.

Input:
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695856 (generated at 16th second, received at 19th second)
a,1513695859 (generated at 19th second, received at 19th second)

The second field is the event timestamp in epoch seconds, corresponding to the 13th, 13th, 16th and 19th second of a minute.
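The window arithmetic can be checked without Flink. The sketch below (the class name is hypothetical) assigns each event to its 10 s / 5 s sliding windows using the alignment start = ts - ts % slide, which I believe matches Flink's sliding window assigner with a zero offset. It also converts the epoch-second values to milliseconds, because Flink interprets event timestamps as epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink) that sketches sliding event-time window
 * assignment, assuming a zero window offset and non-negative timestamps.
 */
public class SlidingWindowAssignment {

    /** Returns each window as a {start, end} pair in milliseconds; end is exclusive. */
    public static List<long[]> assignWindows(long timestampMillis, long sizeMillis, long slideMillis) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestampMillis - (timestampMillis % slideMillis);
        // Step back one slide at a time while the window still contains the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
            windows.add(new long[] {start, start + sizeMillis});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Epoch seconds from the question; Flink expects epoch milliseconds.
        long[] epochSeconds = {1513695853L, 1513695853L, 1513695856L, 1513695859L};
        for (long seconds : epochSeconds) {
            long millis = seconds * 1000L;
            StringBuilder line = new StringBuilder("second " + (seconds % 60) + " ->");
            for (long[] w : assignWindows(millis, 10_000L, 5_000L)) {
                // Print window bounds as second-of-minute for readability.
                line.append(" [").append(w[0] / 1000 % 60).append("s, ")
                    .append(w[1] / 1000 % 60).append("s)");
            }
            System.out.println(line);
        }
    }
}
```

By this arithmetic, once watermarks have passed every window, event time would give (a,2) for [5s, 15s), (a,4) for [10s, 20s) and (a,2) for [15s, 25s). That differs from the processing-time output because events are grouped by when they were generated, not when they arrived.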

If I use a processing-time window:

Output:
(a,1)
(a,3)
(a,2)

But when I use event time, nothing is printed. Please help me understand what is going wrong.

package org.apache.flink.window.training;

import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SocketStream {


  private static Properties properties = new Properties();

  public static void main(String args[]) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    InputStream inputStream =
        SocketStream.class.getClassLoader().getResourceAsStream("local-kafka-server.properties");

    properties.load(inputStream);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("test-topic", new SimpleStringSchema(), properties);

    DataStream<Element> socketStockStream =
        env.addSource(consumer).map(new MapFunction<String, Element>() {
          @Override
          public Element map(String value) throws Exception {

            String split[] = value.split(",");
            Element element = new Element(split[0], Long.parseLong(split[1]));

            return element;
          }
        }).assignTimestampsAndWatermarks(new TimestampExtractor());

    socketStockStream.map(new MapFunction<Element, Tuple2<String, Integer>>() {

      @Override
      public Tuple2<String, Integer> map(Element value) throws Exception {

        return new Tuple2<String, Integer>(value.getId(), 1);
      }
    }).keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5))
        .sum(1)
        .print();

    env.execute();
  }

  public static class TimestampExtractor implements AssignerWithPunctuatedWatermarks<Element> {

    private static final long serialVersionUID = 1L;

    @Override
    public long extractTimestamp(Element element, long previousElementTimestamp) {

      return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
      // TODO Auto-generated method stub
      return null;
    }
  }
}

1 Answer


Event-time processing requires properly generated timestamps and watermarks.

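The TimestampExtractor in your code never generates a watermark: checkAndGetNextWatermark() always returns null. Without watermarks, event time never advances, so the event-time windows are never triggered and nothing is emitted.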
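To illustrate why a null watermark means no output, here is a minimal, Flink-free Java simulation (all names are hypothetical) of a punctuated watermark driving a 10 s / 5 s sliding event-time window. It assumes a single key, in-order events, and that a window fires once the watermark reaches its last millisecond (end - 1), which is how I understand Flink's event-time trigger to behave. In the real job, the corresponding change would be for checkAndGetNextWatermark to return new Watermark(extractedTimestamp) instead of null and, since Flink treats event timestamps as epoch milliseconds, for extractTimestamp to return element.getTimestamp() * 1000.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongFunction;

/**
 * Hypothetical simulation (not Flink code) of watermarks triggering sliding windows.
 * Simplifications: one key "a", in-order events, zero window offset, no late data.
 */
public class WatermarkSimulation {

    static final long SIZE = 10_000L;  // window size in ms
    static final long SLIDE = 5_000L;  // window slide in ms

    /** Returns the "(key,count)" results emitted; a null watermark means "no watermark". */
    public static List<String> run(long[] timestampsMillis, LongFunction<Long> watermarkFor) {
        Map<Long, Integer> countsByWindowStart = new TreeMap<>(); // open windows keyed by start
        List<String> emitted = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;

        for (long ts : timestampsMillis) {
            // Count the event in every sliding window that contains it.
            long lastStart = ts - (ts % SLIDE);
            for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
                countsByWindowStart.merge(start, 1, Integer::sum);
            }
            // Punctuated style: ask for a watermark after each event.
            Long wm = watermarkFor.apply(ts);
            if (wm != null && wm > currentWatermark) {
                currentWatermark = wm;
                // Fire and close every window whose last millisecond the watermark has reached.
                Iterator<Map.Entry<Long, Integer>> it = countsByWindowStart.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, Integer> entry = it.next();
                    if (entry.getKey() + SIZE - 1 <= currentWatermark) {
                        emitted.add("(a," + entry.getValue() + ")");
                        it.remove();
                    }
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The question's epoch-second timestamps, converted to milliseconds.
        long[] events = {1513695853_000L, 1513695853_000L, 1513695856_000L, 1513695859_000L};

        // Like the question's checkAndGetNextWatermark: always null, so no window fires.
        System.out.println("null watermarks:      " + run(events, ts -> null));

        // A watermark equal to each event's timestamp lets event time advance.
        System.out.println("per-event watermarks: " + run(events, ts -> ts));
    }
}
```

With null watermarks the simulation emits nothing. With per-event watermarks it emits (a,2) for the [5s, 15s) window when the event at second 16 arrives; the [10s, 20s) and [15s, 25s) windows stay open until a later event pushes the watermark past their end, so with an unbounded Kafka source they would only fire once more data arrives.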