
I have an R script that does some distributed data preprocessing with sparklyr and then collects the data into a local R data frame to finally save the result as a CSV. Everything works as expected, and now I plan to reuse the Spark context across the processing of multiple input files.

My code looks similar to this reproducible example:

library(dplyr)
library(sparklyr)

sc <- spark_connect(master = "local")

# Generate random input (create the input directory first so write.csv succeeds)
dir.create("/tmp/input", recursive = TRUE, showWarnings = FALSE)
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df0.csv')
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df1.csv')

# Multi-job input
input = list(
    list(name="df0", path="/tmp/input/df0.csv"),
    list(name="df1", path="/tmp/input/df1.csv")
)
global_parallelism = 2
results_dir = "/tmp/results2"
dir.create(results_dir, showWarnings = FALSE)

# Function executed on each file
f <- function (job) {
    spark_df <- spark_read_csv(sc, "df_tbl", job$path)
    local_df <- spark_df %>%
      group_by(V1) %>%
      summarise(n=n()) %>%
      sdf_collect

    output_path <- paste(results_dir, "/", job$name, ".csv", sep="")
    local_df %>% write.csv(output_path)
    return (output_path)
}

If I execute the function over the job inputs sequentially with lapply, everything works as expected:

> lapply(input, f)

[[1]]
[1] "/tmp/results2/df0.csv"

[[2]]
[1] "/tmp/results2/df1.csv"

However, when I run it in parallel to maximize usage of the Spark context (so that while local R is working on the collected df0, Spark can already be processing df1), it fails:

> library(parallel)
> library(MASS)
> mclapply(input, f, mc.cores = global_parallelism)

 *** caught segfault ***
address 0x560b2c134003, cause 'memory not mapped'
[[1]]
[1] "Error in as.vector(x, \"list\") : \n  cannot coerce type 'environment' to vector of type 'list'\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in as.vector(x, "list"): cannot coerce type 'environment' to vector of type 'list'>

[[2]]
NULL

Warning messages:
1: In mclapply(input, f, mc.cores = global_parallelism) :
  scheduled core 2 did not deliver a result, all values of the job will be affected
2: In mclapply(input, f, mc.cores = global_parallelism) :
  scheduled core 1 encountered error in user code, all values of the job will be affected

When I do something similar in Python with ThreadPoolExecutor, the Spark context is shared across threads; the same holds for Scala and Java.

Is it possible to reuse the sparklyr context in parallel execution in R?

Considering that R doesn't support multi-threading, the answer is definitely negative. Nor is it very clear why one would want to do that; separate apps are usually the way to go in such cases anyway. – 10465355
@10465355saysReinstateMonica Thanks! I wanted to avoid separate apps because of the startup-time overhead, and because I could benefit from caching (cache some global dataset once and use it across jobs; see the sketch after these comments). – Mariusz
I guess these are as good reasons as any. It is technically possible to achieve this without any script-level parallelism, but that's not something for the faint of heart. Realistically, if context init time is an issue, then Spark is probably not the right choice. And achieving stability with multiple concurrent jobs within a single app is not a trivial thing either. As for caching, it is overrated most of the time. – 10465355
Actually, stability with concurrent jobs in Spark is trivial; I have written a lot of PySpark jobs using this feature and they are super stable. Spark has its own built-in job scheduler (FIFO), so with this method you use all the available executor cores as long as the context is up. – Mariusz
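
For the caching point raised above, here is a hedged sketch of the pattern with a single long-lived connection: load a shared dataset once, keep it cached in Spark memory, and reuse it from every per-file job. The lookup path, table name, and join key below are hypothetical.

# Hypothetical shared dataset, read once and kept cached in Spark memory
lookup <- spark_read_csv(sc, "global_lookup",
                         "/tmp/input/lookup.csv",  # hypothetical path
                         memory = TRUE)

f_with_lookup <- function (job) {
    spark_df <- spark_read_csv(sc, paste(job$name, "_tbl", sep=""), job$path)
    spark_df %>%
      inner_join(lookup, by = "V1") %>%   # reuse the cached table in each job
      group_by(V1) %>%
      summarise(n=n()) %>%
      sdf_collect
}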

1 Answer


Yes, unfortunately the sc object, which is of class spark_connection, cannot be exported to another R process (not even when forked processing is used). With the future.apply package, part of the future ecosystem, you can see this for yourself by running:

library(future.apply)
plan(multicore)

## Look for non-exportable objects and give an error if found
options(future.globals.onReference = "error")

y <- future_lapply(input, f)

That will throw:

Error: Detected a non-exportable reference (‘externalptr’) in one of the
globals (‘sc’ of class ‘spark_connection’) used in the future expression
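
Since the connection cannot cross the process boundary, one way to sidestep the problem entirely (a sketch only, reusing sc, input and results_dir from the question, and assuming Spark-style part-file output is acceptable) is to skip the collect step and let Spark write the results itself:

f_spark_only <- function (job) {
    spark_df <- spark_read_csv(sc, paste(job$name, "_tbl", sep=""), job$path)
    out_dir <- file.path(results_dir, job$name)
    spark_df %>%
      group_by(V1) %>%
      summarise(n=n()) %>%
      spark_write_csv(out_dir, mode = "overwrite")  # writes a directory of part files
    return (out_dir)
}

lapply(input, f_spark_only)

Because nothing is collected into local R, the whole job stays inside Spark and there is no R-side parallelism to manage.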