0
votes

Trying to write data to Kafka topic using Spark Structured Streaming and getting following error.

aggregatedDataset
        .select(to_json(struct("*")).as("value"))
        .writeStream()
        .outputMode(OutputMode.Append())
        .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
        .option("topic", topic)
        .option("checkpointLocation", checkpointLocation)
        .start();

Stacktrace:

Exception in thread "main" java.lang.IllegalArgumentException: 'path' is not specified
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$11.apply(DataSource.scala:276)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$11.apply(DataSource.scala:276)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.getOrElse(CaseInsensitiveMap.scala:28)
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:275)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:286)
1
Missing .format("kafka") - zero323
Good catch. That's what I was missing :( - Himanshu Yadav

1 Answers

2
votes

In your writeStream section format is missing, which in you case seems to be kafka,

 aggregatedDataset
    ...
    .writeStream
    .format("kafka")
    ...

Hope this helps!