Home

Message Queues for Communication in a Distributed Object System

Jason Cairns

2020-08-19

1 Introduction

A problem exists in the present architecture as described in Distributed Object supplementary report. At base, the co-ordination between nodes in the cluster requires significant effort from the master, with little room for additional features, with all inter-node communication being remote procedure calls only. This issue compromises nearly every aspect of operations on distributed objects, particularly in the initial reading in of data, and later data movement. The proposed amendments as described in Proposed Information Structures provide some degree of amelioration, however the root of the issue in requiring full co-ordinating facilities from the master node is still not solved.

A potential solution exists in the use of message queues. Message queues are commonly used for inter-process communication, consisting of queues through which applications may communicate over [1]. The use of message queues for communication between nodes will allow for significantly less knowledge required about other nodes within the system, and enabling greater independence of action within each node. Further benefits include allowing for asynchrony in more operations, the ability to monitor the system externally through watching queues, as well as the attendant benefits of decentralisation such as potentially greater resilience and decreased central complexity.

Message queues are well established, seeing use from Operating Systems to Web Services. For example, the QNX OS makes heavy use of message queues for its microkernel architecture [2]. Tech companies Stack Overflow and flickr also use message queues from redis as central components of their infrastructure [3], [4]. In this platform, the flexibility of Redis lists and the availability of the rediscc package suggests the use of Redis in the implementation of message queues [5], [6]. Alternatives include Apache ActiveMQ, Kafka, and Disque, among others [7][9].

2 Architecture Concept

The concept retains the notion of data divided into uniquely identified chunks, existing on nodes. The nodes each subscribe to queues dedicated to chunks that they possess, undertaking action dependent on messages received in their queue. In this way, nodes function as state machines, reading messages, performing some operation (or not) depending on the message content, and writing back in some form.

For example, some node has a chunk x, and receives a message on the x queue to add it to another chunk y; if it didn’t have the chunk y, it may post a message on the y queue, requesting it. It is likely that the semantics are more general, and the initial message of operation actually won’t specify which chunk to add to x, giving it a more general request of addition between distributed objects, and the node will have to determine for itself which chunks to pull to it, if any.

A very simple example is given by lsts. 1, 2, demonstrating the master and worker node routines respectively.

Listing 1: Message queue master node

library(rediscc)

REDIS_HOST <- "localhost"
REDIS_PORT <- 6379L
QUEUE <- "chunk"
MSG <- quote({ chunk + 1})

main <- function() {
    rsc <- redis.connect(REDIS_HOST, REDIS_PORT)
    writeMsg(MSG, to=QUEUE, conn=rsc)
}

writeMsg <- function(msg, to, conn) {
    serializedMsg <- rawToChar(serialize(msg, NULL, T))
    redis.push(conn, to, serializedMsg)
}

main()

Listing 2: Message queue worker node

library(rediscc)

REDIS_HOST <- "LOCALHOST"
REDIS_PORT <- 6379L
QUEUE <- "chunk"
chunk <- seq(10)

main <- function() {
    rsc <- redis.connect(REDIS_HOST, REDIS_PORT)
    while (TRUE) {
        msg <- readMessage(QUEUE, conn=rsc)
        print(eval(msg))
    }
}

readMessage <- function(queues, conn) {
    serializedMsg <- redis.pop(conn, queues, timeout=Inf)
    unserialize(charToRaw(serializedMsg))
}

main()

The master simply pushes serialised calls to the appropriate queue, and the worker loops reading messages from it’s particular queue(s), unserialising and evaluating any messages. The master node and the worker node can be initialised in any order and with any time difference, demonstrating the asynchrony. In this particular example, the master puts out a call to add the number 1 to a predefined chunk named “chunk”, with the worker executing the call as expected. The master doesn’t have to have any knowledge about where the chunk exists, and the worker likewise doesn’t necessarily require information on where the message originated from.

With relation to the queues, the aggregate functionality of nodes in a cluster, can be considered distinctly to the functionality of a singular node. The cluster must have some means of initialising the queues to listen to for the individual nodes, for the reading in of an external dataset. The process of operating based on queues is straightforward at the outset, but requires considerable thought on the representation and existence of objects other than the referent object of a queue when the queues operations require those other objects.

The issue of data movement also requires consideration; while this is largely an implementation-specific issue, it has a strong bearing on conceptual architecture. These three considerations mirror the state machine concept at an aggregate level, with resultant decisions affecting the architecture at large.

3 Plausible Extensions

While the use of message queues looks to ease many significant issues, there are additional problems that it require addressing, primarily resulting from the asynchrony and decentralisation.

Most pressingly, the issue of deadlock, in the context of data movement; in the process of nodes requesting and transferring data, a deadlock is almost certain to occur if not dealt with. This specific instance may be helped through careful implementation and more processing on the initiator node (bringing it closer to a master node), but a possibly superior extension is to have queue-adjacent servers on each node that are able to operate concurrently with the main R session that has as it’s sole purpose the transfer of chunks, leaving everything else to the main R session. These servers would also be ideal vertices at which to implement data-duplication as a feature enabling redundancy in the system

In a similar vein, failure detection can be implemented through a concurrent “heartbeat” server, in the same manner as Hadoop [10].

These extensions are worth bearing in mind for now, however the considerations brought up in the prior section need to be answered first.

[1]
E. Curry, “Message-oriented middleware,” Middleware for communications, pp. 1–28, 2004.
[2]
D. Hildebrand, “An architectural overview of QNX. in USENIX workshop on microkernels and other kernel architectures, 1992, pp. 113–126.
[3]
N. Walker and N. Caudill, “Real-time updates on the cheap for fun and profit,” Oct. 2011.
[4]
K. Montrose, “Does stack exchange use caching and if so, how?” Meta Stack Exchange, Aug. 2016.
[5]
S. Sanfilippo and P. Noordhuis, “Redis.” 2009.
[6]
S. Urbanek, Rediscc: Redis client. 2020.
[7]
B. Snyder, D. Bosnanac, and R. Davies, ActiveMQ in action, vol. 47. Manning Greenwich Conn., 2011.
[8]
N. Garg, Apache kafka. Packt Publishing Ltd, 2013.
[9]
S. Sanfilippo, “Disque.” 2016.
[10]
T. White, Hadoop: The definitive guide. " O’Reilly Media, Inc.", 2012.