I am consuming from Kafka using Spark Structured Streaming and inserting into DataStax Cassandra with a Foreach sink. BigInt and String values are inserted fine, but when I insert Double values it throws "Codec not found for requested operation: [varchar <-> java.lang.Double]". How do I write a custom codec in Scala that accepts Double and Long values?
val view_a = VW_MS_PLAN_UNIT_LA
.writeStream
.option(WriteConf.IgnoreNullsParam.name, "true")
.queryName("VIEW PLAN UNIT LA")
.outputMode("Append")
.foreach(new CassandraSinkForeach)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
df.show()
Sample Spark DataFrame schema (printSchema): a - String, b - BigInt, c - Double
Sample Cassandra table: CREATE TABLE a (a text, b bigint, c double, PRIMARY KEY (a))
var cassandraDriver: CassandraDriver = null
var preparedStatement: PreparedStatement = null

def open(partitionId: Long, version: Long): Boolean = {
  // open the connection and prepare the insert once per partition
  println("Open connection")
  cassandraDriver = new CassandraDriver()
  preparedStatement = cassandraDriver.connector.withSessionDo(session =>
    session.prepare(s"""
      insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink}
      (a, b, c) values (?, ?, ?)"""))
  true
}
def process(record: org.apache.spark.sql.Row) = {
  println(s"Process new $record")
  // bind the row values positionally to the prepared insert
  cassandraDriver.connector.withSessionDo(session =>
    session.execute(preparedStatement.bind(
      record.getAs[String](0),
      record.getAs[BigInt](1),
      record.getAs[Double](2)))
  )
}
com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Double]

My previous post may help describe it further: How to change Datatypes of records inserting into Cassandra using Foreach Spark Structure streaming
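What I am looking for is something along these lines - a rough sketch based on the Java driver 3.x TypeCodec API (not tested; DoubleToVarcharCodec is just a placeholder name I made up), registered on the cluster's CodecRegistry, for example in open() before preparing the statement:

import java.nio.ByteBuffer
import com.datastax.driver.core.{DataType, ProtocolVersion, TypeCodec}

// Sketch of a custom codec mapping java.lang.Double <-> varchar by delegating
// to the built-in varchar codec (class name is my own placeholder).
class DoubleToVarcharCodec
    extends TypeCodec[java.lang.Double](DataType.varchar(), classOf[java.lang.Double]) {

  private val inner = TypeCodec.varchar()

  override def serialize(value: java.lang.Double, pv: ProtocolVersion): ByteBuffer =
    if (value == null) null else inner.serialize(value.toString, pv)

  override def deserialize(bytes: ByteBuffer, pv: ProtocolVersion): java.lang.Double = {
    val s = inner.deserialize(bytes, pv)
    if (s == null || s.isEmpty) null else java.lang.Double.valueOf(s)
  }

  override def parse(value: String): java.lang.Double =
    if (value == null || value.isEmpty || value.equalsIgnoreCase("NULL")) null
    else java.lang.Double.valueOf(inner.parse(value))

  override def format(value: java.lang.Double): String =
    if (value == null) "NULL" else inner.format(value.toString)
}

// Registration (e.g. in open()):
cassandraDriver.connector.withSessionDo(session =>
  session.getCluster.getConfiguration.getCodecRegistry
    .register(new DoubleToVarcharCodec))

Is this the right direction, and how would I extend it to Long as well?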
Should ${record.getAs[String](0)} be Double instead? - Jacek Laskowski
Why not use writeStream if you're using DSE Analytics? - Alex Ott