I am trying to join two DataFrames and dynamically create a new column that holds the attribute values.
To do that, I need to split the formula column from formulaTable into additional columns and then join the result with the attribute table.
However, I cannot get the split to work dynamically.
I have two questions, labelled Question 1 and Question 2 in the steps below.
Currently, the data in my formulaTable is created like this:
val attributeFormulaDF = Seq("A0004*A0003","A0003*A0005").toDF("AttributeFormula")
This produces:
+----------------+
|AttributeFormula|
+----------------+
|A0004*A0003     |
|A0003*A0005     |
+----------------+
The attribute data is created like this:
val attrValTransposedDF = Seq(
("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY1_VALUE", "801"),
("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY2_VALUE", "802"),
("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY3_VALUE", "803"),
("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY4_VALUE", "804"),
("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY5_VALUE", "805"),
("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY6_VALUE", "736"),
("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY7_VALUE", "1007"),
("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY1_VALUE", "901"),
("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY2_VALUE", "902"),
("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY3_VALUE", "903"),
("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY4_VALUE", "904"),
("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY5_VALUE", "905"),
("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY6_VALUE", "936"),
("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY7_VALUE", "9007"))
.toDF("Store_Number", "Attribute_Week_Number", "Department_Code", "Attribute_Code", "Attribute_General_Value", "Day", "Value")
.select("Attribute_Code", "Day", "Value")
This produces:
+--------------+-----------------+-----+
|Attribute_Code|Day              |Value|
+--------------+-----------------+-----+
|A0003         |ATTRIB_DAY1_VALUE|801  |
|A0003         |ATTRIB_DAY2_VALUE|802  |
|A0003         |ATTRIB_DAY3_VALUE|803  |
|A0003         |ATTRIB_DAY4_VALUE|804  |
|A0003         |ATTRIB_DAY5_VALUE|805  |
|A0003         |ATTRIB_DAY6_VALUE|736  |
|A0003         |ATTRIB_DAY7_VALUE|1007 |
|A0004         |ATTRIB_DAY1_VALUE|901  |
|A0004         |ATTRIB_DAY2_VALUE|902  |
|A0004         |ATTRIB_DAY3_VALUE|903  |
|A0004         |ATTRIB_DAY4_VALUE|904  |
|A0004         |ATTRIB_DAY5_VALUE|905  |
|A0004         |ATTRIB_DAY6_VALUE|936  |
|A0004         |ATTRIB_DAY7_VALUE|9007 |
+--------------+-----------------+-----+
Next, I split each formula on the * character:
val firstDF = attributeFormulaDF.select("AttributeFormula")
val rowVal = firstDF.first.mkString.split("\\*").length
val columnSeq = (0 until rowVal).map(i => col("temp").getItem(i).as(s"col$i"))
val newDFWithSplitColumn = firstDF.withColumn("temp", split(col("AttributeFormula"), "\\*"))
.select(col("*") +: columnSeq :_*).drop("temp")
I based this on the Stack Overflow post "Split 1 column into 3 columns in spark scala".
The split columns look like this:
+----------------+-----+-----+
|AttributeFormula|col0 |col1 |
+----------------+-----+-----+
|A0004*A0003     |A0004|A0003|
|A0003*A0005     |A0003|A0005|
+----------------+-----+-----+
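Why the snippet above cannot handle longer formulas: rowVal is computed from firstDF.first, so the first formula alone decides how many colN columns are created. Below is a plain-Scala sketch (no Spark involved; the formula list is copied from the Question 1 example) that contrasts that width with the width of the widest formula, and pads shorter formulas with None, which stands in for Spark's null.

```scala
// Plain Scala, no Spark: formulas taken from the Question 1 example.
val formulas = Seq("A0004", "A0004*A0003", "A0003*A0005", "A0003*A0004", "A0003*A0004*A0005")

// Width taken from the first row only (what firstDF.first does in the question).
val widthFromFirst: Int = formulas.head.split("\\*").length

// Width taken from the widest formula across all rows.
val maxWidth: Int = formulas.map(_.split("\\*").length).max

// Every formula padded to maxWidth; missing positions become None (null in Spark).
val padded: Seq[Seq[Option[String]]] = formulas.map { f =>
  val parts = f.split("\\*").toSeq
  (0 until maxWidth).map(i => parts.lift(i))
}

padded.zip(formulas).foreach { case (cols, f) => println(s"$f -> $cols") }
```

With this input, widthFromFirst is 1 while maxWidth is 3, which is exactly why only col0 would be produced if the single-attribute formula happened to be the first row.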
Question 1: If AttributeFormula can contain any number of attributes (it is just a string), how can I split it dynamically?
For example, given:
+-----------------+
|AttributeFormula |
+-----------------+
|A0004            |
|A0004*A0003      |
|A0003*A0005      |
|A0003*A0004      |
|A0003*A0004*A0005|
+-----------------+
I need output like this:
+-----------------+-----+-----+-----+
|AttributeFormula |col0 |col1 |col2 |
+-----------------+-----+-----+-----+
|A0004            |A0004|null |null |
|A0004*A0003      |A0004|A0003|null |
|A0003*A0005      |A0003|A0005|null |
|A0003*A0004      |A0003|A0004|null |
|A0003*A0004*A0005|A0003|A0004|A0005|
+-----------------+-----+-----+-----+
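One possible way to get that shape, sketched under the assumption of Spark 2.4 or later with a local SparkSession (the session setup is mine, not from the question): compute the largest array size over all rows with max(size(...)) instead of reading the first row, then create one column per position. The when(size(...) > i, ...) guard yields null for shorter formulas; a bare getItem(i) also returns null out of range by default, but can throw when ANSI mode (spark.sql.ansi.enabled) is on.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("dynamic-split").getOrCreate()
import spark.implicits._

val attributeFormulaDF =
  Seq("A0004", "A0004*A0003", "A0003*A0005", "A0003*A0004", "A0003*A0004*A0005")
    .toDF("AttributeFormula")

// Split once into an array column.
val withParts = attributeFormulaDF.withColumn("temp", split(col("AttributeFormula"), "\\*"))

// Widest formula across ALL rows, not just the first one.
val maxParts: Int = withParts.select(max(size(col("temp")))).first().getInt(0)

// One column per position; shorter arrays give null instead of an out-of-range access.
val columnSeq = (0 until maxParts).map { i =>
  when(size(col("temp")) > i, col("temp").getItem(i)).as(s"col$i")
}

val newDFWithSplitColumn: DataFrame =
  withParts.select(col("AttributeFormula") +: columnSeq: _*)

newDFWithSplitColumn.show(false)
```

The extra select(max(...)) is one small aggregation job; it replaces firstDF.first, which also triggers a job but only sees one row.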
Then I join the split formulas with the attribute values to build the formula-values column:
val joinColumnCondition = newDFWithSplitColumn.columns
.withFilter(_.startsWith("col"))
.map(col(_) === attrValTransposedDF("Attribute_Code"))
//using zipWithIndex to make the value columns separate and to avoid ambiguous error while joining
val dataFrameList = joinColumnCondition.zipWithIndex.map {
i =>
newDFWithSplitColumn.join(attrValTransposedDF, i._1)
.withColumnRenamed("Value", s"Value${i._2}")
.drop("Attribute_Code")
}
val combinedDataFrame = dataFrameList.reduce(_.join(_, Seq("Day","AttributeFormula"),"LEFT"))
val toBeConcatColumn = combinedDataFrame.columns.filter(_.startsWith("Value"))
combinedDataFrame
.withColumn("AttributeFormulaValues", concat_ws("*", toBeConcatColumn.map(c => col(c)): _*))
.select("Day","AttributeFormula","AttributeFormulaValues")
My final output looks like this:
+-----------------+----------------+----------------------+
|Day              |AttributeFormula|AttributeFormulaValues|
+-----------------+----------------+----------------------+
|ATTRIB_DAY7_VALUE|A0004*A0003     |9007*1007             |
|ATTRIB_DAY6_VALUE|A0004*A0003     |936*736               |
|ATTRIB_DAY5_VALUE|A0004*A0003     |905*805               |
|ATTRIB_DAY4_VALUE|A0004*A0003     |904*804               |
|ATTRIB_DAY3_VALUE|A0004*A0003     |903*803               |
|ATTRIB_DAY2_VALUE|A0004*A0003     |902*802               |
|ATTRIB_DAY1_VALUE|A0004*A0003     |901*801               |
|ATTRIB_DAY7_VALUE|A0003           |1007                  |
|ATTRIB_DAY6_VALUE|A0003           |736                   |
|ATTRIB_DAY5_VALUE|A0003           |805                   |
|ATTRIB_DAY4_VALUE|A0003           |804                   |
|ATTRIB_DAY3_VALUE|A0003           |803                   |
|ATTRIB_DAY2_VALUE|A0003           |802                   |
|ATTRIB_DAY1_VALUE|A0003           |801                   |
+-----------------+----------------+----------------------+
This code works only when every AttributeFormula has the same, fixed number of attributes (which is what Question 1 is about).
Question 2: How can I avoid building a list of DataFrames and then combining them with reduce?
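An alternative to the list-plus-reduce pattern, sketched with the same assumptions (local SparkSession, and an abbreviated copy of the attribute table with only two days): instead of one colN column per position, posexplode turns each formula into one row per attribute together with its position, a single join attaches the values, and groupBy with collect_list reassembles them. sort_array over (pos, Value) structs restores the order from the formula, because collect_list on its own does not guarantee ordering after a shuffle. Since no colN columns are needed, this also sidesteps Question 1.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local session just for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("formula-values").getOrCreate()
import spark.implicits._

val attributeFormulaDF =
  Seq("A0004", "A0004*A0003", "A0003*A0005", "A0003*A0004*A0005").toDF("AttributeFormula")

// Abbreviated attribute table (days 1 and 2 only), same column names as the question.
val attrValTransposedDF = Seq(
  ("A0003", "ATTRIB_DAY1_VALUE", "801"), ("A0003", "ATTRIB_DAY2_VALUE", "802"),
  ("A0004", "ATTRIB_DAY1_VALUE", "901"), ("A0004", "ATTRIB_DAY2_VALUE", "902")
).toDF("Attribute_Code", "Day", "Value")

// 1. One row per (formula, position, attribute code) instead of col0, col1, ...
val exploded = attributeFormulaDF.select(
  col("AttributeFormula"),
  posexplode(split(col("AttributeFormula"), "\\*")).as(Seq("pos", "Attribute_Code")))

// 2. A single join replaces the list of per-column joins.
val joined = exploded.join(attrValTransposedDF, Seq("Attribute_Code"))

// 3. Reassemble the values per formula and day, ordered by their position in the formula.
val result = joined
  .groupBy("Day", "AttributeFormula")
  .agg(sort_array(collect_list(struct(col("pos"), col("Value")))).as("parts"))
  .withColumn("AttributeFormulaValues", concat_ws("*", col("parts.Value")))
  .select("Day", "AttributeFormula", "AttributeFormulaValues")

result.orderBy("AttributeFormula", "Day").show(false)
```

Because the join is an inner join, an attribute code with no rows in the attribute table (A0005 here) is simply absent from AttributeFormulaValues, so A0003*A0005 produces 801 for ATTRIB_DAY1_VALUE. If such formulas should be flagged instead, size(col("parts")) could be compared with the number of attributes in the formula.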