I have a Spark Streaming application that analyzes events coming in from a Kafka broker. I have rules like the ones below, and new rules can be generated by combining existing ones (the second rule is sketched in code below):

If this event type occurs raise an alert.
If this event type occurs more than 3 times in a 5-minute interval, raise an alert.
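
For example, a minimal sketch of the second rule in Spark Streaming, assuming a DStream named events whose records expose a hypothetical eventType field and a hypothetical raiseAlert helper:

import org.apache.spark.streaming.{Minutes, Seconds}

// Count occurrences per event type over a sliding 5-minute window
val countsByType = events
  .map(e => (e.eventType, 1L))
  .reduceByKeyAndWindow(_ + _, Minutes(5), Seconds(30))

// Alert on any type seen more than 3 times within the window
countsByType
  .filter { case (_, count) => count > 3 }
  .foreachRDD(_.collect().foreach { case (eventType, count) => raiseAlert(eventType, count) })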

In parallel, I save all incoming data to Cassandra. What I'd like to do is run this streaming app over historical data from Cassandra. For example,

<This rule> would have generated <these> alerts for <last week>.

Is there any way to do this in Spark, or is it on the roadmap? For example, Apache Flink has event-time processing, but migrating the existing codebase to it seems hard, and I'd like to solve this problem by reusing my existing code.

1 Answer

This is fairly straightforward, with some caveats. First, it helps to understand how this works from the Kafka side.

Kafka manages what are called offsets -- each message in Kafka has an offset relative to its position in a partition. (Partitions are logical divisions of a topic.) The first message in a partition has an offset of 0L, the second 1L, and so on. Except that, because of log rollover and possibly topic compaction, 0L isn't always the earliest offset still available in a partition.

The first thing you are going to have to do is to collect the offsets for all of the partitions you want to read from the beginning. Here's a function that does this:

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer
import scala.collection.JavaConverters._

// Returns (earliest available offset, log-end offset) for a partition.
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int): (Long, Long) = {
  val time = OffsetRequest.LatestTime
  // Request up to 1000 offsets ending at the latest time; the response comes
  // back in descending order, so offsets(0) is the log-end ("next") offset and
  // the last entry is the earliest offset still available (assuming fewer than
  // 1000 log segments).
  val reqInfo = Map(
    TopicAndPartition(topic, partition) -> PartitionOffsetRequestInfo(time, 1000)
  )
  // kafka.javaapi.OffsetRequest expects a java.util.Map, hence .asJava
  val req = new kafka.javaapi.OffsetRequest(
    reqInfo.asJava, OffsetRequest.CurrentVersion, "test"
  )
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  (offsets(offsets.size - 1), offsets(0))
}

You would call it like this:

val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)

For everything you ever wanted to know about retrieving offsets from Kafka, read this. It's cryptic, to say the least. (Let me know when you fully understand the second argument to PartitionOffsetRequestInfo, for example.)

Now that you have the firstOffset and nextOffset of the partition you want to look at historically, you use the fromOffsets parameter of createDirectStream, which is of type fromOffsets: Map[TopicAndPartition, Long]. You would set the Long value to the firstOffset you got from getOffsets().
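
A rough sketch of wiring that up, assuming String keys and values, a running StreamingContext named ssc, and a kafkaParams map pointing at your brokers:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Start the direct stream at the historical firstOffset; the messageHandler
// also carries each record's offset so historical and live data can be told apart
val fromOffsets = Map(TopicAndPartition("MyTopicName", 0) -> firstOffset)

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (Long, String)](
  ssc,
  kafkaParams,   // e.g. "metadata.broker.list" -> "broker1:9092"
  fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)
)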

As for nextOffset -- you can use that to determine in your stream when you move from handling historical data to new data. If msg.offset == nextOffset then you are processing the first non-historical record within the partition.
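
Continuing the sketch above with the (offset, message) pairs emitted by the messageHandler, the split might look roughly like this (replayRule and handleLive are hypothetical handlers):

stream.foreachRDD { rdd =>
  rdd.foreach { case (offset, message) =>
    // Offsets below nextOffset existed before we started: historical replay.
    // Anything at or above nextOffset is new data arriving in real time.
    if (offset < nextOffset) replayRule(message)
    else handleLive(message)
  }
}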

Now for the caveats, directly from the documentation:

  • Once a context has been started, no new streaming computations can be set up or added to it.
  • Once a context has been stopped, it cannot be restarted.
  • Only one StreamingContext can be active in a JVM at the same time.
  • stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  • A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.

It's because of these caveats that I grab nextOffset at the same time as firstOffset -- so I can keep the stream up, but change the context from historical to present-time processing.
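
In rough terms, and reusing the hypothetical names from the sketches above, the switch-over could look like this: stop only the StreamingContext once the historical replay catches up, then build a fresh one on the same SparkContext for present-time processing.

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Stop the historical StreamingContext but keep the SparkContext (sc) alive
historicalSsc.stop(stopSparkContext = false, stopGracefully = true)

// New StreamingContext on the same SparkContext for live data; seed its
// direct stream with fromOffsets = Map(topicAndPartition -> nextOffset)
// so no records are skipped or re-read across the switch
val liveSsc = new StreamingContext(sc, Seconds(10))
// ... set up the live createDirectStream and the alerting rules here ...
liveSsc.start()
liveSsc.awaitTermination()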