
In Apache Flink, I have two streams of Tuple8<>, say in and out. Four of the eight fields (a Tuple4) act as the key. I want to correlate the records of the two streams, and as a first step I am joining them with the join operator. By the join semantics I should get an output stream containing the inner-joined records. However, I am not getting any output or matches. The time characteristic of the environment is set to event time; the first element of each tuple is the timestamp, which I extract and assign with assignTimestampsAndWatermarks.

DataStream<String> input = env.readTextFile("/tmp/logScrape/out/raw-input.out");

DataStream<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>> inFiltered =
        input.flatMap(new Splitter())
                .filter(new InFilter())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> record) {
                        return record.f0; // first field holds the event timestamp (ms)
                    }
                });

DataStream<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>> exitFiltered =
        input.flatMap(new Splitter())
                .filter(new ExitFilter())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> record) {
                        return record.f0;
                    }
                });

// Join on the four key fields; only records with equal keys in the same window are paired.
inFiltered.join(exitFiltered)
        .where(new TupleKeySelector())
        .equalTo(new TupleKeySelector())
        // window assigner missing from the post; reconstructed from the answer's "tumbling window of 1,000,000ms"
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000000)))
        .apply(new StreamJoinner());

public static class TupleKeySelector implements KeySelector<
        Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>,
        Tuple4<String, Integer, String, Integer>> {
    // Fields f2..f5 form the join key; both streams must produce equal Tuple4 values to match.
    @Override
    public Tuple4<String, Integer, String, Integer> getKey(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> value) {
        return new Tuple4<>(value.f2, value.f3, value.f4, value.f5);
    }
}

Here are the output records I got for the inFiltered stream:


Here are the output records I got for the exitFiltered stream:



  • Is there anything I am missing that would make the joined results start appearing?
  • Is there any way to debug the code while the processing is happening? I am not sure whether, in my case, the key selector is the issue or the windowing is not happening properly.
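One way to tell the two causes apart is to dump (timestamp, key) for both filtered streams, for example with print() or by writing them to files, and check them offline. The sketch below is plain Java, not Flink code, and needs Java 16+ for the record. It assumes a 1,000,000 ms tumbling event-time window with zero offset and non-negative timestamps; the Rec type, the "|"-joined key encoding and the sample values are hypothetical. Keys that never appear on both sides point at the key selector (or the filters); keys that appear on both sides but never share a window point at the windowing.

```java
import java.util.*;

public class JoinDiagnostics {
    // Hypothetical flattened record: event timestamp (ms) plus the four key fields joined into one string.
    record Rec(long timestamp, String key) {}

    static final long WINDOW_SIZE_MS = 1_000_000L; // assumed tumbling window size

    // Start of the tumbling window containing ts (assumes offset 0 and ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Maps each key to the set of windows in which it occurs.
    static Map<String, Set<Long>> windowsByKey(List<Rec> records) {
        Map<String, Set<Long>> result = new HashMap<>();
        for (Rec r : records) {
            result.computeIfAbsent(r.key(), k -> new HashSet<>()).add(windowStart(r.timestamp()));
        }
        return result;
    }

    // Classifies every key seen in either stream by why it does or does not join.
    static Map<String, String> diagnose(List<Rec> in, List<Rec> out) {
        Map<String, Set<Long>> inWindows = windowsByKey(in);
        Map<String, Set<Long>> outWindows = windowsByKey(out);
        Set<String> allKeys = new TreeSet<>(inWindows.keySet());
        allKeys.addAll(outWindows.keySet());

        Map<String, String> result = new TreeMap<>();
        for (String key : allKeys) {
            Set<Long> a = inWindows.get(key);
            Set<Long> b = outWindows.get(key);
            if (a == null || b == null) {
                result.put(key, "KEY_ON_ONE_SIDE_ONLY");        // suspect the key selector or filters
            } else if (Collections.disjoint(a, b)) {
                result.put(key, "SAME_KEY_DIFFERENT_WINDOWS");  // suspect the window size or alignment
            } else {
                result.put(key, "JOINS");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> in = List.of(new Rec(1_500_000L, "a|1|b|2"), new Rec(10L, "x|0|y|0"));
        List<Rec> out = List.of(new Rec(2_100_000L, "a|1|b|2"), new Rec(20L, "x|0|y|0"), new Rec(30L, "z|9|z|9"));
        diagnose(in, out).forEach((k, v) -> System.out.println(k + " -> " + v));
        // prints:
        // a|1|b|2 -> SAME_KEY_DIFFERENT_WINDOWS
        // x|0|y|0 -> JOINS
        // z|9|z|9 -> KEY_ON_ONE_SIDE_ONLY
    }
}
```

If every key comes out as SAME_KEY_DIFFERENT_WINDOWS, the key selector is fine and the window is the thing to look at.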
It is recommended to use POJOs (Plain Old Java Objects) instead of TupleX for data types with many fields. POJOs can also be used to give large Tuple types a name. ci.apache.org/projects/flink/flink-docs-release-1.4/dev/… (Brutal_JL)
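Following that suggestion, a minimal sketch of what a POJO replacement for the Tuple8 could look like. All field names here are hypothetical, since the question does not say what the eight fields mean; the types mirror the tuple (f0 Long, f2/f4 String, and so on). The class follows Flink's POJO rules as I understand them (public class, public no-argument constructor, public fields), and the nested EventKey gives the four key fields a name with value-based equals/hashCode. A KeySelector<LogEvent, LogEvent.EventKey> would then simply return value.key().

```java
import java.util.Objects;

// Hypothetical POJO replacing the Tuple8; all field names are invented for illustration.
public class LogEvent {
    public Long timestamp;   // was f0: event time in ms
    public Integer field1;   // was f1
    public String keyA;      // was f2 (key)
    public Integer keyB;     // was f3 (key)
    public String keyC;      // was f4 (key)
    public Integer keyD;     // was f5 (key)
    public Integer field6;   // was f6
    public String field7;    // was f7

    // Flink's POJO rules require a public no-argument constructor.
    public LogEvent() {}

    public LogEvent(Long timestamp, Integer field1, String keyA, Integer keyB,
                    String keyC, Integer keyD, Integer field6, String field7) {
        this.timestamp = timestamp;
        this.field1 = field1;
        this.keyA = keyA;
        this.keyB = keyB;
        this.keyC = keyC;
        this.keyD = keyD;
        this.field6 = field6;
        this.field7 = field7;
    }

    // The same four fields that TupleKeySelector extracts (f2..f5).
    public EventKey key() {
        return new EventKey(keyA, keyB, keyC, keyD);
    }

    // Named key type; value-based equals/hashCode, so equal field values mean equal keys.
    public static class EventKey {
        public String keyA;
        public Integer keyB;
        public String keyC;
        public Integer keyD;

        public EventKey() {}

        public EventKey(String keyA, Integer keyB, String keyC, Integer keyD) {
            this.keyA = keyA;
            this.keyB = keyB;
            this.keyC = keyC;
            this.keyD = keyD;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof EventKey)) return false;
            EventKey other = (EventKey) o;
            return Objects.equals(keyA, other.keyA) && Objects.equals(keyB, other.keyB)
                    && Objects.equals(keyC, other.keyC) && Objects.equals(keyD, other.keyD);
        }

        @Override
        public int hashCode() {
            return Objects.hash(keyA, keyB, keyC, keyD);
        }

        @Override
        public String toString() {
            return "EventKey(" + keyA + ", " + keyB + ", " + keyC + ", " + keyD + ")";
        }
    }
}
```

A side benefit for debugging: printing an EventKey shows exactly which four values a record is keyed on.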

1 Answer


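You've got a tumbling window of 1,000,000 ms, right? Looking at the timestamps of your two filtered streams (the first field, right?), I don't see any records from the two streams that fall into the same 1,000,000 ms window. A window join only pairs records that have equal keys and are assigned to the same window, and tumbling windows are aligned to multiples of the window size, not to the records themselves. So if no matching pair shares a window, the join emits nothing.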
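To make the alignment point concrete, here is a plain-Java sketch of tumbling window assignment. The formula is, to my understanding, the one Flink uses in TimeWindow.getWindowStartWithOffset, but treat that as an assumption and check the source for your Flink version; the sketch only covers non-negative timestamps. It shows that two timestamps 1 ms apart can land in different windows, while two timestamps almost 1,000,000 ms apart can share one.

```java
public class TumblingWindowCheck {
    // Start of the tumbling window that contains `timestamp` (all values in ms).
    // Intended to match Flink's window start computation for non-negative timestamps.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // True if both timestamps fall into the same tumbling window (offset 0),
    // i.e. records with equal keys at these times could be joined.
    static boolean sameWindow(long t1, long t2, long windowSize) {
        return windowStart(t1, 0, windowSize) == windowStart(t2, 0, windowSize);
    }

    public static void main(String[] args) {
        long size = 1_000_000L; // the 1,000,000 ms window discussed above
        System.out.println(sameWindow(999_999L, 1_000_000L, size));   // false: 1 ms apart, different windows
        System.out.println(sameWindow(1_000_000L, 1_999_999L, size)); // true: 999,999 ms apart, same window
    }
}
```

Running the first field of a few in/exit records through windowStart is a quick way to check whether the records you expect to correlate are ever assigned to the same window.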