In Apache Flink, I have two streams of Tuple8<>, say in and out. Four of the eight fields of each event tuple (a Tuple4) act as the key. I want to correlate records across the two streams, and as a first step I am joining them with the join operator. By the operator's semantics, I should get an output stream containing the inner-joined records. However, I get no output and no matches. The time characteristic of the environment is set to event time; the first element of the tuple is the timestamp, which I extract and assign using assignTimestampsAndWatermarks, as shown below.

DataStream<String> input = env.readTextFile("/tmp/logScrape/out/raw-input.out");

DataStream<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>> inFiltered =
        input.flatMap(new Splitter())
                .filter(new InFilter())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> record) {
                        return record.f0;
                    }
                });

DataStream<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>> exitFiltered =
        input.flatMap(new Splitter())
                .filter(new ExitFilter())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> record) {
                        return record.f0;
                    }
                });

inFiltered.join(exitFiltered)
        .where(new TupleKeySelector())
        .equalTo(new TupleKeySelector())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000000)))
        .apply(new StreamJoinner())
        .writeAsText("/tmp/logScrape/out/output");

public static class TupleKeySelector implements KeySelector<
        Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>,
        Tuple4<String, Integer, String, Integer>> {
    @Override
    public Tuple4<String, Integer, String, Integer> getKey(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> value) {
        return new Tuple4<>(value.f2, value.f3, value.f4, value.f5);
    }
}

Here are the output records I got for the inFiltered stream:

(1519254461076381189,1234,program1,11,program2,20,27,in)
(1519254462071697685,1234,program1,11,program2,20,27,in)
(1519254463067014246,1234,program1,11,program2,20,27,in)

Here are the output records I got for the exitFiltered stream:

(1519254458167640292,6789,program1,11,program2,20,27,out)
(1519254460158076301,6789,program1,11,program2,20,27,out)
(1519254461153294238,6789,program1,11,program2,20,27,out)
(1519254462148512207,6789,program1,11,program2,20,27,out)
(1519254463143730191,6789,program1,11,program2,20,27,out)

Questions:

  • Is there anything I am missing that would make the joined results appear?
  • Is there any way to debug the code while the processing is happening? In my case I am not sure whether the key selector is the issue or the windowing is not happening properly.
It is recommended to use POJOs (plain old Java objects) instead of TupleX for data types with many fields. Also, POJOs can be used to give large Tuple types a name. ci.apache.org/projects/flink/flink-docs-release-1.4/dev/… - Brutal_JL

1 Answer

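You've got a tumbling window of 1,000,000 ms, right? Looking at the timestamps of your two filtered streams (the first field, right?), I don't see any in/out pair that falls into the same 1,000,000 ms window. Flink treats event timestamps as milliseconds since the epoch, but your 19-digit values look like nanoseconds. If so, each window covers only about one millisecond of real time, while the closest in/out records are roughly 77 ms apart, so the join never sees both sides of a key in one window.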
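
You can check this offline, without running Flink, by working out which window each timestamp would land in. The sketch below is only an illustration: it assumes the window start is the timestamp rounded down to a multiple of the window size (the behaviour of a tumbling window with zero offset and non-negative timestamps), and it assumes the first field is in nanoseconds. The class and method names (WindowAssignmentSketch, windowStart, pairsInSameWindow, nanosToMillis) are made up for this example and are not part of Flink.

```java
class WindowAssignmentSketch {

    // Timestamps (first tuple field) copied from the question's output.
    static final long[] IN = {
        1519254461076381189L, 1519254462071697685L, 1519254463067014246L
    };
    static final long[] OUT = {
        1519254458167640292L, 1519254460158076301L, 1519254461153294238L,
        1519254462148512207L, 1519254463143730191L
    };

    // Same size as Time.milliseconds(1000000) in the question.
    static final long WINDOW_SIZE = 1_000_000L;

    // Start of the tumbling window containing the timestamp, assuming zero
    // offset and a non-negative timestamp: round down to a multiple of size.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Counts in/out pairs whose timestamps fall into the same window.
    static int pairsInSameWindow(long[] in, long[] out, long size) {
        int pairs = 0;
        for (long i : in) {
            for (long o : out) {
                if (windowStart(i, size) == windowStart(o, size)) {
                    pairs++;
                }
            }
        }
        return pairs;
    }

    // Assumption: the raw values are nanoseconds; convert to milliseconds.
    static long[] nanosToMillis(long[] nanos) {
        long[] millis = new long[nanos.length];
        for (int k = 0; k < nanos.length; k++) {
            millis[k] = nanos[k] / 1_000_000L;
        }
        return millis;
    }

    public static void main(String[] args) {
        // Raw values, interpreted as milliseconds the way Flink would.
        System.out.println("raw: "
                + pairsInSameWindow(IN, OUT, WINDOW_SIZE));
        // Same values after converting nanoseconds to milliseconds.
        System.out.println("converted: "
                + pairsInSameWindow(nanosToMillis(IN), nanosToMillis(OUT), WINDOW_SIZE));
    }
}
```

With the raw values no pair shares a window; after the conversion every in record shares a window with every out record. If the nanosecond assumption holds for your data, one possible fix is to return record.f0 / 1_000_000 from extractTimestamp so that Flink sees milliseconds. Since all sample records carry the same key fields, that would also suggest the key selector is not the problem here.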