I want to write Spark Structured Streaming data into Cassandra. My Spark version is 2.4.0.
I've researched some posts, and some of them use the DataStax Enterprise platform.
I didn't use it; instead I found the foreachBatch method, which helps write streaming data to a sink.
I reviewed the docs on the Databricks site and tried it on my own.
This is the code I've written:
parsed = parsed_opc \
    .withWatermark("sourceTimeStamp", "10 minutes") \
    .dropDuplicates(["id", "sourceTimeStamp"]) \
    .groupBy(
        window(parsed_opc.sourceTimeStamp, "4 seconds"),
        parsed_opc.id
    ) \
    .agg({"value": "avg"}) \
    .withColumnRenamed("avg(value)", "avg") \
    .withColumnRenamed("window", "sourceTime")
def writeToCassandra(writeDF, epochId):
    writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table="opc", keyspace="poc") \
        .save()
parsed.writeStream \
    .foreachBatch(writeToCassandra) \
    .outputMode("update") \
    .start()
The schema of the parsed dataframe is:
root
|-- sourceTime: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- id: string (nullable = true)
|-- avg: double (nullable = true)
I can successfully write this streaming df to the console like this:
query = parsed \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()
And the output in the console is as follows:
+--------------------+----+---+
| sourceTime| id|avg|
+--------------------+----+---+
|[2019-07-20 18:55...|Temp|2.0|
+--------------------+----+---+
So, writing to the console is OK.
But when I query the table in cqlsh, no records have been appended.
This is the table create script in cassandra:
CREATE TABLE poc.opc (id text, avg float, sourceTime timestamp PRIMARY KEY);
So, can you tell me what is wrong?