
We're using Spark Streaming connected to an AWS Kinesis stream in order to aggregate (per minute) the metrics we're receiving and write the aggregations to InfluxDB, so that they're available to a real-time dashboard.
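
For reference, here is a simplified sketch of what the job roughly looks like (this assumes the spark-streaming-kinesis-asl KinesisUtils API; the stream name, endpoint, parsing and the InfluxDB write are placeholders, not our real code):

    import java.nio.charset.StandardCharsets;

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
    import org.apache.spark.SparkConf;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kinesis.KinesisUtils;
    import scala.Tuple2;

    public class MetricsAggregationJob {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf().setAppName("metrics-aggregation");
            // 60-second batch interval -> one aggregation per minute
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));

            JavaReceiverInputDStream<byte[]> records = KinesisUtils.createStream(
                    jssc,
                    "metrics-aggregation",                      // KCL application name (DynamoDB lease table)
                    "metrics-stream",                           // Kinesis stream name
                    "https://kinesis.us-east-1.amazonaws.com",  // endpoint
                    "us-east-1",                                // region
                    InitialPositionInStream.LATEST,
                    Durations.seconds(60),                      // KCL checkpoint interval (to DynamoDB)
                    StorageLevel.MEMORY_AND_DISK_2());

            // Per-minute counts keyed by metric name (the parsing is a stand-in for our real logic)
            JavaPairDStream<String, Long> perMinute = records
                    .mapToPair(bytes -> new Tuple2<>(
                            new String(bytes, StandardCharsets.UTF_8).split(",")[0], 1L))
                    .reduceByKey(Long::sum);

            perMinute.foreachRDD(rdd -> rdd.foreachPartition(partition -> {
                while (partition.hasNext()) {
                    partition.next();   // placeholder: this is where the InfluxDB write happens
                }
            }));

            jssc.start();
            jssc.awaitTermination();
        }
    }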

Everything is working fine, but we're now considering how we should manage pauses for deployments and possible failures of the system.

The docs say that the Kinesis integration library is already prepared for failures, checkpointing and so on, but I would like to clarify how checkpointing works there.

The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.

We can define the checkpoint interval for Kinesis, but as far as I understand it is only used to mark up to which point of the stream we've consumed the metrics. So we still need to use the checkpointing feature from Spark Streaming, right?
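
To make that concrete, this is how I understand the two checkpoints being configured; the checkpoint directory and the createContext helper below are only illustrative:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class CheckpointSetup {

        // Builds the streaming context and the DStream graph (as in the sketch above).
        static JavaStreamingContext createContext(String checkpointDir) {
            SparkConf conf = new SparkConf().setAppName("metrics-aggregation");
            JavaStreamingContext ctx = new JavaStreamingContext(conf, Durations.seconds(60));
            // Spark Streaming's own checkpointing: DStream metadata and generated RDD data
            // are written here, independently of the KCL's DynamoDB checkpoints.
            ctx.checkpoint(checkpointDir);
            // ... create the Kinesis DStream and the per-minute aggregation here ...
            return ctx;
        }

        public static void main(String[] args) throws InterruptedException {
            String checkpointDir = "s3://my-bucket/spark-checkpoints";  // illustrative path

            // On a clean start this calls createContext; after a failure or stop it rebuilds
            // the context (and its pending batches) from the checkpoint directory instead.
            JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(
                    checkpointDir, () -> createContext(checkpointDir));

            jssc.start();
            jssc.awaitTermination();
        }
    }

The KCL checkpoint interval, by contrast, is just the Duration passed to KinesisUtils.createStream and only records shard positions in DynamoDB.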

As we're aggregating the data per minute, our batch interval is 60 seconds, but during those 60 seconds we're continuously receiving data from the stream.

Here are my questions:

  • When I execute JavaStreamingContext.stop(...) (in order to deploy a new version of the job), will the receiver be stopped and the checkpoint updated at the end? (See the stop sketch after this list.)
  • When will the Spark Streaming checkpoint happen? After every execution of the job, or before it?
  • Assuming we have both checkpoints working, how can we guarantee consistency in case of failure? It seems that every time the Spark Streaming checkpoint happens, it would need to checkpoint to Kinesis at the same time, otherwise we can end up reading the same data again. How can we handle this?
  • If the underlying service (in this case InfluxDB) is down, what should I do? Implement a retry mechanism? If so, it needs to stop retrying after a while, otherwise we'll run out of memory.
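
Regarding the first question, this is roughly how we intend to stop the job for deployments, assuming the stop(stopSparkContext, stopGracefully) overload:

    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public final class StopForDeploy {

        private StopForDeploy() {}

        // Called from our deployment tooling with the running context.
        // stopSparkContext = true -> also stop the underlying SparkContext
        // stopGracefully   = true -> stop the receivers first and finish processing
        //                            all of the data that has already been received
        public static void stopForRedeploy(JavaStreamingContext jssc) {
            jssc.stop(true, true);
        }
    }

As far as I can tell, setting spark.streaming.stopGracefullyOnShutdown to true should make the shutdown hook do the same thing when the driver receives a SIGTERM.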

Thanks in advance!


1 Answer


I'm not a hundred percent sure this will be a full answer to your question, as the checkpointing solution is quite a complicated component and each sub-question could warrant a separate question on SO. Still, maybe this will give some clues about the process:

  • Checkpointing works at the DStream level, which means you can checkpoint at different stages of your pipeline. It can be the point where Spark creates your first RDD from the blocks generated by the receiver, or it can be a transformed RDD that you get at a later stage, after you've calculated your metrics. So when you call stop (if you stop gracefully), the checkpoint state will contain the last RDD processed after your receivers were stopped, at the point in the pipeline you've chosen.

  • Checkpointing is triggered by a Spark component called JobGenerator. Before running a job, it generates the DStreams that will compute the RDDs. During that step, if checkpointing is configured, every RDD of those DStreams additionally creates checkpoint metadata and is marked as requiring checkpointing. SparkContext then runs the generated jobs and at the end calls the doCheckpoint method, which persists the checkpoint data to the configured location. JobGenerator creates a separate job for that, so you should expect some latency between the actual job completion and the checkpoint persistence.

  • Every time Spark runs your application, it creates the streaming context from your checkpoint data. So, for example, if your metrics were at state 7 at the last Spark shutdown, after your Kinesis receivers were stopped, then when your streaming context is recovered it will be in state 7 again, and only the next batch generated from new Kinesis data will move it to state 8.

  • Well, that's up to you and how you architect your product. It probably makes sense to checkpoint only after your data has been handled successfully by your dependency (of course, I would suggest applying a retry mechanism to avoid short-term connectivity issues; see the sketch below). But that's too little information to give you a full answer on that.
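
Not a prescription, just one possible shape for the "retry, then fail the batch" idea; every InfluxDB-specific detail below is a placeholder:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import scala.Tuple2;

    public final class InfluxSink {

        private static final int MAX_ATTEMPTS = 5;
        private static final long BACKOFF_MS = 2_000L;

        private InfluxSink() {}

        // Wires the sink onto the aggregated stream (perMinute from the question's sketch).
        public static void attach(JavaPairDStream<String, Long> perMinute) {
            perMinute.foreachRDD(rdd -> rdd.foreachPartition(InfluxSink::writeWithRetry));
        }

        private static void writeWithRetry(Iterator<Tuple2<String, Long>> partition) {
            // Materialise the partition once so every attempt writes the same points.
            List<Tuple2<String, Long>> points = new ArrayList<>();
            partition.forEachRemaining(points::add);

            for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
                try {
                    writeToInflux(points);   // placeholder for the real InfluxDB client call
                    return;                  // success: the batch can complete and be checkpointed
                } catch (Exception e) {
                    if (attempt == MAX_ATTEMPTS) {
                        // Give up: failing the task (and thus the batch) is better than buffering
                        // forever and risking OOM; on restart from the checkpoint it is retried.
                        throw new RuntimeException("InfluxDB write failed after "
                                + MAX_ATTEMPTS + " attempts", e);
                    }
                    try {
                        Thread.sleep(BACKOFF_MS * attempt);  // simple linear backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ie);
                    }
                }
            }
        }

        // Hypothetical placeholder; a real implementation would call an InfluxDB client here.
        private static void writeToInflux(List<Tuple2<String, Long>> points) {
            // no-op in this sketch
        }
    }

The point of rethrowing after the last attempt is that the batch fails instead of data piling up in memory, and with checkpointing enabled the failed batch is re-executed when the application is restarted from the checkpoint.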