
I'm trying to join two types of events (say A and B) in Flink, and I want to confirm that my understanding is correct. Some properties of the events:

  1. Event A flows into Flink with a small delay (5-10 minutes)
  2. Event B flows with a slightly larger delay (15-30 minutes)
  3. There is a 1:1 join between event A and event B

I have configured event A's datastream with BoundedOutOfOrdernessTimestampExtractor of 10 minutes, and event B's datastream with 30 minutes. I later do a time window join using the Table API.
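To make the lateness reasoning below concrete, here is a minimal plain-Scala model of the arithmetic `BoundedOutOfOrdernessTimestampExtractor` performs (this is a sketch of the semantics, not Flink's actual implementation, and exact boundary/off-by-one details may differ):

```scala
// Sketch of BoundedOutOfOrdernessTimestampExtractor's watermark arithmetic:
// the watermark trails the highest timestamp seen so far by the configured
// out-of-orderness bound.
class BoundedOutOfOrdernessModel(maxOutOfOrdernessMs: Long) {
  private var maxTimestamp = Long.MinValue

  // Called per element; returns the watermark that would be current afterwards.
  def onElement(timestampMs: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, timestampMs)
    maxTimestamp - maxOutOfOrdernessMs
  }

  // An element is late if its timestamp is behind the current watermark
  // (boundary semantics simplified here).
  def isLate(timestampMs: Long): Boolean =
    timestampMs < maxTimestamp - maxOutOfOrdernessMs
}
```

With a 10-minute bound, an event only becomes late once some other event more than 10 minutes newer has already been seen, which is what makes claim 1 below hold.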

Is my understanding correct about the following?

  1. Events are processed and joined as soon as they are received, as long as they are within the lateness window (10 minutes for event A, 30 minutes for event B). End-to-end latency has no minimum constraint imposed by any of Flink's configuration.
  2. The Table holds onto events for at most 30 minutes, until the watermarks have arrived from both streams; the events are then cleaned up based on those watermarks.
  3. The query configuration in the code below is redundant and not really required.

Any other suggestions about the code below?

queryConfig.withIdleStateRetentionTime(
    org.apache.flink.api.common.time.Time.seconds(1),
    org.apache.flink.api.common.time.Time.minutes(30))

val stream: DataStream[Any] = textStream.flatMap(json => convert(json))

val aStream: DataStream[ClassA] =
    stream
        .filter(obj => obj.isInstanceOf[ClassA])
        .rebalance
        .map(obj => obj.asInstanceOf[ClassA])
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[ClassA](
                Time.minutes(10)){
                override def extractTimestamp(element: ClassA): Long =
                    element.serviceTimestamp.toInstant.toEpochMilli
            })

val bStream: DataStream[ClassB] =
    stream
            .filter(obj => obj.isInstanceOf[ClassB])
            .rebalance
            .map(obj => obj.asInstanceOf[ClassB])
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor[ClassB](
                    Time.minutes(30)){
                    override def extractTimestamp(element: ClassB): Long =
                        element.timestamp.toInstant.toEpochMilli
                })

val aTable: Table  = tableEnv.fromDataStream[ClassA](aStream,
    // The .rowtime is for setting event time attributes
    'aTimestamp.rowtime as 'aTimestamp, 'aUniqueId, 'aItem)

val bTable: Table  = tableEnv.fromDataStream[ClassB](bStream,
    // The .rowtime is for setting event time attributes
    // https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
    'bTimestamp.rowtime as 'bTimestamp, 'uniqueId, 'bItem)

val result: Table = aTable
        .join(bTable)
        .where('aUniqueId === 'uniqueId
                // Give ClassB events 30 minutes lateness.
                // Use a time window join as optimization - https://stackoverflow.com/a/51620821
                // & https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#time-windowed-joins
                // Both time clauses are needed to qualify as a time window join
                && 'bTimestamp >= 'aTimestamp
                && 'bTimestamp <= 'aTimestamp + 30.minutes)
        // DO NOT change order without changing order in later parsing code
        .select('uniqueId, 'aItem, 'bItem, 'bTimestamp, 'aTimestamp.cast(createTypeInformation[Timestamp]))

val outputStream: DataStream[ClassC]  = tableEnv
                .toAppendStream[(String, String, String, Timestamp, Timestamp)](result)
                // TODO find better way to map to a POJO
                .map(row => ClassCUtils.toClassC(row))
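Regarding the `TODO`: the Scala Table API can usually convert straight into a case class via `toAppendStream[ClassC]` when the field types line up. As a dependency-free stopgap, a pattern match at least names every tuple position at the use site, making the "DO NOT change order" contract explicit. A sketch, where `ClassC`'s fields are assumptions based on the selected columns:

```scala
import java.sql.Timestamp

// Hypothetical target type; field names are assumed from the SELECT clause
// (uniqueId, aItem, bItem, bTimestamp, aTimestamp).
case class ClassC(uniqueId: String, aItem: String, bItem: String,
                  bTimestamp: Timestamp, aTimestamp: Timestamp)

// Pattern matching names each tuple position where it is consumed, so the
// coupling to the SELECT column order is visible in one place.
def toClassC(row: (String, String, String, Timestamp, Timestamp)): ClassC =
  row match {
    case (uniqueId, aItem, bItem, bTs, aTs) =>
      ClassC(uniqueId, aItem, bItem, bTs, aTs)
  }
```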

1 Answer


Events are processed and joined as soon as they are received, as long as they are within the lateness window (10 minutes for event A, 30 minutes for event B). End-to-end latency has no minimum constraint imposed by any of Flink's configuration.

That's correct. Events will be mapped and filtered as they're received, and buffered so that the join window requirements can be met.

The Table holds onto events for at most 30 minutes, until the watermarks have arrived from both streams; the events are then cleaned up based on those watermarks.

That's correct. The IntervalJoinOperator would receive events from both right and left sides of the join, check if they're in the time bounds, and if so emit them downstream:

private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {

        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }
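The bounds check and cleanup-timer arithmetic in `processElement` above can be modeled in a few lines of plain Scala (a sketch of the logic only, not Flink's actual code):

```scala
// An element from the other side joins iff its timestamp falls within
// [ourTimestamp + relativeLowerBound, ourTimestamp + relativeUpperBound].
def joins(ourTimestamp: Long, otherTimestamp: Long,
          relativeLowerBound: Long, relativeUpperBound: Long): Boolean =
  otherTimestamp >= ourTimestamp + relativeLowerBound &&
    otherTimestamp <= ourTimestamp + relativeUpperBound

// Buffered state for an element can be dropped once the event-time clock
// passes the last instant at which it could still participate in a join.
def cleanupTime(ourTimestamp: Long, relativeUpperBound: Long): Long =
  if (relativeUpperBound > 0L) ourTimestamp + relativeUpperBound else ourTimestamp
```

With the question's bounds (lower = 0, upper = 30 minutes), an A event at time `t` stays buffered until the watermark reaches `t + 30 minutes`, which matches the "held for at most 30 minutes" expectation.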

The query configuration in the code below is redundant and not really required

That's correct. withIdleStateRetentionTime is relevant when you use unbounded operators, such as a GROUP BY clause in SQL with no window attributes.
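Roughly, idle state retention means per-key state is dropped once a key has gone untouched for longer than the configured retention. A toy plain-Scala model of that behavior (real Flink uses timers between the min and max retention times; this sketch keeps only a single max-retention bound):

```scala
// Toy model of idle state retention: entries are dropped when a key has not
// been accessed for longer than maxRetentionMs of (simulated) time.
class IdleStateModel[K, V](maxRetentionMs: Long) {
  private val state = scala.collection.mutable.Map.empty[K, (V, Long)]

  def update(key: K, value: V, nowMs: Long): Unit = {
    state(key) = (value, nowMs)
    expire(nowMs)
  }

  def get(key: K, nowMs: Long): Option[V] = {
    expire(nowMs)
    state.get(key) match {
      case Some((v, _)) =>
        state(key) = (v, nowMs) // an access refreshes the retention timer
        Some(v)
      case None => None
    }
  }

  private def expire(nowMs: Long): Unit = {
    val expired = state.collect { case (k, (_, last)) if nowMs - last > maxRetentionMs => k }
    state --= expired
  }
}
```

A time-windowed join doesn't need this because the watermark-driven cleanup shown above already bounds its state.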