I have a DataSet<Tuple3<String,String,Double>> called values with the following data:

<Vijaya,Chocolate,5>
<Vijaya,Chips,10>
<Rahul,Chocolate,2>
<Rahul,Chips,8>

I want to produce a DataSet<Tuple5<String,String,Double,String,Double>> values1 that looks like this:

<Vijaya,Chocolate,5,Chips,10>
<Rahul,Chocolate,2,Chips,8>

My code looks like this:

DataSet<Tuple5<String, String, Double, String, Double>> values1 = values.fullOuterJoin(values)
    .where(0)
    .equalTo(0)
    .with(
        new JoinFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>, Tuple5<String, String, Double, String, Double>>() {
            private static final long serialVersionUID = 1L;

            public Tuple5<String, String, Double, String, Double> join(Tuple3<String, String, Double> first, Tuple3<String, String, Double> second) {
                return new Tuple5<String, String, Double, String, Double>(first.f0, first.f1, first.f2, second.f1, second.f2);
            }
        })
    .distinct(1, 3)
    .distinct(1);

In the above code I tried doing a self join. I want the output in that particular format, but I am unable to get it. How can I do this? Please help.

Any source code? What have you tried that doesn't work? - Alex

1 Answer

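Since you don't want the same item repeated within an output record, you can use a flat-join. A FlatJoinFunction may emit zero, one, or more records for each joined pair, so it can output only those pairs whose value in the 2nd position differs from the value in the 4th position. Also, if you want only "Chocolate" in the 2nd position, that can be checked inside the FlatJoinFunction as well; without that check, each name yields two mirrored rows (for example, both <Vijaya,Chocolate,5,Chips,10> and <Vijaya,Chips,10,Chocolate,5>). Please find below the link to Flink's documentation on flat-join.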

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/batch/dataset_transformations.html#join-with-flat-join-function
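To make the filtering rule concrete, here is a plain-Java sketch with no Flink dependency, so it runs as-is. It emulates values.join(values).where(0).equalTo(0) on the four sample records and applies the condition a FlatJoinFunction would check before calling out.collect(...). The class FlatJoinSketch, its Row3 stand-in for Tuple3<String,String,Double>, and the helper names are hypothetical and used only for illustration; in a real Flink job the condition in keep(...) would go inside FlatJoinFunction.join(first, second, out).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency) of the flat-join filter.
// Hypothetical names: FlatJoinSketch, Row3, keep(), selfJoin(), sampleData().
class FlatJoinSketch {

    // Minimal stand-in for Flink's Tuple3<String, String, Double>.
    static final class Row3 {
        final String f0;   // name
        final String f1;   // item
        final Double f2;   // value

        Row3(String f0, String f1, Double f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // The condition a FlatJoinFunction would test before emitting a record:
    // the left item must be "Chocolate" and the right item must be a different item.
    static boolean keep(Row3 first, Row3 second) {
        return "Chocolate".equals(first.f1) && !first.f1.equals(second.f1);
    }

    // Emulates an inner self-join on field 0 (name) followed by the flat-join filter.
    static List<String> selfJoin(List<Row3> values) {
        List<String> out = new ArrayList<>();
        for (Row3 first : values) {
            for (Row3 second : values) {
                if (first.f0.equals(second.f0) && keep(first, second)) {
                    out.add("<" + first.f0 + "," + first.f1 + "," + first.f2 + ","
                            + second.f1 + "," + second.f2 + ">");
                }
            }
        }
        return out;
    }

    // The four records from the question.
    static List<Row3> sampleData() {
        List<Row3> values = new ArrayList<>();
        values.add(new Row3("Vijaya", "Chocolate", 5.0));
        values.add(new Row3("Vijaya", "Chips", 10.0));
        values.add(new Row3("Rahul", "Chocolate", 2.0));
        values.add(new Row3("Rahul", "Chips", 8.0));
        return values;
    }

    public static void main(String[] args) {
        for (String row : selfJoin(sampleData())) {
            System.out.println(row);
        }
    }
}
```

Running main prints <Vijaya,Chocolate,5.0,Chips,10.0> and <Rahul,Chocolate,2.0,Chips,8.0>. If the "Chocolate" check is dropped and only the item-inequality check is kept, each name also produces its mirrored row, such as <Vijaya,Chips,10.0,Chocolate,5.0>.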

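Alternatively, here is an approach using a GroupReduceFunction. It collects all items for a name into a single comma-separated string rather than a Tuple5: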

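    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.util.Collector;

    DataSet<Tuple2<String, String>> grouped = values
        .groupBy(0)  // one group per name (field 0)
        .reduceGroup(new GroupReduceFunction<Tuple3<String, String, Double>, Tuple2<String, String>>() {

            @Override
            public void reduce(Iterable<Tuple3<String, String, Double>> in, Collector<Tuple2<String, String>> out) {
                StringBuilder output = new StringBuilder();
                String name = null;

                for (Tuple3<String, String, Double> item : in) {
                    name = item.f0;  // the same for every record in this group
                    // Separate entries with commas, without leaving a trailing comma.
                    if (output.length() > 0) {
                        output.append(",");
                    }
                    output.append(item.f1).append(",").append(item.f2);
                }

                // For example, name "Vijaya" with "Chocolate,5.0,Chips,10.0".
                // The order of records within a group is not guaranteed unless sortGroup is used.
                out.collect(new Tuple2<String, String>(name, output.toString()));
            }
        });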
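The GroupReduceFunction above emits a (name, string) pair rather than the Tuple5 shape asked for. If each name is known to have exactly two records, one of which is "Chocolate" (an assumption drawn only from the sample data), each group can be turned into a five-field row directly. The plain-Java sketch below has no Flink dependency: it emulates groupBy(0) with a LinkedHashMap and calls reduce once per name. GroupReduceSketch, Row3, rows(), reduce(), and the policy of skipping groups with any other shape are hypothetical. In Flink, the body of reduce would call out.collect(new Tuple5<>(...)) instead of returning a string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency) of a group-reduce emitting the five-field shape.
// Hypothetical names: GroupReduceSketch, Row3, reduce(), rows(), sampleData().
class GroupReduceSketch {

    // Minimal stand-in for Flink's Tuple3<String, String, Double>.
    static final class Row3 {
        final String f0;   // name
        final String f1;   // item
        final Double f2;   // value

        Row3(String f0, String f1, Double f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // Emulates one reduceGroup call. Assumes the group holds exactly one "Chocolate"
    // record plus one other record; returns null (emits nothing) for any other shape.
    static String reduce(List<Row3> group) {
        if (group.size() != 2) {
            return null;
        }
        Row3 chocolate = null;
        Row3 other = null;
        for (Row3 r : group) {
            if ("Chocolate".equals(r.f1)) {
                chocolate = r;
            } else {
                other = r;
            }
        }
        if (chocolate == null || other == null) {
            return null;
        }
        return "<" + chocolate.f0 + "," + chocolate.f1 + "," + chocolate.f2 + ","
                + other.f1 + "," + other.f2 + ">";
    }

    // Emulates values.groupBy(0).reduceGroup(...): group by name in first-seen order,
    // then reduce each group independently.
    static List<String> rows(List<Row3> values) {
        Map<String, List<Row3>> groups = new LinkedHashMap<>();
        for (Row3 r : values) {
            groups.computeIfAbsent(r.f0, k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (List<Row3> group : groups.values()) {
            String row = reduce(group);
            if (row != null) {
                out.add(row);
            }
        }
        return out;
    }

    // The four records from the question (a fresh, mutable list on every call).
    static List<Row3> sampleData() {
        List<Row3> values = new ArrayList<>();
        values.add(new Row3("Vijaya", "Chocolate", 5.0));
        values.add(new Row3("Vijaya", "Chips", 10.0));
        values.add(new Row3("Rahul", "Chocolate", 2.0));
        values.add(new Row3("Rahul", "Chips", 8.0));
        return values;
    }

    public static void main(String[] args) {
        for (String row : rows(sampleData())) {
            System.out.println(row);
        }
    }
}
```

The sketch picks the "Chocolate" record by value instead of by position because Flink does not guarantee the order of records within a group unless sortGroup is applied; reversing the input therefore changes only the order of the output rows, not their contents.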