1 vote

I am trying the Structured Streaming example given on the Spark website, but it throws two errors:

1. Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

2. not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[data])org.apache.spark.sql.Dataset[data]. Unspecified value parameter evidence$2. val ds: Dataset[data] = df.as[data]

Here is my code

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("kafka-consumer")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    spark.sparkContext.setLogLevel("WARN")

    case class data(name: String, id: String)

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "172.21.0.187:9093")
      .option("subscribe", "test")
      .load()
    println(df.isStreaming)

    val ds: Dataset[data] = df.as[data]
    val value = ds.select("name").where("id > 10")

    value.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}

Any help on how to make this work? I want the final output to look like this:

+-----+---+
| name| id|
+-----+---+
|Jacek|  1|
+-----+---+
Can you move the case class out of the final_stream object and run it? – koiralo
If I move the case class outside the object, it shows the error "Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input columns: [topic, timestamp, value, key, offset, timestampType, partition];" – HIREN GALA
Your data contains [topic, timestamp, value, key, offset, timestampType, partition], but your case class contains only name and id. – koiralo
How should I solve this error? – HIREN GALA
I just want to display the JSON string from Kafka on the Spark console through Structured Streaming. – HIREN GALA

2 Answers

1 vote

The reason for the error is that what comes from Kafka is Array[Byte], and there are no fields that match the data case class. The schema of a Kafka source is fixed:

scala> println(schema.treeString)
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Change the line df.as[data] to the following:

df.
  select($"value" cast "string").
  map(value => ...parse the value to get name and id here...).
  as[data]

I strongly recommend using select and the functions object to deal with the incoming data.
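The `map(value => ...)` step above has to turn the JSON string into name and id. As a minimal sketch of that parse, assuming the Kafka value is a flat JSON string such as `{"name":"Jacek","id":"1"}` (the `parse` helper and the regex-based extraction are illustrative only; a real job would use from_json from org.apache.spark.sql.functions instead):

```scala
// data is the case class from the question.
case class data(name: String, id: String)

// Hypothetical parser for the map(value => ...) step, assuming a flat
// JSON value like {"name":"Jacek","id":"1"}. Naive regex extraction;
// use from_json in a real Spark job.
def parse(value: String): data = {
  def field(key: String): String = {
    val pattern = ("\"" + key + "\"\\s*:\\s*\"([^\"]*)\"").r
    pattern.findFirstMatchIn(value).map(_.group(1)).getOrElse("")
  }
  data(field("name"), field("id"))
}

val parsed = parse("""{"name":"Jacek","id":"1"}""")
println(parsed)  // data(Jacek,1)
```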

0
votes

The error is due to a mismatch between the number of columns in the dataframe and in your case class.

The dataframe has the columns [topic, timestamp, value, key, offset, timestampType, partition],

whereas your case class has only two:

case class data(name: String, id: String)

You can display the content of the dataframe like this:

val display = df.writeStream.format("console").start()

Sleep for a few seconds and then:

display.stop()

Also use option("startingOffsets", "earliest") as mentioned here.
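Adding that option to the question's reader would look like this (a sketch only; the broker address and topic name are taken from the question and assume a running Kafka and SparkSession):

```scala
// Kafka source from the question, with startingOffsets added so the
// stream replays the topic from the beginning instead of only new records.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "172.21.0.187:9093")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()
```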

Then create a case class as per your data.

Hope this helps!