2022-10-18
To perform calculations on larger-than-memory data, we need some means of representing the data so that it is tangible and useful. Let us start at the beginning, where we accept that some region of memory in the computer must hold a form of the data. We label such a region an object, and maintain that the term is general enough that it could very well denote a function, a variable, an instance of a class, or a reference to another object, among a multitude of other possibilities. Given that the whole dataset is larger than memory, it cannot exist as a regular object occupying a single contiguous region of memory, and must necessarily take some other form, the description of which can be left for later. Regardless, the data exists as an abstract object, albeit one that is somewhat irregular.
With respect to our motivating example, the NYC taxi data must necessarily exist in memory somewhere.
All objects, regular or irregular, are accessed and manipulated by way of an object system: an organised manner of interacting with objects. We choose the description of an object system as the basis for the theoretical development of this project because it is the aspect most proximal to the constraint imposed by the scale of the data, being the aspect that defines how the data is handled. We may begin this description by asking what may be intended of an object system within the scope of this project. To answer this, we must first understand precisely who the audience for the project is. Different audiences necessarily seek different interactions with objects, through their differing emphases and required end-results. It may be that multiple audiences require multiple object systems, or that one object system may be sufficiently general to satisfy all users.
With the consideration that our audience is to be working statisticians, as per the initial scope, there remains some variation within this greater audience that can be described as separate audiences. A notable split concerns depth of use of the program provided by the project. Depth of use separates developers, who will take the physical form of the data into consideration, from users, who simply want to use the data. Alternative splits exist, and a pattern emerges among them of a distinction between power-users and end-users. Without holding this split as the defining audience differentiator, its existence demonstrates that more than a single monolithic audience needs consideration.
Therefore, the object system must engage multiple audiences. This may take several different forms, through the existence of multiple object systems, or of one single object system. Such a component will necessarily be driven by the experience of use, but it is worthwhile to explore these options in greater detail prior to implementation. Considering a single object system first, there is the difficulty of appeasing multiple audiences simultaneously. This object system may favour one set of audiences over another, which in turn decreases the value of the project for those audiences unfavoured by the system. Using multiple object systems, each geared to a particular audience, may overcome this difficulty, though it raises another question of how the systems relate to each other. As an example solution, with power-users and end-users as illustrative audiences, consider the standard approach taken within software engineering: layering. In this case, the power-user sits at a layer below that of the end-user, in the sense that the power-user is closer to the data, with less abstraction. The abstraction is then what turns the object system of the power-user into the one interfaced with by the end-user. The assumption in this case is that the end-user object system builds upon that of the power-user. This aligns with the standard software engineering practice of building more user-friendly tools out of more powerful ones. The layering also provides a separation of concerns, where separate concerns (logical components) are entirely encapsulated from each other. The separation enables the program to be developed independently at each layer, aiding maintenance as well as design.
Such separation of concerns and layering in the object system may take its embodiment in the representative objects offered by each layer. The highest layer is that in which the user has no concern for the representation of the object on disk. Here, the object is no more than a vessel for whatever data the user seeks to hold, with no thought toward implementation details. Such an object would define the higher layer of the object system, and would necessarily be supported by lower layers. Compare this with the lowest layer, in which the implementation of this data container may be manipulated directly. This requires a consideration of the nature of the objects, now that implementation details are important. The central constraint is that the data is larger than computer memory; coupled with the fact that the data must be worked on in memory, a naive design is impossible. The only possible means is to split the data into smaller pieces that do fit in memory. Such a piece, common among big data systems, is known as a shard, or a chunk. This is the only possible response to such a limitation, though it may have many manifestations; we leave such details of implementation to follow experimentation, keeping the description as high-level as possible for now.
In order to understand chunks, the notion of references must first be understood: a reference is some object that acts as a means of connection to some other object, known as a referent. The reference is a value in itself, and is likewise referred to by some symbol, as with any other object. The reference serves as a means of indirection, and takes many forms; for instance, the hyperlink, or the pointer. References may be “dereferenced” in order to access the object they lead to as a value - clicking the hyperlink, or applying the `*` operator to the pointer.
On a system where referents do not necessarily exist in the same memory space as the reference, we may say that the reference is local, and the referent remote. Upon being dereferenced, the referent is pulled into the same memory space as the reference and thereby also becomes local. Why would references be used, when direct access to the underlying object may seem more straightforward? One reason is specific to the local-remote distinction: references are the only possible means of interacting with remote objects.
For instance, were subsets of a too-large-for-memory dataset to be interacted with, they must be distributed, over space or time. Over space, each subset may sit on a separate node in a computing cluster, within its own memory. Over time, each subset may be pulled into the memory of a single node and operated on sequentially, without all subsets existing in the same memory space simultaneously. Each subset is therefore remote to whatever may be controlling the total operations from some central position, and references provide a means of access to each subset. Chunks in all such systems likewise depend on references, for their capacity to stand in as proxies for the underlying data subset.
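To make the idea concrete, the following minimal R sketch treats a file path as a local reference to an on-disk (“remote”) referent, with a hypothetical `deref` function performing the dereference by reading the referent into local memory; every name here is illustrative rather than part of any fixed interface.

referent <- data.frame(x = 1:10, y = rnorm(10))
path <- tempfile(fileext = ".rds")
saveRDS(referent, path)    # the referent now exists outside local R memory
rm(referent)
ref <- path                # the reference: a small, local value
deref <- function(reference) {
  # dereferencing pulls the referent into the same memory space as the reference
  readRDS(reference)
}
local_copy <- deref(ref)   # the referent becomes local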
The chunk thus serves as the lowest-level object manipulable by the user, and defines the lower level of the layered object system. With the lower layer serving as a basis for the higher, chunks would be used to create the abstract object interacted with by the end-user, who is unconcerned with implementation details and has no interest in the chunks making up the object. Being composed of chunks, the high-level object therefore forms some variation of a distributed object. Whether the chunks are necessarily physically dispersed, or accessed from the same location at differing timepoints, is a question to be settled later - the overall object is distributed over space or time regardless. The power-user would therefore have access to chunks, and the end-user access to the objects composed of chunks: a generalised distributed object, whose manner of distribution may remain undefined for now. Within this range, there may be intermediate objects. For instance, there may be some object which serves as a container for chunks, yet still retains access to the implementation specifics of chunks, without hiding such information as the abstract distributed object does. Such an object may be defined by widely differing container shapes, such as a vector, matrix, array, or some other nonlinear form; a sketch of one such container follows.
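As a purely hypothetical illustration of such an intermediate container, chunk references could be held in a list that is given a shape through a `dim` attribute, keeping the references visible while letting the collection be treated as a matrix.

chunk_refs <- list("chunk-0001.rds", "chunk-0002.rds", "chunk-0003.rds",
                   "chunk-0004.rds", "chunk-0005.rds", "chunk-0006.rds")
dim(chunk_refs) <- c(2, 3)    # a 2 x 3 matrix of chunk references
chunk_refs[[1, 2]]            # implementation detail still accessible: "chunk-0003.rds"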
Coming back to the example analysis introduced earlier, the NYC Taxi data, being a heterogeneous table, would find its most natural form as an abstract dataframe. Implementation-wise, the underlying chunks would have to be split along some dimension of the dataframe. Given that each row is independent, splitting along rows is a natural point of separation. Were the size of each chunk small enough to fit in memory, this would serve as a sufficient description of the chunking. Assuming that the dataset is stored somewhere on disk, at the low level it would have to be read in as chunks. This may take the following form:
nyc_taxi_chunk_collection <- read.chunks(nyc_connection, chunksize = x_mB)
All of the chunks can’t simultaneously exist in memory on a single computer, for the same reason that the entire dataset can’t. Thus this collection of chunks will be a set of pointers to pieces of the dataset, distributed across time or space. This low-level form may in turn be abstracted over by something resembling `read.csv` or `dbConnect`, with `read.chunks` serving as just one particular method which allows for chunks.
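As a sketch of what might sit underneath such a call, the following hypothetical, purely local implementation of `read.chunks` splits a delimited file into row-wise pieces, writes each piece back to disk, and keeps only the collection of references in memory; the chunk size is given in rows rather than megabytes purely for simplicity, and every name is an assumption rather than a fixed interface.

# Hypothetical sketch of read.chunks: split a CSV into row-wise chunks,
# store each chunk on disk, and return only references (file paths).
read.chunks <- function(filename, chunksize) {
  con <- file(filename, open = "r")
  on.exit(close(con))
  header <- readLines(con, n = 1)
  refs <- list()
  repeat {
    lines <- readLines(con, n = chunksize)
    if (length(lines) == 0) break
    chunk <- read.csv(text = c(header, lines))
    path <- tempfile(fileext = ".rds")
    saveRDS(chunk, path)                 # chunk stored outside R's memory
    refs[[length(refs) + 1]] <- path     # local reference to the chunk
  }
  structure(refs, class = "chunk_collection")
}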
The key result of such objects, beyond allowing for multiple audiences, is that potentially arbitrary data structures may be held in chunks, or distributed objects. The underlying data is therefore able to be specified at the chunk level, and were the system to allow for arbitrary data, special means would be required to interact with it. Specifically, the underlying data will at some points be interacted with directly, and for the system to maintain sufficient generality that the data may take any form, the interaction points would have to be well-specified and extensible to allow correct behaviour at interaction. This would have to be a specifically considered and described component of the object system. Experience will determine how this may best be afforded, whether through polymorphism via an object-oriented system, untyped procedures, or various other means.
Some basic necessities of interaction are reads and writes. Reads are means of access to the data underlying chunks. While it is not possible to access the full dataset directly, each chunk may be summarised to a degree sufficiently small to read in and use as a regular object, through some strongly dimension-reducing operation, such as a `max` or `sum`. For example, when looking for a `max`, a chunk consisting of a billion integers is transformed into a chunk of a single integer. This single integer may be pulled out of the chunk and managed as a regular integer, through a read operation which we name an emerge. The operation is equivalent to accessing the value of some data held in a container such as a list, though in this case the important distinction is the movement of the value from remote to local memory space. Take the following example code as illustration:
print(d1)
# 1 chunk:
# int [1:1E9] 304 12348 -5899 ...
Here, we have the symbol `d1` specifying a reference to some chunk, which is a subset of some unspecified greater whole. The chunk is an integer vector of length one billion and exists remotely, while the reference to the chunk is local and immediately accessible through the symbol `d1`. To interact with the integer vector directly, it may be emerged, which pulls the vector into local memory space as a standard, regular integer vector. In the following listing, we assign the emerged vector to `l1`; that is, “local 1”.
l1 <- emerge(d1)
print(l1)
# int [1:1E9] 304 12348 -5899 ...
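Continuing with the file-backed sketch above, an `emerge` for a single chunk reference could be as simple as reading the referent back into local memory; for a collection of chunks it might read each referent and combine them, here assumed to be by concatenation. All of this is illustrative, not a fixed design.

# Illustrative emerge: dereference chunk references into local memory.
emerge <- function(x) {
  if (inherits(x, "chunk_collection")) {
    pieces <- lapply(x, readRDS)   # pull each chunk into local memory in turn
    do.call(c, pieces)             # combine, assuming concatenation is meaningful
  } else {
    readRDS(x)                     # a single chunk reference
  }
}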
The write operation is the more general complement to the read, covering the actual transformation of a chunk in some manner. It will be discussed in greater detail in the following chapter, though an example of how such an interaction may be stated syntactically is given in the following listing. Here, the act of maximisation is shown at a high level, with implicit write operations, and an explicit read operation using `emerge`.
print(d1)
# 1 chunk:
# int [1:1E9] 304 12348 -5899 ...
d2 <- max(d1)
print(d2)
# 1 chunk:
# int 109230
l2 <- emerge(d2)
print(l2)
# int 109230
Note that if run in a functional manner, the object `d2` is entirely distinct and separate from `d1`. The mechanism of such a transformation is described further in the following chapter. Importantly, `d2` is also a chunk reference, with no emerges having taken place. The chunk referred to may exist in a separate memory space to the reference, just like `d1`. No specification is given as to whether it necessarily exists in the same memory space as `d1`, however; only that both are remote to the reference.
The above is interaction purely at the chunk level. When considering collections of chunks, or abstracting over them as the user level would require, certain differences arise. For example, when seeking the maximum of a set of chunks, due to the transitive nature of such a function, the maximum over the chunks is the maximum of the maximums of each chunk. But this is only so if the underlying data structure possesses meaning for such a function. There must be some means of mapping from a set of chunks to the entire distributed object considered as a whole. One example was hinted at before, where chunk references are held together in an array, with a defined underlying data structure making up the chunks pointed to. Operations over a group of chunks may take a similar form to operations over a collection of other objects, which are commonly referred to as apply functions, both in R and in the wider computer science world. A means of operating over a group of chunks may be given in a `chunk.apply` function, acting as a higher-order function that orders the running of whatever function argument is given over the set of chunk arguments. Take the earlier example of maximisation, with a new object `d3` standing for a collection of chunks, being three chunk references, each pointing to its own chunk of length one billion; the distributed object is therefore an integer vector of length three billion.
print(d3)
# 3 chunks
# chunk 1: int [1:1E9] 304 12348 -5899 ...
# chunk 2: int [1:1E9] 3284 23984 8932 ...
# chunk 3: int [1:1E9] -589 238 239874 ...
Over each chunk, the `max` function is sent, to be performed remotely - this is distinct from the earlier example of pulling in the data locally first and then operating, with the order here being reversed. The maximisation takes place in the separate memory spaces where each chunk resides, possibly on other nodes. The result of the operation, `dmaxes`, consists of the remote results of the operations, itself being a distributed object of three chunks. Each chunk in `dmaxes` is the maximum value of the respective `d3` chunk, as one scalar integer each; equivalently, a distributed object of three integers.
dmaxes <- chunk.apply(d3, max)
print(dmaxes)
# 3 chunks
# chunk 1: int 239788539
# chunk 2: int 2347234
# chunk 3: int 28346324
Each chunk is now sufficiently small to bring into local memory, and may be emerged as a single cohesive vector, stored in the local variable `lmaxes`:
lmaxes <- emerge(dmaxes)
print(lmaxes)
# int [1:3] 239788539 2347234 28346324
With `max` being a transitive operation, the max of `lmaxes` may be taken in turn, with the operation performed as normal, locally. This resulting value, assigned to `lmax`, is equivalently the maximum of the entire distributed object `d3`.
lmax <- max(lmaxes)
print(lmax)
# int 239788539
Thus, an operation was performed over a distributed object without the entire object existing in local memory at any one point in time. Such behaviour clearly generalises to all other transitive functions that produce summaries sufficiently small to be read out of the chunks as a set. The `chunk.apply` function would sit as a valuable tool at the power-user layer, while `max` may have some method defined for distributed objects that uses `chunk.apply` in its implementation, and sits at the user layer.
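To make this layering concrete, the following hedged sketch builds a purely sequential `chunk.apply` on top of the file-backed chunk references used in the earlier sketches, then defines a `max` method for the chunk collection at the user layer in terms of `chunk.apply` and the `emerge` sketched previously. The names, the class, and the sequential execution are all assumptions rather than a committed design; in this sketch any extra arguments are forwarded unchanged to every chunk, whereas a fuller implementation might, as later examples require, vary them per chunk.

# Sequential sketch of chunk.apply: apply FUN to each chunk where it resides
# (here, one at a time from disk), writing each result back as a new chunk.
chunk.apply <- function(x, FUN, ...) {
  refs <- lapply(x, function(ref) {
    result <- FUN(readRDS(ref), ...)   # operate on a single chunk
    out <- tempfile(fileext = ".rds")
    saveRDS(result, out)               # the result remains a (small) chunk
    out
  })
  structure(refs, class = "chunk_collection")
}
# User-layer method: the max of the distributed object is the max of the
# per-chunk maxima, which are small enough to emerge into local memory.
max.chunk_collection <- function(x, ..., na.rm = FALSE) {
  dmaxes <- chunk.apply(x, max, na.rm = na.rm)
  max(emerge(dmaxes))
}

With definitions along these lines, a user-layer call such as `max(d3)` would behave like the explicit sequence above, without the user ever handling chunks directly.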
Let’s pursue this further, with respect to non-singular summarisations, as these raise further questions. It is easy to conceive of the form in which the data from a single chunk should be emerged. It is another question entirely what form the data from multiple chunks, conceived of as a singular object, should be emerged as. For instance, consider some function that returns a sample with replacement from a set of chunks holding numeric data. At the high level, a user for whom chunks are out of the realm of concern may simply wish for the following to take place:
print(d3)
# 3 chunks
# chunk 1: int [1:1E9] 304 12348 -5899 ...
# chunk 2: int [1:1E9] 3284 23984 8932 ...
# chunk 3: int [1:1E9] -589 238 239874 ...
sample(d3, 3)
# int [1:3] 245 -1619364 -1918
Here, the results of the operation are automatically emerged from the chunks, and the outputs of the chunks are combined automatically. Automatic unmarshalling is straightforward and can be handled in a wrapper. Automatic combination is less so, as different underlying data types have different means of combination, and indeed there may be different intended means of combination for different operations on the same data type. The means by which such direction can be encoded may take many forms: it may be polymorphic, dispatched according to datatype upon emergence; it may be held as a stored procedure with the data, or with the chunk; or it may be specified manually each time. As with other facets of the object system, this will have to be tested empirically, with the contribution made here being to recognise its significance.
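As one hypothetical realisation of the polymorphic option, combination could be dispatched on the class of the emerged pieces, so that vectors concatenate while dataframes bind by row; the generic and methods below are illustrative only, and the stored-procedure or manual alternatives remain equally plausible.

# Hypothetical combination generic, dispatched on the class of the first piece.
combine_emerged <- function(pieces) UseMethod("combine_emerged", pieces[[1]])
combine_emerged.default    <- function(pieces) do.call(c, pieces)      # e.g. vectors
combine_emerged.data.frame <- function(pieces) do.call(rbind, pieces)  # tables: bind rows
# combine_emerged(list(1:3, 4:5))   yields   1 2 3 4 5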
For the sake of demonstration, let’s continue with the example of sampling at a lower level, as it will serve to highlight in a more complex light the importance of the chunk structure and the system layering, hopefully cementing the concepts explored in this section. Consider a different distributed object with chunks of different sizes, `d4` - consider this as sharing the same first chunk as `d3`, with length $1 \times 10^9$, but with the second and third chunks split from the greater whole differently, at lengths $1 \times 10^7$ and $9.9 \times 10^8$ respectively.
print(d4)
# 3 chunks
# chunk 1: int [1:1E9] 304 12348 -5899 ...
# chunk 2: int [1:1E7] 3284 23984 8932 ...
# chunk 3: int [1:9.9E8] 3874 8392 2398 ...
To sample n elements from this distributed object, each chunk will have to be sampled from. However, in this case, with the lengths of the chunks differing, the weighting applied to each chunk must vary. Because the operations on each chunk occur independently, without the possibility of combining them all for comparison, the remote chunk samples must occur in isolation. One manner in which such a problem may be approached is to sample n times from some probability distribution over the chunks, then use the results to sample from the chunks in turn. The chunk probability distribution must be discrete, with support being the integers $i$ enumerating the chunks, and with the probability $p_i$ of selecting chunk $i$ in a given draw proportional to that chunk's length $x_i$. Given the assumed fungibility of elements in the distributed object, this is equivalent to a probability-proportional-to-size sampling process, where the probability of selection in each draw is given by:
$$ p_i = \frac{x_i}{\sum_{j=1}^{N}x_j} $$
For this particular example, we have the following probability mass function:
$$ p_i = \begin{cases} 0.5 & i=1 \\ 0.005 & i=2 \\ 0.495 & i=3 \end{cases} $$
For n = 3 draws, this may yield the following series:
(1,3,1)
This implies that the first element of the output must be a sample from the first chunk, the second element a sample from the third chunk, and the third element another sample from the first chunk. With the counts of the series above given as the variable `chunk_count`, we have the following contingency table, with counts labelled by their corresponding chunk:
print(chunk_count)
# chunk1 chunk2 chunk3
# 2 0 1
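The weighting step itself is small and entirely local; assuming only that the chunk lengths are known to the controlling process, `series` and `chunk_count` could be produced along the following (illustrative) lines.

# Draw the chunk membership of each of the n samples, proportional to chunk length.
chunk_lengths <- c(1e9, 1e7, 9.9e8)
probs <- chunk_lengths / sum(chunk_lengths)          # 0.5, 0.005, 0.495
series <- sample(seq_along(chunk_lengths), size = 3,
                 replace = TRUE, prob = probs)       # e.g. c(1, 3, 1)
chunk_count <- tabulate(series, nbins = length(chunk_lengths))   # e.g. 2 0 1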
Treating the table as a vector, we may use `chunk.apply` to run a sample over each chunk, using the corresponding element of `chunk_count` as the sample size for each chunk. Therefore, the first chunk will run a sample with n = 2, the second chunk will run a sample with n = 0, and the third will run a sample with n = 1.
d5 <- chunk.apply(d4, sample, size = chunk_count, replace = TRUE)
print(d5)
# 3 chunks
# chunk 1: int [1:2] 283 9823
# chunk 2: int(0)
# chunk 3: int -9237
This may then be emerged as a local object:
l3 <- emerge(d5)
print(l3)
# int [1:3] 283 9823 -9237
And assuming the existence of some function to rearrange the values into the order given in the original series, perhaps named `rearrange`, with the series given as the variable `series`, we are left with a random sample with replacement from a distributed object, with none of the sampling of any chunk necessarily coinciding in the same memory space as any other chunk:
l4 <- rearrange(l3, to = series)
print(l4)
# int [1:3] 283 -9237 9823
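One possible local definition of `rearrange`, assuming the emerged values arrive grouped by chunk in ascending chunk order (as `emerge` produced them here), simply inverts the ordering implied by the series; where a chunk contributes more than one draw, this is one of several equally valid assignments.

# Reorder values emerged in chunk order back into the order of the series.
rearrange <- function(x, to) x[order(order(to))]
rearrange(c(283, 9823, -9237), to = c(1, 3, 1))
# [1]   283 -9237  9823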
Some considerations that were brushed over relate to what it actually means to run a computation remotely. How can local arguments be mixed with distributed ones, and what does a distributed argument actually mean? These questions will be considered in further detail in the following section on computation.