I want to define a UDF whose body looks up data in an external DataFrame. How can I do that? I tried passing the DataFrame to the UDF, but it doesn't work.
Sample code:
val countryDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("Country.csv")

val geo = (originString: String, dataFrame: DataFrame) => {
  // Search data from countryDF
  val row = dataFrame.where(col("CountryName") === originString)
  if (row != Nil) {
    // set data to row index 2
    row.getAs[String](2)
  }
  else {
    "0"
  }
}

val udfGeo = udf(geo)
val cLatitudeAndLongitude = udfGeo(countryTestDF.col("CountryName"), lit(countryDF))
countryTestDF = countryTestDF.withColumn("Latitude", cLatitudeAndLongitude)
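A UDF runs on the executors one value at a time, so it cannot receive or query a DataFrame. The usual way to "look up" values from another DataFrame is a left join on the key column. The sketch below assumes the looked-up value lives in a string column named "Latitude" (the question only says "row index 2"); the object name GeoJoinSketch, the helper withLatitude and the sample rows are hypothetical stand-ins for Country.csv and countryTestDF.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, coalesce, col, lit}

// Hypothetical helper object: attaches each country's latitude via a join.
object GeoJoinSketch {
  def withLatitude(testDF: DataFrame, countryDF: DataFrame): DataFrame = {
    // Keep only the join key and the value to look up.
    // Assumption: the value sits in a string column named "Latitude".
    val lookup = countryDF.select(col("CountryName"), col("Latitude"))
    testDF
      // A left join keeps every row of testDF; broadcast() hints that the
      // lookup table is small enough to ship to every executor.
      .join(broadcast(lookup), Seq("CountryName"), "left")
      // Countries with no match get null, replaced by "0" as in the question.
      .withColumn("Latitude", coalesce(col("Latitude"), lit("0")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("geo-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-ins for Country.csv and countryTestDF.
    val countryDF = Seq(("France", "46.2"), ("Japan", "36.2"))
      .toDF("CountryName", "Latitude")
    val countryTestDF = Seq("France", "Atlantis").toDF("CountryName")

    withLatitude(countryTestDF, countryDF).show()
    spark.stop()
  }
}
```

If CountryName is not unique in countryDF, the join will duplicate rows of countryTestDF, so you may want to deduplicate the lookup table first.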
lit can only be used for literal values such as strings; you can't pass a DataFrame to a UDF. – Giri
countryDF.withColumn("cLatitudeAndLongitudeFlag", when(col("CountryName") === "originString", "1").otherwise("0")).select("cLatitudeAndLongitudeFlag").distinct.show() – Giri
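If you really want to keep a UDF, one workaround for "can't pass a DataFrame to a UDF" is to collect the (small) lookup DataFrame to the driver as a plain Scala Map and let the UDF close over a broadcast copy of that Map. This is a sketch under stated assumptions: countryDF must fit in driver memory, the value is in a string column named "Latitude", and GeoUdfSketch / withLatitude are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helper object: lookup through a broadcast Map inside a UDF.
object GeoUdfSketch {
  // Collects the lookup table to the driver.
  // Assumption: countryDF is small and "Latitude" is a string column.
  def lookupMap(countryDF: DataFrame): Map[String, String] =
    countryDF
      .select(col("CountryName"), col("Latitude"))
      .collect()
      .map(r => r.getString(0) -> r.getString(1))
      .toMap

  def withLatitude(testDF: DataFrame, countryDF: DataFrame): DataFrame = {
    val spark = testDF.sparkSession
    // Broadcast the Map so each executor receives one read-only copy.
    val bc = spark.sparkContext.broadcast(lookupMap(countryDF))
    // The UDF captures the broadcast handle, not a DataFrame;
    // unknown countries fall back to "0" as in the question.
    val geo = udf((country: String) => bc.value.getOrElse(country, "0"))
    testDF.withColumn("Latitude", geo(col("CountryName")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("geo-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-ins for Country.csv and countryTestDF.
    val countryDF = Seq(("France", "46.2"), ("Japan", "36.2"))
      .toDF("CountryName", "Latitude")
    val countryTestDF = Seq("France", "Atlantis").toDF("CountryName")

    withLatitude(countryTestDF, countryDF).show()
    spark.stop()
  }
}
```

The join-based approach scales to large lookup tables; the Map-in-a-UDF approach is only reasonable when the lookup table is small, because the whole table passes through the driver.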