Emulating a shared atomic read/write storage system is a fundamental problem in distributed computing. Replicating atomic objects among a set of data hosts was the norm for traditional implementations (e.g., [11]) in order to guarantee the availability and accessibility of the data despite host failures. As replication is highly storage demanding, recent approaches suggested the use of erasure codes to offer the same fault-tolerance while optimizing storage usage at the hosts. Initial works focused on a fixed set of data hosts. To guarantee longevity and scalability, a storage service should be able to dynamically mask host failures by allowing new hosts to join, and failed hosts to be removed, without service interruptions. This work presents the first erasure-code-based atomic algorithm, called Ares, which allows the set of hosts to be modified in the course of an execution. Ares is composed of three main components: (i) a reconfiguration protocol, (ii) a read/write protocol, and (iii) a set of data access primitives (DAPs). The design of Ares is modular and is such as to accommodate the usage of various erasure-code parameters on a per-configuration basis. We provide bounds on the latency of read/write operations and analyze the storage and communication costs of the Ares algorithm.
1 Introduction
Distributed Storage Systems (DSSes) store large amounts of data in an affordable manner. Cloud vendors deploy hundreds to thousands of commodity machines, networked together to act as a single giant storage system. Component failures of commodity devices and network delays are the norm; therefore, ensuring consistent data access and availability at the same time is challenging. Vendors often address availability by replicating data across multiple servers. These services rely on carefully constructed algorithms that ensure that these copies remain consistent, especially when they can be accessed concurrently by different operations. The problem of keeping copies consistent becomes even more challenging when failed servers need to be replaced or new servers are added, without interrupting the service. Any type of service interruption in a heavily used DSS usually translates to immense revenue loss.
The goal of this work is to provide an algorithm for implementing strongly consistent (i.e., atomic/linearizable), fault-tolerant distributed read/write storage, with low storage and communication footprint, and the ability to reconfigure the set of data hosts without service interruptions.
Replication-based Atomic Storage. A long stream of work used replication of data across multiple servers to implement atomic (linearizable) read/write objects in message-passing, asynchronous environments where servers (data hosts) may crash fail [10, 11, 21, 22, 23, 25, 26, 40]. A notable replication-based algorithm appears in the work by Attiya, Bar-Noy, and Dolev [11] (which we refer to as the ABD algorithm), which implemented non-blocking atomic read/write data storage via logical timestamps paired with values to order read/write operations. Replication-based strategies, however, incur high storage and communication costs; for example, to store 1,000,000 objects, each of size 1 MB (a total size of 1 TB), across a three-server system, the ABD algorithm replicates the objects in all 3 servers, which blows up the worst-case storage cost to 3 TB. Additionally, every write or read operation may need to transmit up to 3 MB of data (while retrieving an object value of size 1 MB), incurring a high communication cost.
Erasure Code-based Atomic Storage. Erasure code-based DSSes are extremely beneficial for saving storage and communication costs while maintaining fault-tolerance levels similar to those of replication-based DSSes [16]. Mechanisms using an \([n, k]\) erasure code split a value v of size, say, 1 unit into k elements, each of size \(\frac{1}{k}\) units, create n coded elements of the same size, and store one coded element per server, for a total storage cost of \(\frac{n}{k}\) units. So the \([n = 3, k = 2]\) code in the previous example reduces the storage cost to 1.5 TB and the communication cost to 1.5 MB (improving also operation latency). Maximum Distance Separable (MDS) codes have the property that value v can be reconstructed from any k out of these n coded elements; note that replication is a special case of MDS codes with \(k=1\). In addition to the potential cost-savings, the suitability of erasure codes for DSSes is amplified by the emergence of highly optimized erasure coding libraries that reduce encoding/decoding overheads [3, 12, 46]. In fact, an exciting recent body of systems and optimization works [7, 33, 46, 49, 52, 53, 54, 58] has demonstrated that for several data stores, the use of erasure coding results in lower latencies than replication-based approaches. This is achieved by allowing the system to carefully tune erasure coding parameters, data placement strategies, and other system parameters that improve workload characteristics, such as load and spatial distribution. A complementary body of work has proposed novel non-blocking algorithms that use erasure coding to provide atomic storage over asynchronous message-passing models [13, 15, 16, 20, 34, 35, 56]. Since erasure code-based algorithms, unlike their replication-based counterparts, incur the additional burden of synchronizing the access of multiple coded elements from the same version of the data object, these algorithms are quite complex.
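To make the cost comparison concrete, the following minimal Python sketch reproduces the arithmetic of the running example above (the object count, object size, and \([n,k]\) parameters are those quoted in the text; the helper name is illustrative).

```python
def total_storage_tb(num_objects, obj_size_mb, n, k):
    # With an [n, k] MDS code, each of the n servers stores one coded element
    # of size obj_size/k, so each object occupies n * (obj_size / k) overall.
    per_object_mb = n * (obj_size_mb / k)
    return num_objects * per_object_mb / 1_000_000   # MB -> TB

print(total_storage_tb(1_000_000, 1, n=3, k=1))  # 3.0 TB: 3-way replication (k = 1)
print(total_storage_tb(1_000_000, 1, n=3, k=2))  # 1.5 TB: the [3, 2] MDS code
```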
Reconfigurable Atomic Storage. Configuration refers to the set of storage servers that are collectively used to host the data and implement the DSS. Reconfiguration is the process of adding or removing servers in a DSS. In practice, reconfigurations are often desirable by system administrators [9] for a wide range of purposes, especially during system maintenance. As storage servers age and become unreliable, they are replaced with new ones to ensure data durability. Furthermore, to scale the storage service to increased (or decreased) load, larger (or smaller) configurations may need to be deployed. Therefore, in order to carry out such reconfiguration steps, in addition to the usual read and write operations, an operation called reconfig is invoked by reconfiguration clients. Performing the reconfiguration of a system without service interruption is a very challenging task and an active area of research. RAMBO [39] and DynaStore [8] are two of the handful of algorithms [17, 24, 27, 32, 47, 48] that allow reconfiguration on live systems; all these algorithms are replication-based.
A related body of work appeared for erasure-coded scaling, although there exist important differences that distinguish the two problems. In particular, works like [30, 50, 51, 55, 57] consider RAID-based systems with synchronous network communication and local computation. Synchrony allows processes to make assumptions on the time of message delivery, which in turn helps them infer whether a communicating party has failed or not. In an asynchronous system, similar to the one we consider in this work, messages may be delivered with arbitrary delays. Therefore, it is impossible to distinguish whether a message from a source is in transit or the source has crashed before sending the message. This uncertainty makes it impossible to tell failed nodes apart from operating ones, and thus challenging to design algorithms that guarantee atomicity (strong consistency) and the completion of reads and writes.
Despite the attractive prospects of creating strongly consistent DSSes with low storage and communication costs, so far, no algorithmic framework for reconfigurable atomic DSSes has employed erasure coding for fault-tolerance or provided any analysis of bandwidth and storage costs. Our article fills this vital gap in the algorithms literature through the development of a novel reconfigurable approach for atomic storage that uses erasure codes for fault-tolerance. From a practical viewpoint, our work may be interpreted as a bridge between the systems optimization works [7, 33, 46, 49, 52, 53, 54, 58] and non-blocking erasure code-based consistent storage [13, 15, 16, 20, 34, 35, 56]. Specifically, the use of our reconfigurable algorithm would potentially enable a data storage service to dynamically shift between different erasure coding parameters and placement strategies, as the demand characteristics (such as load and spatial distribution) change, without service interruption.
Our Contributions. We develop a reconfigurable, erasure-coded, atomic, or strongly consistent [29, 38] read/write storage algorithm, called Ares. Motivated by many practical systems, Ares assumes clients and servers are separate processes* that communicate via logical point-to-point channels.
In contrast to the replication-based reconfigurable algorithms [8, 17, 24, 27, 32, 39, 47, 48], where a configuration essentially corresponds to the set of servers that store the data, the same concept for erasure coding needs to be much more involved. In particular, in erasure coding, even if the same set of n servers is used, a change in the value of k defines a new configuration. Furthermore, several erasure coding-based algorithms [15, 20] have additional parameters that tune how many older versions each server stores, which in turn influences the concurrency level allowed. Tuning such parameters can also fall under the purview of reconfiguration.
To accommodate these various reconfiguration requirements, Ares takes a modular approach. In particular, Ares uses a set of primitives, called data-access primitives (DAPs). A different implementation of the DAP primitives may be specified in each configuration. Ares uses DAPs as a “black box” to: (i) transfer the object state from one configuration to the next during reconfig operations, and (ii) invoke read/write operations on a single configuration. Given the DAP implementation for each configuration we show that Ares correctly implements a reconfigurable, atomic read/write storage.
The DAP primitives provide Ares a much broader view of the notion of a configuration as compared to replication-based algorithms. Specifically, the DAP primitives may be parameterized, following the parameters of protocols used for their implementation (e.g., erasure coding parameters, set of servers, quorum design, and concurrency level). While transitioning from one configuration to another, our modular construction allows Ares to reconfigure between different sets of servers, quorum configurations, and erasure coding parameters. In principle, Ares even allows reconfiguring between completely different protocols as long as they can be interpreted/expressed in terms of the primitives; though in this article, we only present one implementation of the DAP primitives to keep the scope of the article reasonable. From a technical point of view, our modular structure makes the atomicity proof of a complex algorithm (like Ares) easier.
An important consideration in the design choice of Ares is to ensure that we gain/retain the advantages that come with erasure codes—cost of data storage and communication is low—while having the flexibility to reconfigure the system. Towards this end, we present an erasure-coded implementation of DAPs which satisfy the necessary properties and are used by Ares to yield the first reconfigurable, erasure-coded, read/write atomic storage implementation, where read and write operations complete in two-rounds. We provide the atomicity property and latency analysis for any operation in Ares, along with the storage and communication costs resulting from the erasure-coded DAP implementation. In particular, we specify lower and upper bounds on the communication latency between the service participants, and we provide the necessary conditions to guarantee the termination of each read/write operation while concurrent with reconfig operations.
Table 1 compares Ares with a few well-known erasure-coded and replication-based (static and reconfigurable) atomic memory algorithms. From the table we observe that Ares is the only algorithm to combine a dynamic behavior with the use of erasure codes, while reducing the storage and communication costs associated with the read or write operations. Moreover, in Ares the number of rounds per write and read is at least as good as in any of the remaining algorithms.
Table 1. Comparison of Ares with Previous Algorithms Emulating Atomic Read/Write Memory for Replication (Repl.) and Erasure-code Based (EC) Algorithms
\(\delta\) is the maximum number of concurrent writes with any read during the course of an execution of the algorithm. In practice, \(\delta \lt 4\) [16].
We developed a proof-of-concept (PoC) implementation of Ares and deployed it over a set of distributed devices in the experimental testbed Emulab [2]. The most important take-home message from our experimental results is that our algorithm can be implemented according to its specification, produces correct executions, and remains available during reconfigurations. Although the correctness of the algorithm is shown analytically, the experimental evaluation corroborates it. For this purpose, we have chosen a simple parameterization (e.g., uniform selection of read/write invocation intervals), and picked ABD [11] as a benchmark algorithm which, despite being proposed more than 25 years ago, remains the fundamental algorithm for emulating replicated quorum-based atomic shared memory. For instance, it is adopted in commercial/open-source implementations like Cassandra [36],† and is used as a standard benchmark algorithm (as can be seen in other recent works [19]). However, to demonstrate a real-world application we would need to compare with more algorithms and utilize a wide range of read/write distributions, and this is planned as separate work.
Document Structure. Section 2 presents the model assumptions and Section 3 the DAP primitives. In Section 4, we present the implementation of the reconfiguration and read/write protocols in Ares using the DAPs. In Section 5, we present an erasure-coded implementation of a set of DAPs, which can be used in every configuration of the Ares algorithm. Section 6 proves the correctness (atomicity) of Ares. Section 7 provides the operation latency and cost analysis, and Section 8 the DAP flexibility. Section 9 presents an experimental evaluation of the proposed algorithms. We conclude our work in Section 10. Due to lack of space, omitted proofs can be found in [43].
2 Model and Definitions
A shared atomic storage, consisting of any number of individual objects, can be emulated by composing individual atomic memory objects. Therefore, herein we aim to implement a single atomic read/write memory object. A read/write object takes a value from a set \({\mathcal {V}}\). We assume a system consisting of four distinct sets of processes: a set \(\mathcal {W}\) of writers, a set \(\mathcal {R}\) of readers, a set \(\mathcal {G}\) of reconfiguration clients, and a set \(\mathcal {S}\) of servers. Let \(\mathcal {I}= \mathcal {W}\cup \mathcal {R}\cup \mathcal {G}\) be the set of clients. Servers host data elements (replicas or encoded data fragments). Each writer is allowed to modify the value of a shared object, and each reader is allowed to obtain the value of that object. Reconfiguration clients attempt to introduce new configurations of servers to the system in order to mask transient errors and to ensure the longevity of the service. Processes communicate via messages through asynchronous, reliable channels.
Configurations. A configuration, with a unique identifier from a set \(\mathcal {C}\), is a data type that describes the finite set of servers that are used to implement the atomic storage service. In our setting, each configuration is also used to describe the way the servers are grouped into sets, called quorums, s.t. each pair of quorums intersect, the consensus instance that is used as an external service to determine the next configuration, and a set of DAPs that specify the interaction of the clients and servers in the configuration (see Section 3).
More formally, a configuration, \(c\in \mathcal {C}\), consists of: \((i)\)\(c.Servers\subseteq \mathcal {S}\): a set of server identifiers; \((ii)\)\(c.Quorums\): the set of quorums on \(c.Servers\), s.t. \(\forall Q_1,Q_2\in c.Quorums, Q_1,Q_2\subseteq c.Servers\) and \(Q_1\cap Q_2\ne \emptyset\); \((iii)\)\({DAP(c)}\): the set of primitives (operations at level lower than reads or writes) that clients in \(\mathcal {I}\) may invoke on \(c.Servers\); and \((iv)\)\(c.Con\): a consensus instance with the values from \(\mathcal {C}\), implemented and running on top of the servers in \(c.Servers\). We refer to a server \(s \in c.Servers\) as a member of configuration c. The consensus instance \(c.Con\) in each configuration c is used as a service that returns the identifier of the configuration that follows c.
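The following Python sketch renders the configuration data type described above; the field and method names are illustrative, and the DAP and consensus members are left abstract.

```python
from dataclasses import dataclass
from typing import FrozenSet

ServerId = str

@dataclass(frozen=True)
class Configuration:
    """A configuration c: its servers, a quorum system over them,
    its DAP implementation, and its consensus instance c.Con."""
    servers: FrozenSet[ServerId]              # c.Servers
    quorums: FrozenSet[FrozenSet[ServerId]]   # c.Quorums
    dap: object                               # DAP(c): get-tag / get-data / put-data
    con: object                               # c.Con: consensus over configuration ids

    def quorums_valid(self) -> bool:
        # every quorum is a subset of c.Servers and every two quorums intersect
        return all(q <= self.servers for q in self.quorums) and \
               all(q1 & q2 for q1 in self.quorums for q2 in self.quorums)
```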
Executions. An algorithm A is a collection of processes, where process \(A_p\) is assigned to process \(p\in \mathcal {I}\cup \mathcal {S}\). The state of a process \(A_p\) is determined over a set of state variables, and the state \(\sigma\) of A is a vector that contains the state of each process. Each process \(A_p\) implements a set of actions. When an action \(\alpha {}\) occurs, it causes the state of \(A_p\) to change, say from some state \(\sigma _p\) to some different state \(\sigma _p^{\prime }\). We call the triple \(\langle \sigma _p, \alpha {}, \sigma _p^{\prime } \rangle\) a step of \(A_p\). Algorithm A performs a step when some process \(A_p\) performs a step. An action \(\alpha {}\) is enabled in a state \(\sigma\) if there exists a step \(\langle \sigma , \alpha {}, \sigma ^{\prime } \rangle\) to some state \(\sigma ^{\prime }\). An execution is an alternating sequence of states and actions of A starting with the initial state and ending in a state. An execution \({\xi }\) is fair if enabled actions perform a step infinitely often. In the rest of the article, we consider executions that are fair and well-formed. A process p crashes in an execution if it stops taking steps; otherwise p is correct or non-faulty. We assume a function \(c.\mathcal {F}\) that describes the failure model of a configuration c.
Reconfigurable Atomic Read/Write Objects. A reconfigurable atomic object supports three operations: \({\sf read}()\), \({\sf write}(v)\), and \({\sf reconfig}(c)\). A \({\sf read}()\) operation returns the value of the atomic object, \({\sf write}(v)\) attempts to modify the value of the object to \(v\in {\mathcal {V}}\), and \({\sf reconfig}(c)\) attempts to install a new configuration \(c\in \mathcal {C}\). We assume well-formed executions where each client may invoke one operation (\({\sf read}()\), \({\sf write}(v)\), or \({\sf reconfig}(c)\)) at a time.
An implementation of a read/write or a reconfig operation contains an invocation action (such as a call to a procedure) and a response action (such as a return from the procedure). An operation \(\pi\) is complete in an execution if it contains both the invocation and the matching response actions for \(\pi\); otherwise \(\pi\) is incomplete. We say that an operation \(\pi\) precedes an operation \(\pi ^{\prime }\) in an execution \({\xi }\), denoted by \(\pi \rightarrow \pi ^{\prime }\), if the response step of \(\pi\) appears before the invocation step of \(\pi ^{\prime }\) in \({\xi }\). Two operations are concurrent if neither precedes the other. An implementation A of a read/write object satisfies the atomicity (linearizability [29]) property if the following holds [38]. Let the set \(\Pi\) contain all complete read/write operations in any well-formed execution of A. Then there exists an irreflexive partial ordering \(\prec\) satisfying the following:
A1.
For any operations \(\pi _1\) and \(\pi _2\) in \(\Pi\), if \(\pi _1\rightarrow \pi _2\), then it cannot be the case that \(\pi _2\prec \pi _1\).
A2.
If \(\pi \in \Pi\) is a write operation and \(\pi ^{\prime }\in \Pi\) is any read/write operation, then either \(\pi \prec \pi ^{\prime }\) or \(\pi ^{\prime }\prec \pi\).
A3.
The value returned by a read operation is the value written by the last preceding write operation according to \(\prec\) (or the initial value if there is no such write).
Storage and Communication Costs. We are interested in the complexity of each read and write operation. The complexity of each operation \(\pi\) invoked by a process p is measured with respect to three metrics, during the interval between the invocation and the response of \(\pi\): \((i)\) the number of communication rounds, counting the rounds of message exchanges during \(\pi\); \((ii)\) storage efficiency (storage cost), accounting for the maximum storage requirements for any single object at the servers during \(\pi\); and \((iii)\) message bit complexity (communication cost), which measures the size of the messages used during \(\pi\).
We define the total storage cost as the size of the data stored across all servers, at any point during the execution of the algorithm. The communication cost associated with a read or write operation is the size of the total data that gets transmitted in the messages sent as part of the operation. We assume that metadata, such as version number, process ID, and so on used by various operations is of negligible size, and is hence ignored in the calculation of storage and communication cost. Furthermore, we normalize both costs with respect to the size of the value v; in other words, we compute the costs under the assumption that v has size 1 unit.
Erasure Codes. We use an \([n, k]\) linear MDS code [31] over a finite field \(\mathbb {F}_q\) to encode and store the value v among the n servers. An \([n, k]\) MDS code has the property that any k out of the n coded elements can be used to recover (decode) the value v. For encoding, v is divided into k elements \(v_1, v_2, \ldots v_k\), each of size \(\frac{1}{k}\) (assuming the size of v is 1). The encoder takes the k elements as input and produces n coded elements \(e_1, e_2, \ldots , e_n\) as output, i.e., \([e_1, \ldots , e_n] = \Phi ([v_1, \ldots , v_k])\), where \(\Phi\) denotes the encoder. For ease of notation, we simply write \(\Phi (v)\) to mean \([e_1, \ldots , e_n]\). The vector \([e_1, \ldots , e_n]\) is referred to as the codeword corresponding to the value v. Each coded element \(e_i\) also has size \(\frac{1}{k}\). In our scheme, we store one coded element per server. We use \(\Phi _i\) to denote the projection of \(\Phi\) on to the ith output component, i.e., \(e_i = \Phi _i(v)\). Without loss of generality, we associate the coded element \(e_i\) with server i, \(1 \le i \le n\).
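As a toy instance of an \([n,k]\) MDS code, the single-parity \([3,2]\) code below splits v into two fragments and adds their bitwise XOR; any two of the three coded elements recover v. This is only an illustrative sketch; Ares may use any \([n,k]\) MDS code.

```python
def encode_3_2(v: bytes):
    """[3, 2] single-parity MDS code: Phi(v) = [e1, e2, e3]."""
    assert len(v) % 2 == 0
    half = len(v) // 2
    v1, v2 = v[:half], v[half:]                  # the k = 2 fragments, each of size 1/2
    e3 = bytes(a ^ b for a, b in zip(v1, v2))    # parity element
    return [v1, v2, e3]

def decode_3_2(elems):
    """Recover v from any k = 2 of the n = 3 coded elements; `elems` maps
    the (1-based) server index i to its coded element e_i."""
    if 1 in elems and 2 in elems:
        return elems[1] + elems[2]
    if 1 in elems and 3 in elems:
        return elems[1] + bytes(a ^ b for a, b in zip(elems[1], elems[3]))
    if 2 in elems and 3 in elems:
        return bytes(a ^ b for a, b in zip(elems[2], elems[3])) + elems[2]
    raise ValueError("need at least k = 2 coded elements")

v = b"atomicob"
e1, e2, e3 = encode_3_2(v)
assert decode_3_2({1: e1, 3: e3}) == v           # decode from servers 1 and 3
```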
Tags. We use logical tags to order operations. A tag \(\tau _{}\) is defined as a pair \((z, w)\), where \(z \in \mathbb {N}\) and \(w \in \mathcal {W}\), the ID of a writer. Let \(\mathcal {T}\) be the set of all tags. Notice that tags could be defined over any totally ordered domain; as long as that domain is countably infinite, there is a direct mapping to the domain we assume. For any \(\tau _{1}, \tau _{2} \in \mathcal {T}\) we define \(\tau _{2} \gt \tau _{1}\) if \((i)\) \(\tau _{2}.z \gt \tau _{1}.z\) or \((ii)\) \(\tau _{2}.z = \tau _{1}.z\) and \(\tau _{2}.w \gt \tau _{1}.w\).
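Since a tag is just a lexicographically ordered pair, it can be modeled directly with a tuple type, as in the minimal sketch below (writer IDs are assumed to come from a totally ordered set; here they are strings).

```python
from typing import NamedTuple

class Tag(NamedTuple):
    """A logical tag (z, w); NamedTuple comparison is lexicographic,
    matching the order tau_2 > tau_1 defined above."""
    z: int
    w: str   # writer id

assert Tag(1, "w1") > Tag(0, "w9")          # higher integer part wins
assert Tag(1, "w2") > Tag(1, "w1")          # ties broken by writer id

def next_tag(max_tag: Tag, my_id: str) -> Tag:
    # a writer generates a new tag by incrementing the integer part of the
    # highest tag it has observed (as write operations do in Section 4.2)
    return Tag(max_tag.z + 1, my_id)
```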
For ease of reference, Table 2 presents the key notation used in this article. Notice that some of the symbols shown are defined and used in the following sections.
Table 2. List of Symbols Used to Describe Our Model of Computation

\(\mathcal {S}\): the set of server identifiers
\(\mathcal {I}\): the set of client identifiers
\(\mathcal {R}\): the set of reader identifiers in \(\mathcal {I}\)
\(\mathcal {W}\): the set of writer identifiers in \(\mathcal {I}\)
\(\mathcal {G}\): the set of reconfigurer identifiers in \(\mathcal {I}\)
\({\mathcal {V}}\): the set of values allowed to be written on the shared object
v: a value in \({\mathcal {V}}\)
\(\mathcal {T}\): the set of pairs in \(\mathbb {N}\times \mathcal {W}\)
\(\tau _{}\): a pair \((z, w)\in \mathcal {T}\)
\(\mathcal {C}\): the set of configuration identifiers
c: a configuration with identifier in \(\mathcal {C}\)
\(c.Servers\): the set of servers s.t. \(c.Servers\subseteq \mathcal {S}\) in configuration c
\(c.Quorums\): the set of subsets of servers s.t. \(\forall Q\in c.Quorums\), \(Q\subseteq c.Servers\) and \(\forall Q_1, Q_2\in c.Quorums, Q_1\cap Q_2\ne \emptyset\)
\(\sigma\): the state of an algorithm A
\(\sigma _p\): the state of process \(p\in \mathcal {I}\cup \mathcal {S}\) in state \(\sigma\), determined over a set of state variables
\(p.var|_{\sigma }\): the value of the state variable var at process p in state \(\sigma\)
\({\xi }\): an execution of algorithm A, which is a finite or infinite sequence of alternating states and actions beginning with the initial state of A
\(\Phi ([v_1, \ldots , v_k])\) or \(\Phi ([v])\): the \([n,k]\) encoder function given k fragments of value v, \([v_1, \ldots , v_k]\)
\(e_i\): the ith coded element, for \(1\le i\le n\), produced by \(\Phi ([v])\)
\({\mathcal {G}_L}\): the global configuration sequence, composed of pairs in \(\lbrace \mathcal {C}\cup \lbrace \bot \rbrace \rbrace \times \lbrace F,P\rbrace\), where F denotes finalized and P pending; initially it contains \(\langle c_0, F \rangle\)
3 Data Access Primitives
In this section we introduce a set of primitives, which we refer to as DAPs, that are invoked by the clients during read/write/reconfig operations and are defined for any configuration c in Ares. The DAPs allow us: \((i)\) to describe Ares in a modular manner, and \((ii)\) to reason more cleanly about the correctness of Ares.
We define three DAPs for each \(c\in \mathcal {C}\): \((i)\) \({c}.{{\sf put-data}(\langle \tau _{},v \rangle)}\), via which a client can ingest the tag-value pair \(\langle \tau _{},v \rangle\) into the configuration c; \((ii)\) \({c}.{{\sf get-data}()}\), used to retrieve the most up-to-date tag and value pair stored in the configuration c; and \((iii)\) \({c}.{{\sf get-tag}()}\), used to retrieve the most up-to-date tag for an object stored in a configuration c. More formally, assuming a tag \(\tau _{}\) from a set of totally ordered tags \({\mathcal {T}}\), a value v from a domain \({\mathcal {V}}\), and a configuration c from a set of identifiers \(\mathcal {C}\), the three primitives are defined as follows:
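For illustration, the three primitives can be captured by a small abstract interface; this is a sketch, not the formal specification from the article, and the types are simplified (tags as integer/writer-id pairs, values as bytes).

```python
from abc import ABC, abstractmethod
from typing import Tuple

Tag = Tuple[int, str]   # (z, writer-id), as defined in Section 2

class DAP(ABC):
    """The data access primitives exposed by a single configuration c."""

    @abstractmethod
    def put_data(self, tag: Tag, value: bytes) -> None:
        """c.put-data(<tag, value>): ingest the tag-value pair into c."""

    @abstractmethod
    def get_data(self) -> Tuple[Tag, bytes]:
        """c.get-data(): return the most up-to-date tag-value pair stored in c."""

    @abstractmethod
    def get_tag(self) -> Tag:
        """c.get-tag(): return the most up-to-date tag stored in c."""
```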
In order for the DAPs to be useful in designing the Ares algorithm we further require the following consistency properties. As we see later in Section 6, the safety property of Ares holds, given that these properties hold for the DAPs in each configuration.
In Section 5, we show how to implement a set of DAPs, where erasure-codes are used to reduce storage and communication costs. Our DAP implementation satisfies Property 1.
As noted earlier, expressing Ares in terms of the DAPs allows one to achieve a modular design. Modularity enables the usage of different DAP implementations per configuration, during any execution of Ares, as long as the DAPs implemented in each configuration satisfy Property 1. For example, the DAPs in a configuration c may be implemented using replication, while the DAPs in the next configuration say \(c^{\prime }\), may be implemented using erasure-codes. Thus, a system may use a scheme that offers higher fault tolerance (e.g., replication) when storage is not an issue while switching to a more storage efficient (less fault-tolerant) scheme when storage gets limited.
In Section 8, we show that the presented DAPs are not only suitable for algorithm Ares, but can also be used to implement a large family of atomic read/write storage implementations. By describing an algorithm A according to a simple algorithmic template (see Algorithm 1), we show that A preserves safety (atomicity) if the used DAPs satisfy Property 1, and A preserves liveness (termination), if every invocation of the used DAPs terminates, under the failure model assumed.
4 The Ares Protocol
In this section, we describe Ares. In the presentation of the Ares algorithm, we decouple the reconfiguration service from the shared memory emulation by utilizing the DAPs presented in Section 3. This allows Ares to handle both the reorganization of the servers that host the data, as well as to utilize a different atomic memory implementation per configuration. It is also important to note that Ares adopts a client-server architecture and separates the reader, writer, and reconfiguration processes from the server processes that host the object data. More precisely, the Ares algorithm comprises three major components: \((i)\) the reconfiguration protocol, which consists of invoking, and subsequently installing, new configurations via the reconfig operation issued by reconfiguration clients; \((ii)\) the read/write protocol for executing the read and write operations invoked by readers and writers; and \((iii)\) the implementation of the DAPs for each installed configuration, which respect Property 1 and are used by the reconfig, read, and write operations.
4.1 Implementation of the Reconfiguration Service
In this section, we describe the reconfiguration service in Ares. The service relies on an underlying sequence of configurations (already proposed or installed by reconfig operations), in the form of a “distributed list”, which we refer to as the global configuration sequence (or list) \({\mathcal {G}_L}\). Conceptually, \({\mathcal {G}_L}\) represents an ordered list of pairs \(\langle cfg, status \rangle\), where cfg is a configuration identifier (\(cfg \in \mathcal {C}\)) and status is a binary state variable, \(status \in \lbrace F, P\rbrace\), that denotes whether cfg is finalized (F) or still pending (P). Initially, \({\mathcal {G}_L}\) contains a single element, say \(\langle c_0, F \rangle\), which is known to every participant in the service.
To facilitate the creation of \({\mathcal {G}_L}\), each server in \(c.Servers\) maintains a local variable \(nextC \in \lbrace \mathcal {C}\cup \lbrace \bot \rbrace \rbrace \times \lbrace P,F\rbrace\), which is used to point to the configuration that follows c in \({\mathcal {G}_L}\). Initially, at any server \(nextC = \langle \bot , F \rangle\). Once nextC is set to a value it is never altered. As we show below, at any point in the execution of Ares and in any configuration c, the nextC variables of the non-faulty servers in c that are not equal to \(\bot\) agree, i.e., \(\lbrace s.nextC : s \in c.Servers \wedge s.nextC\ne \bot \rbrace\) is either empty or has only one element.
Clients discover the configuration that follows a \(\langle c,* \rangle\) in the sequence by contacting a subset of servers in \(c.Servers\) and collecting their nextC variables. Every client in \(\mathcal {I}\) maintains a local variable cseq that is expected to be some subsequence of \({\mathcal {G}_L}\). Initially, at every client the value of cseq is \(\langle c_0,F \rangle\).
Reconfiguration clients may introduce new configurations, each associated with a unique configuration identifier from \(\mathcal {C}\). Multiple clients may concurrently attempt to introduce different configurations for the same next link in \({\mathcal {G}_L}\). Ares uses consensus to resolve such conflicts: a subset of servers in \(c.Servers\), in each configuration c, implements a distributed consensus service (such as Paxos [37] or RAFT [45]), denoted by \(c.Con\).
The reconfiguration service consists of two major components: \((i)\) sequence traversal, responsible for discovering a recent configuration in \({\mathcal {G}_L}\), and \((ii)\) the reconfiguration operation, which installs new configurations in \({\mathcal {G}_L}\).
Sequence Traversal. Any read/write/reconfig operation \(\pi\) utilizes the sequence traversal mechanism to discover the latest state of the global configuration sequence, as well as to ensure that such a state is discoverable by any subsequent operation \(\pi ^{\prime }\). See Figure 1 for an example execution in the case of a reconfig operation. At a high level, a client starts by collecting the nextC variables from a quorum of servers in a configuration c, such that \(\langle c,F \rangle\) is the last finalized configuration in that client’s local cseq variable (or \(c_0\) if no other finalized configuration exists). If any server s returns a nextC variable such that \(nextC.cfg\ne \bot\), then the client \((i)\) adds nextC to its local cseq, \((ii)\) propagates nextC to a quorum of servers in \(c.Servers\), and \((iii)\) repeats this process in the configuration \(nextC.cfg\). The traversal terminates when no server in the replying quorum reports a nextC with \(nextC.cfg\ne \bot\). More precisely, the sequence traversal consists of three actions (see Algorithm 1), sketched in code after their descriptions below:
Fig. 1. An example execution of a reconfiguration operation \({\sf recon}(c_5)\).
get-next-config\((c)\): The action \({\sf get-next-config}\) returns the configuration that follows c in \({\mathcal {G}_L}\). During get-next-config\((c)\), a client sends read-config messages to all the servers in \(c.Servers\), and waits for replies containing nextC from a quorum in \(c.Quorums\). If there exists a reply with \(nextC.cfg\ne \bot\) the action returns nextC; otherwise it returns \(\bot\).
put-config\((c, c^{\prime })\): The \({\sf put-config}(c, c^{\prime })\) action propagates \(c^{\prime }\) to a quorum of servers in \(c.Servers\). During the action, the client sends \((\mbox{{write-config}}, c^{\prime })\) messages, to the servers in \(c.Servers\) and waits for each server s in some quorum \(Q\in c.Quorums\) to respond.
read-config\((seq)\): A \({\sf read-config}(seq)\) sequentially traverses the installed configurations in order to discover the latest state of the sequence \({\mathcal {G}_L}\). At invocation, the client starts with the last finalized configuration \(\langle c, F \rangle\) in the given seq (Line A1:2), and enters a loop to traverse \({\mathcal {G}_L}\) by invoking \({\sf get-next-config}()\), which returns the next configuration, assigned to \(\widehat{c}\). While \(\widehat{c} \ne \bot\), then: (a) \(\widehat{c}\) is appended at the end of the sequence seq; (b) a \({\sf put-config}\) action is invoked to inform a quorum of servers in \(c.Servers\) to update the value of their nextC variable to \(\widehat{c}\). If \(\widehat{c} = \bot\) the loop terminates and the action read-config returns seq.
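The following client-side sketch puts the three actions together; the two quorum-based actions are passed in as callables and all message handling is omitted. Entries of seq are \(\langle cfg, status \rangle\) pairs, with \(\bot\) modeled as None.

```python
def read_config(seq, get_next_config, put_config):
    """Sequence traversal (sketch): starting from the last finalized entry of
    seq, follow and propagate nextC pointers until no successor is reported."""
    mu = max(i for i, (_, status) in enumerate(seq) if status == 'F')
    c = seq[mu][0]                    # last finalized configuration in seq
    while True:
        nxt = get_next_config(c)      # next <cfg, status> pair, or None
        if nxt is None:
            return seq                # no successor known: traversal complete
        seq = seq + [nxt]             # (a) append the discovered configuration
        put_config(c, nxt)            # (b) propagate nextC to a quorum of c.Servers
        c = nxt[0]                    # continue from the newly discovered configuration
```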
Reconfiguration operation. A reconfiguration operation \({\sf reconfig}(c)\), \(c \in \mathcal {C}\), invoked by any reconfiguration client \(rec_i\), attempts to append c to \({\mathcal {G}_L}\). The set of server processes in c is not part of any other configuration different from c. At a high level, \(rec_i\) first executes a sequence traversal to discover the latest state of \({\mathcal {G}_L}\). Then it attempts to add the new configuration c at the end of the discovered sequence by proposing c in the consensus instance of the last configuration in the sequence. The client accepts and appends the decision of the consensus instance (which might be different from c). Then it attempts to transfer the latest value of the read/write object to the latest installed configuration. Once the information is transferred, \(rec_i\) finalizes the last configuration in its local sequence and propagates the finalized tuple to a quorum of servers in that configuration. The operation consists of four phases, executed consecutively by \(rec_i\) (see Algorithm 2); a client-side sketch follows the phase descriptions below:
\({\sf read-config}(seq)\): The phase \({\sf read-config}(seq)\) at \(rec_i\), reads the recent global configuration sequence as described in the sequence traversal.
\({\sf add-config}(seq, c)\): The \({\sf add-config}(seq, c)\) attempts to append a new configuration c to the end of seq (the client’s view of \({\mathcal {G}_L}\)). Suppose the last configuration in seq is \(c^{\prime }\) (with status either F or P); then, in order to decide the most recent configuration, \(rec_i\) invokes \(c^{\prime }.Con.propose(c)\) on the consensus object associated with configuration \(c^{\prime }\). Let \(d\in \mathcal {C}\) be the configuration identifier decided by the consensus service. If \(d \ne c\), this implies that another (possibly concurrent) reconfiguration operation, invoked by a reconfigurer \(rec_j\ne rec_i\), proposed and succeeded in installing d as the configuration to follow \(c^{\prime }\). In this case, \(rec_i\) adopts d as its own proposed configuration, by adding \(\langle d, P \rangle\) to the end of its local cseq (entirely ignoring c), using the operation \({\sf put-config}(c^{\prime }, \langle d, P \rangle)\), and returns the extended sequence seq.
\({\sf update-config}(seq)\): Let \(\mu\) denote the index of the last configuration in the local sequence cseq at \(rec_i\) whose status is F, and let \(\nu\) denote the last index of cseq. Next, \(rec_i\) invokes \({\sf update-config}(cseq)\), which gathers the tag-value pair corresponding to the maximum tag in each of the configurations \(cseq[i]\) for \(\mu \le i \le \nu\), and transfers that pair to the configuration that was added by the \({\sf add-config}\) action. The \({\sf get-data}\) and \({\sf put-data}\) DAPs are used to transfer the value of the object to the new configuration, and they are implemented with respect to the configuration that is accessed. Suppose \(\langle \tau _{max}, v_{max} \rangle\) is the tag-value pair corresponding to the highest tag among the responses from all the \(\nu - \mu + 1\) configurations. Then, \(\langle \tau _{max}, v_{max} \rangle\) is written to the configuration d via the invocation of \(cseq[\nu ].cfg.{\sf put-data}(\langle \tau _{max},v_{max} \rangle)\).
\({\sf finalize-config}(cseq)\): Once the tag-value pair is transferred, in the last phase of the reconfiguration operation, \(rec_i\) executes \({\sf finalize-config}(cseq)\), to update the status of the last configuration in cseq, say \(d = cseq[\nu ].cfg\), to F. The reconfigurer \(rec_i\) informs a quorum of servers in the previous configuration \(c=cseq[\nu -1].cfg\), i.e., in some \(Q \in c.Quorums\), about the change of status, by executing the \({\sf put-config}(c, \langle d,F \rangle)\) action.
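A client-side sketch of how the four phases compose is given below; the consensus, DAP, and configuration actions are passed in as callables (their quorum-based implementations are described above and in Section 5), and error handling is omitted.

```python
def reconfig(c_new, cseq, read_config, propose, get_data, put_data, put_config):
    """Sketch of reconfig(c_new): the four consecutive phases of the operation."""
    # Phase 1: read-config -- discover the latest configuration sequence
    cseq = read_config(cseq)

    # Phase 2: add-config -- propose c_new on the consensus instance of the last config
    last = cseq[-1][0]
    d = propose(last, c_new)               # decided configuration; may differ from c_new
    put_config(last, (d, 'P'))
    cseq = cseq + [(d, 'P')]

    # Phase 3: update-config -- transfer the highest tag-value pair to d
    mu = max(i for i, (_, st) in enumerate(cseq) if st == 'F')
    tag, val = max((get_data(cfg) for cfg, _ in cseq[mu:]), key=lambda p: p[0])
    put_data(d, tag, val)

    # Phase 4: finalize-config -- mark d finalized in the previous configuration
    put_config(cseq[-2][0], (d, 'F'))
    cseq[-1] = (d, 'F')
    return cseq
```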
Server Protocol. Each server responds to requests from clients (Algorithm 3). A server waits for two types of messages: read-config and write-config. When a read-config message is received for a particular configuration \(c_k\), a server in \(c_k.Servers\) returns its own nextC variable. A write-config message attempts to update the nextC variable of the server with a particular tuple \(cfgT_{in}\). A server changes the value of its local nextC in two cases: (i) \(nextC.cfg=\bot\), or (ii) \(nextC.status = P\).
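A minimal sketch of the server side follows; the class and method names are illustrative, \(\bot\) is modeled as None, and message dispatch is omitted.

```python
class ConfigServer:
    """Server-side handling of read-config / write-config messages (sketch)."""

    def __init__(self):
        self.nextC = (None, 'F')      # <cfg, status>; None stands for bottom

    def on_read_config(self):
        return self.nextC             # reply with the locally stored nextC

    def on_write_config(self, cfg_in):
        # adopt the incoming tuple if nothing is recorded yet,
        # or if the recorded entry is still pending
        if self.nextC[0] is None or self.nextC[1] == 'P':
            self.nextC = cfg_in
        return 'ack'
```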
Figure 1 illustrates an example execution of a reconfiguration operation \({\sf recon}(c_5)\). In this example, the reconfigurer \(rec_i\) goes through a number of configuration queries (get-next-config) before it reaches configuration \(c_4\) in which a quorum of servers replies with \(nextC.cfg=\bot\). There it proposes \(c_5\) to the consensus object of \(c_4\) (\(c_4.Con.propose(c_5)\) on arrow 10), and once \(c_5\) is decided, \({\sf recon}(c_5)\) completes after executing \({\sf finalize-config}(c_5)\).
4.2 Implementation of Read and Write Operations
The read and write operations in Ares are expressed in terms of the DAP primitives (see Section 3). This provides Ares the flexibility to use different implementations of the DAP primitives in different configurations, without changing its basic structure. At a high level, a write (or read) operation is executed as follows: the client \((i)\) obtains the latest configuration sequence by using the \({\sf read-config}\) action of the reconfiguration service, \((ii)\) queries the configurations in cseq, starting from the last finalized configuration to the end of the discovered configuration sequence, for the latest \(\langle tag,value \rangle\) pair with the help of the \({\sf get-tag}\) (or \({\sf get-data}\)) operation as specified for each configuration, and \((iii)\) repeatedly propagates a new \(\langle tag^{\prime }, value^{\prime } \rangle\) pair (the largest \(\langle tag,value \rangle\) pair) with \({\sf put-data}\) in the last configuration of its local sequence, until no additional configuration is observed. In more detail, the algorithm of a read or write operation \(\pi\) is as follows (see Algorithm 4):
A write (or read) operation is invoked at a client p when line Algorithm 4:8 (respectively, line Algorithm 4:31) is executed. In both operations, p first issues a \({\sf read-config}\) action to obtain the latest introduced configuration in \({\mathcal {G}_L}\).
If \(\pi\) is a write, p detects the last finalized entry in cseq, say \(\mu\), and performs a \(cseq[j].conf.{\sf get-tag}()\) action, for \(\mu \le j\le |cseq|\) (line Algorithm 4:9). Then p discovers the maximum tag among all the returned tags (\(\tau _{max}\)), and increments it (by incrementing the integer part of \(\tau _{max}\)), generating a new tag, say \(\tau _{new}\). It assigns \(\langle \tau _{}, v \rangle\) to \(\langle \tau _{new}, val \rangle\), where val is the value it wants to write (line Algorithm 4:13).
If \(\pi\) is a read, p detects the last finalized entry in cseq, say \(\mu\), and performs a \(cseq[j].conf.{\sf get-data}()\) action, for \(\mu \le j\le |cseq|\) (line Algorithm 4:32). Then p discovers the maximum tag-value pair (\(\langle \tau _{max},v_{max} \rangle\)) among the replies, and assigns \(\langle \tau _{}, v \rangle\) to \(\langle \tau _{max},v_{max} \rangle\).
Having specified the \(\langle \tau _{}, v \rangle\) to be propagated, both reads and writes execute the \(cseq[\nu ].cfg.{\sf put-data}(\langle \tau _{}, v \rangle)\) action, where \(\nu =|cseq|\), followed by a \({\sf read-config}\) action, to examine whether new configurations were introduced in \({\mathcal {G}_L}\). This is an essential step that ensures that any new value of the object is propagated to any recently introduced configuration. Omitting it may lead an operation that reads from a newly established configuration to obtain an outdated value for the shared object, violating atomic consistency. Each operation repeats these steps until no new configuration is discovered (lines Algorithm 4:15–21, or lines Algorithm 4:37–43). Let \(cseq^{\prime }\) be the sequence returned by the \({\sf read-config}\) action. If \(|cseq^{\prime }| = |cseq|\), then no new configuration was introduced and the read/write operation terminates; otherwise, p sets cseq to \(cseq^{\prime }\) and repeats the two actions. Note that, in an execution of Ares, for two consecutive \({\sf read-config}\) operations that return \(cseq^{\prime }\) and \(cseq^{\prime \prime }\) respectively, it must hold that \(cseq^{\prime }\) is a prefix of \(cseq^{\prime \prime }\), and hence \(|cseq^{\prime }|=|cseq^{\prime \prime }|\) only if \(cseq^{\prime } = cseq^{\prime \prime }\). Finally, if \(\pi\) is a read operation, the value with the highest tag discovered is returned to the client.
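The write path can be sketched as follows (a read is analogous, using get-data and returning the value with the highest tag); the reconfiguration and DAP actions are passed in as callables, and tags are \((z, w)\) pairs compared lexicographically.

```python
def write(val, cseq, my_id, read_config, get_tag, put_data):
    """Sketch of a write: pick a tag larger than any tag discovered, then keep
    writing it to the last known configuration until the sequence stops growing."""
    cseq = read_config(cseq)
    mu = max(i for i, (_, st) in enumerate(cseq) if st == 'F')
    z, _ = max(get_tag(cfg) for cfg, _ in cseq[mu:])   # highest tag discovered
    tag = (z + 1, my_id)                               # strictly larger new tag
    while True:
        put_data(cseq[-1][0], tag, val)                # propagate to the last configuration
        new_cseq = read_config(cseq)
        if len(new_cseq) == len(cseq):                 # no new configuration discovered
            return
        cseq = new_cseq
```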
Discussion. Ares shares similarities with previous algorithms like RAMBO [28] and the framework in [48]. The reconfiguration technique used in Ares ensures the prefix property on the configuration sequence (resembling a blockchain data structure [42]), while the array structure in RAMBO allowed nodes to maintain an incomplete reconfiguration history. On the other hand, the DAP usage exploits a different viewpoint compared to [48], allowing implementations of atomic read/write registers without relying on strong objects, like ranked registers [18]. Note that Ares is designed to capture a wide class of algorithms with different redundancy strategies. So while not directly implementing an EC-based atomic memory, it provides the “vehicle” without which dynamic EC-based implementations would not have been possible. Lastly, even though Ares is designed to support crash failures, as noted by [41], reconfiguration is more general and allows an algorithm to handle benign recoveries. That is, a recovered node that loses its state can be introduced as a new member of a new configuration. Stateful recoveries, on the other hand, are indistinguishable from long delays, and thus can be handled effectively by an algorithm designed for the asynchronous model like Ares.
5 Implementation of the DAPs
In this section, we present an implementation of the DAPs that satisfies Property 1, for a configuration c with n servers, using an \([n, k]\) MDS code for storage. Notice that the total number of servers in the system can be larger than n; however, we can pick a subset of n servers to use for this particular key and instance of the algorithm. We store each coded element \(c_i\), corresponding to an object, at server \(s_i\), where \(i=1, \ldots , n\). The implementations of the DAP primitives used in Ares are shown in Algorithm 5, and the servers’ responses in Algorithm 6.
Each server \(s_i\) stores one state variable, List, which is a set of up to \((\delta + 1)\) (tag, coded-element) pairs. Initially the set at \(s_i\) contains a single element, \(List = \lbrace (t_0, \Phi _i(v_0))\rbrace\). Below we describe the implementation of the DAPs.
\({c}.{{\sf get-tag}()}\): A client, during the execution of a \({c}.{{\sf get-tag}()}\) primitive, queries all the servers in \(c.Servers\) for the highest tags in their Lists, and awaits responses from \(\lceil \frac{n+k}{2} \rceil\) servers. A server, upon receiving the get-tag request, responds to the client with its highest tag, \(\tau _{max} \equiv \max _{(t,c) \in List}t\). Once the client receives the tags from \(\lceil \frac{n+k}{2} \rceil\) servers, it selects the highest tag t and returns it.
\(c.{\sf put-data}(\langle t_w, v \rangle)\): During the execution of the primitive \(c.{\sf put-data}(\langle t_w, v \rangle)\), a client sends the pair \((t_w, \Phi _i(v))\) to each server \(s_i\in c.Servers\). When a server \(s_i\) receives a message \((\text{put-data}, t_w, c_i)\), it adds the pair to its local List and, if the List exceeds length \((\delta +1)\), trims the entries with the smallest tags, and replies with an ack to the client. In particular, \(s_i\) replaces the coded elements of the older tags with \(\bot\), and maintains only the coded elements associated with the \((\delta +1)\) highest tags in the List (see Line Algorithm 6:16). The client completes the primitive operation after getting acks from \(\lceil \frac{n+k}{2} \rceil\) servers.
\({c}.{{\sf get-data}()}\): A client, during the execution of a \({c}.{{\sf get-data}()}\) primitive, queries all the servers in \(c.Servers\) for their local variable List, and awaits responses from \(\lceil \frac{n+k}{2} \rceil\) servers. Once the client receives Lists from \(\lceil \frac{n+k}{2} \rceil\) servers, it selects the highest tag t, such that: \((i)\) its corresponding value v is decodable from the coded elements in the lists; and \((ii)\) t is the highest tag seen in the responses of at least k Lists (see lines Algorithm 5:11–14), and returns the pair \((t, v)\). Note that in the case where either of the above conditions is not satisfied, the corresponding read operation does not complete.
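The server-side bookkeeping for put-data can be sketched as follows; the List is modeled as a dictionary from tags to coded elements (\(\bot\) as None), tags are assumed comparable, and the client side simply awaits acks from \(\lceil \frac{n+k}{2} \rceil\) servers as described above.

```python
def on_put_data(server_list, tag, coded_elem, delta):
    """Record the new (tag, coded element) pair, then keep coded elements only
    for the (delta + 1) highest tags; older tags keep a None placeholder."""
    server_list[tag] = coded_elem
    keep = set(sorted(server_list, reverse=True)[:delta + 1])
    for t in server_list:
        if t not in keep:
            server_list[t] = None     # drop the coded element, keep the tag
    return 'ack'
```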
5.1 Correctness of the DAPs
To prove the correctness of the proposed DAPs, we need to show that they are both safe, i.e., they ensure the necessary Property 1, and live, i.e., they allow each operation to terminate. We first prove that, in any execution \({\xi }\) containing operations of the proposed implementation, any pair of operations in \({\xi }\) satisfies the DAP consistency properties (i.e., Property 1). That is, the tag returned by a \({\sf get-tag}()\) operation is no smaller than the tag written by any preceding \({\sf put-data}()\) operation, and the value returned by a \({\sf get-data}()\) operation is either written by a \({\sf put-data}()\) operation or is the initial value of the object. Next, assuming that there cannot be more than \(\delta\) \({\sf put-data}()\) operations concurrent with a single \({\sf get-data}()\) operation, we show that each operation in our implementation terminates. Otherwise, a \({\sf get-data}()\) operation is at risk of not being able to discover a decodable value and thus fails to terminate and return a value.
Safety (Property 1). In this section, we are concerned with only one configuration c, consisting of a set of servers \(c.Servers\). We assume that at most \(f \le \frac{n-k}{2}\) servers from \(c.Servers\) may crash. Lemma 2 states that the DAP implementation satisfies the consistency properties of Property 1 which will be used to imply the atomicity of the Ares algorithm.
Liveness. To reason about the liveness of the proposed DAPs, we define a concurrency parameter \(\delta\) which captures all the \({\sf put-data}\) operations that overlap with a \({\sf get-data}\) operation, until the time the client has all the data needed to attempt decoding a value. However, we ignore those \({\sf put-data}\) operations that might have started in the past and never completed, if their tags are less than that of any \({\sf put-data}\) that completed before the \({\sf get-data}\) started. This allows us to ignore, while counting concurrency, \({\sf put-data}\) operations due to failed clients, as long as the failed \({\sf put-data}\) operations are followed by a successful \({\sf put-data}\) that completed before the \({\sf get-data}\) started. The following definition captures this notion of concurrency precisely for the DAP implementation presented in this section.
Termination (and hence liveness) of the DAPs is guaranteed in an execution \({\xi }\), provided that no more than f servers in \(c.Servers\) crash, and no more than \(\delta\) \({\sf put-data}\) operations are concurrent with any \({\sf get-data}\) operation at any point in \({\xi }\). If the failure model is satisfied, then any operation invoked by a non-faulty client will collect the necessary replies independently of the progress of any other client process in the system. Preserving \(\delta\), on the other hand, ensures that any operation will be able to decode a written value. These are captured in the following theorem:
6 Correctness of Ares
In this section, we prove that Ares correctly implements an atomic, read/write, shared storage service. The correctness of Ares highly depends on the way the configuration sequence is constructed at each client process. Also, atomicity is ensured if the DAP implementation in each configuration \(c_i\) satisfies Property 1.
As a roadmap, we begin by showing, in Section 6.1, that some critical properties are preserved by the proposed reconfiguration service. In particular, we show that the configuration sequences maintained at any two processes are either the same, or one is a prefix of the other. This in turn helps us to prove the correctness of Ares in Section 6.2, by showing that all the properties of atomicity (see Section 2) are satisfied, given that the properties on the configuration sequence hold and that the DAPs used in each configuration satisfy Property 1.
We proceed by first introducing some definitions and notation, that we use in the proofs that follow.
Notations and definitions. For a server s, we use the notation \(s.var|_{\sigma }\) to refer to the value of the state variable var, in s, at a state \(\sigma\) of an execution \({\xi }\). If server s crashes at a state \(\sigma _f\) in an execution \({\xi }\) then \(s.var|_{\sigma }\triangleq s.var|_{\sigma _f}\) for any state variable var and for any state \(\sigma\) that appears after \(\sigma _f\) in \({\xi }\) (i.e., the value of the variable remains unchanged).
We define the tag of a configuration c as the smallest tag among the maximum tags found in each quorum of c. This is essentially the smallest tag that an operation may witness when receiving replies from a single quorum in c. More formally:
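A rendering of the omitted formal definition, reconstructed from the prose above (here \(s.tag|_{\sigma }\) denotes the maximum tag stored at server s in state \(\sigma\)):
\[ tag(c)|_{\sigma } \;\triangleq\; \min _{Q\in c.Quorums}\ \max _{s\in Q}\ s.tag|_{\sigma }. \]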
Next we provide the notation to express the configuration sequence witnessed by a process p in a state \(\sigma\) (as \(p.cseq|_{\sigma }\)), the last finalized configuration in that sequence (as \(\mu (\mathbf {c}^{p}_{\sigma })\)), and the length of that sequence (as \(\nu (\mathbf {c}^{p}_{\sigma })\)). More formally:
Last, we define the prefix operation on two configuration sequences.
Table 3 summarizes the new notation for ease of reference.
Table 3. Additional Notation Used in This Section

\(\mathbf {c}^{p}_{\sigma }\): the value of the configuration sequence variable cseq at process p in state \(\sigma\), i.e., a shorthand of \(p.cseq|_{\sigma }\)
\(\mathbf {c}^{p}_{\sigma }[i]\): the \(i{th}\) element in the configuration sequence \(\mathbf {c}^{p}_{\sigma }\)
\(\mu (\mathbf {c}^{p}_{\sigma })\): last finalized configuration in \(\mathbf {c}^{p}_{\sigma }\)
\(\nu (\mathbf {c}^{p}_{\sigma })\): the length of \(\mathbf {c}^{p}_{\sigma }\), i.e., \(|\mathbf {c}^{p}_{\sigma }|\)
6.1 Reconfiguration Protocol Properties
In this section, we analyze the properties that we can achieve through our reconfiguration algorithm. At a high level, we show that the following properties are preserved:
(i)
configuration uniqueness: the configuration sequences at any two processes have identical configurations at any index i,
(ii)
sequence prefix: the configuration sequence witnessed by an operation is a prefix of the sequence witnessed by any succeeding operation, and
(iii)
sequence progress: if the configuration with index i is finalized during an operation, then a configuration with index j, for \(j\ge i\), will be finalized for a succeeding operation.
The first lemma shows that any two configuration sequences have the same configuration identifiers in the same indexes.
Lemma 11 showed that any two operations store the same configuration in any cell k of their cseq variable. It is not known however if the two processes discover the same number of configuration ids. In the following lemmas we will show that if a process learns about a configuration in a cell k then it also learns about all configuration ids for every index i, such that \(0\le i\le k-1\).
We can now move to an important lemma that shows that any read-config action returns an extension of the configuration sequence returned by any previous read-config action. First, we show that the last finalized configuration observed by any read-config action is at least as recent as the finalized configuration observed by any subsequent read-config action.
Thus far we focused on the configuration member of each element in cseq. As operations do take into account the status of a configuration, i.e., P or F, in the next lemma we examine the relationship between the last finalized configurations as detected by two operations. First, we present a lemma that shows the monotonicity of the finalized configurations.
Using the previous Lemmas we can conclude the main result of this section.
6.2 Atomicity Property of Ares
Given the properties satisfied by the reconfiguration algorithm of Ares in any execution, we can now proceed to examine whether our algorithm satisfies the safety (atomicity) conditions. The propagation of the information of the distributed object is achieved using the get-tag, get-data, and put-data actions. We assume that the DAP used satisfies Property 1 as presented in Section 3, and we will show that, given such an assumption, Ares satisfies atomicity.
We begin with a lemma stating that if a reconfiguration operation retrieves a configuration sequence of length k during its \({\sf read-config}\) action, then it installs/finalizes the \((k+1)\)st configuration in the global configuration sequence.
The next lemma states that only some reconfiguration operation \(\pi\) may finalize a configuration c at index j in a configuration sequence \(p.cseq\) at any process p. To finalize c, the lemma shows that \(\pi\) must witness a configuration sequence such that its last finalized configuration appears at an index \(i\lt j\) in the configuration sequence \(p.cseq\). In other words, reconfigurations always finalize configurations that are ahead of their latest observed final configuration, in effect “jumping” from one final configuration to the next.
We now reach an important lemma of this section. By Ares, before a read/write/reconfig operation completes, it propagates the maximum tag it discovered by executing the \({\sf put-data}\) action in the last configuration of its local configuration sequence (Lines A2:18, A4:16, A4:38). When a subsequent operation is invoked, it reads the latest configuration sequence by beginning from the last finalized configuration in its local sequence and invoking \({\sf get-data}\) on all the configurations until the end of that sequence. The lemma shows that the latter operation will retrieve a tag which is higher than the tag used in the \({\sf put-data}\) action of any preceding operation.
The following lemma shows the consistency of operations as long as the DAP used satisfies Property 1.
And the main result of this section follows:
Since algorithm Ares handles each configuration separately, we can observe that the algorithm may utilize a different mechanism for the put and get primitives in each configuration. This leads to the following remark.
7 Performance Analysis of Ares
A major challenge in reconfigurable atomic services is to bound the latency of read and write operations, especially when they are invoked concurrently with reconfiguration operations. In this section, we provide an in-depth analysis of the latency of operations in Ares. We also analyze the storage and communication costs when Ares utilizes, in each configuration, the erasure-coding algorithm presented in Section 5.
7.1 Latency Analysis
The idea behind our latency analysis is straightforward: we construct the worst-case execution that allows all concurrent reconfigurations to add their proposed configurations. This leads to the longest configuration sequence that a read/write operation needs to traverse before completing. Thus, given bounded message delays, we compute the delay of each operation and finally determine how long it takes, in the worst case, for a read/write operation to catch up and complete.
Liveness (termination) properties cannot be guaranteed for Ares without restricting asynchrony or the rate of arrival of reconfig operations, or without assuming that the consensus protocol terminates. Here, we provide a conditional performance analysis of operations, based on latency bounds on message delivery. We assume that local computations take negligible time and that the latency of an operation is due to the delays of the messages exchanged during the execution. We measure delays in time units of some global clock, which is visible only to an external viewer; no process has access to this clock. Let d and D be the minimum and maximum durations taken by messages, sent during an execution of Ares, to reach their destinations. Also, let \(T(\pi)\) denote the duration of an operation (or action) \(\pi\). In the statements that follow, we consider any execution \({\xi }\) of Ares that contains k reconfig operations. For any configuration c in an execution of Ares, we assume that any \(c.Con.{\sf propose}\) operation takes at least \(T_{min}(CN)\) time units.
Let us first examine the action delays under the bounds we assume. It is easy to see that the actions put-config and get-next-config perform two message exchanges and thus take time \(2d\le T(\phi)\le 2D\). From this we can derive the delay of a read-config action.
From Lemma 25 it is clear that the latency of a \({\sf read-config}\) action depends on the number of configurations installed since the last finalized configuration known to the recon client.
Given the latency of a read-config, we can compute the minimum amount of time it takes for k configurations to be installed.
The following lemma shows the maximum latency of a read or a write operation invoked by any non-faulty client. In Ares, the latency of a read/write operation depends on the delays of the DAP operations. For our analysis we assume that all \({\sf get-data}\), \({\sf get-tag}\), and \({\sf put-data}\) primitives use two phases of communication, each consisting of an exchange between the client and the servers.
In the following lemma, we estimate the time taken for a read or a write operation to complete, when it discovers k configurations between its invocation and response steps.
It remains now to examine the conditions under which a read/write operation may “catch up” with an infinite number of reconfiguration operations. For the sake of a worst-case analysis, we will assume that reconfiguration operations suffer the minimum delay d, whereas read and write operations suffer the maximum delay D in each message exchange. We first show how long it takes for k configurations to be installed.
The following theorem is the main result of this section, in which we define the relationship between \(T_{min}(CN)\), \(d,\) and D so to guarantee that any read or write issued by a non-faulty client always terminates.
Theorem 29.
Suppose \(T_{min}(CN) \ge 3(6D-d)\). Then any read or write operation \(\pi\) completes in any execution \({\xi }\) of Ares, for any number of reconfiguration operations in \({\xi }\).
Proof.
By Lemma 28, k configurations may be installed in time \(T(k) \ge 4d\sum _{i=1}^{k}i+ k\left(T_{min}(CN)+2d\right)\). Also, by Lemma 27, operation \(\pi\) takes at most \(T(\pi)\le 6D\left(\nu (\mathbf {c}^{p}_{\sigma _e}) - \mu (\mathbf {c}^{p}_{\sigma _s})+2\right)\). Let \(k=\nu (\mathbf {c}^{p}_{\sigma _e}) - \mu (\mathbf {c}^{p}_{\sigma _s})\) be the total number of configurations observed during \(\pi\). Then \(\pi\) terminates before a \((k+1)\)-st configuration is added to the configuration sequence if \(6D(k+2) \le 4d\sum _{i=1}^{k}i+ k\left(T_{min}(CN)+2d\right)\), which is equivalent to \(d \ge \frac{3D}{k}-\frac{T_{min}(CN)}{2(k+2)}\). This completes the proof.□
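As a quick sanity check of the bound (a worked instance we add here for illustration), consider the tightest case \(k=1\): the termination condition becomes \(d \ge 3D - \frac{T_{min}(CN)}{6}\), and under the assumption \(T_{min}(CN)\ge 3(6D-d)\) the right-hand side is at most \(3D - \frac{3(6D-d)}{6} = \frac{d}{2} \le d\), so the condition is satisfied. A similar calculation, using \(d\le D\), shows that for every \(k\ge 2\) the right-hand side \(\frac{3D}{k}-\frac{T_{min}(CN)}{2(k+2)}\) is non-positive, so the condition holds trivially.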
7.2 Storage and Communication Costs for Ares
Storage and Communication costs for Ares highly depend on the DAP that we use in each configuration. For our analysis, we assume that each configuration utilizes the algorithms and the DAPs presented in Section 5.
Recall that by our assumption, the storage cost counts the size (in bits) of the coded elements stored in the variable List at each server. We ignore the storage cost due to meta-data. For communication cost, we measure the bits sent on the wire between the nodes.
Lemma 30.
The worst-case total storage cost of Algorithm 5 is \((\delta +1)\frac{n}{k}\).
Proof.
The maximum number of (tag, coded-element) pairs in the List is \(\delta +1\), and the size of each coded element is \(\frac{1}{k}\), while the tag variable is metadata and therefore not counted. So, the total storage cost is \((\delta +1)\frac{n}{k}\).□
We next state the communication cost for the write and read operations in Algorithm 5. Once again, note that we ignore the communication cost arising from the exchange of meta-data.
Lemma 31.
The communication cost associated with a successful write operation in Algorithm 5 is at most \(\frac{n}{k}\).
Proof.
During a write operation, in the \({\sf get-tag}\) phase, the servers respond with their highest tag variables, which are metadata. In the \({\sf put-data}\) phase, the writer sends to each server a coded element of size \(\frac{1}{k}\), and hence the total communication cost of this phase is \(\frac{n}{k}\). Therefore, the worst-case communication cost of a write operation is \(\frac{n}{k}\).□
Lemma 32.
The communication cost associated with a successful read operation in Algorithm 5 is at most \((\delta +2)\frac{n}{k}\).
Proof.
During a read operation, in the \({\sf get-data}\) phase the servers respond with their List variables; each such list is of size at most \((\delta +1)\frac{1}{k}\), so counting all such responses gives \((\delta +1)\frac{n}{k}\). In the \({\sf put-data}\) phase, the reader sends to each server a coded element of size \(\frac{1}{k}\), and hence the total communication cost of this phase is \(\frac{n}{k}\). Therefore, the worst-case communication cost of a read operation is \((\delta +2) \frac{n}{k}\).□
From the above lemmas we get the following result.
Theorem 33.
The Ares algorithm has: (i) storage cost \((\delta +1)\frac{n}{k}\), (ii) communication cost for each write at most \(\frac{n}{k}\), and (iii) communication cost for each read at most \((\delta +2)\frac{n}{k}\).
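For concreteness, instantiating these bounds with the parameters used in our experimental setup (Section 9.2), namely \(n=10\), \(k=8\), and \(\delta =5\): the storage cost is \((5+1)\frac{10}{8}=7.5\) units per unit of object size, the write communication cost is \(\frac{10}{8}=1.25\) units, and the read communication cost is \((5+2)\frac{10}{8}=8.75\) units, whereas replicating the object on all 10 servers would incur a storage cost of 10 units.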
8 Flexibility of DAPs
In this section, we argue that various implementations of DAPs can be used in Ares. In fact, via reconfig operations one can implement a highly adaptive atomic DSS: a replication-based DSS can be transformed into an erasure-code-based one; the number of storage servers can be increased or decreased; the performance of the DSS can be studied under various code parameters; and so on. The insight into implementing various DAPs comes from the observation that the simple algorithmic template A (see Algorithm 1) for the read and write protocols, combined with any implementation of the DAPs satisfying Property 1, gives rise to a MWMR atomic memory service. Moreover, the read and write operations terminate as long as the implemented DAPs complete.
A read operation in A performs a \({c}.{{\sf get-data}()}\) to retrieve a tag-value pair \(\langle \tau _{},v \rangle\) from a configuration c, and then performs a \(c.{\sf put-data}(\langle \tau _{},v \rangle)\) to propagate that pair back to configuration c. A write operation is similar to the read, but before performing the \({\sf put-data}\) action it generates a new tag, which it associates with the value to be written. The following result shows that A is atomic and live if the DAPs satisfy Property 1 and are live.
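To make the template concrete, the following is a minimal Python sketch (Python being the language of our PoC implementation) of how the reads and writes of template A could be layered on top of a configuration object exposing the three DAPs; the class and method names are illustrative assumptions and do not correspond to the actual Ares code.

```python
# Minimal sketch of template A over a single configuration c.
# The configuration object and its DAP method names are assumptions.
from dataclasses import dataclass

@dataclass(frozen=True, order=True)
class Tag:
    """Logical timestamp: (integer, writer id), compared lexicographically."""
    ts: int
    wid: str

class TemplateA:
    def __init__(self, client_id, config):
        self.client_id = client_id
        self.c = config  # exposes get_tag(), get_data(), put_data(tag, value)

    def read(self):
        # get-data: retrieve the maximum tag-value pair from c ...
        tag, value = self.c.get_data()
        # ... then put-data: propagate the pair back to c before returning.
        self.c.put_data(tag, value)
        return value

    def write(self, value):
        # get-tag: discover the maximum tag in c ...
        tag = self.c.get_tag()
        # ... generate a strictly larger tag owned by this writer ...
        new_tag = Tag(tag.ts + 1, self.client_id)
        # ... then put-data: propagate the new tag-value pair to c.
        self.c.put_data(new_tag, value)
        return new_tag
```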
Theorem 34 (Atomicity of Template A)
Suppose the DAP implementation satisfies the consistency properties C1 and C2 of Property 1 for a configuration \(c\in \mathcal {C}\). Then any execution \({\xi }\) of algorithm A in configuration c is atomic and live if each DAP invocation terminates in \({\xi }\) under the failure model \(c.\mathcal {F}\).
Proof.
We prove the atomicity by proving properties A1, A2 and A3 presented in Section 2 for any execution of the algorithm.
Property A1: Consider two operations \(\phi\) and \(\pi\) such that \(\phi\) completes before \(\pi\) is invoked. We need to show that it cannot be the case that \(\pi \prec \phi\). We break our analysis into the following four cases:
Case \((a)\): Both \(\phi\) and \(\pi\) are writes. The \({c}.{{\sf put-data}(*)}\) of \(\phi\) completes before \(\pi\) is invoked. By property C1 the tag \(\tau _{\pi }\) returned by the \({c}.{{\sf get-data}()}\) at \(\pi\) is at least as large as \(\tau _{\phi }\). Now, since \(\tau _{\pi }\) is incremented by the write operation then \(\pi\) puts a tag \(\tau _{\pi }^{\prime }\) such that \(\tau _{\phi } \lt \tau _{\pi }^{\prime }\) and hence we cannot have \(\pi \prec \phi\).
Case \((b)\): \(\phi\) is a write and \(\pi\) is a read. In execution \({\xi }\) since \({c}.{{\sf put-data}(\langle t_{\phi }, * \rangle)}\) of \(\phi\) completes before the \({c}.{{\sf get-data}()}\) of \(\pi\) is invoked, by property C1 the tag \(\tau _{\pi }\) obtained from the above \({c}.{{\sf get-data}()}\) is at least as large as \(\tau _{\phi }\). Now \(\tau _{\phi } \le \tau _{\pi }\) implies that we cannot have \(\pi \prec \phi\).
Case \((c)\): \(\phi\) is a read and \(\pi\) is a write. Let the id of the writer that invokes \(\pi\) be \(w_{\pi }\). The \({c}.{{\sf put-data}(\langle \tau _{\phi }, * \rangle)}\) call of \(\phi\) completes before the \({c}.{{\sf get-tag}()}\) of \(\pi\) is initiated. Therefore, by property C1, the \({c}.{{\sf get-tag}()}\) returns \(\tau _{}\) such that \(\tau _{\phi } \le \tau _{}\). Since \(\tau _{\pi }\) equals \(inc(\tau _{})\) by design of the algorithm, \(\tau _{\pi } \gt \tau _{\phi }\) and we cannot have \(\pi \prec \phi\).
Case \((d)\): Both \(\phi\) and \(\pi\) are reads. In execution \({\xi }\) the \({c}.{{\sf put-data}(\langle t_{\phi }, * \rangle)}\) is executed as a part of \(\phi\) and completes before \({c}.{{\sf get-data}()}\) is called in \(\pi\). By property C1 of the data-primitives, we have \(\tau _{\phi } \le \tau _{\pi }\) and hence we cannot have \(\pi \prec \phi\).
Property A2: Note that because the tag set \({\mathcal {T}}\) is well-ordered, we can show that A2 holds by first showing that every write has a unique tag; this means that any two writes can be ordered. A read can be ordered with respect to any write trivially if the respective tags are different, and, by definition, if the tags are equal the write is ordered before the read.
Observe that two tags generated by different writers are necessarily distinct because of the id component of the tag. Now, if two operations, say \(\phi\) and \(\pi\), are writes from the same writer, then by the well-formedness property the \({c}.{{\sf put-data}(*)}\) of \(\phi\) completes before the \({c}.{{\sf get-tag}()}\) of \(\pi\); thus, by property C1, the second operation witnesses a tag with an integer part at least as large as that of \(\phi\) and generates a strictly higher tag. Hence \(\pi\) is ordered after \(\phi\).
Property A3: By C2, a \({c}.{{\sf get-data}()}\) may return a tag \(\tau _{}\) only when there exists an operation \(\pi\) that invoked a \({c}.{{\sf put-data}(\langle \tau _{},* \rangle)}\); otherwise, it returns the initial value. Since a write is the only operation to put a new tag \(\tau _{}\) in the system, Property A3 follows from C2.□
8.1 Representing Known Algorithms in Terms of DAPs
A number of known tag-based algorithms that implement atomic read/write objects (e.g., ABD [11] and Fast [21]) can be expressed in terms of DAPs. In this subsection, we demonstrate how the celebrated ABD algorithm [11] can be transformed.
mwABD Algorithm. The multi-writer version of ABD can be transformed to the generic algorithm Template A. Algorithm 8 illustrates the three DAPs for the ABD algorithm. The \({\sf get-data}\) primitive encapsulates the query phase of mwABD, while the \({\sf put-data}\) primitive encapsulates the propagation phase of the algorithm.
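For illustration, the following is a minimal sketch of how these quorum-based DAPs could be written in Python; it abstracts the messaging layer behind a hypothetical quorum_rpc helper and assumed message fields, so it is a sketch of the logic rather than the article's Algorithm 8.

```python
# Hypothetical sketch of the mwABD DAPs over a configuration c, using an
# assumed helper quorum_rpc(c, msg) that sends msg to every server in
# c.Servers and returns the replies of some quorum in c.Quorums.
# Tags are modeled as (integer, writer-id) tuples, compared lexicographically.

def get_tag(c):
    # Query phase, tags only: return the maximum tag held by a quorum.
    replies = quorum_rpc(c, {"type": "QUERY-TAG"})
    return max(r["tag"] for r in replies)

def get_data(c):
    # Query phase of mwABD: return the tag-value pair with the maximum tag
    # among the replies of a quorum.
    replies = quorum_rpc(c, {"type": "QUERY"})
    best = max(replies, key=lambda r: r["tag"])
    return best["tag"], best["value"]

def put_data(c, tag, value):
    # Propagation phase of mwABD: send the pair to all servers and wait for a
    # quorum of acknowledgments; a server adopts the pair only if the received
    # tag is larger than its local tag.
    quorum_rpc(c, {"type": "PROPAGATE", "tag": tag, "value": value})
```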
Let us now examine if the primitives satisfy properties C1 and C2 of Property 1. We begin with a lemma that shows the monotonicity of the tags at each server.
Lemma 35.
Let \(\sigma\) and \(\sigma ^{\prime }\) be two states in an execution \({\xi }\) such that \(\sigma\) appears before \(\sigma ^{\prime }\) in \({\xi }\). Then for any server \(s\in \mathcal {S}\) it must hold that \(s.tag|_{\sigma }\le s.tag|_{\sigma ^{\prime }}\).
Proof.
According to the algorithm, a server s updates its local tag-value pairs when it receives a message with a higher tag. So if \(s.tag|_{\sigma }=\tau\) then in a state \(\sigma ^{\prime }\) that appears after \(\sigma\) in \({\xi }\), \(s.tag|_{\sigma ^{\prime }}\ge \tau\).□
In the following two lemmas we show that property C1 is satisfied, that is if a \({\sf put-data}\) action completes, then any subsequent \({\sf get-data}\) and \({\sf get-tag}\) actions will discover a higher tag than the one propagated by that \({\sf put-data}\) action.
Lemma 36.
Let \(\phi\) be a \({c}.{{\sf put-data}(\langle \tau ,v \rangle)}\) action invoked by \(p_1\) and \(\gamma\) be a \({c}.{{\sf get-tag}()}\) action invoked by \(p_2\) in a configuration c, such that \(\phi \rightarrow \gamma\) in an execution \({\xi }\) of the algorithm. Then \(\gamma\) returns a tag \(\tau _\gamma \ge \tau\).
Proof.
The lemma follows from the intersection property of quorums. In particular, during the \({c}.{{\sf put-data}(\langle \tau ,v \rangle)}\) action, \(p_1\) sends the pair \(\langle \tau ,v \rangle\) to all the servers in \(c.Servers\) and waits until all the servers in a quorum \(Q_i\in c.Quorums\) reply. When those replies are received then the action completes.
During the \({c}.{{\sf get-tag}()}\) action, on the other hand, \(p_2\) sends query messages to all the servers in \(c.Servers\) and waits until all servers in some quorum \(Q_j\in c.Quorums\) (not necessarily different from \(Q_i\)) reply. By definition \(Q_i \cap Q_j\ne \emptyset\); thus any server \(s\in Q_i\cap Q_j\) replies to both the \(\phi\) and \(\gamma\) actions. By Lemma 35, and since s received the tag \(\tau\), s replies to \(p_2\) with a tag \(\tau _s\ge \tau\). Since \(\gamma\) returns the maximum tag it discovers, \(\tau _\gamma \ge \tau _s\). Therefore \(\tau _\gamma \ge \tau\) and this completes the proof.□
With similar arguments, and given that each value is associated with a unique tag, we can show the following lemma.
Lemma 37.
Let \(\pi\) be a \({c}.{{\sf put-data}(\langle \tau ,v \rangle)}\) action invoked by \(p_1\) and \(\phi\) be a \({c}.{{\sf get-data}()}\) action invoked by \(p_2\) in a configuration c, such that \(\pi \rightarrow \phi\) in an execution \({\xi }\) of the algorithm. Then \(\phi\) returns a tag-value \(\langle \tau _\phi , v_\phi \rangle\) such that \(\tau _\phi \ge \tau\).
Finally we can now show that property C2 also holds.
Lemma 38.
If \(\phi\) is a \({c}.{{\sf get-data}()}\) that returns \(\langle \tau _{\pi }, v_\pi \rangle \in {\mathcal {T}}\times {\mathcal {V}}\), then there exists \(\pi\) such that \(\pi\) is a \({c}.{{\sf put-data}(\langle \tau _{\pi }, v_{\pi } \rangle)}\) and \(\phi \not\rightarrow \pi\).
Proof.
This follows from the facts that (i) servers set their tag-value pair to a pair received by a \({\sf put-data}\) action, and (ii) a \({\sf get-data}\) action returns a tag-value pair that it received from a server. So if a \({c}.{{\sf get-data}()}\) operation \(\phi\) returns a tag-value pair \(\langle \tau _{\pi }, v_\pi \rangle\), there must be a server s that replied to that operation with \(\langle \tau _{\pi }, v_\pi \rangle\), and s received \(\langle \tau _{\pi }, v_\pi \rangle\) from some \({c}.{{\sf put-data}(\langle \tau _{\pi }, v_\pi \rangle)}\) action \(\pi\). Thus, \(\pi\) either precedes or is concurrent with \(\phi\), and hence \(\phi \not\rightarrow \pi\).□
9 Experimental Evaluation
The theoretical findings suggest that Ares provides robustness and flexibility for shared memory implementations without sacrificing strong consistency. In this section, we present a PoC implementation of Ares and run preliminary experiments to gain better insight into the efficiency and adaptiveness of Ares. In particular, our experiments measure the latency of read, write, and reconfig operations, and examine the persistence of consistency even when the service is reconfigured between configurations that add/remove servers and utilize different shared memory algorithms.
9.1 Experimental Testbed
We ran experiments on two different setups: (i) simulated locally on a single machine, and (ii) on a LAN. Both types of experiments run on Emulab [2], an emulated WAN environment testbed used for developing, debugging, and evaluating distributed systems. We used nodes with two 2.4 GHz 64-bit 8-Core E5-2630 “Haswell” processors, \(64 \,\mathrm{G}B\) RAM, and 1 Gb/s and 10 Gb/s NICs. In both setups we used an external implementation of the Raft [45] consensus algorithm, which was used for the service reconfiguration (line 16 of Algorithm 2) and was deployed on top of small RPi devices. The small devices introduced further delays in the system, reducing the speed of reconfigurations and creating harsh conditions in the service for longer periods. The Python implementation of Raft used for consensus is PySyncObj [5]; some modifications were made to allow the execution of Raft in the Ares environment. We built an HTTP API for the management of the Raft subsystem. A reconfigurer proposes a configuration at a particular index in the configuration sequence by sending a POST request to the URL of each Raft node, and receives a response from Raft indicating which configuration is decided for that index.
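To illustrate this interface, a configuration proposal could be issued roughly as follows; the endpoint path and JSON fields are hypothetical placeholders, since the text above does not fix the exact HTTP schema of our prototype.

```python
# Hypothetical sketch of proposing a configuration through the HTTP API
# wrapped around the Raft (PySyncObj) nodes. The URL path and JSON fields
# are placeholders, not the actual API of our prototype.
import json
import urllib.request

def propose_config(raft_urls, index, config):
    """POST the proposed configuration for position `index` of the
    configuration sequence to the Raft nodes and return the decided one."""
    payload = json.dumps({"index": index, "config": config}).encode()
    decided = None
    for url in raft_urls:
        req = urllib.request.Request(
            f"{url}/propose", data=payload,
            headers={"Content-Type": "application/json"}, method="POST")
        try:
            with urllib.request.urlopen(req, timeout=5) as resp:
                # Raft returns the configuration decided for this index,
                # which may differ from the one we proposed.
                decided = json.loads(resp.read())
                break
        except OSError:
            continue  # this Raft node is unreachable; try the next one
    return decided
```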
Local Experimental Setup: The local setup was used to have access to a globally synchronized clock (the clock of the local machine), in order to examine whether our algorithm preserves global ordering, and hence atomicity, even when using different algorithms between configurations. All the instances are therefore hosted on the same physical machine, avoiding the skew between computer clocks in a distributed system. Furthermore, the use of one clock guarantees that when an event occurs after another, it is assigned a later time.
Distributed Experimental Setup: The distributed experiments on Emulab enabled us to examine the performance of the algorithm in a close-to-real environment. For the deployment and remote execution of the experimental tasks on Emulab, we used Ansible Playbooks [1]. All physical nodes were placed on a single LAN using a DropTail queue, without delay or packet loss. Each physical machine runs one server or client process, which guarantees a fair communication delay between a client and a server node.
Node Types: In all experiments, we use four distinct types of nodes: writers, readers, reconfigurers, and servers. Their main roles are listed below:
—
writer \(w \in W \subseteq C\) : a client that sends write requests to all servers and waits for a quorum of the servers to reply.
—
reader \(r \in R \subseteq C\): a client that sends read requests to servers and waits for a quorum of the servers to reply.
—
reconfigurer \(g \in G \subseteq C\): a client that sends reconfiguration requests to servers and waits for a quorum of the servers to reply.
—
server \(s \in S\): a server listens for read, write, and reconfiguration requests; it updates its object replica according to the atomic shared memory algorithm and replies to the process that originated the request.
Performance Metric: The metric for evaluating the algorithms is operation latency, which includes both communication and computational delays. The operation latency is computed as the average of all clients’ average operation latencies. For better estimates, each experiment in every scenario was repeated six times. In the graphs, we use error bars to illustrate the standard error of the mean (SEM) over the six repetitions.
9.2 Experimental Scenarios
In this section, we describe the scenarios we constructed and the settings for each of them. In our scenarios, we constructed the \(DAP_{s}\) and used two different atomic storage algorithms in Ares: (i) the erasure coding-based algorithm presented in Section 5, and (ii) the ABD algorithm (see Section 8.1).
Implementation of \(DAP_{s}\): Clients initialize the appropriate configuration objects to handle any request. Notice that a client creates a configuration object the first time it invokes an operation, or when performing a reconfiguration operation. Once the configuration object is initialized, it is stored in the client’s cseq and retrieved directly on any subsequent request from the client. Therefore, the \(DAP_{s}\) procedures are called from a configuration object. The asynchronous communication between components is achieved by using DEALER and ROUTER sockets from the ZeroMQ Python library [6].
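To give a flavour of this messaging layer, the snippet below sketches how a client-side DEALER socket per server could scatter a request and gather replies until a quorum is reached; the endpoints, message framing, and quorum handling are simplifying assumptions rather than the exact code of our prototype.

```python
# Illustrative sketch of the asynchronous client/server exchange used by the
# DAPs, based on ZeroMQ DEALER (client side) and ROUTER (server side) sockets.
# Endpoints and message framing are assumptions for the example.
import json
import zmq

def quorum_exchange(server_endpoints, request, quorum_size, timeout_ms=2000):
    """Send `request` to every server and return the first `quorum_size`
    replies (or fewer, if the timeout expires)."""
    ctx = zmq.Context.instance()
    poller = zmq.Poller()
    sockets = []
    for endpoint in server_endpoints:
        s = ctx.socket(zmq.DEALER)
        s.connect(endpoint)                   # e.g., "tcp://10.0.0.1:5555"
        s.send(json.dumps(request).encode())  # scatter the request
        poller.register(s, zmq.POLLIN)
        sockets.append(s)

    replies = []
    while len(replies) < quorum_size:
        events = dict(poller.poll(timeout_ms))
        if not events:
            break                             # timed out waiting for a quorum
        for s in sockets:
            if s in events:
                replies.append(json.loads(s.recv()))
    for s in sockets:
        s.close(linger=0)
    return replies
```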
Erasure Coding: The type of erasure coding we use is an (\(n,k\)) Reed-Solomon code, which guarantees that any k of the n coded fragments are enough to reassemble the original object. The parameter k is the number of encoded data fragments, n is the total number of servers, and m is the number of parity fragments, i.e., \(n-k\). A high value of k, and consequently a small m, means less redundancy, with the system tolerating fewer failures. When \(k=1\), we essentially converge to replication. In practice, the \({\sf get-data}\) and \({\sf put-data}\) functions from Algorithm 5 integrate the standard Reed–Solomon implementation provided by liberasurecode from the PyEClib Python library [4].
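For reference, encoding and decoding with PyEClib looks roughly as follows; the ec_type string and fragment handling are assumptions that depend on the locally installed liberasurecode backends.

```python
# Sketch of Reed-Solomon encode/decode with PyEClib (liberasurecode backend).
# The ec_type string is an assumption; the available backends depend on the
# local liberasurecode installation.
from pyeclib.ec_iface import ECDriver

k, m = 8, 2                                   # 8 data + 2 parity fragments
driver = ECDriver(k=k, m=m, ec_type="liberasurecode_rs_vand")

value = b"x" * (4 * 1024 * 1024)              # a 4 MB object, as in our setup
fragments = driver.encode(value)              # n = k + m coded fragments

# Any k of the n fragments suffice to reassemble the original object.
recovered = driver.decode(fragments[:k])
assert recovered == value
```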
Fixed Parameters: In all scenarios, the number of servers is fixed to 10. The number of writers and the value of \(\delta\) are set to 5, \(\delta\) being the maximum number of concurrent put-data operations. The parity value of the EC is set to \(m=2\) in order to minimize the redundancy, leading to 8 data servers and 2 parity servers. It is worth mentioning that the quorum size of the EC algorithm is \(\lceil \frac{10+8}{2} \rceil =9\), while the quorum size of the ABD algorithm is \(\lfloor \frac{10}{2} \rfloor +1=6\). For the EC algorithm, the quorum size grows with the parameter k; but as k and the quorum size increase, the size of the coded elements decreases.
Distributed Experiments: For the distributed experiments we use a stochastic invocation scheme in which readers and writers pick a uniformly distributed (discrete) random time between intervals to invoke their next operations. The intervals are \([1..rInt]\) and \([1..wInt]\), respectively, where \(rInt = wInt = 2\) sec. In total, each writer performs 60 writes and each reader 60 reads. The reconfigurer invokes its next operation every 15 sec and performs a total of six reconfigurations. The intervals are set to these values in order to generate a continuous flow of operations and stress the concurrency in the system. Note that these values are not based on any real-world scenario.
In particular, we present six types of scenarios:
—
File Size Scalability (Emulab): The first scenario is designed to evaluate how the read and write latencies are affected by the size of the shared object. There are two separate runs, one for each examined storage algorithm. The file size is doubled from \(1 \,\mathrm{M}B\) up to \(128 \,\mathrm{M}B\). The number of readers is fixed to 5, without any reconfigurers.
—
Reader Scalability (Emulab): This scenario compares the read and write latencies of the system under the two storage algorithms as the number of readers increases. In particular, we execute two separate runs, one for each storage algorithm. We use a single reconfigurer whose recon operations lead to the same shared memory emulation and server nodes. The size of the file used is \(4 \,\mathrm{M}B\).
—
Changing Reconfigurations (Emulab): In this scenario, we evaluate how the read and write latencies are affected when increasing the number of readers while also changing the storage algorithm. We perform two runs, which differ in the way the reconfigurer chooses the next storage algorithm: (i) the reconfigurer chooses randomly between the two storage algorithms, and (ii) the reconfigurer switches between the two storage algorithms. The size of the file used, in both runs, is \(4 \,\mathrm{M}B\).
—
k Scalability (Emulab, EC only): In this scenario, we examine the read and write latencies for different values of k (a parameter of Reed-Solomon). We increase k of the EC algorithm from 1 to 9. The number of readers is fixed to 5, without any reconfigurers. The size of the file used is \(4 \,\mathrm{M}B\).
—
Changing the number of Reads/Writes (Emulab): In these scenarios, we examine the read and write latencies for different numbers of read and write operations. We change the number of reads/writes that each reader/writer performs from 10 to 60, in increments of 10, and consider all possible pairs of write and read counts. The number of readers is fixed to 5. The reconfigurer switches between the two storage algorithms. The size of the file used is \(4 \,\mathrm{M}B\).
—
Consistency Persistence (Local): In this scenario, we run multiple client operations in order to check whether the data remains consistent across servers. The number of readers is set to 5. The readers and writers invoke their next operations without any time delay, while the reconfigurer waits 15 sec before its next invocation. We run two different scenarios, which differ in the reconfigurations: in both, the reconfigurer switches between the two storage algorithms, and in the second it also concurrently changes the quorum of servers. In total, each writer performs 500 writes, each reader 500 reads, and the reconfigurer 50 reconfigurations. The size of the file used is \(4 \,\mathrm{M}B\).
9.3 Experimental Results
In this section, we present and explain the evaluation results of each scenario. As a general observation, Ares with the EC storage provides data redundancy at a lower communication and storage cost than the ABD storage, which uses strict replication.
File Size Scalability Results: Figure 3(a) shows the results of the file size scalability experiments. The read and write latencies of both storage algorithms remain low up to \(16 \,\mathrm{M}B\). For bigger sizes, the latencies of all operations grow significantly. It is worth noting that the fragmentation applied by the EC algorithm benefits its write operations, which follow a more slowly increasing curve than the rest of the operations. Among the remaining operations, the reads seem to suffer the worst delay hit, as they are engaged in more communication phases; additionally, the larger messages sent by ABD result in slower read operations. We noticed that EC has lower SEM values than ABD, which indicates that the calculated mean latencies of EC align very closely throughout the experiments. The fact that EC breaks each file into smaller fragments, combined with the observation that ABD’s variation is smaller for smaller files, suggests that the file size has a significant impact on the error variation. It appears that larger file sizes introduce higher variation in the delivery times and hence higher statistical errors.
Fig. 3. Simulation results.
Reader Scalability Results: The results of the reader scalability experiments are shown in Figure 3(b). The read and write latencies of both algorithms remain almost unchanged as the number of readers increases, indicating that the system does not reach a state where it cannot handle the concurrent read operations. Still, the reduced message size of read and write operations in EC keeps their latencies lower than the corresponding latencies of ABD. On the other hand, the reconfiguration latency in both algorithms fluctuates wildly between about 1 sec and 4 sec, probably due to the unstable connection to the external service that handles the reconfigurations. Notice that the number of readers does not have a great impact on the results of each experiment, as the SEM error bars are small. The same holds for the next scenario, where the number of readers changes while the storage algorithm switches between reconfigurations.
Changing Reconfigurations Results: Figure 3(c) illustrates the results of the experiments with random storage changes, while Figure 3(d) shows the results when the reconfigurer switches between the storage algorithms. In both experiments, there are cases where a single read/write operation, when concurrent with a recon operation, may access configurations that implement both the ABD and the EC algorithm; the latencies of such operations are counted toward both the ABD and the EC latencies. As mentioned earlier, our choice of k minimizes the coded fragment size but introduces bigger quorums and thus larger communication overhead. As a result, for smaller file sizes Ares may not benefit from the coding, bringing the delays of the two algorithms closer to each other. It is again evident that the reconfiguration delays are higher than the delays of all other operations.
k Scalability Results: From Figure 3(e) we can infer that when a smaller k is used, the write and read latencies reach their highest values. A small k results in a smaller number of data fragments and thus bigger fragments and higher redundancy. For example, RS(10,8) and RS(10,7) have the same quorum size, equal to 9, whereas the latter carries more redundant information. As a result, a higher m (i.e., a smaller k) achieves higher levels of fault-tolerance at the expense of storage efficiency. The write latency seems to be less affected by the value of k, since encoding is considerably faster as it requires less computation. In conclusion, there appears to be a tradeoff between operation latency and fault-tolerance in the system: the larger the k (and thus the lower the fault-tolerance), the smaller the latency of read/write operations. This experiment also shows that the object size plays a significant role in the error variation: when k is small, and thus the fragments sent are bigger, the error is higher; as k grows and the fragments get smaller, the SEM shrinks. This is an indication that communicating larger data over the wire may cause fluctuations in the delivery times (as also seen in the file size scenario).
Changing the number of Reads/Writes Results: Figure 3(f) shows a subset of the results of the experiments in which the number of read operations changes and the number of write operations is fixed to 60. The experiments show that the total read/write latency (for both EC and ABD) has very similar values for all combinations of writes and reads, which indicates that the system performance is not affected by the number of reads and writes. This is expected, since the number of participants is the same in all cases and, by well-formedness (i.e., each participant invokes a single operation at a time), at most 10 operations can be concurrent at any given state of the execution. Higher concurrency is captured by the scalability scenario. Note again that the read latency is higher than the write latency, since a read operation actually transfers data twice: once to fetch the data from the servers, and once during the propagation phase.
Consistency Persistence Results: Though the Ares protocol is provably strongly consistent, it is important to ensure that our implementation is correct. Validating strong consistency of an execution requires precise clock synchronization across all processes, so that operations can be tracked with respect to a global time. This is impossible to achieve in a distributed system where clock drift is inevitable. To circumvent this, we deploy all the processes on a single powerful machine, so that every process observes the clock of the same physical machine.
Our checker gathers data about an execution, including the start and end times of all operations, as well as other parameters such as the logical timestamps used by the protocol. The checker logic is based on the conditions of Lemma 13.16 in [38], which provide a set of sufficient conditions for guaranteeing strong consistency. The checker validates the strong consistency property individually for every atomic object in the execution under consideration. Note that consistency holds despite the existence of concurrent read/write and reconfiguration operations that may add/remove servers and switch the storage algorithm of the system.
10 Conclusions
We presented an algorithmic framework suitable for reconfigurable, erasure-code-based atomic memory services in asynchronous, message-passing environments. In particular, we provided a new modular framework, called Ares, which abstracts the implementation of the underlying shared memory behind a set of DAPs with specific correctness properties. Using these structures, Ares may implement a large class of atomic shared memory algorithms (those that can be expressed using the proposed DAPs), allowing any such algorithm to work in a reconfigurable environment. A set of erasure-code-based atomic memory algorithms is included in this class. To demonstrate the use of our framework, we provided a new two-round erasure-code-based algorithm with near-optimal storage cost, implemented in terms of the proposed DAPs. This implementation gave rise to the first (to our knowledge) reconfigurable erasure-coded atomic shared memory object. We provided a PoC implementation of our framework and obtained initial experimental results demonstrating the feasibility and correctness of the presented approach and comparing its performance with traditional approaches.
Ares is designed to address the real-world problem of migrating a system from replication to erasure codes and vice versa. Ares also enables replacing failed nodes with new, non-failed nodes. Its key difference from existing state-of-the-art systems is that it can perform such reconfigurations with minimal interruption of service, unlike current implementations that would block ongoing operations during reconfiguration. We anticipate that Ares will be very useful for workloads that are prone to fast changes in their properties and have stringent latency constraints. For such workloads, Ares enables the system to adapt to changes in the workload in an agile manner, utilizing the full flexibility that EC brings to the system, without violating latency constraints or consistency due to interruptions.
Our main goal was to establish that non-blocking reconfiguration is feasible and compatible with EC-based atomic data storage. Our experimental study is designed as a PoC prototype to verify the correctness properties we have developed and to show some of the benefits. It must be emphasized that a full-fledged system study of our algorithms, albeit an interesting area of future work motivated by our article, is outside our current scope. In particular, although our study provides some initial hints, we anticipate that such a future study would examine the following questions in more detail:
—
Real-world applications that would indeed benefit from our design.
—
Workloads generated from these real-world applications to test our algorithms, and competing ones.
—
The synergies between our reconfiguration algorithm and existing failure detection and recovery mechanisms.
—
Adding efficient repair and reconfiguration using regenerating codes.
Footnotes
*
In practice, these processes can be on the same node or different nodes.
†
Cassandra [36] offers tuneable consistency; it uses a protocol that is essentially ABD [11] for what they refer to as level 3 consistency (i.e., atomicity).
M. Abebe, K. Daudjee, B. Glasbergen, and Y. Tian. 2018. EC-store: Bridging the gap between storage and latency in distributed erasure coded systems. In Proceedings of the 2018 IEEE 38th International Conference on Distributed Computing Systems. 255–266. DOI:
Marcos Kawazoe Aguilera, Idit Keidar, Dahlia Malkhi, and Alexander Shraer. 2009. Dynamic atomic storage without consensus. In Proceedings of the 28th ACM Symposium on Principles of Distributed Computing. ACM, 17–25.
Marcos K. Aguilera, Idit Keidar, Dahlia Malkhi, Jean-Philippe Martin, and Alexander Shraery. 2010. Reconfiguring replicated atomic storage: A tutorial. Bulletin of the EATCS 102 (2010), 84–108.
Antonio Fernández Anta, Nicolas Nicolaou, and Alexandru Popa. 2015. Making “fast” atomic operations computationally tractable. In Proceedings of the International Conference on Principles Of Distributed Systems.
Dorian Burihabwa, Pascal Felber, Hugues Mercier, and Valerio Schiavoni. 2016. A performance evaluation of erasure coding libraries for cloud-based data stores. In Proceedings of the Distributed Applications and Interoperable Systems. Springer, 160–173.
Christian Cachin and Stefano Tessaro. 2006. Optimal resilience for erasure-coded byzantine distributed storage. In Proceedings of the International Conference on Dependable Systems and Networks. IEEE Computer Society, 115–124.
V. R. Cadambe, N. Lynch, M. Médard, and P. Musial. 2014. A coded shared atomic memory algorithm for message passing architectures. In Proceedings of the 2014 IEEE 13th International Symposium on Network Computing and Applications. 253–260. DOI:
Viveck R. Cadambe, Nancy A. Lynch, Muriel Médard, and Peter M. Musial. 2017. A coded shared atomic memory algorithm for message passing architectures. Distributed Computing 30, 1 (2017), 49–73.
Yu Lin Chen Chen, Shuai Mu, and Jinyang Li. 2017. Giza: Erasure coding objects across global data centers. In Proceedings of the 2017 USENIX Annual Technical Conference. 539–551.
Gregory Chockler, Seth Gilbert, Vincent Gramoli, Peter M. Musial, and Alexander A. Shvartsman. 2009. Reconfigurable distributed storage for dynamic networks. Journal of Parallel and Distributed Computing 69, 1 (2009), 100–116.
Dan Dobre, Ghassan O. Karame, Wenting Li, Matthias Majuntke, Neeraj Suri, and Marko Vukolić. 2019. Proofs of writing for robust storage. IEEE Transactions on Parallel and Distributed Systems 30, 11 (2019), 2547–2566. DOI:
Partha Dutta, Rachid Guerraoui, and Ron R. Levy. 2008. Optimistic erasure-coded distributed storage. In DISC’08: Proceedings of the 22nd International Symposium on Distributed Computing. Springer-Verlag, 182–196.
Partha Dutta, Rachid Guerraoui, Ron R. Levy, and Arindam Chakraborty. 2004. How fast can a distributed atomic read be? In Proceedings of the 23rd ACM Symposium on Principles of Distributed Computing. 236–245.
Rui Fan and Nancy Lynch. 2003. Efficient replication of large data objects. In Proceedings of the Distributed Algorithms. Faith Ellen Fich (Ed.), Lecture Notes in Computer Science, Vol. 2848. 75–91.
Antonio Fernández Anta, Theophanis Hadjistasi, and Nicolas Nicolaou. 2016. Computationally light “multi-speed” atomic memory. In Proceedings of the International Conference on Principles Of Distributed Systems.
Eli Gafni and Dahlia Malkhi. 2015. Elastic configuration maintenance via a parsimonious speculating snapshot solution. In Proceedings of the International Symposium on Distributed Computing. Springer, 140–153.
Chryssis Georgiou, Nicolas C. Nicolaou, and Alexander A. Shvartsman. 2008. On the robustness of (semi) fast quorum-based implementations of atomic shared memory. In DISC ’08: Proceedings of the 22nd International Symposium on Distributed Computing. Springer-Verlag, 289–304.
Chryssis Georgiou, Nicolas C. Nicolaou, and Alexander A. Shvartsman. 2009. Fault-tolerant semifast implementations of atomic read/write registers. Journal of Parallel and Distributed Computing 69, 1 (2009), 62–79.
S. Gilbert, N. Lynch, and A. A. Shvartsman. 2003. RAMBO II: Rapidly reconfigurable atomic memory for dynamic networks. In Proceedings of the International Conference on Dependable Systems and Networks. 259–268.
Maurice P. Herlihy and Jeannette M. Wing. 1990. Linearizability: A correctness condition for concurrent objects. ACM Transactions on Programming Languages and Systems 12, 3 (1990), 463–492.
Leander Jehl, Roman Vitenberg, and Hein Meling. 2015. Smartmerge: A new approach to reconfiguration for atomic storage. In Proceedings of the International Symposium on Distributed Computing. Springer, 154–169.
Gauri Joshi, Emina Soljanin, and Gregory Wornell. 2017. Efficient redundancy techniques for latency reduction in cloud systems. ACM Transactions on Modeling and Performance Evaluation of Computing Systems 2, 2 (2017), 12.
K. M. Konwar, N. Prakash, E. Kantor, N. Lynch, M. Médard, and A. A. Schwarzmann. 2016. Storage-optimized data-atomic algorithms for handling erasures and errors in distributed storage systems. In Proceedings of the 2016 IEEE International Parallel and Distributed Processing Symposium. 720–729.
Kishori M. Konwar, N. Prakash, Nancy Lynch, and Muriel Médard. 2016. RADON: Repairable atomic data object in networks. In Proceedings of the International Conference on Distributed Systems.
N. Lynch and A. A. Shvartsman. 2002. RAMBO: A reconfigurable atomic memory service for dynamic networks. In Proceedings of the 16th International Symposium on Distributed Computing. 173–190.
Nancy A. Lynch and Alexander A. Shvartsman. 1997. Robust emulation of shared memory using dynamic quorum-acknowledged broadcasts. In Proceedings of the Symposium on Fault-Tolerant Computing. 272–281.
Ellis Michael, Dan R. K. Ports, Naveen Kr. Sharma, and Adriana Szekeres. 2017. Recovering shared objects without stable storage. In Proceedings of the 31st International Symposium on Distributed Computing. Andréa W. Richa (Ed.), Leibniz International Proceedings in Informatics, Vol. 91. Schloss Dagstuhl–Leibniz-Zentrum fuer Informatik, Dagstuhl, Germany, 36:1–36:16. DOI:
Nicolas Nicolaou, Viveck Cadambe, N. Prakash, Kishori Konwar, Muriel Medard, and Nancy Lynch. 2019. ARES: Adaptive, reconfigurable, erasure coded, atomic storage. In Proceedings of the 2019 IEEE 39th International Conference on Distributed Computing Systems. 2195–2205. DOI:
Diego Ongaro and John Ousterhout. 2014. In search of an understandable consensus algorithm. In Proceedings of the 2014 USENIX Conference on USENIX Annual Technical Conference. USENIX Association, 305–320.
K. V. Rashmi, Mosharaf Chowdhury, Jack Kosaian, Ion Stoica, and Kannan Ramchandran. 2016. EC-cache: Load-balanced, low-latency cluster caching with online erasure coding. In Proceedings of the OSDI. 401–417.
Alexander Shraer, Jean-Philippe Martin, Dahlia Malkhi, and Idit Keidar. 2010. Data-centric reconfiguration with network-attached disks. In Proceedings of the 4th International Workshop on Large Scale Distributed System and Middleware. 22–26.
Alexander Spiegelman, Idit Keidar, and Dahlia Malkhi. 2017. Dynamic reconfiguration: Abstraction and optimal asynchronous solution. In Proceedings of the 31st International Symposium on Distributed Computing. 40:1–40:15.
Shuang Wang, Jianzhong Huang, Xiao Qin, Qiang Cao, and Changsheng Xie. 2017. WPS: A workload-aware placement scheme for erasure-coded in-memory stores. In Proceedings of the International Conference on Networking, Architecture, and Storage. IEEE, 1–10.
Chentao Wu and Xubin He. 2012. GSR: A global stripe-based redistribution approach to accelerate RAID-5 scaling. In Proceedings of the 2012 41st International Conference on Parallel Processing. 460–469. DOI:
Chentao Wu and Xubin He. 2013. A flexible framework to enhance RAID-6 scalability via exploiting the similarities among MDS codes. In Proceedings of the 2013 42nd International Conference on Parallel Processing. 542–551. DOI:
Yu Xiang, Tian Lan, Vaneet Aggarwal, and Yih-Farn R. Chen. 2015. Multi-tenant latency optimization in erasure-coded storage with differentiated services. In Proceedings of the 2015 IEEE 35th International Conference on Distributed Computing Systems. IEEE, 790–791.
Yu Xiang, Tian Lan, Vaneet Aggarwal, Yih-Farn R. Chen, Yu Xiang, Tian Lan, Vaneet Aggarwal, and Yih-Farn R. Chen. 2016. Joint latency and cost optimization for erasure-coded data center storage. IEEE/ACM Transactions on Networking 24, 4 (2016), 2443–2457.
Yinghao Yu, Renfei Huang, Wei Wang, Jun Zhang, and Khaled Ben Letaief. 2018. SP-cache: Load-balanced, redundancy-free cluster caching with selective partition. In Proceedings of the International Conference for High Performance Computing, Networking, Storage, and Analysis. IEEE, 1–13.
Heng Zhang, Mingkai Dong, and Haibo Chen. 2016. Efficient and available in-memory KV-store with hybrid erasure coding and replication. In Proceedings of the 14th USENIX Conference on File and Storage Technologies. USENIX Association, Santa Clara, CA, 167–180.
Xiaoyang Zhang, Yuchong Hu, Patrick P. C. Lee, and Pan Zhou. 2018. Toward optimal storage scaling via network coding: From theory to practice. In Proceedings of the IEEE INFOCOM 2018 - IEEE Conference on Computer Communications. 1808–1816. DOI: