I am reading log lines from a Kafka topic with Spark Structured Streaming, splitting each line into its fields, manipulating some of those fields, and storing the result in a DataFrame with a separate column for every field. I want to write this DataFrame back to Kafka.
Below is my sample DataFrame and the writeStream call I use to write it to Kafka:
val dfStructuredWrite = dfProcessedLogs.select(
  dfProcessedLogs("result").getItem("_1").as("col1"),
  dfProcessedLogs("result").getItem("_2").as("col2"),
  dfProcessedLogs("result").getItem("_17").as("col3"))

dfStructuredWrite
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()
The code above gives me the following error:
Required attribute 'value' not found
I believe this is because my DataFrame is not in key/value format. How can I write my existing DataFrame to Kafka in the most efficient way?
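As far as I understand, the Kafka sink looks for a string or binary column named value (a key column is optional), so the separate columns probably have to be packed into one. Here is a minimal sketch of what I think that could look like, assuming JSON is an acceptable encoding for the message body. The helper name toKafkaValue is my own invention, not part of Spark, and I have not confirmed that this is the most efficient option.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical helper (not a Spark API): packs every column of `df`
// into one JSON string column named "value", the column the Kafka sink reads.
def toKafkaValue(df: DataFrame): DataFrame =
  df.select(to_json(struct(df.columns.map(col): _*)).as("value"))
```

I would then call toKafkaValue(dfStructuredWrite) and run the same writeStream chain on the result. I assume the query also needs a checkpointLocation option before start(), since the Kafka sink appears to require one unless a default checkpoint location is configured for the session.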