3 votes

I need to implement the NVL function in Spark while joining two DataFrames.

Input DataFrames:

ds1.show()
---------------
|key  | Code  |
---------------
|2    | DST   |
|3    | CPT   |
|null | DTS   |
|5    | KTP   |
---------------

ds2.show()
-----------------
|key  | PremAmt |
-----------------
|2    | 300     |
|-1   | -99     |
|5    | 567     |
-----------------

I need to implement the equivalent of "LEFT JOIN ON NVL(DS1.key, -1) = DS2.key". I wrote the join below, but without an NVL or coalesce the null key never matches ds2's -1 row, so it returns the wrong values.

How can I incorporate NVL in Spark DataFrames?

// nvl on the key is missing, so the null-key row gets no match
ds1.join(ds2, Seq("key"), "left_outer")

-------------------------
|key  | Code  |PremAmt  |
-------------------------
|2    | DST   |300      |
|3    | CPT   |null     |
|null | DTS   |null     |
|5    | KTP   |567      |
-------------------------

Expected result:

-------------------------
|key  | Code  |PremAmt  |
-------------------------
|2    | DST   |300      |
|3    | CPT   |null     |
|null | DTS   |-99      |
|5    | KTP   |567      |
-------------------------
4
ds1.na.fill(-1, Seq("key")).join(ds2, Seq("key"), "left_outer")? – philantrovert
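
A minimal sketch of that suggestion, assuming "key" is a numeric column. Note that, unlike the expected result above, the filled key would then show -1 instead of null in the output:

// Fill null keys with -1 before joining on the shared column
val joined = ds1.na.fill(-1, Seq("key")).join(ds2, Seq("key"), "left_outer")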

4 Answers

5 votes

I know one way, though it's a bit complex:

import org.apache.spark.sql.functions.{coalesce, lit}

val df = ds1.join(ds2, coalesce(ds1("key"), lit(-1)) === ds2("key"), "left_outer")

Both DataFrames still carry their own "key" column after this join, so you should rename the "key" column of one DataFrame, or drop the duplicate column after the join, as sketched below.
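
For example, a minimal sketch of the full join (reusing the imports above) that drops ds2's duplicate key column:

// ds1's key stays null in the output; ds2's matching copy is dropped
val joined = ds1
  .join(ds2, coalesce(ds1("key"), lit(-1)) === ds2("key"), "left_outer")
  .drop(ds2("key"))

joined.show()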

4 votes

An implementation of nvl in Scala:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{when, lit}

// Return replaceVal wherever colIn is null, otherwise colIn (SQL NVL semantics)
def nvl(colIn: Column, replaceVal: Any): Column =
  when(colIn.isNull, lit(replaceVal)).otherwise(colIn)

Now you can use nvl as you would any other function for DataFrame manipulation, e.g.

val newDf = df.withColumn("MyColNullsReplaced", nvl($"MyCol", "<null>"))

Obviously, replaceVal must be of the correct type; the example above assumes $"MyCol" is of type String.
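
Applied to the join from the question, a sketch using this nvl might look like:

// Treat null keys in ds1 as -1 so they match ds2's -1 row,
// then drop ds2's duplicate key column
val joined = ds1
  .join(ds2, nvl(ds1("key"), -1) === ds2("key"), "left_outer")
  .drop(ds2("key"))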

0 votes

The answer is to use NVL; this Python code works:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("CommonMethods").getOrCreate()

Note: the SparkSession is built in a "chained" fashion, i.e. three methods are applied on the same line.

Read the CSV file:

df = spark.read.csv('C:\\tableausuperstore1_all.csv', inferSchema='true', header='true')

df.createOrReplaceTempView("ViewSuperstore")

The view ViewSuperstore can now be used in SQL:

print("*trace1-nvl")

df = spark.sql("select nvl(state,'a') testString, nvl(quantity,0) testInt  from ViewSuperstore where state='Florida' and OrderDate>current_date() ")

df.show()

print("*trace2-FINAL")
0 votes

This worked for me:

import org.apache.spark.sql.functions.{col, coalesce, lit}

// Pass the identifying columns through unchanged and
// replace nulls in the dimension fields with 0
intermediateDF.select(col("event_start_timestamp"),
        col("cobrand_id"),
        col("rule_name"),
        col("table_name"),
        coalesce(col("dimension_field1"), lit(0)),
        coalesce(col("dimension_field2"), lit(0)),
        coalesce(col("dimension_field3"), lit(0)),
        coalesce(col("dimension_field4"), lit(0)),
        coalesce(col("dimension_field5"), lit(0))
      )
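
One caveat: without aliases, Spark names these columns after the whole expression (e.g. coalesce(dimension_field1, 0)). A sketch that keeps the original names:

// Alias each coalesced column back to its original name
intermediateDF.select(
  col("event_start_timestamp"),
  coalesce(col("dimension_field1"), lit(0)).alias("dimension_field1"),
  coalesce(col("dimension_field2"), lit(0)).alias("dimension_field2")
)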