
I have Cassandra 3.0.0 running on localhost (127.0.0.1:9042). It is accessible from cqlsh, and I can create and query tables.

In my Spark project I added Maven dependencies for the Cassandra connector, the Cassandra driver, and so on.

When I try to read from or insert data into Cassandra, the connector connects to the Cassandra cluster, but I then get the following error: Exception in thread "main" java.util.NoSuchElementException: key not found: 'text'

I tried different tables, versions, code, and so on; nothing helped. I suspect this is caused by wrong or missing Maven dependencies. Maybe you can help me. Here are my code and dependencies:

Cassandra table: CREATE TABLE mykeyspace2.kv(key text PRIMARY KEY, value int);

Spark code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.cassandra.CassandraSQLContext
import com.datastax.spark.connector._

// config and Configuration come from my application's own setup
val master = config.getString(Configuration.SPARK_MASTER)
logger.info(s"Starting, spark master: $master")

val sparkConf = new SparkConf()
  .setAppName("test streaming")
  .setMaster(master)
  .set("spark.cassandra.connection.host", "127.0.0.1")

val sc = new SparkContext(sparkConf)
val cc = new CassandraSQLContext(sc)

// write two key/value pairs, then read a table back
val newRdd = sc.parallelize(Seq(("cat", 40), ("fox", 50)))
newRdd.saveToCassandra("mykeyspace2", "kv", SomeColumns("key", "value"))

val rdd = sc.cassandraTable("mykeyspace2", "users3")
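
For reference, the cluster itself can be checked with the plain Java driver alone (a sketch using only cassandra-driver-core; the DriverCheck name is just illustrative). If this prints the table metadata, the cluster side is fine and the problem sits between the connector and the driver:

import com.datastax.driver.core.Cluster

// Read the table metadata with the plain Java driver, independently of the
// Spark connector, to verify that the cluster and schema are reachable.
object DriverCheck {
  def main(args: Array[String]): Unit = {
    val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    try {
      val table = cluster.getMetadata.getKeyspace("mykeyspace2").getTable("kv")
      println(s"Table ${table.getName} with columns ${table.getColumns}")
    } finally {
      cluster.close()
    }
  }
}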

Maven dependencies:

<dependencies>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.5.0-M3</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.5.0-M3</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0-alpha4</version>
    </dependency>

    <!-- Spark dependencies -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.5.0</version>
    </dependency>

</dependencies>

Error:

15/12/04 07:43:40 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
15/12/04 07:43:40 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.util.NoSuchElementException: key not found: 'text'
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at com.datastax.spark.connector.types.ColumnType$.fromDriverType(ColumnType.scala:81)
    at com.datastax.spark.connector.cql.ColumnDef$.apply(Schema.scala:117)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchPartitionKey$1.apply(Schema.scala:199)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchPartitionKey$1.apply(Schema.scala:198)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchPartitionKey(Schema.scala:198)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:239)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:238)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:238)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:247)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:246)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
    at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:246)
    at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:252)
    at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:249)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
    at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)

Try to give the column a different name instead of key. - Kaushal
Hi Kaushal, I changed the column names: CREATE TABLE mykeyspace2.kv3(mykey text PRIMARY KEY, myvalue int); but I still get the same error :/ - AlexL

2 Answers

3 votes

I tried several combinations of connector and driver versions and finally found one that works for me. The compatibility matrix on the Spark Cassandra Connector GitHub page (https://github.com/datastax/spark-cassandra-connector) did not seem to be correct here: with the versions it lists, the components did not work together. Judging by the stack trace, the failure is in ColumnType.fromDriverType while the connector fetches the schema, i.e. the connector cannot map a column type reported by the driver; that points to a connector/driver version mismatch rather than to the table definition.

The working combination is the following:

Cassandra 2.1.12

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.1.9</version>
    </dependency>

    <!-- Spark dependencies -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
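
With this combination, a minimal round-trip along the following lines works (a sketch, assuming the mykeyspace2.kv table from the question already exists):

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Write two rows to mykeyspace2.kv, then read the whole table back.
val conf = new SparkConf()
  .setAppName("cassandra-roundtrip")
  .setMaster("local[2]")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)

sc.parallelize(Seq(("cat", 40), ("fox", 50)))
  .saveToCassandra("mykeyspace2", "kv", SomeColumns("key", "value"))

sc.cassandraTable("mykeyspace2", "kv").collect().foreach(println)

sc.stop()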
0 votes

I am using Cassandra 3.2.1, and the dependencies below work fine for me. Late answer, but it may help someone.

With these versions I first got an insufficient-heap-memory error, so I set the JVM arguments -Xmx1024m -Xms512m.
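
Those flags belong on the JVM that runs the driver (for example in the IDE run configuration or in MAVEN_OPTS). As a rough in-code alternative for non-local deployments, executor heap can also be raised through SparkConf (a sketch; the setting is ignored in local mode, where only -Xmx on the driver JVM counts):

import org.apache.spark.SparkConf

// Raise executor heap via configuration instead of raw JVM flags.
// Note: in local[*] mode executors share the driver JVM, so this has no
// effect there; set -Xmx1024m -Xms512m on the driver JVM instead.
val conf = new SparkConf()
  .setAppName("heap-example")
  .set("spark.executor.memory", "1g")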

<!-- Spark dependencies -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.6.2</version>
</dependency>