0
votes

I am processing a Kafka stream using Flink SQL where every message is pulled from Kafka, processed using flink sql and pushed back into kafka. I wanted a nested output where input is flat and output is nested. Say for example my input is

{'StudentName':'ABC','StudentAge':33}

and want output as

{'Student':{'Name':'ABC','Age':33}}

I tried searching here and few similar links but could not find so. Is it possible to do so using Apache Flink SQL API? Can use User Defined Functions if necessary but would want to avoid so.

2

2 Answers

0
votes

You could try something like this:

SELECT 
  MAP ['Student', MAP ['Name', StudentName, 'Age', StudentAge]] 
FROM 
  students;

I found the MAP function here, but I had to experiment in the SQL Client to figure out the syntax.

0
votes

I was able to achieve the same by returning a map from Flink UDF. The eval() function in UDF will return a Map while the FlinkSQL query will call the UDF with student as an Alias:

The UDF should looks like this

    public class getStudent extends ScalarFunction {
        public Map<String, String> eval(String name, Integer age) {
            Map<String, String> student = new HashMap<>();
            student.put("Name", name);
            student.put("Age", age.toString());
            return student;
        }
    }

and the FlinkSQL query stays like this:

    Select getStudent(StudentName, StudentAge) as `Student` from MyKafkaTopic

The same can be done for Lists as well when trying get List out of FlinkSQL