I have two Spark DataFrames. The first has two columns, id and Tag. The second has an id column, but its Tag is missing in some rows. The first DataFrame is essentially a dictionary: each id appears exactly once. In the second DataFrame an id may appear several times.

What I need is a new column in the second DataFrame that holds the Tag as a function of the id in each row. I think this can be done by converting to RDDs first, but I thought there must be a more elegant way using DataFrames (in Java).

Example: given a df1 row (id: 0, Tag: "A"), a df2 Row1 (id: 0, Tag: null), and a df2 Row2 (id: 0, Tag: "B"), the Tag column in the resulting DataFrame df3 should take the value from df1 (Tag for id=0 is "A") if the df2 Tag was null, but keep the original Tag if it was not null. That gives df3 Row1 (id: 0, Tag: "A") and df3 Row2 (id: 0, Tag: "B"). Hope the example is clear.

|   ID  |   No.   |  Tag  | new Tag Col |
|    1  |  10002  |   A   |      A      |
|    2  |  10003  |   B   |      B      | 
|    1  |  10004  | null  |      A      |
|    2  |  10005  | null  |      B      |
Any reason why a simple LEFT OUTER JOIN won't work for you? - zero323
I edited the question and will look into LOJ. - Kai
LOJ doesn't completely solve it, but I think if I follow it with a udf() then I get what I need. Thanks. - Kai
If it is not enough, you can try to update the question with example input and expected output. It is really hard to understand what you want right now. - zero323
When Tag is null, new Tag = Tag(id). E.g. Tag(id=1) = A, so we assign A to 10004, and Tag(id=2) = B, so we assign B to 10005. I think I need a udf() for this. - Kai

1 Answer

All you need here is a left outer join and coalesce, which returns its first non-null argument:

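// Assumes a spark-shell session, where sc and the toDF implicits are in scope.
import org.apache.spark.sql.functions.coalesce

// Second DataFrame: some tags are missing (None becomes null).
val df = sc.parallelize(Seq(
  (1, 10002, Some("A")), (2, 10003, Some("B")),
  (1, 10004, None), (2, 10005, None)
)).toDF("id", "no", "tag")

// First DataFrame: the dictionary, one row per id.
val lookup = sc.parallelize(Seq(
  (1, "A"), (2, "B")
)).toDF("id", "tag")

// Keep every row of df; take df's tag if present, otherwise the looked-up tag.
df.join(lookup, df.col("id").equalTo(lookup.col("id")), "leftouter")
  .withColumn("new_tag", coalesce(df.col("tag"), lookup.col("tag")))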

This should be almost identical to the Java version.
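
To make the semantics concrete, here is a minimal plain-Java sketch of what a left outer join followed by coalesce computes for each row. It deliberately uses no Spark API: the class TagFill, the Row type, and the methods coalesce and newTags are hypothetical names introduced only for illustration, and the lookup is assumed to contain each id at most once, as the question states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TagFill {
    /** One row of the second table; tag may be null. (Hypothetical type.) */
    static final class Row {
        final int id;
        final int no;
        final String tag;

        Row(int id, int no, String tag) {
            this.id = id;
            this.no = no;
            this.tag = tag;
        }
    }

    /** Returns the first non-null argument, like a two-argument SQL COALESCE. */
    static String coalesce(String first, String second) {
        return first != null ? first : second;
    }

    /** Computes the new tag for every row, keeping the input order. */
    static List<String> newTags(List<Row> rows, Map<Integer, String> lookup) {
        List<String> result = new ArrayList<>();
        for (Row row : rows) {
            // Left outer join: every row is kept; get() yields null if the id has no match.
            String lookedUp = lookup.get(row.id);
            // Keep the row's own tag when present, otherwise fall back to the lookup.
            result.add(coalesce(row.tag, lookedUp));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> lookup = new HashMap<>();
        lookup.put(1, "A");
        lookup.put(2, "B");

        List<Row> rows = Arrays.asList(
            new Row(1, 10002, "A"), new Row(2, 10003, "B"),
            new Row(1, 10004, null), new Row(2, 10005, null));

        System.out.println(newTags(rows, lookup)); // prints [A, B, A, B]
    }
}
```

A row whose tag is already set keeps it even when the lookup disagrees, and a row whose id is absent from the lookup ends up with null, which matches the behaviour of the join above.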