2
votes

I implemented a Spark job that reads a stream from a Kafka topic with foreachBatch in Structured Streaming.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.Trigger

// Read from the Kafka topic over SASL_SSL with Kerberos authentication
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mykafka.broker.io:6667")
  .option("subscribe", "test-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", "/home/hadoop/cacerts")
  .option("kafka.ssl.truststore.password", tspass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("groupIdPrefix", "MY_GROUP_ID")
  .load()

// Parse the JSON payload in the Kafka value column using a predefined schema
val streamservice = df.selectExpr("CAST(value AS STRING)")
  .select(from_json(col("value"), schema).as("data"))
  .select("data.*")


val stream_df = streamservice
  .selectExpr("cast(id as string) id", "cast(x as int) x")

val monitoring_stream = stream_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      // process the non-empty micro-batch here
    }
  }
  .start()

monitoring_stream.awaitTermination()

I have the following questions.

  1. If the Kafka topic does not have data for a long time, will stream_df.writeStream terminate automatically? Is there any timeout control for this?

  2. If the Kafka topic is deleted from the Kafka broker, will stream_df.writeStream be terminated?

I want the Spark job to keep monitoring the Kafka topic without terminating in the above two cases. Do I need any special settings for the Kafka connector and/or stream_df.writeStream?

1
Have you tried testing the code and stopping messages in the Kafka topic? What happened? – OneCricketeer
Thanks. I have no control over the Kafka broker. It was provided by another system. – yyuankm
You don't need to control the broker. Start the Spark code, then stop your Kafka producer. – OneCricketeer
Yeah, the problem is that the Kafka producer is also in another system. But I got a chance to test this scenario today: the topic had no data for the whole morning, and my Spark job just waited without timing out. When the data came back, it picked up all the data. It is just what I wanted. :-) – yyuankm

1 Answer

1
votes
  1. If the Kafka topic does not have data for a long time, will stream_df.writeStream terminate automatically? Is there any timeout control for this?

The termination of the query is independent of the data being processed. Even if no new messages are produced to your Kafka topic, the query will keep running, because it runs as a stream.

I guess that is what you have already figured out yourself while testing. We use Structured Streaming queries to process data from Kafka, and they have no issues being idle for a longer time (for example over the weekend, outside of business hours).
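As a small sketch (reusing the monitoring_stream handle returned by start() in your question; isActive, status, lastProgress and awaitTermination are standard StreamingQuery methods), this is how you can observe that an idle query stays active:

// monitoring_stream stays active between triggers, even if Kafka is idle for hours
println(monitoring_stream.isActive)      // true while the stream is running
println(monitoring_stream.status)        // e.g. message "Waiting for data to arrive"
println(monitoring_stream.lastProgress)  // null until at least one batch has completed

// awaitTermination() with no argument blocks indefinitely (there is no idle timeout);
// the overload taking a timeout in milliseconds returns false if the query is still
// running when the timeout expires
val terminated = monitoring_stream.awaitTermination(10000L)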

  2. If the Kafka topic is deleted from the Kafka broker, will stream_df.writeStream be terminated?

By default, if you delete the Kafka topic while your query is running, an exception is thrown:

ERROR MicroBatchExecution: Query [id = b1f84242-d72b-4097-97c9-ee603badc484, runId = 752b0fe4-2762-4fff-8912-f4cffdbd7bdc] terminated with error
java.lang.IllegalStateException: Partition test-0's offset was changed from 1 to 0, some data may have been missed. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".

I mentioned "by default" because the query option failOnDataLoss default to true. As explained in the Exception message you could set this to false to let your streaming query running. This option is described in the Structured streaming + Kafka Integration Guide as:

"Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected."