
I have the following code to read and process Kafka data using Structured Streaming

object ETLTest {

  case class record(value: String, topic: String)

  def main(args: Array[String]): Unit = {
    run();
  }

  def run(): Unit = {

    val spark = SparkSession
      .builder
      .appName("Test JOB")
      .master("local[*]")
      .getOrCreate()

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "...")
      .option("subscribe", "...")
      .option("failOnDataLoss", "false")
      .option("startingOffsets","earliest")
      .load()
      .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")

    val sdvWriter = new ForeachWriter[record] {
      def open(partitionId: Long, version: Long): Boolean = {
        true
      }
      def process(record: record) = {
        println("record:: " + record)
      }
      def close(errorOrNull: Throwable): Unit = {}
    }

    val sdvDF = kafkaStreamingDF
      .as[record]
      .filter($"value".isNotNull)

    // DOES NOT WORK
    /*val query = sdvDF
        .writeStream
        .format("console")
        .start()
        .awaitTermination()*/

    // WORKS
    /*val query = sdvDF
      .writeStream
      .foreach(sdvWriter)
      .start()
      .awaitTermination()
      */

  }

}

I am running this code from the IntelliJ IDEA IDE. When I use foreach(sdvWriter), I can see the records consumed from Kafka, but when I use .writeStream.format("console") I do not see any records. I assume the console write stream is maintaining some sort of checkpoint and assumes it has already processed all the records. Is that the case? Am I missing something obvious here?


1 Answer


I reproduced your code here and both of the options worked. Actually, both options fail to compile without import spark.implicits._, so I'm not sure what you are missing. It might be that some dependencies are not configured correctly. Can you add your pom.xml?
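
For reference, the Kafka source ships in a separate artifact from spark-sql, so the build needs both. A minimal sketch of the coordinates I would expect (shown in sbt form; the Maven entries in the pom.xml map one-to-one, and the Spark version here is an assumption, adjust it to yours):

// build.sbt sketch -- versions are assumptions, match your Spark/Scala build
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
)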

import org.apache.spark.sql.SparkSession

object Check {

  case class record(value: String, topic: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder().master("local[2]")
      .getOrCreate

    // needed for .as[record] and the $"..." column syntax below
    import spark.implicits._

    val kafkaStreamingDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets","earliest")
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")

    val sdvDF = kafkaStreamingDF
      .as[record]
      .filter($"value".isNotNull)

    val query = sdvDF.writeStream
      .format("console")
      .option("truncate","false")
      .start()
      .awaitTermination()

  }

}
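
For completeness, the foreach-sink option also printed records in this same setup. A minimal sketch, reusing the ForeachWriter from your post inside the same main method (it additionally needs import org.apache.spark.sql.ForeachWriter):

    // foreach sink variant -- sdvDF and the record case class are the same as above
    val sdvWriter = new ForeachWriter[record] {
      def open(partitionId: Long, version: Long): Boolean = true
      def process(r: record): Unit = println("record:: " + r)
      def close(errorOrNull: Throwable): Unit = {}
    }

    val query = sdvDF.writeStream
      .foreach(sdvWriter)
      .start()
      .awaitTermination()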