1
votes

We are currently facing a performance issue in a Spark SQL application written in Scala. The application flow is described below.

  1. The Spark application reads a text file from an input HDFS directory.
  2. It creates a DataFrame on top of the file by programmatically specifying the schema. This DataFrame is an exact in-memory replica of the input file and has around 18 columns.

    var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema)

  3. It creates a filtered DataFrame from the DataFrame constructed in step 2, containing only the unique account numbers (obtained with distinct).

    var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()

  4. Using the two DataFrames constructed in steps 2 and 3, it fetches all the records belonging to one account number and runs some JSON parsing logic on top of the filtered data.

    var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()

  5. Finally, the JSON-parsed data is written to an HBase table.

We are facing performance issues when calling the collect method on these DataFrames, because collect fetches all the data onto a single node and processes it there, losing the benefit of parallel processing. In the real scenario we can expect around 10 billion records, so collecting all of them onto the driver node might crash the program due to memory or disk space limitations.

I don't think the take method can be used in our case, since it only fetches a limited number of records at a time. We have to get all the unique account numbers from the whole data set, so I am not sure that take suits our requirement.

Any help with avoiding the collect calls, or other best practices to follow, is appreciated. Code snippets/suggestions/git links will be very helpful if anyone has faced a similar issue.

Code snippet

    val eqpSchemaString = "accountnumber ....."
    val eqpSchema = StructType(eqpSchemaString.split(" ").map(fieldName =>
      StructField(fieldName, StringType, true)))
    val eqpRdd = sc.textFile(inputPath)
    val eqpRowRdd = eqpRdd.map(_.split(",")).map(eqpRow => Row(eqpRow(0).trim, eqpRow(1).trim, ....))

    var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema)


    var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()


    distAccNrsDF.foreach { data =>
      var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()

      var result = new JSONObject()
      result.put("jsonSchemaVersion", "1.0")
      val firstRowAcc = filtrEqpDF(0)
      //Json parsing logic
      {
         .....
         .....
      }
    }
2
What do you actually want to do? Just write to an HBase table? If that's the case, why do you want to collect or take? collect and take should only be used to view sample data; apart from that there is no need to use them. - Mohan
Basically we need to group all the data which belongs to the same account number (in the source file), and the grouped data has to be pushed to HBase. We are using collect in order to find the distinct account numbers globally. If we don't call collect, how can we find the unique account numbers that might be spread across multiple nodes? - afzal

2 Answers

2
votes

The approach usually taken in this kind of situation is:

  • Instead of collect, invoke foreachPartition: foreachPartition applies a function to each partition (represented by an Iterator[Row]) of the underlying DataFrame separately (the partition being the atomic unit of parallelism in Spark)
  • the function will open a connection to HBase (thus making it one per partition) and send all the contained values through this connection

This means that every executor opens a connection (which is not serializable, but it lives within the boundaries of the function and thus never needs to be sent across the network) and independently sends its contents to HBase, without any need to collect all the data on the driver (or on any single node, for that matter).

It looks like you are reading a CSV file, so probably something like the following will do the trick:

spark.read.csv(inputPath).         // Using DataFrameReader but your way works too
  foreachPartition { rows =>
    val conn = ???                 // Create HBase connection
    for (row <- rows) {            // Loop over the iterator
      val data = parseJson(row)    // Your parsing logic
      ???                          // Use 'conn' to save 'data'
    }
  }
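
If all the records of one account number have to be parsed together (as the comments suggest), the rows can be grouped by key before the write. Below is a rough sketch using the typed Dataset API, where buildJson is a placeholder for your JSON parsing logic (assumed to return the JSON document as a String):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    spark.read
      .schema(eqpSchema)                                      // same programmatic schema as before
      .csv(inputPath)
      .groupByKey(row => row.getAs[String]("accountnumber"))  // all rows of one account end up in one group
      .flatMapGroups { (accountNumber, rows) =>
        // 'rows' is an Iterator[Row] holding every record for this account number;
        // buildJson stands in for your JSON parsing logic
        Iterator(buildJson(accountNumber, rows.toSeq))
      }
      .foreachPartition { jsonDocs =>
        val conn = ???                                        // one HBase connection per partition
        jsonDocs.foreach(doc => ???)                          // use 'conn' to save each document
      }

The groupByKey adds one shuffle by account number, but everything stays distributed: the parsing and the HBase write run in parallel on the executors and nothing is ever brought back to the driver.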
2
votes

You can avoid collect in your code if you have a large data set.

collect returns all the elements of the dataset as an array to the driver program. It is usually only useful after a filter or other operation that returns a sufficiently small subset of the data.

This can also cause the driver to run out of memory, because collect() fetches the entire RDD/DataFrame onto a single machine.

I have just edited your code, which should work for you.

    var distAccNrsDF = eqpDF.select("accountnumber").distinct()
    distAccNrsDF.foreach { data =>
      var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'")
      var result = new JSONObject()
      result.put("jsonSchemaVersion", "1.0")
      val firstRowAcc = filtrEqpDF.first()   // filtrEqpDF is a DataFrame here, not a collected Array
      //Json parsing logic
      {
         .....
         .....
      }
    }
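
Filtering eqpDF once per account number still scans the whole data set for every distinct account, and it references eqpDF inside a closure that runs on the executors, which Spark does not support. The same per-account grouping can be done in a single pass with groupBy and collect_list; a rough sketch, where parseAccount is a placeholder for your JSON parsing and HBase write:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.{col, collect_list, struct}

    // Group all rows of one account number together in a single pass
    val grouped = eqpDF
      .groupBy("accountnumber")
      .agg(collect_list(struct(eqpDF.columns.map(col): _*)).as("rows"))

    grouped.foreach { row =>
      val accountNumber = row.getString(0)
      val accountRows   = row.getSeq[Row](1)    // every record for this account number
      parseAccount(accountNumber, accountRows)  // build the JSON and write it to HBase
    }

This keeps both the grouping and the write on the executors, so nothing has to be collected to the driver.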