I have set up Spark 2.0 and Cassandra 3.0 on a local machine (8 cores, 16 GB RAM) for testing purposes and edited spark-defaults.conf as follows:
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
Next I imported 1.5 million rows into Cassandra:
CREATE TABLE testks.test (
    tid int,
    cid int,
    pid int,
    ev list<double>,
    PRIMARY KEY (tid)
);
test.ev is a list containing numeric values, e.g. [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
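For reference, this is roughly how rows of that shape can be written through the Spark Cassandra Connector (just a sketch; my actual import was done separately, and it assumes a SparkSession spark already configured with the connector):

from pyspark.sql import Row

# tid/cid/pid values below are placeholders; ev reuses the sample list above
sample = spark.createDataFrame([
    Row(tid=1, cid=10, pid=100,
        ev=[2240.0, 2081.0, 159.0, 304.0, 1189.0, 1125.0, 1779.0, 693.0,
            2187.0, 1738.0, 546.0, 496.0, 382.0, 1761.0, 680.0])
])

(sample.write
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace="testks", table="test")
    .mode("append")
    .save())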
Now, to test the whole thing, I just created a SparkSession, connected to Cassandra and made a simple select count:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
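For completeness, a minimal sketch of the SparkSession setup behind the snippet above; the app name and host are placeholders, and it assumes the spark-cassandra-connector package is on the classpath (e.g. submitted with --packages):

from pyspark.sql import SparkSession

# Placeholder app name and connection host
spark = (SparkSession.builder
    .appName("cassandra-count-test")
    .config("spark.cassandra.connection.host", "127.0.0.1")
    .getOrCreate())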
At this point, Spark outputs the count and takes about 28 seconds to finish the job, distributed across 13 tasks (in the Spark UI, the total Input for the tasks is 331.6 MB).
Questions:
- Is that the expected performance? If not, what am I missing?
- Theory says the number of partitions of a DataFrame determines the number of tasks Spark will distribute the job across. If I am setting spark.sql.shuffle.partitions to 4, why is it creating 13 tasks? (I also checked the number of partitions by calling rdd.getNumPartitions() on my DataFrame; see the snippet below.)
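A quick sanity-check sketch comparing the shuffle setting with the actual number of input partitions:

print(spark.conf.get("spark.sql.shuffle.partitions"))  # 4, from spark-defaults.conf
print(df.rdd.getNumPartitions())                       # 13 here, matching the task count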
Update
A common operation I would like to test over this data:
- Query a large data set, say, 100,000 ~ N rows, grouped by pid
- Select ev, a list<double>
- Perform an average on each member, assuming by now each list has the same length, e.g. df.groupBy('pid').agg(avg(df['ev'][1])) (see the sketch below)
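A minimal sketch of that aggregation extended to every list position, assuming a fixed list length (15, as in the sample row above); the aliases and list_len are mine:

from pyspark.sql import functions as F

list_len = 15  # assumed fixed length of the ev lists

# One avg() per position; ev[i] reads the i-th element of the list column
per_element_avgs = [F.avg(F.col("ev")[i]).alias("ev_avg_{}".format(i))
                    for i in range(list_len)]

df.groupBy("pid").agg(*per_element_avgs).show()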
As @zero323 suggested, I deployed an external machine (2 GB RAM, 4 cores, SSD) with Cassandra just for this test and loaded the same data set. The result of the df.select().count() was, as expected, greater latency and overall poorer performance compared with my previous test (it took about 70 seconds to finish the job).
Edit: I misunderstood his suggestion. @zero323 meant letting Cassandra perform the count instead of using Spark SQL, as explained here.
I also want to point out that I am aware of the inherent anti-pattern of using a list<double> instead of a wide row for this type of data, but my concern at this moment is more the time spent retrieving a large dataset than the actual average computation time.
Comments:
- spark.sql.shuffle.partitions is not used here. The initial number of partitions is set by the data source, and count always uses 1 task for the final aggregation. – zero323
- Check the spark.cassandra.input.split.size_in_mb parameter and how it relates to the total number of partitions. Also, for a simple count, take a look at cassandraCount on Cassandra-backed RDDs. – zero323
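Following up on that last comment, spark.cassandra.input.split.size_in_mb could, for example, be added to spark-defaults.conf next to the other settings; the value below is only a placeholder, and smaller splits should yield more read partitions/tasks:

spark.cassandra.input.split.size_in_mb 64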