I am trying to dcast my Spark DataFrame using the sdf_pivot()
function. I want to spread the values of one column across the new
columns, the way the value.var parameter does in dcast() from the
reshape2 package. Please look at the example below.
id <- c(1,1,1,1,1,2,2,2,3,3,3)
name <- c("A","B","C","D","E","A","B","C","D","E","F")
value <- c(1,2,3,1,1,2,3,1,1,2,3)
dt <- data.frame(id,name,value)
reshape2::dcast(dt,id~name,value.var = "value")
Output 1:
id A B C D E F
1 1 1 2 3 1 1 NA
2 2 2 3 1 NA NA NA
3 3 NA NA NA 1 2 3
spark_dt <- copy_to(sc, dt)
sdf_pivot(spark_dt,id~name)
Output 2:
id A B C D E F
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 1 1 1 1 1 1 NaN
2 3 NaN NaN NaN 1 1 1
3 2 1 1 1 NaN NaN NaN
It seems sdf_pivot() has no value.var parameter.
I am new to Spark, and any suggestions would be much appreciated.
Do I need to write a custom function to do this?
Note: I tried
## Pivoting
cohort_paste <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "paste",
    "value"
  )
  gdf %>% invoke("agg", expr, list())
}
It gives the error:
Error: java.lang.IllegalArgumentException: invalid method paste for object org.apache.spark.sql.functions
I actually want to use the paste
function.
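For context, org.apache.spark.sql.functions exposes no paste method, which is why invoke_static() fails above; the closest Spark analogue is concat_ws(). A minimal sketch (assuming a live connection sc and the spark_dt from above; untested here, since it needs a running Spark session):

```r
library(sparklyr)
library(dplyr)

# concat_ws() is not translated by dplyr but passed through to Spark SQL's
# CONCAT_WS, which joins string columns with a separator, much like paste().
# value is cast to character first, since CONCAT_WS expects string inputs.
spark_dt %>%
  mutate(name_value = concat_ws("-", name, as.character(value)))
```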
df <- tibble(
  id = c(rep(1, 9), rep(2, 9)),
  name = rep(rep(c("A", "B", "C"), each = 3), 2),
  value = sample(10, 18, replace = TRUE)
)[sample(1:18, size = 10), ]
spark_dt <- copy_to(sc, df, overwrite=TRUE)
collect_list <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "collect_list",
    "value"
  )
  gdf %>% invoke("agg", expr, list())
}
sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>%
mutate_at(vars(-id), funs(concat_ws(" ", .)))
Error log:
Error: org.apache.spark.sql.AnalysisException: cannot resolve
'concat_ws(' ', sparklyr_tmp_79e15abf584.A)' due to data type mismatch:
argument 2 requires (array<string> or string) type, however,
'sparklyr_tmp_79e15abf584.A' is of array<int> type.; line 1 pos 13;
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Project [id#3038, concat_ws( , A#3156) AS A#3172, concat_ws( , B#3158) AS B#3173, concat_ws( , C#3160) AS C#3174]
      +- SubqueryAlias sparklyr_tmp_79e15abf584
         +- Aggregate [id#3038], [id#3038, collect_list(if ((name#3039 = A)) value#3040 else cast(null as int), 0, 0) AS A#3156, collect_list(if ((name#3039 = B)) value#3040 else cast(null as int), 0, 0) AS B#3158, collect_list(if ((name#3039 = C)) value#3040 else cast(null as int), 0, 0) AS C#3160]
            +- Project [id#3038, name#3039, value#3040]
               +- SubqueryAlias df
                  +- Relation[id#3038,name#3039,value#3040] csv
Comment: fun.aggregate in linked question. – Alper t. Turker