
I am using the Kafka + Spark integration: I send a case class object (Website) to Kafka and want to map it to a Dataset[Website] in Spark.

    case class Website(id: Int, name: String)

    implicit val productSchema = Encoders.product[Website]
    val website = Website(1,"lokesh")
    EmbeddedKafka.publishToKafka(topic, website.toString)(config,new StringSerializer)

    val df:Dataset[Website] = spark
          .readStream
          .format("kafka")
          .option("subscribe", topic)
          .option("kafka.bootstrap.servers", "localhost:1244")
          .option("startingoffsets", "earliest")
          .load()
          .select("value") 
          .as[Website]

I get the error

    Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'id' given input columns: [value];


1 Answer


tl;dr Use a proper serialization format, e.g. JSON or Avro.


The following code sends out a textual representation of the Website case class.

    EmbeddedKafka.publishToKafka(topic, website.toString)(config, new StringSerializer)
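To see what actually lands in the Kafka record, here is a minimal sketch in plain Scala (no Kafka or Spark needed). The `WebsitePayload` object and its `asText` helper are hypothetical names used only for illustration; they just wrap the `toString` call from the line above.

```scala
case class Website(id: Int, name: String)

// Hypothetical helper: mirrors what website.toString hands to the StringSerializer.
object WebsitePayload {
  def asText(website: Website): String = website.toString
}

object WebsitePayloadDemo {
  def main(args: Array[String]): Unit = {
    // Default case-class toString: field values only, no field names, no quoting.
    println(WebsitePayload.asText(Website(1, "lokesh"))) // Website(1,lokesh)
    // A comma inside a field is indistinguishable from the field separator.
    println(WebsitePayload.asText(Website(2, "a,b")))    // Website(2,a,b)
  }
}
```

The text carries no field names and no escaping, which is why it is awkward to turn back into a Website on the Spark side.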

The following code selects the value column, which the Kafka source exposes as binary, i.e. Array[Byte], not as a string:

    .select("value")

So you need to cast the value to a string first, e.g. .select($"value" cast "string"), and then parse that string to build a Website object.
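If you stay with the toString format, the parsing step could look roughly like the sketch below. `WebsiteText` and `parse` are hypothetical names, and the regular expression assumes the default case-class layout `Website(<id>,<name>)`; it is an illustration of how brittle this route is rather than a recommended parser.

```scala
import scala.util.Try

case class Website(id: Int, name: String)

// Hypothetical parser for the default toString layout: Website(<id>,<name>)
object WebsiteText {
  private val Pattern = """Website\((-?\d+),(.*)\)""".r

  // Returns None when the text does not have the expected shape
  // or the id does not fit into an Int.
  def parse(text: String): Option[Website] = text match {
    case Pattern(id, name) => Try(id.toInt).toOption.map(Website(_, name))
    case _                 => None
  }
}
```

On the Spark side, something along the lines of `.select($"value" cast "string").as[String].flatMap(text => WebsiteText.parse(text).toList)` should then give a Dataset[Website], assuming `spark.implicits._` is imported for the String encoder. It only works because Website has a single String field at the end; a second String field would make the split ambiguous.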

That said, you'd be better off sending out a JSON representation of the website object, which would make parsing much easier. You could also use a comma-separated "serialization format", but that only works as long as none of the fields of your websites contain commas.
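A rough sketch of the JSON route, assuming the producer publishes a string such as `{"id":1,"name":"lokesh"}` (built by hand or with a JSON library of your choice) using the same `publishToKafka` call as in the question. The topic name, the app name, and the console sink are assumptions for illustration; `from_json` and `Encoders.product` are standard Spark SQL APIs.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

case class Website(id: Int, name: String)

object JsonWebsiteStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("json-website") // assumed name
      .getOrCreate()
    import spark.implicits._

    val topic = "websites" // assumed topic name

    // Derive the JSON schema (id, name) from the case class itself.
    val schema = Encoders.product[Website].schema

    val websites: Dataset[Website] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:1244")
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()
      // value is binary: cast to string, then parse the JSON into a struct column.
      .select(from_json(col("value").cast("string"), schema).as("website"))
      // Flatten the struct so the columns are named id and name.
      .select("website.*")
      .as[Website]

    // Console sink just to inspect the result (assumption, not part of the question).
    websites.writeStream.format("console").start().awaitTermination()
  }
}
```

After `select("website.*")` the columns are id and name, so `.as[Website]` can resolve the fields by name. Records that are not valid JSON come out of `from_json` as null, so you may want to filter those before converting.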


    AnalysisException: cannot resolve 'id' given input columns: [value]

The above exception says that you asked Spark to build objects of type Website (which is made up of the id and name fields) from a Dataset whose only column is value, which is not possible.

After .select("value") the only column is value, so .as[Website] fails: the fields of Website are matched to columns by name, and there are no columns named id and name to fill them from.