3
votes

Background: I'm using Spark Streaming to consume events from Kafka. Each event is a line of comma-separated key=value pairs. Here is an example of how events arrive in my Spark application:

Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100

Output:

I want to calculate metrics (average, count, etc.) grouped by different keys in the stream for each batch interval, e.g.:

  1. Average responseTime by Key1, Key2 (responseTime is one of the keys in every event)
  2. Count by Key1, Key2

My attempts so far:

val stream = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicsSet)

val pStream = stream.persist()

val events: DStream[String] = pStream.flatMap(_._2.split(","))
val pairs = events.map(data => data.split("=")).map(array => (array(0), array(1)))
// pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on.

Update (03/04): The keys Key1, Key2, ... can arrive out of order in the incoming stream.

I'd appreciate any input or hints.

1
Could you provide the expected output type and your attempts so far? It is not clear what you really want. - zero323
@zero323 - OK, I updated my question with the expected output type and my attempts so far. I'll post the answer if I'm able to get through this. - codehammer
Thanks. I formatted this a little. - zero323
You beat me on the update :). I realized after posting that I missed the formatting. Thanks! - codehammer
Sure :) Do you want to compute stats per window? If so, have you considered using DataFrames? - zero323

1 Answer

2
votes

One possible solution is something like this:

  • create a case class representing each record so we don't have to deal with tuples:

    case class Record(
      key1: String, key2: String, key3: String, key4: String, rt: Double)
    
  • use a regular expression to parse each record and drop malformed entries:

    import scala.util.matching.Regex
    
    val recordPattern = new Regex(
      "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" +
      "responseTime=([0-9]+)$"
    )
    
    // pStream holds (key, message) pairs from Kafka, so match on the message
    val records = pStream.map(_._2).map {
      case recordPattern(key1, key2, key3, key4, rt) =>
        Some(Record(key1, key2, key3, key4, rt.toDouble))
      case _ => None
    }.flatMap(x => x) // Drop malformed
    
  • reshape data to key-value pairs:

    val pairs = records.map(r => ((r.key1, r.key2), r.rt))
    
  • create a partitioner and use StatCounter to aggregate statistics:

    import org.apache.spark.util.StatCounter
    import org.apache.spark.HashPartitioner
    
    val partitioner: HashPartitioner = ???
    
    // Keep the result so the next step can refer to it as `stats`
    val stats = pairs.combineByKey[StatCounter](
      StatCounter(_), _ merge _, _ merge _, partitioner
    )
    
  • extract fields of interest:

    stats.mapValues(s => (s.count, s.mean))
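
To see what the steps above compute without a running cluster, here is a rough sketch using plain Scala collections in place of a DStream. The `Stats` class is a hypothetical stand-in for `StatCounter` that tracks only count and mean, and the sample lines are made up, so treat this as an illustration of the logic rather than the actual Spark job:

```scala
// Hypothetical stand-in for org.apache.spark.util.StatCounter (count and mean only).
final case class Stats(count: Long, mean: Double) {
  // Fold a single value into the running mean (plays the role of mergeValue).
  def merge(x: Double): Stats = {
    val n = count + 1
    Stats(n, mean + (x - mean) / n)
  }
  // Combine two partial results, weighting each mean by its count (mergeCombiners).
  def merge(other: Stats): Stats = {
    val n = count + other.count
    if (n == 0) this
    else Stats(n, (mean * count + other.mean * other.count) / n)
  }
}

val recordPattern =
  "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?responseTime=([0-9]+)$".r

// Made-up sample batch; the last line is malformed on purpose.
val batch = Seq(
  "Key1=a, Key2=b, Key3=c, Key4=d,responseTime=200",
  "Key1=a, Key2=b, Key3=e, Key4=f,responseTime=100",
  "Key1=x, Key2=y, Key3=c, Key4=d,responseTime=150",
  "not a record"
)

// Parse and reshape to ((key1, key2), responseTime); non-matching lines are dropped.
val pairs: Seq[((String, String), Double)] = batch.collect {
  case recordPattern(k1, k2, _, _, rt) => ((k1, k2), rt.toDouble)
}

// Aggregate per key, starting each key from an empty Stats.
val stats: Map[(String, String), Stats] =
  pairs.foldLeft(Map.empty[(String, String), Stats]) { case (acc, (key, rt)) =>
    acc.updated(key, acc.getOrElse(key, Stats(0L, 0.0)).merge(rt))
  }

// Extract (count, mean) for each (Key1, Key2) group.
val result: Map[(String, String), (Long, Double)] =
  stats.map { case (k, s) => (k, (s.count, s.mean)) }
```

The two `merge` overloads correspond to the second and third arguments of `combineByKey`: one adds a value to a partial aggregate, the other joins two partial aggregates computed on different partitions.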
    

You can also try something like this for unordered data, although I would strongly suggest fixing things upstream:

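import scala.util.Try

val kvPattern = "(\\w+)=(\\w+)".r
// pStream holds (key, message) pairs from Kafka, so parse the message part
val pairs = pStream.map(_._2).map(line => {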
  val kvs = kvPattern.findAllMatchIn(line)
    .map(m => (m.group(1), m.group(2))).toMap

  // This will discard any malformed lines
  // (missing Key1 or Key2, or a missing or non-numeric responseTime)
  Try((
    (kvs("Key1"), kvs("Key2")), 
    kvs("responseTime").toDouble
  ))

}).flatMap(_.toOption)

and proceed as before.
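
As a quick check of the unordered approach outside Spark, the same parsing logic can be pulled into a function and run on a few strings. `parseLine` and the sample lines are hypothetical names and data introduced only for this sketch:

```scala
import scala.util.Try

val kvPattern = "(\\w+)=(\\w+)".r

// Hypothetical helper: yields None when Key1, Key2 or a numeric responseTime is missing.
def parseLine(line: String): Option[((String, String), Double)] = {
  // Collect every key=value pair on the line, regardless of order.
  val kvs = kvPattern.findAllMatchIn(line).map(m => (m.group(1), m.group(2))).toMap
  // Missing keys and non-numeric values throw; Try turns both cases into None.
  Try(((kvs("Key1"), kvs("Key2")), kvs("responseTime").toDouble)).toOption
}

// Made-up sample lines: keys out of order, responseTime missing, responseTime non-numeric.
val lines = Seq(
  "responseTime=200, Key2=b, Key1=a, Key3=c",
  "Key1=a, Key2=b",
  "Key1=a, Key2=b, responseTime=fast"
)

// Only the first line survives parsing.
val parsed: Seq[((String, String), Double)] = lines.flatMap(parseLine)
```

Note that `\w` matches only letters, digits and underscores, so a value such as `12.5` would be captured as `12`. If your values can contain other characters, the pattern needs to be widened accordingly.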