
I want to use the Flink Table API to join two tables on a field that has the same name in both tables.

I want to implement the equivalent of:

SELECT
    a.id,
    b.id
FROM
    table1 AS a
JOIN
    table2 AS b
ON
    a.id = b.id

The only way I found to achieve this was:

val table1 = tableEnv.fromDataSet(dbData, "id1")
val table2 = tableEnv.fromDataSet(dbData, "id2")
val res = table1.join(table2).where("id1=id2")

However, I want to keep the field name "id" in both tables.

I found this in the Flink documentation:

Both tables must have distinct field names and an equality join predicate must be defined using a where or filter operator.

How can I reuse the same field name as the join key?


1 Answer


The schema of the result of a join is the concatenation of both input schemas. For example, if you have left: [a, b, c] and right: [d, e, f], the result of joining left and right is a new Table with schema [a, b, c, d, e, f].

If left and right have fields with the same name, a subsequent operation such as a select cannot refer to either of those fields unambiguously.
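To make the concatenation rule concrete, here is a toy model in plain Scala. It is not Flink's API: a schema is assumed to be just an ordered list of field names, and the object and method names are hypothetical. It shows that the joined schema keeps every input field, so a name present on both sides appears twice.

```scala
// Toy model only (not Flink's API): a schema is an ordered list of field names.
object JoinSchemaModel {
  type Schema = Vector[String]

  // The joined schema is the left schema followed by the right schema.
  def joinSchema(left: Schema, right: Schema): Schema = left ++ right

  // Names that occur more than once cannot be referenced unambiguously.
  def ambiguousFields(schema: Schema): Set[String] =
    schema.groupBy(identity).collect { case (name, occ) if occ.size > 1 => name }.toSet

  def main(args: Array[String]): Unit = {
    val joined = joinSchema(Vector("a", "b", "c"), Vector("d", "e", "f"))
    println(joined.mkString("[", ", ", "]")) // [a, b, c, d, e, f]

    val clash = joinSchema(Vector("id", "valLeft"), Vector("id", "valRight"))
    println(ambiguousFields(clash)) // Set(id)
  }
}
```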

So basically

val left: Table = ???   // [id, valLeft]
val right: Table = ???  // [id, valRight]
// Not accepted: both inputs have a field named 'id, so 'id is ambiguous after the join
val result: Table = left.join(right).where('id === 'id)

is equivalent to a SQL query

SELECT l.id AS id, l.valLeft, r.id AS id, r.valRight
FROM left l, right r
WHERE l.id = r.id

This SQL query would also not be accepted because there are two fields called id in the result.

You can resolve the issue by either

  • renaming only one of the input fields and referring to the other by its original name, or
  • renaming both input fields and then renaming one of them back after the join with as
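Both workarounds can be sketched with the same kind of toy model in plain Scala. This is an illustration under assumptions, not Flink code: a schema is a list of field names, the hypothetical select fails when a requested name is missing or occurs more than once, and the hypothetical rename stands in for Flink's as. The names lId and rId are made up for the example.

```scala
// Toy model only (not Flink's API); lId and rId are hypothetical names.
object RenameWorkarounds {
  type Schema = Vector[String]

  // Joined schema = left fields followed by right fields.
  def join(left: Schema, right: Schema): Schema = left ++ right

  // Resolve fields by name; fails if a name is missing or appears more than once.
  def select(schema: Schema, names: String*): Either[String, Schema] =
    names.find(n => schema.count(_ == n) != 1) match {
      case Some(bad) => Left(s"cannot resolve field '$bad' unambiguously")
      case None      => Right(names.toVector)
    }

  // Rename a field, standing in for Flink's as.
  def rename(schema: Schema, from: String, to: String): Schema =
    schema.map(n => if (n == from) to else n)

  def main(args: Array[String]): Unit = {
    val left: Schema = Vector("id", "valLeft")
    val right: Schema = Vector("id", "valRight")

    // Without renaming, "id" occurs twice in the joined schema.
    println(select(join(left, right), "id", "valLeft", "valRight"))
    // Left(cannot resolve field 'id' unambiguously)

    // Option 1: rename only the right key; the left key keeps the name "id".
    val opt1 = join(left, rename(right, "id", "rId"))
    println(select(opt1, "id", "valLeft", "valRight"))
    // Right(Vector(id, valLeft, valRight))

    // Option 2: rename both keys, join, then rename one of them back to "id".
    val opt2 = rename(join(rename(left, "id", "lId"), rename(right, "id", "rId")), "lId", "id")
    println(select(opt2, "id", "valLeft", "valRight"))
    // Right(Vector(id, valLeft, valRight))
  }
}
```

Either way the join predicate compares two distinct names (id with rId, or lId with rId), and exactly one field is called id in the result.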