2
votes

I am new to Spark and Cassandra.

We are using Spark on top of Cassandra to read data, since we have a requirement to read data using non-primary-key columns.

One observation is that the number of tasks for a Spark job increases as the data grows. Because of this, we are facing a lot of latency when fetching data.

What would be the reasons for the spark job task count increase?

What should be considered to increase performance in Spark with Cassandra?

Please advise.

Thanks,
Mallikarjun

2
What versions of Spark & Cassandra are you using? - Gillespie
We are using Cassandra 2.1.5 and Spark 1.4.0. - Mallikarjun B

2 Answers

3
votes

The input split size is controlled by the configuration spark.cassandra.input.split.size_in_mb. Each split generates one Spark task, so the more data there is in Cassandra, the more tasks are created and the longer the job takes to process (which is what you would expect).
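To make the relationship concrete, here is a rough back-of-the-envelope sketch. It assumes the task count is approximately the table size divided by the split size; the connector works from its own size estimates and token ranges, so real numbers will differ. The object name, the function name, and the sizes are all made up for illustration.

```scala
object SplitEstimate {
  // Rough estimate only: tasks ~= ceil(table size / split size), never less than 1.
  // The real connector uses estimated table sizes and token ranges, so treat this as a guide.
  def estimatedTasks(tableSizeMB: Long, splitSizeMB: Int): Long =
    math.max(1L, (tableSizeMB + splitSizeMB - 1) / splitSizeMB)

  def main(args: Array[String]): Unit = {
    // Hypothetical table sizes: doubling the data doubles the estimated task count.
    println(estimatedTasks(tableSizeMB = 6400L, splitSizeMB = 64))
    println(estimatedTasks(tableSizeMB = 12800L, splitSizeMB = 64))
    // Doubling the split size brings the estimate back down.
    println(estimatedTasks(tableSizeMB = 12800L, splitSizeMB = 128))
  }
}
```

Under this assumption, raising the split size trades fewer tasks for larger ones, so it only helps if the executors have the memory to handle the bigger splits.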

To improve performance, make sure you are aligning the partitions using joinWithCassandraTable. Don't use context.cassandraTable(...) unless you absolutely need all the data in the table, and whenever you do read, use select to project only the columns that you need.
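A minimal sketch of that approach, assuming a hypothetical table my_keyspace.users whose partition key is user_id and which has an email column. The keyspace, table, column names, key values, and contact point are placeholders, not anything from the question. It needs a reachable Cassandra node and is meant to be launched with spark-submit, which supplies the master.

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object TargetedReadSketch {
  // Hypothetical key class: the field name matches the partition key column.
  case class UserId(user_id: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("targeted-read-sketch")
      .set("spark.cassandra.connection.host", "127.0.0.1") // assumed contact point
    val sc = new SparkContext(conf)
    try {
      // Only the keys we actually care about, instead of scanning the whole table.
      val wanted = sc.parallelize(Seq(1, 2, 3)).map(UserId(_))

      // Fetch just the matching partitions, projecting only the needed columns.
      val joined = wanted
        .joinWithCassandraTable("my_keyspace", "users")
        .select("user_id", "email")

      joined.collect().foreach { case (key, row) =>
        println(s"${key.user_id} -> ${row.getString("email")}")
      }
    } finally {
      sc.stop()
    }
  }
}
```

Because only the requested partitions are read, the amount of work depends on the number of keys rather than on the total size of the table.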

If you need data from some rows, it would make sense to build a secondary table where the id of those rows is stored.

Secondary indexes could also help to select subsets of the data, but I've seen reports of them not being very performant.

1
votes

What would be the reasons for the spark job task count increase?

Following on from maasg's answer, rather than setting spark.cassandra.input.split.size_in_mb on the SparkConf, it can be useful to use the ReadConf config when reading from different keyspaces/datacentres in a single job:

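    import com.datastax.driver.core.ConsistencyLevel
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.rdd.ReadConf

    // Read settings applied to this RDD only, instead of job-wide on the SparkConf.
    val readConf = ReadConf(
      splitCount = Option(500),
      splitSizeInMB = 64,
      fetchSizeInRows = 1000,
      consistencyLevel = ConsistencyLevel.LOCAL_ONE,
      taskMetricsEnabled = true
    )

    val rows = sc.cassandraTable(cassandraKeyspace, cassandraTable).withReadConf(readConf)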

What should be considered to increase performance in Spark with Cassandra?

As far as increasing performance is concerned, this will depend on the jobs you are running and the types of transformations required. Some general advice to maximise Spark-Cassandra performance (as can be found here) is outlined below.

Your choice of operations and the order in which they are applied is critical to performance.

You must organize your processes with task distribution and memory in mind.

The first thing is to determine if your data is partitioned appropriately. A partition in this context is merely a block of data. If possible, partition your data before Spark even ingests it. If this is not practical or possible, you may choose to repartition the data immediately following the load. You can repartition to increase the number of partitions or coalesce to reduce the number of partitions.

The number of partitions should, as a lower bound, be at least 2x the number of cores that are going to operate on the data. Having said that, you will also want to ensure any task you perform takes at least 100ms to justify the distribution across the network. Note that a repartition will always cause a shuffle, whereas coalesce typically won’t. If you’ve worked with MapReduce, you know shuffling is what takes most of the time in a real job.
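As a small illustration of the two calls, here is a sketch that runs in local mode on generated numbers rather than Cassandra data. The object and function names and the core count are assumptions made for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionSketch {
  // Returns the partition counts (initial, after repartition, after coalesce).
  def partitionCounts(sc: SparkContext, cores: Int): (Int, Int, Int) = {
    val rdd = sc.parallelize(1 to 1000, cores)
    // Increase to 2x the cores: repartition always performs a full shuffle.
    val wider = rdd.repartition(2 * cores)
    // Reduce the count: coalesce merges existing partitions without a shuffle by default.
    val narrower = wider.coalesce(2)
    (rdd.partitions.length, wider.partitions.length, narrower.partitions.length)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("partition-sketch"))
    try println(partitionCounts(sc, cores = 4)) finally sc.stop()
  }
}
```

With 4 cores assumed, the counts go from 4 to 8 and then down to 2. In a real job the targets would be chosen from the cluster's core count and the per-task runtime.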

Filter early and often. Assuming the data source is not preprocessed for reduction, your earliest and best place to reduce the amount of data Spark will need to process is on the initial data query. This is often achieved by adding a where clause. Do not bring in any data not necessary to obtain your target result. Bringing in any extra data will affect how much data may be shuffled across the network and written to disk. Moving data around unnecessarily is a real killer and should be avoided at all costs.

At each step you should look for opportunities to filter, distinct, reduce, or aggregate the data as much as possible prior to proceeding to the next operation.

Use pipelines as much as possible. Pipelines are a series of transformations that represent independent operations on a piece of data and do not require a reorganization of the data as a whole (shuffle). For example: a map from a string -> string length is independent, where a sort by value requires a comparison against other data elements and a reorganization of data across the network (shuffle).
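One way to see the difference is to inspect the lineage Spark builds. The sketch below, using made-up sample words in local mode, maps strings to their lengths (pipelined) and then sorts them (shuffled), returning the toDebugString output for each. The object and function names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PipelineSketch {
  // Returns the lineage descriptions of (pipelined map, sorted result).
  def lineages(sc: SparkContext): (String, String) = {
    val words = sc.parallelize(Seq("spark", "cassandra", "rdd"), 2)
    // string -> string length: each element is handled independently, no shuffle.
    val lengths = words.map(_.length)
    // Sorting compares elements across partitions, so Spark inserts a shuffle stage.
    val sorted = lengths.sortBy(n => n)
    (lengths.toDebugString, sorted.toDebugString)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("pipeline-sketch"))
    try {
      val (mapLineage, sortLineage) = lineages(sc)
      println(mapLineage)
      println(sortLineage)
    } finally sc.stop()
  }
}
```

The lineage of the sorted RDD includes a ShuffledRDD, while the lineage of the plain map does not.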

In jobs which require a shuffle see if you can employ partial aggregation or reduction before the shuffle step (similar to a combiner in MapReduce). This will reduce data movement during the shuffle phase.
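A minimal sketch of partial aggregation, counting made-up words in local mode: reduceByKey sums within each partition before the shuffle, while groupByKey ships every pair across the network and sums afterwards. Both give the same result. The object and function names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CombinerSketch {
  // Returns word counts computed with (reduceByKey, groupByKey).
  def counts(sc: SparkContext): (Map[String, Int], Map[String, Int]) = {
    val pairs = sc.parallelize(Seq("a", "b", "a", "c", "a", "b"), 3).map(w => (w, 1))
    // Partial sums are computed per partition first, so less data is shuffled.
    val combined = pairs.reduceByKey(_ + _).collect().toMap
    // Every (word, 1) pair is shuffled, then summed on the receiving side.
    val grouped = pairs.groupByKey().mapValues(_.sum).collect().toMap
    (combined, grouped)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("combiner-sketch"))
    try println(counts(sc)) finally sc.stop()
  }
}
```

On six elements the difference is invisible, but on large datasets with many repeated keys the reduceByKey form moves far less data during the shuffle.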

Some common tasks that are costly and require a shuffle are sorts, group by key, and reduce by key. These operations require the data to be compared against other data elements which is expensive. It is important to learn the Spark API well to choose the best combination of transformations and where to position them in your job. Create the simplest and most efficient algorithm necessary to answer the question.