0 votes

I am trying to count the elements of the following RDDs using the count() method. The first is as follows:

scala> val data_wo_header=dropheader(data)
data_wo_header: org.apache.spark.rdd.RDD[String]

When I call count() on this I get:

scala> data_wo_header.count()
res1: Long = 20000263 

This operation is relatively quick and takes about 26 seconds.

Now I transform the RDD as follows:

scala> val ratings_split = data_wo_header.map(line => line.split(",")).persist()
ratings_split: org.apache.spark.rdd.RDD[Array[String]]

scala> ratings_split.count()
res2: Long = 20000263  

This count takes around 5 minutes. Can someone suggest why the count time increased so dramatically? The dropheader function, which just drops the first line, looks like this:

  def dropheader(data: RDD[String]): RDD[String] =
    data.mapPartitionsWithIndex { (idx, lines) =>
      // Only the first partition contains the header: skip its first line
      // and return the resulting iterator (rather than discarding it).
      if (idx == 0) lines.drop(1) else lines
    }

data is just:

    val data = sc.textFile(file, 2).cache()
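A rough way to reproduce these timings in the shell is a small helper like the one below; the time function is a hypothetical convenience, not part of the Spark API:

    // Hypothetical helper: times a single Spark action in the spark-shell.
    def time[T](body: => T): T = {
      val start = System.nanoTime()
      val result = body
      println(f"Elapsed: ${(System.nanoTime() - start) / 1e9}%.1f s")
      result
    }

    time { data_wo_header.count() } // counts lines only (fast)
    time { ratings_split.count() }  // splits every line and fills the cache first (slow)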


2 Answers

1 vote

The second count is obviously slower because you're not just counting lines; you're also transforming each line into an array of strings.
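For example, assuming the split arrays fit in memory, a second count on the persisted RDD should be much faster, since the splitting work is done only once:

    ratings_split.count() // slow: splits all ~20M lines, then caches the arrays
    ratings_split.count() // fast: served directly from the MEMORY_ONLY cache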

Using persist() with no argument means it uses the default MEMORY_ONLY storage level, and is therefore exactly the same as using cache().
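A minimal sketch of that equivalence, where rdd stands for any RDD, such as ratings_split:

    import org.apache.spark.storage.StorageLevel

    // These all request the same storage level:
    rdd.persist()                          // no argument: defaults to MEMORY_ONLY
    rdd.persist(StorageLevel.MEMORY_ONLY)  // the same, spelled out
    rdd.cache()                            // defined as persist(StorageLevel.MEMORY_ONLY)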

Now, 5 minutes does seem costly, but it depends on your configuration (total memory, number of CPUs) and also on the number of elements per line.

As Chobeat said, you need to investigate with the Spark UI.

0 votes

Well, this is easy for you to verify by looking at the Spark UI (by default at http://localhost:4040 when running the shell) and seeing which stages take more time. The map over the data has to go over the whole dataset, and that explains the slowdown. persist() could also introduce some overhead, but I'm not sure about it.

My suggestion is to read that CSV using Spark's CSV data source if you can.
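A minimal sketch of that approach, assuming Spark 2.x with a SparkSession in scope as spark (the file path is hypothetical):

    // The header option makes the data source skip the header row,
    // so no manual dropheader is needed.
    val ratings = spark.read
      .option("header", "true")
      .csv("ratings.csv") // hypothetical path

    println(ratings.count())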