
I am consuming from Kafka using Spark Structured Streaming and inserting into DataStax Cassandra using foreach. When I insert BigInt and String values they get inserted, but when I insert Double values it throws "Codec not found for requested operation: [varchar <-> java.lang.Double]". How do I write a customized codec in Scala to accept Double and Long values?

val view_a = VW_MS_PLAN_UNIT_LA
      .writeStream
      .option(WriteConf.IgnoreNullsParam.name, "true")
      .queryName("VIEW PLAN UNIT LA")
      .outputMode("Append")
      .foreach(new CassandraSinkForeach)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()


df.show()

Sample Spark DataFrame schema (printSchema): a - String, b - BigInt, c - Double

Sample Cassandra table: CREATE TABLE a (a text, b bigint, c double)

 var cassandraDriver: CassandraDriver = null;
  var preparedStatement: PreparedStatement = null;
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection and prepare the insert once per partition
    println(s"Open connection")
    cassandraDriver = new CassandraDriver();
    preparedStatement = cassandraDriver.connector.withSessionDo(session =>
      session.prepare(s"""
        insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink}
        (a, b, c) values (?, ?, ?)"""))
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(preparedStatement.bind(record.getAs[String](0),
        record.getAs[BigInt](1), record.getAs[Double](2)))
    )
  }

com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Double]. Also, my previous post describes it in more detail: How to change Datatypes of records inserting into Cassandra using Foreach Spark Structure streaming

Could it be that record.getAs[String](0) should be Double instead? - Jacek Laskowski
Thanks, Sir, I am a great follower of you and your page. Actually, I want to insert Double values into DataStax Cassandra. getAs[BigInt] works fine with java.lang.Long, but there is a codec issue when inserting Double into Cassandra. I am using Spark/Scala. @JacekLaskowski - venkat Ramanan VTR
I think you should focus on developing Scala example code that inserts a Double into Cassandra. I don't think it's anything related to Spark itself, is it? - Jacek Laskowski
Certainly, Sir, this is a Scala codec issue. I am using a Scala object, getting a Row-typed record and separating it by column. getAs[String] and getAs[BigInt] match the text and bigint datatypes in Cassandra perfectly; only Double I cannot convert, because it is an object in Scala. - venkat Ramanan VTR
Why aren't you trying to use writeStream if you're using DSE Analytics? - Alex Ott

1 Answer


After looking into the error message again - your data doesn't match the table structure. Just add an explicit conversion, as sketched below...
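A minimal sketch of that conversion in process(), assuming the bind order matches the (a text, b bigint, c double) table shown in the question; the Scala values are boxed explicitly into the Java wrapper types the driver's default codecs expect:

  def process(record: org.apache.spark.sql.Row): Unit = {
    cassandraDriver.connector.withSessionDo(session =>
      // bind in the same order as the prepared insert (a, b, c),
      // converting the Scala primitives to Java wrapper types explicitly
      session.execute(preparedStatement.bind(
        record.getAs[String](0),                          // a: text
        java.lang.Long.valueOf(record.getAs[Long](1)),    // b: bigint
        java.lang.Double.valueOf(record.getAs[Double](2)) // c: double
      ))
    )
  }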

Also, to work with Scala types in the DataStax Java driver you can take codecs from the java-driver-scala-extras repository. Unfortunately there are no "official" jar builds for it, so you either need to compile and deploy the code yourself, or just include pieces of it in your project. There was a blog post on the DataStax dev blog that explained how it's implemented.
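As an illustration of that approach only (the class name below and where you register it are my own sketch, not code taken from that repository), here is a minimal Scala codec that maps scala.math.BigInt to the CQL varint type by delegating to the driver's built-in java.math.BigInteger codec, plus its registration through the connector session:

  import java.nio.ByteBuffer
  import com.datastax.driver.core.{DataType, ProtocolVersion, TypeCodec}

  // custom codec: Scala BigInt <-> CQL varint, delegating to the built-in
  // codec for java.math.BigInteger
  class ScalaBigIntCodec extends TypeCodec[BigInt](DataType.varint(), classOf[BigInt]) {
    private val inner = TypeCodec.varint()

    override def serialize(value: BigInt, pv: ProtocolVersion): ByteBuffer =
      if (value == null) null else inner.serialize(value.bigInteger, pv)

    override def deserialize(bytes: ByteBuffer, pv: ProtocolVersion): BigInt = {
      val v = inner.deserialize(bytes, pv)
      if (v == null) null else BigInt(v)
    }

    override def parse(value: String): BigInt = {
      val v = inner.parse(value)
      if (v == null) null else BigInt(v)
    }

    override def format(value: BigInt): String =
      if (value == null) "NULL" else inner.format(value.bigInteger)
  }

  // register it once on the cluster's codec registry, e.g. inside open()
  cassandraDriver.connector.withSessionDo(session =>
    session.getCluster.getConfiguration.getCodecRegistry.register(new ScalaBigIntCodec)
  )

The same delegation pattern works for any other Scala type you want the driver to bind directly.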