This is fairly straightforward, with some caveats. First, it helps to understand how this works from the Kafka side.

Kafka manages what are called offsets -- each message in Kafka has an offset relative to its position in a partition. (Partitions are logical divisions of a topic.) The first message in a partition has an offset of 0L, the second 1L, and so on. Except that, because of log rollover and possibly topic compaction, 0L isn't always the earliest offset in a partition.
The first thing you'll need to do is collect the offsets for all of the partitions you want to read from the beginning. Here's a function that does this:

import kafka.api.PartitionOffsetRequestInfo
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer
import scala.collection.JavaConverters._

// Returns (earliestOffset, latestOffset) for the given topic/partition.
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int): (Long, Long) = {
  val time = kafka.api.OffsetRequest.LatestTime
  val reqInfo = Map[TopicAndPartition, PartitionOffsetRequestInfo](
    TopicAndPartition(topic, partition) -> PartitionOffsetRequestInfo(time, 1000)
  )
  // kafka.javaapi.OffsetRequest expects a java.util.Map, hence the .asJava
  val req = new kafka.javaapi.OffsetRequest(
    reqInfo.asJava, kafka.api.OffsetRequest.CurrentVersion, "test"
  )
  val resp = consumer.getOffsetsBefore(req)
  // Offsets come back newest-first, so the last element is the earliest
  // offset and the first element is the next offset to be written.
  val offsets = resp.offsets(topic, partition)
  (offsets(offsets.size - 1), offsets(0))
}
You would call it like this:
val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)
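For completeness, here's a minimal sketch of constructing the SimpleConsumer passed in above. The host, port, timeout, and buffer size are placeholder values, not anything prescribed; also note that offset requests should go to a broker that hosts the partition (ideally its leader):

import kafka.javaapi.consumer.SimpleConsumer

// Placeholder broker coordinates -- substitute one of your own brokers.
val consumer = new SimpleConsumer(
  "broker1.example.com", // broker host (assumption)
  9092,                  // broker port
  100000,                // socket timeout (ms)
  64 * 1024,             // receive buffer size (bytes)
  "test"                 // client id, matching the one used in getOffsets()
)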
For everything you ever wanted to know about retrieving offsets from Kafka, read this. It's cryptic, to say the least. (Let me know when you fully understand the second argument to PartitionOffsetRequestInfo, for example.)
Now that you have the firstOffset and nextOffset of the partition you want to look at historically, you then use the fromOffsets parameter of createDirectStream, which is of type fromOffsets: Map[TopicAndPartition, Long]. You would set the Long value of each entry to the firstOffset you got from getOffsets().
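Putting that together, a minimal sketch (assuming the 0.8 direct API with String keys and values, an existing StreamingContext named ssc, and a placeholder broker list) might look like this:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1.example.com:9092") // placeholder
val fromOffsets = Map(TopicAndPartition("MyTopicName", 0) -> firstOffset)

// Keep each record's offset alongside its value so we can compare it
// against nextOffset downstream.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (Long, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message)
)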
As for nextOffset -- you can use that to determine in your stream when you move from handling historical data to new data. If msg.offset == nextOffset, then you are processing the first non-historical record within the partition.
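Continuing the sketch above (a single partition, with each record carried as an (offset, value) pair), that check might look like:

// Records at or past nextOffset are live; everything before it is historical.
stream.foreachRDD { rdd =>
  rdd.foreach { case (offset, value) =>
    if (offset >= nextOffset) {
      // present-time processing
    } else {
      // historical processing
    }
  }
}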
Now for the caveats, directly from the documentation:
- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
It's because of these caveats that I grab nextOffset at the same time as firstOffset -- so I can keep the stream up, but change the context from historical to present-time processing.
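To make that concrete, here's a minimal sketch of the stop-and-recreate pattern those caveats permit (assuming an existing SparkContext named sc; the batch interval is a placeholder):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Stop the streaming context but keep the underlying SparkContext alive.
ssc.stop(stopSparkContext = false, stopGracefully = true)

// Re-use the same SparkContext for a new StreamingContext -- e.g. one whose
// direct stream now starts from nextOffset instead of firstOffset.
val ssc2 = new StreamingContext(sc, Seconds(5))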