2020-05-14
Given that iteration is cited by a principal author of Spark as a motivating factor in its development when compared with Hadoop, it is reasonable to ask whether the most popular R interface to Spark, sparklyr, supports iteration[1], [2]. One immediate hesitation about the suitability of sparklyr for iteration is its syntactic rooting in dplyr; dplyr is a “Grammar of Data Manipulation” and part of the tidyverse, which in turn is an ecosystem of packages with a shared philosophy[3], [4]. The paradigm promoted there is functional in nature, with iteration using for loops in R described as “not as important” as in other languages; map functions from the tidyverse’s purrr package are instead promoted as providing greater abstraction and taking much less time to solve iteration problems. Maps do provide a simple abstraction for function application over the elements of a collection, similar to internal iterators, but they offer no control over the form of traversal and, most importantly, lack the mutable state between iterations that standard loops or generators allow[5]. A common functional strategy for handling changing state is recursion, with tail-recursive functions specifically referred to as a form of iteration in [6]. Reliance on recursion for iteration is naively non-optimal in R, however, as the language performs neither tail-call elimination nor other call-stack optimisations[7]. At present, then, the elements of efficient, idiomatic functional iteration are not available in R; the language is not as functional as the tidyverse philosophy considers it to be, and sparklyr’s attachment to that ecosystem precludes a cohesive model of iteration until those elements are in place.
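As a minimal sketch of that limitation (not drawn from the cited sources): a tail-recursive function in R gains nothing from its tail position, because each call still consumes a stack frame.

    count_down <- function(n) {
        if (n == 0) return(0L)
        count_down(n - 1)   # a tail call, yet R still grows the call stack
    }

    count_down(10)      # fine for shallow recursion
    # count_down(1e6)   # errors ("evaluation nested too deeply") rather than running in constant space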
Iteration takes place in Spark through caching results in memory, allowing faster access and less data movement than MapReduce[1]. sparklyr can make use of this functionality through the tbl_cache() function to cache Spark data frames in memory, as well as by caching upon import with memory=TRUE as a formal parameter to sdf_copy_to(). Iteration can also make use of persisting Spark data frames to memory, which forces evaluation and then caches; this is performed in sparklyr through sdf_persist().
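As a brief sketch of these caching mechanisms (assuming a local Spark installation; the table name and the mtcars data are purely illustrative):

    library(sparklyr)

    sc <- spark_connect(master = "local")

    # cache upon import: memory = TRUE caches the copied table in Spark's memory
    mtcars_tbl <- sdf_copy_to(sc, mtcars, "mtcars_spark",
                              memory = TRUE, overwrite = TRUE)

    # cache an already-registered table by name
    tbl_cache(sc, "mtcars_spark")

    # force evaluation and persist at an explicit storage level
    mtcars_tbl <- sdf_persist(mtcars_tbl, storage.level = "MEMORY_AND_DISK")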
The Babylonian method for calculating a square root is a simple iterative procedure, used here as an example. A standard form in R with a non-optimised initial value is given in Listing 1.
Listing 1: Simple Iteration with the Babylonian Method
basic_sqrt <- function(S, frac_tolerance=0.01, initial=1){
    x <- initial
    while(abs(x^2 - S)/S > frac_tolerance){
        x <- (x + S/x)/2
    }
    x
}
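Called with the default tolerance, the function converges on the square root of 9 in four updates; the printed value below is approximate.

    basic_sqrt(9)
    # [1] 3.000092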
This iterative function is trivial, but translation to sparklyr is not entirely so.
The first aspect that must be considered is that sparklyr works on Spark data frames; x and S must be copied to Spark with the aforementioned sdf_copy_to() function.
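A sketch of this first step (the one-row data frames, their names, and the initial value of 1 are illustrative):

    S <- sdf_copy_to(sc, data.frame(S = 9), "S",
                     memory = TRUE, overwrite = TRUE)
    x <- sdf_copy_to(sc, data.frame(initial = 1), "x",
                     memory = TRUE, overwrite = TRUE)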
The execution of the function in Spark is the next consideration, and sparklyr provides two means for this to occur. The first, spark_apply(), evaluates arbitrary R code over an entire data frame; its means of operation varies across Spark versions, ranging from launching and running Rscript processes in Spark 1.5.2 to Apache Arrow conversion in Spark 3.0.0. The evaluation strategy of 1.5.2 is unsuitable in this instance, as launching Rscript processes at every iteration is excessive overhead. The other form of evaluation is through dplyr generics, which is what this example makes use of.
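For contrast, a sketch of the spark_apply() route is given below; the closure is illustrative only, and it is executed by an R process against each partition, which is where the per-iteration overhead arises on older Spark versions.

    babs <- sdf_copy_to(sc, data.frame(x = 1, S = 9), "bab_apply", overwrite = TRUE)

    # one Babylonian update, applied as arbitrary R code to each partition
    spark_apply(babs, function(df) {
        df$x <- (df$x + df$S / df$x) / 2
        df
    })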
An important consideration is that sparklyr methods for dplyr generics execute through a translation of their formal parameters to Spark SQL. This is particularly relevant in that separate Spark data frames can’t be accessed together as in a multivariable function. In addition, R-specific functions, such as those from the stats and matrix core libraries, cannot be evaluated, as there is no Spark SQL cognate for them. The SQL query generated by the methods can be accessed and “explained” through show_query() and explain() respectively. When attempting to combine two Spark data frames in a single query without joining them, show_query() reveals that the data frame referenced through the .data variable is translated, but the other data frame has its list representation passed through, which Spark SQL has no capacity to parse; an example is given in Listing 3 (generated through Listing 2), showing an attempt to create a new column from the difference between two separate data frames.
Listing 2: Attempt in R to form new column from the difference between two separate Spark data frames `S`{.R} and `x`{.R}
show_query(mutate(S, S = S - x))
Listing 3: Spark SQL query generated from attempt to form the difference from two separate data frames
SELECT `S` - list(con = list(master = "yarn", method = "shell", app_name = "sparklyr", config = list(spark.env.SPARK_LOCAL_IP.local = "127.0.0.1", sparklyr.connect.csv.embedded = "^1.*", spark.sql.legacy.utcTimestampFunc.enabled = TRUE, sparklyr.connect.cores.local = 4, spark.sql.shuffle.partitions.local = 4), state = <environment>, extensions = list(jars = character(0), packages = character(0), initializers = list(), catalog_jars = character(0)), spark_home = "/shared/spark-3.0.0-preview2-bin-hadoop3.2", backend = 4,
monitoring = 5, gateway = 3, output_file = "/tmp/Rtmpbi2dqk/file44ec187daaf4_spark.log", sessionId = 58600, home_version = "3.0.0")) AS `S1`, `S` - list(x = "x", vars = "initial") AS `S2`
FROM `S`
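The lack of Spark SQL cognates for R-specific functions surfaces through the same mechanism. As a sketch (not one of the original listings, and subject to version differences), an untranslated stats function such as qnorm() is passed through to the query verbatim, which Spark SQL cannot resolve:

    show_query(mutate(S, S = qnorm(S)))
    # <SQL>
    # SELECT qnorm(`S`) AS `S`
    # FROM `S`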
Global variables that evaluate to SQL-friendly objects can be passed and are evaluated prior to translation. An example is given in Listing 4, generated through Listing 5, where the difference between a variable holding a numeric and a Spark data frame is translated into the evaluation of the variable, transformed to a float for Spark SQL, and its difference with the Spark data frame, referenced directly.
Listing 4: Spark SQL query generated from attempt to form the difference between a data frame and a numeric
SELECT `S` - 3.0 AS `S`
FROM `S`
Listing 5: Capacity in sparklyr to form a new column from the difference between a Spark data frame and a numeric
S
# Source: spark<S> [?? x 1]
#       S
#   <dbl>
#     9
x = 3
mutate(S, S = S - x)
# Source: spark<?> [?? x 1]
#       S
#   <dbl>
#     6
A reasonable approach to implementing the Babylonian method in sparklyr is then to combine S and x in one data frame and iterate within columns, as in Listing 6.
Listing 6: Babylonian method implementation using sparklyr
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "yarn")

sparklyr_sqrt <- function(S, sc, frac_tolerance=0.01, initial=1){
    bab <- sdf_copy_to(sc,
                       data.frame(x=initial, S=S, unfinished=TRUE),
                       "bab", memory = TRUE, overwrite = TRUE)
    while(any(collect(bab)$unfinished)){
        compute(mutate(bab, x = (x + S/x)/2,
                       unfinished = abs(x^2 - S)/S > frac_tolerance),
                "bab")
    }
    collect(bab)$x
}
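Illustrative usage, assuming the connection above is live; the implementation relies on compute() storing each result back under the table name "bab", so the lazy bab reference reflects every update when collected. The printed value is approximate.

    sparklyr_sqrt(9, sc)
    # [1] 3.000092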
sparklyr is excellent when used for what it is designed for. Iteration, in the form of an iterated function, does not appear to be part of this design; this was clear in the abuse required to implement a simple iterated function in the form of the Babylonian method. Furthermore, all references to “iteration” in the primary sparklyr literature refer either to the iteration inherent in the inbuilt Spark ML functions, or to the “wrangle-visualise-model” process popularised by Hadley Wickham[4], [8]. None of these references concern iterated functions.
Thus, it is fair to conclude that sparklyr is incapable of sensible iteration of arbitrary R code beyond what maps directly to SQL; even with mutate(), it is a very convoluted interface for attempting any iteration more complex than the Babylonian method. Implementation of a GLM function with sparklyr iteration was initially planned, but the point was already proven by something far simpler, and it is not one that needs to be laboured.
Ultimately, sparklyr is excellent at what it does, but convoluted and inefficient when abused, as when attempting to implement iterated functions.