0 votes

My Spark application is as follows:

1) execute a large Spark SQL query into the DataFrame "dataDF"

2) for each partition value present in "dataDF":

2.1) get the associated "filtered" DataFrame, containing only that partition's data

2.2) do partition-specific work on that "filtered" DataFrame and write the output

The code is as follows:

val dataSQL = spark.sql("SELECT ...")
val dataDF = dataSQL.repartition($"partition")

for {
  row <- dataDF.dropDuplicates("partition").collect
} yield {

   val partition_str: String = row.getAs[String](0)
   val filtered = dataDF.filter($"partition".equalTo(lit(partition_str)))

   // ... on each partition, do work depending on the partition, and write result on HDFS

   // Example :

   if( partition_str == "category_A" ){

       // do group by, do pivot, do mean, ...
       val x = filtered
         .groupBy("column1","column2")
         ...

       // write final DF
       x.write.parquet("some/path")

   } else if( partition_str == "category_B" ) {

       // select specific field and apply calculation on it
       val y = filtered.select(...)

       // write final DF
       y.write.parquet("some/path")

   } else if ( ... ) {

      // other kind of calculation
      // write results

   } else {

      // other kind of calculation
      // write results

   }

}

This algorithm works successfully, and the Spark SQL query itself is fully distributed. However, the specific work done on each resulting partition is performed sequentially, which is inefficient, especially because each partition's write happens one after another.

In this case, what are the ways to replace the "for yield" with something parallel/asynchronous?

Thanks

You can do work on the workers inside a map or flatMap function argument. - Aleksey Isachenkov
Hello. As I said, I have to do specific work (applying specific actions) on each partition of the resulting Spark SQL DataFrame, leading to different calculations and output schemas depending on the partition. My goal is to have those partition-dependent calculations running in parallel. I don't see how to do that with a simple map. Could you please elaborate in an answer? - Klun
Indeed, 'for' means sequential and thus not distributed. Not sure why you opted for that - you may need to shed some light on what that logic is. - thebluephantom
Hey. I have added some pseudo-code. What I'm trying to achieve is "just" to have each block inside the if statements running in parallel - a way for the final logic applied to each particular partition of a common DataFrame to run in parallel. Thanks - Klun
There are some posts on using .par in Big Data nonetheless. - thebluephantom

1 Answer

1 vote
  1. You could use foreachPartition if you are writing to data stores outside the Hadoop scope, with specific logic needed for that particular environment.

  2. Otherwise map, etc.

  3. .par parallel collections (Scala) - but use that with caution. It is fine for reading files and pre-processing them; otherwise it is possibly considered risky. See the sketch just after this list.

  4. Threads - see the Future-based sketch at the end of this answer.

  5. You need to check what you are doing and whether the operations can be referenced and used within a foreachPartition block, etc. You need to try it, as some aspects can only be written for the driver and are then distributed by Spark to the executors on the workers. For example, you cannot call spark.sql from the worker side, as illustrated in the first foreachPartition snippet at the end of this post.

  6. Likewise, df.write or df.read cannot be used inside those snippets either. What you can do is issue individual execute/mutate statements to, say, Oracle or MySQL.
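
For option 3, here is a minimal sketch of the .par approach applied to the question's loop. It assumes the spark session and the dataDF DataFrame from the question are in scope, and Scala 2.11/2.12 where .par is available without the separate scala-parallel-collections module; the column names, branch contents and output paths are placeholders.

import org.apache.spark.sql.functions.col

// Collect the distinct partition keys to the driver (assumed to be reasonably few).
val partitionKeys: Array[String] = dataDF
  .select("partition")
  .dropDuplicates()
  .collect()
  .map(_.getString(0))

// .par submits the per-partition jobs from several driver threads; Spark's
// scheduler is thread-safe, so those jobs can run concurrently on the cluster
// (optionally with spark.scheduler.mode=FAIR to share resources between them).
partitionKeys.par.foreach { partitionKey =>
  val filtered = dataDF.filter(col("partition") === partitionKey)

  partitionKey match {
    case "category_A" =>
      filtered
        .groupBy("column1", "column2")
        .count()                                    // placeholder aggregation
        .write.parquet(s"some/path/$partitionKey")
    case "category_B" =>
      filtered
        .select("column1")                          // placeholder selection
        .write.parquet(s"some/path/$partitionKey")
    case _ =>
      // other partition-specific calculations and writes
      filtered.write.parquet(s"some/path/$partitionKey")
  }
}

Each branch still triggers its own Spark job; .par only parallelises the driver-side loop that submits those jobs, so the gain comes from overlapping the per-partition writes rather than from any extra distribution of the query itself.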

Hope this helps.

rdd.foreachPartition(iter => {
  while (iter.hasNext) {
    val item = iter.next()
    // do something
    // not possible: spark.sql cannot be called from executor-side code
    spark.sql(s"INSERT INTO tableX VALUES(2, 7, 'CORN', 100, $item)")
    // do some other stuff
  }
})

or

RDD.foreachPartition (records => {       
  val JDBCDriver = "com.mysql.jdbc.Driver" ...
  ...
  connectionProperties.put("user", s"${jdbcUsername}")
  connectionProperties.put("password", s"${jdbcPassword}")
 val connection = DriverManager.getConnection(ConnectionURL, jdbcUsername, jdbcPassword)
  ...
  val mutateStatement = connection.createStatement()
  val queryStatement = connection.createStatement()
  ...
      records.foreach (record => { 
              val val1 = record._1
              val val2 = record._2
              ...
              mutateStatement.execute(s"insert into sample (k,v) values(${val1}, ${val2})")
            })
  }            
)
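
For option 4 (threads), a Future-based variant of the same driver-side loop is another possibility. Again, this is only a sketch: processPartition is a hypothetical helper standing in for the question's per-partition branching, and it reuses the partitionKeys value and the dataDF DataFrame from the .par sketch above. Note that the default global execution context limits the number of jobs submitted concurrently to the number of driver cores.

import org.apache.spark.sql.functions.col
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical helper holding the same partition-specific filter/branch/write
// logic shown in the .par sketch above.
def processPartition(partitionKey: String): Unit = {
  val filtered = dataDF.filter(col("partition") === partitionKey)
  // ... branch on partitionKey, compute, and write the result ...
  filtered.write.parquet(s"some/path/$partitionKey")
}

// One Future per partition key; each Future submits its own Spark job from the driver.
val jobs: Seq[Future[Unit]] = partitionKeys.toSeq.map { key =>
  Future { processPartition(key) }
}

// Block until every per-partition job has finished.
Await.result(Future.sequence(jobs), Duration.Inf)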