I want to write Spark Structured Streaming data into Cassandra. My Spark version is 2.4.0.
My input source is Kafka, carrying JSON messages. Writing the stream to the console works fine, but when I query the table in cqlsh, no records have been appended to it. Can you tell me what is wrong?
# Expected layout of one sensor reading; every field is nullable.
schema = (
    StructType()
    .add("humidity", IntegerType(), True)
    .add("time", TimestampType(), True)
    .add("temperature", IntegerType(), True)
    .add("ph", IntegerType(), True)
    .add("sensor", StringType(), True)
    .add("id", StringType(), True)
)
def writeToCassandra(writeDF, epochId):
    """Append one micro-batch to the Cassandra table ``sensordb.sensor``.

    Has the ``(batch, epoch_id)`` signature expected of a ``foreachBatch``
    callback.

    Args:
        writeDF: The DataFrame holding the rows of the current micro-batch.
        epochId: Identifier of the micro-batch; not used here.
    """
    # DataFrameWriter.options() only accepts keyword arguments, and a dotted
    # key such as "spark.cassandra.connection.host" cannot be spelled as a
    # keyword, so it has to be passed through option(key, value) instead.
    (
        writeDF.write
        .format("org.apache.spark.sql.cassandra")
        .mode('append')
        .option("spark.cassandra.connection.host", "cassnode1, cassnode2")
        .options(table="sensor", keyspace="sensordb")
        .save()
    )
# Subscribe to the Kafka topic as a stream and lift each JSON field of the
# message value into its own column, named after the field.
json_fields = ["humidity", "time", "temperature", "ph", "sensor", "id"]
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafkanode")
    .option("subscribe", "iot-data-sensor")
    .load()
)
df = kafka_stream.select(
    [
        get_json_object(col("value").cast("string"), "$.{}".format(name)).alias(name)
        for name in json_fields
    ]
)
# Start the streaming query; each micro-batch is handed to writeToCassandra.
query = (
    df.writeStream
    .foreachBatch(writeToCassandra)
    .outputMode("update")
    .start()
)
# start() returns immediately. Block on the query so the driver stays alive
# and micro-batches are actually processed instead of the script exiting.
query.awaitTermination()