
I have a dataset that looks like this:

+-------+------+-------+
|groupid|rownum|column2|
+-------+------+-------+
|   1   |  1   |   7   |
|   1   |  2   |   9   |
|   1   |  3   |   8   |
|   1   |  4   |   5   |
|   1   |  5   |   1   |
|   1   |  6   |   0   |
|   1   |  7   |   15  |
|   1   |  8   |   1   |
|   1   |  9   |   13  |
|   1   |  10  |   20  |
|   2   |  1   |   8   |
|   2   |  2   |   1   |
|   2   |  3   |   4   |
|   2   |  4   |   2   |
|   2   |  5   |   19  |
|   2   |  6   |   11  |
|   2   |  7   |   5   |
|   2   |  8   |   6   |
|   2   |  9   |   15  |
|   2   |  10  |   8   |
 ... (more rows follow)

I want to add a new column, "column3". Each run of consecutive rows whose column2 value is less than 10 is assigned the same number, starting from 1. A row whose column2 value is larger than 10 is dropped, and the next run of kept rows gets the previous number plus 1. For example, when groupid = 1, column3 is 1 for rownum 1 to 6 and rownum 7 is dropped; column3 is 2 for rownum 8, and rownum 9 and 10 are dropped. After this procedure, the table will look like this:

+-------+------+-------+-------+
|groupid|rownum|column2|column3|
+-------+------+-------+-------+
|   1   |  1   |   7   |   1   |
|   1   |  2   |   9   |   1   | 
|   1   |  3   |   8   |   1   |
|   1   |  4   |   5   |   1   |
|   1   |  5   |   1   |   1   |
|   1   |  6   |   0   |   1   |
|   1   |  7   |   15  |  drop | this row is dropped; it does not actually exist in the output
|   1   |  8   |   1   |   2   |
|   1   |  9   |   13  |  drop |  same as above
|   1   |  10  |   20  |  drop |  same as above
|   2   |  1   |   8   |   1   |
|   2   |  2   |   1   |   1   |
|   2   |  3   |   4   |   1   |
|   2   |  4   |   2   |   1   |
|   2   |  5   |   19  |  drop |   ...
|   2   |  6   |   11  |  drop |   ...
|   2   |  7   |   5   |   2   |
|   2   |  8   |   6   |   2   |
|   2   |  9   |   15  |  drop |   ...
|   2   |  10  |   8   |   3   |

In our project, the dataset is a DataFrame in Spark SQL. I tried to solve this problem with a udf, in this way:

var last_rowNum: Int = 1
var column3_Num: Int = 1    
def assign_column3_Num(rowNum:Int): Int = {
    if (rowNum == 1){   //first row: reset the state and assign 1
      column3_Num = 1
      last_rowNum = 1
      return column3_Num
    }
    /*** if the difference between rownum is 1, they have the same column3 
     * value, if not, column3_Num++, so they are different
     */ 
    if(rowNum - last_rowNum == 1){  
      last_rowNum = rowNum
      return column3_Num
    }else{
      column3_Num += 1
      last_rowNum = rowNum
      return column3_Num
    }
}
spark.sqlContext.udf.register("assign_column3_Num",assign_column3_Num _)
df.filter("column2 < 10")   //drop the larger rows, keep the rows below 10
  .withColumn("column3",callUDF("assign_column3_Num",col("rownum"))) //add column3 from the gaps in rownum

As you can see, I use global variables. However, this only works in Spark local[1] mode. If I use local[8] or yarn-client, the result is completely wrong! This is because of how Spark executes the job: the tasks run in parallel and update the global variables without regard to groupid or row order.

So the question is: how can I assign the right numbers when Spark runs on a cluster? Should I use a udf, a udaf, RDDs, or something else? Thank you!

How do you get | 2 | 10 | 8 | 3 | in the last row of the output, since you have | 2 | 9 | 7 | in the input? - koiralo
Sorry, that was my mistake. I mistyped it and have now corrected it. @Shankar Koirala - Chuang

1 Answer


You can achieve this by defining a udf as below (comments are given for clarity):

import org.apache.spark.sql.functions._
import scala.collection.mutable.ArrayBuffer

// udf: takes one group's rownum and column2 lists and returns (rownum, column2, column3) triples
def createNewCol = udf((rownum: Seq[Int], column2: Seq[Int]) => {
  var value = 1                                        //value for column3
  var previousValue = 10                               //previous column2; starts as "dropped" so a leading large row does not bump the counter
  val arrayBuffer = ArrayBuffer.empty[(Int, Int, Int)] //rows to be returned
  //collect_list does not guarantee order after a shuffle, so sort the zipped pairs by rownum
  for((a, b) <- rownum.zip(column2).sortBy(_._1)){
    if(b >= 10 && previousValue < 10)                  //a run of values below 10 has just ended
      value = value + 1                                //so the next run gets the next number
    arrayBuffer += ((a, b, value))                     //record (rownum, column2, column3)
    previousValue = b
  }
  arrayBuffer.toSeq
})
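
To check the numbering logic without starting Spark, the same idea can be run on plain Scala collections. This is only a minimal sketch: `numberRuns` is a hypothetical helper name that does not appear in the code above, and it assumes that a row is kept exactly when column2 is below 10. The sample values are the groupid = 2 rows from the question.

```scala
// Hypothetical helper for checking the logic outside Spark.
// Input: (rownum, column2) pairs of one group. Output: kept rows as (rownum, column2, column3).
def numberRuns(rows: Seq[(Int, Int)]): Seq[(Int, Int, Int)] = {
  var value = 1             // current run number (column3)
  var previousKept = false  // whether the previous row had column2 < 10
  rows.sortBy(_._1).flatMap { case (rownum, column2) =>
    val kept = column2 < 10                  // assumption: keep only values below 10
    if (!kept && previousKept) value += 1    // a run of kept rows has just ended
    previousKept = kept
    if (kept) Some((rownum, column2, value)) else None
  }
}

// groupid = 2 from the question
val group2 = Seq(1 -> 8, 2 -> 1, 3 -> 4, 4 -> 2, 5 -> 19, 6 -> 11, 7 -> 5, 8 -> 6, 9 -> 15, 10 -> 8)
val numbered = numberRuns(group2)
numbered.foreach(println)
```

For these rows the kept rownums are 1 to 4 (column3 = 1), 7 and 8 (column3 = 2), and 10 (column3 = 3), which matches the expected table in the question.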

To use the algorithm defined in the udf and get the desired result, collect the values of rownum and column2 grouped by groupid and sorted by rownum, then call the udf. The next steps are to explode the result and select the necessary columns (commented for clarity):

   df.orderBy("rownum")
    .groupBy("groupid")
    .agg(collect_list("rownum").as("rownum"), collect_list("column2").as("column2"))  //collecting each group's values for generating column3
    .withColumn("new", createNewCol(col("rownum"), col("column2")))  //calling the udf; "new" holds an array of struct(rownum, column2, column3)
    .drop("rownum", "column2")                                       //dropping the columns that are no longer needed
    .withColumn("new", explode(col("new")))                          //exploding the array so that each row holds one struct
    .select(col("groupid"), col("new._1").as("rownum"), col("new._2").as("column2"), col("new._3").as("column3"))  //selecting the struct fields as separate columns
    .filter(col("column2") < 10)                                     //keeping only the rows with column2 less than 10
    .orderBy("groupid", "rownum")                                    //making the displayed order deterministic
    .show(false)

You should get your desired output:

+-------+------+-------+-------+
|groupid|rownum|column2|column3|
+-------+------+-------+-------+
|1      |1     |7      |1      |
|1      |2     |9      |1      |
|1      |3     |8      |1      |
|1      |4     |5      |1      |
|1      |5     |1      |1      |
|1      |6     |0      |1      |
|1      |8     |1      |2      |
|2      |1     |8      |1      |
|2      |2     |1      |1      |
|2      |3     |4      |1      |
|2      |4     |2      |1      |
|2      |7     |5      |2      |
|2      |8     |6      |2      |
|2      |10    |8      |3      |
+-------+------+-------+-------+
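
As an alternative that avoids collecting each group into an array, the same numbering could probably be expressed with window functions. The following is a sketch under assumptions, not a tested drop-in: the local SparkSession settings, the helper column name `runEnded`, and the choice to treat every value of 10 or more as dropped are all mine. The sample rows are copied from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Local session for illustration only; master and app name are assumptions.
val spark = SparkSession.builder().master("local[*]").appName("column3-sketch").getOrCreate()
import spark.implicits._

// Sample rows from the question: (groupid, rownum, column2)
val df = Seq(
  (1, 1, 7), (1, 2, 9), (1, 3, 8), (1, 4, 5), (1, 5, 1),
  (1, 6, 0), (1, 7, 15), (1, 8, 1), (1, 9, 13), (1, 10, 20),
  (2, 1, 8), (2, 2, 1), (2, 3, 4), (2, 4, 2), (2, 5, 19),
  (2, 6, 11), (2, 7, 5), (2, 8, 6), (2, 9, 15), (2, 10, 8)
).toDF("groupid", "rownum", "column2")

// Rows are processed per groupid, in rownum order
val w = Window.partitionBy("groupid").orderBy("rownum")

val result = df
  // 1 on a row >= 10 that directly follows a row < 10 (a run has just ended), else 0
  .withColumn("runEnded",
    when(col("column2") >= 10 && lag("column2", 1).over(w) < 10, 1).otherwise(0))
  // running count of ended runs, plus 1, gives the run number
  .withColumn("column3", sum("runEnded").over(w) + 1)
  .filter(col("column2") < 10)   // drop the large rows
  .drop("runEnded")              // remove the hypothetical helper column
  .orderBy("groupid", "rownum")

result.show(false)
```

The flag and the running sum are computed in two separate steps because Spark does not allow one window function nested inside another. Note that column3 comes out as a bigint here, since `sum` over an integer column returns a long.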