1 vote

I have a dataset which is 1,000,000 rows by about 390,000 columns. The fields are all binary, either 0 or 1. The data is very sparse.

I've been using Spark to process this data. My current task is to filter the data: I only want the 1,000 columns that have been preselected. This is the code I'm currently using to achieve that:

val result = bigdata.map(_.zipWithIndex.filter { case (value, index) => selectedColumns.contains(index) })

bigdata is just an RDD[Array[Int]].

However, this code takes quite a while to run. I'm sure there's a more efficient way to filter my dataset that doesn't involve filtering every single row separately. Would loading my data into a DataFrame and manipulating it through the DataFrame API make things faster/easier? Should I be looking into column-store based databases?
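
For reference, here is roughly what I imagine the DataFrame route would look like; the load path and the c0/c1/... column names are just assumptions about how the data might be stored and named, not my actual setup:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("column-filter").getOrCreate()

// Hypothetical load path and column naming (c0 ... c389999); adjust to the real schema
val df = spark.read.parquet("/path/to/bigdata")

// selectedColumns is the same set of 1,000 preselected indices as above
val selectedNames = selectedColumns.map(i => s"c$i").toSeq

// select() only touches the requested columns, which is where a columnar
// storage format like Parquet could help: unselected columns are never read
val dfResult = df.select(selectedNames.head, selectedNames.tail: _*)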

2 Answers

2 votes

You can start by making your filter a little more efficient. Please note that:

  • your RDD contains Array[Int], so you can access the nth element of each row in O(1) time
  • #selectedColumns << #columns

Given these two facts, it doesn't make sense to iterate over every element of each row, let alone pay for the contains calls. Instead you can simply map over selectedColumns:

// Only needed if selectedColumns is not already sorted
val orderedSelectedColumns = selectedColumns.toList.sorted.toArray
bigdata.map(row => orderedSelectedColumns.map(i => row(i)))

Comparing time complexity:

  1. zipWithIndex + filter (assuming the best case where contains is O(1)) - O(#rows * #columns)
  2. map - O(#rows * #selectedColumns)
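
Putting it together, a self-contained sketch with a tiny made-up dataset standing in for bigdata (the rows and indices below are placeholders, not your data):

import org.apache.spark.{SparkConf, SparkContext}

object SelectColumns {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("select-columns").setMaster("local[*]"))

    // Tiny stand-in for the real 1,000,000 x 390,000 dataset
    val bigdata = sc.parallelize(Seq(
      Array(1, 0, 0, 1, 0, 1),
      Array(0, 1, 0, 0, 1, 0)
    ))

    // Stand-in for the 1,000 preselected column indices
    val selectedColumns = Set(5, 0, 3)

    // Sort once on the driver so the output columns have a stable order
    val orderedSelectedColumns = selectedColumns.toList.sorted.toArray

    // O(#rows * #selectedColumns): each row is only indexed at the selected positions
    val result = bigdata.map(row => orderedSelectedColumns.map(i => row(i)))

    result.collect().foreach(r => println(r.mkString(",")))
    sc.stop()
  }
}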

1 vote

The easiest way to speed up execution is to parallelize it across more partitions (bigdata is not a pair RDD, so repartition takes the place of partitionBy here):

bigdata.repartition(numPartitions).mapPartitions(...)

mapPartitions receives an Iterator over which you can map and filter.

numPartitions is a value which you can set to the desired number of parallel partitions.
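
For completeness, a sketch of how this could be combined with the column selection from the other answer; bigdata and orderedSelectedColumns are the values defined there, and numPartitions is a placeholder you would tune for your cluster:

import org.apache.spark.rdd.RDD

// Placeholder: roughly a few partitions per available core
val numPartitions = 200

// Spread the rows over more partitions, then select columns partition by partition
val result: RDD[Array[Int]] = bigdata
  .repartition(numPartitions)
  .mapPartitions(rows => rows.map(row => orderedSelectedColumns.map(i => row(i))))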