I have Cassandra 3.0.0 running on my localhost (127.0.0.1:9042). It is accessible from cqlsh and I can create/query tables.
In the Spark project I added maven dependencies for Cassandra connector, Cassandra driver and so on.
When I now try to read or insert data to Cassandra, the Cassandra Connector connects to the Cassandra Cluster, but I get the following error: Exception in thread "main" java.util.NoSuchElementException: key not found: 'text'
I tried it with different tables, versions, coding and so on. Nothing helped. I think that this might be a problem because of wrong or missing maven dependencies. Maybe you can help me. Here is my coding and dependencies:
Cassandra table:
CREATE TABLE mykeyspace2.kv(key text PRIMARY KEY, value int);
Spark code:
val master = config.getString(Configuration.SPARK_MASTER)
logger.info("Starting, spark master:$master")
val sparkConf = new SparkConf()
.setAppName("test streaming")
.setMaster(master)
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(sparkConf)
val cc = new CassandraSQLContext(sc)
val newRdd = sc.parallelize(Seq(("cat",40),("fox",50)))
newRdd.saveToCassandra("mykeyspace2","kv",SomeColumns("key", "value"))
val rdd = sc.cassandraTable("mykeyspace2", "users3")
Maven dependencies:
<dependencies>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.5.0-M3</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.5.0-M3</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.0-alpha4</version>
</dependency>
<!-- Spark dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies>
Error:
15/12/04 07:43:40 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
15/12/04 07:43:40 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.util.NoSuchElementException: key not found: 'text'
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at com.datastax.spark.connector.types.ColumnType$.fromDriverType(ColumnType.scala:81)
at com.datastax.spark.connector.cql.ColumnDef$.apply(Schema.scala:117)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchPartitionKey$1.apply(Schema.scala:199)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchPartitionKey$1.apply(Schema.scala:198)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchPartitionKey(Schema.scala:198)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:239)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:238)
at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:238)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:247)
at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:246)
at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:246)
at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:252)
at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:249)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
CREATE TABLE mykeyspace2.kv3(mykey text PRIMARY KEY, myvalue int);
But I still get the same error :/ – AlexL