Question: Does Spark have to finish processing all entries in the reduceByKey step before starting the join step?

I think the answer is no. I think each partition/task has to finish the reduceByKey task before moving on to the join.

Details: In the example below, I reduce an RDD by the key userId and nest all the values with the same user id into a list.

Then I join this RDD of (userid,listOfEvents) with another RDD of (userid, otherEvent).

Note that in this example the partitioners are the same (the default HashPartitioner on userId) for both reduceByKey and join. Does this change whether reduceByKey has to completely finish processing all data before the join starts?

In this example, the scenario where List(eventA, eventB) is joined with eventK without eventC will never happen, correct?

However, could it happen that List(eventA, eventB, eventC) is joined with eventK while eventD and eventF have not been reduced yet?

Impression Events
userId  Event
1       eventA
1       eventB
1       eventC

2       eventD
2       eventF

Conversion Events
userId  Event
1       eventK

2       eventL

// The Reduce Step
final JavaPairRDD<Long, ObjectArrayList<Event>> impressionRDD = loadImpressionEvents()
    .mapToPair(event -> {

        final ObjectArrayList<Event> list = new ObjectArrayList<>();

        list.add(new Event(event.getTimestamp(),
            event.getCampaignIdentifier(), event.getSiteIdentifier()));

        return new Tuple2<>(
            event.getUserId(),
            list
        );
    })
    .reduceByKey((event1, event2) -> {
        // Combine impression events with the same user id
        event1.addAll(event2);
        return event1;
    });

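// The Join Step
// leftOuterJoin wraps the right-hand value in Optional, because a user may have no impressions
// (org.apache.spark.api.java.Optional in Spark 2.x, Guava's Optional in Spark 1.x)
final JavaPairRDD<Long, Tuple2<ConversionEvent, Optional<ObjectArrayList<Event>>>> conversionImpressions = loadConversionEvents()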
    .mapToPair(event -> new Tuple2<>(
        event.getUserId(),
        event
    ))
    .leftOuterJoin(impressionRDD);

1 Answer

Spark has to finish the shuffle before cogroup and flatten (the two steps that implement the join) can start, so it is not possible to start the join while reduceByKey is still in progress.
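
To make the ordering concrete, here is a minimal plain-Java sketch of the data from the question. It is only a model of the semantics, not Spark code: the class name `ShuffleBarrierSketch` and the helpers `reduceByKey`, `leftOuterJoin` and `example` are hypothetical names made up for illustration, and events are reduced to plain strings. The assumption it encodes is that the reduce step consumes every input record before the join sees any of its output, which mirrors the shuffle barrier described above.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShuffleBarrierSketch {

    // Models reduceByKey: the result is returned only after ALL records were consumed.
    static Map<Long, List<String>> reduceByKey(List<Map.Entry<Long, String>> impressions) {
        Map<Long, List<String>> grouped = new LinkedHashMap<>();
        for (Map.Entry<Long, String> record : impressions) {
            grouped.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                   .add(record.getValue());
        }
        return grouped;
    }

    // Models leftOuterJoin: each conversion is paired with the complete list
    // for its user, or with null when the user has no impressions.
    static Map<Long, Map.Entry<String, List<String>>> leftOuterJoin(
            Map<Long, String> conversions, Map<Long, List<String>> reduced) {
        Map<Long, Map.Entry<String, List<String>>> joined = new LinkedHashMap<>();
        for (Map.Entry<Long, String> c : conversions.entrySet()) {
            joined.put(c.getKey(),
                    new SimpleImmutableEntry<>(c.getValue(), reduced.get(c.getKey())));
        }
        return joined;
    }

    // Builds the sample data from the question and runs reduce, then join.
    static Map<Long, Map.Entry<String, List<String>>> example() {
        List<Map.Entry<Long, String>> impressions = new ArrayList<>();
        impressions.add(new SimpleImmutableEntry<>(1L, "eventA"));
        impressions.add(new SimpleImmutableEntry<>(1L, "eventB"));
        impressions.add(new SimpleImmutableEntry<>(1L, "eventC"));
        impressions.add(new SimpleImmutableEntry<>(2L, "eventD"));
        impressions.add(new SimpleImmutableEntry<>(2L, "eventF"));

        Map<Long, String> conversions = new LinkedHashMap<>();
        conversions.put(1L, "eventK");
        conversions.put(2L, "eventL");

        // The join is only invoked on the finished output of the reduce step.
        return leftOuterJoin(conversions, reduceByKey(impressions));
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

In this model eventK can only ever be paired with the full list for user 1, and by the time it is paired, eventD and eventF have already been grouped for user 2 as well. In real Spark the barrier sits at the stage boundary: the scheduler does not launch the tasks that read shuffle output until the stage that writes it has completed.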