2020-08-19
Given the simplicity and promised flexibility demonstrated in the documents Inter-node Communication with Redis and Message Queue Communications, further experimentation around the concept is undertaken and documented herein. Each experiment builds upon its predecessor, with the aim of rapidly approximating a functioning prototype.
While the RPC-based architecture described in Experiment: Eager Distributed Object had significant limitations, one particularly powerful construct was the higher-order function distributed.do.call, which took functions as arguments to be performed on the distributed chunks.
This construct is powerful because it can serve as the basis for nearly every function on distributed chunks. This section documents experiments towards a general function that performs a given function at the node hosting a particular chunk.
Beyond performing the function itself, some means of returning its value must be provided. This section focuses on getting a function performed on a server node, with the result sent back to the client. An implementation of these concepts is given by lsts. 1 and 2.
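Messages travel through redis as character strings, so both ends serialise R objects to ASCII text and back again. The following minimal sketch isolates that step so that it can be run without a redis server. encodeMsg() and decodeMsg() are hypothetical helper names. They stand in for the body of writeMsg() and readMessage() in the listings, minus the redis calls.

```r
# Hypothetical helpers: the encoding half of writeMsg() / readMessage(),
# without the redis push and pop
encodeMsg <- function(msg) rawToChar(serialize(msg, NULL, ascii = TRUE))
decodeMsg <- function(s) unserialize(charToRaw(s))

msg <- list(fun = exp, chunk = "chunk1",
            returnAddr = list(host = "localhost", port = 12345L))
s <- encodeMsg(msg)       # a single character string, pushable to a list
decoded <- decodeMsg(s)

# The function object travels with the message and can be applied on arrival
print(do.call(decoded$fun, list(0)))  # [1] 1
```

Closures serialise as well. However, a function defined at top level carries only a reference to the global environment, and unserialize() resolves that reference to the receiving process's global environment. Any free variables the function uses are therefore looked up on the server, not copied from the client.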
Listing 1: Value return to request for client node
#!/usr/bin/env Rscript

library(rediscc)

RSC <- redis.connect(host="localhost", port=6379L)
# Address on which this client listens for the reply
SELF_ADDR <- list(host="localhost", port=12345L)
chunk <- structure("chunk1", class = "chunk")

main <- function() {
    doFunAt(fun=exp, chunk=chunk)
}

# Send `fun` to the queue of `chunk`, then wait for the result
doFunAt <- function(fun, chunk) {
    # A plain list, so the server can read fields with `$`
    msg <- list(fun=fun, chunk=chunk, returnAddr=SELF_ADDR)
    writeMsg(msg, chunk)
    cat("wrote message: ", format(msg), " to ", chunk, "\n")
    listenReply()
}

# Open a server socket at SELF_ADDR and retry until a reply is readable
listenReply <- function() {
    replySock <- socketConnection(getHost(), getPort(), server=TRUE)
    response <- character(0)
    while (length(response) < 1) {
        response <- tryCatch(unserialize(replySock),
                             error = function(e) {
                                 cat("no reply, trying again in 1 sec\n")
                                 Sys.sleep(1); NULL})
    }
    cat("received response: ", format(response), "\n")
    close(replySock)
    response
}

getHost <- function() SELF_ADDR$host
getPort <- function() SELF_ADDR$port

# Serialise to an ASCII string and push onto the named redis list
writeMsg <- function(msg, to) {
    serializedMsg <- rawToChar(serialize(msg, NULL, ascii=TRUE))
    redis.push(RSC, to, serializedMsg)
}

main()
Listing 2: Value return to request for server node
#!/usr/bin/env Rscript

library(rediscc)

RSC <- redis.connect(host="localhost", port=6379L)
chunk1 <- seq(10)    # the chunk hosted by this node
QUEUE <- "chunk1"    # queues this node listens on, one per chunk held

main <- function() {
    while (TRUE) {
        msg <- readMessage(QUEUE)
        cat("read message:", format(msg), "\n")
        result <- doFun(msg)
        cat("result is: ", format(result), "\n")
        reply(result, getReturnAddr(msg))
    }
}

# Apply the requested function to the named chunk
doFun <- function(msg) {
    fun <- getFun(msg); arg <- getArg(msg)
    do.call(fun, list(arg))
}

# Connect to the client's advertised socket, retrying until it listens
reply <- function(result, returnAddr) {
    replySock <- NULL
    while (is.null(replySock))
        replySock <- tryCatch(socketConnection(getHost(returnAddr),
                                               getPort(returnAddr)),
                              error = function(e) {
                                  cat("Failed to connect to return address",
                                      ", trying again..\n")
                                  NULL})
    cat("replying to request...\n")
    serialize(result, replySock)
    cat("replied\n")
    close(replySock)
}

getFun <- function(msg) msg$fun
getArg <- function(msg) get(msg$chunk)
getReturnAddr <- function(msg) msg$returnAddr
getHost <- function(addr) addr$host
getPort <- function(addr) addr$port

# Block until a message arrives on any of `queues`, then decode it
readMessage <- function(queues) {
    serializedMsg <- redis.pop(RSC, queues, timeout=Inf)
    unserialize(charToRaw(serializedMsg))
}

main()
To this end, the client node defines a function doFunAt(fun, chunk), which takes any function and the ID of the chunk to perform it on. An implementation is given by lst. 1. doFunAt() first composes a message for the chunk’s queue. The message is a list consisting of the function, the chunk name, and a return address. The return address contains enough information for the node performing the operation to send the results back over a socket connection. The message is then serialised and pushed to the chunk’s queue, and the requesting node listens on the socket that it has set up and advertised.
On the server end, the node waits on its preassigned queues, each of which corresponds to a chunk that it holds. When a message comes through, it runs doFun() on the message, which in turn applies the function to the chunk named in the message. An implementation is given by lst. 2. The server then creates a socket connected to the client’s address as advertised in the message, and sends the serialised result through.
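The request/reply cycle of lsts. 1 and 2 can be exercised in a single R process by replacing both the redis queue and the reply socket with an in-memory store. In the sketch below, pushQ() and popQ() are hypothetical stand-ins for redis.push() and a non-blocking pop. The reply travels over a second queue named by the return address rather than over a socket. doFunAt() and serverStep() are simplified versions of the client and server logic, not the listings' own code.

```r
# Hypothetical in-memory stand-in for redis lists: one character vector per key
queues <- new.env()

pushQ <- function(key, value) {
    old <- if (exists(key, envir = queues, inherits = FALSE))
        get(key, envir = queues) else character(0)
    assign(key, c(old, value), envir = queues)
}

# Hypothetical non-blocking pop from the head; NULL when the queue is empty
popQ <- function(key) {
    if (!exists(key, envir = queues, inherits = FALSE)) return(NULL)
    q <- get(key, envir = queues)
    if (length(q) == 0) return(NULL)
    assign(key, q[-1], envir = queues)
    q[[1]]
}

encodeMsg <- function(msg) rawToChar(serialize(msg, NULL, ascii = TRUE))
decodeMsg <- function(s) unserialize(charToRaw(s))

# Server state: the chunk it hosts, looked up by name
chunk1 <- seq(10)

# Simplified client: send the function and chunk name, naming a reply queue
doFunAt <- function(fun, chunk, replyTo) {
    pushQ(chunk, encodeMsg(list(fun = fun, chunk = chunk,
                                returnAddr = replyTo)))
}

# Simplified server: handle one message, push the result to the reply queue
serverStep <- function(queue) {
    msg <- decodeMsg(popQ(queue))
    result <- do.call(msg$fun, list(get(msg$chunk)))
    pushQ(msg$returnAddr, encodeMsg(result))
}

doFunAt(sum, "chunk1", replyTo = "client-reply")
serverStep("chunk1")
reply <- decodeMsg(popQ("client-reply"))
print(reply)  # [1] 55
```

Replying over a queue keyed by a return address is the approach the later experiments in this document adopt. It sidesteps opening a socket per request.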
A problem with this approach is the fickleness of creating and removing a socket for every request. It risks missed connections and long idle periods while the client waits on a response. Beyond that, R can hold only a small, fixed number of connections at once: by default 128, three of which are the standard streams. The number of concurrent requests therefore cannot scale beyond that limit.
Assigning the result of a distributed operation to a new chunk is a far more common operation in a distributed system, as it minimises data movement. This requires additional directions in the request message, specifying that assignment, and not merely the operation, is desired.
It should be clear from the previous example that point-to-point data movement is largely an implementation issue. Direct sockets partially solved it there, but it is a problem entirely distinct from the remainder of the system’s logic. From this experiment onwards, the mechanism of data movement is abstracted out. The assumption is that some additional tool will exist to serve as a sufficient backend for data movement. Until that tool is developed, data will be sent through redis. This is not a solution, but something that can be ignored without loss of generality.
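One way to picture this abstraction is a transport object that exposes only push and pop, behind which redis, sockets, or a future dedicated tool could sit. The sketch below is an assumption about what such an interface might look like, not an API from these experiments. memoryTransport() is a hypothetical in-process backend, used only to make the example runnable.

```r
# Hypothetical transport interface: callers see only push() and pop(), so the
# backend behind them (redis, sockets, or a dedicated tool) can be swapped
memoryTransport <- function() {
    queues <- list()  # in-process backend used only for illustration
    list(
        push = function(to, obj) {
            queues[[to]] <<- c(queues[[to]], list(serialize(obj, NULL)))
            invisible(NULL)
        },
        pop = function(from) {
            q <- queues[[from]]
            if (length(q) == 0) return(NULL)
            queues[[from]] <<- q[-1]
            unserialize(q[[1]])
        }
    )
}

tr <- memoryTransport()
tr$push("chunk1", list(op = "DOFUN", fun = sum))
tr$push("chunk1", list(op = "DOFUN", fun = max))
first <- tr$pop("chunk1")
print(do.call(first$fun, list(1:10)))  # [1] 55
```

Client and server code written against tr$push() and tr$pop() would not need to change when the backend is replaced.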
Creating a chunk ID in itself demands a system-wide unique identifier. This is a solved problem given a central message server: redis provides an INCR operation, which can be used to generate a new, globally unique chunk ID.
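INCR treats a missing key as 0, increments it, and returns the new value atomically, so every client of the same redis server receives a distinct integer. The sketch below reproduces those semantics in a single process so that a getChunkID() like the one in lst. 3 can be exercised without a server. incr() and the store environment are hypothetical stand-ins for redis.inc() and the redis keyspace. Unlike INCR, they give no atomicity across processes.

```r
# Hypothetical stand-in for the redis keyspace (single process only)
store <- new.env()

# Mimics INCR: a missing key counts as 0; returns the incremented value
incr <- function(key) {
    current <- if (exists(key, envir = store, inherits = FALSE))
        get(key, envir = store) else 0L
    assign(key, current + 1L, envir = store)
    current + 1L
}

# As in lst. 3: the counter value, as a string, becomes the chunk name
getChunkID <- function() as.character(incr("chunkID"))

ids <- vapply(1:3, function(i) getChunkID(), character(1))
print(ids)  # [1] "1" "2" "3"
```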
Two choices dictate different algorithms for creating the distributed chunk object, as well as different structures for that object: where the chunk name originates, and whether to block until the chunk is formed. tbl. 1 shows the forms these may take. The “job ID” referred to in the table may take the concrete form of a simple key-value store entry, with the key passed to the server and monitored by the client node.
| | Client-Originated chunk ID | Server-Originated chunk ID |
|---|---|---|
| Blocking Algorithm | Client attains chunk ID and sends operation request with chunk ID to server, creating chunk reference concurrently. Blocks until direct signal of completion from server. | Client sends operation request to server with reference to some common information repository and the job ID. Server attains chunk ID, performs operation, and sends chunk ID to the job ID at the common information repository. Client watches the repository and releases the chunk object after attaining the chunk ID from it. |
| Blocking Structure | String name of chunk. | String name of chunk. |
| Non-Blocking Algorithm | Client attains chunk ID and sends operation request with chunk ID to server, creating chunk reference concurrently. No waiting for server signal of completion. | Client sends operation request to server with reference to some common information repository and the job ID. Server attains chunk ID, performs operation, and sends chunk ID to the job ID at the common information repository. Client releases chunk object before server completion, without waiting to receive the chunk information. |
| Non-Blocking Structure | String name of chunk. | Initially, reference to common information repository. Mutable; can become string name of chunk upon accessing that information in the common information repository. |
While it is clearly more straightforward for a client node to originate a chunk ID with blocking, the opposite is potentially the most flexible: a server-originated chunk ID with no blocking. When a client node originates a chunk ID, it presupposes that the chunk will exist, which may not hold in reality. For instance, the result may be an unexpected NULL, a zero-length vector, or even an error. In addition, the server-originated chunk ID with no blocking has every feature of a future from the future package. It can be checked for completion and accessed as a value, which allows for many asynchronous and parallel operations.
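The difference can be illustrated on the server side. In the hypothetical handler below, a chunk ID is allocated only after the operation has run. An error or an empty result is then reported to the requester instead of being bound to a name that a client might already be holding. The status codes, assignResult(), and the C-prefixed ID format are assumptions for illustration, not part of the experiments.

```r
# Hypothetical server-side handler for server-originated chunk IDs
counter <- 0L            # stand-in for the redis INCR counter
chunkStore <- new.env()  # stand-in for the server's workspace

assignResult <- function(fun, arg) {
    v <- tryCatch(fun(arg), error = function(e) e)
    if (inherits(v, "error"))
        return(list(status = "ERROR", message = conditionMessage(v)))
    if (length(v) == 0)  # covers NULL as well as zero-length vectors
        return(list(status = "EMPTY"))
    # Only a usable result is given a name
    counter <<- counter + 1L
    id <- paste0("C", counter)
    assign(id, v, envir = chunkStore)
    list(status = "OK", chunkID = id)
}

r1 <- assignResult(log1p, 1:3)
r2 <- assignResult(function(x) NULL, 1:3)
r3 <- assignResult(function(x) stop("bad input"), 1:3)
print(r1$chunkID)               # [1] "C1"
print(c(r2$status, r3$status))  # [1] "EMPTY" "ERROR"
```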
The client’s logic for assigning the result of a distributed operation on a chunk is largely encapsulated in a new function, assignFunAt(), as demonstrated in lst. 3. The function attains a chunk ID, generates a unique return address, and sends a message to the operand chunk’s queue. If asked to wait, it then waits for a reply. Finally, it returns the ID as a string belonging to the “chunk” class. The message carries more information than the function-only message of sec. 2.1. It holds the chunk ID, a request for acknowledgement of completion, a return address (now a unique queue name rather than a socket address), and an operation specifier that directs the intent of the message.
Listing 3: Demonstration of assignment in client node, with client-originated chunk name
#!/usr/bin/env Rscript

library(rediscc)
library(uuid)

RSC <- redis.connect(host="localhost", port=6379L)
# Clear keys left over from previous runs
redis.rm(RSC, "chunkID")
chunk1 <- structure("chunk1", class = "chunk")
redis.rm(RSC, "chunk1")
redis.rm(RSC, as.character(1:10))

main <- function() {
    x <- assignFunAt(fun=expm1, chunk=chunk1, wait=FALSE)
    y <- assignFunAt(fun=log1p, chunk=x, wait=TRUE)
    y
}

# Request that fun(chunk) be stored on the server under a new chunk ID
assignFunAt <- function(fun, chunk, wait=TRUE) {
    id <- getChunkID()
    returnAddr <- UUIDgenerate()
    sendMsg("ASSIGN", fun, chunk, returnAddr, id, ack = wait)
    if (wait) readReply(returnAddr)
    structure(id, class = "chunk")
}

# Request the value of fun(chunk) and block until it is returned
doFunAt <- function(fun, chunk) {
    returnAddr <- UUIDgenerate()
    sendMsg("DOFUN", fun, chunk, returnAddr)
    readReply(returnAddr)
}

# Globally unique chunk ID from the redis INCR counter
getChunkID <- function() as.character(redis.inc(RSC, "chunkID"))

readReply <- function(addr, clear=TRUE) {
    reply <- redis.pop(RSC, addr, timeout = Inf)
    if (clear) redis.rm(RSC, addr)
    unserialize(charToRaw(reply))
}

sendMsg <- function(op, fun, chunk, returnAddr, id=NULL, ack=NULL) {
    msg <- newMsg(op, fun, chunk, id, ack, returnAddr)
    writeMsg(msg, chunk)
}

newMsg <- function(op, fun, chunk, id, ack, returnAddr) {
    structure(list(op = op, fun = fun, chunk = chunk,
                   id = id, ack = ack, returnAddr = returnAddr),
              class = "msg")
}

writeMsg <- function(msg, to) {
    serializedMsg <- rawToChar(serialize(msg, NULL, ascii=TRUE))
    redis.push(RSC, to, serializedMsg)
}

# Formatting a chunk fetches its value from the hosting server
format.chunk <- function(x, ...) {
    obj <- doFunAt(identity, x)
    format(obj)
}

main()
The server, as shown in lst. 4, consists of a loop that reads a message and performs an operation dependent on the message’s operation specifier. For an operation of DOFUN, all that is run is a do.call() on the function and chunk specified. The value of the do.call() is returned to the client in a message. An operation of ASSIGN runs the same as DOFUN, with three additions. The value is assigned to the ID passed in the message, and the ID is added to the vector of queues to monitor. If requested, an acknowledgement is sent back to the client node.
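The dispatch logic can be checked without redis by calling the loop body directly on hand-built messages. In this sketch, handle() is a hypothetical condensation of the switch in lst. 4. The server environment stands in for the server's global workspace, and server$replies stands in for the reply queues keyed by return address.

```r
# Hypothetical single-process sketch of the server loop body in lst. 4
server <- new.env()
server$chunk1 <- seq(10)
server$QUEUE <- "chunk1"
server$replies <- list()

handle <- function(msg) {
    val <- do.call(msg$fun, list(get(msg$chunk, envir = server)))
    switch(msg$op,
           "ASSIGN" = {
               # Store the result under the client-supplied ID and start
               # monitoring a queue of the same name
               assign(msg$id, val, envir = server)
               server$QUEUE <- c(server$QUEUE, msg$id)
               if (isTRUE(msg$ack))
                   server$replies[[msg$returnAddr]] <- "Complete"
           },
           "DOFUN" = {
               server$replies[[msg$returnAddr]] <- val
           })
    invisible(NULL)
}

handle(list(op = "ASSIGN", fun = expm1, chunk = "chunk1",
            id = "1", ack = TRUE, returnAddr = "r-1"))
handle(list(op = "DOFUN", fun = log1p, chunk = "1", returnAddr = "r-2"))

print(server$QUEUE)             # [1] "chunk1" "1"
print(server$replies[["r-1"]])  # [1] "Complete"
```

After the ASSIGN, the server monitors a queue named "1". The subsequent DOFUN request addressed to that chunk is therefore served from the newly assigned value.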
Listing 4: Demonstration of assignment in server node, with client-originated chunk name
#!/usr/bin/env Rscript

library(rediscc)

RSC <- redis.connect(host="localhost", port=6379L)
chunk1 <- seq(10)
QUEUE <- "chunk1"

main <- function() {
    while (TRUE) {
        msg <- readMessage(QUEUE)
        cat("read message:", format(msg), "\n")
        # Dispatch on the operation specifier
        switch(getOp(msg),
               "ASSIGN" = {assignFun(getFun(msg), getChunk(msg),
                                     getChunkID(msg))
                           if (getAck(msg))
                               writeMsg("Complete", getReturnAddr(msg))},
               "DOFUN" = writeMsg(doFun(getFun(msg), getChunk(msg)),
                                  getReturnAddr(msg)))
    }
}

# Store fun(chunk) under `id` and start listening on a queue of that name
assignFun <- function(fun, chunk, id) {
    val <- doFun(fun, chunk)
    assign(id, val, envir = .GlobalEnv)
    assign("QUEUE", c(QUEUE, id), envir = .GlobalEnv)
}

doFun <- function(fun, chunk) {
    do.call(fun, list(chunk))
}

# Message field accessors
getMsgField <- function(field) function(msg) msg[[field]]
getOp <- getMsgField("op"); getFun <- getMsgField("fun")
getChunk <- function(msg) get(getMsgField("chunk")(msg))
getChunkID <- getMsgField("id"); getAck <- getMsgField("ack")
getReturnAddr <- getMsgField("returnAddr")

readMessage <- function(queues) {
    serializedMsg <- redis.pop(RSC, queues, timeout=Inf)
    unserialize(charToRaw(serializedMsg))
}

writeMsg <- function(msg, to) {
    serializedMsg <- rawToChar(serialize(msg, NULL, ascii=TRUE))
    redis.push(RSC, to, serializedMsg)
    cat("wrote message: ", format(msg),
        " to queue belonging to chunk \"", to, "\"\n")
}

main()
By this point the client (lst. 5) and server (lst. 6) increasingly resemble each other, and most of their functions are shared, as in lsts. 7-9.
The principal mechanism of action is best demonstrated via a logical time diagram, given by figure (missing), following a Lamport form of event ordering [1]. The first message, shown by arrow a in the diagram, is sent by the client to a server to describe the request. It includes the job ID, which names a queue in a shared information reference; the server later places the chunk ID into that queue.
Optionally, the client can immediately create a chunk object with no direct knowledge of the chunk ID. The object holds the job ID at the information reference instead, and the client continues whatever work it was doing. Only when the chunk ID is required does the chunk object trigger a blocking pop on its associated information reference queue, to which the server may push the chunk ID at any point. The chunk object then has the ID associated with it, and the information reference queue can be deleted.
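The lazy association described above can be sketched with an environment-backed reference. Environments in R are mutable and shared by reference, which is also what lets the distChunk objects of lst. 7 gain a CHUNK_ID after creation. In the sketch, jobStore stands in for the shared information reference. newRef(), isResolved(), chunkIDOf() and the "J1"/"C1" IDs are hypothetical. Where chunkIDOf() stops with an error, the real client would perform a blocking pop instead.

```r
# Hypothetical shared information reference: job ID -> chunk ID
jobStore <- new.env()

# Client-side reference that initially knows only its job ID
newRef <- function(jobID) {
    ref <- new.env()
    ref$JOB_ID <- jobID
    structure(ref, class = "chunkRef")
}

# Non-blocking completion check, in the spirit of a future
isResolved <- function(ref) {
    exists("CHUNK_ID", envir = ref, inherits = FALSE) ||
        exists(ref$JOB_ID, envir = jobStore, inherits = FALSE)
}

# Resolve once, cache the chunk ID in the reference, delete the job entry.
# The real client blocks on the job queue here instead of failing.
chunkIDOf <- function(ref) {
    if (!exists("CHUNK_ID", envir = ref, inherits = FALSE)) {
        jID <- ref$JOB_ID
        if (!exists(jID, envir = jobStore, inherits = FALSE))
            stop("job ", jID, " not yet complete")
        ref$CHUNK_ID <- get(jID, envir = jobStore)
        rm(list = jID, envir = jobStore)
    }
    ref$CHUNK_ID
}

x <- newRef("J1")
print(isResolved(x))                  # [1] FALSE
assign("J1", "C1", envir = jobStore)  # the server publishes the chunk ID
print(chunkIDOf(x))                   # [1] "C1"
print(exists("J1", envir = jobStore, inherits = FALSE))  # [1] FALSE
print(isResolved(x))                  # [1] TRUE
```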
Listing 5: Demonstration of assignment in client node, with server-originated chunk name
#!/usr/bin/env Rscript

source("shared.R")
source("messages.R")
source("chunk.R")

distInit()
# Clear keys left over from previous runs
rediscc::redis.rm(conn(), c("distChunk1", paste0("C", 1:10), paste0("J", 1:10),
                            "JOB_ID", "CHUNK_ID"))

distChunk1 <- structure(new.env(), class = "distChunk")
chunkID(distChunk1) <- "distChunk1"

main <- function() {
    cat("Value of distChunk1:", format(distChunk1), "\n")
    x <- do.call.distChunk(what=expm1, chunkArg=distChunk1,
                           assign=TRUE, wait=FALSE)
    cat("Value of x:", format(x), "\n")
    y <- do.call.distChunk(log1p, x, assign=TRUE, wait=TRUE)
    cat("Value of y:", format(y), "\n")
}

main()
Listing 6: Demonstration of assignment in server node, with server-originated chunk name
#!/usr/bin/env Rscript

source("shared.R")
source("messages.R")
source("chunk.R")

distInit()
distChunk1 <- seq(10)
QUEUE <- "distChunk1"

main <- function() {
    repeat {
        m <- read.queue(QUEUE)
        # Dispatch on the operation specifier; replies go to the job ID queue
        switch(op(m),
               "ASSIGN" = {cID <- do.call.chunk(what=fun(m),
                                                chunkArg=chunk(m),
                                                distArgs=dist(m),
                                                staticArgs=static(m),
                                                assign=TRUE)
                           send(CHUNK_ID = cID, to = jobID(m))},
               "DOFUN" = {v <- do.call.chunk(what=fun(m),
                                             chunkArg=chunk(m),
                                             distArgs=dist(m),
                                             staticArgs=static(m),
                                             assign=FALSE)
                          send(VAL = v, to = jobID(m))})
    }
}

# Apply `what` to the chunk; with assign=TRUE the server originates a new
# chunk ID, stores the result under it, and begins monitoring its queue
do.call.chunk <- function(what, chunkArg, distArgs, staticArgs, assign=TRUE) {
    if (assign) {
        cID <- chunkID()
        v <- do.call(what, list(chunkArg))
        cat("Assigning value", format(v), "to identifier",
            format(cID), "\n")
        assign(cID, v, envir = .GlobalEnv)
        assign("QUEUE", c(QUEUE, cID), envir = .GlobalEnv)
        return(cID)
    } else do.call(what, list(chunkArg))
}

main()
Listing 7: Chunk functions of client and server with server-originated chunk names for assignment
# distChunk methods

jobID.distChunk <- function(x, ...) get("JOB_ID", x)

# Resolve the chunk ID lazily: if only a job ID is held, block on the job
# queue until the server pushes the chunk ID, then associate it with x
chunkID.distChunk <- function(x, ...) {
    if (! exists("CHUNK_ID", x)) {
        jID <- jobID(x)
        cat("chunkID not yet associated with distChunk; checking jobID",
            jID, "\n")
        cID <- chunkID(read.queue(jID, clear=TRUE))
        cat("chunkID \"", format(cID), "\" found; associating...\n",
            sep="")
        chunkID(x) <- cID
    }
    get("CHUNK_ID", x)
}

do.call.distChunk <- function(what, chunkArg, distArgs=NULL, staticArgs=NULL,
                              assign=TRUE, wait=FALSE) {
    jID <- jobID()
    cat("Requesting to perform function", format(what), "on chunk",
        chunkID(chunkArg), "with",
        if (assign) "assignment" else "no assignment", "\n")
    send(OP = if (assign) "ASSIGN" else "DOFUN", FUN = what,
         CHUNK = chunkArg, DIST_ARGS = distArgs, STATIC_ARGS = staticArgs,
         JOB_ID = jID, to = chunkID(chunkArg))
    dc <- if (assign) {
        if (!wait) {
            cat("not waiting, using job ID", format(jID), "\n")
            distChunk(jID)
        } else {
            distChunk(chunkID(read.queue(jID, clear=TRUE)))
        }
    } else {
        val(read.queue(jID, clear=TRUE))
    }
    dc
}

format.distChunk <- function(x, ...) {
    v <- do.call.distChunk(identity, x, assign=FALSE)
    format(v)
}
Listing 9: Message functions of client and server with server-originated chunk names for assignment
# messaging functions

msg <- function(...) {
    structure(list(...), class = "msg")
}

send <- function(..., to) {
    items <- list(...)
    m <- do.call(msg, items)
    write.msg(m, to)
}

write.msg <- function(m, to) {
    serializedMsg <- rawToChar(serialize(m, NULL, ascii=TRUE))
    rediscc::redis.push(conn(), to, serializedMsg)
    cat("wrote message: ", format(m),
        " to queue belonging to chunk \"", to, "\"\n", sep="")
}

read.queue <- function(queue, clear = FALSE) {
    cat("Awaiting message on queues: ", format(queue), "\n", sep="")
    serializedMsg <- rediscc::redis.pop(conn(), queue, timeout=Inf)
    if (clear) rediscc::redis.rm(conn(), queue)
    m <- unserialize(charToRaw(serializedMsg))
    cat("Received message:", format(m), "\n")
    m
}

# message field accessors

msgField <- function(field) function(x, ...) x[[field]]

# Requesters
op <- msgField("OP"); fun <- msgField("FUN")
static <- msgField("STATIC_ARGS")
chunk.msg <- function(x, ...) get(chunkID(msgField("CHUNK")(x)))
jobID.msg <- msgField("JOB_ID"); dist.msg <- msgField("DIST_ARGS")

# Responders
val <- msgField("VAL"); chunkID.msg <- msgField("CHUNK_ID")