
I've been struggling with this for a while and can't wrap my head around it. So far I've only found examples that explode() a MapType column into n Row entries.

What I'm trying to achieve is to have, for example, a Map with 5 entries become 5 columns within the same Row.

Taking this DF as an example...

import spark.implicits._ // needed for .toDF() on a local collection

case class SampleRow(one: String, two: String, three: String, four: String, five: Map[String, String])

val df = List(
  SampleRow(
    "one",
    "two",
    "three",
    "four",
    Map("sample_one" -> "hey", "sample_two" -> "hey"))
).toDF()

The DF, after exploding Column five, should look like the following.

    Columns ->  one | two | three | four | sample_one | sample_two
    Values  -> "one"|"two"|"three"|"four"|   "hey"    |    "hey"

What I tried until now is the following.

// 'cols' is the Seq of the original top-level columns; the Map column
// was exploded into 'key'/'value' pairs in a previous step
val explodedDS = originDS
  .select(cols :+ $"key".as("outerMap") :+ $"value.*": _*)

But doing that prompts the following error into the console.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: `ArrayBuffer(value)`;

I understand that exploding a Map into Columns creates the problem of not being able to infer a schema until every Row contains exactly the same set of Columns, either null or with a value, right?

But apart from that, would there be any option to accomplish this despite the schema problem?
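
For what it's worth, if the keys were known up front I could just select them one by one with getItem (a sketch with the keys hardcoded, which is exactly the rigidity I'd like to avoid):

// Hypothetical: only works because the map keys are hardcoded
df.select(
  $"one", $"two", $"three", $"four",
  $"five".getItem("sample_one").as("sample_one"),
  $"five".getItem("sample_two").as("sample_two")
).show()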

Does the Map in each record have the same structure? The same number and names of keys? - Travis Hegner
Yes it does @TravisHegner. I'd prefer to avoid a StructType with all its inner fields specified, to keep things flexible, but the records will be consistent with each other. - czr_RR
Can you please reformat your output into a table structure? It helps to better understand what is expected. - C.S.Reddy Gadipally
@C.S.ReddyGadipally just edited the question. - czr_RR

1 Answer


This may not be the fastest approach, but it seems to work:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions._
import spark.implicits._

case class SampleRow(one: String, two: String, three: String, four: String, five: Map[String, String])
implicit def enc: Encoder[SampleRow] = Encoders.product[SampleRow]

val df = spark.createDataset(List(
  SampleRow(
    "one",
    "two",
    "three",
    "four",
    Map("sample_one" -> "hey", "sample_two" -> "hey"))
))

df.select($"*", explode($"five"))          // explode the map into one row per key/value pair
  .groupBy("one", "two", "three", "four")  // collapse back to one row per original record
  .pivot("key")                            // one column per distinct map key
  .agg(first($"value"))
  .show()

This results in the desired output of:

+---+---+-----+----+----------+----------+
|one|two|three|four|sample_one|sample_two|
+---+---+-----+----+----------+----------+
|one|two|three|four|       hey|       hey|
+---+---+-----+----+----------+----------+

This may not be perfectly generalized for your actual use case, but it should be close enough to make it workable.
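
If the groupBy/pivot shuffle ever becomes a concern, a possible alternative (just a sketch, assuming Spark 2.3+ for map_keys and a reasonably small set of distinct keys) is to collect the keys up front and build the columns explicitly with getItem:

import org.apache.spark.sql.functions._

// Collect the distinct map keys with one extra job
val keys = df.select(explode(map_keys($"five")))
  .distinct()
  .as[String]
  .collect()
  .sorted

// One column per key; rows whose map is missing a key get null
val keyCols = keys.map(k => $"five".getItem(k).as(k))
df.select($"one" +: $"two" +: $"three" +: $"four" +: keyCols.toSeq: _*).show()

This trades the pivot's shuffle for one small driver-side collect of the key set; the four fixed column names are taken from your example, so adjust that part to your real schema.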