3 votes

I'm playing around with Spark Streaming and Kafka together in Python, loosely following along with this post, but I'm a little confused about the KafkaUtils.createStream() function mentioned early on.

The documentation doesn't do much to explicitly explain what the topics dictionary affects. But I suspect I only think so because my knowledge of how Kafka works is shaky, and the answer is actually obvious.

I understand that it should be a dictionary like so: {"topic.name": 1}, and I can parrot the documentation and say that means the stream created will consume from a single partition.

So I suppose I'm just looking for some clarification on both the usage of this particular function and my understanding of Kafka concepts. We'll use the following example:

Let's say I have defined a topic my.topic that has 3 partitions and whose incoming messages are split on a key, say a userid.
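To make that concrete, here's roughly how I'm producing those keyed messages (a sketch using the kafka-python client; the broker address is just an example):

from kafka import KafkaProducer

# the default partitioner hashes the key, so every message for a given
# userid lands on the same one of the 3 partitions
producer = KafkaProducer(bootstrap_servers='kafka:9092')
producer.send('my.topic', key=b'userid-42', value=b'{"some": "payload"}')
producer.flush()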

If I initialize a stream like so:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='kafka-stream-test')
ssc = StreamingContext(sc, 10)  # 10-second batch interval

kafkaStream = KafkaUtils.createStream(
    ssc,
    'kafka:2181',           # ZooKeeper quorum
    'consumer-group-name',  # consumer group id
    {'my.topic': 1}         # per-topic number of partitions to consume
)

Am I right in thinking that this stream will only consume from a single partition, and so will not see every message coming into my.topic? In other words, will it only see messages from userids sent to one of the 3 partitions?

My questions then:

  1. How do I properly set this parameter to consume all messages sent to my.topic?

    My intuition is that I would just set the topics parameter to {'my.topic': 3}, so then my question becomes:

  2. Why would I ever use a number smaller than the total number of partitions?

    My intuition here tells me that it's a matter of how "atomic" the work you're doing is. For example, if I were simply transforming data (say, from a CSV into a list of JSON documents or something), then having 3 of the above streams, each with {'my.topic': 1} set as their topics parameter and all part of the same consumer group, would be beneficial: it would enable parallel consumption from each of the partitions, since no information needs to be shared about each message consumed (see the sketch after this list).

    Meanwhile, if I were calculating live metrics meant to cover the entire topic, e.g. time-windowed averages with filters, I struggle to see a way to implement something like that without setting {'my.topic': 3}. Or, if it's something like a sum, doing slightly more complicated downstream processing of each component signal within a consumer group, e.g. Sum1 + Sum2 + Sum3 = TotalSum.

    But again my knowledge is squarely in the "fledgling" stage of playing around with Kafka and Spark.

  3. Is there a way to tell createStream() to consume from all partitions, without knowing ahead of time how many there are? Something like {'my.topic': -1}?

  4. Can there be more than one topic specified in a single stream? E.g. {'my.topic': 1, 'my.other.topic': 1}
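To illustrate what I mean in question 2 by parallel consumption, here's a sketch (assuming the same ssc as above) using the multiple-receiver union pattern from the Spark Streaming programming guide:

# three receivers, each reading one partition's worth of my.topic,
# all in the same consumer group
streams = [
    KafkaUtils.createStream(ssc, 'kafka:2181', 'consumer-group-name', {'my.topic': 1})
    for _ in range(3)
]

# union them back into a single DStream for downstream processing
unifiedStream = ssc.union(*streams)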

I would really hate for the answer to this question to simply be "Yes, your intuition is correct." The best-case scenario is that someone tells me I'm misunderstanding literally everything and sets me straight. So please...do that!


2 Answers

4 votes

This is what's mentioned in the Kafka-Spark integration page:

val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

KafkaUtils.createStream creates a receiver and consumes the Kafka topic(s).

The option "per-topic number of Kafka partitions to consume" means how many partitions the receiver will read in parallel.

Say, for example, you have a topic named "Topic1" with 2 partitions and you provide the option 'Topic1': 1. The Kafka receiver will then read 1 partition at a time (it will eventually read all the partitions, just one at a time). The reason for this is to read the messages within each partition while preserving the order in which the data was written to the topic.

Say, for example, Topic1 has partition1 with messages {1,11,21,31,41} and partition2 with messages {2,12,22,32,42}. Reading with the above-mentioned setting will then yield a stream like {1,11,21,31,41,2,12,22,32,42}. The messages in each partition are read separately, so they don't mix together.

If you provide the option 'Topic1': 2, the receiver will read 2 partitions at a time, and the messages within these partitions will be mixed together. For the same example as above, a receiver with 'Topic1': 2 will yield something like {1,2,11,12,21,22,...}.

Think of this as the number of parallel reads the receiver can perform on given topic-partitions.
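As a rough sketch of the two configurations above (assuming an existing StreamingContext ssc and a local ZooKeeper; the group names are illustrative):

from pyspark.streaming.kafka import KafkaUtils

# one partition at a time: per-partition order preserved, no interleaving
ordered = KafkaUtils.createStream(ssc, 'localhost:2181', 'group-a', {'Topic1': 1})

# two partitions in parallel: messages from the two partitions interleave
interleaved = KafkaUtils.createStream(ssc, 'localhost:2181', 'group-b', {'Topic1': 2})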

4. Can there be more than one topic specified in a single stream? Yes, you can.
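For example, a sketch reusing the connection details from your question:

# one receiver consuming from both topics in a single stream
multiStream = KafkaUtils.createStream(
    ssc,
    'kafka:2181',
    'consumer-group-name',
    {'my.topic': 1, 'my.other.topic': 1}
)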

2 votes

Just use the direct approach (KafkaUtils.createDirectStream) and specify the topic without a partition count; you will get all messages in that topic, no matter how many partitions the topic has.

All you need to do is have a look at the example code: https://github.com/apache/spark/blob/v2.2.1/examples/src/main/python/streaming/direct_kafka_wordcount.py#L48
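A minimal sketch along the lines of that example (the broker address is assumed): the direct API takes a list of topics plus Kafka parameters, instead of a per-topic partition map, and reads from every partition.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='DirectKafkaExample')
ssc = StreamingContext(sc, 2)

# consumes every partition of my.topic; no partition count required
directStream = KafkaUtils.createDirectStream(
    ssc,
    ['my.topic'],                           # list of topics
    {'metadata.broker.list': 'kafka:9092'}  # Kafka brokers (not ZooKeeper)
)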