
I have a data stream that needs to be written to both a Kafka topic and HBase. For Kafka, I use a format like this:

dataset.selectExpr("id as key", "to_json(struct(*)) as value")
        .writeStream.format("kafka")
        .option("kafka.bootstrap.servers", Settings.KAFKA_URL)
        .option("topic", Settings.KAFKA_TOPIC2)
        .option("checkpointLocation", "/usr/local/Cellar/zookeepertmp")
        .outputMode(OutputMode.Complete())
        .start()

and then for HBase, I do something like this:

  dataset.writeStream.outputMode(OutputMode.Complete())
    .foreach(new ForeachWriter[Row] {
      override def process(r: Row): Unit = {
        //my logic
      }

      override def close(errorOrNull: Throwable): Unit = {}

      override def open(partitionId: Long, version: Long): Boolean = {
        true
      }
    }).start().awaitTermination()

This writes to HBase as expected, but doesn't always write to the Kafka topic. I am not sure why that is happening.

Regarding "doesn't always write to the kafka topic": what does "doesn't always" mean? - Yuval Itzchakov
That sometimes both work fine and sometimes the kafka part doesn't work at all - gmoksh
stackoverflow.com/questions/45331883/… It looks like a similar issue but I can't see a solution - gmoksh
Do you see any errors in the Spark log? Driver/Executors? - Yuval Itzchakov
I don't see you using awaitTermination on the first query. - Yuval Itzchakov

1 Answer


Use foreachBatch in Spark (available since Spark 2.4). The Structured Streaming programming guide explains:

If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    batchDF.write.format(…).save(…) // location 1
    batchDF.write.format(…).save(…) // location 2
    batchDF.unpersist()
}.start()
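Adapted to the two sinks in the question, a sketch could look like the following. This assumes the asker's `dataset` and `Settings` objects, and the per-row HBase logic from the original `ForeachWriter` is stood in by a comment; it is an outline under those assumptions, not a drop-in implementation:

```scala
import org.apache.spark.sql.{DataFrame, Row}

dataset.writeStream
  .option("checkpointLocation", "/usr/local/Cellar/zookeepertmp")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Cache the micro-batch so it is not recomputed for each sink
    batchDF.persist()

    // Sink 1: Kafka, as a batch write (df.write, not writeStream)
    batchDF.selectExpr("id as key", "to_json(struct(*)) as value")
      .write.format("kafka")
      .option("kafka.bootstrap.servers", Settings.KAFKA_URL)
      .option("topic", Settings.KAFKA_TOPIC2)
      .save()

    // Sink 2: HBase, reusing the per-row logic from the ForeachWriter
    batchDF.foreach { r: Row =>
      // my logic
    }

    batchDF.unpersist()
  }
  .start()
  .awaitTermination()
```

This keeps a single streaming query (one checkpoint, one `awaitTermination()`), so both sinks see every micro-batch, instead of two independently scheduled queries where one may silently fail or never be awaited.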