I'm trying to write a DataFrame with about 230 million records to Kafka, more specifically to a Kafka-enabled Azure Event Hub, but I'm not sure whether that's actually the source of my issue.
EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
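# Plain batch write to the Event Hubs Kafka endpoint over SASL_SSL;
# dfKafka already contains the "value" column the Kafka sink expects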
dfKafka \
    .write \
    .format("kafka") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
    .option("topic", "mytopic") \
    .option("checkpointLocation", "/mnt/telemetry/cp.txt") \
    .save()
This starts up fine and writes about 3-4 million records to the topic quickly, but then the job fails after a couple of minutes with messages like these:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 7.0 failed 4 times, most recent failure: Lost task 6.3 in stage 7.0 (TID 248, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: Expiring 61 record(s) for mytopic-18: 32839 ms has passed since last append
or
org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 8.0 failed 4 times, most recent failure: Lost task 13.3 in stage 8.0 (TID 348, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: The request timed out.
Also, I never see anything being created or written at the checkpoint location.
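As far as I understand the docs, checkpointLocation is only honored by the streaming writer, so maybe that's expected for a batch .write. The streaming variant would look roughly like this (just a sketch, since my dfKafka is a batch DataFrame, not one created via spark.readStream; the checkpoint path here is an example):

# Sketch only: checkpointLocation applies to Structured Streaming, and the
# location is treated as a directory, not a single file. dfKafka would have
# to be a streaming DataFrame (i.e. created via spark.readStream) for this.
(dfKafka
    .writeStream
    .format("kafka")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093")
    .option("topic", "mytopic")
    .option("checkpointLocation", "/mnt/telemetry/cp")  # example directory path
    .start())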
I also played around with .option("kafka.delivery.timeout.ms", 30000) and various other values, but that didn't seem to have any effect.
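For reference, this is roughly how I passed the timeout-related producer settings through (the values are just examples I tried; kafka.request.timeout.ms and kafka.delivery.timeout.ms are standard Kafka producer configs that Spark should forward to the producer via the kafka. prefix, though I'm not certain the shaded client in this runtime actually honors delivery.timeout.ms):

# Same batch writer as above, plus the timeout-related producer configs.
# (Parenthesized instead of backslash-continued so the inline comments are legal Python.)
(dfKafka
    .write
    .format("kafka")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093")
    # Per-request broker timeout (standard producer config, forwarded via the kafka. prefix)
    .option("kafka.request.timeout.ms", "60000")
    # Total time budget per record before the producer gives up (needs a client >= 2.1)
    .option("kafka.delivery.timeout.ms", "120000")
    .option("topic", "mytopic")
    .save())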
I'm running this on an Azure Databricks cluster, runtime version 5.0 (includes Apache Spark 2.4.0, Scala 2.11).
I don't see any errors such as throttling on the Event Hub side, so that part should be OK.