
The question: this program writes to Kafka more than once per window. It produces 2-3 or more lines per window, whereas it should produce exactly one, since the reduce function leaves only one element per window. I have the same code written in Spark and it works perfectly. I have searched for information about this issue and found nothing. I have also tried changing the parallelism of some functions, among other things, and nothing worked; I cannot figure out where the problem is.

I am testing Flink latency. Here is the environment of my problem:

Cluster: I am using Flink 1.2.0 and OpenJDK 8. I have 3 computers: 1 JobManager and 2 TaskManagers (each TaskManager has 4 cores, 2 GB RAM and 4 task slots).

Input data: lines produced by one Java producer to a Kafka topic with 24 partitions. Each line has two elements, an incremental value and a creation timestamp:

  • 1 1497790546981
  • 2 1497790546982
  • 3 1497790546983
  • 4 1497790546984
  • ............................

My Java Class:

  • It reads from a Kafka topic with 24 partitions (Kafka runs on the same machine as the JobManager).
  • The filter functions, together with the union, do nothing useful; I use them only to check their latency.
  • Basically, it adds a "1" to each line and then applies a 2-second tumbling window. The reduce function sums all the 1's and all the timestamps. The first map function then divides the timestamp sum by the sum of 1's, which gives the average timestamp. The last map function appends the current timestamp to each reduced line, along with the difference between that timestamp and the average timestamp.
  • This line is written to Kafka (to a topic with 2 partitions).

        //FLINK CONFIGURATION
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
    
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    
    
        //KAFKA CONSUMER CONFIGURATION
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);
    
    
        //KAFKA PRODUCER
        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
        producerConfig.setProperty("acks", "0");
        producerConfig.setProperty("linger.ms", "0");
    
    
        //MAIN PROGRAM
        //Read from Kafka
        DataStream<String> line = env.addSource(myConsumer);
    
        //Add 1 to each line
        DataStream<Tuple2<String, Integer>> line_Num = line.map(new NumberAdder());
    
        //Filter odd numbers
        DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
    
        //Filter Even numbers
        DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
    
        //Join Even and Odd
        DataStream<Tuple2<String, Integer>> line_Num_U = line_Num_Odd.union(line_Num_Even);
    
        //Tumbling windows every 2 seconds
        AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedLine_Num_U = line_Num_U
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
    
        //Reduce to one line with the sum
        DataStream<Tuple2<String, Integer>> wL_Num_U_Reduced = windowedLine_Num_U.reduce(new Reducer());
    
        //Calculate the average of the elements summed
        DataStream<String> wL_Average = wL_Num_U_Reduced.map(new AverageCalculator());
    
        //Add timestamp and calculate the difference with the average
        DataStream<String> averageTS = wL_Average.map(new TimestampAdder());
    
    
        //Send the result to Kafka
        FlinkKafkaProducer010Configuration<String> myProducerConfig = (FlinkKafkaProducer010Configuration<String>) FlinkKafkaProducer010
                .writeToKafkaWithTimestamps(averageTS, "testRes", new SimpleStringSchema(), producerConfig);
    
        myProducerConfig.setWriteTimestampToKafka(true);
    
        env.execute("TimestampLongKafka");
    }
    
    
    //Functions used in the program implementation:
    
    public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
    
        public boolean filter(Tuple2<String, Integer> line) throws Exception {
            Boolean isOdd = (Long.valueOf(line._1.split(" ")[0]) % 2) != 0;
            return isOdd;
        }
    };
    
    
    public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
    
        public boolean filter(Tuple2<String, Integer> line) throws Exception {
            Boolean isEven = (Long.valueOf(line._1.split(" ")[0]) % 2) == 0;
            return isEven;
        }
    };
    
    
    public static class NumberAdder implements MapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
    
        public Tuple2<String, Integer> map(String line) {
            Tuple2<String, Integer> newLine = new Tuple2<String, Integer>(line, 1);
            return newLine;
        }
    };
    
    
    public static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
    
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> line1, Tuple2<String, Integer> line2) throws Exception {
            Long sum = Long.valueOf(line1._1.split(" ")[0]) + Long.valueOf(line2._1.split(" ")[0]);
            Long sumTS = Long.valueOf(line1._1.split(" ")[1]) + Long.valueOf(line2._1.split(" ")[1]);
            Tuple2<String, Integer> newLine = new Tuple2<String, Integer>(String.valueOf(sum) + " " + String.valueOf(sumTS), 
                    line1._2 + line2._2);
            return newLine;
        }
    };
    
    
    public static class AverageCalculator implements MapFunction<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;
    
        public String map(Tuple2<String, Integer> line) throws Exception {
            Long average = Long.valueOf(line._1.split(" ")[1]) / line._2;
            String result = String.valueOf(line._2) + " " + String.valueOf(average);
            return result;
        }
    };
    
    
    public static final class TimestampAdder implements MapFunction<String, String> {
        private static final long serialVersionUID = 1L;
    
        public String map(String line) throws Exception {
            Long currentTime = System.currentTimeMillis();
            String totalTime = String.valueOf(currentTime - Long.valueOf(line.split(" ")[1]));
            String newLine = line.concat(" " + String.valueOf(currentTime) + " " + totalTime);
    
            return newLine;
        }
    };
    

Some output data: this output was written to the topic with 2 partitions, with a producing rate of less than 1000 records/second (in this case it is creating 3 output lines per window):

  • 1969 1497791240910 1497791241999 1089 1497791242001 1091
  • 1973 1497791240971 1497791241999 1028 1497791242002 1031
  • 1970 1497791240937 1497791242094 1157 1497791242198 1261
  • 1917 1497791242912 1497791243999 1087 1497791244051 1139
  • 1905 1497791242971 1497791243999 1028 1497791244051 1080
  • 1916 1497791242939 1497791244096 1157 1497791244199 1260
  • 1994 1497791244915 1497791245999 1084 1497791246002 1087
  • 1993 1497791244966 1497791245999 1033 1497791246004 1038
  • 1990 1497791244939 1497791246097 1158 1497791246201 1262
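
To check how many lines each window produced, the output can be grouped by window. The following is only a minimal sketch, not part of the job: the class name `WindowLineCounter` and the method `countPerWindow` are made up for illustration, and it assumes that the second field of each line is the average creation timestamp and that latency is small enough for that average to fall inside the 2-second window that emitted the line.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: counts how many output lines belong to each window.
public class WindowLineCounter {
    // Window size used by the job (2-second tumbling windows).
    static final long WINDOW_MS = 2000L;

    // Returns a map from window start (ms since epoch) to number of lines.
    // Assumption: field 1 (zero-based) is the average creation timestamp.
    static Map<Long, Integer> countPerWindow(List<String> lines) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            long averageTs = Long.parseLong(fields[1]);
            // Tumbling windows with no offset start at multiples of the size.
            long windowStart = averageTs - (averageTs % WINDOW_MS);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> output = Arrays.asList(
            "1969 1497791240910 1497791241999 1089 1497791242001 1091",
            "1973 1497791240971 1497791241999 1028 1497791242002 1031",
            "1970 1497791240937 1497791242094 1157 1497791242198 1261",
            "1917 1497791242912 1497791243999 1087 1497791244051 1139",
            "1905 1497791242971 1497791243999 1028 1497791244051 1080",
            "1916 1497791242939 1497791244096 1157 1497791244199 1260",
            "1994 1497791244915 1497791245999 1084 1497791246002 1087",
            "1993 1497791244966 1497791245999 1033 1497791246004 1038",
            "1990 1497791244939 1497791246097 1158 1497791246201 1262");
        countPerWindow(output).forEach((start, n) ->
            System.out.println(start + " -> " + n + " line(s)"));
    }
}
```

For the sample above this reports 3 lines for each of the windows starting at 1497791240000, 1497791242000 and 1497791244000, where 1 line per window is expected.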

Thanks in advance!

I'm not sure what's going on, but a couple of things seem a bit odd: Why set the time characteristic to event time, then never setup a timestamp extractor and watermark generator, and then use processing time windows? And why have an infrastructure capable of parallel processing, and then use windowAll, which gathers all records into one task? - David Anderson
TimeCharacteristic: I use a consumer later to copy the results to a file, and it uses record.timestamp(), so I need setWriteTimestampToKafka(true) in my program. windowAll: I haven't done it yet; I have thought about adding a key after both filters so the window can run with parallelism=2 - froblesmartin
I forgot, I have changed EventTime to ProcessingTime! Thanks! - froblesmartin

1 Answer


I don't know exactly why, but I can fix the problem by stopping the Flink cluster and starting it again. After some job executions it starts to produce more outputs, at least 3x, and the problem can probably keep growing. I will open an issue on Jira and update this as soon as possible.