
I want to define a UDF whose body looks up data in an external DataFrame. How can I do that? I tried passing the DataFrame to the UDF, but it does not work.

Sample code:

val countryDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("Country.csv")

val geo = (originString: String, dataFrame: DataFrame) => {
  // Search data from countryDF
  val row = dataFrame.where(col("CountryName") === originString)
  if (row != Nil){
    // set data to row index 2
    row.getAs[String](2)
  }
  else{
    "0"
  }
}
val udfGeo = udf(geo)

val cLatitudeAndLongitude = udfGeo(countryTestDF.col("CountryName"), lit(countryDF))

countryTestDF = countryTestDF.withColumn("Latitude", cLatitudeAndLongitude)
lit can be used for strings. We can't pass a DataFrame to a UDF. - Giri

If I understand correctly, you could use the code below (change the when condition): countryDF.withColumn("cLatitudeAndLongitudeFlag", when(col("CountryName") === "originString", "1").otherwise("0")).select("cLatitudeAndLongitudeFlag").distinct.show() - Giri

Thanks. Sorry for the unclear code. I changed the code a little. Actually, the function should return row(2) instead of "1" if a matching row is found in countryDF. How can I implement that? Do you mean a DataFrame cannot be passed to a UDF? - devin

countryTestDF is the final DataFrame. The task is to look up each value of its "CountryName" column in another DataFrame, countryDF. If a record is found, take the data from the corresponding row (column index 2). - devin

1 Answer


If you want to use a UDF, you have to work on columns, not on a DataFrame object. A UDF receives the values of the columns you pass it, one row at a time, and you create a new column that takes its output.

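import org.apache.spark.sql.functions.{col, udf}

// Returns 1 when the two column values match, 0 otherwise.
def geo(originString: String, countryName: String): Int =
  if (countryName == originString) 1 else 0

val geoUDF = udf(geo _)

// Assumes countryDF has both an "originString" and a "CountryName" column.
val newData = countryDF.withColumn("isOriginOrNot", geoUDF(col("originString"), col("CountryName")))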
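
For the lookup described in the question (fetch a value from countryDF for each CountryName in countryTestDF, defaulting to "0"), a join is usually the simpler route, since a DataFrame cannot be used inside a UDF. The following is only a sketch under assumptions: the inline data stands in for Country.csv, and the column names "Code", "Latitude" and "FoundLatitude" are hypothetical (the question only says the wanted value sits at column index 2). The second variant keeps a UDF, but closes over a plain Scala Map collected on the driver, which is only reasonable when the lookup table is small.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, coalesce, col, lit, udf}

// Local session for the sketch; in spark-shell a `spark` value already exists.
val spark = SparkSession.builder().master("local[1]").appName("geo-lookup").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for Country.csv and countryTestDF.
val countryDF = Seq(
  ("France", "FR", "46.2276"),
  ("Japan", "JP", "36.2048")
).toDF("CountryName", "Code", "Latitude")
val countryTestDF = Seq("France", "Atlantis").toDF("CountryName")

// Keep only the key and the wanted value (column index 2 in the question).
val lookup = countryDF.select(col("CountryName"), col("Latitude").as("FoundLatitude"))

// Variant 1: left join on the key, then fall back to "0" where nothing matched.
val joined = countryTestDF
  .join(broadcast(lookup), Seq("CountryName"), "left")
  .withColumn("Latitude", coalesce(col("FoundLatitude"), lit("0")))
  .drop("FoundLatitude")

// Variant 2: collect the small lookup table into a Map and close over it in a UDF.
val latitudeByCountry: Map[String, String] =
  lookup.collect().map(r => r.getString(0) -> r.getString(1)).toMap
val geoLookupUDF = udf((name: String) => latitudeByCountry.getOrElse(name, "0"))
val viaUdf = countryTestDF.withColumn("Latitude", geoLookupUDF(col("CountryName")))

joined.show()
viaUdf.show()
```

Both variants give unmatched countries the value "0". Note that if countryDF contains the same CountryName more than once, the join produces one output row per match, while the Map keeps only one of the duplicate entries.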