Home

Concurrency

Jason Cairns

2022-10-26

Quod est superius est sicut quod inferius, et quod inferius est sicut quod est superius.

That which is above is like to that which is below, and that which is below is like to that which is above.[1]

In a complex distributed system of coarse or fine grain, I/O by way of inter-node communication places major constraints on the individual nodes. Synchronous blocking I/O is more than just a speed bottleneck; it has the potential to lock up and completely prevent essential sections of program execution. Distributed systems are invariably concurrent at the system level, and a concurrent system at the macro scale (inter-node) is well supported by concurrency at the micro scale (in-process).

At the level of the cluster, most of the packages considered in the previous chapter are capable of some concurrency, though they vary significantly in degree. At the least capable end, SNOW can run the same operation on all nodes in a cluster simultaneously, but there is no support for varying operations across nodes, nor for asynchronous communication between the client and worker nodes. While limiting in capability, this has led to a very simple API and structural model of the system, eliminating any chance of the more precarious errors that accompany asynchronous programming, such as race conditions. This can be contrasted with packages using MPI as a communication substrate, wherein different nodes can be directly addressed to perform entirely separate tasks, with such tasks being non-blocking for the client.

Alongside the range of possible cluster concurrency runs a corresponding range of ease of programming and simplicity of API. A major challenge is to marry control with ease of use at the cluster scale.

For a distributed system, in-process concurrency is near essential, as a purely sequential system leads to significant bottlenecks when tasks can't operate independently. From the perspective of system design, in-process concurrency poses an even greater challenge, and the available mechanisms are to an extent language-dependent. A variety of constructs enable it within a language, following different models of concurrency.

One such form is that of the promise. Promises are a data structure-based mechanism of handling concurrency[2]. Promise objects serve to represent operations that may occur at some point in time, with the completion of the operation marking a change in state of the promise object from “unresolved” to “resolved”. Most importantly, promise objects possess a then() method, which registers a callback to be run at some point following the resolution of the associated promise, itself returning a promise representing the completion of said callback. Chains of then()s are then used to cleanly compose concurrent operations.

R has a promises package, clearly influenced by JavaScript's Promises/A+ specification[3].

A simple demonstration is given in lst. 1. Note in this listing that there is a very mild race condition: the prompt is printed on the final line before the output of the then() callback immediately follows it.

Listing 1: A fully-functioning promise

> library(promises)
> p <- promise(function(resolve, reject) resolve(TRUE))
> print(p)
<Promise [fulfilled: logical]>
> then(p, onFulfilled = function(value) print(value))
> [1] TRUE

Attempting a simple equivalent anywhere other than an empty stack leads to problems, as shown in lst. 2. Here, as long as there are frames on the stack, the then() callback is never triggered. This may make sense as a means of avoiding re-entrancy problems, though it is hardly well advertised.

Listing 2: A non-functioning promise

> {function() {
+     p <- promise(function(resolve, reject) resolve(TRUE))
+     print(p)
+     then(p, onFulfilled = function(value) print(value))
+     repeat {}
+ }}()
<Promise [fulfilled: logical]>

^C
> [1] TRUE

The reason for this inability to function beyond the top level is that if then()s could be evaluated anywhere in the stack, race conditions far more serious than a prompt mis-print would be certain to manifest. If it were possible to explicitly state where evaluation is safe to take place, perhaps this could be avoided.

The promises source code reveals a dependency on the later package for asynchronous execution, and this is what has led to the leaky abstraction[4]. The central later() function offered by the package is equivalent to JavaScript's setTimeout() function, and likewise only executes when there is no other R code on the execution stack. Pending operations may be forced to run with run_now(), however, and this may serve as an appropriate mechanism for forcing evaluation. This is shown in lst. 3.

Listing 3: A forced promise

> {function() {
+     p <- promise(function(resolve, reject) resolve(TRUE))
+     print(p)
+     then(p, onFulfilled = function(value) print(value))
+     repeat {
+         later::run_now()
+     }
+ }}()
<Promise [fulfilled: logical]>
[1] TRUE

Jumping down a layer of abstraction does appear to solve this problem. However, given that run_now() must be called manually, a new problem is introduced: scheduling of then()s is now partially in "userspace".
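The interaction is easily seen in a minimal sketch (assuming the later package is installed): a callback registered with later() stays pending until run_now() is called to drain the event loop.

```r
library(later)

x <- 0
later(function() x <<- 1)  # schedule a callback; it does not run yet
stopifnot(x == 0)          # still pending: nothing drains the loop for us
run_now()                  # manually run any callbacks that are due
stopifnot(x == 1)          # the callback has now executed
```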

A layer up is the coro package[5]. This makes use of promises as part of an emulation of coroutines. The source code shows the creation of a state machine that attempts to replicate R’s evaluator, but with allowances for re-entrancy.
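A flavour of that re-entrancy can be had from coro's generators (a sketch assuming the coro package is installed): each call to the instantiated generator re-enters the function body at the point of the last yield().

```r
library(coro)

# generator() creates a factory; yield() suspends the body, and the next
# call to the instance re-enters it where it left off
counter <- generator(function(from) {
  x <- from
  repeat {
    yield(x)
    x <- x + 1
  }
})

g <- counter(10)
g()  # first entry: runs the body up to the first yield(), returning 10
g()  # re-enters after the yield(), returning 11
```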

An initial attempt within the project at a better-functioning solution was to use continuations in R to implement promises, with coroutines as a base, in a similar manner to how they may be implemented in Scheme. Unfortunately, continuations in R are downwards-only, and so can't be relied upon for a proper coroutine or promise implementation.
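Base R's callCC demonstrates the limitation. The captured continuation may only be used to escape downwards out of the remaining computation; it cannot be stored and invoked later to resume that computation, which is exactly what a coroutine requires.

```r
# callCC captures an escape-only ("downward") continuation
result <- callCC(function(k) {
  k(42)               # jump out of callCC immediately with the value 42
  stop("never reached")
})
stopifnot(result == 42)

# The continuation cannot outlive its dynamic extent: storing k and
# invoking it after callCC has returned is an error, so there is no way
# to re-enter a suspended computation.
```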

This was followed by an attempt at a solution without continuations: replicate promises, but have the then() run at any point, without needing to force it or be on the top level. This involved the creation of a package that was mostly written in C, to create Promise types, as given in lst. 4, and perform their evaluation.

Listing 4: Internal structure of promises

typedef struct Promise {
    int fd;
    char state[STATE_SIZE+1];
    SEXP value;

    struct Promise *then[MAX_THENS]; /* private */
    int then_i;                  /* private */

    SEXP onFulfilled; /* used only for then */
    SEXP onRejected;  /* used only for then */
} Promise;

The intention was to attach it to R’s event loop, using the addInputHandler() function from R_ext/eventloop.h. This didn’t achieve the goal, as R’s event loop only scans the input handlers at the top level (or when Sys.sleep()ing), leaving precisely the initial problem.

Most other mainstream languages have explicit concurrency as a part of the language. Much of the uptake of concurrency among languages only began in earnest relatively recently, motivated by web servers and services.

Among the older and more powerful methods of concurrency is that of threading. Threading is a very low-level model of preemptive multitasking, where independent “threads” run within a process and are interrupted and context switched by some scheduler, typically the operating system. R is strictly single-threaded. Foreign C functions can be called that themselves involve threading, but all direct interaction with R must remain single-threaded.

pthreads is the standard option for threading in C for R. The library offers primitives that may enable concurrency alongside R, such as mutexes to guard critical regions involving R code.

Async/await is a more recent pattern in many programming languages. The name stems from the pair of syntactic constructs which between them enable cooperative multitasking. A function defined with the async keyword may be awaited upon in certain contexts. As described by the BBC's Cloudfit documentation, when a scheduler or event loop encounters an await, it may be thought of as "bookmarking" the position in program execution; it may then jump to any other position that is being awaited, and so on, until one of the awaited functions returns, whereupon execution continues from that point to the next await. Key examples, though with very different implementations, include Python's asyncio and JavaScript's async/await. The async/await paradigm has had massive uptake, with nearly all major programming languages supporting it, though it is not without criticism.

An example of how async/await might appear in R, in the context of a node in the distributed system, is given in lst. 5.

Listing 5: Imaginary async/await in R

# N.B. await sleep immediately prior to non-blocking io is roughly equivalent to awaits on async io

main <- async(function() repeat { # this spins; see main2 for a more efficient implementation
    await(async_sleep(0))
    request <- nonblocking_read()
    if (anything_read(request))
        create_task(process_request(request))
})

process_request <- async(function(request) {
    args <- await_lapply(prereqs(request), emerge)
    result <- tryCatch(do.call(computation(request), args), error=identity) # todo: parallel optimisation
    if (response_needed(request))
        send(address(request),result)
})

emerge <- async(function(id) { # Also spinning
    val <- NULL
    while (is.null(val)) {
        await(async_sleep(0))
        val <- get_from_local_cache()
        if (is.null(val))
            val <- non_blocking_get_from_other_host() # a la aiohttp
    }
    store(id, val)
    val
})

async_run(main())

# Non-spinning; relying on some imaginary external async io functions

main2 <- async(function() repeat {
    request <- await(async_read(in_sock)) # c.f. aiohttp
    if (anything_read(request))
        create_task(process_request(request))
})

emerge2 <- async(function(id) {
    val <- async_gather(async_get_from_local_cache(id),
                        async_get_from_other_host(id))
    store(id, val)
    val
})

This depends on the ability to re-enter a context in R.

Another model of concurrency, Communicating Sequential Processes (CSP), stems from Tony Hoare's formal language for describing interaction in a concurrent system[6]. In this model, "processes" may be spawned, and can communicate with each other over channels. Upon sending a message, a process must wait for the receiver to read it - channels are unbuffered - and the point of execution at which a message passes from sender to receiver is known as a "rendezvous". Like async/await, the rendezvous point is a signal for the scheduler or event loop to transfer control to any other point of rendezvous, thereby enabling asynchrony. The rendezvous is therefore a point of synchronisation, a powerful feature that implicitly sweeps away whole categories of race conditions. Go has made goroutines famous as taking inspiration from CSP[7]. More recently, Java is experimentally overhauling its threading model along similar lines in its "Project Loom", and GNU Guile has recently included "fibers" as a variation on CSP inspired by Concurrent ML.

An example of how a CSP-based concurrency may appear in a node of the distributed system is given in lst. 6. Notably, it is the cleanest and clearest of all the concurrency models assessed thus far.

Listing 6: CSP as imagined in R

main <- function() {

    server <- channel()
    storage <- channel()
    waiting_worker <- channel()
    complete_worker <- channel()
    spawn(routine=serve, channel=server)
    spawn(routine=store, channel=storage)
    repeat {
        select(
            server = function(request) {
                spawn(routine=work, channel=waiting_worker)
                send(channel=waiting_worker, value=c(request, complete_worker))
            },
            complete_worker = function(resolution) {
                send(channel=storage, value=value(resolution))
                if (response_needed(resolution))
                    send(channel=return_address(resolution),
                         value=value(resolution))
            }
        )
    }
}

serve <- function(channel) {
    outside_world <- socket_init()
    repeat send(channel, receive(outside_world))
}

work <- function(channel) {
    send_to_emerger <- function(x) {
        emerger <- channel()
        spawn(routine=emerge, channel=emerger)
        send(channel=emerger, value=x)
        emerger
    }
    request_and_main <- receive(channel)
    request <- request_and_main[[1]]
    main <- request_and_main[[2]]
    prereq_count <- length(prerequisites(request))
    emergers <- lapply(prerequisites(request), send_to_emerger)
    prereqs <- select(list=emergers)
    result <- do.call(computation(request), prereqs)
    send(main, result)
}

store <- function(channel) {
    storage <- new.env()
    repeat {
        item <- receive(channel)
        assign(id(item), value(item), storage)
    }
}

emerge <- function(channel) {
    item <- receive(channel)
    send(channel, GET(id(item), address(item)))
}

The key to understanding the cooperative multitasking interfaces is that they are all powered by coroutines, and these models of asynchrony are largely just variations on access to coroutines. Jumping in and out of a function at will is precisely what a coroutine offers. Surprisingly few languages possessed coroutines until recently, in spite of the clear need. Amusingly, Donald Knuth used a constructed assembly language for his Art of Computer Programming series, with a major contributing factor to this choice being the lack of coroutines in higher-level languages[8].

With these models considered, the challenge remains of enabling concurrency within the implementation language for this project, in order to make use of well-established architectural patterns involving concurrent servers. Because the implementation of evaluation in R precludes any chance of re-entering contexts, concurrency in base R will remain unavailable short of a near-complete rewrite. As such, were a concurrent server architecture to be pursued in R, the implementation would have to take advantage of a lower-level method of concurrency.

References

[1]
R. Steele and D. W. Singer, “The emerald table,” Proceedings of the Royal Society of Medicine, vol. 21, no. 3, pp. 485–501, 1928, doi: 10.1177/003591572802100361.
[2]
B. Liskov and L. Shrira, “Promises: Linguistic support for efficient asynchronous procedure calls in distributed systems,” ACM SIGPLAN Notices, vol. 23, no. 7, pp. 260–267, 1988.
[3]
J. Cheng, Promises: Abstractions for promise-based asynchronous programming. 2021. Available: https://CRAN.R-project.org/package=promises
[4]
W. Chang and J. Cheng, Later: Utilities for scheduling functions to execute later with event loops. 2021. Available: https://CRAN.R-project.org/package=later
[5]
L. Henry, Coro: ’Coroutines’ for R. 2021. Available: https://CRAN.R-project.org/package=coro
[6]
C. A. R. Hoare, “Communicating sequential processes,” Communications of the ACM, vol. 21, no. 8, pp. 666–677, 1978.
[7]
Go, “The Go memory model,” 2014. Available: https://go.dev/ref/mem
[8]
D. E. Knuth, The art of computer programming, volume 1: Fundamental algorithms. Addison-Wesley Professional, 1968.