I'm new to this whole Kafka/Spark thing. I have Spark Streaming (PySpark) taking in data from a Kafka producer. It runs fine for a minute and then always throws a kafka.common.OffsetOutOfRangeException. The Kafka consumer is version 0.8 (0.10 is apparently not supported for PySpark). I have a master node with 3 workers running Ubuntu 14.04 on AWS. I don't know if this is relevant, but the Kafka messages here are relatively large (~1-10 KB) and I've adjusted the producer/broker/consumer configs accordingly. The data does come through, though possibly more slowly than the producer is producing it (this may be the source of the problem?).
A similar problem was solved by increasing the retention time/size here: Kafka OffsetOutOfRangeException
But my retention time is one hour and the retention size is 1 GB in each node's server.properties, and more importantly, Spark's time-to-failure doesn't change when I adjust the retention time/size.
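For reference, the relevant lines in each node's server.properties look roughly like this (the retention values are what I actually set; the message-size lines are the "adjusted" broker configs I mentioned above, with illustrative values):

log.retention.hours=1
log.retention.bytes=1073741824
message.max.bytes=15728640
replica.fetch.max.bytes=15728640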
Is there anything else I could adjust, maybe in the Spark Streaming configs? All the answers I've found online deal with Kafka provisioning, but that doesn't seem to make a difference in my case.
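For example, I haven't touched Spark's rate-limiting/backpressure settings. This is just a sketch of what I mean by Spark-side adjustments; I haven't verified that these are the right knobs for this problem:

from pyspark import SparkConf, SparkContext

# Sketch only: cap how fast the direct stream pulls from Kafka per batch and
# enable backpressure so Spark throttles itself under load.
conf = (SparkConf()
        .setAppName('test-video')
        .set('spark.streaming.kafka.maxRatePerPartition', '1000')  # max records/sec/partition
        .set('spark.streaming.backpressure.enabled', 'true'))
sc = SparkContext(conf=conf)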
EDIT 1: I tried (a) having multiple streams read from the producer and (b) slowing down the producer itself with time.sleep(1.0) (a rough sketch of that producer loop is below, after the stream code). Neither had a lasting effect.
i.e.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='test-video')
n_secs = 1       # batch interval in seconds
n_streams = 3    # number of parallel direct streams (illustrative value)
ssc = StreamingContext(sc, n_secs)
# one direct stream per iteration, all unioned into a single DStream
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-video-group',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'}) for _ in range(n_streams)]
stream = ssc.union(*kds)
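And on the producer side, the slow-down from (b) looked roughly like this (a sketch, assuming a kafka-python producer; the placeholder data just stands in for my real ~1-10 KB messages):

import time
from kafka import KafkaProducer

# rough shape of the throttled producer from EDIT 1 (b); broker address and
# topic match my setup, the payloads here are only stand-ins
frame_payloads = [b'x' * 5000 for _ in range(100)]
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for payload in frame_payloads:
    producer.send('test-video', payload)
    time.sleep(1.0)   # the delay that didn't have a lasting effect
producer.flush()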