1
votes

I am trying to read a ScyllaDB table hosted on one PC into a PySpark DataFrame on another PC.
The two PCs have SSH connectivity and I can read the table from plain Python code; the problem occurs only when connecting through Spark. I am using this connector:

--packages datastax:spark-cassandra-connector:2.3.0-s_2.11

My Spark version is 2.3.1 and my Scala version is 2.11.8.

**First Approach**
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext  # SQLContext is needed for sqlContext below

# Point the connector at the Scylla node
conf = SparkConf().set("spark.cassandra.connection.host", "192.168.0.118")
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).appName('FinancialRecon').getOrCreate()
sqlContext = SQLContext(sc)
data = spark.read.format("org.apache.spark.sql.cassandra").options(table="datarecon", keyspace="finrecondata").load().show()

Resulting error:

File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o43.load.
: java.lang.ClassNotFoundException: org.apache.spark.Logging was removed in Spark 2.0. Please check if your library is compatible with Spark 2.0
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:646)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
    ... 13 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 33 more

Another approach that I have tried is:

data=sc.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()

For this I get:

AttributeError: 'SparkContext' object has no attribute 'read'

Third Approach:

data=sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()

For this I get the same error as the first approach.

Please advise whether this is a Scylla Spark connector issue or a Spark library issue, and how to solve it.


2 Answers

1
votes

Follow these steps:

1. Run spark-shell with the --packages line. To set the default Spark configuration, pass key-value pairs with --conf. In my case the Scylla host is 172.17.0.2:

bin/spark-shell --conf spark.cassandra.connection.host=172.17.0.2 --packages datastax:spark-cassandra-connector:2.3.0-s_2.11

2. Enable Cassandra-specific functions on the SparkContext, SparkSession, RDD, and DataFrame:

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

3. Load data from Scylla:

val rdd = sc.cassandraTable("my_keyspace", "my_table")

4. Test:

scala> rdd.collect().foreach(println)
CassandraRow{id: 1, name: ash}
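
Since the question is about PySpark, here is a rough Python counterpart of steps 3 and 4, using the DataFrame source name from the question rather than the Scala-only `cassandraTable` call. It is only a sketch: the helper name `load_cassandra_table` is made up for illustration, and it assumes `spark` is a SparkSession that was started with the connector package and `spark.cassandra.connection.host` already set, as in step 1.

```python
def load_cassandra_table(spark, keyspace, table):
    """Return a DataFrame for keyspace.table via the connector's DataFrame source.

    `spark` is assumed to be an existing SparkSession that already has the
    connector on its classpath (for example via --packages, as in step 1).
    """
    return (
        spark.read
        .format("org.apache.spark.sql.cassandra")
        .options(table=table, keyspace=keyspace)
        .load()
    )


# Usage inside a pyspark shell, where `spark` is already defined:
# df = load_cassandra_table(spark, "my_keyspace", "my_table")
# df.show()
```

Note that the function returns the DataFrame itself; calling `.show()` directly on the result of `.load()` and assigning that, as in the question, would leave the variable set to `None`.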
0
votes

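The error is caused by a version conflict: as the message says, org.apache.spark.Logging was removed in Spark 2.0, so the library being loaded was built against an older Spark. Maybe you can solve it reading here.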
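
To sanity-check a connector coordinate against your installed versions, something like the following could help. It is a heuristic sketch, not an official compatibility table: the function names are made up, and it assumes the `X.Y.Z-s_SCALA` version format seen in the question plus the simplification that the connector's major.minor should equal Spark's major.minor.

```python
import re


def parse_connector_coordinate(coordinate):
    """Split 'group:artifact:X.Y.Z-s_SCALA' into connector and Scala versions."""
    group, artifact, version = coordinate.split(":")
    match = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)-s_(\d+\.\d+)", version)
    if match is None:
        raise ValueError("unexpected version format: " + version)
    major, minor, patch, scala = match.groups()
    return {"connector": (int(major), int(minor), int(patch)), "scala": scala}


def looks_compatible(coordinate, spark_version, scala_version):
    """Heuristic (an assumption): major.minor of connector and Spark must match,
    and the Scala suffix must match the Scala major.minor in use."""
    parsed = parse_connector_coordinate(coordinate)
    spark_major_minor = tuple(int(p) for p in spark_version.split(".")[:2])
    scala_major_minor = ".".join(scala_version.split(".")[:2])
    return (
        parsed["connector"][:2] == spark_major_minor
        and parsed["scala"] == scala_major_minor
    )


# Values taken from the question
print(looks_compatible(
    "datastax:spark-cassandra-connector:2.3.0-s_2.11", "2.3.1", "2.11.8"
))  # prints True
```

With the versions from the question this reports a match, so the coordinate itself looks fine by this rule. My guess, and it is only a guess, is that some other, older connector jar is ending up on the classpath.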

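Once the versions match, the first approach will work, because the read method is available on SparkSession. It is not available on SparkContext, which is why the second approach raises an AttributeError.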
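
A minimal sketch of how the first approach could be set up from a standalone script, assuming a fresh Python process so that `spark.jars.packages` is read before the JVM starts (in an already running pyspark shell that setting would come too late). The helper names `connector_conf` and `build_session` are hypothetical; the host, app name, keyspace, and table are the ones from the question.

```python
# Connector coordinate from the question
CONNECTOR = "datastax:spark-cassandra-connector:2.3.0-s_2.11"


def connector_conf(host, package=CONNECTOR):
    """Spark settings needed to pull the connector and reach the Scylla node."""
    return {
        "spark.jars.packages": package,
        "spark.cassandra.connection.host": host,
    }


def build_session(host, app_name="FinancialRecon"):
    """Create (or reuse) a SparkSession configured for the connector."""
    # Imported here so the module loads even where pyspark is not installed
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in connector_conf(host).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Usage (needs pyspark and network access to the Scylla host):
# spark = build_session("192.168.0.118")
# df = (spark.read.format("org.apache.spark.sql.cassandra")
#       .options(table="datarecon", keyspace="finrecondata")
#       .load())
# df.show()
```

Building only the SparkSession avoids creating a separate SparkContext and SQLContext; if the context is needed, it is available as `spark.sparkContext`.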