6
votes

I'm writing an ETL Spark (2.4) job in Scala reading ;-separated CSV files with a glob pattern on S3. The data is loaded in a DataFrame and contains a column (let's say it is named custom) with a JSON-formatted string (multiple levels of nesting). The goal is to automatically infer the schema from that column so that it can be structured for a write sink on Parquet files back in S3.

This post (How to query JSON data column using Spark DataFrames?) suggests schema_of_json from Spark 2.4 can infer the schema from a JSON-formatted column or string.

Here is what I tried:

val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

df.withColumn(
    "nestedCustom",
    from_json(col("custom"), jsonSchema, Map[String, String]())
)

But the above doesn't work and raise this exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]

Keep in mind I'm filtering out null values on custom for this DataFrame.


EDIT: whole code below.

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
  * RandomName entry point.
  *
  * @author Random author
  */
object RandomName {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("RandomName")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
      .getOrCreate

    import spark.implicits._

    val randomName: RandomName = new RandomName(spark)

    val df: sql.DataFrame  = randomName.read().filter($"custom".isNotNull)
    val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

    df.withColumn(
      "nestedCustom",
      from_json(col("custom"), jsonSchema, Map[String, String]())
    )

    df.show

    spark.stop
  }
}

class RandomName(private val spark: SparkSession) {

  /**
    * Reads CSV files from S3 and creates a sql.DataFrame.
    *
    * @return a sql.DataFrame
    */
  def read(): sql.DataFrame = {
    val tableSchema = StructType(
      Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true),
        StructField("c", DateType, true),
        StructField("custom", StringType, true)
      ))

    spark.read
      .format("csv")
      .option("sep", ";")
      .option("header", "true")
      .option("inferSchema", "true")
      .schema(tableSchema)
      .load("s3://random-bucket/*")
  }
}

And an example of a JSON:

{
  "lvl1":  {
    "lvl2a": {
      "lvl3a":   {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    },
    "lvl2b":   {
      "lvl3a":   {
        "lvl4a": "ramdom_data"
      },
      "lvl3b":  {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    }
  }
}
1
Can you add your whole code and also a snippet of the json? - uh_big_mike_boi
Jut did that @big_mike_boiii, tell me if you need more information. - ngc2359

1 Answers

0
votes

That's an indicator that custom is not a valid input for schema_of_json

scala> spark.sql("SELECT schema_of_json(struct(1, 2))")
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(named_struct('col1', 1, 'col2', 2))' due to data type mismatch: argument 1 requires string type, however, 'named_struct('col1', 1, 'col2', 2)' is of struct<col1:int,col2:int> type.; line 1 pos 7;
...

You should go back to your data and make sure that custom is indeed a String.