14
votes

Is there a way to use a schema to convert Avro messages from Kafka with Spark Streaming to a DataFrame? The schema file for the user records:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}

And the code snippets I use to read in the messages, adapted from the SqlNetworkWordCount example and from Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages:

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

  df.show()
})

case class User(firstName: String, lastName: String)

Somehow I can't find another way than using a case class to convert Avro messages to a DataFrame. Is there a possibility to use the schema instead? I'm using Spark 1.6.2 and Kafka 0.10.

The complete code, in case you're interested:

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

object ReadMessagesFromKafka {
  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]) {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val df = rdd.map(message => Injection.injection.invert(message._2).get)
        .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

4 Answers

4
votes

The OP has probably resolved the issue by now, but for future reference I solved this problem quite generally, so I thought it might be helpful to post it here.

Generally speaking, you should convert the Avro schema to a Spark StructType, convert the objects in your RDD to Row, and then use:

spark.createDataFrame(<RDD[obj] mapped to RDD[Row]>, <schema as StructType>)

In order to convert the Avro schema I used spark-avro like so:

SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

The conversion of the RDD is trickier. If your schema is simple, you can probably just do a simple map, something like this:

rdd.map(obj => {
  val seq = Seq(obj.getName(), obj.getAge())
  Row.fromSeq(seq)
})

In this example the object has two fields, name and age.

The important thing is to make sure the elements in the Row match the order and types of the fields in the StructType from before.
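
Putting these pieces together, here is a minimal sketch (assuming an RDD[GenericRecord] like the one the OP gets back from Injection.injection.invert, the all-string user schema from the question, and Spark 1.6's SQLContext; recordsToDataFrame is my own name, and with non-string field types you would need a proper type mapping instead of toString):

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.StructType

def recordsToDataFrame(sqlContext: SQLContext,
                       records: RDD[GenericRecord],
                       avroSchema: Schema): DataFrame = {
  // Derive the Spark schema from the Avro schema
  val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

  // Map each GenericRecord to a Row; the field order must match the StructType.
  // toString is enough here only because the user schema has string fields
  // (Avro hands back Utf8, not java.lang.String).
  val rows = records.map { record =>
    Row.fromSeq(sparkSchema.fieldNames.toSeq.map(name => record.get(name).toString))
  }

  sqlContext.createDataFrame(rows, sparkSchema)
}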

In my particular case I had a much more complex object which I wanted to handle generically, to support future schema changes, so my code was much more complex.

The method suggested by the OP should also work in some cases, but it will be hard to apply to complex objects (anything that is not a primitive or a case class).

Another tip: if you have a class within a class, convert the inner class to a Row as well, so that the wrapping class is converted to something like:

Row(Any,Any,Any,Row,...)
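
For example, with hypothetical Person and Address classes (the names are mine, just for illustration), the nested object becomes a nested Row:

import org.apache.spark.sql.Row

case class Address(street: String, city: String)
case class Person(name: String, age: Int, address: Address)

// The inner Address becomes its own Row, nested inside the outer one, matching
// a StructType that contains a nested StructType for the address field.
def personToRow(p: Person): Row =
  Row(p.name, p.age, Row(p.address.street, p.address.city))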

You can also look at the spark-avro project I mentioned earlier for how to convert objects to Rows; I used some of the logic there myself.

If someone reading this needs further help, ask me in the comments and I'll try to help.

A similar problem is solved here as well.

4
votes

Please take a look at this: https://github.com/databricks/spark-avro/blob/master/src/test/scala/com/databricks/spark/avro/AvroSuite.scala

So instead of

val df = rdd.map(message => Injection.injection.invert(message._2).get)
  .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

you can try this

 val df = spark.read.avro(message._2.get)
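
Note that spark-avro's reader is documented as taking a path to Avro files rather than raw message bytes, so for messages already decoded in the stream the createDataFrame approach from the first answer may be a better fit. A minimal usage sketch of the reader (the path is a placeholder):

import com.databricks.spark.avro._

// Reads Avro files from a path into a DataFrame (Spark 1.6 / SQLContext API)
val df = sqlContext.read.avro("/path/to/users.avro")
df.show()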
2
votes

I worked on a similar issue, but in Java, so I'm not sure about Scala. Take a look at the library com.databricks.spark.avro.

1
vote

For anyone interested in handling this in a way that can cope with schema changes without needing to stop and redeploy your Spark application (assuming your app logic can handle this), see this question/answer.