What would be the best Spark DataFrame equivalent of this SQL statement?

update table1 set colx = "some value" where coly in (select coltab2 from table2 where [another condition])

I have a working solution, but I am not very satisfied with it. It looks really cumbersome, and I hope I am missing a simpler way.

First I get the values for the where clause (there could be thousands, so I don't want to use a collection):

val df2 = xxxx.select("coltab2")
df2: org.apache.spark.sql.DataFrame = [coltab2: string]

This DataFrame contains all the values I want to match in the where clause.

Then I left-outer-join table1 (df1) with df2 on df1.coly = df2.coltab2, which adds coltab2 to each row. If the added coltab2 is not null, the value was present in table2, so I use that condition to update colx in df1. Finally I drop the added coltab2 column, since it served only as the update condition.

val df_updated = df1
  .join(df2, df1("coly") === df2("coltab2"), "left_outer")
  .withColumn("colx", when(!isnull($"coltab2"), "some value").otherwise(col("colx")))
  .drop(col("coltab2"))
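One difference from the SQL `IN` semantics is worth checking: if df2 holds the same coltab2 value more than once, the left outer join emits one output row per match, so rows of df1 get duplicated. A minimal sketch of the same join with the keys deduplicated first, written as a standalone Scala script; the local SparkSession and the sample data are assumptions for illustration only (in spark-shell, `spark` and its implicits already exist):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("update-where-in").getOrCreate()
import spark.implicits._

// Hypothetical sample data standing in for table1 and table2.
val df1 = Seq(("x1", "y1"), ("x2", "y2"), ("x3", "y3")).toDF("colx", "coly")
val df2 = Seq("y1", "y3", "y3", "y5").toDF("coltab2") // "y3" is duplicated on purpose

// Deduplicate the keys so the join cannot multiply rows of df1.
val keys = df2.distinct()

val dfUpdated = df1
  .join(keys, df1("coly") === keys("coltab2"), "left_outer")
  // coltab2 is non-null only for rows whose coly was found in df2.
  .withColumn("colx", when(col("coltab2").isNotNull, lit("some value")).otherwise(col("colx")))
  .drop("coltab2")

dfUpdated.orderBy("coly").show()
```

Without the `distinct()` call, the duplicated "y3" key would make the row with coly = "y3" appear twice in the result.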

I hope I am completely wrong and there is a more efficient way to do it ;)

It seems to be a perfect solution. There are two improvements to be made: 1. instead of a left_outer join you can simply use a left join, and 2. .otherwise(col("colx")), colx is just being and cannot be used in when. Ramesh Maharjan

1 Answer

I think what you have is a neat solution with good readability. If you want, you could explore another approach using RDDs. Assuming the list of values in coltab2 is not big, you can collect it into a Set and map colx in df1 accordingly, as follows:

val df1 = Seq(
  ("x1", "y1"), ("x2", "y2"), ("x3", "y3")
).toDF("colx", "coly")

val df2 = Seq(
  ("y1"), ("y3"), ("y5")
).toDF("coltab2")

import org.apache.spark.sql.Row

val colList: Set[String] = df2.rdd.map{ case Row(c: String) => c }.collect.toSet

val dfUpdated = df1.rdd.map{
    case Row(x: String, y: String) => (if (colList contains y) "some value" else x, y)
  }.toDF("colx", "coly")

dfUpdated.show
+----------+----+
|      colx|coly|
+----------+----+
|some value|  y1|
|        x2|  y2|
|some value|  y3|
+----------+----+
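
Under the same assumption that the key list is small enough to collect, the update can also stay in the DataFrame API by passing the collected values to `Column.isin`. This avoids the `Row` pattern match, which would fail with a `MatchError` on null values because a type pattern such as `x: String` does not match null. A minimal sketch as a standalone Scala script; the local SparkSession is an assumption for illustration (in spark-shell, `spark` and its implicits already exist):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("update-isin").getOrCreate()
import spark.implicits._

// Same sample data as above.
val df1 = Seq(("x1", "y1"), ("x2", "y2"), ("x3", "y3")).toDF("colx", "coly")
val df2 = Seq("y1", "y3", "y5").toDF("coltab2")

// Collect the (assumed small) list of keys to the driver as plain strings.
val keys: Seq[String] = df2.as[String].distinct().collect().toSeq

// Overwrite colx where coly is one of the keys; keep the old value otherwise.
val dfUpdated = df1.withColumn(
  "colx",
  when(col("coly").isin(keys: _*), lit("some value")).otherwise(col("colx"))
)

dfUpdated.show()
```

For thousands of keys the join in the question is likely to scale better, since `isin` embeds every value in the query plan.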