
Examples borrowed from the Internet; thanks to those with better insights.

The following can be found on various forums in relation to mapPartitions and map:

... Consider the case of initializing a database. If we are using map() or
foreach(), the number of times we would need to initialize will be equal to
the number of elements in the RDD. Whereas if we use mapPartitions(), the number of times
we would need to initialize would be equal to the number of partitions ...

Then there is this response:

val newRd = myRdd.mapPartitions(
  partition => {

    val connection = new DbConnection /*creates a db connection per partition*/

    val newPartition = partition.map(
      record => {
        readMatchingFromDB(record, connection)
      })
    connection.close()
    newPartition
  })

So, after reading various discussions on this topic, my questions are:

  1. Whilst I can understand the performance improvement of using mapPartitions in general, why, according to the first snippet of text, would the database connection be created for every element of an RDD when using map? I can't seem to find the right reason.
  2. The same thing does not happen with sc.textFile ... or when reading into DataFrames from JDBC connections. Or does it? I would be very surprised if it did.

What am I missing...?


2 Answers

Answer 1

First of all, this code is not correct. While it looks like an adaptation of the established pattern for foreachPartition, it cannot be used with mapPartitions like this.

Remember that mapPartitions takes a function from Iterator[_] to Iterator[_], and Iterator.map is lazy, so this code closes the connection before it is actually used.
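To see the laziness problem without a cluster, the sketch below mimics the question's mapPartitions function in plain Scala (it runs as a Scala script or in the REPL). FakeConnection is a hypothetical stand-in for DbConnection that throws if it is used after close(), and the lookup results are made up. Because Iterator.map only records the transformation, close() runs first, and the lookups fail as soon as the returned iterator is consumed.

```scala
import scala.util.Try

// Hypothetical stand-in for the question's DbConnection: it fails if used after close().
class FakeConnection {
  private var open = true
  def lookup(record: Int): String = {
    if (!open) throw new IllegalStateException("connection already closed")
    s"row-$record"
  }
  def close(): Unit = { open = false }
}

// Same shape as the question's mapPartitions function: Iterator in, Iterator out.
def brokenPartitionFn(partition: Iterator[Int]): Iterator[String] = {
  val connection = new FakeConnection
  val newPartition = partition.map(record => connection.lookup(record)) // lazy: no lookup happens yet
  connection.close()                                                    // runs before any lookup
  newPartition                                                          // lookups happen only when this is consumed
}

// Consuming the iterator (as Spark does later) triggers the lookups on a closed connection.
val result = Try(brokenPartitionFn(Iterator(1, 2, 3)).toList)
println(result) // Failure(java.lang.IllegalStateException: connection already closed)
```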

To use some form of resource which is initialized in mapPartitions, you'll have to design your code in a way that doesn't require explicit resource release.

"...the first snippet of text, the database connection be called every time for each element of an RDD using map? I can't seem to find the right reason."

Without the snippet in question, the answer must be generic: map and foreach are not designed to handle external state. With the API shown in your question, you'd have to write:

rdd.map(record => readMatchingFromDB(record, new DbConnection))

which, in an obvious way, creates a connection for each element.
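To make the counting concrete, here is a plain-Scala simulation (no Spark needed) in which an RDD is modelled as a Seq of partitions and DbConnection is a hypothetical class that only counts how often it is constructed. With six records in two partitions, the map style opens six connections and the mapPartitions style opens two. Note that Seq.map is eager, unlike the Iterator that Spark hands to mapPartitions, so the laziness issue does not arise in this simulation.

```scala
// Hypothetical connection class; constructing one counts as "opening" a connection.
var opened = 0
class DbConnection {
  opened += 1
  def lookup(record: Int): String = s"row-$record"
}

// A simulated RDD: 6 records split across 2 partitions.
val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6))

// map style: the per-record function has no other place to get a connection from.
opened = 0
val viaMap = partitions.map(partition => partition.map(record => new DbConnection().lookup(record)))
val openedWithMap = opened

// mapPartitions style: one connection per partition, reused for all of its records.
opened = 0
val viaMapPartitions = partitions.map { partition =>
  val connection = new DbConnection
  partition.map(record => connection.lookup(record)) // Seq.map is eager, so every lookup runs here
}
val openedWithMapPartitions = opened

println(s"map: $openedWithMap, mapPartitions: $openedWithMapPartitions") // map: 6, mapPartitions: 2
```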

It is not impossible to use, for example, a singleton connection pool, doing something similar to:

object Pool {
  lazy val pool = ???
}

rdd.map(record => readMatchingFromDB(record, Pool.pool.getConnection))

but it is not always easy to do it right (think about thread safety). And because connections and similar objects cannot, in general, be serialized, we cannot just use closures.
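The serialization point can be checked without Spark. Spark ships closures, together with everything they capture, to the executors using Java serialization; the sketch below runs the same kind of serialization directly. DbConnection and the JDBC URL are hypothetical. Capturing the connection object fails, while capturing only plain configuration (from which the connection would be created on the executor) succeeds.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.Try

// Hypothetical connection class; like most real connection objects it is not Serializable.
class DbConnection {
  def lookup(record: Int): String = s"row-$record"
}

// Serializes an object with standard Java serialization and reports the number of bytes written.
def javaSerialize(obj: AnyRef): Try[Int] = Try {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj)
  finally out.close()
  bytes.size()
}

// Capturing the connection itself means it has to be serialized: this fails.
val capturedConnection = javaSerialize(new DbConnection)
val failedAsExpected =
  capturedConnection.failed.toOption.exists(_.isInstanceOf[NotSerializableException])

// Capturing only configuration works; the connection would then be opened on the executor.
val capturedUrl = javaSerialize("jdbc:postgresql://db.example.com/mydb") // hypothetical URL

println(failedAsExpected)      // true
println(capturedUrl.isSuccess) // true
```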

In contrast, the foreachPartition pattern is both explicit and simple.
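A plain-Scala sketch of that foreachPartition pattern, assuming a hypothetical DbConnection that just records writes: because Iterator.foreach consumes the whole partition immediately, close() in the finally block runs only after every record has been written, and it still runs if a write throws. In Spark this body would be passed to rdd.foreachPartition and would return Unit; here it returns the connection only so the result can be inspected.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical connection that records writes and rejects them once closed.
class DbConnection {
  private var open = true
  val written = ArrayBuffer.empty[Int]
  def isOpen: Boolean = open
  def write(record: Int): Unit = {
    require(open, "connection already closed")
    written += record
  }
  def close(): Unit = { open = false }
}

// Per-partition body: open once, write every record eagerly, then close.
def savePartition(partition: Iterator[Int]): DbConnection = {
  val connection = new DbConnection                         // one connection per partition
  try partition.foreach(record => connection.write(record)) // foreach is eager: all writes happen here
  finally connection.close()                                // so closing afterwards is safe
  connection                                                // returned only for inspection in this demo
}

val connection = savePartition(Iterator(1, 2, 3))
println(connection.written) // ArrayBuffer(1, 2, 3)
println(connection.isOpen)  // false
```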

It is of course possible to force eager execution to make things work, for example:

val newRd = myRdd.mapPartitions(
  partition => {

    val connection = new DbConnection /*creates a db connection per partition*/

    val newPartition = partition.map(
      record => {
        readMatchingFromDB(record, connection)
      }).toList
    connection.close()
    newPartition.iterator
  })

but it is of course risky: it materializes the whole partition in memory and can actually decrease performance.
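If buffering the partition with toList is a concern, one lazy alternative (a commonly used workaround, not something proposed in this answer) is to close the connection only once the returned iterator reports that it is exhausted. The sketch simulates this in plain Scala; FakeConnection and closeWhenDone are hypothetical names, not Spark APIs. Because the close depends on the iterator being fully consumed, in Spark one would usually also register the cleanup with TaskContext.addTaskCompletionListener so the connection is released even if the task stops early.

```scala
// Hypothetical connection that fails if used after close().
class FakeConnection {
  private var open = true
  def isOpen: Boolean = open
  def lookup(record: Int): String = {
    if (!open) throw new IllegalStateException("connection already closed")
    s"row-$record"
  }
  def close(): Unit = { open = false }
}

// Wraps a lazily mapped iterator and closes the connection once the iterator is exhausted.
// Caveat: if the consumer stops early, hasNext never returns false and close() is never called.
def closeWhenDone[A](underlying: Iterator[A], connection: FakeConnection): Iterator[A] =
  new Iterator[A] {
    private var closed = false
    def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !closed) {
        connection.close()
        closed = true
      }
      more
    }
    def next(): A = underlying.next()
  }

// The function one might pass to mapPartitions; the connection is a parameter only so the demo can inspect it.
def lookupPartition(partition: Iterator[Int], connection: FakeConnection): Iterator[String] =
  closeWhenDone(partition.map(record => connection.lookup(record)), connection)

val connection = new FakeConnection
val rows = lookupPartition(Iterator(1, 2, 3), connection)
val openBeforeConsuming = connection.isOpen // true: nothing has been read yet
val consumed = rows.toList                  // List(row-1, row-2, row-3)
val openAfterConsuming = connection.isOpen  // false: closed when the iterator ran out
```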

"The same thing does not happen with sc.textFile ... or when reading into DataFrames from JDBC connections. Or does it?"

Both operate using a much lower-level API, but of course resources are not initialized for each record.

Answer 2

In my opinion, the connection should be kept outside, created just once before map, and closed after task completion.

val connection = new DbConnection /* creates a single db connection, once, outside mapPartitions */

val newRd = myRdd.mapPartitions(
  partition => {

    val newPartition = partition.map(
      record => {
        readMatchingFromDB(record, connection)
      })

    newPartition
  })

connection.close()