I'm trying to create a Structured Streaming job that reads JSON strings from Kafka into Spark. I want to parse the JSON into specific columns and then save the DataFrame to a Cassandra table with optimum speed. I'm using Spark 2.4 and Cassandra 2.11 (Apache, not DSE).
I previously tried a Direct Stream, which gives a DStream of a case class that I saved to Cassandra with foreachRDD, but that job hangs every 6-7 days. So now I'm trying Structured Streaming, which gives a DataFrame directly that can be saved to Cassandra.
val conf = new SparkConf()
.setMaster("local[3]")
.setAppName("Fleet Live Data")
.set("spark.cassandra.connection.host", "ip")
.set("spark.cassandra.connection.keep_alive_ms", "20000")
.set("spark.cassandra.auth.username", "user")
.set("spark.cassandra.auth.password", "pass")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
.set("spark.executor.memory", "2g")
.set("spark.driver.memory", "2g")
.set("spark.submit.deployMode", "cluster")
.set("spark.executor.instances", "4")
.set("spark.executor.cores", "2")
.set("spark.cores.max", "9")
.set("spark.driver.cores", "9")
.set("spark.speculation", "true")
.set("spark.locality.wait", "2s")
val spark = SparkSession
.builder
.appName("Fleet Live Data")
.config(conf)
.getOrCreate()
println("Spark Session Config Done")
val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
val topics = Map("livefleet" -> 1)
import spark.implicits._
implicit val formats = DefaultFormats
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "brokerIP:port")
.option("subscribe", "livefleet")
.load()
val collection = df.selectExpr("CAST(value AS STRING)").map(f => parse(f.toString()).extract[liveevent])
val query = collection.writeStream
.option("checkpointLocation", "/tmp/check_point/")
.format("kafka")
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "trackfleet_db")
.option("table", "locationinfotemp1")
.outputMode(OutputMode.Update)
.start()
query.awaitTermination()
The expectation is to save the DataFrame to Cassandra, but instead I get this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
Comments:
.format("kafka").format("org.apache.spark.sql.cassandra") is not correct - OneCricketeer
writeStream.start() at the end of the code? - Soheil Pourbafrani
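As the comments suggest, chaining two .format() calls only keeps the last one, and the json4s map over the raw value is what usually triggers the "must be executed with writeStream.start()" analysis error when mixed with batch-style actions. A common pattern on Spark 2.4 is to parse the JSON with from_json against an explicit schema and write each micro-batch to Cassandra via foreachBatch, since there is no built-in streaming Cassandra sink outside DSE. A minimal sketch, assuming the spark-cassandra-connector is on the classpath and that the schema fields below (deviceid, latitude, longitude) are hypothetical stand-ins for the actual fields of the liveevent case class:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hypothetical schema: replace with the real fields of liveevent
val schema = StructType(Seq(
  StructField("deviceid", StringType),
  StructField("latitude", DoubleType),
  StructField("longitude", DoubleType)
))

// Parse the Kafka value column from JSON into typed columns
val parsed = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")

// Write each micro-batch to Cassandra with the batch connector
val query = parsed.writeStream
  .option("checkpointLocation", "/tmp/check_point/")
  .foreachBatch { (batch: DataFrame, _: Long) =>
    batch.write
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", "trackfleet_db")
      .option("table", "locationinfotemp1")
      .mode(SaveMode.Append)
      .save()
  }
  .start()

query.awaitTermination()
```

With this approach the StreamingContext/SQLContext setup is unnecessary; the SparkSession alone drives the structured stream.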