For each transaction, I am trying to count how many times its key occurred in the 30 days up to that transaction's timestamp. I know I can calculate this directly with a window, but a single key (partition) in the window holds an enormous amount of data, so the job never completes on my hardware.

I saw some blogs that talk about UDAFs. Could someone suggest how to do it, or point me to good articles? I did not find anything that fits my use case (something like window + UDAF).

Below is my window:

val rollingWindow = Window
  .partitionBy(keyColumn)
  .orderBy(unix_timestamp)
  .rangeBetween(-30 * 24 * 3600, Window.currentRow)

I can't partition the data by any other key, because the counts would then be wrong. Any suggestions would be greatly appreciated.

Comment: UDAFs are hard; try to avoid them and look for alternatives. (thebluephantom)

1 Answer

Implementing a sliding window with UDAFs can be a bit tricky. If the problem is that the partition is very big, you may well hit the same problem with a UDAF, because it also has to aggregate per key.

Before moving to other options, I would first investigate why the sliding window takes so long to complete (does it complete at all?). Keep in mind that Spark SQL optimizations improve memory, CPU, and storage efficiency, so processing your data as JVM objects in a UDAF would not, a priori, improve performance.

If your partition is that big, you can try working with iterator transformations on RDDs via mapPartitions. An iterator is a way to access a collection of elements one by one. It does not load the whole collection into memory at once; instead, it loads elements one after another.
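To illustrate that laziness, here is a minimal sketch in plain Scala (no Spark involved). The `produced` counter and the variable names are only there for illustration; the point is that a mapped iterator produces elements only as they are consumed.

```scala
// Counter used only to observe how many elements were actually produced
var produced = 0

// A large source; nothing is generated until someone asks for an element
val source: Iterator[Int] = Iterator.range(0, 1000000).map { i =>
  produced += 1
  i
}

// Iterator-to-iterator transformation: still nothing evaluated here
val doubled: Iterator[Int] = source.map(_ * 2)

// Consuming three elements pulls only those three through the pipeline
val firstThree: List[Int] = doubled.take(3).toList
```

Calling something like `toList` on `source` directly would instead force all elements into memory, which is exactly what you want to avoid for a huge partition.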

You can use an iterator-to-iterator transformation with mapPartitions. The point is that you first need to build an RDD from those partitions and then apply mapPartitions to it:

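import org.apache.spark.rdd.RDD

// An example of a pair RDD
val rdd: RDD[(Int, Array[Double])] = ???

// Placeholder for your own window/aggregation logic
def windowFunction(el: (Int, Array[Double])): Double = ???

rdd.mapPartitions { iterator: Iterator[(Int, Array[Double])] =>
  // Elements are read one by one through the iterator's map;
  // the iterator is never materialized into a collection
  iterator.map(el => (el, windowFunction(el)))
}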

Be careful: if you load the iterator into a collection such as a List, you will have the same memory problems. You need to use the iterator's map function to perform the aggregation and transform this iterator into another iterator. Another problem is that applying the function over date ranges can be more difficult, because you can only read the iterator once, so you would need to keep track of the relevant state yourself.
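One possible way to deal with the single-pass restriction is to keep a small buffer holding only the timestamps that are still inside the window. The sketch below is plain Scala and makes several assumptions: `rollingCounts` is a hypothetical helper (not a Spark API), timestamps are in seconds, and the iterator for a key is already sorted in ascending order. Also note that rows with identical timestamps are counted in arrival order, which differs slightly from a RANGE frame, where all rows sharing the current timestamp are included.

```scala
import scala.collection.mutable

// Hypothetical helper: for each timestamp, emit the number of events in
// [ts - windowSeconds, ts], counting the current event itself.
// Assumes sortedTs yields timestamps in ascending order.
def rollingCounts(sortedTs: Iterator[Long], windowSeconds: Long): Iterator[(Long, Int)] = {
  // Holds only the timestamps that are still inside the current window
  val buffer = mutable.Queue.empty[Long]
  sortedTs.map { ts =>
    buffer.enqueue(ts)
    // Evict timestamps older than the window start; the current ts always stays
    while (buffer.head < ts - windowSeconds) buffer.dequeue()
    (ts, buffer.size)
  }
}

// Usage with a few timestamps expressed in days
val day = 24L * 3600
val counts = rollingCounts(
  Iterator(0L, 10 * day, 29 * day, 31 * day, 45 * day),
  30 * day
).toList
```

Memory use is bounded by the number of events inside one 30-day window rather than by the size of the whole partition. Inside mapPartitions you would still have to handle key boundaries and make sure the data arrives sorted by key and timestamp.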