
I'm using Spark Structured Streaming for real-time machine learning, and I want to store the predictions in my Cassandra cluster.

Since I am in a streaming context, executing the same request several times per second, one mandatory optimization is to use a PreparedStatement.

In the Spark Cassandra connector (https://github.com/datastax/spark-cassandra-connector) there is no way to use a PreparedStatement (in Scala or Python; I'm not considering Java as an option).

Should I use a Scala (https://github.com/outworkers/phantom) or Python (https://github.com/datastax/python-driver) Cassandra driver? How does that work then: does my connection object need to be serializable to be passed to the workers?

Any help would be appreciated!

Thanks :)


1 Answer


To use a prepared statement and write data to Cassandra while processing a stream with Spark Structured Streaming, you need the following imports:

  • import com.datastax.driver.core.Session
  • import com.datastax.spark.connector.cql.CassandraConnector

Then, build your connector:

 val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf) 

With the connector in hand, you can obtain a session and call the prepared-statement function you wrote in your own Statements Scala object:

 connector.withSessionDo { session =>
   Statements.PreparedStatement()
 }

Finally, write the data to Cassandra with the function below, where cql is the function that binds the variables to the prepared statement and session.execute runs the bound statement:

  private def processRow(value: Commons.UserEvent) = {
    connector.withSessionDo { session =>
      session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream))
    }
  }
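
The Statements object itself is not shown here, so the following is only a minimal sketch of what it might look like, assuming the DataStax Java driver 3.x API (com.datastax.driver.core). The keyspace and table name, the column types, and the method names prepare and cql (with an explicit session parameter) are all assumptions made for illustration, not the original code. It prepares the INSERT once per Cassandra cluster in each JVM and binds values per row:

```scala
import scala.collection.concurrent.TrieMap
import com.datastax.driver.core.{BoundStatement, Cluster, PreparedStatement, Session}

// Hypothetical stand-in for the Statements object used above.
// A Scala object is not serialized with the closure: each executor JVM
// initializes its own copy, so the cache below is per JVM.
object Statements {

  // Assumed keyspace, table and columns; adapt to the real schema.
  val insertCql: String =
    """INSERT INTO my_keyspace.user_events
      |  (device_id, category, window_time, m1_sum_downstream, m2_sum_downstream)
      |VALUES (?, ?, ?, ?, ?)""".stripMargin

  // Keyed by Cluster so that a statement prepared on one cluster
  // is never executed against a different one.
  private val preparedByCluster = TrieMap.empty[Cluster, PreparedStatement]

  // Prepare on first use for this cluster, then reuse the cached statement.
  def prepare(session: Session): PreparedStatement =
    preparedByCluster.getOrElseUpdate(session.getCluster, session.prepare(insertCql))

  // Bind one row's values; the caller passes the result to session.execute.
  // Column types (String/Double) are assumptions.
  def cql(session: Session,
          deviceId: String,
          category: String,
          windowTime: String,
          m1SumDownstream: Double,
          m2SumDownstream: Double): BoundStatement =
    prepare(session).bind(
      deviceId, category, windowTime,
      Double.box(m1SumDownstream), Double.box(m2SumDownstream))
}
```

With this shape, processRow would pass the session through, for example session.execute(Statements.cql(session, value.device_id, ...)). This also touches on the serialization question: CassandraConnector is serializable and can be captured by the closure, while the Session and PreparedStatement are only ever created on the executors.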

Of course, you'll have to call this function (processRow) in the ForeachWriter:

// This foreach sink writer writes the output to Cassandra.
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
  override def open(partitionId: Long, version: Long) = true
  override def process(value: Commons.UserEvent) = {
    processRow(value)
  }
  override def close(errorOrNull: Throwable) = {}
}

val query =
  ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start
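
As a possible variation, the session and the prepared statement can be set up in the writer's open method, so they are created once per partition and trigger rather than once per row. This is a hedged sketch, assuming a connector version built on the Java driver 3.x whose CassandraConnector exposes openSession(); the UserEvent case class (a stand-in for Commons.UserEvent), its field types, the class name CassandraEventWriter, and the table name are assumptions:

```scala
import com.datastax.driver.core.{PreparedStatement, Session}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter

// Hypothetical row type standing in for Commons.UserEvent; field types are assumed.
case class UserEvent(device_id: String,
                     category: String,
                     window_time: String,
                     m1_sum_downstream: Double,
                     m2_sum_downstream: Double)

object CassandraEventWriter {
  // Assumed keyspace, table and columns.
  val InsertCql: String =
    """INSERT INTO my_keyspace.user_events
      |  (device_id, category, window_time, m1_sum_downstream, m2_sum_downstream)
      |VALUES (?, ?, ?, ?, ?)""".stripMargin
}

// Only the serializable connector travels to the executors.
class CassandraEventWriter(connector: CassandraConnector)
    extends ForeachWriter[UserEvent] {

  // Created on the executor in open(); never serialized with the writer.
  @transient private var session: Session = _
  @transient private var insert: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    session = connector.openSession()
    insert = session.prepare(CassandraEventWriter.InsertCql)
    true
  }

  override def process(value: UserEvent): Unit = {
    // Bind this row's values to the already prepared statement and run it.
    session.execute(insert.bind(
      value.device_id, value.category, value.window_time,
      Double.box(value.m1_sum_downstream), Double.box(value.m2_sum_downstream)))
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Releases the session reference obtained in open().
    if (session != null) session.close()
  }
}
```

It would be plugged in with something like ds.writeStream.foreach(new CassandraEventWriter(connector)).start, where ds is a Dataset of the row type.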