2
votes

I am trying to understand the exact difference and which Method can be used in what particular Scenario between Creating Schema Implicitly & Programmatically.

On Databricks site the information is not that much elborative & explanatory.

As we can see that when using Reflection(implicit RDD to DF) way we can create a Case Class by choosing specific columns from a textfile by using the Map function.

And in Programmatic Style - we are loading the Dataset a textfile (similar to reflection)

Creating a SchemaString (String) = "Knowing the file we can specify the columns we need " (Similar to case class in Reflection way)

Importing the ROW API - which will again Map to the Specific Columns & data types used in Schema String (Similar to case classes)

Then we create DataFrame & after this everything is same.. So what is the exact difference in these two approaches.

http://spark.apache.org/docs/1.5.2/sql-programming-guide.html#inferring-the-schema-using-reflection

http://spark.apache.org/docs/1.5.2/sql-programming-guide.html#programmatically-specifying-the-schema

Please Explain...

2

2 Answers

1
votes

The produced schemas are the same, so from that point of view, there's no difference. In both cases, you're supplying a schema for your data, but in one case, you're doing it from a case class, in the other you can use collections, since a schema is built as a StructType(Array[StructField]). So it's basically a choice between tuples and collections. The way I see it, the biggest difference is that cases classes have to be in the code, while programmatically specifying the schema can be done at runtime, so you could, for instance, build a schema based on another DataFrame that you're reading at runtime. As an example, I wrote a generic tool to "nest" data, reading from CSV, and transforming a set of prefixed field into an array of structs. Since the tool is generic, and the schema is known only at runtime, I used the programmatic approach. On the other hand, it's generally easier to code it with reflection, since you don't have to deal with all the StructField objects, since they are derived from the hive metastore their data type has to be mapped to your scala types.

0
votes

Programmatically Specifying the Schema When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

Create an RDD of Rows from the original RDD; Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1. Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.

For example:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

Inferring the Schema Using Reflection The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.

For example:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")