4
votes

I have been trying to connect Spark 2.2.1 on my EMR 5.11.0 cluster to our Redshift store.

The approach I followed was:

  1. Use the built-in Redshift JDBC driver

    pyspark --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
    
    from pyspark.sql import SQLContext
    sc  # SparkContext provided automatically by the pyspark shell
    sql_context = SQLContext(sc)
    
    redshift_url = "jdbc:redshift://HOST:PORT/DATABASE?user=USER&password=PASSWORD"
    
    redshift_query  = "select * from table;"
    
    redshift_query_tempdir_storage = "s3://personal_warehouse/wip_dumps/"        
    
    # Read data from a query
    df_users = sql_context.read \
        .format("com.databricks.spark.redshift") \
        .option("url", redshift_url) \
        .option("query", redshift_query) \
        .option("tempdir", redshift_query_tempdir_storage) \
        .option("forward_spark_s3_credentials", "true") \
        .load()
    

    This gives me the following error:

    Traceback (most recent call last):
      File "<stdin>", line 7, in <module>
      File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 165, in load
        return self._df(self._jreader.load())
      File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o63.load.
    : java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.redshift. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:546)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:87)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:87)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:302)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ClassNotFoundException: com.databricks.spark.redshift.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$22$$anonfun$apply$14.apply(DataSource.scala:530)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$22$$anonfun$apply$14.apply(DataSource.scala:530)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$22.apply(DataSource.scala:530)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$22.apply(DataSource.scala:530)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:530)
        ... 16 more

Can someone please point out where I've missed something or made a mistake?

Thanks!

3 Answers

3
votes

I had to include 4 jar files in the EMR spark-submit options to get this working.

List of jar files:

1. RedshiftJDBC41-1.2.12.1017.jar

2. spark-redshift_2.10-2.0.0.jar

3. minimal-json-0.9.4.jar

4. spark-avro_2.11-3.0.0.jar

You can download the jar files, store them in an S3 bucket, and point to them in the spark-submit options, for example:

    --jars s3://<pathToJarFile>/RedshiftJDBC41-1.2.12.1017.jar,s3://<pathToJarFile>/minimal-json-0.9.4.jar,s3://<pathToJarFile>/spark-avro_2.11-3.0.0.jar,s3://<pathToJarFile>/spark-redshift_2.10-2.0.0.jar

Then finally query Redshift from your Spark code as in this example: spark-redshift-example.
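
For reference, a full spark-submit invocation using those four jars might look like the sketch below; the S3 path and the script name (your_redshift_job.py) are placeholders you would replace with your own.

    spark-submit \
        --jars s3://<pathToJarFile>/RedshiftJDBC41-1.2.12.1017.jar,s3://<pathToJarFile>/spark-redshift_2.10-2.0.0.jar,s3://<pathToJarFile>/minimal-json-0.9.4.jar,s3://<pathToJarFile>/spark-avro_2.11-3.0.0.jar \
        your_redshift_job.py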

2
votes

You need to add the spark-redshift data source package to your pyspark command:

    pyspark --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar \
            --packages com.databricks:spark-redshift_2.11:2.0.1
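
The _2.11 suffix should match the Scala version of your Spark build (2.11 for Spark 2.2), and --packages resolves the artifact and its dependencies from Maven, so the cluster needs network access to the repository. The same flags also work with spark-submit if you run a script instead of the interactive shell; a minimal sketch, with the script name as a placeholder:

    spark-submit --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar \
                 --packages com.databricks:spark-redshift_2.11:2.0.1 \
                 your_script.py
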
0
votes

The problem is that Spark cannot find the necessary packages at execution time. In the .sh script that launches the Python file, you have to pass not only the JDBC driver but also the spark-redshift package, as shown below.

Script test.sh:

    sudo pip install boto3

    spark-submit --jars RedshiftJDBC42-1.2.15.1025.jar --packages com.databricks:spark-redshift_2.11:2.0.1 test.py

Script test.py:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    # With spark-submit there is no pre-created sc, so build the SparkContext explicitly
    sc = SparkContext()
    sql_context = SQLContext(sc)

    redshift_url = "jdbc:redshift://HOST:PORT/DATABASE?user=USER&password=PASSWORD"

    redshift_query = "select * from table;"

    redshift_query_tempdir_storage = "s3://personal_warehouse/wip_dumps/"

    # Read data from a query
    df_users = sql_context.read \
        .format("com.databricks.spark.redshift") \
        .option("url", redshift_url) \
        .option("query", redshift_query) \
        .option("tempdir", redshift_query_tempdir_storage) \
        .option("forward_spark_s3_credentials", "true") \
        .load()

Run the script test.sh:

    sudo sh test.sh

The problem should now be solved.