1
votes

I am trying to Insert the Deserialized Kafka records to Data Stax Cassandra using Spark Structure Streaming using Foreach Sink.

For example, my deserialized Data frame data like all are in string format.

id   name    date
100 'test' sysdate

Using foreach Sink I created a class and trying to insert the records as below by converting it.

session.execute(
  s"""insert into ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} (id,name,date)
  values  ('${row.getAs[Long](0)}','${rowstring(1)}','${rowstring(2)}')"""))
  }
)

I exactly followed this project https://github.com/epishova/Structured-Streaming-Cassandra-Sink/blob/master/src/main/scala/cassandra_sink.scala

when inserting into Cassandra table converting the string "id" column datatype to Long as mentioned above, It is not converting. And throwing error

"Invalid STRING constant (100) for "id" of type bigint"

CASSANDRA TABLE;-

create table test(
id bigint,
name text,
date timestamp)

Any suggestions to convert the string datatype to Long inside "def Process".

It will be great any alternative suggestion also.Thanks

This is the code:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.expr

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
  // This class implements the interface ForeachWriter, which has methods that get called 
  // whenever there is a sequence of rows generated as output

  var cassandraDriver: CassandraDriver = null;
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    if (cassandraDriver == null) {
      cassandraDriver = new CassandraDriver();
    }
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
       values('${record.getLong(0)}', '${record(1)}', '${record(2)}')""")
    )
  }

  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }
}

class SparkSessionBuilder extends Serializable {
  // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. 
  // Note here the usage of @transient lazy val 
  def buildSparkSession: SparkSession = {
    @transient lazy val conf: SparkConf = new SparkConf()
    .setAppName("Structured Streaming from Kafka to Cassandra")
    .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
    .set("spark.sql.streaming.checkpointLocation", "checkpoint")

    @transient lazy val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

    spark
  }
}

class CassandraDriver extends SparkSessionBuilder {
  // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
  // It extends SparkSessionBuilder so to use the same SparkSession on each node.
  val spark = buildSparkSession

  import spark.implicits._

  val connector = CassandraConnector(spark.sparkContext.getConf)

  // Define Cassandra's table which will be used as a sink
  /* For this app I used the following table:
       CREATE TABLE fx.spark_struct_stream_sink (
       id Bigint,
       name text,
       timestamp_dt date,
       primary key (id));
  */
  val namespace = "fx"
  val foreachTableSink = "spark_struct_stream_sink"
}

object KafkaToCassandra extends SparkSessionBuilder {
  // Main body of the app. It also extends SparkSessionBuilder.
  def main(args: Array[String]) {
    val spark = buildSparkSession

    import spark.implicits._

    // Define location of Kafka brokers:
    val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"

    /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n 
    {"100": "test1", "01-mar-2018"}
    {"101": "test2", "02-mar-2018"}  */
    val dfraw = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "currency_exchange")
    .load()

    val schema = StructType(
      Seq(
        StructField("id", StringType, false),
        StructField("name", StringType, false),
StructField("date", StringType, false)

      )
    )

    val df = dfraw
    .selectExpr("CAST(value AS STRING)").as[String]
    .flatMap(_.split("\n"))

    val jsons = df.select(from_json($"value", schema) as "data").select("data.*")


    val sink = jsons
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()

    sink.awaitTermination()
  }
}  

My modified code;-

def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"in my Open connection")
    val cassandraDriver = new CassandraDriver();
    true
  }


  def process(record: Row) = {


    val optype = record(0)

    if (cassandraDriver == null) {
      val  cassandraDriver = new CassandraDriver();
    }

  if (optype == "I" || optype == "U") {

        println(s"Process insert or Update Idempotent new $record")

        cassandraDriver.connector.withSessionDo(session =>{
          val prepare_rating_brand = session.prepare(s"""insert into ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} (table_name,op_type,op_ts,current_ts,pos,brand_id,brand_name,brand_creation_dt,brand_modification_dt,create_date) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""")

          session.execute(prepare_rating_brand.bind(record.getAs[String](0),record.getAs[String](1),record.getAs[String](2),record.getAs[String](3),record.getAs[String](4),record.getAs[BigInt](5),record.getAs[String](6),record.getAs[String](7),record.getAs[String](8),record.getAs[String](9))
          )

        })
      }  else if (optype == "D") {

        println(s"Process delete new $record")
        cassandraDriver.connector.withSessionDo(session =>
          session.execute(s"""DELETE FROM ${cassandraDriver.namespace}.${cassandraDriver.brand_dub_sink} WHERE brand_id = ${record.getAs[Long](5)}"""))

      } else if (optype == "T") {
        println(s"Process Truncate new $record")
        cassandraDriver.connector.withSessionDo(session =>
          session.execute(s"""Truncate table  ${cassandraDriver.namespace}.${cassandraDriver.plan_rating_archive_dub_sink}"""))

      }
    }

  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }


}

1
If you're using Spark 2.4.x, then you can use the foreachBatch instead - it's much simpler than foreach. Here is an example: github.com/alexott/spark-cassandra-oss-demos/blob/master/src/… - Alex Ott
Thanks, @Alex Ott. I am using Spark 2.2 with datastax 6.0. - venkat Ramanan VTR
if you're using DataStax distribution, you can simply use writeStream instead, withouth this processor: github.com/alexott/dse-java-playground/blob/master/src/main/… - in another file in same folder, there is an example for Kafka - Alex Ott
ok, @AlexOtt, Consider this scenario I am having 10 queries writing in datastax, One query got terminated in spark. I am having streaming query manager, Checkpoint and exception handling everything. Do you guide me on how to restart that particular query without affecting the session? since I already have queryawaittermination() handled. - venkat Ramanan VTR
Hmmm, I'm not understand it fully. Query is usually retried automatically if possible: docs.datastax.com/en/developer/java-driver/3.7/manual/retries but you may need to specifically mark it as idempotent. But as I mentioned, if you're on DataStax Enterprise - you don't need to write all this code yourself - you just need to use writeStream - then DSE connector will handle everything automatically - Alex Ott

1 Answers

0
votes

Your error is that you specify value for id field as '${row.getAs[Long](0)}' - you've added the single quotes around it, so it's treated as string, not as a long/bigint - just remove single quotes around this value: ${row.getAs[Long](0)}...

Also, for performance reasons it's better to move instantiation of the cassandra driver into open method, and use the prepared statements, something like this:

  var cassandraDriver: CassandraDriver = null;
  var preparedStatement: PreparedStatement = null;
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    cassandraDriver = new CassandraDriver();
    preparedStatement = cassandraDriver.connector.withSessionDo(session =>
      session.prepare(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} 
      (fx_marker, timestamp_ms, timestamp_dt) values(?, ?, ?)""")
    true
  }

  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(preparedStatement.bind(${record.getLong(0)}, 
           ${record(1)}, ${record(2)}))
    )
  }

it will be more performant, and you'll need not to perform quoting of the values yourself.