I'm trying to join two streams in Apache Flink to compute a combined result.

My project currently fetches Twitter data and maps it into a 2-tuple holding the user's language and the number of tweets in a defined time window. I do this both for the number of tweets per language and for the number of retweets per language. The tweet/retweet aggregation works fine in other jobs.

Now I want to compute, per language, the percentage of retweets among all tweets in a time window.
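To make the intended per-window result concrete, here is a plain-Java sketch (no Flink involved; the class `RetweetShare` and the sample counts are hypothetical) of what the join should compute for a single window: an inner join of the two per-language counts on the language key, yielding the retweet percentage for every language present in both inputs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class RetweetShare {
    // Inner join of per-language tweet and retweet counts from ONE window.
    // Only languages present in both maps produce a result, mirroring
    // the inner-join semantics of a windowed join.
    public static Map<String, Double> retweetShare(Map<String, Integer> tweets,
                                                   Map<String, Integer> retweets) {
        Map<String, Double> result = new TreeMap<>(); // sorted by language for stable output
        for (Map.Entry<String, Integer> e : tweets.entrySet()) {
            Integer rt = retweets.get(e.getKey());
            if (rt != null && e.getValue() > 0) {
                // Percentage of retweets among all tweets of this language
                result.put(e.getKey(), 100.0 * rt / e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> tweets = new HashMap<>();   // hypothetical window counts
        tweets.put("en", 200);
        tweets.put("de", 40);
        tweets.put("fr", 10);
        Map<String, Integer> retweets = new HashMap<>();
        retweets.put("en", 50);
        retweets.put("de", 8);
        System.out.println(retweetShare(tweets, retweets)); // {de=20.0, en=25.0}
    }
}
```

Note that `fr` has no retweets in this window and therefore produces no row, which is also how an inner window join behaves.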

For this I use the following code:

Time windowSize = Time.seconds(15);

// Sum up tweets per language
DataStream<Tuple2<String, Integer>> tweetsLangSum = tweets
        .flatMap(new TweetLangFlatMap())
        .keyBy(0)
        .timeWindow(windowSize)
        .sum(1);

// ---

// Get retweets out of all tweets per language
DataStream<Tuple2<String, Integer>> retweetsLangMap = tweets
        .keyBy(new KeyByTweetPostId())
        .flatMap(new RetweetLangFlatMap());

// Sum up retweets per language
DataStream<Tuple2<String, Integer>> retweetsLangSum = retweetsLangMap
        .keyBy(0)
        .timeWindow(windowSize)
        .sum(1);

// ---

tweetsLangSum.join(retweetsLangSum)
            .where(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                    return tweet.f0;
                }
            })
            .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                    return tweet.f0;
                }
            })
            .window(TumblingEventTimeWindows.of(windowSize))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
                @Override
                public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
                    String lang = in1.f0;
                    // in1 = tweets per language, in2 = retweets per language
                    Double percentage = (double) in2.f1 / in1.f1 * 100;
                    return new Tuple4<>(lang, in1.f1, in2.f1, percentage);
                }
            })
            .print();

When I print tweetsLangSum or retweetsLangSum, the output looks fine. However, the join never produces any output. Does anyone know why? Or am I using the window function in the first aggregation step incorrectly when it comes to the join?

Answer


This might be caused by mixing different time semantics. The KeyedStream.timeWindow() method is a shortcut that creates a window operator based on the configured time characteristic, i.e., an event-time window if event time is enabled and a processing-time window otherwise. For the join, however, you explicitly define an event-time window (TumblingEventTimeWindows).

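Did you enable event-time processing? If not, either enable it as shown below, or define the join window with TumblingProcessingTimeWindows.of(windowSize) so that both steps use processing time. If you do enable event time, your stream must also carry timestamps and watermarks (e.g., via assignTimestampsAndWatermarks()), because an event-time window only fires once the watermark passes its end.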

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
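The following is a simplified, hypothetical model in plain Java (not Flink's API; `EventTimeWindowModel` and its methods are made up for illustration) of why an event-time window can stay silent: elements are assigned to tumbling windows by timestamp, but a window emits only once the watermark reaches the window's last timestamp (end - 1). Without an advancing watermark, nothing is ever emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of a tumbling event-time window counter (not Flink code).
public class EventTimeWindowModel {
    private final long size;                                      // window size in ms
    private final TreeMap<Long, Integer> counts = new TreeMap<>(); // window start -> count
    private final List<String> emitted = new ArrayList<>();

    public EventTimeWindowModel(long size) { this.size = size; }

    // Assign the element to the window [start, start + size); assumes timestamp >= 0
    public void onElement(long timestamp) {
        long start = timestamp - (timestamp % size);
        counts.merge(start, 1, Integer::sum);
    }

    // Fire every window whose last timestamp (end - 1) is <= the watermark
    public void onWatermark(long watermark) {
        while (!counts.isEmpty()) {
            Map.Entry<Long, Integer> first = counts.firstEntry();
            long end = first.getKey() + size;
            if (end - 1 > watermark) break;
            emitted.add("[" + first.getKey() + "," + end + ")=" + first.getValue());
            counts.pollFirstEntry();
        }
    }

    public List<String> emitted() { return emitted; }

    public static void main(String[] args) {
        EventTimeWindowModel w = new EventTimeWindowModel(15_000); // 15 s, as in the question
        w.onElement(1_000);
        w.onElement(5_000);
        w.onElement(16_000);
        System.out.println(w.emitted()); // [] : no watermark yet, nothing fires
        w.onWatermark(14_999);
        System.out.println(w.emitted()); // [[0,15000)=2]
    }
}
```

In this model, a job whose watermark never advances keeps buffering elements forever, which matches the symptom of a join that produces no output.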