
I was surprised to find that there are no outer joins for DataStream in Flink (DataStream docs).

For DataSet you have all the options: leftOuterJoin, rightOuterJoin and fullOuterJoin, in addition to the regular join (DataSet docs). But for DataStream you only have the plain old join.

Is this due to some fundamental properties of the DataStream that make it impossible to have outer joins? Or maybe we can expect this in the (close?) future?

I could really use an outer join on DataStream for the problem I'm working on... Is there any way to achieve a similar behaviour?


2 Answers


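You can implement outer joins with the DataStream.coGroup() transformation. Like join, it operates on windows (coGroup(...).where(...).equalTo(...).window(...).apply(...)). A CoGroupFunction receives two Iterables, one per input, holding all elements of a given key within the window; either of them may be empty if there is no matching element on that side. By also emitting the elements whose counterpart is missing (for example, padded with null), you get left, right or full outer join semantics.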
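A minimal sketch of the per-key logic in plain Java, with no Flink dependency so it can run on its own. It assumes String elements and a hypothetical helper `fullOuterCoGroup`; a `Consumer<String>` stands in for Flink's `Collector`. In a real job, this body would go into `CoGroupFunction#coGroup(Iterable<IN1>, Iterable<IN2>, Collector<OUT>)`, called once per key and window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FullOuterCoGroupSketch {

    // Same shape as CoGroupFunction#coGroup: all elements of ONE key from each input,
    // where either side may be empty. A Consumer replaces Flink's Collector here.
    static void fullOuterCoGroup(Iterable<String> left, Iterable<String> right, Consumer<String> out) {
        // Buffer both sides so each can be scanned more than once.
        List<String> ls = new ArrayList<>();
        left.forEach(ls::add);
        List<String> rs = new ArrayList<>();
        right.forEach(rs::add);

        if (rs.isEmpty()) {
            // Key only present on the left: pad the right side with null.
            for (String l : ls) out.accept(l + ",null");
        } else if (ls.isEmpty()) {
            // Key only present on the right: pad the left side with null.
            for (String r : rs) out.accept("null," + r);
        } else {
            // Key present on both sides: emit every pair, as an inner join would.
            for (String l : ls) {
                for (String r : rs) out.accept(l + "," + r);
            }
        }
    }

    public static void main(String[] args) {
        List<String> result = new ArrayList<>();
        fullOuterCoGroup(List.of("a1", "a2"), List.of("b1"), result::add);
        fullOuterCoGroup(List.of("c1"), List.of(), result::add);
        fullOuterCoGroup(List.of(), List.of("d1"), result::add);
        System.out.println(result); // prints [a1,b1, a2,b1, c1,null, null,d1]
    }
}
```

Dropping the `ls.isEmpty()` branch turns this into a left outer join (and symmetrically for a right outer join). Note that the outer semantics only hold per window: an element whose match arrives in a later window is emitted unmatched.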

First-class support for outer joins might be added to the DataStream API in a future Flink release, but I am not aware of any ongoing work on it at the moment. Opening an issue in the Apache Flink JIRA could help.


One way is to convert stream -> table -> stream and use the outer joins of the Table API (see FLINK TABLE API - OUTER JOIN).

Here is a Java example:

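    // env is a StreamExecutionEnvironment, tableEnv a StreamTableEnvironment created from it.
    // This uses the older Flink 1.x Table API (registerDataStream and string expressions
    // have since been deprecated).
    DataStream<String> data = env.readTextFile( ... );
    DataStream<String> data2Merge = env.readTextFile( ... );

    ...
    // Registering two fields per stream requires a two-field record type (e.g. Tuple2);
    // a plain DataStream<String> only has a single field.

    tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
    tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");

    String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
    String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";

    Table tableLeft = tableEnv.sqlQuery(queryLeft);
    Table tableRight = tableEnv.sqlQuery(queryRight);

    // Full outer join on the key columns
    Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");

    // An outer join result can change as new rows arrive, so it has to be converted to a
    // retract stream: f0 == true adds a row, f0 == false retracts a previously emitted row.
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);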
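To illustrate how a downstream consumer could interpret the retract stream, here is a plain-Java sketch (no Flink dependency) that applies `(flag, row)` messages to a materialized view. `Map.Entry<Boolean, String>` stands in for Flink's `Tuple2<Boolean, Row>`, and the message sequence is hypothetical: a left row arrives first and is emitted padded with null, then a matching right row arrives, so the padded row is retracted and replaced by the joined row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RetractStreamSketch {

    // true = add the row to the view, false = retract (remove) one earlier copy of it.
    static List<String> applyRetractions(List<Map.Entry<Boolean, String>> messages) {
        List<String> view = new ArrayList<>();
        for (Map.Entry<Boolean, String> msg : messages) {
            if (msg.getKey()) {
                view.add(msg.getValue());
            } else {
                view.remove(msg.getValue()); // List.remove(Object): removes the first equal element
            }
        }
        return view;
    }

    public static void main(String[] args) {
        // Hypothetical messages for key "k" with columns (left_column1, left_column2, right_column2)
        List<Map.Entry<Boolean, String>> messages = List.of(
                Map.entry(true, "k,v1,null"),   // left row arrives, no match yet
                Map.entry(false, "k,v1,null"),  // match arrives: padded row is retracted
                Map.entry(true, "k,v1,w1"));    // ... and replaced by the joined row
        System.out.println(applyRetractions(messages)); // prints [k,v1,w1]
    }
}
```

With the `select` used above, a row that exists only on the right side would come out as `null,null,<right_column2>`, because `left_column1` is null for it; selecting `right_column1` as well keeps the key in that case.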