
I am trying the Spark Structured Streaming example from the Spark website, but it throws the following errors:

1. Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

2. not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[data])org.apache.spark.sql.Dataset[data]. Unspecified value parameter evidence$2. val ds: Dataset[data] = df.as[data]

Here is my code

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
object final_stream {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .getOrCreate()
    import spark.implicits._

    // The case class is declared inside main(), which is where the encoder errors come from
    case class data(name: String, id: String)

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "")
      .option("subscribe", "test")
      .load()

    val ds: Dataset[data] = df.as[data]
    val value = ds.select("name").where("id > 10")
  }
}



Any help on how to make this work? I want the final output to look like this:

| name|    id
|Jacek|     1
Can you move the case class out of the final_stream object and run it? - koiralo
If I move the case class outside the object, it shows the error: Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'name' given input columns: [topic, timestamp, value, key, offset, timestampType, partition]; - HIREN GALA
Your data contains [topic, timestamp, value, key, offset, timestampType, partition], but your case class contains only name and id. - koiralo
How should I solve this error? - HIREN GALA
I just want to display the JSON string from Kafka on the Spark console through Structured Streaming. - HIREN GALA

2 Answers


The error occurs because you are dealing with the raw Kafka records, whose key and value arrive as Array[Byte], and there are no columns that match the fields of the data case class.

scala> println(df.schema.treeString)
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
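To see the payload as text rather than bytes, the binary key and value columns can be cast to strings first. A minimal sketch, written as a helper that works on any DataFrame with the Kafka source's columns (the object name KafkaColumns is hypothetical):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaColumns {
  // Kafka delivers `key` and `value` as binary (Array[Byte]).
  // Casting them to STRING decodes the bytes as UTF-8 text.
  def asStrings(kafkaDf: DataFrame): DataFrame =
    kafkaDf.selectExpr(
      "CAST(key AS STRING) AS key",
      "CAST(value AS STRING) AS value"
    )
}
```

On the streaming df from the question, KafkaColumns.asStrings(df) yields two string columns that can be printed or parsed further.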

Change the line df.as[data] to the following:

  select($"value" cast "string").
  map(value => ...parse the value to get name and id here...).
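A runnable version of the map approach above might look like the following. It assumes, purely for illustration, that each Kafka value is a plain text record such as Jacek,1; the names Data and ParseWithMap are hypothetical. Data is declared at the top level rather than inside main, so spark.implicits._ can derive its encoder.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Top-level case class (not inside main) so spark.implicits._ can derive an Encoder.
case class Data(name: String, id: String)

object ParseWithMap {
  // ASSUMPTION (hypothetical format): each Kafka value is text like "Jacek,1".
  def parse(spark: SparkSession, kafkaDf: DataFrame): Dataset[Data] = {
    import spark.implicits._
    kafkaDf
      .select($"value".cast("string").as("value")) // binary -> string
      .as[String]                                   // Dataset[String]
      .flatMap { value =>
        Option(value).map(_.split(",", 2)) match {
          case Some(Array(name, id)) => Seq(Data(name.trim, id.trim))
          case _                     => Seq.empty[Data] // skip nulls and records without a comma
        }
      }
  }
}
```

Using flatMap instead of map lets malformed records be dropped instead of failing the whole query.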

I strongly recommend using select together with the functions object (for example from_json) to process the incoming data.
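Since the question mentions JSON, here is a sketch of the select plus functions route using from_json. It assumes each value is a JSON object like {"name":"Jacek","id":"1"}; the schema, Data, and ParseWithFromJson are illustrative names, not part of the original code.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}

// Top-level case class whose fields match the assumed JSON payload.
case class Data(name: String, id: String)

object ParseWithFromJson {
  // ASSUMPTION: every Kafka value is JSON such as {"name":"Jacek","id":"1"}.
  val dataSchema: StructType = new StructType()
    .add("name", StringType)
    .add("id", StringType)

  def parse(spark: SparkSession, kafkaDf: DataFrame): Dataset[Data] = {
    import spark.implicits._
    kafkaDf
      .select(from_json($"value".cast("string"), dataSchema).as("data")) // binary -> string -> struct
      .select("data.*")                                                   // flatten into name, id columns
      .as[Data]                                                           // columns now match the case class
  }
}
```

Because id arrives as a string here, casting it explicitly before comparing, for example where($"id".cast("int") > 10), makes the question's filter unambiguous.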


The error is caused by a mismatch between the columns of the DataFrame and the fields of your case class.

Your DataFrame has the columns [topic, timestamp, value, key, offset, timestampType, partition],

whereas your case class has only two fields:

case class data(name: String, id: String)

You can display the content of the DataFrame with:

val display = df.writeStream.format("console").start()

Sleep for a few seconds and then


Also use option("startingOffsets", "earliest"), so that the query reads the messages already in the topic instead of only new ones.

Then create a case class that matches your data.
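Putting these pieces together, one possible end-to-end version of the question's job reads the topic from the earliest offset, parses the JSON into a case class that matches the data, and prints it to the console. This is a sketch under the same JSON assumption; the broker address localhost:9092 is hypothetical, and the Kafka source needs the spark-sql-kafka-0-10 package on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{StringType, StructType}

// Top-level case class (outside main) whose fields match the assumed JSON payload.
case class Data(name: String, id: String)

object FinalStream {
  // ASSUMPTION: every Kafka value is JSON such as {"name":"Jacek","id":"1"}.
  val dataSchema: StructType = new StructType()
    .add("name", StringType)
    .add("id", StringType)

  // Reads the topic from the beginning instead of only new messages.
  def readKafka(spark: SparkSession, servers: String, topic: String): DataFrame =
    spark.readStream
      .format("kafka") // requires the spark-sql-kafka-0-10 package
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

  // Parses `value` into Data rows and prints every micro-batch to stdout.
  def showOnConsole(spark: SparkSession, source: DataFrame): StreamingQuery = {
    import spark.implicits._
    source
      .select(from_json($"value".cast("string"), dataSchema).as("data"))
      .select("data.*")
      .as[Data]
      .writeStream
      .format("console")
      .option("truncate", "false") // show full values
      .outputMode("append")
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("final_stream").getOrCreate()
    // "localhost:9092" is a hypothetical broker; "test" is the topic from the question.
    val query = showOnConsole(spark, readKafka(spark, "localhost:9092", "test"))
    query.awaitTermination() // block the driver instead of sleeping
  }
}
```

Calling awaitTermination() keeps the application running for as long as the stream does, which avoids guessing how long to sleep before the console output appears.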

Hope this helps!