
I am pretty new to Spark, I have tried to look for something on the web but I haven't found anything satisfactory.

I have always run parallel computations with the mclapply function and I like its structure (i.e., the first argument is the index to iterate over, the second argument is the function to be parallelized, and any further named arguments are passed on to that function). Now I am trying to do much the same thing with Spark, i.e., I would like to distribute my computations across all the nodes of a Spark cluster. This is, in short, what I have learned and how I think the code should be structured (I am using the sparklyr package):

  1. I create a connection to Spark using the command spark_connect;
  2. I copy my data.frame in the Spark environment with copy_to and access it through its tibble;
  3. I would like to implement a "Spark-friendly" version of mclapply, but there seems to be no equivalent function in the package (there is a spark.lapply function in the SparkR package, but unfortunately it is no longer on CRAN).

Below is a simple test script I have implemented that works with mclapply.

#### Standard code that works with mclapply #########
library(parallel)

dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))

# Sum the two entries of row X and return it together with a fixed string
.testFunc = function(X = 1, df, str) {
    rowSelected = df[X, ]
    y = as.numeric(rowSelected[1] + rowSelected[2])
    return(list(y = y, str = str))
}

lOutput = mclapply(X = 1 : nrow(dfTest), FUN = .testFunc, df = dfTest, 
                   str = "useless string", mc.cores = 2)

######################################################

###### Similar code that should work with Spark ######
library(sparklyr)
sc = spark_connect(master = "local")

dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))

.testFunc = function(X = 1, df, str) {
  rowSelected = df[X, ]
  nSum = as.numeric(rowSelected[1] + rowSelected[2])
  return(list(nSum = nSum, str = str))
}

dfTest_tbl = copy_to(sc, dfTest, "test_tbl", overwrite = TRUE)

# How can I apply an mclapply-like function to dfTest_tbl so that the
# work is distributed across the Spark cluster?
# ???
######################################################

If someone has already found a solution for this, that would be great. Other references/guides/links are also more than welcome. Thanks!


2 Answers

5 votes

sparklyr

spark_apply is the existing function you're looking for:

spark_apply(sdf, function(data) {
   ...
})

Please refer to Distributed R in the sparklyr documentation for details.
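
Applied to the question's data, a minimal sketch could look like the following (assuming the sc connection and the dfTest_tbl created in the question); spark_apply() hands each partition to the supplied function as an ordinary data.frame, so the computation is written column-wise rather than row-by-row:

library(sparklyr)
library(dplyr)

# Each partition of dfTest_tbl arrives as a data.frame; the function must
# return something coercible to a data.frame as well.
result_tbl = spark_apply(dfTest_tbl, function(df) {
  data.frame(nSum = df$X + df$Y)
})

head(result_tbl)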

SparkR

With SparkR, use gapply / gapplyCollect:

gapply(df, groupingCols, function(data) {...}, schema)

or dapply / dapplyCollect:

dapply(df, function(data) {...}, schema)

These are SparkR's user-defined function (UDF) APIs. Refer to the SparkR documentation for details.
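
For example, a minimal dapply sketch for the question's dfTest (a sketch only, assuming a SparkR session instead of a sparklyr connection):

library(SparkR)
sparkR.session()

sdf = createDataFrame(dfTest)

# dapply() needs an explicit output schema; dapplyCollect() skips the schema
# and returns a local data.frame instead of a SparkDataFrame.
schema = structType(structField("nSum", "double"))
outSdf = dapply(sdf, function(df) data.frame(nSum = df$X + df$Y), schema)
head(outSdf)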

Be warned that all of these solutions are inferior to native Spark code and should be avoided when high performance is required.
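
For the toy example in the question, for instance, no R UDF is needed at all; a plain dplyr verb on the tbl is translated to Spark SQL and executed natively:

library(dplyr)

# Translated to Spark SQL and run on the cluster, with no R worker processes.
dfTest_tbl %>% mutate(nSum = X + Y)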

1 vote

sparklyr::spark_apply can now pass external variables, such as models, to the workers via its context argument.

Here is my example of running an xgboost model with spark_apply:

library(sparklyr)
library(dplyr)

# Load the trained model on the driver and ship it to the workers
# through the `context` argument.
bst <- xgboost::xgb.load("project/models/xgboost.model")

res3 <- spark_apply(x = ft_union_price %>%
                        sdf_repartition(partitions = 1500, partition_by = "uid"),
                    f = inference_fn,
                    packages = FALSE,
                    memory = FALSE,
                    names = c("uid",
                              "action_1",
                              "pred"),
                    context = {model <- bst})
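
The answer does not show inference_fn itself. A hypothetical sketch of how such a function might consume the shipped model (the feature handling here is an assumption for illustration, not the author's actual code):

# Hypothetical sketch only. With a non-NULL context, spark_apply() calls
# f(partition_df, context), so the model arrives as the second argument.
inference_fn <- function(df, context) {
  bst <- context                                       # the shipped xgboost model
  feats <- as.matrix(df[, setdiff(names(df), "uid")])  # assumed feature layout
  data.frame(uid = df$uid,
             action_1 = df$action_1,                   # assumed input column
             pred = predict(bst, feats))
}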