I'm using Scala 2.12 with Spark 3.0.0. I need to build a single JSON string in which each attribute comes from a different table, and save the resulting JSON in another dataframe. For example (using only one row per table to keep it simple):

tableA

id   action   date
u1   insert   20210428

tableB

id   name        date
u1   some name   20210428

I need to return the following:

{
    "A": [
        {
            "id": "u1",
            "action": "insert",
            "date": "20210428"
        }
    ],
    "B": [
        {
            "id": "u1",
            "name": "some name",
            "date": "20210428"
        }
    ]
}

I've tried many things, but the closest I've gotten is doing the following for each table:

val tableADF = spark.read.format("delta").load(path + "/tableA")
val tableBDF = spark.read.format("delta").load(path + "/tableB")

Create a dataframe for each table with all values converted to JSON:

val tableAJsonDF = tableADF.groupBy("date").agg(collect_list(struct($"id",$"action")).alias("attributesA"))
val tableBJsonDF = tableBDF.groupBy("date").agg(collect_list(struct($"id",$"name")).alias("attributesB"))

which gives:

date       attributesA
20210428   [{"id":"u1", "action": "insert"}]

date       attributesB
20210428   [{"id":"u1", "name": "some name"}]

Now combine the JSON from both tables into one JSON string to be added to a new dataframe:

val schema = new StructType().add("request", StringType)
val requestDF = spark.createDataFrame(sc.emptyRDD[Row], schema)

val resultDF = requestDF.withColumn("request", concat(to_json(tableAJsonDF("attributesA")), 
                                                      to_json(tableBJsonDF("attributesB"))))

But I get the following error. I read that this type of error happens when you try to combine two dataframes, but I can't find a way to create a single JSON string, as shown in the desired result, by combining both attributes into one new column. Any ideas?

org.apache.spark.sql.AnalysisException: Resolved attribute(s) attributesA#4726,attributesB#4783 missing from request#24790 in operator !Project [concat(to_json(attributesA#4726, Some(EST)), to_json(attributesB#4783, Some(EST))) AS request#24792].;;

1 Answer

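The error occurs because withColumn can only reference columns of the dataframe it is called on, and requestDF contains neither attributesA nor attributesB. You need to join the dataframes first so that both columns live in the same dataframe, e.g.: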

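import org.apache.spark.sql.functions.{array, col, struct, to_json}

// Wrap each row of tableA in a one-element array of structs; keep id as the join key
val t1 = tableADF.select(
    col("id"),
    array(struct(tableADF.columns.map(col):_*)).as("A")
)

// Same for tableB
val t2 = tableBDF.select(
    col("id"),
    array(struct(tableBDF.columns.map(col):_*)).as("B")
)

// Join on id so A and B are columns of one dataframe, then serialize both into one JSON string
val result = t1.join(t2, Seq("id")).select(to_json(struct("A", "B")).as("result"))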

result.show(false)
+--------------------------------------------------------------------------------------------------------------+
|result                                                                                                        |
+--------------------------------------------------------------------------------------------------------------+
|{"A":[{"id":"u1","action":"insert","date":"20210428"}],"B":[{"id":"u1","name":"some name","date":"20210428"}]}|
+--------------------------------------------------------------------------------------------------------------+
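
If the tables hold more than one row per key, the array(struct(...)) approach above yields one JSON string per matching pair of rows rather than one combined document. Below is a minimal sketch of one way to handle that, assuming you want one JSON string per date as in the question's groupBy: collect each table's rows with collect_list, join the two aggregates on date, then call to_json. The names CombineJson, collectByDate and combine, and the in-memory sample rows standing in for the Delta tables, are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}

object CombineJson {
  // Collapse a table to one row per date; all of its rows become an array of structs.
  // Assumes the table has a "date" column (as both tables in the question do).
  def collectByDate(df: DataFrame, name: String): DataFrame =
    df.groupBy("date")
      .agg(collect_list(struct(df.columns.map(col): _*)).as(name))

  // Join the two aggregates so both arrays are columns of the same dataframe,
  // then serialize them into a single JSON string column called "request".
  def combine(tableADF: DataFrame, tableBDF: DataFrame): DataFrame =
    collectByDate(tableADF, "A")
      .join(collectByDate(tableBDF, "B"), Seq("date"))
      .select(to_json(struct(col("A"), col("B"))).as("request"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("combine-json")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data in place of the Delta tables.
    val tableADF = Seq(
      ("u1", "insert", "20210428"),
      ("u2", "update", "20210428")
    ).toDF("id", "action", "date")
    val tableBDF = Seq(("u1", "some name", "20210428")).toDF("id", "name", "date")

    combine(tableADF, tableBDF).show(false)
    spark.stop()
  }
}
```

Two caveats: the inner join drops any date that is missing from either table (passing "full_outer" as the join type would keep it), and collect_list does not guarantee the order of elements inside each array.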