After clearing my keyspace via:

drop keyspace simplex

I'm executing the following commands via the DataStax Java driver for Cassandra (from Scala code):

  val songsTable = (
    "CREATE TABLE IF NOT EXISTS simplex.songs ("
    + "id uuid PRIMARY KEY,"
    + "title text,"
    + "album text,"
    + "artist text,"
    + "tags set<text>,"
    + "data blob"
    + ");")

  val listsTable = (
    "CREATE TABLE IF NOT EXISTS simplex.playlists ("
    + "id bigint,"
    + "title text,"
    + "album text, "
    + "artist text,"
    + "song_id uuid,"
    + "PRIMARY KEY (id, title, album, artist)"
    + ");")

  val songs = (
    "INSERT INTO simplex.songs "
    + "(id, title, album, artist, tags) "
    + "VALUES ("
    + "756716f7-2e54-4715-9f00-91dcbea6cf50,"
    + "'La Petite Tonkinoise',"
    + "'Bye Bye Blackbird',"
    + "'Joséphine Baker',"
    + "{'jazz', '2013'}"
    + ");")
  ...
  ...
  ...
  val rsf = session.executeAsync(command) // in parallel
  rsf.addListener(() => p.success(rsf.get), exec)

This results in only the playlists table being created, and the callbacks for the "create songs table" and "insert song" commands never get executed.

It was my understanding that the DataStax Java driver for Cassandra is safe to use concurrently. Is this not the case? What is wrong with my assumptions?

What is command? (Is it the concatenation of those statements?) - Alex Popescu
No, each is executed in parallel (i.e., s.executeAsync(cmd1); s.executeAsync(cmd2); s.executeAsync(cmd3)). - jonderry
@jonderry What makes you think they're executed in the proper order? E.g. you're inserting into a table that does not exist yet. - om-nom-nom
Yes, it's true this is an issue, but the same problem occurs if I enforce sequential ordering using onSuccess or andThen. For the record, I found there actually was an error being returned that I didn't properly propagate to the promise, so hanging is not the issue. The issue is that there is an error even when I run the following code: createKeyspaceSimplex onSuccess createTablePlaylists. I get an exception: Cannot add column family 'playlists' to non existing keyspace 'simplex'. - jonderry

1 Answer

When you create tables or keyspaces, which is common in integration tests, you need to close the Session and Cluster instances afterwards because the Cluster metadata goes out of date. I do see a log message saying that the driver is attempting to update the metadata asynchronously, but the update never seems to complete in a timely fashion. Closing and reopening the Cluster and Session instances is fast enough for integration tests.

Cluster.connect(keyspaceName) exists, and I could not get USE keyspace; to work correctly as an executed command (never mind what that would mean for sharing a session across the application, where any client could switch to another keyspace arbitrarily), so I ended up writing the following:

(Here _cluster is an Option[Cluster] that is initialized during preStart for my CassandraSessionActor, and keyspace is an Option[String] that implementors define. CassandraSessionActor returns a Session when requested, but invokes getSession only once, during preStart, so it can share the session across the application, as per the DataStax 4 Simple Rules.)

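trait AutoCreateKeyspace extends CassandraSessionActor {
  override def getSession = {
    keyspace match {
      // No keyspace configured: connect without binding the session to one.
      case None => _cluster.get.connect()
      case Some(ks) =>
        // foreach rather than map: the block is run only for its side effects.
        _cluster.foreach { cluster =>
          // Create the keyspace only if the metadata knows it under neither spelling.
          if (cluster.getMetadata.getKeyspace(ks) == null &&
            cluster.getMetadata.getKeyspace(ks.toLowerCase) == null) {
            val temporarySession = cluster.connect()
            log.debug(s"creating keyspace $ks")
            temporarySession.execute(s"""CREATE KEYSPACE $ks
                                       |WITH replication = {
                                       |'class': 'SimpleStrategy',
                                       |'replication_factor': 2 }""".stripMargin)
            temporarySession.close()
            // Close the cluster so its stale metadata is discarded.
            cluster.close()
          }
        }
        // Connect through a fresh Cluster so the session sees current metadata.
        _cluster = Some(Cassandra.createNewCluster())
        _cluster.get.connect(ks)
    }
  }
}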

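import scala.collection.JavaConverters._

import com.datastax.driver.core.Cluster
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging // package path varies by scala-logging version

private object Cassandra extends LazyLogging {

  def conf = ConfigFactory.load()

  // getStringList returns a java.util.List; convert it so it can be passed as varargs.
  val hosts = conf.getStringList("cassandra.hosts").asScala.toList

  logger.info(s"cassandra.hosts specified: $hosts")

  def createNewCluster(): Cluster = {
    Cluster.builder()
      .addContactPoints(hosts: _*)
      .build()
  }

}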
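
On the ordering concern raised in the comments, here is a minimal sketch of running statements strictly one after another. It uses only the Scala standard library. The names runInOrder and fakeExecute are hypothetical, and fakeExecute merely stands in for whatever wrapper turns session.executeAsync into a Scala Future. This only addresses ordering and error propagation; it does not fix stale metadata.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OrderedStatements {
  // Starts each statement only after the previous one's Future has succeeded.
  // A failed Future short-circuits the chain and fails the overall result.
  def runInOrder[A](statements: Seq[String])(execute: String => Future[A])(
      implicit ec: ExecutionContext): Future[Vector[A]] =
    statements.foldLeft(Future.successful(Vector.empty[A])) { (acc, stmt) =>
      acc.flatMap(done => execute(stmt).map(result => done :+ result))
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    // Hypothetical stand-in for a wrapper around session.executeAsync.
    val fakeExecute: String => Future[String] = cql => Future(s"ok: $cql")
    val all = runInOrder(Seq("create keyspace", "create table", "insert row"))(fakeExecute)
    println(Await.result(all, 5.seconds))
  }
}
```

In a real wrapper, completing the promise with p.complete(Try(rsf.get)) instead of p.success(rsf.get) should make a failed statement show up as a failed Future rather than as a callback that never completes the promise.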