I'm trying to connect Spark to Cassandra and then query the keyspace and table from a Flask application.

The problem is that when I run the web application I get an error saying that the keyspace does not exist:

cassandra.InvalidRequest: Error from server: code=2200 [Invalid query] message="Keyspace MyKeyspace does not exist"

In Spark I run the following code:

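import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.cassandra._ // provides cassandraFormat

val flightRecommendations = finalPredictions.writeStream.foreachBatch {
  (batchDF: DataFrame, batchId: Long) =>
    // write each micro-batch to MyKeyspace.MytableName
    batchDF
      .write
      .cassandraFormat("MytableName", "MyKeyspace")
      .option("cluster", "cassandra_cluster")
      .mode("append")
      .save()
}.start()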

My question is whether the code above automatically creates the keyspace and the table.

I think it could also be a connection problem, because I'm working in Docker and the setting I use is this:

spark.setCassandraConf("cassandra_cluster", CassandraConnectorConf.ConnectionHostParam.option("cassandra"))

I also pass the following two configuration options to spark-submit:

--conf spark.cassandra.connection.host=cassandra \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \

It's strange: spark-submit doesn't report any errors, but the keyspace is not created.

Answer

Yes, that's possible since Spark Cassandra Connector 2.5.0. There is a new function, createCassandraTableEx, that creates a new table based on the DataFrame schema. It has an option to handle the case where the table already exists, and it can also control the sort order of the clustering columns, set table options, and so on. Before 2.5.0 there was the createCassandraTable function, but it threw an exception if the table already existed.

Here is an example from the blog post announcing the 2.5.0 release. For a DataFrame with the following structure:

root
 |-- id: integer (nullable = false)
 |-- c: integer (nullable = false)
 |-- t: string (nullable = true)

it's possible to create a new table using the following code:

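import com.datastax.spark.connector.cql.ClusteringColumn
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._

// keyspace "test", table "test_new"; "id" is the partition key and
// "c" is a clustering column sorted in descending order.
// ifNotExists = true: no error if the table is already there.
data.createCassandraTableEx("test", "test_new", Seq("id"),
  Seq(("c", ClusteringColumn.Descending)),
  ifNotExists = true, tableOptions = Map("gc_grace_seconds" -> "1000"))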
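createCassandraTableEx issues a CREATE TABLE statement, so the keyspace (MyKeyspace in the question) still has to exist first. The following is a minimal sketch of creating it from the Spark driver through the connector's CassandraConnector.withSessionDo. It assumes that spark.cassandra.connection.host is set in the SparkConf, as with the spark-submit option in the question. It also assumes that SimpleStrategy with replication factor 1 is acceptable, which suits a single-node Docker setup. The object and method names are hypothetical. The keyspace name is double-quoted because Cassandra folds unquoted identifiers to lower case: CREATE KEYSPACE MyKeyspace would actually create mykeyspace.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SparkSession

// Hypothetical helper; these names are illustrative, not part of the connector.
object EnsureKeyspace {
  // Double quotes keep the mixed-case name (unquoted CQL identifiers are
  // folded to lower case). SimpleStrategy is an assumption for a single node.
  def createKeyspaceCql(keyspace: String, replicationFactor: Int = 1): String =
    s"""CREATE KEYSPACE IF NOT EXISTS "$keyspace" """ +
      s"""WITH replication = {'class': 'SimpleStrategy', 'replication_factor': $replicationFactor}"""

  // Runs the statement once on the driver, using the connection settings
  // (e.g. spark.cassandra.connection.host) found in the SparkConf.
  def ensureKeyspace(spark: SparkSession, keyspace: String): Unit =
    CassandraConnector(spark.sparkContext.getConf).withSessionDo { session =>
      session.execute(createKeyspaceCql(keyspace))
      ()
    }
}
```

Calling EnsureKeyspace.ensureKeyspace(spark, "MyKeyspace") once, before creating the table or starting the streaming query, makes the keyspace available to the write.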

And you don't need foreachBatch with that version; it was required only before 2.5.0. In the new version you can write the stream to Cassandra directly:

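import org.apache.spark.sql.streaming.OutputMode

val query = streamingCountsDF.writeStream
      .outputMode(OutputMode.Update)
      .format("org.apache.spark.sql.cassandra") // Cassandra streaming sink
      .option("checkpointLocation", ".../checkpoint")
      .option("keyspace", "ksname")
      .option("table", "tablename")
      .start()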

With Spark 3.x and SCC 3.x you can also create keyspaces and tables in Cassandra using Spark SQL; see the documentation for more details.
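Here is a sketch of the Spark SQL approach, based on the SCC 3.x catalog support. It registers a catalog backed by com.datastax.spark.connector.datasource.CassandraCatalog under an arbitrary name; mycatalog here is hypothetical. In that catalog a Spark SQL "database" maps to a Cassandra keyspace, and the PARTITIONED BY columns become the partition key. The host name cassandra is an assumption taken from the question's Docker setup. The names ksname and tablename come from the streaming snippet, and the columns mirror the earlier example schema.

```scala
import org.apache.spark.sql.SparkSession

object CassandraSqlDdl {
  // Hypothetical catalog name; any identifier can be used.
  val Catalog = "mycatalog"

  // In a CassandraCatalog, a Spark SQL "database" is a Cassandra keyspace.
  val createKeyspace: String =
    s"CREATE DATABASE IF NOT EXISTS $Catalog.ksname " +
      "WITH DBPROPERTIES (class='SimpleStrategy', replication_factor='1')"

  // Columns listed in PARTITIONED BY become the Cassandra partition key.
  val createTable: String =
    s"CREATE TABLE IF NOT EXISTS $Catalog.ksname.tablename " +
      "(id INT, c INT, t STRING) USING cassandra PARTITIONED BY (id)"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cassandra-sql-ddl")
      // Assumption: the Cassandra container is reachable as "cassandra".
      .config("spark.cassandra.connection.host", "cassandra")
      // Register the catalog under the name used in the statements above.
      .config(s"spark.sql.catalog.$Catalog",
        "com.datastax.spark.connector.datasource.CassandraCatalog")
      .getOrCreate()

    spark.sql(createKeyspace) // creates the keyspace if it is missing
    spark.sql(createTable)    // creates the table if it is missing
    spark.stop()
  }
}
```

Because both statements use IF NOT EXISTS, the job can be rerun safely before the streaming query starts.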