1
votes

I'm working with Spark 1.6.2, developing a Python Spark client (it runs in yarn-client mode). The important thing here is that, on the client machine, I cannot use spark-submit to launch my Python script; I have to run it as a plain Python script.

At a certain point in the code, I need to load a CSV file from HDFS as a Spark DataFrame (i.e. using a SQL context). As you may know, Spark 1.6.2 has no native support for CSV-based DataFrames, and Databricks spark-csv must be used.

The data loading statement is as follows:

df = sql_context.read.load(format='com.databricks.spark.csv', path=url, header=True, inferSchema=False, delimiter=',')
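For context, that call sits in an ordinary Python script that builds its own Spark context, roughly like the following sketch (the app name and HDFS path are placeholders):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster('yarn-client').setAppName('csv-loader')  # placeholder app name
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)

url = 'hdfs:///some/path/data.csv'  # placeholder HDFS path
df = sql_context.read.load(format='com.databricks.spark.csv', path=url,
                           header=True, inferSchema=False, delimiter=',')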

The problem is that com.databricks.spark.csv is not found.

I know the Databricks spark-csv jars must be downloaded and put somewhere. The question is: where? Is this a requirement on the client machine, or in the cluster?

Since I don't know, I've tried the following on the client machine, without success (see the sketch after this list):

  • export PYTHONPATH=/path/where/jars/were/downloaded/
  • conf = SparkConf().set('spark.jars', '/path/where/jars/were/downloaded/')
  • conf = SparkConf().set('spark.driver.extraClassPath', '/path/where/jars/were/downloaded/')
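For completeness, the SparkConf attempts looked roughly like this (jar names and versions are placeholders; as far as I can tell, spark.jars expects a comma-separated list of jar files and spark.driver.extraClassPath a classpath string, not a bare directory):

from pyspark import SparkConf

# Placeholder jar names/versions; spark.jars takes comma-separated jar files,
# spark.driver.extraClassPath takes a classpath string
jars = ('/path/where/jars/were/downloaded/spark-csv_2.10-1.5.0.jar,'
        '/path/where/jars/were/downloaded/commons-csv-1.1.jar')

conf = (SparkConf()
        .set('spark.jars', jars)
        .set('spark.driver.extraClassPath', jars.replace(',', ':')))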

I've also tried the following on the Spark cluster, also without success:

  • Setting the custom spark-defaults property spark.jars through Ambari.
  • Setting the custom spark-defaults property spark.driver.extraClassPath through Ambari.

Let me remind you that command-line options such as --jars or --packages are not suitable for me, since I'm not launching the script through spark-submit :)

Other solutions, such as setting the jar in the Spark context using addJar(), will not work, since Spark 1.6.2 does not implement it.

So, any idea about how my code can find the Databricks spark-csv jar?

Just in case, this is the error trace:

java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv. Please find packages at http://spark-packages.org
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:77)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:102)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.databricks.spark.csv.DefaultSource
        at java.net.URLClassLoader$1.run(URLClassLoader.java:359)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:348)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:347)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
        at scala.util.Try.orElse(Try.scala:82)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:62)
        ... 14 more

Other posts I've read...

Thanks in advance.


1 Answer

0
votes

Finally, I found this issue on the Databricks GitHub, and @drorata's answer worked for me:

export PACKAGES="com.databricks:spark-csv_2.11:1.3.0"
export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"

By exporting the above environment variables, the Databricks spark-csv package (and its dependencies) were downloaded to my local .ivy2 folder and automatically uploaded to the cluster while creating the Spark context.
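In case it helps others running plain Python scripts, the same variables can also be set from inside the script itself, as long as this happens before the Spark context (and thus the JVM gateway) is created; a minimal sketch:

import os

# Must be set before creating the SparkContext, since PySpark reads it
# when launching the JVM gateway
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages com.databricks:spark-csv_2.11:1.3.0 pyspark-shell'
)

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(conf=SparkConf().setAppName('csv-loader'))  # placeholder app name
sql_context = SQLContext(sc)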