1
votes

I have two dataframes, lets say df1 and df2 in Spark Scala

df1 has two fields, 'ID' and 'Text' where 'Text' has some description (Multiple words). I have already removed all special characters and numeric characters from field 'Text' leaving only alphabets and spaces.

df1 Sample

+--------------++--------------------+
|ID            ||Text                |     
+--------------++--------------------+
| 1            ||helo how are you    |
| 2            ||hai haiden          |
| 3            ||hw are u uma        |
--------------------------------------

df2 contains a list of words and corresponding replacement words

df2 Sample

+--------------++--------------------+
|Word          ||Replace             |     
+--------------++--------------------+
| helo         ||hello               |
| hai          ||hi                  |
| hw           ||how                 |
| u            ||you                 |
--------------------------------------

I would need to find all occurrence of words in df2("Word") from df1("Text") and replace it with df2("Replace")

With the sample dataframes above, I would expect a resulting dataframe, DF3 as given below

df3 Sample

+--------------++--------------------+
|ID            ||Text                |     
+--------------++--------------------+
| 1            ||hello how are you   |
| 2            ||hi haiden           |
| 3            ||how are you uma     |
--------------------------------------

Your help is greatly appreciated in doing the same in Spark using Scala.

3
how big is df2?mtoto

3 Answers

2
votes

It'd be easier to accomplish this if you convert your df2 to a Map. Assuming it's not a huge table, you can do the following :

val keyVal = df2.map( r =>( r(0).toString, r(1).toString ) ).collect.toMap

This will give you a Map to refer to :

scala.collection.immutable.Map[String,String] = Map(helo -> hello, hai -> hi, hw -> how, u -> you)

Now you can use UDF to create a function that will utilize keyVal Map to replace values :

val getVal = udf[String, String] (x => x.split(" ").map(x => res18.get(x).getOrElse(x) ).mkString( " " ) )

Now, you can call the udf getVal on your dataframe to get the desired result.

df1.withColumn("text" , getVal(df1("text")) ).show


+---+-----------------+
| id|             text|
+---+-----------------+
|  1|hello how are you|
|  2|        hi haiden|
|  3|  how are you uma|
+---+-----------------+
1
votes

I will demonstrate only for the first id and assume that you can not do a collect action on your df2. First you need to be sure that the schema for your dataframe is and array for text column on your df1

+---+--------------------+
| id|                text|
+---+--------------------+
|  1|[helo, how, are, ...|
+---+--------------------+

with schema like this:

 |-- id: integer (nullable = true)
 |-- text: array (nullable = true)
 |    |-- element: string (containsNull = true)

After that you can do an explode on the text column

res1.withColumn("text", explode(res1("text")))

+---+----+
| id|text|
+---+----+
|  1|helo|
|  1| how|
|  1| are|
|  1| you|
+---+----+

Assuming you're replace dataframe looks like this:

+----+-------+
|word|replace|
+----+-------+
|helo|  hello|
| hai|     hi|
+----+-------+

Joining the two dataframe will look like this:

res6.join(res8, res6("text") === res8("word"), "left_outer")

+---+----+----+-------+
| id|text|word|replace|
+---+----+----+-------+
|  1| you|null|   null|
|  1| how|null|   null|
|  1|helo|helo|  hello|
|  1| are|null|   null|
+---+----+----+-------+

Do a select with coalescing null values:

res26.select(res26("id"), coalesce(res26("replace"), res26("text")).as("replaced_text"))

+---+-------------+
| id|replaced_text|
+---+-------------+
|  1|          you|
|  1|          how|
|  1|        hello|
|  1|          are|
+---+-------------+

and then group by id and aggregate into a collect list function:

res33.groupBy("id").agg(collect_list("replaced_text"))

+---+---------------------------+
| id|collect_list(replaced_text)|
+---+---------------------------+
|  1|       [you, how, hello,...|
+---+---------------------------+

Keep in mind that you should preserve you initial order of text elements.

0
votes

I Suppose code below should solve your problem

I have solved this by using RDD

 val wordRdd = df1.rdd.flatMap{ row =>
 val wordList = row.getAs[String]("Text").split(" ").toList
 wordList.map{word => Row.fromTuple(row.getAs[Int]("id"),word)}
}.zipWithIndex()

val wordDf = sqlContext.createDataFrame(wordRdd.map(x => Row.fromSeq(x._1.toSeq++Seq(x._2))),StructType(List(StructField("id",IntegerType),StructField("word",StringType),StructField("index",LongType))))
val opRdd =  wordDf.join(df2,wordDf("word")===df2("word"),"left_outer").drop(df2("word")).rdd.groupBy(_.getAs[Int]("id")).map(x => Row.fromTuple(x._1,x._2.toList.sortBy(x => x.getAs[Long]("index")).map(row => if(row.getAs[String]("Replace")!=null) row.getAs[String]("Replace") else row.getAs[String]("word")).mkString(" ")))
val opDF = sqlContext.createDataFrame(opRdd,StructType(List(StructField("id",IntegerType),StructField("Text",StringType))))