0 votes

I was doing some benchmarking that consists of the following data flow:

Kafka --> Spark Streaming --> Cassandra --> Prestodb

Infrastructure: My Spark Streaming application runs on 4 executors (2 cores and 4 GB of memory each). Each executor runs on a data node where Cassandra is also installed. 4 PrestoDB workers are co-located on the data nodes as well. My cluster has 5 nodes, each with an Intel Core i5, 32 GB of DDR3 RAM, a 500 GB SSD and a 1-gigabit network.

Spark streaming application: My Spark Streaming batch interval is 10 s, and my Kafka producer produces 5000 events every 3 seconds. My streaming application writes to 2 Cassandra tables.
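For reference, here is a minimal sketch of that part of the pipeline using the DataStax Spark Cassandra connector. The event schema, keyspace, table names and connection host are placeholders, not the ones from my actual application, and the Kafka input stream is elided:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

// Hypothetical event type; the real application uses its own schema.
case class Event(id: String, ts: Long, payload: String)

object StreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-to-cassandra")
      .set("spark.cassandra.connection.host", "10.0.0.1") // placeholder host

    // 10-second batch interval, as in the benchmark.
    val ssc = new StreamingContext(conf, Seconds(10))

    // Stand-in for the DStream built from Kafka
    // (e.g. via KafkaUtils.createDirectStream), omitted here for brevity.
    val events: DStream[Event] = ???

    // Each micro-batch is written to two Cassandra tables (placeholder names).
    events.saveToCassandra("benchmark_ks", "events_by_id")
    events.saveToCassandra("benchmark_ks", "events_by_time")

    ssc.start()
    ssc.awaitTermination()
  }
}
```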

Context in which everything works fine: On its own, everything runs fine: the streaming application processes the events and stores them in Cassandra. The batch interval is adequate, and the ingestion rate, scheduling delay and processing delay stay almost constant for long periods of time.

Context where things get messy and confusing: In my benchmark, every hour I run 6 queries over the Cassandra tables. For as long as these queries are running, the Spark Streaming application is no longer able to sustain the write throughput and hangs when writing to Cassandra.

What I've done so far: I searched for this phenomenon in other web posts (including Stack Overflow), but I was not able to find a similar case. The best suggestion I've seen was to increase the amount of memory available to Cassandra. Some posts also mention the fetch size of the connectors, but I don't know whether that is the problem here, since the issue only appears when reads and writes happen simultaneously.

Question: Cassandra shouldn't lock writes while reading, right? What do you think is the source (or sources) of the problem I need to solve? Which configurations should I take into consideration?

I attached a screenshot illustrating the job being stuck in the stage that writes to one of the Cassandra tables while the benchmark with the 6 queries is running, as explained above. If you need more information to trace the problem, please feel free to ask. I appreciate it!

Thank you very much for your support,

Hope I phrased the question properly,

Best regards,

Carlos

What heap size is allocated to the Spark executors and to Cassandra? Do you see GCs or an increase in heap utilization during the queries? Also check the number of connections open to Cassandra (for ingestion as well as for the queries). – Nachiket Kate
Each Spark executor has 4 GB of memory. I think they have enough memory for this workload; at least while I'm only writing it seems more than enough: no errors, no stuck jobs, nothing. The problem starts when the PrestoDB queries begin running over the Cassandra tables. Once the PrestoDB workload finishes, despite several "paused" jobs, Spark is able to resume all batches and starts writing to Cassandra normally again... – Carlos Costa
...Cassandra's heap size is 4 GB and HEAP_NEWSIZE is 400 MB. Do you think I should bump it up according to my workloads? I did not check GC, heap use or open connections during the benchmark, because it runs automated, every hour during the night... But thanks for the tip, I will try to reproduce the scenario and look into these aspects right now. It's great to at least have a clear path regarding what to look for. Thank you for the help! – Carlos Costa
Yes, I was asking about the Cassandra heap, not Spark's. I have tried to summarize the suggestions in the answer below. – Nachiket Kate

3 Answers

1 vote

This problem looks to be on the Cassandra/Presto side rather than on Spark, for the following reasons/assumptions:

  1. Since the Spark executors are managed by the resource manager (YARN, Mesos, etc.), your queries cannot impact them directly. As you mentioned, ingestion runs smoothly while no queries are running.
  2. Resource starvation on the Spark side would occur only if resources were shared directly with the other components. In general, Cassandra and Presto workers/threads are not allocated through the resource manager, so they compete for resources at the node level, outside the resource manager's view.

I suspect the reasons for the stalls could be:

  1. During the queries, Cassandra reads a lot of data, so JVM memory utilization increases and a lot of GCs occur. GC pauses could be the reason behind the pauses/stalls.
  2. The number of connections (read/write) to Cassandra is completely consumed by the queries, so the Spark job cannot get a connection to insert data and waits in the queue.
  3. Overall resource utilization on the nodes increases, and possibly one of the components has reached its limit (CPU, memory, disk, etc.). IMO, CPU and disk are worth checking in this case.

Validate these suspicions by monitoring heap utilization and GC logs, and the number of open connections, via JMX for Cassandra. Then bump up those values depending on the available resources, and try to tune the Presto queries as well so they have minimal impact.
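For example, here is a rough sketch of how heap usage and the number of open native-protocol connections can be read from Cassandra over JMX; the host name is a placeholder, 7199 is Cassandra's default JMX port, and the metric name assumes Cassandra 2.1 or later:

```scala
import java.lang.management.MemoryMXBean
import javax.management.{JMX, ObjectName}
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object CassandraJmxCheck {
  def main(args: Array[String]): Unit = {
    // Connect to the JMX endpoint of one Cassandra node (placeholder host).
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://cassandra-node-1:7199/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    val mbsc = connector.getMBeanServerConnection

    // Heap utilization of the Cassandra JVM.
    val memory = JMX.newMXBeanProxy(mbsc,
      new ObjectName("java.lang:type=Memory"), classOf[MemoryMXBean])
    val heap = memory.getHeapMemoryUsage
    println(s"Heap used: ${heap.getUsed / (1024 * 1024)} MB of ${heap.getMax / (1024 * 1024)} MB")

    // Number of currently connected native-protocol (CQL) clients.
    val clients = mbsc.getAttribute(
      new ObjectName("org.apache.cassandra.metrics:type=Client,name=connectedNativeClients"),
      "Value")
    println(s"Connected native clients: $clients")

    connector.close()
  }
}
```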

Presto tuning can be taken up later, once you have confirmed the Cassandra issue. More Presto tuning options are available at

https://prestodb.io/docs/current/admin/tuning.html or, if the Teradata distribution is used, https://teradata.github.io/presto/docs/current/admin/tuning.html

0 votes

I wonder if this has anything to do with Cassandra at all.

What is the Spark scheduling policy set to? By default it is FIFO, which means a query job could consume all cores until it is done and starve the streaming job. If it is FIFO, change it to FAIR and try again.
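If you want to try it, here is a minimal sketch of enabling FAIR scheduling within the application (the allocation file path is just an illustration; without it the default pool settings apply):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Switch the in-application job scheduler from the default FIFO to FAIR,
// so one long-running job cannot monopolize all of the application's cores.
val conf = new SparkConf()
  .setAppName("streaming-app")
  .set("spark.scheduler.mode", "FAIR")
  // Optional: pool definitions (weights, minShare) in an XML file.
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new SparkContext(conf)
```

Note that FAIR mode arbitrates between jobs within a single Spark application.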

0 votes

Just to close this question, following the advice given to me by the Stack Overflow members who were kind enough to reply:

1) I changed the Java garbage collector in Cassandra's jvm.options to G1, which does not require as much tuning as the default CMS: http://docs.datastax.com/en/cassandra/3.0/cassandra/operations/opsTuneJVM.html#opsTuneJVM__setting-g1-gc I changed it because GC pauses were frequently indicated as a problem in similar scenarios. To be honest, I'm not too familiar with GC monitoring, but the DataStax Cassandra documentation suggests using G1. After the change I noticed a bit of a performance and stability boost in my workloads and infrastructure.

2) I realised that I was being too optimistic about my cluster's performance: it was nearly impossible to process and write 20 000 events to Cassandra every 10 seconds while heavy PrestoDB queries were running over the Cassandra tables. PrestoDB and Cassandra were consuming almost all the CPU on my nodes, and I only have 4 cores per node. Cassandra's CPU usage was almost 280%, Presto's almost 90%, and therefore the Spark Streaming executors were starving. Besides, there was no more room for Cassandra to accommodate this write rate, and the Spark Streaming jobs began to hang, accumulating several batches over a long period of time. Therefore, I set the batch interval higher. I know you lose some data timeliness, but if the infrastructure cannot handle the load, I don't have the resources to scale :(

Another possibility is to fine-tune CPU limits per application, using Linux cgroups or YARN CPU isolation and queues, for example, and see if that helps. For my cluster, as previously said, I think the problem is really trying to "move a mountain with a small remote-controlled car" :) Every node was in charge of Spark Streaming jobs + Cassandra + Presto, with only 4 cores available.

3) Finally, I also tuned the Spark Cassandra connector. This worked for my workload, but it will depend on your data and other variables. I changed (see the sketch below):

  * the number of concurrent writes from 5 to 2;
  * grouping.key from "partition" to "replica_set", because my partition key has high cardinality (the keys are almost unique within each RDD), so it was useless to group batches by partition key; this of course depends on your partition key;
  * batch.size.rows to 50, although this may depend on the amount of data you have in each streaming micro-batch.

Reference: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md
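For completeness, here is a sketch of how these settings can be passed through the SparkConf. The property names come from the connector reference linked above; the host is a placeholder and the values are simply the ones that worked for my workload:

```scala
import org.apache.spark.SparkConf

// Spark Cassandra connector write-path tuning used in my benchmark.
val conf = new SparkConf()
  .setAppName("kafka-to-cassandra")
  .set("spark.cassandra.connection.host", "10.0.0.1")              // placeholder
  .set("spark.cassandra.output.concurrent.writes", "2")            // down from the default of 5
  .set("spark.cassandra.output.batch.grouping.key", "replica_set") // instead of "partition"
  .set("spark.cassandra.output.batch.size.rows", "50")
```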

I hope this post helps other people facing difficulties in streaming big data projects, since things can get very, very difficult sometimes :) If any of my decisions are wrong or misleading, please let me know. Every bit of help is appreciated.

Thank you all,

Best regards,

Carlos Costa