1
votes

I want to process the messages reported at a web server in real time. The messages belong to different sessions, and I want to do some session-level aggregations. For this purpose I plan to use Spark Streaming fronted by Kafka. Before I start, I have listed a few challenges that this architecture is going to pose. Can someone familiar with this ecosystem help me with these questions:

  1. If each Kafka message belongs to a particular session, how can I manage session affinity so that the same Spark executor sees all the messages linked to a session?
  2. How can I ensure that messages belonging to a session are processed by a Spark executor in the order they were reported to Kafka? Can this be achieved without constraining the thread count or incurring processing overhead (such as sorting by message timestamp)?
  3. When should session state be checkpointed? How is state resurrected from the last checkpoint if an executor node crashes? How is state resurrected from the last checkpoint if the driver node crashes?
  4. How is state resurrected if a node (executor or driver) crashes before checkpointing its state? If Spark recreates the state RDD by replaying messages, where does it start replaying the Kafka messages from: the last checkpoint onwards, or does it process all the messages needed to recreate the partition? Can Spark Streaming resurrect state across multiple batches, or only for the current batch, i.e. can the state be recovered if checkpointing was not done during the last batch?

1 Answer

6
votes

If each Kafka message belongs to a particular session, how can I manage session affinity so that the same Spark executor sees all the messages linked to a session?

Kafka divides topics into partitions, and within a consumer group every partition is read by only one consumer at a time, so you need to make sure that all messages belonging to one session go into the same partition. Partition assignment is controlled by the key that you attach to every message, so the easiest way to achieve this is probably to use the session id as the key when sending data. That way the same consumer gets all messages for one session. There is one caveat, though: Kafka rebalances the assignment of partitions to consumers whenever a consumer joins or leaves the consumer group. If this happens mid-session, half the messages for that session can go to one consumer and the other half to a different consumer after the rebalance. To avoid this, you need to assign specific partitions manually in your code, so that every processor has a fixed set of partitions that does not change. Have a look at ConsumerStrategies.Assign in the Spark Kafka integration code for this.
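
The idea of keying by session id and pinning partitions to processors can be sketched without any Kafka dependency. This is only an illustration under stated assumptions: the partition count, the worker names, and the functions `partition_for` and `worker_for` are hypothetical, and `zlib.crc32` stands in for the hash that Kafka's default partitioner applies to the key bytes (the real one is murmur2, so actual partition numbers will differ).

```python
import zlib

NUM_PARTITIONS = 6  # hypothetical partition count for the topic


def partition_for(session_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition.

    Stand-in for Kafka's default partitioner: crc32 is used here only to keep
    the sketch dependency-free; Kafka itself hashes the key with murmur2.
    """
    return zlib.crc32(session_id.encode("utf-8")) % num_partitions


# Hypothetical static assignment: each worker owns a fixed set of partitions,
# which is the effect of assigning partitions explicitly instead of letting
# the consumer group rebalance them.
STATIC_ASSIGNMENT = {"worker-a": [0, 1, 2], "worker-b": [3, 4, 5]}


def worker_for(session_id: str) -> str:
    """Return the worker that owns the partition this session hashes to."""
    partition = partition_for(session_id)
    for worker, partitions in STATIC_ASSIGNMENT.items():
        if partition in partitions:
            return worker
    raise ValueError(f"partition {partition} is not assigned to any worker")


messages = [("session-42", "login"), ("session-7", "view"), ("session-42", "logout")]
for session_id, event in messages:
    print(session_id, event, "-> partition", partition_for(session_id),
          "on", worker_for(session_id))
```

Because the mapping depends only on the key and the partition count, every message of a session lands in the same partition, and with a fixed assignment it also lands on the same worker. Note that changing the number of partitions changes the mapping for existing sessions.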


How can I ensure that messages belonging to a session are processed by a Spark executor in the order they were reported to Kafka? Can this be achieved without constraining the thread count or incurring processing overhead (such as sorting by message timestamp)?

Kafka preserves ordering per partition, so there is not much you need to do here. The only thing to avoid is having multiple requests from the producer to the broker in flight at the same time, because a failed request that is retried can then land behind a later one. You can configure this via the producer parameter max.in.flight.requests.per.connection. As long as you keep this at 1, you should be safe, if I understand your setup correctly.
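
To see why the in-flight limit matters, here is a deliberately simplified model of a producer that retries failed sends. It is not Kafka's actual sending logic: the function `deliver` and its parameters are hypothetical, and the model assumes that a batch whose first attempt fails is resent only after the other batches sent in the same window have been appended.

```python
from typing import List, Set


def deliver(batches: List[str], fail_first_attempt: Set[str], max_in_flight: int) -> List[str]:
    """Return the order in which batches end up in the partition log.

    Simplified model: the producer sends up to max_in_flight batches at once;
    a batch whose first attempt fails is retried after the others in the same
    window have been appended.
    """
    log: List[str] = []
    pending = list(batches)
    failed_once: Set[str] = set()
    while pending:
        window, pending = pending[:max_in_flight], pending[max_in_flight:]
        retry: List[str] = []
        for batch in window:
            if batch in fail_first_attempt and batch not in failed_once:
                failed_once.add(batch)
                retry.append(batch)  # first attempt lost, will be resent
            else:
                log.append(batch)    # appended to the partition log
        pending = retry + pending    # retries go out before any new batch
    return log


batches = ["m1", "m2", "m3"]
print(deliver(batches, {"m1"}, max_in_flight=1))  # ['m1', 'm2', 'm3']
print(deliver(batches, {"m1"}, max_in_flight=2))  # ['m2', 'm1', 'm3']
```

With one request in flight, the retry of m1 completes before m2 is sent, so the log order matches the send order. With two in flight, m2 is appended while m1 is still being retried.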


When should session state be checkpointed? How is state resurrected from the last checkpoint if an executor node crashes? How is state resurrected from the last checkpoint if the driver node crashes?

I'd suggest reading the offset storage section of the Spark Streaming + Kafka Integration Guide, which should answer a lot of questions already.

The short version is that you can persist your last read offsets in Kafka, and you should definitely do this whenever you checkpoint. That way, whenever a new executor picks up processing, whether or not it was restored from a checkpoint, it knows where to read from in Kafka.
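
The pattern of storing the offset only after a batch has been processed can be sketched as follows. Everything here is an in-memory stand-in: `OffsetStore`, `process_batch`, and the `(session_id, value)` record shape are hypothetical, and a Python list plays the role of a single Kafka partition. In the Spark integration the equivalent step is reading the offset ranges of the batch and committing them once the outputs are done, as described in the guide mentioned above.

```python
from typing import Dict, List, Tuple

Record = Tuple[str, int]  # (session_id, value): hypothetical record shape


class OffsetStore:
    """In-memory stand-in for wherever offsets are persisted (Kafka or a database)."""

    def __init__(self) -> None:
        self.committed: Dict[int, int] = {}  # partition -> next offset to read

    def commit(self, partition: int, next_offset: int) -> None:
        self.committed[partition] = next_offset


def process_batch(log: List[Record], partition: int, store: OffsetStore,
                  state: Dict[str, int], batch_size: int) -> int:
    """Aggregate one batch per session, then commit the offset; return the batch length."""
    start = store.committed.get(partition, 0)
    batch = log[start:start + batch_size]
    for session_id, value in batch:
        state[session_id] = state.get(session_id, 0) + value
    # Commit only after the whole batch is processed: a crash before this
    # line means the batch is read again on restart (at-least-once).
    store.commit(partition, start + len(batch))
    return len(batch)


log = [("s1", 1), ("s2", 5), ("s1", 2), ("s2", 1)]
store, state = OffsetStore(), {}
while process_batch(log, 0, store, state, batch_size=2):
    pass
print(state, store.committed)
```

Since a batch can be reprocessed after a crash, the aggregation either has to be idempotent or the state and the offset have to be written together in one transaction.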


How is state resurrected if a node (executor or driver) crashes before checkpointing its state? If Spark recreates the state RDD by replaying messages, where does it start replaying the Kafka messages from: the last checkpoint onwards, or does it process all the messages needed to recreate the partition? Can Spark Streaming resurrect state across multiple batches, or only for the current batch, i.e. can the state be recovered if checkpointing was not done during the last batch?

My Spark knowledge here is a bit shaky, but I would say that this is not something that Kafka or Spark does for you, but rather something that you need to control actively in your code. By default, if a new Kafka stream starts up and finds no previously committed offset, it simply starts reading from the end of the topic, so it only gets messages produced after the consumer was started. If you need to resurrect state, you either need to know the exact offset from which to start re-reading messages, or just start reading from the beginning again. You can pass the offsets to read from into the ConsumerStrategies.Assign method mentioned above when distributing partitions.
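
The choice of starting position described above can be illustrated with a small sketch. The names `starting_offset` and `rebuild_state` are hypothetical, a Python list again stands in for one partition, and "earliest" is modelled as offset 0, which assumes nothing has been removed from the log by retention.

```python
from typing import Dict, List, Tuple

Record = Tuple[str, int]  # (session_id, value): hypothetical record shape


def starting_offset(committed: Dict[int, int], partition: int, log_end: int,
                    reset_policy: str = "latest") -> int:
    """Pick where to start reading: the stored offset if any, else the reset policy."""
    if partition in committed:
        return committed[partition]
    if reset_policy == "earliest":
        return 0        # assumes the log still starts at offset 0
    if reset_policy == "latest":
        return log_end  # only messages produced from now on are seen
    raise ValueError(f"unknown reset policy: {reset_policy}")


def rebuild_state(log: List[Record], start: int) -> Dict[str, int]:
    """Recreate per-session aggregates by replaying the log from `start`."""
    state: Dict[str, int] = {}
    for session_id, value in log[start:]:
        state[session_id] = state.get(session_id, 0) + value
    return state


log = [("s1", 1), ("s1", 2), ("s2", 5)]
# No stored offset and the default policy: existing messages are skipped.
print(rebuild_state(log, starting_offset({}, 0, len(log))))              # {}
# No stored offset, but replaying from the beginning recreates the state.
print(rebuild_state(log, starting_offset({}, 0, len(log), "earliest")))  # {'s1': 3, 's2': 5}
# An explicitly stored offset resumes exactly where it points.
print(rebuild_state(log, starting_offset({0: 1}, 0, len(log))))          # {'s1': 2, 's2': 5}
```

Replaying from a stored offset only yields the full state if the state up to that offset was saved as well, which is why offsets and state are best persisted together.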

I hope this helps a bit. It is by no means a complete answer to all of your questions, since this is a fairly wide field, but let me know if I can be of further help.