0 votes

I'm trying to query local Cassandra tables from Apache Spark, but I run into this error whenever I execute a select and call show() on the result:

Could not initialize class com.datastax.spark.connector.types.TypeConverter$

Versions:

  • Cassandra: version 3.11.2 | cqlsh version 5.0.1

  • Apache Spark: version 2.3.1

  • Scala: version 2.12.6

Cassandra Keyspace -> Table

CREATE KEYSPACE test_users
  WITH REPLICATION = {
    'class' : 'SimpleStrategy',
    'replication_factor' : 1
  };

CREATE TABLE member (
    member_id bigint PRIMARY KEY,
    member_name varchar,
    member_age int
);


cqlsh> select * from member;

+---------+----------+-----------------+
|member_id|member_age|      member_name|
+---------+----------+-----------------+
|        5|        53|     Walter White|
|        6|        29|Henry Derplestick|
|        1|        67|      Larry David|
|        4|        31|       Joe Schmoe|
|        2|        19|  Karen Dinglebop|
|        3|        49|     Kenny Logins|
+---------+----------+-----------------+

QueryMembers.scala

spark-shell  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 --conf spark.cassandra.connection.host="10.0.0.233" 

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Window
import spark.implicits._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.joda.time.LocalDate
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{lit, row_number}
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SQLContext

val conf = new SparkConf(true).
  set("spark.cassandra.connection.host", "10.0.0.233").
  set("spark.cassandra.connection.port", "9042")

val sc = new SparkContext("spark://10.0.0.233:9042", "test", conf)

val members = spark.
        read.
        format("org.apache.spark.sql.cassandra").
        options(Map( "table" -> "member", "keyspace" -> "test_users" )).
        load()

members.printSchema()

val older_members = members.select("member_id", "member_age", "member_name").
  where("member_age > 50")
older_members: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [member_id: bigint, member_age: int ... 1 more field]

older_members.show() // breaks here

Errors

Caused by: java.io.IOException: Exception during preparation of SELECT "member_id", "member_age", "member_name" FROM "test_users"."member" WHERE token("member_id") > ? AND token("member_id") <= ? ALLOW FILTERING: Could not initialize class com.datastax.spark.connector.types.TypeConverter$

2018-07-29 18:57:09 ERROR Executor:91 - Exception in task 0.0 in stage 10.0 (TID 29)
java.io.IOException: Exception during preparation of SELECT "member_id", "member_age", "member_name" FROM "test_users"."member" WHERE token("member_id") > ? AND token("member_id") <= ?   ALLOW FILTERING: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:307)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$19.apply(CassandraTableScanRDD.scala:335)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$19.apply(CassandraTableScanRDD.scala:335)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
    at com.datastax.spark.connector.types.ColumnType$.converterToCassandra(ColumnType.scala:229)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:282)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:

Any insight would be greatly appreciated.


3 Answers

3 votes

For Spark 2.3.x you need to use the latest version of spark-cassandra-connector: 2.3.1.

Also, you must not use version 2.0.0-M3 - it's a pre-release version; the latest version in the 2.0.x series is 2.0.9. You should always check the available versions on Maven Central.
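For example, the spark-shell invocation from the question would then pull in the matching connector build. A minimal sketch, assuming the stock Spark 2.3.1 distribution (which is built for Scala 2.11):

spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.1 --conf spark.cassandra.connection.host="10.0.0.233"

Note that the _2.11 suffix has to match the Scala version your Spark build uses; an artifact built against a different Scala version can fail at runtime with exactly this kind of NoClassDefFoundError.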

0 votes

After researching further I decided to roll back Apache Spark from version 2.3.1 to 2.2.0, which resolved the issue.
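A quick way to confirm the shell is actually running the downgraded build (spark.version is a standard field on the SparkSession, so this is just a sanity check):

spark.version  // should report 2.2.0 after the rollback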

I'm now able to run the query and get its results:

val older_members = members.select("member_id", "member_age", "member_name").
  where("member_age > 50")

older_members.show

(screenshot of the older_members.show output)

0 votes

I had the same issue with this connector and spent almost a day figuring it out. Don't use spark-cassandra-connector_2.10-2.0.0-M3; use a newer version of spark-cassandra-connector.
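If you pull the connector in through a build file instead of --packages, the same fix applies there. A minimal sbt sketch (the %% operator appends the Scala suffix, so the artifact matches your build's Scala version):

// build.sbt - choose a connector release that matches your Spark line, e.g. 2.3.1 for Spark 2.3.x
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.3.1"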

Thanks @Alex Ott