I'm trying to use Cassandra as a key-value lookup store in some of our Spark jobs.
We use DataFrames primarily and have moved away from the RDD APIs.
Instead of joining with the tables, loading them into Spark, or pushing the join down to Cassandra and taking measures to avoid large table scans, I thought I could just write a Spark UDF that connects to Cassandra and looks up one key. I also want to convert the result row into a case class object and return that object.
I based some of this on the answers to the question "Spark Cassandra Connector proper usage": `withSessionDo` reuses an underlying JVM-level session that is available on each node.
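To check my understanding of that pattern, here is a minimal pure-Scala sketch that runs without Spark or Cassandra. Everything in it is hypothetical: `FakeSession` stands in for the driver's non-serializable `Session`, `FakeConnector` for the serializable `CassandraConnector`, and `SessionCache` for the connector's per-JVM session reuse. The idea is that only configuration travels with the closure, and the session is looked up (or opened once) on whichever JVM runs the code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical stand-in for a driver Session: expensive to open and NOT Serializable.
final class FakeSession(val contactPoint: String) {
  def execute(cql: String): String = s"[$contactPoint] $cql"
}

// Hypothetical per-JVM cache: at most one open session per contact point in each JVM.
object SessionCache {
  private val sessions = mutable.HashMap.empty[String, FakeSession]
  private var opened = 0

  def get(contactPoint: String): FakeSession = synchronized {
    sessions.getOrElseUpdate(contactPoint, {
      opened += 1 // only evaluated when no session exists yet for this contact point
      new FakeSession(contactPoint)
    })
  }

  def openedCount: Int = synchronized(opened)
}

// Hypothetical serializable handle (case classes are Serializable): it carries only config,
// and withSessionDo resolves the session on the JVM where it is called.
final case class FakeConnector(contactPoint: String) {
  def withSessionDo[T](f: FakeSession => T): T = f(SessionCache.get(contactPoint))
}

object SerializationCheck {
  // True if Java serialization (Spark's default closure serializer) accepts the object.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Calling `FakeConnector("10.0.0.1").withSessionDo(...)` several times opens one `FakeSession`, and only the connector passes the serialization check. As I understand it, the real connector also manages session lifetime (closing idle sessions), which this sketch ignores.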
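```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

val connector = CassandraConnector(sparkConf) // I know this is serializable.

def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefinedFunction =
  udf((key: String) => {
    connector.withSessionDo { session =>
      val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
      // execute() returns a ResultSet; one() gives the first Row (or null if the key is absent)
      val row = session.execute(stmt.bind(key)).one()
      MyCaseClass(
        field1 = row.getString(0),
        field2 = row.getInt(1)
        // ...
      )
    }
  })
```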
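One possible gotcha with this first approach: in the 3.x Java driver, `ResultSet.one()` returns `null` when no row matches, so a missing key would surface as a `NullPointerException` inside the UDF. A minimal sketch of guarding against that, assuming a hypothetical `RowLike` trait in place of the driver's `Row` (so it runs without Cassandra) and a hypothetical two-field `MyCaseClass`. Returning an `Option` from the UDF should then give a nullable struct column rather than a failed task.

```scala
// Hypothetical minimal view of a driver Row, so the sketch runs without Cassandra.
trait RowLike {
  def getString(i: Int): String
  def getInt(i: Int): Int
}

// Hypothetical two-field case class; the real one has more fields.
final case class MyCaseClass(field1: String, field2: Int)

object RowConversion {
  // one() yields null when the key is absent, so wrap it in Option before reading columns.
  def toMyCaseClass(row: RowLike): Option[MyCaseClass] =
    Option(row).map(r => MyCaseClass(field1 = r.getString(0), field2 = r.getInt(1)))
}

// Hypothetical in-memory row backed by a Seq, for trying the conversion out.
final case class SeqRow(values: Seq[Any]) extends RowLike {
  def getString(i: Int): String = values(i).asInstanceOf[String]
  def getInt(i: Int): Int = values(i).asInstanceOf[Int]
}
```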
Session isn't serializable, so we cannot create one outside the UDF and pass it in to build a MappingManager that converts rows to case class instances. An alternative approach, using MappingManager inside the UDF:
```scala
import com.datastax.driver.mapping.MappingManager

def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefinedFunction =
  udf((key: String) => {
    connector.withSessionDo { session =>
      // Session isn't serializable, so creating one outside and passing it to the UDF is not
      // an option, even if we were willing to do the session management ourselves.
      val manager = new MappingManager(session)
      val mapper = manager.mapper(classOf[MyCaseClass], keyspace)
      mapper.get(key)
    }
  })
```
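As I read the 3.x driver's object-mapper documentation, `MappingManager` is thread-safe and intended to be created once per `Session` and reused, rather than per call. Since `withSessionDo` hands back the same JVM-level session, one option is to memoize the manager per session instance. A minimal pure-Scala sketch of that memoization; `SessionStub` and `HelperStub` are hypothetical stand-ins for `Session` and `MappingManager`, and `PerSessionHelper` is a made-up name.

```scala
import java.util.IdentityHashMap

// Hypothetical stand-ins for a driver Session and an expensive per-session helper
// (think MappingManager).
final class SessionStub
final class HelperStub(val session: SessionStub)

object PerSessionHelper {
  // Keyed by object identity, since the same session instance is reused within a JVM.
  private val cache = new IdentityHashMap[SessionStub, HelperStub]()
  private var created = 0

  // Returns the helper for this session, building it only on first use in this JVM.
  def forSession(session: SessionStub): HelperStub = synchronized {
    val existing = cache.get(session)
    if (existing != null) existing
    else {
      created += 1
      val helper = new HelperStub(session)
      cache.put(session, helper)
      helper
    }
  }

  def createdCount: Int = synchronized(created)
}
```

This sketch never evicts entries, so if the underlying session were closed and replaced, the old helper would linger in the cache; a real version would need to account for session lifetime.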
I am new to Cassandra, so please bear with me on a few questions.
- Are there any gotchas in these approaches that I am not aware of?
- In the second approach, I understand we are creating a new `MappingManager(session)` on every call of the UDF. Will this still use the JVM-level session, or will it open more sessions? Is it even right to instantiate a MappingManager on every call? The session isn't serializable, so I can't create it outside and pass it to the UDF.
- What are some other ways to convert a result Row to an instance of a case class?
- Are there any better alternatives for this kind of lookup?
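On converting a Row into a case class without MappingManager, one option is a small hand-written decoder per case class that reports missing or mistyped columns instead of throwing. The sketch below is hypothetical throughout: `SimpleRow` (a column-name-to-value map) stands in for the driver's `Row`, and `RowDecoder` is a made-up type class, not a driver or connector API.

```scala
// Hypothetical stand-in for a driver Row: column name -> value.
final case class SimpleRow(columns: Map[String, Any])

// Hypothetical type class: how to build an A from a row, or explain why not.
trait RowDecoder[A] {
  def decode(row: SimpleRow): Either[String, A]
}

object RowDecoder {
  def apply[A](implicit d: RowDecoder[A]): RowDecoder[A] = d

  // Typed column accessors that return Left instead of throwing.
  def string(row: SimpleRow, name: String): Either[String, String] =
    row.columns.get(name) match {
      case Some(s: String) => Right(s)
      case other           => Left(s"column '$name': expected String, got $other")
    }

  def int(row: SimpleRow, name: String): Either[String, Int] =
    row.columns.get(name) match {
      case Some(i: Int) => Right(i)
      case other        => Left(s"column '$name': expected Int, got $other")
    }
}

// Hypothetical two-field case class; its decoder lives in the companion so it is found implicitly.
final case class MyCaseClass(field1: String, field2: Int)

object MyCaseClass {
  implicit val decoder: RowDecoder[MyCaseClass] = new RowDecoder[MyCaseClass] {
    def decode(row: SimpleRow): Either[String, MyCaseClass] =
      for {
        f1 <- RowDecoder.string(row, "field1")
        f2 <- RowDecoder.int(row, "field2")
      } yield MyCaseClass(f1, f2)
  }
}
```

Returning `Either` rather than throwing lets the UDF decide how to surface a bad row (log it, or map it to `null`) instead of failing the whole task.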