0
votes

I have a dataframe with 100 columns, with names like col1, col2, col3.... I want to apply a transformation to the values of certain columns when a condition matches. I can store the column names in an array of strings, pass each element of the array to withColumn, and transform that column's values with a when condition. But since a DataFrame is immutable, each updated version needs to be stored in a new variable, and that new dataframe has to be passed to withColumn for the next iteration. Is there a way to create an array of dataframes, so that each new dataframe can be stored as an element of the array and the loop can index into it? Or is there some other way to handle this?

var arr_df : Array[DataFrame] = new Array[DataFrame](60)   

--> This throws error "not found type DataFrame"

arr_df(0) = df1.union(df2)

for(i <- 1 to 99){
  arr_df(i) = arr_df(i-1).withColumn(col(i), when(col(i) > 0, col(i) + 1).otherwise(col(i)))
}

Here col(i) is an element of an array of strings that stores the names of the columns of the original dataframe.
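Incidentally, the "not found type DataFrame" error above goes away once you add `import org.apache.spark.sql.DataFrame`. But the array of dataframes isn't really needed: because each transformation returns a new immutable value, you can simply re-bind one accumulator variable per iteration. A minimal stand-in sketch, in plain Python since the pattern doesn't depend on Spark (a dict plays the role of the DataFrame; the names and values are made up for illustration):

```python
# Stand-in sketch: a dict plays the role of the immutable DataFrame.
# Instead of keeping every intermediate version in an array slot,
# re-bind one accumulator on each iteration.
col_names = ["col1", "col2", "col3"]
acc = {"col1": 1, "col2": 5, "col3": 2}
for c in col_names:
    # each iteration builds a new value from the previous one
    acc = {**acc, c: acc[c] + 1 if acc[c] > 3 else acc[c]}
print(acc)  # {'col1': 1, 'col2': 6, 'col3': 2}
```

Each loop pass consumes the result of the previous pass, so only the final version is ever kept around.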

As an example:


scala> val original_df = Seq((1,2,3,4),(2,3,4,5),(3,4,5,6),(4,5,6,7),(5,6,7,8),(6,7,8,9)).toDF("col1","col2","col3","col4")
original_df: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]

scala> original_df.show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   4|
|   2|   3|   4|   5|
|   3|   4|   5|   6|
|   4|   5|   6|   7|
|   5|   6|   7|   8|
|   6|   7|   8|   9|
+----+----+----+----+

I want to iterate over 3 columns: col1, col2, col3. If the value of a column is greater than 3, it should be updated by +1.

Hi, welcome to SO. As is the community standard, can you please post a sample input, what you have tried, the expected output, and where you are stuck? - Raghu
@Raghu, please let me know if you want me to be more specific. I cannot share the exact code, but I will try to give an example if needed - Ayan Chatterjee
Yes, a sample input and an expected output would help. Also try formatting your code using the format option ({}) when you edit. That improves readability - Raghu
@Raghu, hope this helps you understand the problem more specifically. - Ayan Chatterjee
It would be good if you could add sample data, the kind of transformation, and the expected output. - Srinivas

4 Answers

0
votes

If I understand you right, you are trying to do a dataframe-wide operation; you don't need to iterate for this. I can show you how it can be done in PySpark; it can probably be carried over to Scala.

from pyspark.sql import functions as F

# sqlContext is predefined in a pyspark shell
tst = sqlContext.createDataFrame([(1,7,0),(1,8,4),(1,0,10),(5,1,90),(7,6,0),(0,3,11)], schema=['col1','col2','col3'])
# build one when/otherwise expression per column, skipping col3
expr = [F.when(F.col(coln) > 3, F.col(coln) + 1).otherwise(F.col(coln)).alias(coln) for coln in tst.columns if 'col3' not in coln]
tst1 = tst.select(*expr)

results:

tst1.show()
+----+----+
|col1|col2|
+----+----+
|   1|   8|
|   1|   9|
|   1|   0|
|   6|   1|
|   8|   7|
|   0|   3|
+----+----+

This should give you the desired result.

0
votes

You can iterate over all the columns and apply the condition in a single line, as below:

original_df.select(original_df.columns.map(c => (when(col(c) > lit(3), col(c)+1).otherwise(col(c))).alias(c)):_*).show()


+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   5|
|   2|   3|   5|   6|
|   3|   5|   6|   7|
|   5|   6|   7|   8|
|   6|   7|   8|   9|
|   7|   8|   9|  10|
+----+----+----+----+
0
votes

Check the code below.

scala> df.show(false)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |4   |
|2   |3   |4   |5   |
|3   |4   |5   |6   |
|4   |5   |6   |7   |
|5   |6   |7   |8   |
|6   |7   |8   |9   |
+----+----+----+----+
scala>  val requiredColumns = df.columns.zipWithIndex.filter(_._2 < 3).map(_._1).toSet
requiredColumns: scala.collection.immutable.Set[String] = Set(col1, col2, col3)
scala> val allColumns = df.columns
allColumns: Array[String] = Array(col1, col2, col3, col4)
scala> val columnExpr = allColumns.map(c => if (requiredColumns(c)) when(col(c) > 3, col(c) + 1).otherwise(col(c)).as(c) else col(c))
scala> df.select(columnExpr:_*).show(false)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |4   |
|2   |3   |5   |5   |
|3   |5   |6   |6   |
|5   |6   |7   |7   |
|6   |7   |8   |8   |
|7   |8   |9   |9   |
+----+----+----+----+
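One caveat when assembling the expression list: prepending the untouched columns before the transformed ones would put col4 first in the result, whereas mapping over allColumns in their original order preserves the layout shown above. A small stand-in sketch of the two orderings (plain Python, hypothetical names, no Spark required):

```python
all_columns = ["col1", "col2", "col3", "col4"]
required = {"col1", "col2", "col3"}

# untouched columns first, transformed second -> col4 jumps to the front
reordered = [c for c in all_columns if c not in required] + sorted(required)
# mapping over all_columns in place keeps the original order
in_order = [f"when({c})" if c in required else c for c in all_columns]

print(reordered)  # ['col4', 'col1', 'col2', 'col3']
print(in_order)   # ['when(col1)', 'when(col2)', 'when(col3)', 'col4']
```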
0
votes

You can use foldLeft whenever you want to make changes to multiple columns, as below:

val original_df = Seq(
  (1,2,3,4),
  (2,3,4,5),
  (3,4,5,6),
  (4,5,6,7),
  (5,6,7,8),
  (6,7,8,9)
).toDF("col1","col2","col3","col4")

// Columns that you want to update (here, all of them)
val columns = original_df.columns

columns.foldLeft(original_df) { (acc, colName) =>
  acc.withColumn(colName, when(col(colName) > 3, col(colName) + 1).otherwise(col(colName)))
}.show(false)

Output:

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |5   |
|2   |3   |5   |6   |
|3   |5   |6   |7   |
|5   |6   |7   |8   |
|6   |7   |8   |9   |
|7   |8   |9   |10  |
+----+----+----+----+
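foldLeft works here because each step receives the dataframe produced by the previous step as its accumulator, so no array of intermediate frames is needed. The same threading can be sketched without Spark using Python's reduce, with a dict standing in for a row (made-up values, same > 3 rule):

```python
from functools import reduce

cols = ["col1", "col2", "col3", "col4"]
row = {"col1": 1, "col2": 2, "col3": 4, "col4": 5}

# acc is the result of the previous step, just like the dataframe
# returned by the previous withColumn call in the foldLeft above
out = reduce(lambda acc, c: {**acc, c: acc[c] + 1 if acc[c] > 3 else acc[c]}, cols, row)
print(out)  # {'col1': 1, 'col2': 2, 'col3': 5, 'col4': 6}
```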