Experiment: Eager Distributed Object Supplement

Jason Cairns


1 Introduction

This report documents the development status and associated evaluation of the eager distributed object architecture, following on from the previous report, The Precursory Report. The prior report was necessarily speculative, weighing a number of potential architectural choices more or less equally; now that non-trivial further development has taken place, a more concrete evaluation can be produced, and alternatives can be compared against what has been explicitly observed to work and not work. To maintain direction in this subsequent development, a distributed statistical model was developed concurrently, making use of the distributed object as described in this report; this is documented in detail in the report Experiment: Distributed Decision Tree. In addition, several new and previously unconsidered aspects have arisen. Following a full description of the current system, along with elaboration on changes since the previous report, an evaluation of the architecture is given, with suggestions for future research forming part of the conclusion.

2 Current System Overview

Functions are provided to start the instances through hostnames, to connect to the servers, and to kill the instances. A “cluster” object, akin to the SNOW cluster object, serves as a reference upon which to declare distributed objects. Distributed objects exist conceptually as the actual data split across RServe instances, together with a reference, containing the minimum of necessary information, existing on the user’s R session. The user’s R session thus serves as the master node in this distributed system. Importantly, the object references are registered with reg.finalizer to run a cleanup on the hosts of the data when the references are themselves garbage collected.

Distributed objects are formed from two possible sources: master side or slave side. Formation on the master side involves taking an existing R object, splitting it up according to some function (currently splitting according to the most even element distribution; a sketch of this split is given at the end of this section), and sending the splits to their associated RServe instances, described in a cluster object supplied as a formal parameter to the creation function. All of the splits are assigned a name on the server side, stored in the distributed object reference. It is an unlikely user-end scenario that the big data is small enough to fit on the host in order to send it, so reading in data on the worker end, then feeding information back to produce a reference, is the other means of producing a distributed object. At present, this takes the form of a read.distributed.csv call that forwards most of its arguments on to server-side read.csv calls. The nodes then measure the number of rows in their chunks, sending the information back to inform the creation of a reference.

Beyond the storage of data larger than the user-end memory, distributed objects can mimic standard R objects, an example being distributed vectors having Ops defined. Operations between vectors are forwarded on to the hosts, along with the names of their associated chunks, for the hosts to perform at the distributed end. The results are assigned to a provided UUID, and a distributed vector reference is returned on the master session, pointing to the new chunks that have been created under the ID. Operations between distributed and non-distributed vectors take place through distributing the non-distributed vectors, then recursing on the operation call with the original distributed vector and the newly created one.

An important aspect of operations between vectors is that the actual processing of operations at corresponding locations necessarily requires the relevant elements to exist on the same RServe instance. This creates a complication in operations between vectors of different lengths, or between distributed vectors with corresponding elements on different nodes. At present only the scalar case is handled: the scalar is distributed to all of the hosts of the distributed vector, with the distributed end running the operation on the chunk and the scalar, providing recycling equivalent to that of a non-distributed object, as a scalar length is a multiplicative identity over the length of a vector.

Beyond operations between vectors, some common functions taking vector formal parameters have been implemented for distributed vectors. The nature of the returned object is of great importance in providing consistency and transparency, along with reasonable performance; these goals are often in tension with one another.
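To make the splitting concrete, the following is a minimal, self-contained sketch of the most-even element distribution referred to above, in which chunk sizes differ by at most one element; split_evenly is a hypothetical name, and the actual creation function additionally sends each chunk to its RServe instance and records its server-side name in the reference.

    # Split a vector into n chunks whose sizes differ by at most one element
    split_evenly <- function(x, n) {
      sizes <- rep(length(x) %/% n, n)
      extra <- seq_len(length(x) %% n)
      sizes[extra] <- sizes[extra] + 1L   # spread the remainder over the first chunks
      split(x, rep(seq_len(n), times = sizes))
    }

    split_evenly(1:10, 3)
    # $`1`: 1 2 3 4    $`2`: 5 6 7    $`3`: 8 9 10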

Functions returning distributed vectors are arguably the more transparent among the functions acting on distributed vectors, in the sense that they return the same degree of distribution as their input, just as regular R functions implicitly do. Such functions include head, as well as simple element-wise functions.
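As a local mock of this behaviour, applying an element-wise function chunk by chunk preserves the chunking, so the result can remain distributed; in the real system each step of the lapply runs on its own host.

    chunks <- split(1:10, rep(1:3, c(4, 3, 3)))  # stand-ins for node-held chunks
    lapply(chunks, exp)                          # still three chunks of the same sizes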

Data often requires a secondary function acting in a reductive capacity over the combined data resulting from initial node-specific operations. The resulting algorithms typically cannot be entirely parallelised, and the reductive locus in this distributed system is set to be the user-side master R session. As an example, the unique function attains the unique elements within a vector; it is implemented in MapReduce form as unique operations within each node, followed by a unique over their combined output within the master node. In this way, the amount of data moved is minimised through initial reduction. As the resulting data exists in a non-distributed form on the master, it is returned in that form without forcing distribution. It is a trivial matter to redistribute the data, but this is not presumed on behalf of the system.
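A local mock of this MapReduce form of unique, with list elements standing in for node-held chunks:

    chunks <- list(c(1, 2, 2, 3), c(3, 3, 4), c(4, 5, 1))
    node_results <- lapply(chunks, unique)  # map step: runs within each node
    unique(unlist(node_results))            # reduce step on the master: 1 2 3 4 5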

Data frames are implemented in distributed form in a very similar manner to vectors, the only difference being a row-wise splitting rather than an element-wise splitting. On the nodes, the data frame chunks are in fact fully functional data frames, with the full set of attributes.

Selection and subsetting of distributed data frames and vectors are limited to subsetting by distributed and non-distributed logical vectors, and local consecutive numeric vectors. Specification of columns for data frames can take any standard form.

Of the different forms, subsetting by a distributed logical vector is the most straightforward: each node is simply directed to subset its local object chunk by its local logical vector chunk, assuming alignment of the two. The resulting subset, where non-empty, is then measured, with the information sent back to the master as a new distributed object reference.
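A local mock of the procedure, with lists standing in for the aligned chunks held on each node:

    data_chunks  <- list(c(10, 20, 30), c(40, 50))
    logic_chunks <- list(c(TRUE, FALSE, TRUE), c(FALSE, TRUE))
    subsets <- Map(`[`, data_chunks, logic_chunks)  # each node subsets locally
    lengths(subsets)  # lengths reported back to build the new reference: 2 1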

Subsetting by a local logical vector requires distributing the vector in alignment with the object, then carrying out subsetting as described above for a distributed logical vector.

Local consecutive numeric vectors rely on the consecutive nature of object distribution in their subsetting of objects. The elements of the vector are compared to the known object indices on each host, then translated into the equivalent node-specific indices. To illustrate: a host containing elements 100 to 150 will have the example selection 100:130 translated to 1:31, as these are the equivalent elements relative to that specific node.
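A sketch of this translation, assuming each host knows the first and last global indices it holds; translate_indices is a hypothetical name.

    translate_indices <- function(selection, first, last) {
      hit <- selection[selection >= first & selection <= last]
      hit - first + 1L   # shift to node-local positions
    }

    translate_indices(100:130, first = 100, last = 150)  # 1:31 on that node
    translate_indices(100:130, first = 151, last = 200)  # integer(0): node uninvolved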

3 Description of System Changes

In comparison with the previous report, the changes made are fairly extensive. Beyond the obvious additions alluded to in sec. 2 that weren’t previously present, some more notable changes are described in this section.

Memory was considered a significant issue in the system at the time of the previous report. The primary issues have been greatly reduced through the use of server-side UUIDs for chunk assignment, and finalizer functions to remove the remote chunks upon master-side garbage collection. This massively reduces the virtual memory leaks, but some issues remain, discussed in sec. 4.
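A sketch of the cleanup mechanism, using base R’s reg.finalizer on an environment-based reference; remove_chunk is a hypothetical stand-in for the directive that deletes a chunk from its host.

    new_chunk_ref <- function(chunk_id, host) {
      ref <- new.env()
      ref$chunk_id <- chunk_id
      ref$host <- host
      # when the master-side reference is garbage collected, clean up remotely
      reg.finalizer(ref, function(e) remove_chunk(e$host, e$chunk_id),
                    onexit = TRUE)
      ref
    }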

Operations on chunks previously took place sequentially within an lapply. This has been replaced with a semi-parallel function in which the operations are sent as directives to the nodes without waiting for the completion of the respective operations. This is then immediately followed by a makeshift barrier function, which effectively polls for completion of all the operations. Regardless of the form of these operations, issues remain, again discussed in sec. 4.
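A sketch of this pattern, assuming RSclient connections are in use: RS.eval with wait = FALSE dispatches a directive without blocking, and RS.collect blocks until the given node returns.

    library(RSclient)

    # dispatch a quoted directive to every node without waiting for completion
    send_all <- function(connections, expr)
      for (conn in connections) RS.eval(conn, expr, wait = FALSE, lazy = FALSE)

    # makeshift barrier: poll each node in turn until all have completed
    barrier <- function(connections)
      invisible(lapply(connections, RS.collect))

    # e.g. send_all(cons, quote(Sys.sleep(1))); barrier(cons)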

Significant steps towards interaction between objects of different shapes and locations have been taken, with recycling of length-one vectors in operations, and space for a formal alignment method, now implemented.

Perhaps the most significant change in terms of user experience, though only a surface-level layer in implementation, is that manual communication syntax has been reduced in favour of new syntax adhering more closely to a declarative, R-like paradigm. Concretely, send and receive have been replaced with as.distributed and the empty subset function ([]) respectively. This matches an increasing developmental attitude of user-end simplicity.
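In sketch form, with cl standing for a previously created cluster object and x for an ordinary local vector:

    dx <- as.distributed(x, cl)  # previously an explicit send directive
    x2 <- dx[]                   # previously an explicit receive; pulls the chunks back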

4 Architectural Evaluation

The experimental system has yielded sufficient information at this point for judgements to be made on its successes and failures, as well as on possible solutions.

The central success of this approach is that, to a large extent, it does actually work. As a case in point, the Bureau of Transportation’s flights dataset [1] was loaded in its entirety as a distributed data frame, taking up a combined 17 GB across 119 million rows. This could be manipulated and subset at great speed, with operations and reductive functions such as table run upon it.

A close second to this success is the transparency inherent in the system. Beyond the cluster setup and the act of distribution, every operation exactly mirrors the standard R syntax for the equivalent operation.

There still remain some important issues to solve before claiming any major success for this approach; beyond these, some issues appear inherent to this style of distributed system, and will inform later experimentation with other forms of distributed systems.

The act of object creation through operations on existing distributed objects has already been the source of some issues. A success in this regard has been the lack of a need for cluster-specific directives from the user in the creation: the information already encapsulated in the existing objects supplies everything necessary to generate new distributed objects. However, the disconnect between creating an object in the cluster and returning a handle has already led to errors; interruption within this disconnect, through errors or even garbage collection, can leave objects created on the worker nodes with no handle returned. This is another virtual memory leak; a possible solution is to encapsulate the existence of an object’s chunks within its reference, with a future-like resolved attribute indicating this.
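A sketch of the proposed future-like attribute; the names are hypothetical.

    new_ref <- function(chunk_ids, hosts)
      structure(list(chunk_ids = chunk_ids, hosts = hosts),
                class = "distributed.vector", resolved = FALSE)

    resolve <- function(ref) {
      # called only once every node confirms its chunk assignment; any
      # reference left unresolved betrays an interrupted creation to clean up
      attr(ref, "resolved") <- TRUE
      ref
    }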

Interaction of non-aligned vectors is lacking beyond the few cases implemented, such as the recycling of length-one vectors. This leaves significant room for improvement, along with many potential means of enacting it. Resolving this issue would yield two immediate features: non-consecutive numerical indexing, as well as smoother operations between two objects existing in different locations or shapes. A possible solution is to encapsulate row numbers with chunks, with requests for selections or alignments sent to all hosts, and only those containing the relevant elements responding.
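A sketch of such metadata: each chunk record keeps its global index range, so only the hosts holding the requested elements need respond; all names are hypothetical.

    chunk_table <- data.frame(host  = c("node1", "node2", "node3"),
                              first = c(1L, 51L, 101L),
                              last  = c(50L, 100L, 150L))

    # which hosts hold at least one of the requested (possibly
    # non-consecutive) global indices?
    hosts_for <- function(idx, chunks)
      chunks$host[vapply(seq_len(nrow(chunks)), function(i)
        any(idx >= chunks$first[i] & idx <= chunks$last[i]), logical(1))]

    hosts_for(c(40, 140), chunk_table)  # "node1" "node3": only holders respond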

The return type of functions on distributed vectors is not entirely consistent, as alluded to in sec. 2. Some operations yield distributed objects, while others yield local ones. From an implementation perspective it is fairly clear which return type is simplest in each case, but from a user perspective it simply appears inconsistent. It would be a trivial matter to distribute the local objects, thereby having all functions return distributed objects, but this is a potentially unnecessary and costly operation.

A very common operation in R is to check the type of an object, including related operations such as is.na and all.equal. When interacting with distributed objects, it is not entirely clear how this should be performed. For example, the class of every distributed vector is distributed.vector, regardless of the class of the underlying chunks that make it up. Even if the chunks are all logical, a call to is.logical on the distributed vector will return FALSE, as strictly it should. However, this breaks transparency, as an equivalent non-distributed vector would yield TRUE. A possible solution is to have finer distributed vector classes that couple with their underlying types and inherit from a parent distributed.vector class.
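A sketch of the proposed hierarchy using S3 class vectors, in which inherits distinguishes the underlying type without any remote call; new_distributed_vector is a hypothetical name.

    new_distributed_vector <- function(chunk_names, hosts, type)
      structure(list(chunk_names = chunk_names, hosts = hosts),
                class = c(paste0("distributed.", type), "distributed.vector"))

    dv <- new_distributed_vector(c("chunk-1", "chunk-2"),
                                 c("node1", "node2"), "logical")
    inherits(dv, "distributed.vector")   # TRUE: still a distributed vector
    inherits(dv, "distributed.logical")  # TRUE: chunks known to be logical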

The connections to RServe instances have proven exceedingly efficient to work with, and have performed extremely well despite not being used as designed. The problem of inter-node communication was already covered in the initial report. In addition, some further issues have appeared, including the difficulty of debugging errors occurring at the server end, as well as occasional and somewhat unpredictable dropped connections.¹ A more mature distributed object system will likely require something like an application manager at some point, replacing the current RServe form of connection. In the meantime, an abstraction layer for communication will ease any potential transition to such a communication system.

In a similar vein, part of the problem is that there will always be uncontrollable connection issues in some form, and most distributed systems take this into account, offering varying degrees of resilience to hardware and connection failure. It is worth considering how such resilience could be integrated into this experiment, or whether it is of a scale to warrant an experiment of its own; regardless, it is time to start taking note of it.

Finally, speed certainly has plenty of room for improvement. Premature optimisation, being the root of all evil, was very consciously avoided. Even so, many operations, such as selection and subsetting, run extremely fast due to the design of the system. Others are significantly slower, such as table, to the point of slowing the distributed decision tree to unusability for big data. With profiling and optimisation this can very likely be improved significantly.

5 Conclusion

The experiment has yielded very functional distributed vector and data frame objects, and produced a wealth of information relating to the optimal construction, interaction, and architecture of such a system. Key successes as well as failings have been enumerated, mostly paired with potential solutions. The volume of issues and solutions raised since the initial report demonstrates the success of the experiment thus far in exploring this specific eager distributed object approach to large-scale statistical modelling using R.

[1] U.S. Bureau of Transportation, “Data Expo 2009: Airline on time data.” Harvard Dataverse, 2008.

  1. It appears somehow related to the occurrence of thrashing on the server side.