I am trying to join two DataFrames and dynamically create a new column that holds the attribute values.

I have to split the column in formulaTable into additional columns and then join the result with the attribute table.

However, I am not able to split the column dynamically.

I have two questions, marked as Question 1 and Question 2 in the steps below.
Currently, the data in my formulaTable looks like this.

val attributeFormulaDF = Seq("A0004*A0003","A0003*A0005").toDF("AttributeFormula")  

So the data looks like this:

+----------------+
|AttributeFormula|
+----------------+
|A0004*A0003     |
|A0003*A0005     |
+----------------+

The attribute data looks like this:

    val attrValTransposedDF = Seq(
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY1_VALUE", "801"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY2_VALUE", "802"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY3_VALUE", "803"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY4_VALUE", "804"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY5_VALUE", "805"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY6_VALUE", "736"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY7_VALUE", "1007"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY1_VALUE", "901"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY2_VALUE", "902"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY3_VALUE", "903"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY4_VALUE", "904"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY5_VALUE", "905"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY6_VALUE", "936"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY7_VALUE", "9007"))
      .toDF("Store_Number", "Attribute_Week_Number", "Department_Code", "Attribute_Code", "Attribute_General_Value", "Day", "Value")
      .select("Attribute_Code", "Day", "Value")

So the data looks like this:

+--------------+-----------------+-----+
|Attribute_Code|Day              |Value|
+--------------+-----------------+-----+
|A0003         |ATTRIB_DAY1_VALUE|801  |
|A0003         |ATTRIB_DAY2_VALUE|802  |
|A0003         |ATTRIB_DAY3_VALUE|803  |
|A0003         |ATTRIB_DAY4_VALUE|804  |
|A0003         |ATTRIB_DAY5_VALUE|805  |
|A0003         |ATTRIB_DAY6_VALUE|736  |
|A0003         |ATTRIB_DAY7_VALUE|1007 |
|A0004         |ATTRIB_DAY1_VALUE|901  |
|A0004         |ATTRIB_DAY2_VALUE|902  |
|A0004         |ATTRIB_DAY3_VALUE|903  |
|A0004         |ATTRIB_DAY4_VALUE|904  |
|A0004         |ATTRIB_DAY5_VALUE|905  |
|A0004         |ATTRIB_DAY6_VALUE|936  |
|A0004         |ATTRIB_DAY7_VALUE|9007 |
+--------------+-----------------+-----+

Now I am splitting AttributeFormula on the * character:

val firstDF = attributeFormulaDF.select("AttributeFormula")
    val rowVal = firstDF.first.mkString.split("\\*").length
    val columnSeq = (0 until rowVal).map(i => col("temp").getItem(i).as(s"col$i"))
    val newDFWithSplitColumn = firstDF.withColumn("temp", split(col("AttributeFormula"), "\\*"))
        .select(col("*") +: columnSeq :_*).drop("temp")

I referred to the Stack Overflow post "Split 1 column into 3 columns in spark scala".

So the split columns look like this:

+----------------+-----+-----+
|AttributeFormula|col0 |col1 |
+----------------+-----+-----+
|A0004*A0003     |A0004|A0003|
|A0003*A0005     |A0003|A0005|
+----------------+-----+-----+

Question 1: If an AttributeFormula can contain any number of attributes (it is just a string), how can I split it dynamically?

For example:
+-----------------+
|AttributeFormula |
+-----------------+
|A0004            |
|A0004*A0003      |
|A0003*A0005      |
|A0003*A0004      |
|A0003*A0004*A0005|
+-----------------+

So I need something like this:

+-----------------+-----+-----+-----+
|AttributeFormula |col0 |col1 |col2 |
+-----------------+-----+-----+-----+
|A0004            |A0004|null |null |
|A0003*A0005      |A0003|A0005|null |
|A0003*A0004      |A0003|A0004|null |
|A0003*A0004*A0005|A0003|A0004|A0005|
+-----------------+-----+-----+-----+

Next, I join the split formula columns with the attribute values to build the formula values column.

    val joinColumnCondition = newDFWithSplitColumn.columns
      .withFilter(_.startsWith("col"))
      .map(col(_) === attrValTransposedDF("Attribute_Code"))

    // Use zipWithIndex to give each Value column a distinct name
    // and avoid ambiguous-column errors while joining
    val dataFrameList = joinColumnCondition.zipWithIndex.map { i =>
      newDFWithSplitColumn.join(attrValTransposedDF, i._1)
        .withColumnRenamed("Value", s"Value${i._2}")
        .drop("Attribute_Code")
    }

    val combinedDataFrame = dataFrameList.reduce(_.join(_, Seq("Day", "AttributeFormula"), "LEFT"))

    val toBeConcatColumn = combinedDataFrame.columns.filter(_.startsWith("Value"))

    combinedDataFrame
      .withColumn("AttributeFormulaValues", concat_ws("*", toBeConcatColumn.map(c => col(c)): _*))
      .select("Day", "AttributeFormula", "AttributeFormulaValues")

So my final output looks like this.

+-----------------+----------------+----------------------+
|Day              |AttributeFormula|AttributeFormulaValues|
+-----------------+----------------+----------------------+
|ATTRIB_DAY7_VALUE|A0004*A0003     |9007*1007             |
|ATTRIB_DAY6_VALUE|A0004*A0003     |936*736               |
|ATTRIB_DAY5_VALUE|A0004*A0003     |905*805               |
|ATTRIB_DAY4_VALUE|A0004*A0003     |904*804               |
|ATTRIB_DAY3_VALUE|A0004*A0003     |903*803               |
|ATTRIB_DAY2_VALUE|A0004*A0003     |902*802               |
|ATTRIB_DAY1_VALUE|A0004*A0003     |901*801               |
|ATTRIB_DAY7_VALUE|A0003           |1007                  |
|ATTRIB_DAY6_VALUE|A0003           |736                   |
|ATTRIB_DAY5_VALUE|A0003           |805                   |
|ATTRIB_DAY4_VALUE|A0003           |804                   |
|ATTRIB_DAY3_VALUE|A0003           |803                   |
|ATTRIB_DAY2_VALUE|A0003           |802                   |
|ATTRIB_DAY1_VALUE|A0003           |801                   |
+-----------------+----------------+----------------------+

This code works only when every AttributeFormula has the same, fixed number of attributes (this relates to Question 1).

Question 2: How can I avoid using the DataFrame list and use the reduce function?

1 Answer

For Question 1, here is a possible solution:

Given that you have a dataframe with formulas:

 val attributeFormulaDF = Seq("A0004*A0003","A0003*A0005", "A0003*A0004*A0005").toDF("formula")

you can split each formula on * into an array column:

val splitFormula = attributeFormulaDF.select(col("formula"), split(col("formula"), "\\*").as("split"))

After that, you select the maximum array size:

  val maxSize = splitFormula.select(max(size(col("split")))).first().getInt(0)

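Now the interesting part: based on the max size, you can generate one column per array position, each reading the corresponding element of the split array. Rows whose array is shorter than the max size get null in the extra columns: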

  val enhancedFormula = (0 until(maxSize)).foldLeft(splitFormula)( (df, i) => {
    df.withColumn(s"col_${i}", expr(s"split[${i}]"))
  })

Here is the output

+-----------------+--------------------+-----+-----+-----+
|          formula|               split|col_0|col_1|col_2|
+-----------------+--------------------+-----+-----+-----+
|      A0004*A0003|      [A0004, A0003]|A0004|A0003| null|
|      A0003*A0005|      [A0003, A0005]|A0003|A0005| null|
|A0003*A0004*A0005|[A0003, A0004, A0...|A0003|A0004|A0005|
+-----------------+--------------------+-----+-----+-----+
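The steps above can also be folded into one helper. This is only a minimal sketch: the names SplitFormulaSketch and splitToColumns are made up for illustration, and it swaps the foldLeft/withColumn loop for a single select with getItem, which should give the same columns without the intermediate split column. It assumes ANSI mode is off, so reading past the end of a shorter array yields null instead of an error, and it assumes the input DataFrame is not empty.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, size, split}

object SplitFormulaSketch {
  // Hypothetical helper: splits formulaCol on '*' and adds col_0 .. col_{n-1},
  // where n is the largest number of parts found in any row.
  def splitToColumns(df: DataFrame, formulaCol: String): DataFrame = {
    val withArray = df.withColumn("split", split(col(formulaCol), "\\*"))
    // One extra pass over the data to find the widest formula.
    val maxSize = withArray.select(max(size(col("split")))).first().getInt(0)
    // With ANSI mode off, out-of-range positions evaluate to null.
    val partCols = (0 until maxSize).map(i => col("split").getItem(i).as(s"col_$i"))
    withArray.select(col(formulaCol) +: partCols: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("split-formula-sketch")
      .config("spark.sql.ansi.enabled", "false") // assumption: null on out-of-range index
      .getOrCreate()
    import spark.implicits._

    val formulas = Seq("A0004", "A0004*A0003", "A0003*A0004*A0005").toDF("formula")
    splitToColumns(formulas, "formula").show(false)
    spark.stop()
  }
}
```

Note that computing maxSize triggers a Spark job before the final select, so the formula table is scanned twice. For a small formula table that is usually acceptable.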

I think this can easily be adapted for Question 2.
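Reading Question 2 as "how do I avoid one join per split column", one option is to skip the wide columns entirely: explode the split array into rows with posexplode, join once against the attribute values, and rebuild the value string in formula order. This is a rough sketch, not a drop-in replacement: SingleJoinSketch and formulaValues are names made up here, and it assumes the attribute DataFrame has the columns Attribute_Code, Day and Value from the question, with Value stored as a string.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, concat_ws, posexplode, sort_array, split, struct}

object SingleJoinSketch {
  // Hypothetical helper: one row per (formula, position, attribute code),
  // joined once to the attribute values and collapsed back per Day.
  def formulaValues(formulas: DataFrame, attrs: DataFrame): DataFrame = {
    val parts = formulas.select(
      col("AttributeFormula"),
      posexplode(split(col("AttributeFormula"), "\\*")).as(Seq("pos", "Attribute_Code")))

    parts.join(attrs, Seq("Attribute_Code"))
      .groupBy("Day", "AttributeFormula")
      // Sorting the (pos, Value) structs restores the order used in the formula.
      .agg(sort_array(collect_list(struct(col("pos"), col("Value")))).as("ordered"))
      .select(col("Day"), col("AttributeFormula"),
        concat_ws("*", col("ordered.Value")).as("AttributeFormulaValues"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("single-join-sketch").getOrCreate()
    import spark.implicits._

    val formulas = Seq("A0004*A0003", "A0003*A0005").toDF("AttributeFormula")
    val attrs = Seq(
      ("A0003", "ATTRIB_DAY1_VALUE", "801"),
      ("A0004", "ATTRIB_DAY1_VALUE", "901")
    ).toDF("Attribute_Code", "Day", "Value")

    formulaValues(formulas, attrs).show(false)
    spark.stop()
  }
}
```

Because the join is an inner join, an attribute code with no rows in the attribute table (A0005 in the sample data) is simply left out of the concatenated string, and identical formulas appearing more than once are merged by the groupBy. If either behavior matters, the join type or the grouping key would need adjusting.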