0 votes

Suppose we have a lot of JSON files in HDFS, but for a prototype we load some of them locally into Spark with:

val eachJson = sc.textFile("JSON_Folder/*.json")

I want to write a job that goes through the eachJson RDD[String] and calculates the size of each JSON. Each size is added to an accumulator and the corresponding JSON is appended to a StringBuilder. But when the size of the concatenated JSONs exceeds a threshold, we start storing the subsequent JSONs in a new StringBuilder.

For instance, if we have 100 JSONs and we calculate their sizes one by one, and we observe that at the 32nd element the size of the concatenated JSONs exceeds the threshold, then we group together only the first 31 JSONs. After that we start again from the 32nd element.

What I have managed to do so far is to obtain the index at which we have to split the RDD, based on the following code:

eachJson.collect()
  .map(_.getBytes("UTF-8").length)
  .scanLeft(0){_ + _}
  .takeWhile(_ < 20000) //threshold = 20000
  .length-1

I also tried:

val accum = sc.accumulator(0, "My Accumulator")
val buf = new StringBuilder
while(accum.value < 20000)
  {
    for(i <- eachJson)
      {
        accum.add(i.getBytes("UTF-8").length)
        buf ++= i
      }    
  }

But I receive the following error: org.apache.spark.SparkException: Task not serializable.

How can I do this in Spark with Scala? I use Spark 1.6.0 and Scala 2.10.6.

Your requirement sounds like you need to keep the sequential order of the input JSONs. Is that a fact? Must the first 31 JSONs be grouped together, followed by exactly the next ones (in that case, how do you define the order of the files? by their name?)? Or would it be OK to aggregate the inputs in any order, as long as the resulting size is OK? - GPI
Well, my JSONs contain a time, and maybe in the future I will be interested in the time frame, which would be the time of the last JSON minus that of the first one. In that case the sequential order is important. But let's suppose this is not a requirement; do you have an idea how to do it? Wouldn't the solution be more sophisticated? - sanyi14ka

2 Answers

1 vote

Spark's programming model is not ideal for what you are trying to achieve, if we take the general problem of "aggregating elements depending on something that can only be known by inspecting previous elements", for two reasons:

  1. Spark does not, generally speaking, impose an ordering on the data (although it can).
  2. Spark deals with data in partitions, and the size of each partition is usually (i.e. by default) not determined by the contents of the data, but by a default partitioner whose role is to divide the data evenly across partitions.

So it's not really a question of whether it is possible (it is); it is rather a question of "how much does it cost" (CPU / memory / time) for what it buys you.

A draft for an exact solution

If I were to shoot for an exact solution (by exact, I mean: preserving element order, defined by e.g. a timestamp in the JSONs, and grouping exactly consecutive inputs up to the largest amount that approaches the boundary), I would:

  1. Impose an ordering on the RDD (there is a sortBy function, which does that): this is a full data shuffle, so it IS expensive.
  2. Give each row an id after the sort (there is an RDD version of zipWithIndex which respects the ordering of the RDD, if one has been set. There is also a faster DataFrame equivalent that creates monotonically increasing, albeit non-consecutive, indexes).
  3. Collect the fraction of the result that is necessary to calculate the size boundaries (the boundaries being the ids defined at step 2), pretty much as you did. This again is a full pass over the data.
  4. Create a partitioner that respects these boundaries (i.e. make sure that all elements of a single group end up in the same partition), and apply it to the RDD obtained at step 2 (another full shuffle of the data). You now have partitions that are logically equivalent to what you expect, i.e. groups of elements whose sizes sum to under a certain limit. But the ordering inside each partition may have been lost in the repartitioning process, so you are not done yet!
  5. Then I would mapPartitions on this result to:
    5.1. re-sort the data local to each partition,
    5.2. group the items into the data structure I need once sorted

One of the keys is not to apply anything that messes with the partitions between steps 4 and 5. As long as the "partition map" fits into the driver's memory, this is an almost practical solution, but a very costly one. A rough sketch of these steps follows.
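For illustration only, here is one possible sketch of steps 1 to 5, under a few assumptions: it sorts by the raw JSON string (a placeholder for sorting by the real timestamp), it reuses the 20000-byte threshold from the question, and BoundaryPartitioner is a made-up helper name, not an existing Spark class:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

val MAX_SIZE = 20000  // same threshold as in the question

// 1. Impose an ordering (full shuffle). Sorting by the raw string is only a
//    placeholder; a real job would sort by the timestamp inside each JSON.
val sorted: RDD[String] = eachJson.sortBy(identity)

// 2. Attach an id that respects this ordering.
val indexed: RDD[(Long, String)] = sorted.zipWithIndex().map(_.swap)

// 3. Collect the sizes on the driver and compute the ids where a new group starts.
val sizes = indexed.mapValues(_.getBytes("UTF-8").length).collect().sortBy(_._1)
val boundaries = scala.collection.mutable.ArrayBuffer.empty[Long]
var running = 0
for ((id, size) <- sizes) {
  if (running + size > MAX_SIZE) { boundaries += id; running = size }
  else running += size
}

// 4. A partitioner that sends all ids of one group to the same partition.
class BoundaryPartitioner(bounds: Seq[Long]) extends Partitioner {
  override def numPartitions: Int = bounds.size + 1
  override def getPartition(key: Any): Int =
    bounds.count(_ <= key.asInstanceOf[Long])  // index of the group this id falls into
}

val partitioned = indexed.partitionBy(new BoundaryPartitioner(boundaries.toVector))

// 5. Re-sort locally inside each partition and drop the ids: each partition is now one group.
val groups: RDD[Seq[String]] = partitioned.mapPartitions { it =>
  Iterator(it.toSeq.sortBy(_._1).map(_._2))
}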

A simpler version (with relaxed constraints)

If it is OK for groups not to reach an optimal size, then the solution becomes much simpler (and it respects the ordering of the RDD if you have set one): it is pretty much what you would code if there was no Spark at all, just an Iterator over JSON files.

Personally, I'd define a recursive accumulator function (nothing Spark-related) like so (I guess you could write a shorter, more efficient version of your own using takeWhile):

  import scala.annotation.tailrec

  val MAX_SIZE = 20000 // byte threshold from the question

  /**
    * Aggregate recursively the contents of an iterator into a Seq[Seq[]]
    * @param remainingJSONs the remaining original JSON contents to be aggregated
    * @param currentAccSize the size of the active accumulation
    * @param currentAcc the current aggregation of json strings
    * @param resultAccumulation the result of aggregated JSON strings
    */
  @tailrec
  def acc(remainingJSONs: Iterator[String], currentAccSize: Int, currentAcc: Seq[String], resultAccumulation: Seq[Seq[String]]): Seq[Seq[String]] = {
    // IF there is nothing more in the current partition
    if (remainingJSONs.isEmpty) {
      // And we're not in the process of accumulating
      if (currentAccSize == 0)
        // Then return what was accumulated before
        resultAccumulation
      else
        // Return what was accumulated before, and what was in the process of being accumulated
        resultAccumulation :+ currentAcc
    } else {
      // We still have JSON items to process
      val itemToAggregate = remainingJSONs.next()
      // Is this item too large for the current accumulation ?
      if (currentAccSize + itemToAggregate.size > MAX_SIZE) {
        // Finish the current aggregation, and proceed with a fresh one
        acc(remainingJSONs, itemToAggregate.size, Seq(itemToAggregate), resultAccumulation :+ currentAcc)
      } else {
        // Accumulate the current item on top of the current aggregation
        acc(remainingJSONs, currentAccSize + itemToAggregate.size, currentAcc :+ itemToAggregate, resultAccumulation)
      }
    }
  }
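Before wiring it into Spark, a quick local sanity check (with made-up string sizes, assuming MAX_SIZE = 20000) could look like this:

// Four strings of 8000 characters each: with a 20000 threshold, the function
// should produce two groups of two strings.
val sample = Iterator("a" * 8000, "b" * 8000, "c" * 8000, "d" * 8000)
acc(sample, 0, Seq(), Seq()).foreach(group => println(group.map(_.length)))
// prints: List(8000, 8000)
//         List(8000, 8000)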

Now you take this accumulating code and make it run on each partition of Spark's RDD:

val jsonRDD = ...
val groupedJSONs = jsonRDD.mapPartitions(aPartition => {
  acc(aPartition, 0, Seq(), Seq()).iterator
})

This will turn your RDD[String] into an RDD[Seq[String]] where each Seq[String] is made of consecutive RDD elements (which may be predictable if the RDD has been sorted, and may not be otherwise), and whose total length is below the threshold. What may be "sub-optimal" is that, at the end of each partition, there may be a Seq[String] holding just a few (possibly a single) JSONs, while at the beginning of the following partition a full one was created.
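From there, assuming the end goal is one concatenated blob per group, a short hypothetical follow-up could be (the output path is purely illustrative):

val concatenated = groupedJSONs.map(_.mkString("\n"))  // one blob per Seq[String]
concatenated.saveAsTextFile("JSON_Folder_grouped")     // illustrative output path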

1 vote

Not an answer; just to point you in the right direction. You get the "Task not serializable" exception because your val buf = new StringBuilder is used inside the RDD's foreach (for(i <- eachJson)). Spark cannot distribute your buf variable because StringBuilder itself is not serializable. Besides, you shouldn't mutate driver-side state from within RDD operations. So the recommendation is to put all the data you need into the accumulator, not just the sizes:

case class MyAccumulator(size: Int, result: String)

And use something like rdd.aggregate or rdd.fold:

eachJson.fold(MyAccumulator(0, ""))(...)

//or

eachJson.fold(List.empty[MyAccumulator])(...)
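As a rough illustration, a sketch with aggregate (which, unlike fold, lets the accumulator type differ from the element type) might look as follows; note that the combine step simply concatenates partial results, ignores the threshold, and gives no ordering guarantee across partitions:

val combined = eachJson.aggregate(MyAccumulator(0, ""))(
  // merge one JSON into the running accumulator of a partition
  (acc, json) => MyAccumulator(acc.size + json.getBytes("UTF-8").length, acc.result + json),
  // merge the partial accumulators of two partitions
  (a, b) => MyAccumulator(a.size + b.size, a.result + b.result)
)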

Or just use scanLeft, since you collect anyway.

Be aware that this won't be scalable (the same as the StringBuilder/collect solution). To make it scalable, use mapPartitions.


Update. mapPartitions gives you the ability to partially aggregate your JSONs, because you get a "local" iterator (one partition) as your input, which you can operate on like a regular Scala collection. It might be enough if you are OK with a small percentage of JSONs not being concatenated optimally.

 eachJson.mapPartitions{ localCollection =>
    ... //compression logic here
 }
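One possible shape for that compression logic, sketched under the assumption that the 20000-byte threshold from the question applies and that greedily packing consecutive JSONs of a partition is acceptable:

val packed = eachJson.mapPartitions { localCollection =>
  // Greedily pack consecutive JSONs into groups whose total size stays under the threshold.
  val (finishedGroups, lastGroup, _) =
    localCollection.foldLeft((List.empty[List[String]], List.empty[String], 0)) {
      case ((done, current, size), json) =>
        val jsonSize = json.getBytes("UTF-8").length
        if (size + jsonSize > 20000 && current.nonEmpty)
          (current.reverse :: done, List(json), jsonSize) // close the current group
        else
          (done, json :: current, size + jsonSize)        // keep accumulating
    }
  val allGroups = if (lastGroup.isEmpty) finishedGroups else lastGroup.reverse :: finishedGroups
  allGroups.reverse.map(_.mkString("\n")).iterator        // one concatenated blob per group
}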