
I'm new to this whole Kafka/Spark thing. I have Spark Streaming (PySpark) consuming data from a Kafka producer. It runs fine for about a minute and then always throws a kafka.common.OffsetOutOfRangeException. The Kafka consumer is version 0.8 (0.10 is apparently not supported for PySpark). I have a master node with 3 workers on AWS, running Ubuntu 14.04. I don't know if this is relevant, but the Kafka messages here are relatively large (~1-10 KB) and I've adjusted the producer/broker/consumer configs accordingly. The data is being passed through fine, though possibly slower than the producer is producing it (this may be the source of the problem?).

A similar problem was solved by increasing the retention time/size here: Kafka OffsetOutOfRangeException

But my retention time is an hour and the retention size is 1 GB in each node's server.properties, and, more importantly, changing those retention settings has no effect on how long Spark runs before failing.
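For reference, the relevant settings in each broker's server.properties look roughly like this (message.max.bytes is the size bump I mentioned; exact values as I have them set):

# server.properties (per broker)
log.retention.hours=1              # retention time: one hour
log.retention.bytes=1073741824     # retention size cap (~1 GB, applies per partition)
message.max.bytes=15728640         # allow the larger messages (matches the consumer fetch size)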

Is there anything else I can adjust, maybe in the Spark Streaming configs? All the answers I've seen online have to do with Kafka provisioning, but it doesn't seem to make a difference in my case.
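For what it's worth, the only Spark-side throttling knobs I've found so far are the backpressure/rate-limit settings; a minimal sketch of how I'd set them (the app name and rate value are just placeholders):

from pyspark import SparkConf, SparkContext

# Rate-limit how fast the direct stream pulls from Kafka
conf = (SparkConf()
        .setAppName("test-video-streaming")                            # placeholder app name
        .set("spark.streaming.backpressure.enabled", "true")           # let Spark adapt the ingestion rate
        .set("spark.streaming.kafka.maxRatePerPartition", "1000"))     # hard cap: records/sec per partition
sc = SparkContext(conf=conf)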

EDIT 1: I tried a) having multiple streams reading from the producer and b) slowing down the producer stream itself with time.sleep(1.0). Neither had a lasting effect.

i.e.

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

n_secs = 1      # batch interval in seconds
n_streams = 3   # number of parallel direct streams (e.g. one per worker)

ssc = StreamingContext(sc, n_secs)   # sc: the existing SparkContext

# a) several direct streams on the same topic, unioned into one DStream
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], {
                    'bootstrap.servers': 'localhost:9092',
                    'group.id': 'test-video-group',
                    'fetch.message.max.bytes': '15728640',
                    'auto.offset.reset': 'largest'}) for _ in range(n_streams)]

stream = ssc.union(*kds)
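And the producer-side throttling in b) is just a sleep in the send loop; roughly this, assuming a kafka-python producer (the payload source here is a placeholder):

import time
from kafka import KafkaProducer   # assuming the producer is written with kafka-python

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         max_request_size=15728640)   # room for the larger messages

for chunk in video_chunks():      # placeholder for whatever generates the payloads
    producer.send('test-video', chunk)
    time.sleep(1.0)               # b): crude throttle between sends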
Looks like you are using the new consumer with 0.8, right? I'm guessing this from the bootstrap servers instead of a ZK connection. How do you commit offsets? - dawsaw
@dawsaw It's committed automatically, but over the weekend I think I've established that it's a backpressure problem in Spark Streaming. - thefourtheye

1 Answer


Is it possible that your producer generates too many messages too quickly, so that 1 GB is not enough on each broker? 1 GB is very low in practice. After Spark Streaming decides the offset range it needs to process in a micro-batch and tries to retrieve the messages from the broker based on those offsets, the messages are already gone because the retention size limit was hit. Increase the retention size on the brokers to something much bigger, like 100 GB, and see if that fixes your problem.
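Concretely, that would mean raising the retention cap in each broker's server.properties (or overriding retention.bytes per topic), for example:

# server.properties -- raise the per-partition retention cap
log.retention.bytes=107374182400   # ~100 GB instead of ~1 GB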