I'm using Spark 2.0 to work with tab-separated value (TSV) and comma-separated value (CSV) files. I want to load the data into Spark SQL DataFrames and control the schema completely when the files are read. I don't want Spark to infer the schema from the data in the file.

How would I load TSV or CSV files into Spark SQL DataFrames and apply a schema to them?

1 Answer

Below is a complete Spark 2.0 example of loading a tab-separated value (TSV) file and applying a schema.

I'm using the Iris data set in TSV format from UAH.edu as an example. Here are the first few rows from that file:

Type    PW      PL      SW      SL
0       2       14      33      50
1       24      56      31      67
1       23      51      31      69
0       2       10      36      46
1       20      52      30      65

To enforce a schema, you can programmatically build it using one of two methods:

A. Create the schema with StructType:

import org.apache.spark.sql.types._

val irisSchema = StructType(Array(
    StructField("Type",         IntegerType, true),
    StructField("PetalWidth",   IntegerType, true),
    StructField("PetalLength",  IntegerType, true),
    StructField("SepalWidth",   IntegerType, true),
    StructField("SepalLength",  IntegerType, true)
    ))

B. Alternatively, create the schema with a case class and Encoders (this approach is less verbose):

import org.apache.spark.sql.Encoders

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int, 
                      SepalWidth: Int, SepalLength: Int)

val irisSchema = Encoders.product[IrisSchema].schema
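
If you want to convince yourself that the two approaches describe the same columns, you can compare the schemas directly. The snippet below is only a sketch for illustration; the names structSchema, encoderSchema and sameColumns are made up for it. As far as I know, the schema derived from the case class marks Int fields as non-nullable, while the StructType version above declares them nullable, so the comparison looks only at names and types.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

// Approach A: schema written out by hand (all fields nullable).
val structSchema = StructType(Array(
    StructField("Type",         IntegerType, true),
    StructField("PetalWidth",   IntegerType, true),
    StructField("PetalLength",  IntegerType, true),
    StructField("SepalWidth",   IntegerType, true),
    StructField("SepalLength",  IntegerType, true)
    ))

// Approach B: schema derived from the case class.
val encoderSchema = Encoders.product[IrisSchema].schema

// Compare field names and data types only, ignoring the nullable flag.
val sameColumns =
    structSchema.map(f => (f.name, f.dataType)) ==
    encoderSchema.map(f => (f.name, f.dataType))

println(sameColumns)             // prints "true"
encoderSchema.printTreeString()  // prints each field with its type and nullability
```

Either schema can be passed to the reader shown further down.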

Once you have created your schema, you can use spark.read to read the TSV file. The same reader handles comma-separated value (CSV) files, or any other delimited files, as long as you set option("delimiter", d) to the right character. If your data file has a header line, be sure to set option("header", "true"). The header line is then skipped, and the columns take their names from your schema rather than from the file.

Below is the complete final code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders

val spark = SparkSession.builder().getOrCreate()

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

val irisSchema = Encoders.product[IrisSchema].schema

val irisDf = spark.read.format("csv").     // Use "csv" regardless of TSV or CSV.
                option("header", "true").  // Does the file have a header line?
                option("delimiter", "\t"). // Set delimiter to tab or comma.
                schema(irisSchema).        // Schema that was built above.
                load("iris.tsv")

irisDf.show(5)

And here is the output:

scala> irisDf.show(5)
+----+----------+-----------+----------+-----------+
|Type|PetalWidth|PetalLength|SepalWidth|SepalLength|
+----+----------+-----------+----------+-----------+
|   0|         2|         14|        33|         50|
|   1|        24|         56|        31|         67|
|   1|        23|         51|        31|         69|
|   0|         2|         10|        36|         46|
|   1|        20|         52|        30|         65|
+----+----------+-----------+----------+-----------+
only showing top 5 rows
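
For a comma-separated file the same pattern applies with a different delimiter. Here is a minimal sketch that writes the first two rows of the sample above to a temporary CSV file so that it runs on its own. The temp file, the name irisCsvDf, the master("local[*]") setting and the "mode" option are my additions for illustration, not part of the example above. As I understand it, FAILFAST makes Spark throw on rows that don't fit the schema instead of quietly filling in nulls.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders

// Assumption: a local run. In spark-shell this returns the existing session.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

val irisSchema = Encoders.product[IrisSchema].schema

// Write the header and the first two sample rows as CSV to a temp file.
val csvPath = Files.createTempFile("iris", ".csv")
Files.write(csvPath,
    "Type,PW,PL,SW,SL\n0,2,14,33,50\n1,24,56,31,67\n".getBytes(StandardCharsets.UTF_8))

val irisCsvDf = spark.read.format("csv").
                option("header", "true").    // Skip the header line.
                option("delimiter", ",").    // Comma instead of tab.
                option("mode", "FAILFAST").  // Fail on rows that don't fit the schema.
                schema(irisSchema).
                load(csvPath.toUri.toString)

irisCsvDf.printSchema()  // Column names and types come from irisSchema.
irisCsvDf.show()
```

Calling printSchema() is a quick way to confirm that the column names and types come from your schema and were not guessed from the data.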