4
votes

I'm looking to use Spark for some ETL, which will mostly consist of "update" statements (one column is a set that will be appended to, so a simple insert is likely not going to work). As such, issuing CQL queries to import the data seems like the best option. Using the Spark Cassandra Connector, I see I can do this:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
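
For context, the per-row statement I have in mind is a CQL set-append update along these lines (the keyspace, table, and column names here are just placeholders for illustration):

// Hypothetical CQL: append to a set column rather than overwrite the row
val appendCql = "UPDATE my_keyspace.things SET tags = tags + {'some_tag'} WHERE id = ?"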

Now, I don't want to open a session and close it for every row in the source (am I right in not wanting this? Usually, in "normal" apps, I have one session for the entire process and keep using that). However, the docs say the connector is serializable, but the session obviously is not. So wrapping the whole import inside a single "withSessionDo" seems like it would cause problems. I was thinking of using something like this:

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf

class CassandraStorage(conf: SparkConf) {
  // Open one session up front and reuse it for every row
  val session = CassandraConnector(conf).openSession()

  def store(t: Thingy): Unit = {
    // session.execute(cql) goes here
  }
}

Is this a good approach? Do I need to worry about closing the session? Where / how best would I do that? Any pointers are appreciated.

1
I was wondering why you wouldn't just create the SparkConf object and then reference that in a SparkContext, as the examples further up on the connector page you link to show? You should be able to just create the conf object and keep the context open while you run your queries. – markc

1 Answer

3
votes

You actually do want to use withSessionDo, because it won't actually open and close a session on every access. Under the hood, withSessionDo pulls from a JVM-level session cache. This means you will only have one session object PER cluster configuration PER node.

This means code like this:

val connector = CassandraConnector(sc.getConf)
sc.parallelize(1L to 10000000L).map(x =>
  connector.withSessionDo(session => /* stuff */ x))

It will only ever create one Cluster and one Session object on each executor JVM, regardless of how many cores each machine has.

For efficiency, I would still recommend using mapPartitions to minimize cache checks:

sc.parallelize(1L to 10000000L)
  .mapPartitions(it => connector.withSessionDo(session =>
    it.map(row => /* do stuff here */ row)))

In addition, the session object also uses a prepared-statement cache, which lets you prepare a statement in your serialized code, and it will only ever be prepared once per JVM (all other calls will return the cached reference).
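
For illustration, here is a rough sketch of how that could look combined with the mapPartitions pattern above. The keyspace, table, and column names, the thingies collection, and the fields on Thingy are made up for the example, and depending on your driver version you may need to convert Scala collections to Java ones before binding:

sc.parallelize(thingies)
  .mapPartitions(it => connector.withSessionDo { session =>
    // Prepared once per JVM; later calls should come back from the prepare cache
    val stmt = session.prepare(
      "UPDATE my_keyspace.things SET tags = tags + ? WHERE id = ?")
    it.map(t => session.execute(stmt.bind(t.tags, t.id)))
  })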