
For the last few hours, I have been trying to convert a JSON file to a Scala case class with Apache Spark.

The JSON has the following structure:

{
  "12": {
    "wordA": 1,
    "wordB": 2,
    "wordC": 3
  },
  "13": {
    "wordX": 10,
    "wordY": 12,
    "wordZ": 15
  }
}

First try: set up the schema manually

I tried to build my schema by hand:

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("", MapType(StringType, new StructType()
    .add("", StringType)
    .add("", IntegerType)))
val df = session.read
  .option("multiline", true)
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .json(filePath)
df.show()

But this is obviously not right, since I have to give the field names.

Second try: map to a case class

I have also tried to create case classes, which is a bit more elegant:

case class KeywordData (keywordsByCode: Map[String, WordAndWeight])

case class WordAndWeight (word: String, weight: Int)
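The read itself isn't shown above; presumably it derived the schema from the case classes via reflection, along these lines (a sketch, with the exact call being an assumption):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import session.implicits._

// Assumption: schema reflected from the case class, then mapped onto it
val df = session.read
  .option("multiline", true)
  .option("mode", "PERMISSIVE")
  .schema(ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType])
  .json(filePath)
  .as[KeywordData]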

Problem:

In either case, df.show() displays:

+----+
|    |
+----+
|null|
+----+

The JSON structure is not easy to manipulate, since my columns don't have a fixed name. Any ideas?

Expected result

A map with 12 and 13 as keys, and List[wordA, ..., wordC] and List[wordX, ..., wordZ] respectively as values.
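In Scala terms, the target shape would be something like:

val expected: Map[String, List[String]] = Map(
  "12" -> List("wordA", "wordB", "wordC"),
  "13" -> List("wordX", "wordY", "wordZ")
)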

Edit: Map of Map

With the case class:

case class WordAndWeight(code: Map[String, Map[String, Integer]])

It gives me the following output:

+-------+----------+
|     12|        13|
+-------+----------+
|[1,2,3]|[10,12,15]|
+-------+----------+


and the following error:

org.apache.spark.sql.AnalysisException: cannot resolve '`code`' given input columns: [12, 13];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
Your JSON looks like a Map of Maps, but your case classes only have one Map. "wordA", "wordB", "wordC" are all keys in a Map, I think. – Patrick McGloin
What is the expected output? – Ramesh Maharjan
Your first attempt without a schema should do the trick. Did you try that? You should be getting structs instead of maps; maps and structs behave the same and can be interchanged. – Ramesh Maharjan
@RameshMaharjan: but how do you map to a case class? – KeyMaker00
Why would you want to map to a case class? Can you clarify that? And please include the expected output too. – Ramesh Maharjan

1 Answer


You are trying to define a schema that has MapType as the root type. In other words, you want each row to be a map. AFAIK, Spark doesn't support MapType as a root type; it supports only StructType.

When you define the type via case classes and reflection, like this:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val schema = ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType]

You get StructType as a root type:

root
 |-- keywordsByCode: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- word: string (nullable = true)
 |    |    |-- weight: integer (nullable = true)

This means that Spark will create a DataFrame with one column called keywordsByCode, and it will expect JSON like this:

{"keywordsByCode":{"12":{"wordA":1,"wordB":2,"wordC":3},"13":{"wordX":10,"wordY":12,"wordZ":15}}}

You need to modify your JSON, or read the file as text and then parse it to JSON yourself.
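For the second option, here is a minimal sketch of the read-as-text route (assuming the whole file fits in memory on the driver, and that the wrapping key keywordsByCode matches the case class field):

import spark.implicits._

// Read the original multi-line file as one string, wrap it under the
// field name the schema expects, and parse the result with the JSON reader.
val raw = spark.sparkContext.wholeTextFiles("test.json").values.first()
val wrapped = s"""{"keywordsByCode": $raw}"""
val df = spark.read.json(Seq(wrapped).toDS)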

UPDATE

I hadn't noticed one more mistake: your case class should look like this:

case class KeywordData (keywordsByCode: Map[String, Map[String, Int]])

Your JSON has a nested MapType, so the schema will look like:

root
 |-- keywordsByCode: map (nullable = true)
 |    |-- key: string
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: string
 |    |    |-- value: integer (valueContainsNull = true)

My testing code:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val df = spark.read
  .option("multiline", true)
  .option("mode", "PERMISSIVE")
  .schema(ScalaReflection.schemaFor[KeywordData].dataType.asInstanceOf[StructType])
  .json("test.json")
df.printSchema()
df.explain(true)
df.show(10)
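From there, getting the expected map of code to word list could look like this (a sketch; collecting to the driver assumes the data is small):

import spark.implicits._

// Collect the parsed rows and keep only the words (the inner map keys),
// yielding Map("12" -> List("wordA", ...), "13" -> List("wordX", ...))
val byCode: Map[String, List[String]] =
  df.as[KeywordData]
    .collect()
    .flatMap(_.keywordsByCode)
    .map { case (code, wordWeights) => code -> wordWeights.keys.toList }
    .toMap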