
I'm trying to join two DataStreams by ID, and found there are two API sets that can do so:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
  2. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#joins

It seems both of them can get the job done.

So my questions are:

  1. What are the main differences? How should I choose?
  2. If I join stream A and stream B, and both have many records (e.g. A: 10000, B: 20000), are all records in the two streams compared to each other one by one? Is the total number of comparisons 10000 × 20000?
  3. Moreover, are there any cases (maybe a network issue) where stream B is delayed, so that some records in stream B are never compared with stream A?

Thanks.

Flink supports many different kinds of joins, both on bounded (batch) and unbounded (streaming) inputs. A thorough response to your question would be rather long and involved. For example, youtube.com/watch?v=UnCkwIp_614 goes through five different types of joins that can be done with Flink SQL, the code for which is in github.com/fhueske/flink-sql-demo. Can you narrow down the focus by giving a specific example of a join you want to do? – David Anderson
Thanks. I don't have a specific example yet. I just want to know more about the pros and cons of the different APIs. However, I cannot find any comparison of them... – Wilson Wong

1 Answer


What are the main differences? How should I choose?

There are several different APIs that can be used to implement joins with Flink. You'll find a survey of the different approaches in the Apache Flink developer training materials shared by Ververica, at https://training.ververica.com/decks/joins/?mode=presenter (behind the registration form). Disclaimer: I wrote these training materials.

To summarize:

The low-level building block for implementing streaming joins is the KeyedCoProcessFunction. Using this directly makes sense in special cases where having complete control is valuable, but for most purposes you're better off using a higher-level API.

The DataSet API offers batch joins implemented as hash joins, sort-merge joins, and broadcast joins. This API has been soft deprecated, and will ultimately be replaced by a combination of bounded streaming and Flink's relational APIs (SQL/Table).

The DataStream API offers only time-windowed and interval joins. It doesn't support any joins where unbounded state retention might be required.
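To make the semantics concrete, here's a plain-Java sketch (deliberately not the Flink API) of what an interval join computes: a record from A joins each record from B with the same key whose timestamp falls within `[a.ts - lowerBound, a.ts + upperBound]`. The `Event` type and field names are invented for illustration.

```java
import java.util.*;

public class IntervalJoinSketch {
    // Hypothetical event type for illustration only.
    record Event(String key, long ts, String payload) {}

    // Join each record of A with records of B sharing its key whose
    // timestamps lie within [a.ts - lowerBound, a.ts + upperBound].
    static List<String> intervalJoin(List<Event> a, List<Event> b,
                                     long lowerBound, long upperBound) {
        List<String> results = new ArrayList<>();
        for (Event ea : a) {
            for (Event eb : b) {
                boolean sameKey = ea.key().equals(eb.key());
                boolean inRange = eb.ts() >= ea.ts() - lowerBound
                               && eb.ts() <= ea.ts() + upperBound;
                if (sameKey && inRange) {
                    results.add(ea.payload() + "+" + eb.payload());
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> a = List.of(new Event("k1", 100, "a1"));
        List<Event> b = List.of(new Event("k1", 150, "b1"),   // in range
                                new Event("k1", 300, "b2"),   // too late
                                new Event("k2", 100, "b3"));  // wrong key
        // With bounds [100-50, 100+100] = [50, 200], only b1 matches.
        System.out.println(intervalJoin(a, b, 50, 100)); // prints [a1+b1]
    }
}
```

Because the match window is bounded in time, the runtime can discard buffered records once the watermark passes the end of their interval, which is why this kind of join needs only bounded state.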

The SQL/Table API supports a wide range of both batch and streaming joins:

STREAMING & BATCH

  • Time-Windowed and Interval INNER + OUTER JOIN
  • Non-windowed INNER + OUTER JOIN

STREAMING ONLY

  • Time-versioned INNER JOIN
  • External lookup INNER JOIN

The SQL optimizer is able to reason about state no longer needed because of temporal constraints. But some streaming joins do have the potential to require unbounded state to produce fully correct results; a state retention policy can be put in place to clear out stale entries that are unlikely to be needed.

Note that the Table API is fully interoperable with the DataStream API. I would use SQL/Table joins wherever possible, as they are much simpler to implement and are very well optimized.

If I join stream A and stream B, and both have many records (e.g. A: 10000, B: 20000), are all records in the two streams compared to each other one by one? Is the total number of comparisons 10000 × 20000?

Flink supports equi-key joins, where for some specific key, you want to join records from streams A and B having the same value for that key. If there are 10000 records from A and 20000 records from B all having the same key, then yes, an unconstrained join of A and B will produce 10000x20000 results.

But I don't believe that's what you meant. Flink materializes distributed hash tables in its managed state, sharded across the cluster by key. For example, as a new record arrives from stream A, it is inserted into the build-side hash table for A, and the corresponding hash table for B is probed for matching records -- all suitable results are emitted.

Note that this is done in parallel. But all events from both A and B for a specific key will be processed by the same instance.
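Here is a minimal plain-Java sketch of that symmetric hash join mechanism (not Flink's actual internals): each arriving record is stored in its own side's per-key table, and the other side's table is probed for matches. All names are invented for illustration.

```java
import java.util.*;

public class SymmetricHashJoinSketch {
    // Per-key state for each input; in Flink this would be managed,
    // keyed state sharded across the cluster.
    final Map<String, List<String>> tableA = new HashMap<>();
    final Map<String, List<String>> tableB = new HashMap<>();
    final List<String> results = new ArrayList<>();

    // A record arrives on stream A: store it, then probe B's table.
    void onA(String key, String value) {
        tableA.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String b : tableB.getOrDefault(key, List.of())) {
            results.add(value + "+" + b);
        }
    }

    // A record arrives on stream B: store it, then probe A's table.
    void onB(String key, String value) {
        tableB.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        for (String a : tableA.getOrDefault(key, List.of())) {
            results.add(a + "+" + value);
        }
    }

    public static void main(String[] args) {
        SymmetricHashJoinSketch join = new SymmetricHashJoinSketch();
        join.onA("k1", "a1");   // no match yet
        join.onB("k1", "b1");   // matches a1
        join.onA("k1", "a2");   // matches b1
        join.onB("k2", "b2");   // different key: no match
        System.out.println(join.results); // prints [a1+b1, a2+b1]
    }
}
```

Note that a record is only ever compared against records sharing its key, so the work is proportional to the number of per-key matches, not to |A| × |B| overall.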

Moreover, are there any cases (maybe a network issue) where stream B is delayed, so that some records in stream B are never compared with stream A?

If you are doing event time processing in combination with a time-windowed or interval join as provided by the SQL/Table API, then late events (as determined by the watermarking) won't be considered, and the results will be incomplete. With the DataStream API it is possible to implement special handling for late events, such as sending them to a side output, or retracting and updating the results.

For joins without temporal constraints, delayed events are processed normally whenever they arrive. The results are (eventually) complete.
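The lateness rule above can be sketched in a few lines of plain Java (again, not Flink's API): a record counts as late once the watermark has advanced past its timestamp, and a time-windowed join would then exclude it.

```java
public class LatenessSketch {
    long watermark = Long.MIN_VALUE;

    // Watermarks only move forward as the streams make progress.
    void advanceWatermark(long wm) { watermark = Math.max(watermark, wm); }

    // A record is late if its timestamp is below the current watermark.
    // A time-windowed join drops such records; with the DataStream API
    // they could instead be routed to a side output.
    boolean isLate(long ts) { return ts < watermark; }

    public static void main(String[] args) {
        LatenessSketch s = new LatenessSketch();
        s.advanceWatermark(100);
        System.out.println(s.isLate(90));  // prints true: too delayed
        System.out.println(s.isLate(120)); // prints false: still on time
    }
}
```

For a non-windowed join there is no such cutoff, which is why delayed records are simply processed on arrival.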