1
votes

I need to add a new column to dataframe DF1 but the new column's value should be calculated using other columns' value present in that DF. Which of the other columns to be used will be given in another dataframe DF2.
eg. DF1

|protocolNo|serialNum|testMethod  |testProperty|
+----------+---------+------------+------------+       
|Product1  |  AB     |testMethod1 | TP1        |
|Product2  |  CD     |testMethod2 | TP2        |

DF2-

|action| type|               value       |        exploded |
+------------+---------------------------+-----------------+
|append|hash |        [protocolNo]       | protocolNo      |
|append|text |            _              |     _           | 
|append|hash | [serialNum,testProperty]  | serialNum       |
|append|hash | [serialNum,testProperty]  | testProperty    |

Now the value of exploded column in DF2 will be column names of DF1 if value of type column is hash.

Required - New column should be created in DF1. the value should be calculated like below-

hash[protocolNo]_hash[serialNumTestProperty] ~~~ here on place of column their corresponding row values should come.

eg. for Row1 of DF1, col value should be

hash[Product1]_hash[ABTP1]

this will result into something like this abc-df_egh-45e after hashing.

The above procedure should be followed for each and every row of DF1.

I've tried using map and withColumn function using UDF on DF1. But in UDF, outer dataframe value is not accessible(gives Null Pointer Exception], also I'm not able to give DataFrame as input to UDF.

Input DFs would be DF1 and DF2 as mentioned above.

Desired Output DF-

|protocolNo|serialNum|testMethod  |testProperty| newColumn      |
+----------+---------+------------+------------+----------------+       
|Product1  |  AB     |testMethod1 | TP1        | abc-df_egh-4je |
|Product2  |  CD     |testMethod2 | TP2        | dfg-df_ijk-r56 |

newColumn value is after hashing

2
Edit your question and add your code to see what´s happeningEmiliano Martinez
Can you please list the input DFs and the desired Output DF. That would be helpful.Ganesh
@tamilnad posted input DFs and the output DF.Sonali Sharma
@SonaliSharma Did you check my answer?Ganesh
@tamilnad yes it worked with that. Thanks for the help!!Sonali Sharma

2 Answers

0
votes

Instead of DF2, you can translate DF2 to case class like Specifications, e.g

case class Spec(columnName:String,inputColumns:Seq[String],action:String,action:String,type:String*){}

Create instances of above class

val specifications = Seq(
Spec("new_col_name",Seq("serialNum","testProperty"),"hash","append")
                     )

Then you can process the below columns

 val transformed =  specifications
        .foldLeft(dtFrm)((df: DataFrame, spec: Specification) => df.transform(transformColumn(columnSpec)))

def transformColumn(spec: Spec)(df: DataFrame): DataFrame = { 

 spec.type.foldLeft(df)((df: DataFrame, type : String) => {
           type match {
                  case "append" => {have a case match of the action and do that , then append with df.withColumn}

}
}

Syntax may not be correct

0
votes

Since DF2 has the column names that will be used to calculate a new column from DF1, I have made this assumption that DF2 will not be a huge Dataframe.

First step would be to filter DF2 and get the column names that we want to pick from DF1.

val hashColumns = DF2.filter('type==="hash").select('exploded).collect

Now, hashcolumns will have the columns that we want to use to calculate hash in the newColumn. The hashcolumns is an Array of Row. We need this to be a Column that will be applied while creating the newColumn in DF1.

val newColumnHash = hashColumns.map(f=>hash(col(f.getString(0)))).reduce(concat_ws("_",_,_))

The above line will convert the Row to a Column with hash function applied to it. And we reduce it while concatenating _. Now, the task becomes simple. We just need to apply this to DF1.

DF1.withColumn("newColumn",newColumnHash).show(false)

Hope this helps!