156
votes

I would like to read a CSV in spark and convert it as DataFrame and store it in HDFS with df.registerTempTable("table_name")

I have tried:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

Error which I got:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What is the right command to load CSV file as DataFrame in Apache Spark?

14

14 Answers

212
votes

spark-csv is part of core Spark functionality and doesn't require a separate library. So you could just do for example

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

In scala,(this works for any format-in delimiter mention "," for csv, "\t" for tsv etc)

val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")

178
votes

Parse CSV and load as DataFrame/DataSet with Spark 2.x

First, initialize SparkSession object by default it will available in shells as spark

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local") # Change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate;

Use any one of the following ways to load CSV as DataFrame/DataSet

1. Do it in a programmatic way

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

Update: Adding all options from here in case the link will be broken in future

  • path: location of files. Similar to Spark can accept standard Hadoop globbing expressions.
  • header: when set to true the first line of files will be used to name columns and will not be included in data. All types will be assumed string. The default value is false.
  • delimiter: by default columns are delimited using, but delimiter can be set to any character
  • quote: by default the quote character is ", but can be set to any character. Delimiters inside quotes are ignored
  • escape: by default, the escape character is , but can be set to any character. Escaped quote characters are ignored
  • parserLib: by default, it is "commons" that can be set to "univocity" to use that library for CSV parsing.
  • mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:
    • PERMISSIVE: tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.
    • DROPMALFORMED: drops lines which have fewer or more tokens than expected or tokens which do not match the schema
    • FAILFAST: aborts with a RuntimeException if encounters any malformed line charset: defaults to 'UTF-8' but can be set to other valid charset names
  • inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default comment: skip lines beginning with this character. Default is "#". Disable comments by setting this to null.
  • nullValue: specifies a string that indicates a null value, any fields matching this string will be set as nulls in the DataFrame
  • dateFormat: specifies a string that indicates the date format to use when reading dates or timestamps. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to both DateType and TimestampType. By default, it is null which means trying to parse times and date by java.sql.Timestamp.valueOf() and java.sql.Date.valueOf().

2. You can do this SQL way as well

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Dependencies:

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Spark version < 2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dependencies:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
18
votes

It's for whose Hadoop is 2.6 and Spark is 1.6 and without "databricks" package.

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))

val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)
14
votes

With Spark 2.0, following is how you can read CSV

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
  csv(path)
9
votes

In Java 1.8 This code snippet perfectly working to read CSV files

POM.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

Java

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
6
votes

There are a lot of challenges to parsing a CSV file, it keeps adding up if the file size is bigger, if there are non-english/escape/separator/other characters in the column values, that could cause parsing errors.

The magic then is in the options that are used. The ones that worked for me and hope should cover most of the edge cases are in code below:

### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path, 
                         header=True, 
                         multiLine=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding="UTF-8",
                         sep=',',
                         quote='"', 
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

Hope that helps. For more refer: Using PySpark 2 to read CSV having HTML source code

Note: The code above is from Spark 2 API, where the CSV file reading API comes bundled with built-in packages of Spark installable.

Note: PySpark is a Python wrapper for Spark and shares the same API as Scala/Java.

4
votes

Penny's Spark 2 example is the way to do it in spark2. There's one more trick: have that header generated for you by doing an initial scan of the data, by setting the option inferSchema to true

Here, then, assumming that spark is a spark session you have set up, is the operation to load in the CSV index file of all the Landsat images which amazon host on S3.

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

The bad news is: this triggers a scan through the file; for something large like this 20+MB zipped CSV file, that can take 30s over a long haul connection. Bear that in mind: you are better off manually coding up the schema once you've got it coming in.

(code snippet Apache Software License 2.0 licensed to avoid all ambiguity; something I've done as a demo/integration test of S3 integration)

2
votes

In case you are building a jar with scala 2.11 and Apache 2.0 or higher.

There is no need to create a sqlContext or sparkContext object. Just a SparkSession object suffices the requirement for all needs.

Following is mycode which works fine:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}

object driver {

  def main(args: Array[String]) {

    val log = LogManager.getRootLogger

    log.info("**********JAR EXECUTION STARTED**********")

    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("inferSchema","true")
      .load("d:/small_projects/spark/test.pos")
    df.show()
  }
}

In case you are running in cluster just change .master("local") to .master("yarn") while defining the sparkBuilder object

The Spark Doc covers this: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

1
votes

With Spark 2.4+, if you want to load a csv from a local directory, then you can use 2 sessions and load that into hive. The first session should be created with master() config as "local[*]" and the second session with "yarn" and Hive enabled.

The below one worked for me.

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._

object testCSV { 

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()

    import spark_local.implicits._
    spark_local.sql("SET").show(100,false)
    val local_path="/tmp/data/spend_diversity.csv"  // Local file
    val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
    df_local.show(false)

    val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()

    import spark.implicits._
    spark.sql("SET").show(100,false)
    val df = df_local
    df.createOrReplaceTempView("lcsv")
    spark.sql(" drop table if exists work.local_csv ")
    spark.sql(" create table work.local_csv as select * from lcsv ")

   }

When ran with spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar it went fine and created the table in hive.

1
votes

Add following Spark dependencies to POM file :

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

Spark configuration:

val spark = SparkSession.builder().master("local").appName("Sample App").getOrCreate()

Read csv file:

val df = spark.read.option("header", "true").csv("FILE_PATH")

Display output:

df.show()
0
votes

To read from relative path on the system use System.getProperty method to get current directory and further uses to load the file using relative path.

scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)

spark:2.4.4 scala:2.11.12

-1
votes

Default file format is Parquet with spark.read.. and file reading csv that why you are getting the exception. Specify csv format with api you are trying to use

-1
votes

Try this if using spark 2.0+

For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")


For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")

For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")

Note:- this work for any delimited file. Just use option(“delimiter”,) to change value.

Hope this is helpful.

-1
votes

With in-built Spark csv, you can get it done easily with new SparkSession object for Spark > 2.0.

val df = spark.
        read.
        option("inferSchema", "false").
        option("header","true").
        option("mode","DROPMALFORMED").
        option("delimiter", ";").
        schema(dataSchema).
        csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()

There are various options you can set.

  • header: whether your file includes header line at the top
  • inferSchema: whether you want to infer schema automatically or not. Default is true. I always prefer to provide schema to ensure proper datatypes.
  • mode: parsing mode, PERMISSIVE, DROPMALFORMED or FAILFAST
  • delimiter: to specify delimiter, default is comma(',')