3
votes

I'm working on a research project where I set up a complete data analysis pipeline on Google Cloud Platform. We estimate unique visitors per URL in real time using HyperLogLog on Spark. I used Dataproc to set up the Spark cluster. One goal of this work is to measure the throughput of the architecture depending on the cluster size. The Spark cluster has three nodes (the minimal configuration).
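For context, this is roughly the kind of per-URL estimation I mean (a minimal standalone sketch using the stream-lib HyperLogLog implementation; the library and precision used in the actual pipeline may differ):

```java
// Sketch of per-URL unique-visitor estimation with HyperLogLog.
// Assumes the stream-lib library (com.clearspring.analytics:stream);
// URLs and visitor IDs below are made-up sample data.
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.util.HashMap;
import java.util.Map;

public class HllExample {
    public static void main(String[] args) {
        // One HLL per URL, ~1% relative standard deviation.
        Map<String, HyperLogLog> perUrl = new HashMap<>();

        String[][] events = {
            {"/home", "visitor-1"},
            {"/home", "visitor-2"},
            {"/home", "visitor-1"},   // duplicate visitor, not counted twice
            {"/cart", "visitor-3"}
        };

        for (String[] e : events) {
            perUrl.computeIfAbsent(e[0], k -> new HyperLogLog(0.01)).offer(e[1]);
        }

        perUrl.forEach((url, hll) ->
            System.out.println(url + " -> ~" + hll.cardinality() + " unique visitors"));
    }
}
```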

The data stream is simulated with my own data generators, written in Java using the Kafka producer API. The architecture looks as follows:

Data generators -> Kafka -> Spark Streaming -> Elasticsearch.
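The generators themselves look roughly like this (a minimal sketch; the broker address, topic name, event format, and pacing logic are placeholders, not my actual generator code):

```java
// Sketch of a data generator using the Kafka producer API.
// Broker address, topic name and event payload are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventGenerator {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            long eventsPerSecond = 1000;
            while (true) {
                for (long i = 0; i < eventsPerSecond; i++) {
                    String event = "/home,visitor-" + (i % 500);
                    producer.send(new ProducerRecord<>("pageviews", event));
                }
                Thread.sleep(1000);  // crude 1-second pacing
            }
        }
    }
}
```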

The problem is: as I increase the number of events produced per second by my data generators beyond ~1000 events/s, the input rate in my Spark job suddenly collapses and begins to vary a lot.

As you can see in the screenshot from the Spark Web UI, the processing times and scheduling delays remain consistently short while the input rate goes down.

Screenshot from Spark Web UI

I tested it with a completely trivial Spark job that only does a simple mapping, in order to exclude causes such as slow Elasticsearch writes or problems with the job itself. Kafka also seems to receive and forward all events correctly.

Furthermore, I experimented with the Spark configuration parameters spark.streaming.kafka.maxRatePerPartition and spark.streaming.receiver.maxRate, with the same result.
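For reference, a sketch of how these parameters are applied on the SparkConf; the values below are placeholders, not the ones from my experiments:

```java
// Sketch of setting the Kafka rate-limiting parameters on the SparkConf.
// The numeric values are placeholders.
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class RateConfigExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("hll-throughput-test")
            // Cap on records per second read from each Kafka partition (direct approach).
            .set("spark.streaming.kafka.maxRatePerPartition", "1000")
            // Cap on records per second per receiver (only relevant for receiver-based streams).
            .set("spark.streaming.receiver.maxRate", "2000");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // ... define the streaming job here, then:
        // jssc.start();
        // jssc.awaitTermination();
    }
}
```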

Does anybody have an idea what is going wrong here? It really seems to be related to the Spark job or to Dataproc... but I'm not sure. All CPU and memory utilizations look fine.

EDIT: Currently I have two Kafka partitions on that topic (placed on one machine). But I think Kafka should handle well over 1500 events/s even with only one partition; the problem was also present with one partition at the beginning of my experiments. I use the direct approach with no receivers, so Spark reads from the topic with two worker nodes concurrently.
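For completeness, the direct approach with a trivial mapping looks roughly like this (sketched against the spark-streaming-kafka-0-10 API; the integration version, topic name, and consumer settings in my actual job may differ):

```java
// Sketch of the receiver-less "direct" Kafka integration with a trivial mapping,
// based on the spark-streaming-kafka-0-10 API. Topic name, group id and broker
// address are placeholders.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class DirectStreamExample {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("direct-stream-test");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "kafka-broker:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "throughput-test");
        kafkaParams.put("auto.offset.reset", "latest");

        // Each Kafka partition maps to one Spark partition; no receivers involved.
        KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Arrays.asList("pageviews"), kafkaParams))
            // Simple mapping only, as in the stripped-down test job.
            .map(ConsumerRecord::value)
            .count()
            .print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```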

EDIT 2: I found out what causes this bad throughput. I forgot to mention one component of my architecture: I use one central Flume agent to collect the log events from all my simulator instances via log4j over netcat. This Flume agent is the cause of the performance problem! I changed the log4j configuration to use asynchronous loggers (https://logging.apache.org/log4j/2.x/manual/async.html) via the LMAX Disruptor, scaled the Flume agent up to more CPU cores and RAM, and switched the channel to a file channel. But it still performs badly; none of this had any effect. Any other ideas how to tune Flume performance?
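For reference, switching to all-async loggers as described in the linked manual essentially means putting the LMAX Disruptor on the classpath and setting the context selector before Log4j initializes (a sketch; my actual Flume-facing appender configuration is not shown here):

```java
// Sketch of switching log4j2 to all-asynchronous loggers, as described in the
// linked manual. Requires the LMAX Disruptor jar on the classpath. The appender
// that forwards events to the Flume agent is configured separately in log4j2.xml
// and is not shown here.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class AsyncLoggingExample {
    public static void main(String[] args) {
        // Must be set before the first Logger is obtained; alternatively pass
        // -DLog4jContextSelector=... on the JVM command line.
        System.setProperty("Log4jContextSelector",
                "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");

        Logger log = LogManager.getLogger(AsyncLoggingExample.class);
        log.info("event: /home,visitor-1");  // returns quickly; I/O happens on a background thread
    }
}
```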

How many Kafka partitions are in your topic? Which Spark Streaming Kafka integration are you using (direct or receivers)? Have you had an opportunity to use custom Spark Streaming receivers to rule out any Kafka <-> Spark integration issues? Have you had a chance to replace the write to ES with something else (e.g., an HDFS file, Bigtable write, Pub/Sub publish, log statement) to rule out any Spark <-> ES integration issues? – Angus Davis
Currently I have two Kafka partitions on that topic (placed on one machine). But I think Kafka should handle more than 1500 events/s even with only one partition; the problem was also present with one partition at the beginning of my experiments. I use the direct approach with no receivers, so Spark reads from the topic with two worker nodes concurrently. And as I said in the first post, I removed the part of the job where the results are written to Elasticsearch, so we can rule out an ES issue. – JayKay

2 Answers

0
votes

Hard to say given the sparse amount of information. I would suspect a memory issue: at some point, the servers may even start swapping. Check the JVM memory utilization and the swapping activity on all servers, as well as the free and committed RAM. Elasticsearch should be capable of handling ~15,000 records/second with little tweaking.
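One way to get the JVM-side numbers without extra tooling is to log them from inside the application (a minimal sketch using the standard java.lang.management API; swapping activity still has to be checked with OS tools such as vmstat):

```java
// Sketch for logging JVM heap utilization from inside a running job,
// using the standard java.lang.management API.
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

public class HeapCheck {
    public static void main(String[] args) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        System.out.printf("heap used=%d MB committed=%d MB max=%d MB%n",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
    }
}
```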

0
votes

As I mentioned before, CPU and RAM utilization is totally fine. I found a "magic limit": it seems to be exactly 1500 events per second. As soon as I exceed this limit, the input rate immediately begins to wobble.

The mysterious thing is that the processing times and scheduling delays stay constant. So one can exclude backpressure effects, right?

The only thing I can guess is a technical limit on GCP/Dataproc, but I didn't find any hints in the Google documentation.

Any other ideas?