
A Review of Iteration with sparklyr

Jason Cairns

2020-05-14

1 Introduction

Given that iteration is cited by a principal author of Spark as a motivating factor in its development when compared with Hadoop, it is reasonable to ask whether the most popular R interface to Spark, sparklyr, supports iteration[1], [2]. One immediate reservation about the suitability of sparklyr for iteration is its syntactic rooting in dplyr; dplyr is a “Grammar of Data Manipulation” and part of the tidyverse, which in turn is an ecosystem of packages with a shared philosophy[3], [4]. The promoted paradigm is functional in nature, with iteration using for loops in R described as “not as important” as in other languages; map functions from the tidyverse purrr package are instead promoted as providing greater abstraction and solving iteration problems in much less time. Maps do provide a simple abstraction for function application over the elements of a collection, similar to internal iterators, but they offer no control over the form of traversal and, most importantly, lack the mutable state between iterations that standard loops or generators allow[5]. A common functional strategy for handling changing state is recursion, with tail-recursive functions specifically described as a form of iteration in [6]. Naive reliance on recursion for iteration is non-optimal in R, however, as the language lacks tail-call elimination and call-stack optimisations[7]. At present, the elements required for efficient, idiomatic functional iteration are not available in R, which is not as functional a language as the tidyverse philosophy considers it to be, and sparklyr’s attachment to that ecosystem prevents a cohesive model of iteration until such elements are in place.

2 Iteration

Iteration takes place in Spark through caching results in memory, which gives faster access and less data movement than MapReduce[1]. sparklyr exposes this functionality through the tbl_cache() function, which caches Spark data frames in memory, as well as through caching upon import by passing memory=TRUE to sdf_copy_to(). Iteration can also make use of persisting Spark data frames to memory, which forces evaluation and then caches; this is performed in sparklyr through sdf_persist().
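
A minimal sketch of these caching options is given below, assuming a local connection and the illustrative table name "df".

library(sparklyr)

sc <- spark_connect(master = "local")   # illustrative local connection

# Cache upon import: memory = TRUE requests that the table be cached in memory
df_tbl <- sdf_copy_to(sc, mtcars, "df", memory = TRUE, overwrite = TRUE)

# Cache an already-imported table by name
tbl_cache(sc, "df")

# Force evaluation and persist the result, here to memory only
df_tbl <- sdf_persist(df_tbl, storage.level = "MEMORY_ONLY")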

The Babylonian method for calculating a square root is a simple iterative procedure, used here as an example. A standard form in R, with a non-optimised initial value, is given in listing 1.

Listing 1: Simple Iteration with the Babylonian Method

basic_sqrt <- function(S, frac_tolerance=0.01, initial=1){
    x <- initial
    while(abs(x^2 - S)/S > frac_tolerance){
        x <- (x + S/x)/2
    }
    x
}

This iterative function is trivial, but translation to sparklyr is not entirely so.

The first consideration is that sparklyr operates on Spark data frames: x and S must be copied to Spark with the aforementioned sdf_copy_to() function.
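
As a rough sketch, assuming a connection object sc as above, the copies might be made as follows; the table names "S" and "x" and the values used are purely illustrative.

S <- sdf_copy_to(sc, data.frame(S = 9), "S", memory = TRUE, overwrite = TRUE)
x <- sdf_copy_to(sc, data.frame(initial = 1), "x", memory = TRUE, overwrite = TRUE)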

The execution of the function in Spark is the next consideration, and sparklyr provides two means for this to occur. The first, spark_apply(), evaluates arbitrary R code over an entire data frame. Its means of operation varies across Spark versions, ranging from launching and running RScripts in Spark 1.5.2, to Apache Arrow conversion in Spark 3.0.0. The evaluation strategy of 1.5.2 is unsuitable here, as launching an RScript at every iteration is excessive overhead. The second means of evaluation is through dplyr generics, which is what this example makes use of.
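
As a rough illustration of the first means, spark_apply() maps an R function over each partition of a Spark data frame, with each partition arriving as an ordinary R data frame; the sketch below assumes the S data frame copied above.

# arbitrary R code evaluated by Spark over the partitions of S
spark_apply(S, function(df) {
    df$root <- sqrt(df$S)
    df
})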

An important consideration is that sparklyr methods for dplyr generics execute through a translation of their formal parameters into Spark SQL. This is particularly relevant in that separate Spark data frames cannot be accessed together as in a multivariable function. In addition, highly R-specific functions, such as those from the core stats and matrix libraries, cannot be evaluated, as there is no Spark SQL cognate for them. The SQL query generated by these methods can be accessed and “explained” through show_query() and explain() respectively. When attempting to combine two Spark data frames in a single query without joining them, show_query() reveals that the data frame referenced through the .data variable is translated, while the other data frame has its list representation passed through, which Spark SQL has no capacity to parse. An example is given in listing 3 (generated through listing 2), showing an attempt to create a new column from the difference between two separate data frames.

Listing 2: Attempt in R to form a new column from the difference between two separate Spark data frames `S` and `x`

show_query(mutate(S, S = S - x))

Listing 3: Spark SQL query generated from the attempt to form the difference between two separate data frames

SELECT `S` - list(con = list(master = "yarn", method = "shell", app_name = "sparklyr", config = list(spark.env.SPARK_LOCAL_IP.local = "127.0.0.1", sparklyr.connect.csv.embedded = "^1.*", spark.sql.legacy.utcTimestampFunc.enabled = TRUE, sparklyr.connect.cores.local = 4, spark.sql.shuffle.partitions.local = 4), state = <environment>, extensions = list(jars = character(0), packages = character(0), initializers = list(), catalog_jars = character(0)), spark_home = "/shared/spark-3.0.0-preview2-bin-hadoop3.2", backend = 4,
    monitoring = 5, gateway = 3, output_file = "/tmp/Rtmpbi2dqk/file44ec187daaf4_spark.log", sessionId = 58600, home_version = "3.0.0")) AS `S1`, `S` - list(x = "x", vars = "initial") AS `S2`
FROM `S`

Global variables that evaluate to SQL-friendly objects can be passed, and are evaluated prior to translation. An example is given in listing 4, generated through listing 5, where the difference between a Spark data frame and a variable holding a numeric is translated: the variable is evaluated and converted to a float for Spark SQL, while the data frame column is referenced directly.

Listing 4: Spark SQL query generated from attempt to form the difference between a data frame and a numeric

SELECT `S` - 3.0 AS `S`
FROM `S`

Listing 5: Capacity in sparklyr to form a new column from the difference between a Spark data frame and a numeric

S
# Source: spark<S> [?? x 1]
#      S
#  <dbl>
#     9
x = 3
mutate(S, S = S - x)
# Source: spark<?> [?? x 1]
#      S
#  <dbl>
#     6

A reasonable approach to implementing the Babylonian method in sparklyr is therefore to combine S and x in one data frame and iterate within columns, as in listing 6.

Listing 6: Babylonian method implementation using sparklyr

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "yarn")

sparklyr_sqrt <- function(S, sc, frac_tolerance=0.01, initial=1){
    # hold the estimate, the target, and a convergence flag in one Spark data frame
    bab <- sdf_copy_to(sc,
                       data.frame(x=initial, S=S, unfinished=TRUE),
                       "bab", memory = TRUE, overwrite = TRUE)
    # each pass updates the estimate in Spark, then collects the flag to test convergence
    while(any(collect(bab)$unfinished)){
        bab <- compute(mutate(bab, x = (x + S/x)/2,
                              unfinished = abs(x^2 - S)/S > frac_tolerance),
                       "bab")
    }
    collect(bab)$x
}
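
A hypothetical call, assuming the active connection sc from above, might then look as follows; the exact value returned depends on the default 1% relative tolerance.

sparklyr_sqrt(16, sc)
# expected to return approximately 4.0023, within 1% of the true root 4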

3 Conclusion

sparklyr is excellent when used for what it is designed for. Iteration, in the form of an iterated function, does not appear to be part of that design; this was clear from the contortions required to implement even a simple iterated function such as the Babylonian method. Furthermore, all references to “iteration” in the primary sparklyr literature refer either to the iteration inherent in the built-in Spark ML functions, or to the “wrangle-visualise-model” process popularised by Hadley Wickham[4], [8]; none of them concern iterated functions.

Thus, it is fair to conclude that sparklyr is incapable of sensible iteration of arbitrary R code beyond what maps directly to SQL; even with mutate(), the interface is very convoluted for any iteration more complex than the Babylonian method. An implementation of a GLM function using sparklyr iteration was initially planned, but the point was already proven by something far simpler and did not need to be laboured.

Ultimately, sparklyr is excellent at what it does, but convoluted and inefficient when abused, as when attempting to implement iterated functions.

[1]
M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, I. Stoica, et al., “Spark: Cluster computing with working sets.” HotCloud, vol. 10, no. 10–10, p. 95, 2010.
[2]
J. Luraschi, K. Kuo, K. Ushey, J. Allaire, and The Apache Software Foundation, sparklyr: R interface to Apache Spark. 2020.
[3]
H. Wickham et al., “Welcome to the tidyverse,” Journal of Open Source Software, vol. 4, no. 43, p. 1686, 2019.
[4]
H. Wickham and G. Grolemund, R for data science: Import, tidy, transform, visualize, and model data. O’Reilly Media, Inc., 2016.
[5]
G. Cousineau and M. Mauny, The functional approach to programming. Cambridge University Press, 1998.
[6]
H. Abelson, G. J. Sussman, and J. Sussman, Structure and interpretation of computer programs. Justin Kelly, 1996.
[7]
R Core Team, R language definition. 2020.
[8]
J. Luraschi, Mastering Spark with R: The complete guide to large-scale analysis and modeling. Sebastopol, CA: O’Reilly Media, 2019.