Home

A Review of pbdR

Jason Cairns

2020-04-17

1 Introduction

pbdR is a collection of packages allowing for distributed computing with R[1]. The name is an acronym for the collection’s purpose; Programming with Big Data in R. It is introduced on it’s main page with the following description:

The “Programming with Big Data in R” project (pbdR) is a set of highly scalable R packages for distributed computing and profiling in data science. > Our packages include high performance, high-level interfaces to MPI, ZeroMQ, ScaLAPACK, NetCDF4, PAPI, and more. While these libraries shine brightest on large distributed systems, they also work rather well on small clusters and usually, surprisingly, even on a laptop with only two cores. > Winner of the Oak Ridge National Laboratory 2016 Significant Event Award for “Harnessing HPC Capability at OLCF with the R Language for Deep Data Science.” OLCF is the Oak Ridge Leadership Computing Facility, which currently includes Summit, the most powerful computer system in the world.[2]

2 Interface and Backend

The project seeks especially to serve minimal wrappers around the BLAS and LAPACK libraries along with their distributed derivatives, with the intention of introducing as little overhead as possible. Standard R also uses routines from the library for most matrix operations, but suffers from numerous inefficiencies relating to the structure of the language; for example, copies of all objects being manipulated will be typically be created, often having devastating performance aspects unless specific functions are used for linear algebra operations, as discussed in [3] (e.g., crossprod(X) instead of t(X) %*% X)

Distributed linear algebra operations in pbdR depend further on the ScaLAPACK library, which can be provided through the pbdSLAP package [4].

The principal interface for direct distributed computations is the pbdMPI package, which presents a simplified API to MPI through R [5]. All major MPI libraries are supported, but the project tends to make use of openMPI in explanatory documentation. A very important consideration that isn’t immediately clear is that pbdMPI can only be used in batch mode through MPI, rather than any interactive option as in Rmpi [6].

The actual manipulation of distributed matrices is enabled through the pbdDMAT package, which offers S4 classes encapsulating distributed matrices [7]. These are specialised for dense matrices through the ddmatrix class, though the project offers some support for other matrices. The ddmatrix class has nearly all of the standard matrix generics implemented for it, with nearly identical syntax for all.

3 Package Interaction

The packages can be made to interact directly, for example with pbdDMAT constructing and performing basic manipulations on distributed matrices, and pbdMPI being used to perform further fine-tuned processing through communicating results across nodes manually, taking advantage of the persistence of objects at nodes through MPI.

4 pbdR in Practice

The package is geared heavily towards matrix operations in a statistical programming language, so a test of it’s capabilities would quite reasonably involve statistical linear algebra. An example non-trivial routine is that of generating data, to test randomisation capability, then fitting a generalised linear model to the data through iteratively reweighted least squares. In this way, not only are the basic algebraic qualities considered, but communication over iteration on distributed objects is tested.

To work comparatively, a simple working local-only version of the algorithm is produced in lst. 1.

Listing 1: Local GLM with RWLS

set.seed(1234)
# Generate the data

n <- 1000
B <- matrix(c(1,3))
x0 <- rep(1, n)
x1 <- rnorm(n, 0, 1)
X <- cbind(x0, x1)
p <- 1 / (1 + exp(- X %*% B))
y <- rbinom(n, 1, p)

# Base comparison
#glm(y ~ x1, family = "binomial")

# RWLS as Newton-Raphson for GLM (logistic regression here)

logReg <- function(X, y, maxIter=80, tolerance=0.01){
    pr <- function(X, B){
        1 / (1 + exp(-X  %*% B))
    }
    ##
    weights <- function(X, B, y){
        diag(as.vector(pr(X, B)))
    }
    ##
    oldB <- matrix(c(Inf,Inf))
    newB <- matrix(c(0, 0))
    nIter <- 0
    while (colSums((newB - oldB)^2) > tolerance &&
           nIter < maxIter) {
        oldB <- newB
    ## N-R as RWLS
        W <- weights(X, oldB, y)
        hessian <- - t(X) %*% W %*% X
        z <- X %*% oldB + solve(W) %*% (y - pr(X, oldB))
        newB <- solve(-hessian) %*% crossprod(X, W %*% z)
    ##
        nIter <- nIter + 1
    }
    newB
}

print(logReg(X, y, tolerance=1E-6, maxIter=100))

It outputs a β̂ matrix after several seconds of computation.

Were pbdDMAT to function transparently as regular matrices, as the package README implies, then all that would be required to convert a local algorithm to distributed would be to prefix a dd to every matrix() call, and bracket the program with a template as per lst. 2.

Listing 2: Idealised Common Wrap for Local to Distributed Matrices

suppressMessages(library(pbdDMAT))
init.grid()

# program code with `dd` prefixed to every `matrix` call

finalize()

This is the form of transparency offered by packages such as parallel, foreach, or sparklyr in relation to dplyr. The program would then be written to disk, then executed, for example with the following:

mpirun -np <# of cores> Rscript <script name>

The program halts however, as forms of matrix creation other than through explicit matrix() calls are not necessarily picked up by that process; cbind requires a second formation of a ddmatrix. The first issue comes when performing conditional evaluation; predicates involving distributed matrices are themselves distributed matrices, and can’t be mixed in logical evaluation with local predicates.

Turning local predicates to distributed matrices, then converting them all back to a local matrix for the loop to understand, finally results in a program run, however the results are still not accurate. This is due to diag()<- assignment not having been implemented, so several further changes are necessary, including specifying return type of the diag matrix as a replacement. The final working code of pbdDMAT GLM through RWLS is given in lst. 3, with the code diff given in lst. 4. Execution time was longer for the pbdR code on a dual-core laptop, however it is likely faster over a cluster.

Listing 3: pbdDMAT GLM with RWLS

suppressMessages(library(pbdDMAT))
init.grid()

set.seed(1234)
# Generate the data

n <- 1000
B <- ddmatrix(c(1,3))
x0 <- rep(1, n)
x1 <- rnorm(n, 0, 1)
X <- as.ddmatrix(cbind(x0, x1))
p <- 1 / (1 + exp(- X %*% B))
y <- ddmatrix(rbinom(n, 1, as.vector(p)))

# Base comparison
#glm(y ~ x1, family = "binomial")

# RWLS as Newton-Raphson for GLM (logistic regression here)

logReg <- function(X, y, maxIter=80, tolerance=0.01){
    pr <- function(X, B){
        1 / (1 + exp(-X  %*% B))
    }
    ##
    weights <- function(X, B, y){
        diag(as.vector(pr(X, B)), type="ddmatrix")
    }
    ##
    oldB <- ddmatrix(c(Inf,Inf))
    newB <- ddmatrix(c(0, 0))
    nIter <- ddmatrix(0)
    maxIter <- as.ddmatrix(maxIter)
    while (as.matrix(colSums((newB - oldB)^2) > tolerance &
           nIter < maxIter)) {
        oldB <- newB
    ## N-R as RWLS
        W <- weights(X, oldB, y)
        hessian <- - t(X) %*% W %*% X
        z <- X %*% oldB + solve(W) %*% (y - pr(X, oldB))
        newB <- solve(-hessian) %*% crossprod(X, W %*% z)
    ##
        nIter <- nIter + 1
    }
    newB
}

print(logReg(X, y, tolerance=1E-6, maxIter=100))

finalize()

Listing 4: Diff between local and pbdR code

0a1,3
> suppressMessages(library(pbdDMAT))
> init.grid()
> 
5c8
< B <- matrix(c(1,3))
---
> B <- ddmatrix(c(1,3))
8c11
< X <- cbind(x0, x1)
---
> X <- as.ddmatrix(cbind(x0, x1))
10c13
< y <- rbinom(n, 1, p)
---
> y <- ddmatrix(rbinom(n, 1, as.vector(p)))
23c26
<       diag(as.vector(pr(X, B)))
---
>       diag(as.vector(pr(X, B)), type="ddmatrix")
26,30c29,34
<   oldB <- matrix(c(Inf,Inf))
<   newB <- matrix(c(0, 0))
<   nIter <- 0
<   while (colSums((newB - oldB)^2) > tolerance &&
<          nIter < maxIter) {
---
>   oldB <- ddmatrix(c(Inf,Inf))
>   newB <- ddmatrix(c(0, 0))
>   nIter <- ddmatrix(0)
>   maxIter <- as.ddmatrix(maxIter)
>   while (as.matrix(colSums((newB - oldB)^2) > tolerance &
>          nIter < maxIter)) {
43a48,49
> 
> finalize()

It is worth noting that options for optimisation and tuning are far more extensive than those utilised in this example, including the capacity to set grid parameters, blocking factors, and BLACS contexts, among others.

5 Setup

The setup for pbdR is simple, being no more than a CRAN installation, but a well tuned environment, which is the main purpose for using the package in the first place, requires BLAS, LAPACK and derivatives, a parallel file system with data in an appropriate format such as HDF5, and a standard MPI library. Much of the pain of setup is ameliorated with a docker container, provided by the project.

[1]
D. Schmidt, W.-C. Chen, S. L. de la Chapelle, G. Ostrouchov, and P. Patel, pbdBASE: pbdR base wrappers for distributed matrices.” 2020.
[2]
G. Ostrouchov, W.-C. Chen, D. Schmidt, and P. Patel, “Programming with big data in r.” 2012.
[3]
D. Schmidt, W.-C. Chen, M. A. Matheson, and G. Ostrouchov, “Programming with BIG data in r: Scaling analytics from one to thousands of nodes,” Big Data Research, vol. 8, pp. 1–11, 2017.
[4]
W.-C. Chen, D. Schmidt, G. Ostrouchov, and P. Patel, pbdSLAP: Programming with big data – scalable linear algebra packages.” 2012.
[5]
W.-C. Chen, G. Ostrouchov, D. Schmidt, P. Patel, and H. Yu, pbdMPI: Programming with big data – interface to MPI.” 2012.
[6]
H. Yu, “Rmpi: Parallel statistical computing in r,” R News, vol. 2, no. 2, pp. 10–14, 2002.
[7]
D. Schmidt, W.-C. Chen, S. L. de la Chapelle, G. Ostrouchov, and P. Patel, pbdDMAT: pbdR distributed matrix methods.” 2020.