
I am using the following code to read a CSV file with PySpark:

import os
import sys

os.environ["SPARK_HOME"] = "D:\ProgramFiles\spark-2.1.0-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

conf = SparkConf() 
conf.setMaster('local') 
conf.setAppName('test')
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

# customSchema is defined in the comments below (it was omitted from the original question)
df = sqlContext.read.format("com.databricks.spark.csv") \
    .schema(customSchema) \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .load("iris.csv")

df.show()

The following error is thrown:

File "", line 1, in df = sqlContext.read.format("com.databricks.spark.csv").schema(customSchema).option("header", "true").option("mode", "DROPMALFORMED").load("iris.csv")

File "D:\ProgramFiles\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\context.py", line 464, in read return DataFrameReader(self)

File "D:\ProgramFiles\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py", line 70, in init self._jreader = spark._ssql_ctx.read()

File "D:\ProgramFiles\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in call answer, self.gateway_client, self.target_id, self.name)

File "D:\ProgramFiles\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 79, in deco raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)

IllegalArgumentException: "Error while instantiating 'org.apache.spark.sql.internal.SessionState':"

Comment from Shiv: Here is the missing code; iris.csv is loaded from an absolute path:

customSchema = StructType([
    StructField("Sepal.Length", DoubleType(), True),
    StructField("Sepal.Width", DoubleType(), True),
    StructField("Petal.Length", DoubleType(), True),
    StructField("Petal.Width", DoubleType(), True),
    StructField("Species", StringType(), True)])

df = sqlContext.read.format("com.databricks.spark.csv") \
    .schema(customSchema) \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .load("d:\iris.csv")

Comment from desertnaut: Please do not use the comments space to add code or other details - edit & update the question instead.

1 Answer


The above way of reading a CSV (via the external com.databricks.spark.csv package) applies to Spark versions < 2.0.0.

For Spark >= 2.0.0 you need to read through a SparkSession instead. The snippets below assume a SparkSession named spark; a minimal sketch of creating one (replacing the SparkConf/SparkContext/SQLContext setup from the question):
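
from pyspark.sql import SparkSession

# getOrCreate() reuses an existing session or starts a new one
spark = (SparkSession.builder
         .master("local")
         .appName("test")
         .getOrCreate())

With that session in place, the read becomes: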

spark.read.csv("some_file.csv", header=True, mode="DROPMALFORMED", schema=schema)

or

(spark.read
 .schema(schema)
 .option("header", "true")
 .option("mode", "DROPMALFORMED")
 .csv("some_file.csv"))