
I want to write spark structured streaming data into cassandra. My spark version is 2.4.0.

My input source is Kafka with JSON data. Writing to the console works fine, but when I query the table in cqlsh, no records are appended in Cassandra. Can you tell me what is wrong?

schema = StructType() \
            .add("humidity", IntegerType(), True) \
            .add("time", TimestampType(), True) \
            .add("temperature", IntegerType(), True) \
            .add("ph", IntegerType(), True) \
            .add("sensor", StringType(), True) \
            .add("id", StringType(), True)

def writeToCassandra(writeDF, epochId):
    writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options("spark.cassandra.connection.host", "cassnode1, cassnode2") \
        .options(table="sensor", keyspace="sensordb") \
        .save()

# Load json format to dataframe
df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "kafkanode") \
      .option("subscribe", "iot-data-sensor") \
      .load() \
      .select([
            get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
            for c in ["humidity", "time", "temperature", "ph", "sensor", "id"]])

df.writeStream \
    .foreachBatch(writeToCassandra) \
    .outputMode("update") \
    .start()

1 Answer


I had the same issue in PySpark. Try the steps below:

  1. First, validate that it is connecting to Cassandra at all. For example, point it at a table that does not exist and check whether the job fails with a "table not found" error (see the batch-read sketch after step 2's snippet).

  2. Try writeStream as below (include the trigger and output mode before calling the Cassandra update in foreachBatch):

df.writeStream \
    .trigger(processingTime="10 seconds") \
    .outputMode("update") \
    .foreachBatch(writeToCassandra) \
    .start()
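
For the connectivity check in step 1, a minimal sketch is a one-off batch read through the Cassandra connector. The keyspace and table names below are taken from the question, and the connection host is assumed to be configured on the SparkSession (e.g. via spark.cassandra.connection.host):

# Sketch: one-off batch read to confirm the Cassandra connection works.
# Keyspace/table come from the question; pointing this at a non-existent
# table should fail fast with a "table not found" style error if the
# connection itself is fine.
check_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="sensor", keyspace="sensordb") \
    .load()
check_df.show(5)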