Parallelism in R
Contents:
There are a few different libraries in R which can help parallelise code. Let's consider a new example of calculating the volume of an N-dimensional hypersphere with Monte Carlo:
sphere_volume <- function(steps, dimension) {
in_hypersphere <- 0
for (n in 1:steps) {
coordinates <- runif(dimension, min = -1, max = 1)
if (sum(coordinates ** 2) < 1) {
in_hypersphere <- in_hypersphere + 1
}
}
2 ** dimension * in_hypersphere / steps
}
This uses the idea that we can calculate the volume of a hypercube in N-dimensions with sides of length two by \( 2^N \). Inside that hypercube, we can fit a hypersphere with radius \(r = 1\). If we randomly generate points inside the hypercube, and count the proportion which are inside the hypersphere, we will get the ratio of the volume of the hypersphere to the hypercube, so can calculate the hypersphere volume with: \[ V_{\mathrm{sphere}} = V_{\mathrm{cube}} * \frac{\mathrm{draw}_\mathrm{inside}}{\mathrm{draw} _ \mathrm{total}} \] Making draws between -1 and 1 for each dimension will be within the cube. We can then use Pythagoras to determine whether draws are inside the sphere: \[ r^2 < x^2 + y^2 + z^2 + ...\]
See the wikipedia page on Monte Carlo integration for more info and drawings.
Exercise: We know the formulae for a 2D-sphere (circle) and 3D sphere. Run the above function and verify the result.
Using parallel::mclapply
The 2D example is an estimate of \( \pi \), but a slowly converging one. The error goes down only by \( \frac{1}{\sqrt N} \), so we need many samples to get a good estimate. Using \(10^7 \) samples takes around 10s, but only gets the first four digits correct.
These samples are totally independent, so they should be easy to parallelise, right? Let's first use a parallel version of lapply:
sphere_volume <- function(steps, dimension) {
in_hypersphere <- 0
for (n in 1:steps) {
coordinates <- runif(dimension, min = -1, max = 1)
if (sum(coordinates ** 2) < 1) {
in_hypersphere <- in_hypersphere + 1
}
}
2 ** dimension * in_hypersphere / steps
}
steps <- 10000000
dimension <- 2
system.time(sphere_volume(steps, dimension)) # 7.258s
cores <- 4
system.time(mean(unlist(parallel::mclapply(rep(10000000/cores, cores), sphere_volume, 2, mc.cores = cores))))
# 2.211s
# Speedup =~3.3x
Similar to writing rust code using iterables, it's often better to write R code by applying functions to lists or arrays. This is true even in serial R code, as it can vectorise operations. Here we divide the work (the draws) equally among the cores, making a list with the number of steps that each thread will compute. We can then calculate the average of the estimate from each core. Though, it may be more precise to return the number of draws inside the circle as an integer and sum these, then do the division once at the end.
What happens if you add set.seed(1)
in the function?
All of the estimates are the same, because the random draws are the same. We are using a pseudorandom number generator (RNG), which given the same starting state will generate the same sequence of numbers.
You can achieve the same by adding mc.set.seed = FALSE
to mclapply
, which shares
the same RNG state when the threads are forked. The default of TRUE
will set a seed based on the time and process ID,
which will likely give a different set of random numbers on each thread. If you want
your results to be reproducible, you'd want to manually set a consistent seed e.g. using the thread index
at the top of the sphere_volume
function.
But note, for very long streams (or if you are unlucky) these numbers may start to overlap! There are true parallel random number generators which solve this (rust has these). See Figure 3 of this paper for an illustration of this.
Using foreach and doParallel
We can also parallelise for loops more directly using the dopar
operator:
library(doParallel)
library(foreach)
cores <- 4
registerDoParallel(cores)
sphere_volume <- function(steps, dimension) {
in_hypersphere <- foreach (n=1:steps, .combine = '+') %dopar% {
coordinates <- runif(dimension, min = -1, max = 1)
if (sum(coordinates ** 2) <= 1) {
1
}
}
2 ** dimension * in_hypersphere / steps
}
steps <- 10000000
dimension <- 2
system.time(sphere_volume(steps, dimension)) # dnf after 30s
Here using a sum reduction to count the number of points in the circle. This doesn't parallelise particularly well at all (small amount of work on lots of threads), and didn't even finish for me.
Dividing the work up manually with a parallelised outer loop works a lot better:
library(doParallel)
library(foreach)
cores <- 4
registerDoParallel(cores)
sphere_volume <- function(steps, dimension, cores) {
# Outer loop parallelised over cores, sum results
total_count <- foreach (n=1:cores, .combine = '+') %dopar% {
set.seed(n)
in_hypersphere <- 0
# Inner loop run on each thread
for (i in 1:(steps/cores)) {
coordinates <- runif(dimension, min = -1, max = 1)
if (sum(coordinates ** 2) <= 1) {
in_hypersphere <- in_hypersphere + 1
}
}
in_hypersphere
}
2 ** dimension * total_count / steps
}
steps <- 10000000
dimension <- 2
system.time(sphere_volume(steps, dimension, cores)) # 2.05s
This gave a ~3.7x speedup, a bit better than mclapply
.
Parallel R using furrr
Finally, if you are a fan of tidyverse type R and functional programming with purrr
,
there is a parallel version called furrr
.
This gives you a parallel version of .map()
, which is similar to the rust iterators,
and looks quite similar to lapply
.
Here it is with the integration example:
library(furrr)
library(purrr)
# Various different ways to parallelise are supported
cores <- 4
plan(multisession, workers = cores)
sphere_volume <- function(steps, dimension) {
in_hypersphere <- 0
for (n in 1:steps) {
coordinates <- runif(dimension, min = -1, max = 1)
if (sum(coordinates ** 2) < 1) {
in_hypersphere <- in_hypersphere + 1
}
}
2 ** dimension * in_hypersphere / steps
}
steps <- 10000000
dimension <- 2
# future_map_dbl returns a vector of doubles (decimal numbers), which is applied
# the same way as the mclapply example above.
# future_<> is the purrr (parallel) version of functions from furrr
# We also set the seed in a similar way
system.time(mean(future_map_dbl(rep(steps/cores, cores),
sphere_volume, dimension,
.options=furrr_options(seed=TRUE))))
# 2.743s
This gives me a speedup of ~2.6x with multicore
, or ~3.1x with multisession
and cluster
.