0
votes

I am new to scala and trying to make custom schema from array of elements to read files based on a new custom schema.

I read the arrays from json file and used explode method and created a dataframe for each element in column array.

val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()

Output obtained is:

column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
 |-- column_id: string (nullable = true)
 |-- data_sensitivty: string (nullable = true)
 |-- datatype: string (nullable = true)
 |-- length: string (nullable = true)
 |-- name: string (nullable = true)

val column_name = column_values.select("name","datatype")

column_name: org.apache.spark.sql.DataFrame = [name: string, datatype: string]
column_name.show(4)


+-----------------+--------+
|             name|datatype|
+-----------------+--------+
|    object_number| varchar|
|    function_type| varchar|
|            hof_1| varchar|
|            hof_2| varchar|
|           region| varchar|
|          country| varchar|
+-----------------+--------+

Now for all the values listed above i wanted create a val schema dynamically.

example:

val schema = new StructType()
      .add("object_number",StringType,true)
      .add("function_type",StringType,true)
      .add("hof_1",StringType,true)
      .add("hof_2",StringType,true)
      .add("region",StringType,true)
      .add("Country",StringType,true)

i want to build above struct dynamically once i obtained column dataframe, i read that first i need to create a map of datatype for each element and then create a struct in loop. can some one help here as i have limited knowledge of scala.

2
Can you show otherPeople dataframe printschema ? Also post same data for this otherPeopleSrinivas

2 Answers

2
votes

DataFrame with fields data can be collected, and for each row field is added to "StructType":

val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
  )

def getFieldType(typeName: String): DataType = typeName match {
    case "varchar" => StringType
    // TODO include other types here
    case _ => StringType
  }
0
votes

You can follow this approach, it could work fine for your example:

 //The schema is encoded in a string
  val schemaString = "object_number function_type hof_1 hof_2 region Country"
  //Generate the schema based on the string of schema
  val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)
  //Convert records of the RDD (myRdd) to Rows
  val rowRDD = sc.textFile("dir").map(line => line.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2), attributes(3),attributes(4),attributes(5)))
  //Apply the schema to the RDD
  val perDF = spark.createDataFrame(rowRDD, schema)