Home

Literature Review

Jason Cairns

2021-02-17

1 Prose Form

1.1 Introduction

Statistics is concerned with the analysis of datasets, which are continually growing bigger, and at a faster rate; the global datasphere is expected to grow from 33 zettabytes in 2018 to 175 zettabytes by 2025[1].

The scale of this growth is staggering, and continues to outpace attempts to engage meaningfully with such large datasets. By one measure, information storage capacity has grown at a compound annual rate of 23% per capita over recent decades[2]. In spite of this massive growth in storage capacity, it has been far outstripped by growth in computational capacity over time [3]. Specifically, the number of components comprising an integrated circuit for computer processing has been increasing exponentially, with a corresponding exponential decrease in their cost[4]. This observation, known as Moore's Law, has been the root cause of much of the computational advancement of the past half-century. The corresponding law for computer storage posits an increase in the bit density of storage media along with a corresponding decrease in price, and this has been found to track lower than Moore's-law-like expectations. Such differentials between the generation of data, the computational capacity for processing it, and the constraints on data storage have forced the development of new computing techniques for the analysis of large-scale data.

The architecture of a computer further constrains the required approach for analysis of big data. Most general-purpose PCs are modelled by a random-access stored-program machine, wherein a program and data are stored in registers, and data must move in and out of registers to a processing element, most commonly a Central Processing Unit (CPU). Each movement takes at least one cycle of the computer's clock, leading to longer processing times for larger data.

Reality dictates many different forms of data storage, with a Memory Hierarchy ranking different forms of computer storage based on their response times[5]. The volatility of memory (whether or not it persists without power) and the expense of faster storage forms dictate the design of commodity computers. An example of a standard build is given by the Dell Optiplex 5080, with 16 GB of Random Access Memory (RAM) as fast main memory, used as a program data store, and a 256 GB Solid State Drive (SSD) for slower long-term disk storage[6]. For reasonable speed when accessing data, a program would prioritise main memory over disk storage - something not always possible when dataset size exceeds memory capacity, larger-than-memory datasets being a central issue in big data. A program that is primarily slowed by data movement is described as I/O-bound, or memory-bound. Much of the difficulty in modelling large data stems from the I/O-bound nature of much statistical computation.

The complement to I/O-bound computation is computation-bound, wherein the speed (or lack thereof) is determined primarily by the performance of the processing unit. This is less often the limiting factor in large-scale applications than being memory-bound, but remains an important design consideration when the number of computations scales with the dataset size, as in any nontrivial algorithm with greater than 𝒪(1) complexity.

The solution to both memory- and computation-bound problems has largely been that of using more hardware; more memory, and more CPU cores. Even with this in place, more complex software is required to manage the more complex systems. As an example, with additional CPU cores, constructs such as multithreading are used to perform processing across multiple CPU cores simultaneously (in parallel).

The means for writing software for large-scale data is typically through the use of a structured, high-level programming language. Of the myriad programming languages, the most widespread language used for statistics is R. As of 2021, R increased in popularity to rank 9th in the TIOBE index. R also has a special relevance for this proposal, having been initially developed at the University of Auckland by Ross Ihaka and Robert Gentleman in 1991[7].

Major developments in contemporary statistical computing are typically published alongside R code implementation, usually in the form of an R package, which is a mechanism for extending R and sharing functions. As of March 2021, the Comprehensive R Archive Network (CRAN) hosts over 17000 available packages[8]. Several of these packages are oriented towards managing large datasets, and will be assessed in secs. 1.3, 2 below. This project aims to develop an R package that provides a means for writing software to analyse very large data on clusters consisting of multiple general-purpose computers.

1.2 Parallelism as a Strategy

The central strategy for manipulating large datasets, from which most other patterns derive, is parallelisation. To parallelise is to engage in many computations simultaneously - this typically takes the form of either task parallelism, wherein tasks are distributed across different processors; data parallelism, where the same task operates on different pieces of data across different processors; or some mix of the two.

Parallelisation of a computational process can potentially offer speedups proportional to the number of processors available to take on work, and with recent improvements in multiprocessor hardware, the number of processors available is increasing over time. Most general-purpose personal computers produced in the past 5 years have multiple processor cores to enable parallel computation.

Parallelism can afford major speedups, albeit with certain limitations. Amdahl's law, formulated in 1967, aims to capture these speedup limitations, with a model of speedup given in eq. 1 [9], [10]:

$$ \textrm{Speedup} = \frac{1}{s+\frac{p}{N}} \qquad(1)$$

where s is the proportion of execution time spent on the serial (non-parallelisable) portion of the task, p the proportion spent on the parallelisable portion (with s + p = 1), and N the number of processors.

The implication is that the overall speedup available from parallelising a task is limited by the serial portion that remains constrained to a single system resource, in proportion to the execution time spent there. Thus a measure of skepticism is contained in Amdahl's argument, with many tasks predicted to gain little from parallelisation - and in reality, some are likely to slow down, given the additional overhead introduced by parallelisation.

The major response to the skepticism of Amdahl’s law is given by Gustafson’s law, generated from timing results in a highly parallelised system. Gustafson’s law presents a scaled speedup as per eq. 2

$$ \textrm{Scaled speedup} = s' + p'N = N + (1-N)s' \qquad(2)$$

where s′ is the proportion of execution time spent on the serial portion and p′ the proportion spent on the parallelisable portion, both measured on the parallel system, and N is again the number of processors.

This law implies far higher potential parallel speedup, varying linearly with the number of processors.
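
The practical difference between the two formulas can be made concrete with a few lines of R; the following sketch assumes a purely hypothetical task that is 90% parallelisable and simply evaluates both expressions over a range of processor counts.

# Illustrative comparison of Amdahl's and Gustafson's speedups for a
# hypothetical task with a serial proportion of 0.1 (i.e. 90% parallelisable)
amdahl    <- function(s, N) 1 / (s + (1 - s) / N)
gustafson <- function(s, N) N + (1 - N) * s

N <- c(2, 4, 8, 16, 32)
data.frame(processors = N,
           amdahl     = amdahl(0.1, N),
           gustafson  = gustafson(0.1, N))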

An example of an ideal task for parallelisation is the category of embarrassingly parallel workload. Such a problem is one where the separation into parallel tasks is trivial, such as performing the same operation over a dataset independently[11]. Many problems in statistics fall into this category, such as tabulation, Monte Carlo simulation and many matrix manipulation tasks.

1.3 Local Solutions

While not specifically engaging with larger-than-memory data, a number of packages take advantage of various parallel strategies in order to process large datasets efficiently. multicore is one such package, now subsumed into the parallel package, which provides functions that make direct use of multiprocessor systems, thereby reducing processing time in proportion to the number of processors available on the system.
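
As a brief sketch of this multicore-style parallelism (assuming a Unix-alike system, since the underlying forking is unavailable on Windows), an embarrassingly parallel resampling task can be spread over the local cores:

library(parallel)

# An arbitrary, embarrassingly parallel task: the mean of a bootstrap resample
boot_mean <- function(i, x) mean(sample(x, replace = TRUE))

x <- rnorm(1e6)

# Forked worker processes, one per available core (Unix-alikes only)
res <- mclapply(1:100, boot_mean, x = x, mc.cores = detectCores())
mean(unlist(res))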

data.table also makes use of multi-processor systems, with many operations involving threading in order to rapidly perform operations on its dataframe equivalent, the data.table.

In spite of all of these potential solutions, a major constraint remains in that only a single machine is used. As long as there is only one machine available, bottlenecks form and no redundancy protection is offered in real-time in the event of a crash or power outage.

The first step typically taken to manage larger-than-memory data is to shift part of the data into secondary storage, which generally possesses significantly more space than main memory.

This is the approach taken by the disk.frame package, developed by Dai ZJ. disk.frame provides an eponymously named dataframe replacement class, which is able to represent a dataset far larger than RAM, constrained now only by disk size[12].

The mechanism of disk.frame is introduced on its homepage with the following explanation:

{disk.frame} works by breaking large datasets into smaller individual chunks and storing the chunks in fst files inside a folder. Each chunk is a fst file containing a data.frame/data.table. One can construct the original large dataset by loading all the chunks into RAM and row-bind all the chunks into one large data.frame. Of course, in practice this isn’t always possible; hence why we store them as smaller individual chunks. {disk.frame} makes it easy to manipulate the underlying chunks by implementing dplyr functions/verbs and other convenient functions (e.g. the cmap(a.disk.frame, fn, lazy = F) function which applies the function fn to each chunk of a.disk.frame in parallel). So that {disk.frame} can be manipulated in a similar fashion to in-memory data.frames.

It works through two main principles: chunking, and an array of methods taking advantage of data.frame generics, including dplyr and data.table functions.

Another component that isn’t mentioned in the explanation, but is crucial to performance, is the parallelisation offered transparently by the package.

disk.frames are actually references to numbered fst files in a folder, with each file serving as a chunk.

These chunks are manipulated separately, sparing RAM from having to hold a single monolithic file[13].

Fst is a means of serialising dataframes, as an alternative to RDS files[14].

It makes use of an extremely fast compression algorithm developed at Facebook.

Functions are usually mapped over chunks using some functional, but more complex operations such as fitting a GLM require custom solutions; as an example, the direct modelling function dfglm() is implemented to allow GLMs to be fitted to the data.

From inspection of the source code, the function is a utility wrapper for streaming disk.frame data by default into bigglm, a biglm derivative.

For grouped or aggregated functions, there is more complexity involved, due to the chunked nature of disk.frame.

When functions are applied, they are by default applied to each chunk.

If groups don’t correspond injectively to chunks, then the syntactic chunk-wise summaries and their derivatives may not correspond to the semantic group-wise summaries expected.

For example, summarising the median is performed by using a median-of-medians method; finding the overall median of all chunks’ respective medians.

Therefore, computing grouped medians in disk.frame results in estimates only — this is also true of other software, such as Spark, as noted in [15].

For parallelisation, future is used as the backend package, with most function mappings on chunks making use of future::future_lapply() to have each chunk mapped with the intended function in parallel.

future is initialised with access to cores through the wrapper function, setup_disk.frame() [16].

This sets up the correct number of workers, with the minimum of workers and chunks being processed in parallel.
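
A minimal sketch of this workflow, assuming the disk.frame package is installed, using setup_disk.frame() and the cmap() function quoted above; the data and chunk count are purely illustrative.

library(disk.frame)
setup_disk.frame()   # initialise the future-backed workers described above

# Write an in-memory data.frame out as a chunked disk.frame (fst files on disk)
df  <- data.frame(x = rnorm(1e6), g = sample(letters, 1e6, replace = TRUE))
dfd <- as.disk.frame(df, nchunks = 8, overwrite = TRUE)

# Apply a function to each chunk; chunks are processed in parallel by the workers
cmap(dfd, function(chunk) data.frame(chunk_mean = mean(chunk$x)), lazy = FALSE)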

An important aspect to parallelisation through future is that, for purposes of cross-platform compatibility, new R processes are started for each worker[17].

Each process will possess its own environment, and disk.frame makes use of future's detection capabilities to capture external variables referred to in calls, and send them to each worker.

The strategy taken by disk.frame has several inherent limitations, however. disk.frame allows custom operations only in embarrassingly parallel form, as part of a split-apply-combine (MapReduce) pattern.

While there may theoretically be future provision for non-embarrassingly parallel operations, a significant limitation to real-time operation is the massive slowdown brought by the data movement from disk to RAM and back.

2 Distributed Computing as a Strategy

The specs of a single contemporary commodity computer are higher than those that were used in the Apollo lunar landing, yet the management of large datasets still creates major issues, driven by a simple lack of capacity to hold them in memory. Supercomputers can surmount this by holding orders of magnitude higher memory, though only a few organisations or individuals can bear the financial costs of purchasing and maintaining a supercomputer.

In a similar form, cloud computing is not a universal solution, owing to expense, security issues, and data transportation problems. Despite this, systems rivalling supercomputers can be formed by combining many commodity computers. An amusing illustration of this was given in 2004, when a flash mob connected hundreds of laptops to attempt running the LINPACK benchmark, achieving 180 gigaflops of processing output[18].

The combination of multiple independent computers to form one cohesive computing system forms part of what is known as distributed computing. More serious efforts to connect multiple commodity computers into a larger computational system are now standard, with software such as Hadoop and Spark being commonplace in large companies for the purpose of creating distributed systems.

Distributed systems make possible the real-time manipulation of datasets larger than a single computer's RAM, by splitting up data and holding it in the RAM of multiple computers. A factor strongly serving in favour of distributed computing is that commodity hardware exists in large quantities in most offices, oftentimes completely unused. This means that many organisations already have the necessary base infrastructure to create a distributed system, likely only requiring some software and configuration to set it all up. Beyond the benefit of pre-existing infrastructure, a major feature commonly offered by distributed systems, and lacking in high-powered single computer systems, is that of fault tolerance - when one computer goes down, as does happen, another computer in the system holds redundant copies of much of the crashed computer's information, and computation can resume with very little inconvenience. A single computer, even a very high-powered one, doesn't usually offer fault tolerance to this degree.

All of the packages examined in sec. 1.3 above have no immediate capability to create a distributed system, with all of the ease-of-use benefits and drawbacks discussed.

3 Distributed Large-Scale Computing

R does have some well-established packages used for distributed large-scale computing. Of these, the parallel package is contained in the standard R image, and encapsulates SNOW (Simple Network Of Workstations), which provides support for distributed computing over a simple network of computers. The general architecture of SNOW makes use of a master process that holds the data and launches the cluster, pushing the data to worker processes that operate upon it and return the results to the master. SNOW makes use of several different communications mechanisms, including sockets and the more general MPI distributed computing library. One shortcoming of the described architecture is the difficulty of persisting data, meaning data must be transported every time operations are requested by the master process. In addition, as the data must originate from the master (barring generated data etc.), the master's memory size serves as a bottleneck for the whole system.
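
A minimal sketch of this master-worker pattern, using the socket cluster interface from the parallel package ("localhost" stands in here for the hostnames of other machines on a network):

library(parallel)

# The master launches the worker processes
cl <- makeCluster(c("localhost", "localhost"), type = "PSOCK")

# Data originates at the master and is shipped to the workers with each call,
# illustrating the transport cost and master-memory bottleneck noted above
chunks <- split(rnorm(1e6), rep(1:2, each = 5e5))
partial_sums <- parLapply(cl, chunks, sum)
Reduce(`+`, partial_sums)

stopCluster(cl)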

The pbdR (programming with big data in R) project provides persistent data, with the pbdDMAT (programming with big data Distributed MATrices) package offering a user-friendly distributed matrix class to program with over a distributed system. It is introduced on its main page with the following description:

The “Programming with Big Data in R” project (pbdR) is a set of highly scalable R packages for distributed computing and profiling in data science. Our packages include high performance, high-level interfaces to MPI, ZeroMQ, ScaLAPACK, NetCDF4, PAPI, and more. While these libraries shine brightest on large distributed systems, they also work rather well on small clusters and usually, surprisingly, even on a laptop with only two cores. Winner of the Oak Ridge National Laboratory 2016 Significant Event Award for “Harnessing HPC Capability at OLCF with the R Language for Deep Data Science.” OLCF is the Oak Ridge Leadership Computing Facility, which currently includes Summit, the most powerful computer system in the world.[19]

The project seeks especially to provide minimal wrappers around the BLAS and LAPACK libraries along with their distributed derivatives, with the intention of introducing as little overhead as possible. Standard R also uses routines from these libraries for most matrix operations, but suffers from numerous inefficiencies relating to the structure of the language; for example, copies of all objects being manipulated will typically be created, often with devastating performance effects unless specific functions are used for linear algebra operations, as discussed in [20] (e.g., crossprod(X) instead of t(X) %*% X).

Distributed linear algebra operations in pbdR depend further on the ScaLAPACK library, which can be provided through the pbdSLAP package [21]. The principal interface for direct distributed computations is the pbdMPI package, which presents a simplified API to MPI through R [22]. All major MPI libraries are supported, but the project tends to make use of Open MPI in explanatory documentation. A very important consideration that isn't immediately clear is that pbdMPI can only be used in batch mode through MPI, rather than interactively as in Rmpi [23].
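
As an illustration of this batch-oriented SPMD style, a hedged sketch of a pbdMPI script follows; it would be launched non-interactively with something like mpirun -np 2 Rscript hello.R, where the filename is hypothetical.

# A sketch of an SPMD pbdMPI script: every rank runs the same code in batch mode
library(pbdMPI)
init()

# Each process computes on its own portion of the work ...
local_sum <- sum(comm.rank() + 1:10)

# ... and the results are combined across ranks with a collective operation
total <- allreduce(local_sum, op = "sum")
comm.print(total)

finalize()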

The actual manipulation of distributed matrices is enabled through the pbdDMAT package, which offers S4 classes encapsulating distributed matrices [24]. These are specialised for dense matrices through the ddmatrix class, though the project offers some support for other matrices. The ddmatrix class has nearly all of the standard matrix generics implemented for it, with nearly identical syntax for all.

The package is geared heavily towards matrix operations in a statistical programming language, so a test of its capabilities would quite reasonably involve statistical linear algebra. An example non-trivial routine is that of generating data, to test randomisation capability, then fitting a generalised linear model to the data through iteratively reweighted least squares. In this way, not only are the basic algebraic qualities considered, but communication over iteration on distributed objects is tested.

For comparison, a simple working local-only version of the algorithm is produced in lst. 1.

Listing 1: Local GLM with RWLS

set.seed(1234)
# Generate the data

n <- 1000
B <- matrix(c(1,3))
x0 <- rep(1, n)
x1 <- rnorm(n, 0, 1)
X <- cbind(x0, x1)
p <- 1 / (1 + exp(- X %*% B))
y <- rbinom(n, 1, p)

# Base comparison
#glm(y ~ x1, family = "binomial")

# RWLS as Newton-Raphson for GLM (logistic regression here)

logReg <- function(X, y, maxIter=80, tolerance=0.01){
    pr <- function(X, B){
        1 / (1 + exp(-X  %*% B))
    }
    ##
    weights <- function(X, B, y){
        diag(as.vector(pr(X, B)))
    }
    ##
    oldB <- matrix(c(Inf,Inf))
    newB <- matrix(c(0, 0))
    nIter <- 0
    while (colSums((newB - oldB)^2) > tolerance &&
           nIter < maxIter) {
        oldB <- newB
    ## N-R as RWLS
        W <- weights(X, oldB, y)
        hessian <- - t(X) %*% W %*% X
        z <- X %*% oldB + solve(W) %*% (y - pr(X, oldB))
        newB <- solve(-hessian) %*% crossprod(X, W %*% z)
    ##
        nIter <- nIter + 1
    }
    newB
}

print(logReg(X, y, tolerance=1E-6, maxIter=100))

It outputs a β̂ matrix after several seconds of computation.

Were pbdDMAT matrices to function perfectly transparently as regular matrices, then all that would be required to convert a local algorithm to distributed would be to prefix a dd to every matrix call, and bracket the program with a template as per lst. 2.

Listing 2: Idealised Common Wrap for Local to Distributed Matrices

suppressMessages(library(pbdDMAT))
init.grid()

# program code with `dd` prefixed to every `matrix` call

finalize()

The program halts, however, as forms of matrix creation other than explicit matrix() calls are not necessarily picked up by that process; the cbind() call requires a separate construction of a ddmatrix.

The first issue comes when performing conditional evaluation; predicates involving distributed matrices are themselves distributed matrices, and can’t be mixed in logical evaluation with local predicates.

Turning local predicates into distributed matrices, then converting them all back to a local matrix for the loop condition, finally results in a program that runs; however, the results are still not accurate.

This is due to diag()<- assignment not having been implemented, so several further changes are necessary, including specifying the return type of diag() as a ddmatrix instead.

This serves to outline the difficulty of complete distributed transparency.

The final working code of pbdDMAT GLM through RWLS is given in lst. 3.

Listing 3: pbdDMAT GLM with RWLS

suppressMessages(library(pbdDMAT))
init.grid()

set.seed(1234)
# Generate the data

n <- 1000
B <- ddmatrix(c(1,3))
x0 <- rep(1, n)
x1 <- rnorm(n, 0, 1)
X <- as.ddmatrix(cbind(x0, x1))
p <- 1 / (1 + exp(- X %*% B))
y <- ddmatrix(rbinom(n, 1, as.vector(p)))

# Base comparison
#glm(y ~ x1, family = "binomial")

# RWLS as Newton-Raphson for GLM (logistic regression here)

logReg <- function(X, y, maxIter=80, tolerance=0.01){
    pr <- function(X, B){
        1 / (1 + exp(-X  %*% B))
    }
    ##
    weights <- function(X, B, y){
        diag(as.vector(pr(X, B)), type="ddmatrix")
    }
    ##
    oldB <- ddmatrix(c(Inf,Inf))
    newB <- ddmatrix(c(0, 0))
    nIter <- ddmatrix(0)
    maxIter <- as.ddmatrix(maxIter)
    while (as.matrix(colSums((newB - oldB)^2) > tolerance &
           nIter < maxIter)) {
        oldB <- newB
    ## N-R as RWLS
        W <- weights(X, oldB, y)
        hessian <- - t(X) %*% W %*% X
        z <- X %*% oldB + solve(W) %*% (y - pr(X, oldB))
        newB <- solve(-hessian) %*% crossprod(X, W %*% z)
    ##
        nIter <- nIter + 1
    }
    newB
}

print(logReg(X, y, tolerance=1E-6, maxIter=100))

finalize()

Decidedly more user-friendly is the sparklyr package, which meshes dplyr syntax with a Spark backend. Simple analyses are made very simple (assuming a well-configured and already running Spark instance), but custom iterative models are extremely difficult to create through the package, in spite of Spark's support for iteration.

Given that iteration is cited by a principal author of Spark as a motivating factor in its development when compared to Hadoop, it is reasonable to consider whether the most popular R interface to Spark, sparklyr, has support for iteration [25], [26]. One immediate hesitation regarding the suitability of sparklyr for iteration is its syntactic rooting in dplyr; dplyr is a "Grammar of Data Manipulation" and part of the tidyverse, which in turn is an ecosystem of packages with a shared philosophy [27], [28]. The promoted paradigm is functional in nature, with iteration using for loops in R being described as "not as important" as in other languages; map functions from the tidyverse purrr package are instead promoted as providing greater abstraction and taking much less time to solve iteration problems. Maps do provide a simple abstraction for function application over elements in a collection, similar to internal iterators, however they offer no control over the form of traversal, and most importantly, lack the mutable state between iterations that standard loops or generators allow[29].

A common functional strategy for handling a changing state is to make use of recursion, with tail-recursive functions specifically referred to as a form of iteration in [30]. Reliance on recursion for iteration is non-optimal in R, however, as the language lacks tail-call elimination and call-stack optimisations[31]; at present the elements for efficient, idiomatic functional iteration are not available in R, given that it is not as functional a language as the tidyverse philosophy considers it to be, and sparklyr's attachment to the ecosystem prevents a cohesive model of iteration until said elements are in place.

Iteration takes place in Spark through caching results in memory, allowing faster access and less data movement than MapReduce[25]. sparklyr can use this functionality through the tbl_cache() function to cache Spark dataframes in memory, as well as caching upon import with memory=TRUE as a formal parameter to sdf_copy_to(). Iteration can also make use of persisting Spark DataFrames to memory, forcing evaluation then caching; this is performed in sparklyr through sdf_persist().
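
A sketch of these caching mechanisms, assuming a local Spark installation is available to sparklyr; the table name and data are purely illustrative.

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

# memory = TRUE caches the Spark DataFrame upon import
mtcars_tbl <- sdf_copy_to(sc, mtcars, "mtcars_spark", memory = TRUE)

# Alternatively, cache an existing table by name ...
tbl_cache(sc, "mtcars_spark")

# ... or force evaluation of a dplyr pipeline and persist the result
fast_cars <- mtcars_tbl %>%
  filter(hp > 150) %>%
  sdf_persist()

spark_disconnect(sc)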

An important consideration is that sparklyr methods for dplyr generics execute through a translation of the formal parameters to Spark SQL. This is particularly relevant in that separate Spark DataFrames can't be accessed together as arguments to a multivariable function. In addition, very R-specific functions, such as those from the stats and matrix core libraries, cannot be evaluated, as they have no Spark SQL cognate.

Canned models are the only option for most users, due to sparklyr’s reliance on Spark SQL rather than the Spark core API made available through the official SparkR interface.

sparklyr is excellent when used for what it is designed for. Iteration, in the form of an iterated function, does not appear to be part of this design.

Furthermore, all references to "iteration" in the primary sparklyr literature refer either to the iteration inherent in the inbuilt Spark ML functions, or to the "wrangle-visualise-model" process popularised by Hadley Wickham[28], [32]. None of these references concern iterated functions.

3.1 Other Systems

In the search for a distributed system for statistics, the world outside of R is not entirely barren. The central issue with non-R distributed systems is that their focus is very obviously not statistics, and this shows in the level of support the platforms provide for statistical purposes.

The classical distributed system for high-performance computing is MPI. R actually has a high-level interface to MPI through the Rmpi package. This package is excellent, but extremely low-level, offering little more than wrappers around MPI functions. For the statistician who just wants to implement a model for a large dataset, such concern with minutiae is prohibitive.

Hadoop and Spark are two closely related systems which were mentioned earlier.

Apache Hadoop is a collection of utilities that facilitates cluster computing.

Jobs can be sent for parallel processing on the cluster directly to the utilities using .jar files, “streamed” using any executable file, or accessed through language-specific APIs.

The project began in 2006, by Doug Cutting, a Yahoo employee, and Mike Cafarella.

The inspiration for the project was a paper from Google describing the Google File System (described in [33]), which was followed by another Google paper detailing the MapReduce programming model, [34].

Hadoop consists of a file-store component, known as Hadoop Distributed File System (HDFS), and a processing component, known as MapReduce.

In operation, Hadoop splits files into blocks, then distributes them across nodes in a cluster (HDFS), where they are then processed in parallel by those nodes (MapReduce). This creates the advantage of data locality, wherein data are processed by the nodes on which they reside.

Hadoop has seen extensive industrial use as the premier big data platform since its release.

In recent years it has been overshadowed by Spark, due to the greater speed gains offered by Spark for many problem sets.

Spark was developed with the shortcomings of Hadoop in mind; much of its definition is in relation to Hadoop, which it intended to improve upon in terms of speed and usability for certain tasks[25].

Its fundamental operating concept is the Resilient Distributed Dataset (RDD), which is immutable, and generated from external data, or through transformations on prior RDDs.

The RDD interface is exposed through an API in various languages, including R; however, the R interface appears to be somewhat abandoned, having been removed from the CRAN repository on 2020-07-10 due to failing checks.

Spark requires a distributed storage system, as well as a cluster manager; both can be provided by Hadoop, among others.

Spark is known for possessing a fairly user-friendly API, intended to improve upon the MapReduce interface.

Another major selling point for Spark is the libraries available that have pre-made functions for RDDs, including many iterative algorithms.

The availability of broadcast variables and accumulators allows for custom iterative programming.

Spark has seen major use since its introduction, with effectively all major big data companies making some use of Spark.

In the python world, the closest match to a high-level distributed system that could have statistical application is given by the python library dask[35]. dask offers dynamic task scheduling through a central task graph, as well as a set of classes that encapsulate standard data manipulation structures such as NumPy arrays and Pandas dataframes.

The main difference is that the dask classes take advantage of the task scheduling, including online persistence across multiple nodes. dask is a large and mature library, catering to many use-cases, and exists largely in the Pythonic “Machine Learning” culture in comparison to the R “Statistics” culture. Accordingly, the focus is more tuned to the Python software developer putting existing ML models into a large-scale capacity. Of all the distributed systems assessed so far, dask comes the closest to what an ideal platform would look like for a statistician, but it misses out on the statistical ecosystem of R, provides only a few select classes, and is tied entirely to the structure of the task graph.

4 A Survey of Large-Scale Platform Features

4.1 Introduction

To guide the development of the platform, desirable features are drawn from existing platforms, inferred as logical extensions, and arrived at through identification of needs. Some features are mutually exclusive, while others suggest each other; all are worth considering and contrasting on their merits.

4.2 Feature List

A list of features and their descriptions follows:

Distributed Computation
The ability to spread computation and data over separate computers. The value of distributed computing is well recognised for large-scale computing, in the increased capacity for processing, memory, and storage. Distributed computing typically gains latency speedup through parallel processing; both Amdahl's law and Gustafson's law give theoretical speedups for parallel jobs [9], [10]. In addition, each node typically adds more working memory to the distributed system, allowing for larger datasets to be manipulated in-memory. For exceedingly large datasets, the benefits of distributed file systems commonly allow for resilient storage, with well-regarded examples including HDFS and the Google File System it is based upon [33], [36].
Evaluation of User-Specified Code
The ability to make use of user-specified code in processing. Most R packages for large-scale computing interact well with arbitrary code, however they typically have some limitations, such as an inability to recognise global variables, as is the case with sparklyr and to a lesser extent future [37], [38].
Native Support for Iteration
The ability to process user-specified code involving iteration over the whole dataset natively, keeping results in memory between iterations. This reflects the inherently iterative nature of many statistical algorithms. Furthermore, this shouldn’t initiate a new job or process for every new iteration. This is seen as important enough that it serves as a major motivating factor behind Spark’s development, overcoming a perceived major deficiency of Hadoop by Spark’s developers [25].
Object Persistence at Nodes
The ability to retain objects in-memory at their point of processing. The standard motivation for such a feature revolves around a reduction in data movement, which serves to slow down processing enormously through forcing programs to be I/O bound. In-memory persistence is closely related to the capacity for iterative code evaluation in a distributed system, and was similarly referenced by the Spark developers as an apparent benefit of Spark[25].
Support for Distributed File Systems
Capacity to work with data and computation on distributed file systems, with a particular target of the Hadoop Distributed File System (HDFS). As a well-established distributed file system, HDFS is targeted by a number of R packages, as well as serving as a file system base for other platforms such as Spark [39]–[42]. HDFS offers several features that make it particularly attractive as a filesystem for a large-scale statistical analysis; being distributed and capable of running on commodity hardware allows for truly big data analysis. In addition, the system is built to be resilient to hardware failure, so long-running analyses aren't cut short or forced to revert to a checkpoint because of singular component failure [36].
Ease of Setup
Is setup suitable for a computationally-focussed statistician, or does it require a system administrator? At its base, R is a statistical programming language [43]. The particular skills of statisticians seldom correspond to those required for system administration, and such a focus is unlikely to compete successfully with their main research. Ease of deployment can determine a platform's success, with such a feature being one of the many motivations for the use and development of tools such as Docker in recent years. The easiest possible setup would be a regular install.packages(), with no more than several lines specifying the platform configuration.
Inter-Node Communication
Can any pair of nodes communicate with each other, or do they only report to a master node? While many tasks process efficiently within a standard master-slave architecture, and inter-node communication is inherently expensive, there is still a large class of tasks that benefit from inter-node communication[44]; particularly graph-based statistical methods.
Interactive Usage
The ability to make use of the package in an interactive R session, without mandatory batch execution. A major benefit of R being an interpreted language is the availability of a REPL. The benefits of interactivity stemming from a REPL are well-documented, most notably aiding debugging [45]. For statistical analyses in particular, interactivity plays a major role in exploratory data analysis, wherein insights can be tested and arrived at rapidly with an interactive session.
Backend Decoupling
The implementation is maintained entirely separately from the interface. This is standard in most of the performant parallel R systems as described by [46], including foreach as a key example[38]. As a software pattern, this is a case of separation of concerns, described in detail by [47]. Such a pattern fosters modularity and allows for a broader range of backends to be made use of, maximising the uptake of the platform. The ability for a system to adhere to a similar interface despite changes in internal behaviour is additionally useful for the sake of referential transparency, which prevents the need to rewrite programs upon making changes, as well as for human-computer interaction considerations [48], [49]. For example, the foreach package can change parallel adaptors in a single line of setup, without needing any changes made in the code referencing foreach, despite making use of a different internal interface [50].
Evaluation of Arbitrary Classes
Any class, including user-defined classes, can be used in evaluation. There is proven value in rich user-defined objects, with the weight of much of the object-oriented programming paradigm serving to further that point [51]. Conversely, many major packages limit themselves through provisioning only a few classes, such as pbdDMAT with distributed matrices, or the tidyverse and its derivatives, including sparklyr, with "tibbles" [24], [27].
Package-specific API
The platform is primarily explicitly programmed against at a package-specific interface. This is in contrast to packages mostly providing methods which overload standard generics or language structure; at a loss of general transparency, direct APIs can ensure greater encapsulation and a closer mapping of code with the underlying implementation, thus potentially resulting in performance gains [52]. An example in R is the interface to the foreach package not overloading the existing for-loop syntax in R, but defining its own specific interface [38].
Methods for Standard Generics
The platform is primarily programmed against using a polymorphic interface, with the package methods taking advantage of common generics. pbdDMAT takes this approach, as well as bigmemory, in providing matrix-like classes which are operated upon using standard matrix generics [24], [53].
Methods for dplyr Generics
The platform makes use of dplyr functions as the primary set of generics to program over. Using a dplyr interface is a common trend in several R packages including sparklyr, disk.frame, and many database interfaces [12], [26]. Such an interface is claimed by the dplyr creators to aid beginners through being simple to remember [27]. In this way, it may serve to ease the learning curve for the platform.

4.3 Comparison Table

| Feature | RHadoop | sparklyr | pbdR | disk.frame | foreach |
|---|---|---|---|---|---|
| Distributed Computation | yes1 | yes2 | yes3 | no | yes4 |
| Evaluation of User-Specified Code | yes5 | mostly6 | mostly7 | some8 | mostly9 |
| Native Support for Iteration | no | no10 | yes | no | no11 |
| Object Persistence at Nodes | no | yes12 | yes | NA | no |
| Support for Distributed File Systems | yes13 | yes14 | no | no | yes15 |
| Ease of Setup | mediocre16 | acceptable17 | acceptable18 | simple | simple |
| Inter-Node Communication | no | no | yes19 | NA | no |
| Interactive Usage | yes | yes | no | yes | yes |
| Backend Decoupling | no20 | no21 | no22 | no | yes |
| Evaluation of Arbitrary Classes | yes23 | some24 | yes25 | no | yes26 |
| Package-specific API | yes27 | yes | yes28 | no | yes |
| Methods for Standard Generics | no | no | some29 | no | no |
| Methods for dplyr Generics | no30 | yes31 | no | yes | no |

5 A Survey of Distributed Computing Systems

5.1 Hadoop

Apache Hadoop is a collection of utilities that facilitates cluster computing. Jobs can be sent for parallel processing on the cluster directly to the utilities using .jar files, “streamed” using any executable file, or accessed through language-specific APIs.

The project began in 2006, by Doug Cutting, a Yahoo employee, and Mike Cafarella. The inspiration for the project was a paper from Google describing the Google File System (described in [33]), which was followed by another Google paper detailing the MapReduce programming model, [34].

Hadoop consists of a storage component, known as the Hadoop Distributed File System (HDFS), described in sec. 5.1.1, and a processing component, known as MapReduce, described in sec. 5.1.2.

In operation, Hadoop splits files into blocks, then distributes them across nodes in a cluster, where they are then processed by those nodes. This creates the advantage of data locality, wherein data are processed by the nodes on which they reside.

Hadoop has seen extensive industrial use as the premier big data platform since its release. In recent years it has been overshadowed by Spark, due to the greater speed gains offered by Spark. The key reason Spark is so much faster than Hadoop comes down to their different processing approaches: Hadoop MapReduce requires reading from and writing to disk for the purposes of fault tolerance, while Spark can run processing entirely in-memory. However, in-memory MapReduce is provided by another Apache project, Ignite[59].

5.1.1 Hadoop Distributed File System

The file system has 5 primary services.

Name Node
Holds the file system metadata and manages the system; the data itself resides on the data nodes. The master node.
Secondary Name Node
Creates checkpoints of the metadata from the main name node, to potentially restart the single point of failure that is the name node. Not the same as a backup, as it only stores metadata.
Data Node
Contains the blocks of data. Sends a "heartbeat" message to the name node every 3 seconds. If two minutes pass with no heartbeat from a particular data node, the name node marks it as dead and replicates its blocks to another data node.
Job Tracker
Receives requests for MapReduce from the client, then queries the name node for locations of the data.
Task Tracker
Takes tasks, code, and locations from the job tracker, then applies such code at the location. The slave node for the job tracker.

HDFS is written in Java and C. It is described in more detail in [36].

5.1.2 MapReduce

MapReduce is a programming model consisting of map and reduce steps, alongside making use of keys.

Map
applies a “map” function to a dataset, in the mathematical sense of the word. The output data is temporarily stored before being shuffled based on output key, and sent to the reduce step.
Reduce
produces a summary of the dataset yielded by the map operation
Keys
are associated with the data at both steps. Prior to the application of mapping, the data is sorted and distributed among data nodes by the data’s associated keys, with each key being mapped as a logical unit. Likewise, mapping produces output keys for the mapped data, and the data is shuffled based upon these keys, before being reduced.

After sorting, mapping, shuffling, and reducing, the output is collected, sorted by the output keys, and given as the final result.
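
The flow of keys and values can be mimicked locally in R; the following conceptual sketch of a word count makes no use of Hadoop itself and is purely illustrative.

# A purely local sketch of the MapReduce flow: map, shuffle on key, reduce
docs <- c("the cat sat", "the dog sat", "the cat ran")

# Map: emit a (key = word, value = 1) pair for every word
mapped <- lapply(unlist(strsplit(docs, " ")),
                 function(word) list(key = word, value = 1))

# Shuffle: group the emitted values by their output key
shuffled <- split(sapply(mapped, `[[`, "value"),
                  sapply(mapped, `[[`, "key"))

# Reduce: summarise the values for each key; here a sum gives word counts
lapply(shuffled, sum)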

The implementation of MapReduce is provided by the HDFS services of job tracker and task tracker. The actual processing is performed by the task trackers, with scheduling using the job tracker, but other scheduling systems are available to be made use of.

Development at Google no longer makes as much use of MapReduce as they originally did, using stream processing technologies such as MillWheel, rather than the standard batch processing enabled by MapReduce[60].

5.2 Spark

Spark is a framework for cluster computing[25]. Much of its definition is in relation to Hadoop, which it intended to improve upon in terms of speed and usability for certain tasks.

Its fundamental operating concept is the Resilient Distributed Dataset (RDD), which is immutable, and generated from external data, or through transformations on prior RDDs. The RDD interface is exposed through an API in various languages, including R.

Spark requires a distributed storage system, as well as a cluster manager; both can be provided by Hadoop, among others.

Spark is known for possessing a fairly user-friendly API, intended to improve upon the MapReduce interface. Another major selling point for Spark is the libraries available that have pre-made functions for RDDs, including many iterative algorithms. The availability of broadcast variables and accumulators allows for custom iterative programming.

Spark has seen major use since its introduction, with effectively all major big data companies making some use of Spark. Its features and implementations are outlined in [42].

5.3 H2O

The H2O software bills itself as,

an in-memory platform for distributed, scalable machine learning. H2O uses familiar interfaces like R, Python, Scala, Java, JSON and the Flow notebook/web interface, and works seamlessly with big data technologies like Hadoop and Spark. H2O provides implementations of many popular algorithms such as GBM, Random Forest, Deep Neural Networks, Word2Vec and Stacked Ensembles. H2O is extensible so that developers can add data transformations and custom algorithms of their choice and access them through all of those clients.

H2O typically runs on HDFS, along with Spark for computation, and bespoke data structures serving as the backbone of the architecture.

H2O can communicate with R through a REST API. Users write functions in R, passing user-made functions to be applied on the objects existing in the H2O system[61].

The company H2O is backed by $146M in funding, partnering with large institutions in the financial and tech world. Their business model follows an open source offering with the same moniker as the company, and a small set of heavily-marketed proprietary software in aid of it. They have some important figures working with them, such as Matt Dowle, creator of data.table.

6 A Survey of R Packages for Local Large-Scale Computing

6.1 Bigmemory Collection

The bigmemory package enables the creation of “massive matrices” through a “big.matrix” S4 class with a similar interface to ‘matrix’[53]. These matrices may take up gigabytes of memory, typically larger than RAM, and have simple operations defined that speed up their usage. A variety of extension packages are also available that provide more functionality for big.matrices. The massive capacity of big.matrices is given through their default memory allocation to shared memory, rather than working memory as in most R objects. The objects in R are therefore pointers, and the big.matrix “show” method prints a description and memory location instead of a standard matrix display, given that it is likely far too big a matrix to print reasonably. Parallel processing is made use of for the advantage of computations and subsetting of matrices. Development on the package is still active, however it is stable enough that updates are intermittent. Some of the main extension packages:

biganalytics
Extends bigmemory with matrix summary statistics such as colmeans, apply, as well as integration with the biglm package[62]. Biganalytics is authored by the same creators of the main bigmemory package.
bigtabulate
Extends bigmemory with tabulation functions and tapply, allowing for contingency tables and summary of big.matrix objects [63].
biglasso
Extends bigmemory matrices to allow for lasso, ridge and elastic-net model fitting. It can take advantage of multicore machines, with the ability to run in parallel. Biglasso is authored by Yaohui Zeng, and described in detail in [64].
bigalgebra
Provides BLAS routines for bigmemory and native R matrices. Linear Algebra functionality is given through matrix arithmetic methods, such as the standard %*%. The package is archived on CRAN as of February 2020, only being accessible through R-Forge. This is likely due to a merger with the main bigmemory package (must investigate).
bigstatsr
Was originally a set of functions for complex statistical analyses on big.matrices, having since implemented and provided its own "filebacked big matrices"[65]. The provided functions include matrix operations particularly relating to bioinformatics, such as PCA, sparse linear supervised models, etc. Bigstatsr is described in detail in [65].
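
A minimal sketch of the big.matrix interface described at the start of this section, assuming the bigmemory package is installed; the dimensions are illustrative.

library(bigmemory)

# A big.matrix is allocated in shared memory; the R object is only a pointer
x <- big.matrix(nrow = 1e6, ncol = 10, type = "double", init = 0)

x[, 1] <- rnorm(1e6)    # standard matrix subsetting syntax is retained
x                       # the "show" method prints a description, not the data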

6.1.1 LAPACK, BLAS, ATLAS

BLAS is a specification for a set of low-level "building block" linear algebra routines[66]. Most linear algebra libraries conform to the BLAS specifications, including the most prominent linear algebra library, LAPACK, with its own set of extensions[67]. LAPACK has been extended in turn to support a variety of systems, with implementations such as ScaLAPACK being introduced to attend to distributed memory systems[68].

6.2 disk.frame

The package description outlines disk.frame with the following:

A disk-based data manipulation tool for working with large-than-RAM datasets. Aims to lower the barrier-to-entry for manipulating large datasets by adhering closely to popular and familiar data manipulation paradigms like dplyr verbs and data.table syntax.

disk.frame provides a disk.frame class and derivatives, which model a data.frame. The primary functional difference is that disk.frames can be far larger than total RAM. This is enabled through disk.frame objects being stored on disk as chunked fst files, rather than held in working memory as with data.frames. The class offers a high degree of transparency, with most standard manipulations of dataframes applying directly to disk.frame objects. I have written more on disk.frame in the document, A disk.frame Case Study.

6.3 data.table

data.table is another dataframe alternative, focussing on speed through multithreading and well-tuned database algorithms[69]. The package has introduced a unique syntax for data.table manipulation, which is also made available in disk.frame. data.table objects are held in RAM, so big data processing is not easily enabled; however, the package allows for serialisation of data.tables, and chunking is possible through splitting via the shard function. The package is authored by Matt Dowle, currently an employee at H2O.ai. An overview is given in [70], with extensive benchmarking available in [71].
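
A brief sketch of the data.table syntax and thread control referred to above; the data are illustrative.

library(data.table)

setDTthreads(4)   # many data.table operations are internally multithreaded
getDTthreads()

dt <- data.table(g = sample(letters, 1e7, replace = TRUE), x = rnorm(1e7))

# The data.table syntax follows the DT[i, j, by] form
dt[, .(mean_x = mean(x), n = .N), by = g]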

6.4 fst

fst is a means of serialising dataframes, as an alternative to RDS files[14]. Serialisation takes place extremely fast, using compression to minimise disk usage. The package speed is increased through parallel computation. It is authored by Mark Klik and Yann Collet, of Facebook, Inc. fst is a dependency of disk.frame, performing some of the background functionality.
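
A minimal sketch of fst serialisation; the file path and compression level are illustrative.

library(fst)

df <- data.frame(x = rnorm(1e6), y = runif(1e6))

# Serialise with compression; higher values trade write speed for disk usage
write_fst(df, "df.fst", compress = 50)

# Rows can be read back selectively, without loading the whole file
head(read_fst("df.fst", from = 1, to = 1000))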

6.5 iotools

iotools is a set of tools for managing big I/O, with an emphasis on speed and efficiency for big data through chunking[72]. The package provides several functions for creating and manipulating chunks directly. Authored by Simon Urbanek and Taylor Arnold.

6.6 ff

The package description outlines ff with the following:

The ff package provides atomic data structures that are stored on disk but behave (almost) as if they were in RAM by mapping only a subsection (pagesize) into main memory (the effective main memory consumption per ff object). Several access optimization techniques such as Hybrid Index Preprocessing (as.hi, update.ff) and Virtualization (virtual, vt, vw) are implemented to achieve good performance even with large datasets.

The package provides disk-based storage for most base types in R. This also enables sharing of objects between different R processes. ff is authored by a German-based team, and maintained by Jens Oehlschlägel, the author of True Cluster. First introduced in 2008[73], the package has seen no updates since mid-2018.

ffbase[74]
is an extension to ff, providing standard statistical methods for ff objects. The package is still actively maintained. The package has been the subject of several talks, most notably the author’s overview, [75]. The package is currently being revised for a second version that provides generics functionality for dplyr on ff objects, under the title, “ffbase2”[76].

7 A Survey of R Packages for Distributed Large-Scale Computing

7.1 DistributedR

DistributedR offers cluster access for various R data structures, particularly arrays, and provides S3 methods for a fair range of standard functions. It uses no standard cluster access interface, such as Hadoop or MPI, being built largely from scratch.

The package creators have ceased development as of December 2015. The company, Vertica, has moved on to offering an enterprise database platform[77].

7.2 foreach and doX

foreach offers a high-level looping construct compatible with a variety of backends[38]. The backends are provided by other packages, typically named with some form of “DoX”. Parallelisation is enabled by some backends, with doParallel allowing parallel computations[78], doSNOW enabling cluster access through the SNOW package[79], and doMPI allowing for direct MPI access[57].

foreach is managed by Revolution Analytics, with many of the DoX corollary packages also being produced by them. Further information of foreach is given in [50].

I have written more on foreach in A Detail of foreach.
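
A short sketch of the foreach construct with a doParallel backend; swapping the registration line for another doX package is all that is needed to change how %dopar% executes.

library(foreach)
library(doParallel)

# Register a backend; replacing this single line (e.g. with doSNOW's
# registerDoSNOW()) changes the execution strategy without touching the loop
registerDoParallel(cores = 4)

res <- foreach(i = 1:8, .combine = c) %dopar% {
  mean(rnorm(1e5, mean = i))
}
res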

7.3 future

future captures R expressions for evaluation, allowing them to be passed on for parallel and ad-hoc cluster evaluation, through the parallel package[80]. Such parallelisation uses the standard MPI or SOCK protocols.
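
A minimal sketch of the future API; the plan and expression are illustrative.

library(future)

# Choose an evaluation strategy; multisession launches background R processes
plan(multisession, workers = 2)

# The expression is captured and evaluated asynchronously ...
f <- future({
  mean(rnorm(1e6))
})

# ... and its value collected when needed, blocking until resolved
value(f)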

The author of future is Henrik Bengtsson, Associate Professor at UCSF. Development on the package remains strong, with Dr. Bengtsson possessing a completely full commit calendar and 81,328 contributions on GitHub. I have written more on future in the document, A Detail of future. future has many aspects to it, captured in its extensive series of vignettes[81]–[86].

Furrr is a frontend to future, amending the functions from the package purrr to be compatible with future, thus enabling parallelisation in a similar form to multicore, though with a tidyverse style[87].

Furrr is developed by Matt Dancho, and Davis Vaughn, an employee at RStudio.

7.4 Parallel, snow, and multicore

Parallel is a package included with R, born from the merge of the packages snow and multicore[88]. Parallel enables various means of performing computations in R in parallel, allowing not only multiple cores in a node, but multiple nodes through snow’s interfaces to MPI and SOCK[89].

Parallel takes from multicore the ability to perform multicore processing, with the mclapply function. multicore creates forked R sessions, which is very resource-efficient, but not supported on Windows.

From snow, distributed computing is enabled for multiple nodes.

multicore was developed by Simon Urbanek (!). snow was developed by Luke Tierney, a professor at the University of Iowa, who also originated the byte-compiler for R.

7.5 pbdR

pbdR is a collection of packages allowing for distributed computing with R[90], with the name being the abbreviation of Programming with Big Data in R. The packages include high-performance communication and computation capabilities, including RPC, ZeroMQ, and MPI interfaces.

The collection is extensive, offering several packages for each of the main categories of application functionality, communication, computation, development, I/O, and profiling.

Some selected packages of interest include the following:

pbdBASE
Includes the base utilities for distributed matrices used in the project, including bindings and extensions to ScaLAPACK[90].
pbdDMAT
Higher level classes and methods for distributed matrices, including manipulation, linear algebra, and statistics routines. Uses the same syntax as base R through S4[24].
pbdMPI
Offers a high-level interface to MPI, using the S4 system to program in the SPMD style, with no “master” nodes[22].
pbdCS
A client/server framework for pbdR packages[56].
pbdML
Offers machine learning algorithms, consisting at present of only PCA and similar linear algebra routines, primarily for demonstration purposes[91].
hpcvis
Provides profiler visualisations generated by the other profiler packages within the collection[92].

The project is funded by major government sources and research labs in the US. In 2016, the project won the Oak Ridge National Laboratory 2016 Significant Event Award; as per [19],

OLCF is the Oak Ridge Leadership Computing Facility, which currently includes Summit, the most powerful computer system in the world.

More detail is given in [93].

7.6 RHadoop

RHadoop is a collection of five packages to run Hadoop directly from R[39]. The packages are divided by logical function, including rmr2, which runs MapReduce jobs, and rhdfs, which can access the HDFS. The packages also include plyrmr, which makes available plyr-like data manipulation functions, in a similar vein to sparklyr.

It is offered and developed by Revolution Analytics.

7.7 RHIPE and DeltaRho

RHIPE is a means of "using Hadoop from R"[40]. The provided functions primarily attain this through interfacing with and manipulating HDFS, with a function, rhwatch, to submit MapReduce jobs. The easiest means of setup is to use a VM, and for all Hadoop computation, MapReduce is programmed directly by the user.

There is currently no support for the most recent version of Hadoop, and it doesn't appear to be under active open development, with the last commit being in 2015. RHIPE has mostly been subsumed into the backend of DeltaRho, a simple frontend.

7.8 sparklyr

sparklyr is an interface to Spark from within R[26]. The user connects to Spark and accumulates instructions for the manipulation of a Spark DataFrame object using dplyr commands, before executing the request on the Spark cluster.

Of particular interest is the capacity to execute arbitrary R functions on the Spark cluster. This can be performed directly, with the spark_apply() function, taking a user-defined function as a formal parameter. It can also be used as part of a dplyr chain through the mutate() function. Extending these, Spark-defined Hive functions and windowing functions are enabled for use in mutate() calls. Limitations to arbitrary code execution include the lack of support for global references, due to the underlying limitations of the serialize package.
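
A brief sketch of arbitrary code execution through spark_apply(), assuming a local Spark installation; the data and function are illustrative.

library(sparklyr)

sc <- spark_connect(master = "local")
iris_tbl <- sdf_copy_to(sc, iris, "iris_spark", overwrite = TRUE)

# An arbitrary R function runs on each partition of the Spark DataFrame;
# it receives a data frame and should return one. Note that global variables
# are not captured, as discussed above.
spark_apply(iris_tbl, function(df) data.frame(rows = nrow(df)))

spark_disconnect(sc)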

Some support for graphs and graph manipulation is enabled via usage with the graphframes package, which follows the Tidyverse pattern of working solely with dataframes and dataframe derivatives[94]. This binds to the GraphX component of Spark, enabling manipulation of graphs in Spark through pre-defined commands.

sparklyr is managed and maintained by RStudio, who also manage the rest of the Tidyverse (including dplyr).

7.9 SparkR

SparkR provides a front-end to Spark from R[95]. Like sparklyr, it provides the DataFrame as the primary object of interest. However, there is no support for the dplyr model of programming; the package instead provides functions that more closely resemble base R.

SparkR is maintained directly by Apache Spark, with ongoing regular maintenance provided. Usage of the package is described in the vignette, [96], with implementation explained in [97].

7.10 hmr

hmr is an interface to MapReduce from R[41]. It is extremely fast, making use of chunked data, and much of the process is handled by the package, including automatic R object conversion. hmr integrates with iotools, upon which it is based. The author, as with iotools, is Simon Urbanek.

7.11 big.data.table

big.data.table runs data.table over many nodes in an ad-hoc cluster[98], allowing big data manipulation through a data.table interface. The package makes use of Rserve (authored by Simon Urbanek) to facilitate communication between nodes when run from R. Alternatively, the nodes can be run as Docker services, using RSclient for connections, for fast remote environment setup. Beyond greater storage capacity, speed is increased through manipulations on big.data.tables occurring in parallel. The package is authored by Jan Gorecki, but has not been actively developed since mid-2016.

8 A Survey of R Derivatives for Large-Scale Computing

8.1 ML Services on HDInsight

ML Services on HDInsight, also referred to as R Server, is a distribution of R with particular emphasis on parallel and distributed capabilities[99]. Specific parallel capabilities advertised include multi-threaded mathematics libraries. The product was previously named Revolution R and developed by Revolution Analytics; after being bought out by Microsoft it was renamed Microsoft R, then R Server for HDInsight, before finally becoming ML Services.

8.2 IBM Big R

IBM Big R is a library of functions integrating R with the IBM InfoSphere BigInsights platform[100]. This is implemented through the introduction of various bigr classes replicating base R types. These are then run in the background on the BigInsights platform, which is in turn powered by Apache Hadoop. The user is therefore able to run MapReduce jobs without having to explicitly write MapReduce-specific code.

8.3 MapR

MapR initially provided R access to Hadoop, mainly in the form of HDFS access[101]. It was then bought out by HPE in 2019, pivoting to selling an enterprise database platform and analytics services, running on Hadoop among other backends. Development of the R access has ceased.

9 A Survey of R Packages for Large-Scale Statistical Modelling

9.1 partools

partools provides utilities for the parallel package[102]. It offers functions to split files and process the splits across nodes provided by parallel, along with bespoke statistical functions.

It consists mainly of wrapper functions, designed to follow its philosophy of “keep it distributed”.

It is authored by Norm Matloff, a professor at UC Davis and current Editor-in-Chief of the R Journal.

In more detail, [103] and [104] present the motivation behind partools with reference to Hadoop and Spark. Matloff describes partools as more “sensible” for large data sets than Hadoop and Spark, owing to their difficulty of setup, abstract programming paradigms, and the overhead caused by their fault tolerance. The alternative approach favoured by partools, termed “software alchemy”, is to use base R to split the data into distributed chunks, run analyses on each chunk, then average the results. This is proven to have asymptotic equivalence to standard analyses under certain assumptions, such as iid data. Effectively, it is a form of map-reduce, with the map being some analysis and the reduce being an average.

The analyses amenable to software alchemy have bespoke functions in the package, typically consisting of their base R name with the prefix “ca”, alluding to “chunk averaging”, such as calm(). Operations for which averaging does not make sense, such as column sums, also have specific functions provided. Complex cases, such as fitting LASSO models in which each chunk may have settled on different explanatory variables, are catered for in partools through subsetting. Finally, aggregate functions, akin to R’s aggregate(), allow arbitrary functions to be applied to distributed data.
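The idea can be illustrated without partools itself; the following sketch uses only base R and parallel to chunk, fit, and average, mirroring the “ca” pattern described above (it is not partools’ own API).

library(parallel)

# "Software alchemy" in miniature: split the data into chunks, fit the
# same model on each chunk in parallel, then average the estimates.
cl <- makeCluster(2)
chunks <- split(mtcars, rep(1:2, length.out = nrow(mtcars)))
fits <- parLapply(cl, chunks,
                  function(chunk) coef(lm(mpg ~ wt, data = chunk)))
Reduce(`+`, fits) / length(fits)
stopCluster(cl)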

In terms of applications of the package, usage is difficult to estimate; as it has a more complex setup than a simple library call, it is unlikely to be included in many other packages. Similarly, the nature of the work skews towards interactive usage and custom business-specific programs, for which data are difficult to attain. The reverse dependencies/imports have all been authored by Matloff, so they are not entirely informative, but their usage is interesting: one package (cdparcoord) plots coordinates for large datasets in parallel, and another (polyreg) forms and evaluates polynomial regression models.

9.2 biglm

biglm is described succinctly as

bounded memory linear and generalized linear models[105].

biglm has been extended by other packages, and can integrate with bigmemory matrices through biganalytics. The package is developed by Dr. Thomas Lumley of the University of Auckland.
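A minimal sketch of the bounded-memory pattern, with built-in data standing in for genuinely large chunks: fit on an initial chunk, then update() the fit with subsequent chunks.

library(biglm)

# Memory use is bounded by the chunk size, not the full dataset.
chunk1 <- mtcars[1:16, ]
chunk2 <- mtcars[17:32, ]
fit <- biglm(mpg ~ wt + hp, data = chunk1)
fit <- update(fit, chunk2)
summary(fit)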

10 A Review of Iteration with sparklyr

10.1 Introduction

Given that iteration is cited by a principal author of Spark as a motivating factor in its development when compared to Hadoop, it is reasonable to consider whether the most popular R interface to Spark, sparklyr, has support for iteration[25], [26]. One immediate hesitation over the suitability of sparklyr for iteration is its syntactic rooting in dplyr; dplyr is a “Grammar of Data Manipulation” and part of the tidyverse, which in turn is an ecosystem of packages with a shared philosophy[27], [28].

The promoted paradigm is functional in nature, with iteration using for loops in R described as “not as important” as in other languages; map functions from the tidyverse purrr package are instead promoted as providing greater abstraction and taking much less time to solve iteration problems. Maps do provide a simple abstraction for function application over elements in a collection, similar to internal iterators; however, they offer no control over the form of traversal, and, most importantly, lack the mutable state between iterations that standard loops or generators allow[29]. A common functional strategy for handling changing state is to make use of recursion, with tail-recursive functions specifically referred to as a form of iteration in [30]. Reliance on recursion for iteration is naively non-optimal in R, however, as the language lacks tail-call elimination and call-stack optimisations[31]. At present, the elements for efficient, idiomatic functional iteration are not present in R, given that it is not as functional a language as the tidyverse philosophy considers it to be, and sparklyr’s attachment to the ecosystem prevents a cohesive model of iteration until said elements are in place.

10.2 Iteration

Iteration takes place in Spark through caching results in memory, allowing faster access and less data movement than MapReduce[25]. sparklyr can use this functionality through the tbl_cache() function to cache Spark DataFrames in memory, as well as by caching upon import with memory=TRUE as a formal parameter to sdf_copy_to().

Iteration can also make use of persisting Spark DataFrames to memory, forcing evaluation followed by caching; this is performed in sparklyr through sdf_persist().
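By way of illustration (a sketch assuming a local Spark connection, with mtcars standing in for a larger dataset), the caching routes described above look as follows:

library(sparklyr)

sc <- spark_connect(master = "local")

# Cache on import by setting memory = TRUE ...
cars_tbl <- sdf_copy_to(sc, mtcars, "mtcars_spark",
                        memory = TRUE, overwrite = TRUE)

# ... or cache an existing Spark DataFrame by its registered name,
# or force evaluation and caching with sdf_persist().
tbl_cache(sc, "mtcars_spark")
cars_tbl <- sdf_persist(cars_tbl)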

The Babylonian method for calculating a square root is a simple iterative procedure, used here as an example. A standard form in R with a non-optimised initial value is given in lst. 4.

Listing 4: Simple Iteration with the Babylonian Method

basic_sqrt <- function(S, frac_tolerance = 0.01, initial = 1){
    x <- initial
    while(abs(x^2 - S)/S > frac_tolerance){
        x <- (x + S/x)/2
    }
    x
}

This iterative function is trivial, but translation to sparklyr is not entirely so.

The first aspect that must be considered is that sparklyr works on Spark Data Frames; x and S must be copied to Spark with the aforementioned sdf_copy_to() function.

The execution of the function in Spark is the next consideration, and sparklyr provides two means for this to occur. The first, spark_apply(), evaluates arbitrary R code over an entire data frame; its means of operation varies across Spark versions, ranging from launching and running RScripts in Spark 1.5.2, to Apache Arrow conversion in Spark 3.0.0.

The evaluation strategy of 1.5.2 is unsuitable in this instance, as launching RScripts every iteration imposes excessive overhead.

The other form of evaluation is through dplyr generics, which is what will be used in this example.

An important consideration is that sparklyr methods for dplyr generics execute through a translation of the formal parameters to Spark SQL. This is particularly relevant in that separate Spark Data Frames cannot be accessed together as in a multivariable function. In addition, very R-specific functions, such as those from the stats and Matrix core libraries, cannot be evaluated, as there is no Spark SQL cognate for them. The SQL query generated by the methods can be accessed and “explained” through show_query() and explain() respectively. When attempting to combine two Spark Data Frames in a single query without joining them, show_query() reveals that the Data Frame referenced through the .data variable is translated, but the other Data Frame has its list representation passed through, which Spark SQL has no capacity to parse; an example is given in lst. 6 (generated through lst. 5), showing an attempt to create a new column from the difference between two separate Data Frames.

Listing 5: Attempt in R to form new column from the difference between two separate Spark data frames S and x

show_query(mutate(S, S = S - x))

Listing 6: Spark SQL query generated from the attempt to form the difference between two separate data frames

SELECT `S` - list(con = list(master = "yarn", method = "shell", app_name =
    "sparklyr", config = list(spark.env.SPARK_LOCAL_IP.local = "127.0.0.1",
    sparklyr.connect.csv.embedded = "^1.*",
    spark.sql.legacy.utcTimestampFunc.enabled = TRUE,
    sparklyr.connect.cores.local = 4, spark.sql.shuffle.partitions.local =
    4), state = <environment>, extensions = list(jars = character(0),
    packages = character(0), initializers = list(), catalog_jars =
    character(0)), spark_home =
    "/shared/spark-3.0.0-preview2-bin-hadoop3.2", backend = 4, monitoring =
    5, gateway = 3, output_file =
    "/tmp/Rtmpbi2dqk/file44ec187daaf4_spark.log", sessionId = 58600,
    home_version = "3.0.0")) AS `S1`, `S` - list(x = "x", vars = "initial")
    AS `S2` FROM `S`

Global variables that evaluate to SQL-friendly objects can be passed, and are evaluated prior to translation. An example is given in lst. 7 (generated through lst. 8), where the difference between a variable holding a numeric and a Spark Data Frame is translated into the evaluation of the variable, its transformation to a float for Spark SQL, and its difference with the Spark Data Frame, referenced directly.


Listing 7: Spark SQL query generated from attempt to form the difference between a data frame and a numeric

SELECT `S` - 3.0 AS `S`
FROM `S`


Listing 8: Capacity in sparklyr to form new column from the difference between a spark data frame and a numeric

S
# Source: spark<S> [?? x 1]
#      S
#  <dbl>
#     9
x = 3
mutate(S, S = S - x)
# Source: spark<?> [?? x 1]
#      S
#  <dbl>
#     6

A reasonable approach to implementing a Babylonian method in sparklyr is then to combine S and x in one dataframe, and iterate within columns.


Listing 9: Babylonian method implementation using sparklyr

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "yarn")

sparklyr_sqrt <- function(S, sc, frac_tolerance = 0.01, initial = 1){
    bab <- sdf_copy_to(sc,
                       data.frame(x = initial, S = S, unfinished = TRUE),
                       "bab", memory = TRUE, overwrite = TRUE)
    while(any(collect(bab)$unfinished)){
        compute(mutate(bab, x = (x + S/x)/2,
                       unfinished = abs(x^2 - S)/S > frac_tolerance),
                "bab")
    }
    collect(bab)$x
}

10.3 Conclusion

sparklyr is excellent when used for what it is designed for. Iteration, in the form of an iterated function, does not appear to be part of this design; this was clear in the abuse required to implement a simple iterated function in the form of the Babylonian method. Furthermore, all references to “iteration” in the primary sparklyr literature refer either to the iteration inherent in the inbuilt Spark ML functions, or to the “wrangle-visualise-model” process popularised by Hadley Wickham[28], [32]. None of these references concern iterated functions.

Thus, it is fair to conclude that sparklyr is incapable of sensible iteration of arbitrary R code beyond what maps directly to SQL; even with mutate(), the interface is very convoluted for any iteration more complex than the Babylonian method. Implementation of a GLM function with sparklyr iteration was initially planned, but the point was already proven by something far simpler, and did not need to be laboured.

Ultimately, sparklyr is excellent at what it does, but convoluted and inefficient when abused, as when attempting to implement iterated functions.

11 A Review of pbdR

11.1 Introduction

pbdR is a collection of packages allowing for distributed computing with R[90]. The name is an acronym for the collection’s purpose: Programming with Big Data in R. It is introduced on its main page with the following description:

The “Programming with Big Data in R” project (pbdR) is a set of highly scalable R packages for distributed computing and profiling in data science. Our packages include high performance, high-level interfaces to MPI, ZeroMQ, ScaLAPACK, NetCDF4, PAPI, and more. While these libraries shine brightest on large distributed systems, they also work rather well on small clusters and usually, surprisingly, even on a laptop with only two cores. Winner of the Oak Ridge National Laboratory 2016 Significant Event Award for “Harnessing HPC Capability at OLCF with the R Language for Deep Data Science.” OLCF is the Oak Ridge Leadership Computing Facility, which currently includes Summit, the most powerful computer system in the world.[19]

11.2 Interface and Backend

The project especially seeks to serve minimal wrappers around the BLAS and LAPACK libraries, along with their distributed derivatives, with the intention of introducing as little overhead as possible. Standard R also uses routines from these libraries for most matrix operations, but suffers from numerous inefficiencies relating to the structure of the language; for example, copies of all objects being manipulated will typically be created, often with devastating performance implications unless specific functions are used for linear algebra operations, as discussed in [20] (e.g., crossprod(X) instead of t(X) %*% X).
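For instance, the cross-product identity mentioned above can be compared directly in base R:

# Both compute t(X) %*% X, but crossprod() avoids materialising the
# explicit transpose, which matters increasingly as X grows.
X <- matrix(rnorm(1e6), nrow = 1e3)
system.time(t(X) %*% X)
system.time(crossprod(X))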

Distributed linear algebra operations in pbdR depend further on the ScaLAPACK library, which can be provided through the pbdSLAP package [21].

The principal interface for direct distributed computations is the pbdMPI package, which presents a simplified API to MPI through R[22]. All major MPI libraries are supported, though the project tends to use OpenMPI in its explanatory documentation. A very important consideration that is not immediately clear is that pbdMPI can only be used in batch mode through MPI, rather than interactively as in Rmpi[23].
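A minimal SPMD script might take the following form (a sketch only; the file name and process count are arbitrary). It must be run in batch, e.g. mpirun -np 4 Rscript hello_spmd.R, rather than from an interactive session.

library(pbdMPI)
init()

# Every rank runs the same code; rank and size distinguish the workers.
comm.print(paste("Hello from rank", comm.rank(), "of", comm.size()),
           all.rank = TRUE)

# A simple reduction across ranks: each rank sums a local slice.
local_sum <- sum((comm.rank() * 10 + 1):(comm.rank() * 10 + 10))
comm.print(reduce(local_sum, op = "sum"))

finalize()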

The actual manipulation of distributed matrices is enabled through the pbdDMAT package, which offers S4 classes encapsulating distributed matrices[24]. These are specialised for dense matrices through the ddmatrix class, though the project offers some support for other matrix types. The ddmatrix class has nearly all of the standard matrix generics implemented for it, with nearly identical syntax.
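Also batch-executed under MPI, a sketch of ddmatrix usage might look as follows; the generator form of ddmatrix() and the default blocking follow the package documentation, and the dimensions here are arbitrary.

library(pbdMPI)
library(pbdDMAT)
init.grid()

# Generate a random distributed matrix and apply standard generics,
# which dispatch to ScaLAPACK-backed methods.
dx  <- ddmatrix("rnorm", nrow = 1000, ncol = 10)
xtx <- crossprod(dx)   # distributed t(dx) %*% dx
print(solve(xtx))      # distributed inverse

finalize()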

11.3 Setup

The setup for pbdR is simple, being no more than a CRAN installation, but a well-tuned environment, which is the main purpose for using the package in the first place, requires BLAS, LAPACK and derivatives, a parallel file system with data in an appropriate format such as HDF5, and a standard MPI library. Much of the pain of setup is ameliorated by the Docker container provided by the project.

12 A Detail of future

12.1 Overview

future is introduced with the following summary:

The purpose of this package is to provide a lightweight and unified future API for sequential and parallel processing of R expression via futures.

The simplest way to evaluate an expression in parallel is to use x %<-% { expression } with plan(multiprocess). This package implements sequential, multicore, multisession, and cluster futures. With these, R expressions can be evaluated on the local machine, in parallel a set of local machines, or distributed on a mix of local and remote machines. Extensions to this package implement additional backends for processing futures via compute cluster schedulers etc. Because of its unified API, there is no need to modify any code in order switch from sequential on the local machine to, say, distributed processing on a remote compute cluster. Another strength of this package is that global variables and functions are automatically identified and exported as needed, making it straightforward to tweak existing code to make use of futures.[80]

futures are abstractions for values that may be available at some point in the future, taking the form of objects possessing state, being either resolved and therefore available immediately, or unresolved, wherein the process blocks until resolution.

futures find their greatest use when run asynchronously. The future package has the inbuilt capacity to resolve futures asynchronously, including in parallel and through a cluster, making use of the parallel package. This typically runs a separate process for each future, resolving separately from the current R session and modifying the object state and value according to its resolution status.

12.2 Comparison with Substitution and Quoting

R lays open a powerful set of metaprogramming functions, which bear similarity to future. R expressions can be captured in a quote(), then evaluated in an environment with eval() at some point in the future. Additionally, substitute() substitutes any variables in the expression passed to it with the values bound in an environment argument, thus allowing “non-standard evaluation” in functions.
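The parallel between the two can be seen in a small example: the base R version delays evaluation until eval() is called manually, while the future version begins evaluating immediately in a background process.

library(future)
plan(multisession)

# Base R metaprogramming: capture now, evaluate later, by hand.
expr <- quote(mean(rnorm(1e6)))
eval(expr)

# substitute() rewrites variables using bindings from a list/environment.
substitute(a + b, list(a = 1, b = 2))

# future: evaluation also happens "later", but in the background;
# value() blocks only if the future is still unresolved.
f <- future(mean(rnorm(1e6)))
value(f)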

future offers a delay of evaluation as well; however, the delay is not under the manual control of the programmer through eval() and the like, but is instead due to background computation of the expression.

12.3 Example Usage

Through substitution and quoting, R can, for example, run a console within the language. Futures allow the extension of this to a parallel evaluation scheme. Listing lst. 10 gives a simple implementation of the idea: a console that accepts basic expressions, evaluating them in the background and presenting them upon request when complete. Error handling and shared variables are not implemented.

Listing 10: Usage of future to implement a basic multicore console

library(future)

multicore.console <- function(){
    get.input <- function(){
        cat("Type \"e\" to enter an expression for",
            "evaluation \nand \"r\" to see",
            "resolved expressions\n", sep="")
        readline()
    }

    send.expr <- function(){
        cat("Multicore Console> ")
        input <- readline()
        futs[[i]] <<- future(eval(str2expression(input)))
        cat("\nResolving as: ", as.character(i), "\n")
    }

    see.resolved <- function(){
        for (i in 1:length(futs)){
            if (is(futs[[i]], "Future") &
                resolved(futs[[i]])) {
                cat("Resolved: ", as.character(i), " ")
                print(value(futs[[i]]))
            }
        }
    }

    plan(multicore)
    futs <- list()
    i <- 1
    while(TRUE){
        input <- get.input()
        if (input == "e") {
            send.expr()
            i <- i + 1
        } else if (input == "r") {
            see.resolved()
        } else {
            cat("Try again")
        }
    }
}

multicore.console()

12.4 Extension Packages

doFuture
[106] provides an adapter for foreach[38] that works on a future-based backend. Note that this does not return foreach() calls as futures. The multicore features enabled by future are redundant with the existing parallel package, but because future backends can include other clusters, such as those provided by batchtools, there is some additional functionality, including additional degrees of control over backends.
future.batchtools
[107] provides a future API for batchtools[108], or equivalently, a batchtools backend for future. This allows the use of various cluster schedulers such as TORQUE, Slurm, Docker Swarm, as well as custom cluster functions.
future.apply
[109] provides equivalent functions to R’s apply procedures, with a future backend enabling parallel, cluster, and other functionality as provided by backends such as batchtools through future.batchtools; a brief sketch follows this list.
future.callr
[110] provides a callr[111] backend to future, with all of the associated advantages and overhead. callr “call[s] R from R”: it provides functions to run expressions in a background R process, beginning a new session. An advantage of callr is that it allows more than 125 connections, as it does not make use of R-specific connections. Additionally, no ports are used, unlike the SOCKcluster provided by the snow component of parallel.
furrr
[87] allows the use of future as a backend to purrr functions. purrr is a set of functional programming tools for R, including map, typed map, reduce, predicates, and monads. Much of it is redundant with what already exists in base R, but it has the advantage, and the stated goal, of adhering to a consistent standard.
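As a brief sketch of the uniformity these extensions provide, the following swaps sequential for parallel execution purely by changing the plan; the worker count is arbitrary.

library(future.apply)

# The apply call itself is unchanged between plans.
plan(sequential)
future_lapply(1:4, function(i) mean(rnorm(1e5)))

plan(multisession, workers = 2)
future_lapply(1:4, function(i) mean(rnorm(1e5)))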

12.5 Further Considerations

One initial drawback of future is the lack of callback functionality, which would open up enormous potential. However, this feature is made available in the promises package, developed by Joe Cheng at RStudio, which allows user-defined handlers to be applied to futures upon resolution[112].

Issues that are not resolved by other packages include the copying of objects referenced by a future, with mutable objects thereby unable to be directly updated by the future (though this may be ameliorated with well-defined callbacks). This also means that data movement is mandatory, and costly; future raises an error if the data to be processed is over 500 MB, though this can be overridden.

Automatic referencing of variables is a major unsung feature of future, though it does not always work reliably; future relies on code inspection, and allows manual specification of global variables through a parameter.

It seems likely that the future package will prove valuable, especially where asynchronous processing is required on the R end; it is the simplest means of enabling asynchrony in R without having to manipulate networks or threads directly.

13 A Review of foreach

13.1 Introduction

foreach introduces itself on CRAN with the following description:

Support for the foreach looping construct. Foreach is an idiom that allows for iterating over elements in a collection, without the use of an explicit loop counter. This package in particular is intended to be used for its return value, rather than for its side effects. In that sense, it is similar to the standard lapply function, but doesn’t require the evaluation of a function. Using foreach without side effects also facilitates executing the loop in parallel.

From the user end, the package is conceptually simple, revolving entirely around a looping construct and the one-off backend registration.

The principal goal of the package, which it has not strayed from, is the enabling of parallelisation through backend transparency within the foreach construct. Notably, more complex functionality, such as side effects and parallel recurrence, is not part of the package’s intention.

Thus, the primary driver of the package’s practicality, beyond the support offered for parallel backends, is the backends themselves, which currently enable a broad variety of parallel systems.

foreach is developed by Steve Weston and Hoong Ooi.

13.2 Usage

foreach doesn’t require setup for simple serial execution, but parallel backends require registration by the user, typically with a single function as in the registration for doParallel, registerDoParallel().

The syntax of foreach consists of a foreach() function call next to a %do% operator, with some expression to the right[50]. Without loss of generality, the syntactic form is given in lst. 11.

Listing 11: Standard foreach syntax

foreach(i=1:n) %do% {expr}

The foreach() function can take other arguments including changing the means of combination along iterations, whether iterations should be performed in order, as well as the export of environmental variables and packages to each iteration instance.

In addition to %do%, other binary operators can be appended or substituted. Parallel iteration is performed by simply replacing %do% with %dopar%. Nested loops can be created by inserting %:% between the main and nested foreach() calls, prior to the %do% call[113]. The last step towards foreach being capable of list comprehension is the filtering function when(), which filters iterables based on some predicate to control evaluation.
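A compact illustration of these operators:

library(foreach)

# .combine collects results into a vector rather than the default list.
foreach(i = 1:4, .combine = c) %do% i^2

# %:% composes foreach objects, and when() filters iterations,
# giving a list-comprehension flavour.
foreach(a = 1:10, .combine = c) %:% when(a %% 2 == 0) %do% a^2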

13.3 Implementation

The mechanism of action in foreach is often forgotten in the face of the atypical form of the standard syntax. Taken piece by piece, the foreach() function returns an iterable object, while %do% and its derivatives are binary operators taking the iterable object returned by foreach() on the left and the expression on the right; the rightmost expression is simply captured as such by %do%. Thus, the main beast of burden is the %do% function, where the evaluation of the iteration takes place.

In greater detail, %do% captures and creates environments, enabling sequential evaluation. %dopar% captures the environment of an expression, as well as taking as a formal parameter a vector of names of libraries used in the expression; it then passes these to the backend, which in turn does additional work capturing references to variables in the expression, adding them to the evaluation environment, and ensuring packages are loaded on worker nodes.

%do% and %dopar%, after error checking, send calls to getDoSeq() and getDoPar() respectively, which return lists determined by the registered backend; these contain a function provided by the backend, used to operate on the main expression along with other environmental data.

foreach depends strongly upon the iterators package, which gives the ability to construct custom iterators. These custom iterators can be used in turn with the foreach() function, as the interface to them is transparent.

13.4 Form of Iteration

The name of the package and function interface refer to the foreach programming language construct, present in many other languages. By definition, the foreach construct performs traversal over some collection, not necessarily requiring any traversal order. In this case, the collection is an iterator object or an object coercible to one, but in other languages with foreach as part of the core language, such as python (whose for loop is actually only a foreach loop), collections can include sets, lists, and a variety of other classes which have an __iter__ and __next__ defined[114].

Due to the constraints imposed by a foreach construct, loop optimisation is simplified relative to a for loop, and the lack of explicit traversal ordering permits parallelisation, which is the primary reason for using the foreach package. The constraints are not insignificant, however, and they impose a limit on what can be expressed through the construct. Most notably, iterated functions, wherein the function depends on its prior output, are not necessarily supported, and are certainly not supported in parallel. This results from the order of traversal being undefined; when order is essential to maintaining coherent state, as in iterated functions, the two concepts are mutually exclusive.

In spite of the constraints, iterated functions can actually be emulated in foreach through the use of destructive reassignment within the passed expression, or through the use of stateful iterators. Examples of both are given in lsts. 12, 13.

Listing 12: Serial iterated function through destructive reassignment

library(foreach)

x <- 10
foreach(i = 1:5) %do% {x <- x + 1}

Listing 13: Serial iterated function through creation of a stateful iterator

library(foreach)
library(iterators)

addsone <- function(start, to) {
    nextEl <- function() {
        start <<- start + 1
        if (start >= to) {
            stop('StopIteration')
        }
        start
    }
    obj <- list(nextElem = nextEl)
    class(obj) <- c('addsone', 'abstractiter', 'iter')
    obj
}

it <- addsone(10, 15)
nextElem(it)

foreach(i = addsone(10, 15), .combine = c) %do% i

As alluded to earlier, this functionality breaks down when attempting to run the iterations in parallel, as demonstrated in lsts. 14, 15; the attempts only return a list of five repetitions of the same “next” number, not iterating beyond it.

Listing 14: Parallel Iteration attempt through destructive reassignment

library(foreach)
library(parallel)

cl <- makeCluster(2)
doParallel::registerDoParallel(cl)
x <- 10
foreach(i = 1:5) %dopar% {x <- x + 1}


Listing 15: Parallel Iteration attempt through a stateful iterator

doParallel::registerDoParallel()
foreach(i = addsone(10, 15), .combine = c) %dopar% i

13.5 Extensions

The key point of success in foreach is its backend extensibility; without it, foreach would lack any major advantages over a standard for loop.

Other parallel backends are enabled through specific functions made available by the foreach package. Backend packages define their parallel evaluation procedures with reference to the iterator and accumulator methods from foreach.

Numerous backends exist, most notably:

doParallel
the primary parallel backend for foreach, using the parallel package[78].
doRedis
provides a Redis backend, through the redux package[115].
doFuture
uses the future package to make use of future’s many backends[106].
doAzureParallel
allows for direct submission of parallel workloads to an Azure Virtual Machine[116].
doMPI
provides MPI access as a backend, using Rmpi[57].
doRNG
provides for reproducible random number usage within parallel iterations, using L’Ecuyer’s method; provides %dorng%[117].
doSNOW
provides an ad-hoc cluster backend, using the snow package[79].

13.6 Relevance

foreach serves as an example of a well-constructed package, supported by its transparency and extensibility.

For packages looking to provide any parallel capabilities, a foreach extension would certainly aid their potential usefulness and visibility.

14 A Disk.Frame Case Study

14.1 Introduction

disk.frame works through two main principles: chunking, and generic function implementation (alongside special functions). Another component, not mentioned in that explanation but crucial to performance, is the parallelisation offered transparently by the package.

disk.frame is developed by Dai ZJ.

14.2 Chunks and Chunking

14.2.1 Chunk Representation

disk.frames are actually references to numbered fst files in a folder, with each file serving as a chunk. This is made use of through manipulation of each chunk separately, sparing RAM from dealing with a single monolithic file[13].

fst is a means of serialising dataframes, as an alternative to RDS files[14]. It makes use of an extremely fast compression algorithm developed at Facebook; the R package enabling fst is written about in R Packages for Local Large-Scale Computing.

From inspection of the source code, data.table manipulations are enabled directly through transformations of each chunk to data.tables through the fst backend.

14.2.2 Chunk Usage

Chunks are created transparently by disk.frame; the user can theoretically remain ignorant of chunking. In R, the disk.frame object serves as a reference to the chunk files. Operations on disk.frame objects are by default lazy, waiting until the collect() command to perform the collected operations and pull the chunks into R as a single data.table. As noted in [118], this form of lazy evaluation is similar to the implementation of sparklyr.

Chunks are by default assigned rows through hashing the source rows, but can be composed of individual levels of some source column, which can provide an enormous efficiency boost for grouped operations, where the computation visits the data, rather than the other way around.

Chunks can be manipulated individually, having individual IDs, through get_chunk(), and can be added from additional fst files or removed directly, through add_chunk() and remove_chunk() respectively.

In a computationally intensive procedure, the rows can be rearranged between chunks based on a particular column level as a hash, through functions such as rechunk().
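A sketch of chunk-level access follows; mtcars stands in for a large dataset, and the chunk counts and rechunk() arguments are illustrative only.

library(disk.frame)

df <- as.disk.frame(mtcars, nchunks = 4)

# Pull a single chunk into memory as a data.table.
chunk1 <- get_chunk(df, 1)

# Add a chunk from in-memory data, or drop the last chunk by its ID.
add_chunk(df, mtcars[1:10, ])
remove_chunk(df, nchunks(df))

# Rearrange rows so that chunks correspond to levels of a column.
df <- rechunk(df, nchunks = 2, shardby = "cyl")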

14.3 Functions

The disk.frame object has standard procedures for construction and access. disk.frame can be constructed from data.frames and data.tables through as.disk.frame(), single or multiple csv files through csv_to_disk.frame(), as well as zip files holding csv files. Time can be saved later on through the application of functions to the data during the conversion, as well as specifying what to chunk by, keeping like data together. The process of breaking up data into chunks is referred to by disk.frame as “sharding”, enabled for data.frames and data.tables through the shard() function.

After creating a disk.frame object, functions can be applied directly to all chunks invisibly through the cmap() family of functions, in a form similar to base R’s *apply().
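For instance (a sketch; the CSV path is hypothetical and mtcars again stands in for a large dataset):

library(disk.frame)

# Convert an in-memory data frame, or ingest CSV files directly.
cars_df <- as.disk.frame(mtcars, nchunks = 4)
# flights_df <- csv_to_disk.frame("flights.csv", outdir = "flights.df")

# Apply a function to every chunk, then collect the results.
collect(cmap(cars_df, function(chunk) chunk[chunk$mpg > 20, ]))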

A highly publicised aspect of disk.frame is its functional cross-compatibility with dplyr verbs. These operate on disk.frame objects lazily, and are applied through translation by disk.frame; they are simply S3 methods defined for the disk.frame class. They are fully functional, with the exception of group_by() (and its data.table cognate, [by=]), considered in more detail in sec. 14.3.1.

Beyond higher-order functions and the dplyr or data.table analogues for data manipulation, the direct modelling function dfglm() is implemented to allow fitting GLMs to the data. From inspection of the source code, the function is a utility wrapper that by default streams disk.frame data into bigglm, a biglm derivative.

14.3.1 Grouping

For a select set of functions, disk.frame offers a transparent grouped summarise(). These are mainly composed of simple statistics such as mean(), min(), etc.

For other grouped functions, there is more complexity involved, due to the chunked nature of disk.frame. When functions are applied, they are by default applied to each chunk. If groups do not correspond injectively to chunks, then the syntactic chunk-wise summaries and their derivatives may not correspond to the semantic group-wise summaries expected. For example, the median is summarised by a median-of-medians method: finding the overall median of all chunks’ respective medians. Computing grouped medians in disk.frame therefore results in estimates only; this is also true of other software, such as Spark, as noted in [15].

Grouped functions are thereby divided into one-stage and two-stage: one-stage functions “just work” with the group_by() function, while two-stage functions require manual chunk aggregation (using chunk_group_by() and chunk_summarize()), followed by an overall collected aggregation (using regular group_by() and summarise()). [15] points out that the explicit two-stage approach is similar to a MapReduce operation.

Custom one-stage functions can be created, where user-defined chunk aggregation and collected aggregation functions are converted into one-stage functions by disk.frame[119]. These take the forms fn_df.chunk_agg.disk.frame() and fn_df.collected_agg.disk.frame() respectively, where “fn” is the name of the new function and the suffixes are appended by disk.frame through metaprogramming.
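A sketch contrasting the two routes, assuming a disk.frame built from mtcars as above and a version of disk.frame with one-stage group_by() support:

library(disk.frame)
library(dplyr)

cars_df <- as.disk.frame(mtcars, nchunks = 4)

# One-stage: supported summaries "just work" through group_by().
cars_df %>%
    group_by(cyl) %>%
    summarise(mean_mpg = mean(mpg)) %>%
    collect()

# Two-stage: aggregate within each chunk, then aggregate the collected
# chunk-level results, in the manner of a MapReduce.
cars_df %>%
    chunk_group_by(cyl) %>%
    chunk_summarize(sum_mpg = sum(mpg), n_mpg = n()) %>%
    collect() %>%
    group_by(cyl) %>%
    summarise(mean_mpg = sum(sum_mpg) / sum(n_mpg))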

To simplify the situation, at the cost of a one-off computational overhead, chunks can be rearranged to correspond to groups, thereby allowing one-stage summaries through chunk_summarize() alone, as well as exact computation of group medians.

14.4 Parallelism

An essential component of disk.frame’s speed is parallelisation; as chunks are conceptually separate entities, function application to each can take place with no side effects to other chunks, and can therefore be trivially parallelised.

For parallelisation, future is used as the backend package, with most function mappings over chunks making use of future_lapply() to map the intended function over each chunk in parallel. future is a package with complexities in its own right; I have written more on future in the document A Detail of future.

future is initialised with access to cores through the wrapper function setup_disk.frame()[16]. This sets up the correct number of workers, with the minimum of the number of workers and the number of chunks being processed in parallel.
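For example (the worker count here is arbitrary):

library(disk.frame)

# Request two background workers; by default setup_disk.frame() chooses
# a worker count based on the available cores.
setup_disk.frame(workers = 2)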

An important aspect of parallelisation through future is that, for purposes of cross-platform compatibility, new R processes are started for each worker[17]. Each process possesses its own environment, and disk.frame makes use of future’s detection capabilities to capture external variables referred to in calls and send them to each worker.

14.5 Relevance

disk.frame serves as an example of a very well-received and widely used package for larger-than-RAM processing. The major keys to its success have been its chart-topping performance, even in comparison with dask and Julia, and its user-friendliness, enabled through procedural transparency and well-articulated concepts.

disk.frame as a concept also lends itself well to extension:

The storage of chunks is currently file-based and managed by an operating system; if fault tolerance was desired, HDFS support for chunk storage would likely serve this purpose well.

Alternatively, OS-based file manipulation could be embraced in greater depth, focussing on interaction with faster external tools for file manipulation; this would lead to issues with portability, but with reasonable checks, could enable great speedups through making use of system utilities such as sort on UNIX-based systems.

The type of file may also be open to extension, with other file formats competing for high speeds and cross-language communication including feather, developed by Wes McKinney and Hadley Wickham[120].

In terms of finer-grained extension, more functionality for direct manipulation of individual chunks would potentially aid computation when performing iterative algorithms and others of greater complexity.

15 A Survey of s-u

15.1 Introduction

15.2 iotools and DistributedR

15.3 Rserve and RSclient

15.4 hmr, hdfsc, and hbase

15.5 roctopus

[1]
D. R. G. Rydning, “The digitization of the world from edge to core,” Framingham: International Data Corporation, 2018.
[2]
M. Hilbert and P. López, “The world’s technological capacity to store, communicate, and compute information,” Science, vol. 332, no. 6025, pp. 60–65, 2011.
[3]
R. E. Fontana Jr and G. M. Decad, “Moore’s law realities for recording systems and memory storage components: HDD, tape, NAND, and optical,” AIP Advances, vol. 8, no. 5, p. 056506, 2018.
[4]
G. E. Moore et al., “Progress in digital integrated electronics,” in Electron devices meeting, 1975, vol. 21, pp. 11–13.
[5]
W. Toy, Computer hardware/software architecture. Englewood Cliffs, N.J: Prentice-Hall, 1986.
[6]
C. IT, “Standard computer build specifications.”
[7]
R. Ihaka and R. Gentleman, “R: A language for data analysis and graphics,” Journal of computational and graphical statistics, vol. 5, no. 3, pp. 299–314, 1996.
[8]
R. C. Team, R: A language and environment for statistical computing. Vienna, Austria: R Foundation for Statistical Computing, 2020.
[9]
G. M. Amdahl, “Validity of the single processor approach to achieving large scale computing capabilities,” in Proceedings of the april 18-20, 1967, spring joint computer conference, 1967, pp. 483–485.
[10]
J. L. Gustafson, “Reevaluating amdahl’s law,” Communications of the ACM, vol. 31, no. 5, pp. 532–533, 1988.
[11]
I. Foster, Designing and building parallel programs : Concepts and tools for parallel software engineering. Reading, Mass: Addison-Wesley, 1995.
[12]
D. ZJ, Disk.frame: Larger-than-RAM disk-based data manipulation framework. 2020.
[13]
D. ZJ, “Ingesting data disk.frame. Exploiting the structure of a disk.frame.” 2019.
[14]
M. Klik, Fst: Lightning fast serialization of data frames for r. 2019.
[15]
D. ZJ, “Group-by disk.frame.” 2019.
[16]
D. ZJ, “Key ‘disk.frame‘ concepts disk.frame.” 2019.
[17]
D. ZJ, “Using data.table syntax with disk.frame disk.frame. External variables are captured.” 2019.
[18]
T. S. Perry, “Building a supercomputer in a flash.” 2004.
[19]
G. Ostrouchov, W.-C. Chen, D. Schmidt, and P. Patel, “Programming with big data in r.” 2012.
[20]
D. Schmidt, W.-C. Chen, M. A. Matheson, and G. Ostrouchov, “Programming with BIG data in r: Scaling analytics from one to thousands of nodes,” Big Data Research, vol. 8, pp. 1–11, 2017.
[21]
W.-C. Chen, D. Schmidt, G. Ostrouchov, and P. Patel, pbdSLAP: Programming with big data – scalable linear algebra packages.” 2012.
[22]
W.-C. Chen, G. Ostrouchov, D. Schmidt, P. Patel, and H. Yu, pbdMPI: Programming with big data – interface to MPI.” 2012.
[23]
H. Yu, “Rmpi: Parallel statistical computing in r,” R News, vol. 2, no. 2, pp. 10–14, 2002.
[24]
D. Schmidt, W.-C. Chen, S. L. de la Chapelle, G. Ostrouchov, and P. Patel, pbdDMAT: pbdR distributed matrix methods.” 2020.
[25]
M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, I. Stoica, et al., “Spark: Cluster computing with working sets.” HotCloud, vol. 10, no. 10–10, p. 95, 2010.
[26]
J. Luraschi, K. Kuo, K. Ushey, J. Allaire, and T. A. S. Foundation, Sparklyr: R interface to apache spark. 2020.
[27]
H. Wickham et al., “Welcome to the tidyverse,” Journal of Open Source Software, vol. 4, no. 43, p. 1686, 2019.
[28]
H. Wickham and G. Grolemund, R for data science: Import, tidy, transform, visualize, and model data. O’Reilly Media, Inc., 2016.
[29]
G. Cousineau and M. Mauny, The functional approach to programming. Cambridge University Press, 1998.
[30]
H. Abelson, G. J. Sussman, and J. Sussman, Structure and interpretation of computer programs. Justin Kelly, 1996.
[31]
R. C. Team, R language definition. 2020.
[32]
J. Luraschi, Mastering spark with r : The complete guide to large-scale analysis and modeling. Sebastopol, CA: O’Reilly Media, 2019.
[33]
S. Ghemawat, H. Gobioff, and S.-T. Leung, “The google file system,” in Proceedings of the nineteenth ACM symposium on operating systems principles, 2003, pp. 29–43.
[34]
J. Dean and S. Ghemawat, “MapReduce: Simplified data processing on large clusters,” Google, Inc., 2004.
[35]
M. Rocklin, “Dask: Parallel computation with blocked algorithms and task scheduling,” 2015.
[36]
K. Shvachko, H. Kuang, S. Radia, and R. Chansler, “The hadoop distributed file system,” in 2010 IEEE 26th symposium on mass storage systems and technologies (MSST), 2010, pp. 1–10.
[37]
RStudio, “Distributing R computations.” 2020.
[38]
Microsoft and S. Weston, Foreach: Provides foreach looping construct. 2020.
[39]
R. Analytics, “RHadoop wiki.” Nov. 2015.
[40]
DeltaRho, “RHIPE: R and hadoop integrated programming environment.” Dec. 2015.
[41]
S. Urbanek and T. Arnold, Hmr: High-performance hadoop map/reduce r interface based on iotools. 2020.
[42]
M. Zaharia et al., “Apache spark: A unified engine for big data processing,” Communications of the ACM, vol. 59, no. 11, pp. 56–65, 2016.
[43]
W. N. Venables, D. M. Smith, and the R Core Team, An introduction to R: Notes on R: A programming environment for data analysis and graphics. 2020.
[44]
D. W. Walker and J. J. Dongarra, “MPI: A standard message passing interface,” Supercomputer, vol. 12, pp. 56–68, 1996.
[45]
J. McCarthy, “History of LISP,” in History of programming languages, 1978, pp. 173–185.
[46]
D. Eddelbuettel, “Parallel computing with r: A brief review,” arXiv preprint arXiv:1912.11144, 2019.
[47]
E. W. Dijkstra, “On the role of scientific thought,” in Selected writings on computing: A personal perspective, Springer, 1982, pp. 60–66.
[48]
H. Søndergaard and P. Sestoft, “Referential transparency, definiteness and unfoldability,” Acta Informatica, vol. 27, no. 6, pp. 505–517, 1990.
[49]
D. Norman, The design of everyday things: Revised and expanded edition. Basic books, 2013.
[50]
S. Weston and H. Ooi, Using the foreach package. 2019.
[51]
O.-J. Dahl, “The birth of object orientation: The simula languages,” in From object-orientation to formal methods, Springer, 2004, pp. 15–25.
[52]
K. Bierhoff, “Api protocol compliance in object-oriented software,” CARNEGIE-MELLON UNIV PITTSBURGH PA SCHOOL OF COMPUTER SCIENCE, 2009.
[53]
M. J. Kane, J. Emerson, and S. Weston, “Scalable strategies for computing with massive data,” Journal of Statistical Software, vol. 55, no. 14, pp. 1–19, 2013.
[54]
R. Analytics, Rhdfs: R and hadoop distributed filesystem. 2013.
[55]
R. Analytics, Plyrmr: Data manipulation backed by Rmr2 and hadoop. 2014.
[56]
D. Schmidt and W.-C. Chen, pbdCS: pbdR client/server utilities.” 2015.
[57]
S. Weston, doMPI: Foreach parallel adaptor for the rmpi package. 2017.
[58]
R. Analytics, Rmr2: R and hadoop streaming connector. 2015.
[59]
M. Zheludkov, T. Isachenko, et al., High performance in-memory computing with apache ignite. Lulu.com, 2017.
[60]
T. Akidau et al., “MillWheel: Fault-tolerant stream processing at internet scale,” Proceedings of the VLDB Endowment, vol. 6, no. 11, pp. 1033–1044, 2013.
[61]
Inc. H2O.ai, “H2o 3.28.1.1 document. H2O architecture.” Mar. 2020.
[62]
J. W. Emerson and M. J. Kane, Biganalytics: Utilities for ’big.matrix’ objects from package ’bigmemory’. 2016.
[63]
M. J. Kane and J. W. Emerson, Bigtabulate: Table, apply, and split functionality for matrix and ’big.matrix’ objects. 2016.
[64]
Y. Zeng and P. Breheny, “The biglasso package: A memory-and computation-efficient solver for lasso model fitting with big data in r,” arXiv preprint arXiv:1701.05936, 2017.
[65]
F. Privé, H. Aschard, A. Ziyatdinov, and M. G. Blum, “Efficient analysis of large-scale genome-wide data with two r packages: Bigstatsr and bigsnpr,” Bioinformatics, vol. 34, no. 16, pp. 2781–2787, 2018.
[66]
C. L. Lawson, R. J. Hanson, D. R. Kincaid, and F. T. Krogh, “Basic linear algebra subprograms for fortran usage,” ACM Transactions on Mathematical Software (TOMS), vol. 5, no. 3, pp. 308–323, 1979.
[67]
J. Demmel, “LAPACK: A portable linear algebra library for supercomputers,” in IEEE control systems society workshop on computer-aided control system design, 1989, pp. 1–7.
[68]
J. Choi, J. J. Dongarra, R. Pozo, and D. W. Walker, “ScaLAPACK: A scalable linear algebra library for distributed memory concurrent computers,” in [Proceedings 1992] the fourth symposium on the frontiers of massively parallel computation, 1992, pp. 120–127.
[69]
M. Dowle and A. Srinivasan, Data.table: Extension of ‘data.frame‘. 2019.
[70]
M. Dowle, Introduction to data.table. 2019.
[71]
M. Dowle, Benchmarking data.table. 2019.
[72]
S. Urbanek and T. Arnold, Iotools: I/o tools for streaming. 2020.
[73]
D. Adler, J. Oehlschlägel, O. Nenadic, and W. Zucchini, Large atomic data in r package ’ff’. 2008.
[74]
E. de Jonge, J. Wijffels, and J. van der Laan, Ffbase: Basic statistical functions for package ’ff’. 2020.
[75]
J. Wijffels and E. de Jonge, Ffbase, statistical functions for large datasets. 2013.
[76]
E. de Jonge, Ffbase2: Dplyr for ’ff’. 2015.
[77]
Vertica, “DistributedR.” Dec. 2015.
[78]
M. Corporation and S. Weston, doParallel: Foreach parallel adaptor for the ’parallel’ package. 2019.
[79]
M. Corporation and S. Weston, doSNOW: Foreach parallel adaptor for the ’snow’ package. 2019.
[80]
H. Bengtsson, Future: Unified parallel and distributed processing in r for everyone. 2020.
[81]
H. Bengtsson, A future for r: A comprehensive overview. 2020.
[82]
H. Bengtsson, A future for r: Text and message output. 2020.
[83]
H. Bengtsson, A future for r: Future topologies. 2020.
[84]
H. Bengtsson, A future for r: Common issues with solutions. 2020.
[85]
H. Bengtsson, A future for r: Non-exportable objects. 2020.
[86]
H. Bengtsson, A future for r: Controlling default future strategy. 2020.
[87]
D. Vaughan and M. Dancho, Furrr: Apply mapping functions in parallel using futures. 2018.
[88]
R. Core, Package ’parallel’. 2018.
[89]
L. Tierney, A. J. Rossini, N. Li, and H. Sevcikova, SNOW: Simple network of workstations. 2018.
[90]
D. Schmidt, W.-C. Chen, S. L. de la Chapelle, G. Ostrouchov, and P. Patel, pbdBASE: pbdR base wrappers for distributed matrices.” 2020.
[91]
D. Schmidt, G. Ostrouchov, and W.-C. Chen, pbdML: Programming with big data – machine learning. 2020.
[92]
D. Schmidt and W.-C. Chen, Hpcvis: Plot utilities for MPI codes and hardware counters. 2020.
[93]
D. Schmidt, W.-C. Chen, G. Ostrouchov, and P. Patel, A quick guide for the pbdBASE package. 2020.
[94]
K. Kuo, Graphframes: Interface for ’GraphFrames’. 2018.
[95]
S. Venkataraman, X. Meng, F. Cheung, and T. A. S. Foundation, SparkR: R front end for ’apache spark’. 2020.
[96]
S. Venktaraman, SparkR - practical guide. 2019.
[97]
S. Venkataraman et al., “Sparkr: Scaling r programs with spark,” in Proceedings of the 2016 international conference on management of data, 2016, pp. 1099–1104.
[98]
J. Gorecki, Big.data.table: Parallel distributed computing for data.table. 2016.
[99]
M. Azure, “R server for HDInsight - r analytics.” 2016.
[100]
I. Inc., InfoSphere BigInsights big r. IBM Inc., 2014.
[101]
MAPR, “Industry’s next generation data platform for AI and analytics.” 2019.
[102]
N. Matloff, “Software alchemy: Turning complex statistical computations into embarrassingly-parallel ones,” Journal of Statistical Software, vol. 71, no. 4, pp. 1–15, 2016.
[103]
N. Matloff, “Partools: A sensible r package for large data sets.” Aug. 2015.
[104]
N. Matloff, Partools: A sensible r package for large data sets. University of California, Davis, 2017.
[105]
T. Lumley, Biglm: Bounded memory linear and generalized linear models. 2013.
[106]
H. Bengtsson, doFuture: A universal foreach parallel adapter using the future API of the ’future’ package. 2020.
[107]
H. Bengtsson, Future.batchtools: A future API for parallel and distributed processing using ’batchtools’. 2019.
[108]
M. Lang, B. Bischl, and D. Surmann, “Batchtools: Tools for r to work on batch systems,” The Journal of Open Source Software, no. 10, Feb. 2017.
[109]
H. Bengtsson, Future.apply: Apply function to elements in parallel using futures. 2020.
[110]
H. Bengtsson, Future.callr: A future API for parallel processing using ’callr’. 2019.
[111]
G. Csárdi and W. Chang, Callr: Call r from r. 2020.
[112]
J. Cheng, Promises: Abstractions for promise-based asynchronous programming. 2019.
[113]
S. Weston, Nesting foreach loops. 2019.
[114]
P. S. Foundation, “The python standard library documentation. Built-in types.” 2020.
[115]
B. W. Lewis, doRedis: ’Foreach’ parallel adapter using the ’redis’ database. 2020.
[116]
B. Hoang, doAzureParallel: doAzureParallel. 2020.
[117]
R. Gaujoux, doRNG: Generic reproducible parallel backend for ’foreach’ loops. 2020.
[118]
D. ZJ, “Simple dplyr verbs and lazy evaluation disk.frame.” 2019.
[119]
D. ZJ, “Custom one-stage group-by functions disk.frame.” 2019.
[120]
W. McKinney and H. Wickham, “Wesm/feather: Feather: Fast, interoperable binary data frame storage for python, r, and more powered by apache arrow.” 2016.

  1. Use of HDFS through rhdfs[54], and MapReduce through rmr2[55]↩︎

  2. Use of Spark[26]↩︎

  3. Distributed computation performed by pbdMPI, with support for several remote messaging protocols[22], [56]↩︎

  4. Through the use of additional packages such as doMPI and sparklyr[[57]][26]↩︎

  5. rmr2[58] allows for arbitrary R code to be executed through MapReduce↩︎

  6. Provides mutate() function to enable user-defined code, however there are limitations in not being capable of parsing global variables↩︎

  7. Adhering to a SPMD paradigm↩︎

  8. Many functions used for grouped summarisation are only estimates, such as median[15]↩︎

  9. %dopar% accepts any expression, and tries its best to handle references to global variables, however it is still recommended to manually define the global references as well as packages used↩︎

  10. See doc/review-sparklyr-iteration.tex↩︎

  11. foreach makes use of iterators, which can be defined to perform recurrence relations (see doc/review-foreach.tex, subsection “Form of Iteration”) but these rely on closures and may in fact be slower than serial relations↩︎

  12. See [^2x3]↩︎

  13. Direct access to HDFS through rhdfs[54]↩︎

  14. Allows for Spark over HDFS, but offers no HDFS-specific filesystem manipulation functions↩︎

  15. Through sparklyr as a backend↩︎

  16. Source repositories only exist on GitHub, following a non-standard package structure at the root level, and Hadoop is required to be set up beforehand↩︎

  17. Installs from CRAN, requires Spark set up beforehand and environmental variables configured↩︎

  18. Installation can be performed with install.packages() alongside the installation of openmpi externally↩︎

  19. Inter-node communication facilitated through pbdMPI wrappers to standard MPI communication functions such as scatter, gather, send, etc.↩︎

  20. While the collection is separate from Hadoop, it is entirely tied to Hadoop and MapReduce, and can’t be switched to any other distributed platform↩︎

  21. Package tied to Spark as evaluative backend↩︎

  22. Tied to the usage of the MPI protocol↩︎

  23. Within the mapreduce() function from rmr2, no prescription is given for any particular class over another↩︎

  24. S3 Objects that have an sdf_import() method implemented can make use of the sdf_copy_to() function to copy objects from R to Spark↩︎

  25. Arbitrary classes may be made use of and passed through the communicator generics when methods are defined for them, using pbdMPI↩︎

  26. foreach makes use of iterator objects, which any class can inherit from to define nextElem()↩︎

  27. rmr2 has the package-specific mapreduce() function as the primary interface↩︎

  28. pbdMPI provides package-specific generics to use and define further methods for↩︎

  29. pbdDMAT provides a distributed matrix class with methods defined to make transparent usage of standard matrix manipulation generics↩︎

  30. The collection has suffered from the lack of updates; plyrmr provides functionality that is near-transparent to plyr, but this is still some distance from dplyr[55].↩︎

  31. The principal interaction is via dplyr generics, albeit with a difference of lazy evaluation↩︎