5
votes

I have set up Spark 2.0 and Cassandra 3.0 on a local machine (8 cores, 16 GB RAM) for testing purposes and edited spark-defaults.conf as follows:

spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4

Next I imported 1.5 million rows into Cassandra:

CREATE TABLE test (
    tid int,
    cid int,
    pid int,
    ev list<double>,
    PRIMARY KEY (tid)
)

test.ev is a list containing numeric values, e.g. [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]

Now in the code, to test the whole thing, I just created a SparkSession, connected to Cassandra and made a simple select count:

# read the Cassandra table through the Spark Cassandra connector data source
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks", table="test")
df.select().count()

At this point, Spark outputs the count and takes about 28 seconds to finish the Job, distributed across 13 Tasks (in the Spark UI, the total Input for the Tasks is 331.6 MB).

Questions:

  • Is that the expected performance? If not, what am I missing?
  • Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job across. If I am setting spark.sql.shuffle.partitions to 4, why is it creating 13 Tasks? (I also verified the number of partitions by calling rdd.getNumPartitions() on my DataFrame.)

Update

A common operation I would like to test over this data:

  • Query a large data set, say from 100,000 to N rows, grouped by pid
  • Select ev, a list<double>
  • Perform an average on each member, assuming by now each list has the same length, e.g. df.groupBy('pid').agg(avg(df['ev'][1])) (a sketch of the full per-element version is shown right after this list)
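
For reference, a minimal PySpark sketch of that per-element average, reusing the df loaded above and assuming every ev list has the fixed length of 15 seen in the sample row (the EV_LENGTH constant and avg_ev_N aliases are just illustrative):

from pyspark.sql.functions import avg, col

EV_LENGTH = 15  # assumed fixed length of every ev list
df.groupBy("pid").agg(
    *[avg(col("ev")[i]).alias("avg_ev_%d" % i) for i in range(EV_LENGTH)]
).show()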

As @zero323 suggested, I deployed an external machine (2 GB RAM, 4 cores, SSD) with Cassandra just for this test and loaded the same data set. The result of df.select().count() was, as expected, higher latency and overall poorer performance compared with my previous test (it took about 70 seconds to finish the Job).

Edit: I misunderstood his suggestion. @zero323 meant to let Cassandra perform the count instead of using Spark SQL, as explained here.
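
For illustration, a minimal sketch of that direct count using the DataStax Python driver (cassandra-driver); the local contact point is an assumption, and a full-table count on a large table may need a higher request timeout:

from cassandra.cluster import Cluster

# connect to the test cluster and keyspace (contact point is assumed)
cluster = Cluster(["127.0.0.1"])
session = cluster.connect("testks")

# let Cassandra do the counting instead of Spark
print(session.execute("SELECT count(*) FROM test").one().count)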

Also, I want to point out that I am aware of the inherent anti-pattern of using a list<double> instead of a wide row for this type of data, but my concern at the moment is more the time spent retrieving a large dataset than the actual average computation time.

2
If you want to perform a count, then querying the external source is way more efficient. In general, a lot depends on what you do. Regarding partitions, spark.sql.shuffle.partitions is not used here. The initial number of partitions is set by the data source, and count always uses 1 task for the final aggregation. – zero323
Thank you again @zero323. Please check my update. Also, if I understand correctly, you are saying the number of partitions is set by Cassandra? – TMichel
OK, I think I wasn't clear enough :/ My point was to perform the query directly against Cassandra, without using Spark SQL, if you perform simple actions like counting all rows. Not to deploy a separate server. – zero323
Regarding the number of partitions, take a look at the spark.cassandra.input.split.size_in_mb parameter and how it relates to the total number of partitions. Also, for a simple count, take a look at cassandraCount on Cassandra-backed RDDs. – zero323

2 Answers

5
votes

Is that the expected performance? If not, what am I missing?

It looks slowish but it is not exactly unexpected. In general count is expressed as

SELECT 1 FROM table

followed by Spark-side summation. So while it is optimized, it is still rather inefficient because you have to fetch N long integers from the external source just to sum them locally.

As explained in the docs, Cassandra-backed RDDs (not Datasets) provide an optimized cassandraCount method which performs server-side counting.

Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job in. If I am setting the spark.sql.shuffle.partitions to (...), why is creating (...) Tasks?

Because spark.sql.shuffle.partitions is not used here. This property is used to determine the number of partitions for shuffles (when data is aggregated by some set of keys), not for Dataset creation or for global aggregations like count(*) (which always use 1 partition for the final aggregation).
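
A quick way to observe the difference (a sketch, reusing the df from the question; the exact numbers depend on the setup described there):

# the scan's partition count is decided by the Cassandra source
print(df.rdd.getNumPartitions())  # expected to be 13 in the setup above

# a keyed aggregation introduces a shuffle, which uses spark.sql.shuffle.partitions
print(df.groupBy("pid").count().rdd.getNumPartitions())  # expected to be 4 in the setup above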

If you are interested in controlling the number of initial partitions, you should take a look at spark.cassandra.input.split.size_in_mb, which defines:

Approx amount of data to be fetched into a Spark partition. Minimum number of resulting Spark partitions is 1 + 2 * SparkContext.defaultParallelism

As you can see, another factor here is spark.default.parallelism, but it is not exactly a subtle configuration, so depending on it is in general not an optimal choice.
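
As a sketch, the split size can be set like any other Spark property when building the session (the host and the 256 MB value below are only illustrative):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.cassandra.connection.host", "127.0.0.1")  # assumed host
         .config("spark.cassandra.input.split.size_in_mb", "256")  # illustrative value, in MB
         .getOrCreate())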

0
votes

I see that this is a very old question, but maybe someone needs it now. When running Spark on a local machine it is very important to set the master in SparkConf to "local[*]", which, according to the documentation, runs Spark with as many worker threads as there are logical cores on your machine.

It helped me increase the performance of the count() operation by 100% on my local machine compared to master "local".
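
A minimal sketch of that setup (the application name is just a placeholder):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")  # one worker thread per logical core
         .appName("local-count-test")  # placeholder name
         .getOrCreate())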