Object System

Jason Cairns


To perform calculations on larger-than-memory data, we need some means of representing the data, so that it is tangible and useful. Let us start at the beginning, where we accept that some region of memory in the computer must hold a form of the data. We label such a region an object, and maintain that it is general enough that it could very well be a function, a variable, an instance of a class, or a reference to another object, among a multitude of other possibilities. Given that the whole dataset is larger than memory, it cannot exist as a regular object occupying some contiguous region of memory, and must necessarily take some other form, the description of which can be left for later. Regardless, the data exists as an abstract object, albeit a somewhat irregular one.

With respect to our motivating example, the NYC taxi data must likewise be held in memory somewhere, in some form, even though it is too large to be a single regular object.

All objects, regular or irregular, are accessed and manipulated by way of an object system, which is an organised manner of interaction with objects. We take the description of an object system as the basis for the theoretical development of our project, because it defines how data is dealt with and is therefore the aspect most directly affected by the constraint of data scale. We may begin this description by asking what is intended of an object system within the scope of this project. To answer this, we must first understand precisely who the audience for this project is. Different audiences necessarily seek different interactions with objects, through their differing emphases and required end-results. It may be that multiple audiences require multiple object systems, or that one object system may be sufficiently general to serve all users.

With the consideration that our audience is to be working statisticians, as per the initial scope, there still remains some variation within this greater audience that can be described as separate audiences. A notable split is that of depth of use of the program provided by the project. Depth of use separates developers, who will take the physical form of the data into consideration, from users, who simply want to use the data. Alternative splits exist, and a pattern emerges among them of a distinction between power-users and end-users. Without holding this split as the defining audience differentiator, its existence proves that more than a singular monolithic audience needs consideration.

Therefore, the object system must engage multiple audiences. This endeavour may take several different forms, through the existence of multiple object systems, or one singular object system. The choice will necessarily be driven by the experience of use, but it is worthwhile to explore these options in greater detail prior to implementation. Considering a single object system first, there is the difficulty of appeasing multiple audiences simultaneously. This object system may favour one set of audiences over another, which in turn decreases the value of this project for those audiences unfavoured by the system. Using multiple object systems, with each geared to a particular audience, may overcome such a difficulty, though it raises another question: how the systems relate to each other. As an example solution, with power-users and end-users as illustrative audiences, consider the standard approach taken within software engineering, of layering. In this case, the power-user sits at a layer below that of the end-user, in the sense that the power-user is closer to the data with less abstraction. The abstraction is then what changes the object system of the power-user into one interfaced with by the end-user. The assumption in this case is that the end-user object system builds upon that of the power-user. This aligns with the standard software engineering practice of building more user-friendly tools out of more powerful ones. This layering effect also provides a separation of concerns, where separate concerns (logical components) are encapsulated from each other. The separation enables the program to be developed independently at each layer, aiding maintenance as well as design.

Such separation of concerns and layering in the object system may find its embodiment in the representative objects offered by each layer. The highest layer is that in which the user has no concern for the representation of the object on disk. Here, the object is no more than a vessel for whatever data the user seeks to hold, with no thought toward the implementation details. Such an object would define the higher layer of the object system, and would necessarily be supported by lower layers. Compare this with the lowest layer, in which the implementation of this data container can be manipulated directly. This requires a consideration of the nature of the objects, now that implementation details are important. The central constraint is that the data is larger than computer memory. This, coupled with the fact that the data must be worked on in memory, leads to an impossible design if the two are combined naively. The only way to reconcile them is to split the data into smaller pieces that do fit in memory. Such a piece, common among big data systems, is known as a shard, or a chunk. Chunking may have many manifestations; we leave such details of implementation to follow experimentation, keeping the description as high-level as possible for now.

In order to understand chunks, the notion of a reference must first be understood. A reference is some object that acts as a means of connection to some other object, known as the referent. The reference is a value in itself, and like any other object is referred to by some symbol. The reference serves as a means of indirection, and takes many forms; for instance, the hyperlink, or the pointer. A reference may be “dereferenced” in order to access, as a value, the object that it leads to: clicking the hyperlink, or applying the * operator to the pointer.
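As a minimal sketch of this idea in R, the following treats an environment as the place where referents live, and a small list as the reference. The names `store`, `make_ref` and `deref` are hypothetical, introduced only for illustration.

```r
# Hypothetical helpers for illustration; not part of any existing package.
store <- new.env()  # the place where referents are kept

# Put a value in the store and return a reference to it.
make_ref <- function(key, value) {
  assign(key, value, envir = store)
  structure(list(key = key), class = "ref")  # the reference is itself a value
}

# Dereference: follow the reference and return the referent as a value.
deref <- function(ref) {
  get(ref$key, envir = store)
}

r <- make_ref("x", c(304L, 12348L, -5899L))
referent <- deref(r)
```

The reference `r` holds only a key, so it stays small however large the referent is.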

On a system where referents do not necessarily exist in the same memory space as the reference, we may say that the reference is local, and the referent remote. Upon being dereferenced, the referent is pulled into the same memory space as the reference and therefore also becomes local. Why would references be used, when direct access to the underlying object may be thought more straightforward? One reason is specific to the local-remote distinction: a reference is the only possible means of interacting with a remote object.

For instance, were subsets of a too-large-for-memory dataset to be interacted with, they must be distributed, over space or time. Over space, each subset may sit on a separate node in a computing cluster, within its own memory. Over time, each subset may be pulled into the memory of a single node and operated on sequentially, without all subsets existing in the same memory space simultaneously. Each subset is therefore remote to whatever controls the overall operation from some central position, and references provide a means of access to each subset. Chunks in all systems are likewise dependent on references, for their capacity to stand in as proxies for the underlying data subset.
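Distribution over time can be sketched with base R connections alone. The example below is a toy under stated assumptions: ten integers in a temporary file stand in for a too-large dataset, and a subset size of four is chosen arbitrarily. Only one subset is held in memory at any moment.

```r
# Toy stand-in for a too-large dataset: ten integers, one per line.
values <- c(304L, 12348L, -5899L, 3284L, 23984L, 8932L, -589L, 238L, 239874L, 17L)
path <- tempfile(fileext = ".txt")
writeLines(as.character(values), path)

con <- file(path, open = "r")
running_max <- -Inf
n_subsets <- 0
repeat {
  lines <- readLines(con, n = 4)     # pull the next subset into memory
  if (length(lines) == 0) break      # no subsets left
  subset_values <- as.integer(lines)
  running_max <- max(running_max, subset_values)  # operate, then discard
  n_subsets <- n_subsets + 1
}
close(con)
unlink(path)
```

Here the open connection plays the part of the reference: it is a small local value through which each subset is reached in turn.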

The chunk thus serves as the lowest-level object manipulable by the user, and defines the lower level of the layered object system. Since the lower layer serves as a basis for the higher layer, chunks would be used to create the abstract object interacted with by the end-user, who is unconcerned with implementation details and has no interest in the chunks making up the object. Being composed of chunks, the high-level object forms some variation of a distributed object. Whether the chunks are physically dispersed, or accessed from the same location at differing timepoints, is a question to be settled later; in either case the overall object is distributed over space or time. The power-user would therefore have access to chunks, and the end-user access to the objects composed of chunks: a generalised distributed object, whose manner of distribution may remain undefined for now. Within this range, there may be intermediate objects. For instance, there may be some object which serves as a container for chunks, yet still retains access to the implementation specifics of chunks, without hiding such information as the abstract distributed object does. Such an object may be defined by widely differing container shapes, such as a vector, matrix, array, or some other nonlinear form.

Coming back to the example analysis introduced earlier, we may consider that the NYC Taxi data, being a heterogeneous table, would find its optimal existence as an abstract dataframe. Implementation-wise, the underlying chunks would have to split along some dimensions of the dataframe. Given that each row is independent, splitting along rows is a natural point of separation. Were the size of each chunk small enough to fit in memory, this would serve as a sufficient description for chunking. Assuming that the dataset is stored somewhere on disk, at the low level it would have to be read in as chunks. This may take the following form:

nyc_taxi_chunk_collection <- read.chunks(nyc_connection, chunksize=x_mB)

All of the chunks cannot simultaneously exist in memory on a single computer, for the same reason that the entire dataset cannot. This collection of chunks will therefore be a set of pointers to pieces of the dataset across time or space. This low-level form may be abstracted over within a read.csv or a dbConnect, each serving as just one particular method which allows for chunks.
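One way such a collection of pointers might look is sketched below, assuming the data sits in a CSV file and is split along rows. The functions `read_chunks` and `load_chunk`, the `chunk_rows` argument, and the toy table with its column names are all invented for illustration; they are not the interface of the listing above.

```r
# Toy table standing in for the taxi data; the columns are invented.
taxi <- data.frame(fare = c(5.5, 12, 7.25, 30, 9),
                   passengers = c(1L, 2L, 1L, 4L, 1L))
path <- tempfile(fileext = ".csv")
write.csv(taxi, path, row.names = FALSE)

# Return one reference per block of rows; no data is held in the references.
read_chunks <- function(path, chunk_rows) {
  # Counting lines reads the file once; a real system would avoid this.
  n_rows <- length(readLines(path)) - 1L   # data rows, excluding the header
  starts <- seq(1L, n_rows, by = chunk_rows)
  lapply(starts, function(start) {
    structure(list(path = path,
                   skip = start,           # header line plus preceding rows
                   nrows = min(chunk_rows, n_rows - start + 1L)),
              class = "chunk_ref")
  })
}

# Follow a reference: read only the rows belonging to that chunk.
load_chunk <- function(ref) {
  col_names <- names(read.csv(ref$path, nrows = 1))
  read.csv(ref$path, header = FALSE, skip = ref$skip,
           nrows = ref$nrows, col.names = col_names)
}

chunk_collection <- read_chunks(path, chunk_rows = 2)
second <- load_chunk(chunk_collection[[2]])  # rows 3 and 4 only
unlink(path)
```

Each element of `chunk_collection` records only a file path and a row range, so the collection itself remains small regardless of the size of the file.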

The key result of such objects, beyond the multiple audiences allowed for, is that potentially arbitrary data structures may be held in chunks, or distributed objects. This underlying data is therefore able to be specified at the chunk level, and were the system to allow for arbitrary data, special means would be required to interact with it. Specifically, the underlying data will at some points be interacted with directly, and for the system to maintain sufficient generality that the data may take any form, the interaction points would have to be well-specified and extensible to allow correct behaviour at interaction. This would have to be a specifically considered and described component of the object system. Experience will determine how this may best be afforded, whether through polymorphism through an object-oriented system, untyped procedures, or various other means.

Some basic necessities of interaction are reads and writes. Reads are means of access to the data underlying chunks. While it is not possible to access the full dataset directly, each chunk may be summarised to a size small enough to read in and use as a regular object, through some operation that massively reduces its dimensionality, such as a max or sum. For example, when looking for a max, a chunk consisting of a billion integers is transformed into a chunk of a single integer. This single integer may be pulled out of the chunk and managed as a regular integer, through some read operation, which we name an emerge. The operation is equivalent to accessing the value of some data held in a container such as a list, though in this case the important distinction is the movement of the value from remote to local memory space. Take the following example code as illustration:


d1

# 1 chunk:
# int [1:1E9] 304 12348 -5899 ...

Here, we have the symbol d1 specifying a reference to some chunk, which is a subset of some unspecified greater whole. The chunk is an integer vector of length one billion and exists remotely, while the reference to the chunk is local and immediately accessible through the symbol d1. To interact with the integer vector directly, it may be emerged, which pulls the vector into local memory space as a standard, regular integer vector. In the following listing, we assign the emerged vector to l1; that is, “local 1”.

l1 <- emerge(d1)

# int [1:1E9] 304 12348 -5899 ...

The write operation is the more general complement to the read: the act of actually transforming a chunk in some manner. It will be discussed in greater detail in the following chapter, though an example of how some interaction may be stated syntactically is given in the following listing. Here, the act of maximisation is shown at a high level, with implicit write operations, and an explicit read operation using emerge.


d1

# 1 chunk:
# int [1:1E9] 304 12348 -5899 ...

d2 <- max(d1)

# 1 chunk:
# int 109230

l2 <- emerge(d2)

# int 109230

Note that if run in a functional manner, the object d2 is entirely distinct and separate from d1. The mechanism of such a transformation is described further in the following chapter. Importantly, d2 is also a chunk reference, with no emerges having taken place. The chunk referred to may exist in a separate memory space to the reference, just like d1. No specification is given as to whether it necessarily exists in the same memory space as d1 however, only that both are remote to the reference.
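The behaviour described can be imitated in a single R session, as a rough sketch only. An environment named `remote` stands in for the separate memory space, and `put_chunk` and `chunk_max` are hypothetical helpers; the `emerge` defined here is a toy and says nothing about how a real emerge would move data between machines. The chunk is shortened to four integers.

```r
remote <- new.env()  # stands in for a memory space separate from the session

# Store a value remotely and hand back a local reference to it.
put_chunk <- function(value) {
  id <- paste0("chunk", length(ls(remote)) + 1)
  assign(id, value, envir = remote)
  structure(list(id = id), class = "chunk_ref")
}

# Read: pull the value behind a reference into local memory.
emerge <- function(ref) get(ref$id, envir = remote)

# Write: compute where the data lives, store the result as a new chunk,
# and return a reference to it. The input chunk is left untouched.
chunk_max <- function(ref) put_chunk(max(get(ref$id, envir = remote)))

d1 <- put_chunk(c(304L, 12348L, -5899L, 109230L))
d2 <- chunk_max(d1)   # still a reference; nothing has been emerged
l2 <- emerge(d2)      # a regular local integer
```

Because `chunk_max` always creates a new chunk, `d1` and `d2` refer to distinct objects, matching the functional reading above.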

The above is interaction purely at the chunk level. When considering collections of chunks, or abstracting over them as the user level would require, certain differences arise. For example, when seeking the maximum of a set of chunks, because max is associative (grouping its inputs differently does not change the result), the maximum over all chunks is the maximum of the maximums of each chunk. But this is only so if the underlying data structure possesses meaning for such a function. There must be some means of mapping from a set of chunks to the entire distributed object considered as a whole. One example was hinted at before, where chunk references are held together in an array, with a defined underlying data structure making up the chunks pointed to. Operations over a group of chunks may take a similar form to operations over a collection of other objects, which are commonly referred to as apply functions, both in R and in the wider computer science world. A means of operating over a group of chunks may be given in a chunk.apply function, acting as a higher-order function that orders the running of whatever function argument is given, over the set of chunk arguments. Take the earlier example of maximisation, with new object d3 standing for a collection of chunks, being three chunk references, each pointing to its own chunk of length one billion. The distributed object is therefore an integer vector of length three billion.


d3

# 3 chunks
# chunk 1: int [1:1E9] 304 12348 -5899 ...
# chunk 2: int [1:1E9] 3284 23984 8932 ...
# chunk 3: int [1:1E9] -589 238 239874 ...

Over each chunk, send the max function, to be performed remotely - this is distinct from the earlier example of pulling in the data locally first and then operating, with the order here being reversed. The maximisation takes place in the separate memory spaces where each chunk resides, possibly on other nodes. The result of the operation, dmaxes, consists of the remote results of the operations, itself being a distributed object of three chunks. Each chunk in dmaxes is the maximum value of each of the respective d3 chunks, as one scalar integer each, equivalently a distributed object of three integers.

dmaxes <- chunk.apply(d3, max)

# 3 chunks
# chunk 1: int 2397885392
# chunk 2: int 2347234
# chunk 3: int 28346324

Each chunk is now sufficiently small to bring into local memory, and may be emerged as a single cohesive vector, stored in the local variable lmaxes:

lmaxes <- emerge(dmaxes)

# int [1:3] 2397885392 2347234 28346324

Because the maximum of the per-chunk maximums is the maximum of the whole, the max of lmaxes may be taken in turn, with the operation being performed as normal, locally. This resulting value, assigned as lmax, is equivalently the maximum of the entire distributed object d3.

lmax <- max(lmaxes)

# int 2397885392

Thus, an operation was performed over a distributed object without the entire object existing in local memory at any one point in time. Such behaviour generalises to any other function whose result over the whole can be recovered from its results over the parts, such as min or sum, provided the per-chunk summaries are small enough to be read out of the chunks as a set. The chunk.apply function would sit as a valuable tool for the power-user layer, while max may have some method defined for distributed objects that uses chunk.apply in its implementation, and sits at the user layer.
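The two layers can be imitated in one R session, with the caveat that this is a toy under stated assumptions rather than the implementation. An environment stands in for remote memory, chunks are shortened to three integers each, and `put_chunk` and `dist_max` are hypothetical names. A plain function is used for the user-layer maximum instead of a `max` method, purely to keep the sketch simple.

```r
remote <- new.env()  # stands in for memory spaces separate from the session

put_chunk <- function(value) {
  id <- paste0("chunk", length(ls(remote)) + 1)
  assign(id, value, envir = remote)
  structure(list(id = id), class = "chunk_ref")
}

# Power-user layer: run f where each chunk lives, returning new references.
chunk.apply <- function(refs, f, ...) {
  lapply(refs, function(ref) put_chunk(f(get(ref$id, envir = remote), ...)))
}

# Emerge a collection of chunks as one local vector.
emerge <- function(refs) {
  unlist(lapply(refs, function(ref) get(ref$id, envir = remote)))
}

# User layer: the chunks are hidden behind a single call.
dist_max <- function(refs) max(emerge(chunk.apply(refs, max)))

d3 <- list(put_chunk(c(304L, 12348L, -5899L)),
           put_chunk(c(3284L, 23984L, 8932L)),
           put_chunk(c(-589L, 238L, 239874L)))

dmaxes <- chunk.apply(d3, max)  # three remote single-integer chunks
lmaxes <- emerge(dmaxes)        # small enough to hold locally
lmax <- max(lmaxes)             # same value as dist_max(d3)
```

Only `lmaxes`, one value per chunk, ever crosses from the remote space into local memory.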

Let’s pursue this further though, with respect to non-singular summarisations, as this raises further questions. It is easy to conceive of what form the data from a single chunk should be emerged as. It is entirely another question as to what the data from multiple chunks, conceived of as a singular object, should be emerged as. For instance, consider some function that returns a sample with replacement of a set of chunks holding numeric data. At the high level, a user for whom chunks are out of the realm of concern, may simply wish for the following to take place:


d3

# 3 chunks
# chunk 1: int [1:1E9] 304 12348 -5899 ...
# chunk 2: int [1:1E9] 3284 23984 8932 ...
# chunk 3: int [1:1E9] -589 238 239874 ...

sample(d3, 3)

# int [1:3] 245 -1619364 -1918

Here, the results of the operation are automatically emerged from chunks, and the output of the chunks is combined automatically. Automatic emerging (unmarshalling) is straightforward and can be handled in a wrapper. Automatic combination is less so, as different underlying data types have different means of combination, and indeed there may be different intended means of combination for different operations on the same data type. The means by which such direction can be encoded may take many forms: it may be polymorphic, dispatched according to datatype upon emergence; it may be held as a stored procedure with the data, or with the chunk; it may be specified manually each time. As with other facets of the object system, it will have to be tested empirically, with the contribution made here being to recognise its significance.
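Two of these options can be sketched with R's S3 dispatch: polymorphic combination chosen by datatype, and combination specified manually. The generic `combine` and the wrapper `combine_with` are hypothetical names, and the inputs are already-emerged pieces held in a plain list.

```r
# Polymorphic option: dispatch on the type of the first emerged piece.
combine <- function(pieces) UseMethod("combine", pieces[[1]])

# Atomic vectors are concatenated.
combine.default <- function(pieces) do.call(c, pieces)

# Data frames are stacked by row.
combine.data.frame <- function(pieces) do.call(rbind, pieces)

# Manual option: the caller supplies the means of combination.
combine_with <- function(pieces, f = combine) f(pieces)

vec <- combine(list(c(283L, 9823L), integer(0), -9237L))
df <- combine(list(data.frame(a = 1:2), data.frame(a = 3L)))
biggest <- combine_with(list(12348L, 23984L, 239874L),
                        function(p) max(unlist(p)))
```

The same integer pieces are concatenated in one call and reduced to a maximum in another, which is the case of different intended combinations for the same data type.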

For the sake of demonstration, let’s continue with the example of sampling at a lower level, as it will serve to highlight in a more complex light the importance of the chunk structure and the system layering, hopefully cementing the concepts explored in this section. Consider a different distributed object, d4, with chunks of different sizes. Consider this as sharing the same first chunk as d3, with length $1 \times 10^9$, but with the second and third chunks split from the greater whole differently, at lengths $1 \times 10^7$ and $9.9 \times 10^8$ respectively.


d4

# 3 chunks
# chunk 1: int [1:1E9] 304 12348 -5899 ...
# chunk 2: int [1:1E7] 3284 23984 8932 ...
# chunk 3: int [1:9.9E8] 3874 8392 2398 ...

To sample n elements from this distributed object, each chunk will have to be sampled from. However, with the lengths of the chunks differing, the weighting applied to each chunk must vary. Because the operations on each chunk occur independently, without the possibility of combining the chunks for comparison, the remote chunk samples must occur in isolation. One manner in which such a problem may be approached is to sample n times from some probability distribution over the chunks, then use the results to sample from the chunks in turn. The chunk probability distribution must be discrete, with support $i$ being the integers enumerating the chunks, and with the probability $p_i$ of selecting chunk $i$ proportional to the length $x_i$ of that chunk. Given the assumed fungibility of elements in the distributed object, this is equivalent to a probability-proportional-to-size sampling process, where the probability of selecting chunk $i$ in each draw, over $N$ chunks, is given by:

$$ p_i = \frac{x_i}{\sum_{j=1}^{N}x_j} $$

For this particular example, we have the following probability mass function:

$$ p_i = \begin{cases} 0.5 & i=1 \\ 0.005 & i=2 \\ 0.495 & i=3 \end{cases} $$

For n = 3 draws, this may yield the following series:

series

# int [1:3] 1 3 1


This implies the first element of the output must be a sample from the first chunk, the second element a sample from the third chunk, and the third element another sample from the first chunk. With the counts of the series above given as the variable chunk_count, we have the following contingency table, with counts labelled by their corresponding chunk:


chunk_count

# chunk1 chunk2 chunk3 
#      2      0      1 
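The selection probabilities, the series of draws and the table of counts can be computed locally with base R, since they depend only on the chunk lengths and not on the chunk contents. This is a sketch: the variable names `chunk_lengths` and `p` are introduced here for illustration, and the seed is arbitrary, so the series drawn need not match the one shown above.

```r
chunk_lengths <- c(1e9, 1e7, 9.9e8)       # lengths of the three chunks of d4
p <- chunk_lengths / sum(chunk_lengths)   # probability of selecting each chunk

set.seed(1)  # arbitrary seed, only so that the sketch is repeatable
series <- sample(seq_along(chunk_lengths), size = 3,
                 replace = TRUE, prob = p)

# Count the draws per chunk, keeping chunks that were never drawn.
chunk_labels <- paste0("chunk", seq_along(chunk_lengths))
chunk_count <- table(factor(series,
                            levels = seq_along(chunk_lengths),
                            labels = chunk_labels))
```

Converting the series to a factor with all three levels ensures that a chunk with no draws still appears in the table with a count of zero.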

Treating the table as a vector, we may use chunk.apply to run a sample over each chunk, using chunk_count as the n for each chunk. Therefore, the first chunk will run a sample with n = 2, the second chunk will run a sample with n = 0, and the third will run a sample with n = 1.

d5 <- chunk.apply(d4, sample, n=chunk_count)

# 3 chunks
# chunk 1: int [1:2] 283 9823
# chunk 2: int(0)
# chunk 3: int -9237

This may then be emerged as a local object:

l3 <- emerge(d5)

# int [1:3] 283 9823 -9237

Assuming the existence of some function to rearrange the values into the order given in the original series, perhaps named rearrange, and with the series held in the variable series, we are left with a random sample with replacement from a distributed object, in which the sampling of each chunk never needed to share a memory space with any other chunk:

l4 <- rearrange(l3, to=series)

# int [1:3] 283 -9237 9823
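The whole procedure can be tried end to end on small local data. The sketch below makes several assumptions: a plain list of short integer vectors stands in for the remote chunks of d4, `Map` stands in for chunk.apply, and `sample_chunk` is a hypothetical helper. The `rearrange` shown is only one possible implementation of the function assumed above.

```r
set.seed(42)  # arbitrary seed so the sketch is repeatable

# Toy chunks of unequal length, standing in for the remote chunks of d4.
d4 <- list(c(304L, 12348L, -5899L, 283L, 9823L, 77L),
           3284L,
           c(3874L, 8392L, 2398L, -9237L, 15L))

# Sample by index, so that a chunk of length one or a count of zero is safe.
sample_chunk <- function(chunk, n) {
  chunk[sample.int(length(chunk), n, replace = TRUE)]
}

# Place values grouped by chunk back into the order of the original draws.
rearrange <- function(x, to) {
  out <- x
  out[order(to)] <- x
  out
}

n <- 5
lens <- lengths(d4)
series <- sample(seq_along(d4), size = n, replace = TRUE,
                 prob = lens / sum(lens))
chunk_count <- tabulate(series, nbins = length(d4))

d5 <- Map(sample_chunk, d4, chunk_count)  # one independent sample per chunk
l3 <- unlist(d5)                          # emerged, grouped by chunk
l4 <- rearrange(l3, to = series)          # restored to draw order

# The values from the listings above, rearranged by the series 1 3 1.
check <- rearrange(c(283L, 9823L, -9237L), to = c(1L, 3L, 1L))
```

Because `order` is stable, the k-th value emerged from a chunk is returned to the k-th position at which that chunk was drawn, and `check` reproduces the ordering shown in the listing above.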

Some considerations that were brushed over relate to what it actually means to run a computation remotely. How can local arguments be mixed with distributed ones, and what does a distributed argument actually mean? These questions will be examined in further detail in the following section on computation.