0
votes

I am trying to process each row in a Spark DataFrame and convert the result into another DataFrame. Essentially, I have a frame A which contains one column ("id") and another column which is an array of sentences. I'd like to convert this to another DataFrame in which each sentence is uniquely identified by a "docID:count" identifier string. My code is:

var sentencesCollection:Seq[SentenceIdentifier] = Seq()
tokenized.foreach(row => {
    val docID = row.getAs[String]("id")
    val sentences = row.getAs[Seq[String]]("sentences")
    var count:Integer = 0
    for (elem <- sentences) {
        val sentenceID:String = docID + ":" + count
        count = count + 1

        val si = SentenceIdentifier(sentenceID, elem)
        sentencesCollection = sentencesCollection :+ si
     }

})

println(sentencesCollection.length)

However, the println statement prints "0".

Any idea how I can make sentencesCollection a sequence that I can process further downstream? (Possibly through a .toDF() call.)

1
That won't work, because the foreach is executed by the executors (probably on another machine, in another JVM), while your Seq only lives in the driver. This has been asked countless times before. If you want a local copy of your data, use collect. - However, since what you want to do is create a new DataFrame, creating a local collection would be a waste of memory and a performance bottleneck. There are a lot of methods to create a new DataFrame from another, like select, filter or map (on Dataset). I think you should read more about how Spark works - Luis Miguel Mejía Suárez

1 Answer

5
votes

As @Luis Miguel Mejía Suárez explained very well in a comment, any function passed as the argument of DataFrame.foreach will be executed on one or more executor machines, not on the driver running this code. Any change to mutable state will therefore be lost: it is applied to the executors' copies of that state, which are then thrown away.
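For completeness, here is a minimal sketch of the collect route mentioned in the comment. It assumes the data is small enough to fit in the driver's memory; the session setup and sample rows are made up for illustration and are not taken from the question.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session purely for illustration; in spark-shell `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("collect-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data shaped like the question's frame (string id, array of sentences)
val tokenized = Seq(
  ("1", Seq("Hi there", "Hello there")),
  ("2", Seq("Bye now"))
).toDF("id", "sentences")

// collect() copies every row into the driver's JVM, so this loop runs locally
// and its result is an ordinary Scala collection living on the driver
val local: Seq[(String, String)] = tokenized.collect().toSeq.flatMap { row =>
  val docID = row.getAs[String]("id")
  val sentences = row.getSeq[String](row.fieldIndex("sentences"))
  sentences.zipWithIndex.map { case (sentence, i) => (s"$docID:$i", sentence) }
}

println(local.length) // 3 for the sample data above
```

This only makes sense for small inputs, since all the work and all the data end up on a single machine.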

When working with DataFrames, you should always think in terms of transforming one DF into another, using nothing but Spark's APIs to do so. These transformations are "instructions" that Spark executes in a distributed manner across the cluster.

In this case, your requirement can be achieved with this in mind. You want to:

  • Explode the records, meaning turn every record containing an array to multiple records each with one element from the array
  • Keep track of the exploded element's position in the array
  • Concat that position to the existing value of the "id" column with ":" as separator

Each of these actions can be achieved with one of Spark's built-in functions that operate on DataFrame columns. Here's what the solution looks like:

import org.apache.spark.sql.functions._
import spark.implicits._

// Sample data
val tokenized = Seq(
  (1, Array("Hi there", "Hello there")),
  (2, Array("Bye now")),
  (3, Array("Thank you", "Thanks", "Many thanks"))
).toDF("id", "sentences")

val result = tokenized
  // posexplode creates one row per array element, in "pos" and "col" columns
  .select($"id", posexplode($"sentences"))
  // build the new docID column with concat, and rename "col" to "sentence"
  .select(concat($"id", lit(":"), $"pos") as "docID", $"col" as "sentence")

result.show()
// +-----+-----------+
// |docID|   sentence|
// +-----+-----------+
// |  1:0|   Hi there|
// |  1:1|Hello there|
// |  2:0|    Bye now|
// |  3:0|  Thank you|
// |  3:1|     Thanks|
// |  3:2|Many thanks|
// +-----+-----------+
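If you would rather end up with typed SentenceIdentifier objects, as in the question, a possible alternative is flatMap on a Dataset. This is only a sketch: the question does not show the case class, so its field names (sentenceID, sentence) are assumptions, as are the session setup and the sample rows. It is meant to be pasted into spark-shell or a notebook; in a compiled application the case class should be defined at the top level.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Assumed shape of the asker's case class; the field names are hypothetical
case class SentenceIdentifier(sentenceID: String, sentence: String)

// Assumption: a local session purely for illustration; in spark-shell `spark` already exists
val spark = SparkSession.builder().master("local[*]").appName("flatmap-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data with string ids, as read in the question's code
val tokenized = Seq(
  ("1", Seq("Hi there", "Hello there")),
  ("2", Seq("Bye now"))
).toDF("id", "sentences")

val sentences: Dataset[SentenceIdentifier] = tokenized
  // view each row as a typed (id, sentences) pair; tuple fields are matched by position
  .as[(String, Seq[String])]
  // runs on the executors: emit one SentenceIdentifier per sentence, numbered within its document
  .flatMap { case (docID, docSentences) =>
    docSentences.zipWithIndex.map { case (s, i) => SentenceIdentifier(s"$docID:$i", s) }
  }

sentences.show()
```

The column-function version with posexplode is usually the better default, because Spark can optimize built-in functions, whereas the lambda passed to flatMap is opaque to the optimizer.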