
I have a Spark job that reads data from one Cassandra table and dumps the results back into two tables with slight modifications. My problem is that the job takes much longer than expected.

The code is as follows:

val range = sc.parallelize(0 to 100)

val rdd1 = range.map(x => (some_value, x)).joinWithCassandraTable[Event](keyspace_name, table2).select("col1", "col2", "col3", "col4", "col5", "col6", "col7").map(x => x._2)

val rdd2: RDD[((Int, String, String, String), Iterable[Event])] = rdd1.keyBy(r => (r.col1, r.col2, r.col3, r.col4 )).groupByKey

val rdd3 = rdd2.mapValues(iter => someFunction(iter.toList.sorted))

//STORE 1

rdd3.map(r => (r._1._1, r._1._2, r._1._3, r._1._4, r._2.split('|')(1).toDouble )).saveToCassandra(keyspace_name, table1, SomeColumns("col1","col2", "col3","col4", "col5"))

//STORE 2  

rdd3.map(r => (to, r._1%100, to, "MANUAL_"+r._1+"_"+r._2+"_"+r._3+"_"+r._4+"_"+java.util.UUID.randomUUID(), "M", to, r._4, r._3, r._1, r._5, r._2) ).saveToCassandra(keyspace_name, table2, SomeColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11"))

For around a million records, STORE 1 takes close to 40 seconds and STORE 2 (a slight modification of rdd3) takes more than a minute. I am not sure where I am going wrong or why it is taking so much time. My Spark environment is as follows:

DSE 4.8.9 with 6 nodes, each with 70 GB RAM and 12 cores

Any help is appreciated.

Have you tried checkpointing rdd3 to see if it goes faster? - Marko Švaljek
Nope, it does not go faster. - Mohammed Khusro Siddiqui

1 Answer


Let me take a guess. Logs, performance-monitoring output, and the C* data model would be needed for a more precise answer, but here is some math. You have:

  • joinWithCassandraTable — random C* reads
  • saveToCassandra — sequential C* writes
  • a Spark repartition/reduce step (the groupByKey)

(I expect saveToCassandra to take half of the total time.) And if you did not run any queries beforehand, you need to subtract 12-20 seconds for Spark to start executors and do other setup.

So for 1M entries on 6 nodes in 40 seconds you get:
 1,000,000 / 6 / 40 ≈ 4,166 records/sec/node. That's not bad; 10K/s per node with a mixed workload is a good result.
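The per-node throughput estimate above can be reproduced as a quick sanity check (plain Scala, no Spark needed):

```scala
// Rough throughput for STORE 1:
// ~1M records written across 6 nodes in ~40 seconds.
val records = 1000000
val nodes   = 6
val seconds = 40

// Integer division, matching the back-of-the-envelope math above.
val recordsPerSecPerNode = records / nodes / seconds
println(recordsPerSecPerNode) // 4166
```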

The second write is about twice as big (11 columns compared to 5) and it runs after the first one, so I expect Cassandra to start spilling the previous data to disk at that moment, which can cause further performance degradation here.

Do I understand correctly that when you add an rdd3.cache() call, nothing changes for the second run? That's strange.
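For reference, a minimal sketch of what caching rdd3 before the two stores would look like (this assumes the pipeline from the question; without persisting, the join and groupByKey are recomputed for each saveToCassandra call):

```scala
// Persist rdd3 so the join/groupByKey pipeline is not recomputed
// for the second saveToCassandra call.
val rdd3Cached = rdd3.cache()

// Force materialization once, so both stores read from the cache.
rdd3Cached.count()

// STORE 1 and STORE 2 then both consume rdd3Cached instead of rdd3.
```

This is only worth it if the recomputation (the Cassandra join plus the shuffle) dominates; if the writes themselves are the bottleneck, caching changes little, which would be consistent with the observation above.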

And yes, you can get better results by tuning the C* data model and the Spark/C* parameters.
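As one example, the Spark Cassandra Connector exposes write- and read-side throughput knobs on the SparkConf; the property names below are from the connector's configuration reference (check the docs for the exact names in your DSE 4.8-era connector version), and the values are placeholders to tune for your cluster:

```scala
// Hypothetical tuning fragment; exact values depend on your workload.
val conf = new org.apache.spark.SparkConf()
  .set("spark.cassandra.output.concurrent.writes", "8")    // parallel async writes per task
  .set("spark.cassandra.output.batch.size.bytes", "4096")  // target batch size for writes
  .set("spark.cassandra.input.split.size_in_mb", "64")     // read-side partition split size
```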