I'm using Scala 2.12 with Spark 3.0.0. I need to build a single JSON string whose attributes come from different tables, and save that resulting JSON in another dataframe. For example (using only one row per table to keep it simple):
tableA

| id | action | date |
|---|---|---|
| u1 | insert | 20210428 |

tableB

| id | name | date |
|---|---|---|
| u1 | some name | 20210428 |
I need to return the following:
{
"A": [
{
"id":"u1",
"action": "insert",
"date": "20210428"
}
],
"B": [
{
"id":"u1",
"name": "some name"
"date": "20210428",
}
]
}
I've tried many things, but the closest I've gotten is the following. First I load each table:
val tableADF = spark.read.format("delta").load(path + "/tableA")
val tableBDF = spark.read.format("delta").load(path + "/tableB")
Then I create a dataframe for each table with the values collected into a list of structs per date:
val tableAJsonDF = tableADF.groupBy("date").agg(collect_list(struct($"id",$"action")).alias("attributesA"))
val tableBJsonDF = tableBDF.groupBy("date").agg(collect_list(struct($"id",$"name")).alias("attributesB"))
| date | attributesA |
|---|---|
| 20210428 | [{"id":"u1", "action": "insert"}] |

| date | attributesB |
|---|---|
| 20210428 | [{"id":"u1", "name": "some name"}] |
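As a sanity check on the grouping step, this is a plain-Scala sketch (no Spark; the object and field names here are hypothetical, only the sample data mirrors tableA above) of what `groupBy("date")` plus `collect_list(struct(...))` is producing for me:

```scala
// Plain-Scala sketch of the aggregation step: group rows by date and
// collapse each group into a list of {id, action} structs, analogous to
// groupBy("date").agg(collect_list(struct($"id", $"action"))).
object GroupSketch {
  case class RowA(id: String, action: String, date: String)

  // Sample data mirroring tableA in the question.
  val tableA: Seq[RowA] = Seq(RowA("u1", "insert", "20210428"))

  // date -> list of structs for that date (the "attributesA" column).
  val attributesA: Map[String, Seq[Map[String, String]]] =
    tableA.groupBy(_.date).map { case (d, rows) =>
      d -> rows.map(r => Map("id" -> r.id, "action" -> r.action))
    }
}
```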
Now I try to combine the JSON from both tables into one JSON string to be added to a new dataframe:
val schema = new StructType().add("request", StringType)
val requestDF = spark.createDataFrame(sc.emptyRDD[Row], schema)
val resultDF = requestDF.withColumn("request", concat(to_json(tableAJsonDF("attributesA")),
to_json(tableBJsonDF("attributesB"))))
but I get the error below. I read that this type of error happens when you try to combine columns from two different dataframes, but I can't find a way to create one single JSON, as shown in the desired result, by combining both attributes into one new column. Any ideas?
org.apache.spark.sql.AnalysisException: Resolved attribute(s) attributesA#4726,attributesB#4783 missing from request#24790 in operator !Project [concat(to_json(attributesA#4726, Some(EST)), to_json(attributesB#4783, Some(EST))) AS request#24792].;;
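To make the target unambiguous, here is a plain-Scala sketch (no Spark; all names are hypothetical) of the per-date combination I'm trying to express: join the two grouped tables on date and emit one JSON document with both tables' attributes:

```scala
// Plain-Scala sketch of the desired combination: for a given date, look up
// the attribute lists from both tables and render them as one JSON string
// of the shape {"A":[...],"B":[...]}.
object CombineSketch {
  // One date per table, mirroring the aggregated dataframes above.
  val attributesA = Map("20210428" -> Seq(Map("id" -> "u1", "action" -> "insert")))
  val attributesB = Map("20210428" -> Seq(Map("id" -> "u1", "name" -> "some name")))

  // Render a list of structs as a JSON array, re-attaching the date field.
  private def toJsonArray(rows: Seq[Map[String, String]], date: String): String =
    rows
      .map { r =>
        (r + ("date" -> date))
          .map { case (k, v) => s""""$k":"$v"""" }
          .mkString("{", ",", "}")
      }
      .mkString("[", ",", "]")

  // Build the single JSON document for one date.
  def buildRequest(date: String): String =
    s"""{"A":${toJsonArray(attributesA(date), date)},"B":${toJsonArray(attributesB(date), date)}}"""
}
```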