Ares: Adaptive, Reconfigurable, Erasure coded, Atomic Storage

Published: 12 November 2022 Publication History

Abstract

Emulating a shared atomic, read/write storage system is a fundamental problem in distributed computing. Replicating atomic objects among a set of data hosts was the norm for traditional implementations (e.g., [11]) in order to guarantee the availability and accessibility of the data despite host failures. As replication is highly storage demanding, recent approaches suggested the use of erasure-codes to offer the same fault-tolerance while optimizing storage usage at the hosts. Initial works focused on a fixed set of data hosts. To guarantee longevity and scalability, a storage service should be able to dynamically mask host failures by allowing new hosts to join and failed hosts to be removed without service interruptions. This work presents the first erasure-code-based atomic algorithm, called Ares, which allows the set of hosts to be modified in the course of an execution. Ares is composed of three main components: (i) a reconfiguration protocol, (ii) a read/write protocol, and (iii) a set of data access primitives (DAPs). The design of Ares is modular and accommodates the use of different erasure-code parameters on a per-configuration basis. We provide bounds on the latency of read/write operations and analyze the storage and communication costs of the Ares algorithm.

1 Introduction

Distributed Storage Systems (DSSes) store large amounts of data in an affordable manner. Cloud vendors deploy hundreds to thousands of commodity machines, networked together to act as a single giant storage system. Component failures of commodity devices and network delays are the norm; therefore, ensuring consistent data access and availability at the same time is challenging. Vendors often solve availability by replicating data across multiple servers. These services use carefully constructed algorithms that ensure that these copies are consistent, especially when they can be accessed concurrently by different operations. The problem of keeping copies consistent becomes even more challenging when failed servers need to be replaced or new servers are added without interrupting the service. Any type of service interruption in a heavily used DSS usually translates to immense revenue loss.
The goal of this work is to provide an algorithm for implementing strongly consistent (i.e., atomic/linearizable), fault-tolerant distributed read/write storage, with low storage and communication footprint, and the ability to reconfigure the set of data hosts without service interruptions.
Replication-based Atomic Storage. A long stream of work used replication of data across multiple servers to implement atomic (linearizable) read/write objects in message-passing, asynchronous environments where servers (data hosts) may fail by crashing [10, 11, 21, 22, 23, 25, 26, 40]. A notable replication-based algorithm appears in the work by Attiya, Bar-Noy, and Dolev [11] (which we refer to as the ABD algorithm), which implemented non-blocking atomic read/write data storage via logical timestamps paired with values to order read/write operations. Replication-based strategies, however, incur high storage and communication costs; for example, to store 1,000,000 objects each of size 1 MB (a total size of 1 TB) across a 3-server system, the ABD algorithm replicates the objects in all 3 servers, which blows up the worst-case storage cost to 3 TB. Additionally, every write or read operation may need to transmit up to 3 MB of data (while retrieving an object value of size 1 MB), incurring high communication cost.
Erasure Code-based Atomic Storage. Erasure code-based DSSes are extremely beneficial for saving storage and communication costs while maintaining fault-tolerance levels similar to those of replication-based DSSes [16]. Mechanisms using an \([n, k]\) erasure code split a value v of size, say, 1 unit into k elements, each of size \(\frac{1}{k}\) units, create n coded elements of the same size, and store one coded element per server, for a total storage cost of \(\frac{n}{k}\) units. So the \([n = 3, k = 2]\) code in the previous example will reduce the storage cost to 1.5 TB and the communication cost to 1.5 MB (improving also operation latency). Maximum Distance Separable (MDS) codes have the property that value v can be reconstructed from any k out of these n coded elements; note that replication is a special case of MDS codes with \(k=1\). In addition to the potential cost-savings, the suitability of erasure-codes for DSSes is amplified by the emergence of highly optimized erasure coding libraries that reduce encoding/decoding overheads [3, 12, 46]. In fact, an exciting recent body of systems and optimization works [7, 33, 46, 49, 52, 53, 54, 58] has demonstrated that for several data stores, the use of erasure coding results in lower latencies than replication-based approaches. This is achieved by allowing the system to carefully tune erasure coding parameters, data placement strategies, and other system parameters that improve workload characteristics such as load and spatial distribution. A complementary body of work has proposed novel non-blocking algorithms that use erasure coding to provide atomic storage over asynchronous message-passing models [13, 15, 16, 20, 34, 35, 56]. Since erasure code-based algorithms, unlike their replication-based counterparts, incur the additional burden of synchronizing the access of multiple coded elements from the same version of the data object, these algorithms are quite complex.
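The cost arithmetic in the two examples above can be reproduced with a few lines of Python. This is only an illustrative sketch: the function name `storage_cost` is ours, and it captures nothing beyond the \(\frac{n}{k}\) ratio stated in the text.

```python
from fractions import Fraction


def storage_cost(n: int, k: int, object_size: Fraction = Fraction(1)) -> Fraction:
    """Total size stored across n servers for one object under an [n, k] code.

    Each server keeps one coded element of size object_size / k, so the
    total is (n / k) * object_size. Replication is the special case k = 1.
    """
    if not 1 <= k <= n:
        raise ValueError("an [n, k] code needs 1 <= k <= n")
    return Fraction(n, k) * object_size


# 1 TB of objects on 3 servers, as in the running example.
replication_tb = storage_cost(n=3, k=1)   # every server holds a full copy
erasure_tb = storage_cost(n=3, k=2)       # every server holds half-size elements
print(float(replication_tb), float(erasure_tb))
```

The same ratio applies to the data moved for a single 1 MB object, which is where the 3 MB versus 1.5 MB figures come from.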
Reconfigurable Atomic Storage. Configuration refers to the set of storage servers that are collectively used to host the data and implement the DSS. Reconfiguration is the process of adding or removing servers in a DSS. In practice, reconfigurations are often desired by system administrators [9] for a wide range of purposes, especially during system maintenance. As storage servers become older and unreliable, they are replaced with new ones to ensure data durability. Furthermore, to scale the storage service to increased or decreased load, larger (or smaller) configurations may need to be deployed. Therefore, in order to carry out such reconfiguration steps, in addition to the usual read and write operations, an operation called reconfig is invoked by reconfiguration clients. Performing reconfiguration of a system without service interruption is a very challenging task and an active area of research. RAMBO [39] and DynaStore [8] are two of the handful of algorithms [17, 24, 27, 32, 47, 48] that allow reconfiguration on live systems; all these algorithms are replication-based.
A related body of work appeared for erasure coded scaling, although there exist important differences that distinguish the two problems. In particular, works like [30, 50, 51, 55, 57] consider RAID-based systems with synchronous network communication and local computation. Synchrony allows processes to make assumptions on the time of message delivery, which in turn helps them infer whether a communicating party has failed or not. In an asynchronous system, similar to the one we consider in this work, messages may be delivered with arbitrary delays. Therefore, it is impossible to distinguish whether a message from a source is in transit or the source has crashed before sending a message. This uncertainty makes it impossible to distinguish failed from operating nodes, and thus challenging to design algorithms that guarantee atomicity (strong consistency) and completion of reads and writes.
Despite the attractive prospects of creating strongly consistent DSSes with low storage and communication costs, so far, no algorithmic framework for reconfigurable atomic DSS employed erasure coding for fault-tolerance or provided any analysis of bandwidth and storage costs. Our article fills this vital gap in the algorithms literature through the development of a novel reconfigurable approach for atomic storage that uses erasure codes for fault-tolerance. From a practical viewpoint, our work may be interpreted as a bridge between the systems optimization works [7, 33, 46, 49, 52, 53, 54, 58] and non-blocking erasure code-based consistent storage [13, 15, 16, 20, 34, 35, 56]. Specifically, the use of our reconfigurable algorithm would potentially enable a data storage service to dynamically shift between different erasure coding parameters and placement strategies, as the demand characteristics (such as load and spatial distribution) change, without service interruption.
Our Contributions. We develop a reconfigurable, erasure-coded, atomic, or strongly consistent [29, 38] read/write storage algorithm, called Ares. Motivated by many practical systems, Ares assumes clients and servers are separate processes that communicate via logical point-to-point channels.
In contrast to the replication-based reconfigurable algorithms [8, 17, 24, 27, 32, 39, 47, 48], where a configuration essentially corresponds to the set of servers that stores the data, the same concept for erasure coding needs to be much more involved. In particular, in erasure coding, even if the same set of n servers is used, a change in the value of k defines a new configuration. Furthermore, several erasure coding-based algorithms [15, 20] have additional parameters that tune how many older versions each server stores, which in turn influences the concurrency level allowed. Tuning of such parameters can also fall under the purview of reconfiguration.
To accommodate these various reconfiguration requirements, Ares takes a modular approach. In particular, Ares uses a set of primitives, called data-access primitives (DAPs). A different implementation of the DAP primitives may be specified in each configuration. Ares uses DAPs as a “black box” to: (i) transfer the object state from one configuration to the next during reconfig operations, and (ii) invoke read/write operations on a single configuration. Given the DAP implementation for each configuration we show that Ares correctly implements a reconfigurable, atomic read/write storage.
The DAP primitives provide Ares a much broader view of the notion of a configuration as compared to replication-based algorithms. Specifically, the DAP primitives may be parameterized, following the parameters of protocols used for their implementation (e.g., erasure coding parameters, set of servers, quorum design, and concurrency level). While transitioning from one configuration to another, our modular construction allows Ares to reconfigure between different sets of servers, quorum configurations, and erasure coding parameters. In principle, Ares even allows reconfiguring between completely different protocols as long as they can be interpreted/expressed in terms of the primitives; though in this article, we only present one implementation of the DAP primitives to keep the scope of the article reasonable. From a technical point of view, our modular structure makes the atomicity proof of a complex algorithm (like Ares) easier.
An important consideration in the design choice of Ares is to ensure that we gain/retain the advantages that come with erasure codes (low data storage and communication costs) while having the flexibility to reconfigure the system. Towards this end, we present an erasure-coded implementation of DAPs which satisfies the necessary properties and is used by Ares to yield the first reconfigurable, erasure-coded, read/write atomic storage implementation, where read and write operations complete in two rounds. We provide the atomicity property and latency analysis for any operation in Ares, along with the storage and communication costs resulting from the erasure-coded DAP implementation. In particular, we specify lower and upper bounds on the communication latency between the service participants, and we provide the necessary conditions to guarantee the termination of each read/write operation while concurrent with reconfig operations.
Table 1 compares Ares with a few well-known erasure-coded and replication-based (static and reconfigurable) atomic memory algorithms. From the table we observe that Ares is the only algorithm to combine a dynamic behavior with the use of erasure codes, while reducing the storage and communication costs associated with the read or write operations. Moreover, in Ares the number of rounds per write and read is at least as good as in any of the remaining algorithms.
Table 1. Comparison of Ares with Previous Algorithms Emulating Atomic Read/Write Memory for Replication (Repl.) and Erasure-code Based (EC) Algorithms

| Algorithm | Rounds per write | Rounds per read | Reconfig. | Repl. or EC | Storage cost | Read bandwidth | Write bandwidth |
|---|---|---|---|---|---|---|---|
| CASGC [14] | 3 | 2 | No | EC | \((\delta +1)\frac{n}{k}\) | \(\frac{n}{k}\) | \(\frac{n}{k}\) |
| SODA [34] | 2 | 2 | No | EC | \(\frac{n}{k}\) | \((\delta +1)\frac{n}{k}\) | \(\frac{n^2}{k}\) |
| ORCAS-A [20] | 3 | \(\ge\)2 | No | EC | n | n | n |
| ORCAS-B [20] | 3 | 3 | No | EC | \(\infty\) | \(\infty\) | \(\infty\) |
| ABD [11] | 2 | 2 | No | Repl. | n | 2n | n |
| RAMBO [39] | 2 | 2 | Yes | Repl. | \(\ge \! n\) | \(\ge \! n\) | \(\ge \! n\) |
| DynaStore [8] | \(\ge\)4 | \(\ge\)4 | Yes | Repl. | \(\ge \! n\) | \(\ge \! n\) | \(\ge \! n\) |
| SmartMerge [32] | 2 | 2 | Yes | Repl. | \(\ge \! n\) | \(\ge \! n\) | \(\ge \! n\) |
| Ares (this article) | 2 | 2 | Yes | EC | \((\delta +1)\frac{n}{k}\) | \((\delta +1)\frac{n}{k}\) | \(\frac{n}{k}\) |
\(\delta\) is the maximum number of concurrent writes with any read during the course of an execution of the algorithm. In practice, \(\delta \lt 4\) [16].
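As a worked instance of the Table 1 formulas, take the illustrative values \(n = 5\), \(k = 3\), and \(\delta = 1\). These values are our assumption for the sake of the arithmetic and are not taken from the article's experiments.

\[
\text{Ares storage} = (\delta + 1)\frac{n}{k} = 2 \cdot \frac{5}{3} \approx 3.33,
\qquad
\text{Ares write bandwidth} = \frac{n}{k} = \frac{5}{3} \approx 1.67,
\qquad
\text{ABD storage} = \text{ABD write bandwidth} = n = 5 .
\]

More generally, comparing the two storage entries gives \((\delta + 1)\frac{n}{k} \lt n\) exactly when \(\delta + 1 \lt k\), so under these formulas the storage advantage over replication depends on choosing k larger than the number of versions kept.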
We developed a proof-of-concept (PoC) implementation of Ares and deployed it over a set of distributed devices in the experimental testbed Emulab [2]. The most important take-home message from our experimental results is that it is possible to implement our algorithm according to the specifications, and that the implementation produces correct executions and remains available during reconfiguration. Although the correctness of the algorithm is shown analytically, the experimental validation corroborates it. For this purpose, we have chosen a simple parameterization (e.g., uniform selection of read/write invocation intervals), and picked ABD [11] as a benchmark algorithm which, despite being proposed more than 25 years ago, is the fundamental algorithm for emulating replicated quorum-based atomic shared memory. For instance, it is adopted in commercial/open-source implementations like Cassandra [36], and is being used as a standard benchmark algorithm (as can be seen in other recent works [19]). However, to demonstrate a real-world application we would need to compare with more algorithms and utilize a wide range of read/write distributions; this is planned as a separate work.
Document Structure. Section 2 presents the model assumptions and Section 3 the DAP primitives. In Section 4, we present the implementation of the reconfiguration and read/write protocols in Ares using the DAPs. In Section 5, we present an erasure-coded implementation of a set of DAPs, which can be used in every configuration of the Ares algorithm. Section 7 provides operation latency and cost analysis, and Section 8 the DAP flexibility. Section 9 presents an experimental evaluation of the proposed algorithms. We conclude our work in Section 10. Due to lack of space, omitted proofs can be found in [43].

2 Model and Definitions

A shared atomic storage, consisting of any number of individual objects, can be emulated by composing individual atomic memory objects. Therefore, herein we aim to implement a single atomic read/write memory object. A read/write object takes a value from a set \({\mathcal {V}}\). We assume a system consisting of four distinct sets of processes: a set \(\mathcal {W}\) of writers, a set \(\mathcal {R}\) of readers, a set \(\mathcal {G}\) of reconfiguration clients, and a set \(\mathcal {S}\) of servers. Let \(\mathcal {I}= \mathcal {W}\cup \mathcal {R}\cup \mathcal {G}\) be the set of clients. Servers host data elements (replicas or encoded data fragments). Each writer is allowed to modify the value of a shared object, and each reader is allowed to obtain the value of that object. Reconfiguration clients attempt to introduce new configurations of servers to the system in order to mask transient errors and to ensure the longevity of the service. Processes communicate via messages through asynchronous and reliable channels.
Configurations. A configuration, with a unique identifier from a set \(\mathcal {C}\), is a data type that describes the finite set of servers that are used to implement the atomic storage service. In our setting, each configuration is also used to describe the way the servers are grouped into sets, called quorums, s.t. each pair of quorums intersect, the consensus instance that is used as an external service to determine the next configuration, and a set of DAPs that specify the interaction of the clients and servers in the configuration (see Section 3).
More formally, a configuration, \(c\in \mathcal {C}\), consists of: \((i)\) \(c.Servers\subseteq \mathcal {S}\): a set of server identifiers; \((ii)\) \(c.Quorums\): the set of quorums on \(c.Servers\), s.t. \(\forall Q_1,Q_2\in c.Quorums, Q_1,Q_2\subseteq c.Servers\) and \(Q_1\cap Q_2\ne \emptyset\); \((iii)\) \({DAP(c)}\): the set of primitives (operations at level lower than reads or writes) that clients in \(\mathcal {I}\) may invoke on \(c.Servers\); and \((iv)\) \(c.Con\): a consensus instance with the values from \(\mathcal {C}\), implemented and running on top of the servers in \(c.Servers\). We refer to a server \(s \in c.Servers\) as a member of configuration c. The consensus instance \(c.Con\) in each configuration c is used as a service that returns the identifier of the configuration that follows c.
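The first two components of a configuration can be captured in a small data type, which also makes the quorum conditions checkable. The sketch below is a hedged illustration: the names `Configuration`, `is_well_formed`, and `majority_quorums` are ours, majorities are just one possible quorum system, and \(DAP(c)\) and \(c.Con\) are deliberately left out.

```python
from dataclasses import dataclass
from itertools import combinations
from typing import FrozenSet


@dataclass(frozen=True)
class Configuration:
    cid: str                                  # identifier from the set C
    servers: FrozenSet[str]                   # c.Servers
    quorums: FrozenSet[FrozenSet[str]]        # c.Quorums

    def is_well_formed(self) -> bool:
        # Every quorum must be a subset of c.Servers.
        subsets_ok = all(q <= self.servers for q in self.quorums)
        # Every pair of quorums must intersect. Taking q1 == q2 as well
        # also rules out an empty quorum.
        intersect_ok = all(q1 & q2 for q1 in self.quorums for q2 in self.quorums)
        return subsets_ok and intersect_ok


def majority_quorums(servers: FrozenSet[str]) -> FrozenSet[FrozenSet[str]]:
    """Hypothetical helper: all subsets of majority size, which pairwise intersect."""
    size = len(servers) // 2 + 1
    return frozenset(frozenset(c) for c in combinations(sorted(servers), size))


servers = frozenset({"s1", "s2", "s3"})
c0 = Configuration("c0", servers, majority_quorums(servers))
disjoint = Configuration(
    "bad", servers, frozenset({frozenset({"s1"}), frozenset({"s2", "s3"})})
)
print(c0.is_well_formed(), disjoint.is_well_formed())
```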
Executions. An algorithm A is a collection of processes, where process \(A_p\) is assigned to process \(p\in \mathcal {I}\cup \mathcal {S}\). The state of a process \(A_p\) is determined over a set of state variables, and the state \(\sigma\) of A is a vector that contains the state of each process. Each process \(A_p\) implements a set of actions. When an action \(\alpha {}\) occurs it causes the state of \(A_p\) to change, say from some state \(\sigma _p\) to some different state \(\sigma _p^{\prime }\). We call the triple \(\langle \sigma _p, \alpha {}, \sigma _p^{\prime } \rangle\) a step of \(A_p\). Algorithm A performs a step when some process \(A_p\) performs a step. An action \(\alpha {}\) is enabled in a state \(\sigma\) if there exists a step \(\langle \sigma , \alpha {}, \sigma ^{\prime } \rangle\) to some state \(\sigma ^{\prime }\). An execution is an alternating sequence of states and actions of A starting with the initial state and ending in a state. An execution \({\xi }\) is fair if enabled actions perform a step infinitely often. In the rest of the article, we consider executions that are fair and well-formed. A process p crashes in an execution if it stops taking steps; otherwise p is correct or non-faulty. We assume a function \(c.\mathcal {F}\) to describe the failure model of a configuration c.
Reconfigurable Atomic Read/Write Objects. A reconfigurable atomic object supports three operations: \({\sf read}()\), \({\sf write}(v)\) and \({\sf reconfig}(c)\). A \({\sf read}()\) operation returns the value of the atomic object, \({\sf write}(v)\) attempts to modify the value of the object to \(v\in {\mathcal {V}}\), and \({\sf reconfig}(c)\) attempts to install a new configuration \(c\in \mathcal {C}\). We assume well-formed executions where each client may invoke one operation (\({\sf read}()\), \({\sf write}(v)\) or \({\sf reconfig}(c)\)) at a time.
An implementation of a read/write or a reconfig operation contains an invocation action (such as a call to a procedure) and a response action (such as a return from the procedure). An operation \(\pi\) is complete in an execution, if it contains both the invocation and the matching response actions for \(\pi\); otherwise \(\pi\) is incomplete. We say that an operation \(\pi\) precedes an operation \(\pi ^{\prime }\) in an execution \({\xi }\), denoted by \(\pi \rightarrow \pi ^{\prime }\), if the response step of \(\pi\) appears before the invocation step of \(\pi ^{\prime }\) in \({\xi }\). Two operations are concurrent if neither precedes the other. An implementation A of a read/write object satisfies the atomicity (linearizability [29]) property if the following holds [38]. Let the set \(\Pi\) contain all complete read/write operations in any well-formed execution of A. Then there exists an irreflexive partial ordering \(\prec\) satisfying the following:
A1. For any operations \(\pi _1\) and \(\pi _2\) in \(\Pi\), if \(\pi _1\rightarrow \pi _2\), then it cannot be the case that \(\pi _2\prec \pi _1\).
A2. If \(\pi \in \Pi\) is a write operation and \(\pi ^{\prime }\in \Pi\) is any read/write operation, then either \(\pi \prec \pi ^{\prime }\) or \(\pi ^{\prime }\prec \pi\).
A3. The value returned by a read operation is the value written by the last preceding write operation according to \(\prec\) (or the initial value if there is no such write).
Storage and Communication Costs. We are interested in the complexity of each read and write operation. The complexity of each operation \(\pi\) invoked by a process p is measured with respect to three metrics, during the interval between the invocation and the response of \(\pi\): \((i)\) number of communication rounds, accounting for the number of messages exchanged during \(\pi\), \((ii)\) storage efficiency (storage cost), accounting for the maximum storage requirements for any single object at the servers during \(\pi\), and \((iii)\) message bit complexity (communication cost), which measures the size of the messages used during \(\pi\).
We define the total storage cost as the size of the data stored across all servers, at any point during the execution of the algorithm. The communication cost associated with a read or write operation is the size of the total data that gets transmitted in the messages sent as part of the operation. We assume that metadata, such as version number, process ID, and so on used by various operations is of negligible size, and is hence ignored in the calculation of storage and communication cost. Furthermore, we normalize both costs with respect to the size of the value v; in other words, we compute the costs under the assumption that v has size 1 unit.
Erasure Codes. We use an \([n, k]\) linear MDS code [31] over a finite field \(\mathbb {F}_q\) to encode and store the value v among the n servers. An \([n, k]\) MDS code has the property that any k out of the n coded elements can be used to recover (decode) the value v. For encoding, v is divided into k elements \(v_1, v_2, \ldots , v_k\) with each element having size \(\frac{1}{k}\) (assuming size of v is 1). The encoder takes the k elements as input and produces n coded elements \(e_1, e_2, \ldots , e_n\) as output, i.e., \([e_1, \ldots , e_n] = \Phi ([v_1, \ldots , v_k])\), where \(\Phi\) denotes the encoder. For ease of notation, we simply write \(\Phi (v)\) to mean \([e_1, \ldots , e_n]\). The vector \([e_1, \ldots , e_n]\) is referred to as the codeword corresponding to the value v. Each coded element \(e_i\) also has size \(\frac{1}{k}\). In our scheme, we store one coded element per server. We use \(\Phi _i\) to denote the projection of \(\Phi\) onto the ith output component, i.e., \(e_i = \Phi _i(v)\). Without loss of generality, we associate the coded element \(e_i\) with server i, \(1 \le i \le n\).
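To make the encoder \(\Phi\) concrete, the following sketch implements the smallest non-trivial MDS code, a \([3, 2]\) single-parity code built from bytewise XOR. It is a toy stand-in chosen for readability, not the code used by Ares; the function names and the even-length restriction are our assumptions.

```python
from typing import Dict, List


def _xor(x: bytes, y: bytes) -> bytes:
    return bytes(a ^ b for a, b in zip(x, y))


def encode(value: bytes) -> List[bytes]:
    """Phi: split the value into k = 2 halves and emit n = 3 coded elements."""
    if len(value) % 2:
        raise ValueError("this sketch assumes an even-length value")
    half = len(value) // 2
    v1, v2 = value[:half], value[half:]
    return [v1, v2, _xor(v1, v2)]            # e_1, e_2, e_3 (each of size 1/k)


def decode(elements: Dict[int, bytes]) -> bytes:
    """Recover the value from any 2 coded elements, keyed by server index 1..3."""
    if len(elements) < 2:
        raise ValueError("need at least k = 2 coded elements")
    if 1 in elements and 2 in elements:
        v1, v2 = elements[1], elements[2]
    elif 1 in elements:                       # e_1 and e_3 are available
        v1 = elements[1]
        v2 = _xor(v1, elements[3])
    else:                                     # e_2 and e_3 are available
        v2 = elements[2]
        v1 = _xor(v2, elements[3])
    return v1 + v2


coded = encode(b"atomic")
recovered = decode({2: coded[1], 3: coded[2]})   # server 1 is unavailable
```

Any single server can be lost without losing the value, and each server stores half the value, which matches the \(\frac{n}{k} = 1.5\) storage cost of the \([3, 2]\) case.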
Tags. We use logical tags to order operations. A tag \(\tau _{}\) is defined as a pair \((z, w)\), where \(z \in \mathbb {N}\) and \(w \in \mathcal {W}\), an ID of a writer. Let \(\mathcal {T}\) be the set of all tags. Notice that tags could be defined in any totally ordered domain and given that this domain is countably infinite, then there can be a direct mapping to the domain we assume. For any \(\tau _{1}, \tau _{2} \in \mathcal {T}\) we define \(\tau _{2} \gt \tau _{1}\) if \((i)\) \(\tau _{2}.z \gt \tau _{1}.z\) or \((ii)\) \(\tau _{2}.z = \tau _{1}.z\) and \(\tau _{2}.w \gt \tau _{1}.w\).
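The tag order defined above is a lexicographic order on the pair \((z, w)\), which maps directly onto tuple comparison. In this short sketch the `Tag` type is ours, and representing writer identifiers as strings is an assumption.

```python
from typing import NamedTuple


class Tag(NamedTuple):
    z: int   # the integer component, z in N
    w: str   # the writer identifier, w in W (strings are an assumption)


# NamedTuple instances compare field by field: first z, then w on ties.
higher_z = Tag(2, "w1") > Tag(1, "w9")       # case (i): larger z wins
tie_break = Tag(1, "w2") > Tag(1, "w1")      # case (ii): equal z, larger w wins
latest = max([Tag(1, "w2"), Tag(2, "w1"), Tag(1, "w1")])
```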
For ease of reference, Table 2 presents the key notation used in this article. Notice that some of the symbols shown are defined and used in the following sections.
Table 2. List of Symbols Used to Describe Our Model of Computation

| Symbol | Description |
|---|---|
| \(\mathcal {S}\) | the set of server identifiers |
| \(\mathcal {I}\) | the set of client identifiers |
| \(\mathcal {R}\) | the set of reader identifiers in \(\mathcal {I}\) |
| \(\mathcal {W}\) | the set of writer identifiers in \(\mathcal {I}\) |
| \(\mathcal {G}\) | the set of reconfigurer identifiers in \(\mathcal {I}\) |
| \({\mathcal {V}}\) | the set of values allowed to be written on the shared object |
| v | a value in \({\mathcal {V}}\) |
| \(\mathcal {T}\) | the set of pairs in \(\mathbb {N}\times \mathcal {W}\) |
| \(\tau _{}\) | a pair \((z, w)\in \mathcal {T}\) |
| \(\mathcal {C}\) | the set of configuration identifiers |
| c | a configuration with identifier in \(\mathcal {C}\) |
| \(c.Servers\) | the set of servers s.t. \(c.Servers\subseteq \mathcal {S}\) in configuration c |
| \(c.Quorums\) | the set of subsets of servers s.t. \(\forall Q\in c.Quorums\), \(Q\subseteq c.Servers\) and \(\forall Q_1, Q_2\in c.Quorums, Q_1\cap Q_2\ne \emptyset\) |
| \(\sigma\) | the state of an algorithm A |
| \(\sigma _p\) | the state of process \(p\in \mathcal {I}\cup \mathcal {S}\) in state \(\sigma\) determined over a set of state variables |
| \(p.var\vert _{\sigma }\) | the value of the state variable var at process p in state \(\sigma\) |
| \({\xi }\) | an execution of algorithm A, which is a finite or infinite sequence of alternating states and actions beginning with the initial state of A |
| \(\Phi ([v_1, \ldots , v_k])\) or \(\Phi ([v])\) | the \([n,k]\) encoder function given k fragments of value v, \([v_1, \ldots , v_k]\) |
| \(e_i\) | the ith encoded word, for \(1\le i\le n\), produced by \(\Phi ([v])\) |
| \({\mathcal {G}_L}\) | configuration sequence composed of pairs in \(\lbrace \mathcal {C}\cup \lbrace \bot \rbrace \rbrace \times \lbrace F,P\rbrace\), where F denotes finalized and P pending; initially contains \(\langle c_0, F \rangle\) |

3 Data Access Primitives

In this section we introduce a set of primitives, which we refer to as DAPs, that are invoked by the clients during read/write/reconfig operations and are defined for any configuration c in Ares. The DAPs allow us: \((i)\) to describe Ares in a modular manner, and \((ii)\) to reason more cleanly about the correctness of Ares.
We define three DAPs for each \(c\in \mathcal {C}\): \((i)\) \({c}.{{\sf put-data}(\langle \tau _{},v \rangle)}\), via which a client can ingest the tag-value pair \(\langle \tau _{},v \rangle\) into the configuration c; \((ii)\) \({c}.{{\sf get-data}()}\), used to retrieve the most up-to-date tag and value pair stored in the configuration c; and \((iii)\) \({c}.{{\sf get-tag}()}\), used to retrieve the most up-to-date tag for an object stored in a configuration c. More formally, assuming a tag \(\tau _{}\) from a set of totally ordered tags \({\mathcal {T}}\), a value v from a domain \({\mathcal {V}}\), and a configuration c from a set of identifiers \(\mathcal {C}\), the three primitives are defined as follows:
Definition 1 (DAPs).
Given a configuration identifier \(c\in \mathcal {C}\), any non-faulty client process p may invoke the following DAPs during an execution \({\xi }\), where c is added to specify the configuration specific implementation of the primitives:
D1: \({c}.{{\sf get-tag}()}\) that returns a tag \(\tau _{}\in {\mathcal {T}}\);
D2: \({c}.{{\sf get-data}()}\) that returns a tag-value pair \((\tau _{}, v) \in {\mathcal {T}}\times {\mathcal {V}}\);
D3: \({c}.{{\sf put-data}(\langle \tau _{}, v \rangle)}\) which accepts the tag-value pair \((\tau _{}, v) \in {\mathcal {T}}\times {\mathcal {V}}\) as argument.
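Definition 1 can be read as an interface that every configuration must implement. The sketch below expresses it as a Python abstract class, together with a deliberately trivial single-copy implementation. Both class names are hypothetical, the tag and value types are assumptions, and `LocalDAP` has no servers and no fault tolerance; it only illustrates the shape of the three calls.

```python
from abc import ABC, abstractmethod
from typing import Tuple

Tag = Tuple[int, str]          # (z, w), compared lexicographically


class DAP(ABC):
    """The three primitives of Definition 1 for one configuration c."""

    @abstractmethod
    def get_tag(self) -> Tag: ...                       # D1

    @abstractmethod
    def get_data(self) -> Tuple[Tag, bytes]: ...        # D2

    @abstractmethod
    def put_data(self, tag: Tag, value: bytes) -> None: ...   # D3


class LocalDAP(DAP):
    """Toy implementation: one in-memory copy, for illustration only."""

    def __init__(self, t0: Tag = (0, ""), v0: bytes = b"") -> None:
        self._tag, self._value = t0, v0

    def get_tag(self) -> Tag:
        return self._tag

    def get_data(self) -> Tuple[Tag, bytes]:
        return self._tag, self._value

    def put_data(self, tag: Tag, value: bytes) -> None:
        # Keep only the pair with the largest tag seen so far.
        if tag > self._tag:
            self._tag, self._value = tag, value


dap = LocalDAP()
dap.put_data((1, "w1"), b"a")
dap.put_data((0, "w9"), b"stale")     # smaller tag, ignored
```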
In order for the DAPs to be useful in designing the Ares algorithm we further require the following consistency properties. As we see later in Section 6, the safety property of Ares holds, given that these properties hold for the DAPs in each configuration.
Property 1 (DAP Consistency Properties).
We say that a DAP operation in an execution \({\xi }\) is complete if both the invocation and the matching response step appear in \({\xi }\). If \(\Pi\) is the set of complete DAP operations in execution \({\xi }\) then for any \(\phi ,\pi \in \Pi\):
C1. If \(\phi\) is \({c}.{{\sf put-data}(\langle \tau _{\phi }, v_\phi \rangle)}\), for \(c \in \mathcal {C}\), \(\langle \tau _{\phi }, v_\phi \rangle \in {\mathcal {T}}\times {\mathcal {V}}\), and \(\pi\) is \({c}.{{\sf get-tag}()}\) (or \({c}.{{\sf get-data}()}\)) that returns \(\tau _{\pi } \in {\mathcal {T}}\) (or \(\langle \tau _{\pi }, v_{\pi } \rangle \in {\mathcal {T}}\times {\mathcal {V}}\)) and \(\phi\) completes before \(\pi\) is invoked in \({\xi }\), then \(\tau _{\pi } \ge \tau _{\phi }\).
C2. If \(\phi\) is a \({c}.{{\sf get-data}()}\) that returns \(\langle \tau _{\pi }, v_\pi \rangle \in {\mathcal {T}}\times {\mathcal {V}}\), then there exists \(\pi\) such that \(\pi\) is a \({c}.{{\sf put-data}(\langle \tau _{\pi }, v_{\pi } \rangle)}\) and \(\phi\) did not complete before the invocation of \(\pi\). If no such \(\pi\) exists in \({\xi }\), then \((\tau _{\pi }, v_{\pi })\) is equal to \((t_0, v_0)\).
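Properties C1 and C2 are statements about recorded histories, so they can be checked mechanically on a finite trace. The sketch below does this under stated assumptions: all operations target one configuration, each operation is summarized by the positions of its invocation and response steps, and the record type `DapOp` and both function names are ours.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

Tag = Tuple[int, str]


@dataclass(frozen=True)
class DapOp:
    kind: str                      # "put-data", "get-data", or "get-tag"
    inv: int                       # position of the invocation step
    resp: int                      # position of the matching response step
    tag: Tag                       # tag written (put-data) or returned (get-*)
    value: Optional[bytes] = None  # value written or returned; None for get-tag


def satisfies_c1(ops: List[DapOp]) -> bool:
    """A get invoked after a put completed must return a tag >= the put's tag."""
    puts = [o for o in ops if o.kind == "put-data"]
    gets = [o for o in ops if o.kind in ("get-data", "get-tag")]
    return all(g.tag >= p.tag for p in puts for g in gets if p.resp < g.inv)


def satisfies_c2(ops: List[DapOp], t0: Tag, v0: bytes) -> bool:
    """A get-data result must come from a put-data invoked before the get
    completed, or else be the initial pair (t0, v0)."""
    puts = [o for o in ops if o.kind == "put-data"]
    for g in (o for o in ops if o.kind == "get-data"):
        written = any(
            p.tag == g.tag and p.value == g.value and p.inv < g.resp for p in puts
        )
        if not written and (g.tag, g.value) != (t0, v0):
            return False
    return True


history = [
    DapOp("put-data", inv=0, resp=1, tag=(1, "w1"), value=b"a"),
    DapOp("get-data", inv=2, resp=3, tag=(1, "w1"), value=b"a"),
]
ok = satisfies_c1(history) and satisfies_c2(history, (0, ""), b"")
```

A checker of this kind only validates one observed execution; it does not replace the proof that a DAP implementation satisfies Property 1 in every execution.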
In Section 5, we show how to implement a set of DAPs, where erasure-codes are used to reduce storage and communication costs. Our DAP implementation satisfies Property 1.
As noted earlier, expressing Ares in terms of the DAPs allows one to achieve a modular design. Modularity enables the usage of different DAP implementations per configuration, during any execution of Ares, as long as the DAPs implemented in each configuration satisfy Property 1. For example, the DAPs in a configuration c may be implemented using replication, while the DAPs in the next configuration say \(c^{\prime }\), may be implemented using erasure-codes. Thus, a system may use a scheme that offers higher fault tolerance (e.g., replication) when storage is not an issue while switching to a more storage efficient (less fault-tolerant) scheme when storage gets limited.
In Section 8, we show that the presented DAPs are not only suitable for algorithm Ares, but can also be used to implement a large family of atomic read/write storage implementations. By describing an algorithm A according to a simple algorithmic template (see Algorithm 1), we show that A preserves safety (atomicity) if the used DAPs satisfy Property 1, and A preserves liveness (termination), if every invocation of the used DAPs terminates, under the failure model assumed.
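The template itself is not reproduced in this section, so the following sketch is an assumption about its general shape: the familiar two-phase pattern in which a write queries the latest tag and then propagates a larger one, and a read queries the latest pair and propagates it before returning. The class `OneCopyDAP`, the functions `write` and `read`, and the tag-increment rule are hypothetical names and choices made for illustration.

```python
from typing import Tuple

Tag = Tuple[int, str]


class OneCopyDAP:
    """Minimal stand-in for DAP(c): a single in-memory copy, no failures."""

    def __init__(self) -> None:
        self._tag: Tag = (0, "")
        self._value: bytes = b""

    def get_tag(self) -> Tag:
        return self._tag

    def get_data(self) -> Tuple[Tag, bytes]:
        return self._tag, self._value

    def put_data(self, tag: Tag, value: bytes) -> None:
        if tag > self._tag:
            self._tag, self._value = tag, value


def write(dap: OneCopyDAP, writer_id: str, value: bytes) -> Tag:
    z, _ = dap.get_tag()              # phase 1: discover the latest tag
    new_tag = (z + 1, writer_id)      # assumed rule: increment z, stamp the writer
    dap.put_data(new_tag, value)      # phase 2: propagate the new pair
    return new_tag


def read(dap: OneCopyDAP) -> bytes:
    tag, value = dap.get_data()       # phase 1: discover the latest pair
    dap.put_data(tag, value)          # phase 2: propagate it before returning
    return value


dap = OneCopyDAP()
first = write(dap, "w1", b"x")
second = write(dap, "w2", b"y")
result = read(dap)
```

Because `write` and `read` only touch the three primitives, swapping `OneCopyDAP` for a replicated or erasure-coded implementation would leave them unchanged, which is the modularity the text describes.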

4 The Ares Protocol

In this section, we describe Ares. In the presentation of the Ares algorithm we decouple the reconfiguration service from the shared memory emulation by utilizing the DAPs presented in Section 3. This allows Ares to handle the reorganization of the servers that host the data, as well as to utilize a different atomic memory implementation per configuration. It is also important to note that Ares adopts a client-server architecture and separates the reader, writer and reconfiguration processes from the server processes that host the object data. More precisely, the Ares algorithm comprises three major components: \((i)\) The reconfiguration protocol, which consists of invoking, and subsequently installing, a new configuration via the reconfig operation by recon clients. \((ii)\) The read/write protocol for executing the read and write operations invoked by readers and writers. \((iii)\) The implementation of the DAPs for each installed configuration, which respect Property 1 and are used by the reconfig, read and write operations.

4.1 Implementation of the Reconfiguration Service

In this section, we describe the reconfiguration service in Ares. The service relies on an underlying sequence of configurations (already proposed or installed by reconfig operations), in the form of a “distributed list”, which we refer to as the global configuration sequence (or list) \({\mathcal {G}_L}\). Conceptually, \({\mathcal {G}_L}\) represents an ordered list of pairs \(\langle cfg, status \rangle\), where cfg is a configuration identifier (\(cfg \in \mathcal {C}\)), and a binary state variable \(status \in \lbrace F, P\rbrace\) that denotes whether c is finalized (F) or is still pending (P). Initially, \({\mathcal {G}_L}\) contains a single element, say \(\langle c_0, F \rangle\), which is known to every participant in the service.
To facilitate the creation of \({\mathcal {G}_L}\), each server in \(c.Servers\) maintains a local variable \(nextC \in \lbrace \mathcal {C}\cup \lbrace \bot \rbrace \rbrace \times \lbrace P,F\rbrace\), which is used to point to the configuration that follows c in \({\mathcal {G}_L}\). Initially, at any server \(nextC = \langle \bot , F \rangle\). Once nextC is set to a value it is never altered. As we show below, at any point in the execution of Ares and in any configuration c, the nextC variables of the non-faulty servers in c that are not equal to \(\bot\) agree, i.e., \(\lbrace s.nextC : s \in c.Servers \wedge s.nextC\ne \bot \rbrace\) is either empty or has only one element.
Clients discover the configuration that follows a \(\langle c,* \rangle\) in the sequence by contacting a subset of servers in \(c.Servers\) and collecting their nextC variables. Every client in \(\mathcal {I}\) maintains a local variable cseq that is expected to be some subsequence of \({\mathcal {G}_L}\). Initially, at every client the value of cseq is \(\langle c_0,F \rangle\).
Reconfiguration clients may introduce new configurations, each associated with a unique configuration identifier from \(\mathcal {C}\). Multiple clients may concurrently attempt to introduce different configurations for the same next link in \({\mathcal {G}_L}\). Ares uses consensus to resolve such conflicts: a subset of servers in \(c.Servers\), in each configuration c, implements a distributed consensus service (such as Paxos [37] or RAFT [45]), denoted by \(c.Con\).
The reconfiguration service consists of two major components: \((i)\) sequence traversal, responsible for discovering a recent configuration in \({\mathcal {G}_L}\), and \((ii)\) the reconfiguration operation, which installs new configurations in \({\mathcal {G}_L}\).
Sequence Traversal. Any read/write/reconfig operation \(\pi\) utilizes the sequence traversal mechanism to discover the latest state of the global configuration sequence, as well as to ensure that such a state is discoverable by any subsequent operation \(\pi ^{\prime }\). See Figure 1 for an example execution in the case of a reconfig operation. At a high level, a client starts by collecting the nextC variables from a quorum of servers in a configuration c, such that \(\langle c,F \rangle\) is the last finalized configuration in that client’s local cseq variable (or \(c_0\) if no other finalized configuration exists). If any server s returns a nextC variable such that \(nextC.cfg\ne \bot\), then the client \((i)\) adds nextC to its local cseq, \((ii)\) propagates nextC to a quorum of servers in \(c.Servers\), and \((iii)\) repeats this process in the configuration \(nextC.cfg\). The client terminates when all the replies it collects carry \(nextC.cfg=\bot\). More precisely, the sequence traversal consists of three actions (see Algorithm 1):
Fig. 1. Illustration of an execution of the reconfiguration steps.
get-next-config \((c)\): The action \({\sf get-next-config}\) returns the configuration that follows c in \({\mathcal {G}_L}\). During get-next-config\((c)\), a client sends read-config messages to all the servers in \(c.Servers\), and waits for replies containing nextC from a quorum in \(c.Quorums\). If there exists a reply with \(nextC.cfg\ne \bot\) the action returns nextC; otherwise it returns \(\bot\).
put-config \((c, c^{\prime })\): The \({\sf put-config}(c, c^{\prime })\) action propagates \(c^{\prime }\) to a quorum of servers in \(c.Servers\). During the action, the client sends \((\mbox{{write-config}}, c^{\prime })\) messages, to the servers in \(c.Servers\) and waits for each server s in some quorum \(Q\in c.Quorums\) to respond.
read-config \((seq)\): A \({\sf read-config}(seq)\) sequentially traverses the installed configurations in order to discover the latest state of the sequence \({\mathcal {G}_L}\). At invocation, the client starts with the last finalized configuration \(\langle c, F \rangle\) in the given seq (line Algorithm 1:2), and enters a loop to traverse \({\mathcal {G}_L}\) by invoking \({\sf get-next-config}()\), which returns the next configuration, assigned to \(\widehat{c}\). While \(\widehat{c} \ne \bot\): (a) \(\widehat{c}\) is appended at the end of the sequence seq; (b) a \({\sf put-config}\) action is invoked to inform a quorum of servers in \(c.Servers\) to update the value of their nextC variable to \(\widehat{c}\). If \(\widehat{c} = \bot\), the loop terminates and the action read-config returns seq.
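The three traversal actions can be sketched in a few lines of Python. This is a minimal single-process illustration, not the paper's Algorithm 1: the classes `Server` and `Config`, the `registry` dictionary that maps identifiers to configurations, and the choice of "the first majority of servers" as the quorum are all assumptions made here for brevity.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# A configuration tuple <cfg, status>; cfg = None plays the role of ⊥.
ConfigTuple = Tuple[Optional[str], str]
BOTTOM: ConfigTuple = (None, "F")


@dataclass
class Server:
    """Hypothetical server that stores only its nextC pointer."""
    next_c: ConfigTuple = BOTTOM

    def on_read_config(self) -> ConfigTuple:
        return self.next_c

    def on_write_config(self, cfg_t: ConfigTuple) -> None:
        # Overwrite only if nextC is unset or still pending.
        if self.next_c[0] is None or self.next_c[1] == "P":
            self.next_c = cfg_t


@dataclass
class Config:
    """Hypothetical configuration with three servers and majority quorums."""
    name: str
    servers: List[Server] = field(
        default_factory=lambda: [Server() for _ in range(3)])

    def quorum(self) -> List[Server]:
        # Assumption: any majority is a quorum; use the first majority.
        return self.servers[: len(self.servers) // 2 + 1]


def get_next_config(c: Config) -> ConfigTuple:
    """Return some non-⊥ nextC reported by a quorum of c, else ⊥."""
    for reply in (s.on_read_config() for s in c.quorum()):
        if reply[0] is not None:
            return reply
    return BOTTOM


def put_config(c: Config, cfg_t: ConfigTuple) -> None:
    """Propagate cfg_t to a quorum of servers in c."""
    for s in c.quorum():
        s.on_write_config(cfg_t)


def read_config(seq: List[ConfigTuple],
                registry: Dict[str, Config]) -> List[ConfigTuple]:
    """Traverse from the last finalized entry of seq to the end of the list."""
    seq = list(seq)
    mu = max(i for i, (_, st) in enumerate(seq) if st == "F")
    nxt = get_next_config(registry[seq[mu][0]])
    while nxt[0] is not None:
        mu += 1
        seq[mu:mu + 1] = [nxt]  # overwrite position mu, or append
        put_config(registry[seq[mu - 1][0]], nxt)
        nxt = get_next_config(registry[seq[mu][0]])
    return seq
```

Note that `read_config` writes back every pointer it discovers before moving on, which is what makes the discovered state visible to later traversals that contact a different quorum.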
Reconfiguration operation. A reconfiguration operation \({\sf reconfig}(c)\), \(c \in \mathcal {C}\), invoked by any reconfiguration client \(rec_i\), attempts to append c to \({\mathcal {G}_L}\). The server processes in c are not part of any configuration other than c. At a high level, \(rec_i\) first executes a sequence traversal to discover the latest state of \({\mathcal {G}_L}\). Then it attempts to add the new configuration c at the end of the discovered sequence by proposing c in the consensus instance of the last configuration in the sequence. The client accepts and appends the decision of the consensus instance (which might differ from c). Then it attempts to transfer the latest value of the read/write object to the latest installed configuration. Once the information is transferred, \(rec_i\) finalizes the last configuration in its local sequence and propagates the finalized tuple to a quorum of servers in the configuration that precedes it. The operation consists of four phases, executed consecutively by \(rec_i\) (see Algorithm 2):
\({\sf read-config}(seq)\): The phase \({\sf read-config}(seq)\) at \(rec_i\), reads the recent global configuration sequence as described in the sequence traversal.
\({\sf add-config}(seq, c)\): The \({\sf add-config}(seq, c)\) action attempts to append a new configuration c to the end of seq (the client’s view of \({\mathcal {G}_L}\)). Suppose the last configuration in seq is \(c^{\prime }\) (with status either F or P); then, in order to decide the most recent configuration, \(rec_i\) invokes \(c^{\prime }.Con.propose(c)\) on the consensus object associated with configuration \(c^{\prime }\). Let \(d\in \mathcal {C}\) be the configuration identifier decided by the consensus service. If \(d \ne c\), this implies that another (possibly concurrent) reconfiguration operation, invoked by a reconfigurer \(rec_j\ne rec_i\), proposed d and succeeded in having it decided as the configuration to follow \(c^{\prime }\). In this case, \(rec_i\) adopts d as its own proposed configuration, by adding \(\langle d, P \rangle\) to the end of its local cseq (entirely ignoring c), using the operation \({\sf put-config}(c^{\prime }, \langle d, P \rangle)\), and returns the extended configuration sequence seq.
\({\sf update-config}(seq)\): Let \(\mu\) denote the index of the last configuration in the local sequence cseq at \(rec_i\) whose status is F, and let \(\nu\) denote the last index of cseq. Next, \(rec_i\) invokes \({\sf update-config}(cseq)\), which gathers the tag-value pair corresponding to the maximum tag in each of the configurations \(cseq[i]\) for \(\mu \le i \le \nu\), and transfers that pair to the configuration that was added by the \({\sf add-config}\) action. The \({\sf get-data}\) and \({\sf put-data}\) DAPs are used to transfer the value of the object to the new configuration, and they are implemented with respect to the configuration that is accessed. Suppose \(\langle t_{max}, v_{max} \rangle\) is the tag-value pair corresponding to the highest tag among the responses from all the \(\nu - \mu + 1\) configurations. Then, \(\langle t_{max}, v_{max} \rangle\) is written to the configuration d via the invocation of \(cseq[\nu ].cfg.{\sf put-data}(\langle t_{max},v_{max} \rangle)\).
\({\sf finalize-config}(cseq)\): Once the tag-value pair is transferred, in the last phase of the reconfiguration operation, \(rec_i\) executes \({\sf finalize-config}(cseq)\), to update the status of the last configuration in cseq, say \(d = cseq[\nu ].cfg\), to F. The reconfigurer \(rec_i\) informs a quorum of servers in the previous configuration \(c=cseq[\nu -1].cfg\), i.e., in some \(Q \in c.Quorums\), about the change of status, by executing the \({\sf put-config}(c, \langle d,F \rangle)\) action.
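The four phases can be sketched as follows. This is a toy, single-process illustration under strong assumptions: `ToyConfig` collapses all servers of a configuration into one object, `propose` stands in for \(c.Con\) by deciding the first proposal it sees, and tags are plain integers. None of these names come from the paper.

```python
from typing import List, Optional, Tuple


class ToyConfig:
    """Hypothetical stand-in for a configuration c and all of its servers."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.next_c: Optional[Tuple["ToyConfig", str]] = None  # None is ⊥
        self.decided: Optional["ToyConfig"] = None  # state of c.Con
        self.data: Tuple[int, Optional[str]] = (0, None)  # <tag, value>

    def propose(self, c: "ToyConfig") -> "ToyConfig":
        # Stand-in for c.Con.propose: the first proposal is decided.
        if self.decided is None:
            self.decided = c
        return self.decided

    def put_config(self, cfg_t: Tuple["ToyConfig", str]) -> None:
        # Server rule: overwrite nextC only if unset or pending.
        if self.next_c is None or self.next_c[1] == "P":
            self.next_c = cfg_t

    def get_data(self) -> Tuple[int, Optional[str]]:
        return self.data

    def put_data(self, pair: Tuple[int, Optional[str]]) -> None:
        self.data = max(self.data, pair, key=lambda p: p[0])


Seq = List[Tuple[ToyConfig, str]]


def read_config(seq: Seq) -> Seq:
    seq = list(seq)
    mu = max(i for i, (_, st) in enumerate(seq) if st == "F")
    nxt = seq[mu][0].next_c
    while nxt is not None:
        mu += 1
        seq[mu:mu + 1] = [nxt]  # overwrite position mu, or append
        seq[mu - 1][0].put_config(nxt)
        nxt = nxt[0].next_c
    return seq


def reconfig(cseq: Seq, c: ToyConfig) -> Seq:
    cseq = read_config(cseq)                  # phase 1: read-config
    last = cseq[-1][0]
    d = last.propose(c)                       # phase 2: add-config
    last.put_config((d, "P"))
    cseq.append((d, "P"))
    mu = max(i for i, (_, st) in enumerate(cseq) if st == "F")
    best = max((cfg.get_data() for cfg, _ in cseq[mu:]),
               key=lambda p: p[0])            # phase 3: update-config
    d.put_data(best)
    cseq[-1] = (d, "F")                       # phase 4: finalize-config
    cseq[-2][0].put_config((d, "F"))
    return cseq
```

If the consensus object of the last configuration has already decided some other configuration, `reconfig` installs that decision instead of `c`, mirroring the case \(d \ne c\) in the add-config phase.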
Server Protocol. Each server responds to requests from clients (Algorithm 3). A server waits for two types of messages: read-config and write-config. When a read-config message is received for a particular configuration \(c_k\), the server replies with the value of its local nextC variable. A write-config message attempts to update the nextC variable of the server with a particular tuple \(cfgT_{in}\). A server changes the value of its local nextC in two cases: (i) \(nextC.cfg=\bot\), or (ii) \(nextC.status= P\).
Figure 1 illustrates an example execution of a reconfiguration operation \({\sf recon}(c_5)\). In this example, the reconfigurer \(rec_i\) goes through a number of configuration queries (get-next-config) before it reaches configuration \(c_4\) in which a quorum of servers replies with \(nextC.cfg=\bot\). There it proposes \(c_5\) to the consensus object of \(c_4\) (\(c_4.Con.propose(c_5)\) on arrow 10), and once \(c_5\) is decided, \({\sf recon}(c_5)\) completes after executing \({\sf finalize-config}(c_5)\).

4.2 Implementation of Read and Write Operations

The read and write operations in Ares are expressed in terms of the DAP primitives (see Section 3). This gives Ares the flexibility to use different implementations of the DAP primitives in different configurations, without changing the basic structure of Ares. At a high level, in a write (or read) operation the client: \((i)\) obtains the latest configuration sequence by using the \({\sf read-config}\) action of the reconfiguration service, \((ii)\) queries the configurations in cseq, starting from the last finalized configuration to the end of the discovered configuration sequence, for the latest \(\langle tag,value \rangle\) pair with the help of the \({\sf get-tag}\) (or \({\sf get-data}\)) operation as specified for each configuration, and \((iii)\) repeatedly propagates a new \(\langle tag^{\prime }, value^{\prime } \rangle\) pair (the largest \(\langle tag,value \rangle\) pair) with \({\sf put-data}\) in the last configuration of its local sequence, until no additional configuration is observed. In more detail, the algorithm of a read or write operation \(\pi\) is as follows (see Algorithm 4):
A write (or read) operation is invoked at a client p when line Algorithm 4:8 (respectively, line Algorithm 4:31) is executed. At first, p issues a \({\sf read-config}\) action to obtain the latest introduced configuration in \({\mathcal {G}_L}\), in both operations.
If \(\pi\) is a write, p detects the last finalized entry in cseq, say \(\mu\), and performs a \(cseq[j].cfg.{\sf get-tag}()\) action, for \(\mu \le j\le |cseq|\) (line Algorithm 4:9). Then p discovers the maximum tag among all the returned tags (\(\tau _{max}\)), and it increments the maximum tag discovered (by incrementing the integer part of \(\tau _{max}\)), generating a new tag, say \(\tau _{new}\). It sets \(\langle \tau , v \rangle\) to \(\langle \tau _{new}, val \rangle\), where val is the value it wants to write (line Algorithm 4:13).
If \(\pi\) is a read, p detects the last finalized entry in cseq, say \(\mu\), and performs a \(cseq[j].cfg.{\sf get-data}()\) action, for \(\mu \le j\le |cseq|\) (line Algorithm 4:32). Then p discovers the maximum tag-value pair (\(\langle \tau _{max},v_{max} \rangle\)) among the replies, and sets \(\langle \tau , v \rangle\) to \(\langle \tau _{max},v_{max} \rangle\).
Once the pair \(\langle \tau , v \rangle\) to be propagated is specified, both reads and writes execute the \(cseq[\nu ].cfg.{\sf put-data}(\langle \tau , v \rangle)\) action, where \(\nu =|cseq|\), followed by a \({\sf read-config}\) action, to examine whether new configurations were introduced in \({\mathcal {G}_L}\). This is an essential step that ensures that any new value of the object is propagated to any recently introduced configuration. Failing to do so may lead an operation that reads from a newly established configuration to obtain an outdated value for the shared object, thereby violating atomicity. Each operation repeats these steps until no new configuration is discovered (lines Algorithm 4:15–21, or lines Algorithm 4:37–43). Let \(cseq^{\prime }\) be the sequence returned by the \({\sf read-config}\) action. If \(|cseq^{\prime }| = |cseq|\) then no new configuration was introduced, and the read/write operation terminates; otherwise, p sets cseq to \(cseq^{\prime }\) and repeats the two actions. Note that, in an execution of Ares, if two consecutive \({\sf read-config}\) operations return \(cseq^{\prime }\) and \(cseq^{\prime \prime }\), respectively, then \(cseq^{\prime }\) is a prefix of \(cseq^{\prime \prime }\), and hence \(|cseq^{\prime }|=|cseq^{\prime \prime }|\) only if \(cseq^{\prime } = cseq^{\prime \prime }\). Finally, if \(\pi\) is a read operation, the value with the highest tag discovered is returned to the client.
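The control flow shared by reads and writes can be sketched as below. This is an illustrative simplification rather than Algorithm 4: `Cfg` is a hypothetical configuration that stores one pair and exposes the three DAPs directly, tags are modeled as `(integer, writer id)` tuples, and `read_config` is passed in as a callable so that the sketch stays independent of any particular reconfiguration service.

```python
from typing import Callable, List, Optional, Tuple

Tag = Tuple[int, str]             # (integer part, writer id)
Pair = Tuple[Tag, Optional[str]]  # <tag, value>


class Cfg:
    """Hypothetical configuration exposing the DAPs over one stored pair."""

    def __init__(self) -> None:
        self.pair: Pair = ((0, ""), None)

    def get_tag(self) -> Tag:
        return self.pair[0]

    def get_data(self) -> Pair:
        return self.pair

    def put_data(self, p: Pair) -> None:
        if p[0] > self.pair[0]:
            self.pair = p


Seq = List[Tuple[Cfg, str]]
ReadConfig = Callable[[Seq], Seq]


def last_finalized(cseq: Seq) -> int:
    return max(i for i, (_, st) in enumerate(cseq) if st == "F")


def propagate(cseq: Seq, pair: Pair, read_config: ReadConfig) -> None:
    """Repeat put-data followed by read-config until the sequence stops growing."""
    while True:
        cseq[-1][0].put_data(pair)
        new_seq = read_config(cseq)
        if len(new_seq) == len(cseq):
            return
        cseq = new_seq


def write(cseq: Seq, val: str, writer_id: str,
          read_config: ReadConfig) -> None:
    cseq = read_config(cseq)
    mu = last_finalized(cseq)
    t_max = max(cfg.get_tag() for cfg, _ in cseq[mu:])
    # New tag: increment the integer part of the maximum tag found.
    propagate(cseq, ((t_max[0] + 1, writer_id), val), read_config)


def read(cseq: Seq, read_config: ReadConfig) -> Optional[str]:
    cseq = read_config(cseq)
    mu = last_finalized(cseq)
    pair = max((cfg.get_data() for cfg, _ in cseq[mu:]),
               key=lambda p: p[0])
    propagate(cseq, pair, read_config)
    return pair[1]
```

The loop in `propagate` is the step the text calls essential: a value written only to the old last configuration would be invisible to an operation that starts from a configuration installed in the meantime.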
Discussion. Ares shares similarities with previous algorithms like RAMBO [28] and the framework in [48]. The reconfiguration technique used in Ares ensures the prefix property on the configuration sequence (resembling a blockchain data structure [42]), while the array structure in RAMBO allowed nodes to maintain an incomplete reconfiguration history. On the other hand, the DAP usage exploits a different viewpoint compared to [48], allowing implementations of atomic read/write registers without relying on strong objects, like ranked registers [18]. Note that Ares is designed to capture a wide class of algorithms with different redundancy strategies. So while not directly implementing an EC-based atomic memory, it provides the “vehicle” without which dynamic EC-based implementations would not have been possible. Lastly, even though Ares is designed to support crash failures, as noted by [41], reconfiguration is more general and allows an algorithm to handle benign recoveries. That is, a recovered node that loses its state can be introduced as a new member of a new configuration. Stateful recoveries, on the other hand, are indistinguishable from long delays, and thus can be handled effectively by an algorithm designed for the asynchronous model, like Ares.

5 Implementation of the DAPs

In this section, we present an implementation of the DAPs that satisfies the properties in Property 1, for a configuration c with n servers using an \([n, k]\) MDS coding scheme for storage. Notice that the total number of servers in the system can be larger than n; however, we can pick a subset of n servers to use for this particular key and instance of the algorithm. We store each coded element \(c_i\), corresponding to an object, at server \(s_i\), where \(i=1, \ldots , n\). The implementations of DAP primitives used in Ares are shown in Algorithm 5, and the servers’ responses in Algorithm 6.
Each server \(s_i\) stores one state variable, List, which is a set of up to \((\delta + 1)\) (tag, coded-element) pairs. Initially the set at \(s_i\) contains a single element, \(List = \lbrace (t_0, \Phi _i(v_0))\rbrace\). Below we describe the implementation of the DAPs.
\({c}.{{\sf get-tag}()}\): A client, during the execution of a \({c}.{{\sf get-tag}()}\) primitive, queries all the servers in \(c.Servers\) for the highest tags in their Lists, and awaits responses from \(\lceil \frac{n+k}{2} \rceil\) servers. A server, upon receiving the get-tag request, responds to the client with the highest tag, as \(\tau _{max} \equiv \max _{(t,c) \in List}t\). Once the client receives the tags from \(\lceil \frac{n+k}{2} \rceil\) servers, it selects the highest tag t and returns it.
\(c.{\sf put-data}(\langle t_w, v \rangle)\): During the execution of the primitive \(c.{\sf put-data}(\langle t_w, v \rangle)\), a client sends the pair \((t_w, \Phi _i(v))\) to each server \(s_i\in c.Servers\). When a server \(s_i\) receives a message \((\text{put-data}, t_w, c_i)\), it adds the pair in its local List, trims the pairs with the smallest tags exceeding the length \((\delta +1)\) of the List, and replies with an ack to the client. In particular, \(s_i\) replaces the coded-elements of the older tags with \(\bot\), and maintains only the coded-elements associated with the \((\delta +1)\) highest tags in the List (see Line Algorithm 6:16). The client completes the primitive operation after getting acks from \(\lceil \frac{n+k}{2} \rceil\) servers.
\({c}.{{\sf get-data}()}\): A client, during the execution of a \({c}.{{\sf get-data}()}\) primitive, queries all the servers in \(c.Servers\) for their local variable List, and awaits responses from \(\lceil \frac{n+k}{2} \rceil\) servers. Once the client receives Lists from \(\lceil \frac{n+k}{2} \rceil\) servers, it selects the highest tag t, such that: \((i)\) its corresponding value v is decodable from the coded elements in the lists; and \((ii)\) t is the highest tag seen from the responses of at least k Lists (see lines Algorithm 5:11–14), and returns the pair \((t, v)\). Note that if any one of the above conditions is not satisfied, the corresponding read operation does not complete.
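The server-side trimming and the client-side tag selection can be sketched as follows. This is a simplified model, not Algorithms 5 and 6: tags are integers, a server's List is a dictionary from tag to coded element with `None` standing for \(\bot\), and a tag is treated as decodable exactly when at least k of the received Lists hold a coded element for it (the defining property of an \([n,k]\) MDS code). Actual encoding and decoding are omitted, and the function names are introduced here.

```python
from collections import Counter
from typing import Dict, List, Optional

Tag = int
# A server's List: tag -> coded element (None stands for ⊥ after trimming).
ServerList = Dict[Tag, Optional[bytes]]


def server_put_data(lst: ServerList, tag: Tag, coded: bytes,
                    delta: int) -> None:
    """Add (tag, coded); keep coded elements only for the delta+1 highest tags."""
    lst[tag] = coded
    keep = set(sorted(lst, reverse=True)[: delta + 1])
    for t in lst:
        if t not in keep:
            lst[t] = None  # the tag stays, its coded element becomes ⊥


def server_get_tag(lst: ServerList) -> Tag:
    """Highest tag in the List, whether or not its element was trimmed."""
    return max(lst)


def client_select(lists: List[ServerList], k: int) -> Optional[Tag]:
    """Tag that get-data would return, or None if the two conditions fail."""
    seen = Counter(t for lst in lists for t in lst)
    decodable = Counter(t for lst in lists
                        for t, e in lst.items() if e is not None)
    tags_star = [t for t, n in seen.items() if n >= k]      # seen in >= k Lists
    tags_dec = [t for t, n in decodable.items() if n >= k]  # decodable tags
    if tags_star and tags_dec and max(tags_star) == max(tags_dec):
        return max(tags_dec)
    return None
```

For example, if a tag appears in k Lists but one of those servers has already replaced its coded element with \(\bot\), `client_select` returns `None`, which corresponds to the case where the read does not complete.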

5.1 Correctness of the DAPs

To prove the correctness of the proposed DAPs, we need to show that they are both safe, i.e., they ensure the necessary Property 1, and live, i.e., they allow each operation to terminate. We first prove that, for any given execution \({\xi }\) containing operations of the proposed implementation, every pair of operations in \({\xi }\) satisfies the DAP consistency properties (i.e., Property 1). That is, the tag returned by a \({\sf get-tag}()\) operation is at least as large as the tag written by any preceding \({\sf put-data}()\) operation, and the value returned by a \({\sf get-data}()\) operation is either written by a \({\sf put-data}()\) operation or is the initial value of the object. Next, assuming that there cannot be more than \(\delta\) \({\sf put-data}()\) operations concurrent with a single \({\sf get-data}()\) operation, we show that each operation in our implementation terminates. Without this assumption, a \({\sf get-data}()\) operation is at risk of not being able to discover a decodable value, and thus of failing to terminate and return a value.
Safety (Property 1). In this section, we are concerned with only one configuration c, consisting of a set of servers \(c.Servers\). We assume that at most \(f \le \frac{n-k}{2}\) servers from \(c.Servers\) may crash. Theorem 2 states that the DAP implementation satisfies the consistency properties of Property 1, which will be used to imply the atomicity of the Ares algorithm.
Theorem 2 (Safety).
Let \(\Pi\) be a set of complete DAP operations of Algorithm 5 in a configuration \(c\in \mathcal {C}\), \({\sf c.get-tag}\), \({\sf c.get-data}\) and \({\sf c.put-data}\), of an execution \({\xi }\). Then, every pair of operations \(\phi ,\pi \in \Pi\) satisfies Property 1.
Proof.
As mentioned above we are concerned with only configuration c, and, therefore, in our proofs it suffices to examine only one configuration. Let \({\xi }\) be some execution of Ares, then we consider two cases for \(\pi\) for proving property C1: \(\pi\) is a \({\sf get-tag}\), or \(\pi\) is a \({\sf get-data}\) primitive.
Case \((a)\): \(\phi\) is \({c}.{{\sf put-data}(\langle \tau _{\phi }, v_\phi \rangle)}\) and \(\pi\) is a \({c}.{{\sf get-tag}()}\) that returns \(\tau _{\pi } \in {\mathcal {T}}\). Let \(c_{\phi }\) and \(c_{\pi }\) denote the clients that invoke \(\phi\) and \(\pi\) in \({\xi }\). Let \(S_{\phi } \subset \mathcal {S}\) denote the set of \(\lceil \frac{n+k}{2} \rceil\) servers that respond to \(c_{\phi }\) during \(\phi\). Denote by \(S_{\pi }\) the set of \(\lceil \frac{n+k}{2} \rceil\) servers that respond to \(c_{\pi }\) during \(\pi\). Let \(T_1\) be a point in execution \({\xi }\) after the completion of \(\phi\) and before the invocation of \(\pi\). Because \(\pi\) is invoked after \(T_1\), at \(T_1\) each of the servers in \(S_{\phi }\) contains \(t_{\phi }\) in its List variable. Note that, once a tag is added to List, it is never removed. Therefore, during \(\pi\), any server in \(S_{\phi }\cap S_{\pi }\) responds with a List containing \(t_{\phi }\) to \(c_{\pi }\). Note that \(|S_{\phi }| = |S_{\pi }| =\lceil \frac{n+k}{2} \rceil\) implies \(| S_{\phi } \cap S_{\pi } | \ge k\), and hence \(t^{dec}_{max}\) at \(c_{\pi }\) during \(\pi\) is at least as large as \(t_{\phi }\), i.e., \(t_{\pi } \ge t_{\phi }\). Therefore, it suffices to prove our claim with respect to the tags and the decodability of their corresponding values.
Case \((b)\): \(\phi\) is \({c}.{{\sf put-data}(\langle \tau _{\phi }, v_\phi \rangle)}\) and \(\pi\) is a \({c}.{{\sf get-data}()}\) that returns \(\langle \tau _{\pi }, v_{\pi } \rangle \in {\mathcal {T}}\times {\mathcal {V}}\). As above, let \(c_{\phi }\) and \(c_{\pi }\) be the clients that invoke \(\phi\) and \(\pi\). Let \(S_{\phi }\) and \(S_{\pi }\) be the sets of servers that respond to \(c_{\phi }\) and \(c_{\pi }\), respectively. Arguing as above, \(| S_{\phi } \cap S_{\pi } | \ge k\) and every server in \(S_{\phi } \cap S_{\pi }\) sends \(t_{\phi }\) in its List in response to \(c_{\pi }\) during \(\pi\), and hence \(t_{\phi } \in Tags_{*}^{\ge k}\). Now, because \(\pi\) completes in \({\xi }\), we have \(t^*_{max} = t^{dec}_{max}\). Note that \(\max Tags_{*}^{\ge k} \ge \max Tags_{dec}^{\ge k}\), so \(t_{\pi } \ge \max Tags_{dec}^{\ge k} = \max Tags_{*}^{\ge k} \ge t_{\phi }\). Note that each tag is always associated with its corresponding value \(v_{\pi }\), or the corresponding coded elements \(\Phi _s(v_{\pi })\) for \(s \in \mathcal {S}\).
Next, we prove the C2 property of DAP for the Ares algorithm. Note that the initial value of the List variable in each server s in \(\mathcal {S}\) is \(\lbrace (t_0, \Phi _s(v_0))\rbrace\). Moreover, from an inspection of the steps of the algorithm, new tags in the List variable of any server are introduced only via \({\sf put-data}\) operations. Since \(t_{\pi }\) is returned by a \({\sf get-tag}\) or \({\sf get-data}\) operation, it must be that either \(t_{\pi }=t_0\) or \(t_{\pi } \gt t_0\). In the case where \(t_{\pi } = t_0\), we have nothing to prove. If \(t_{\pi } \gt t_0\), then there must be a \({\sf put-data}(t_{\pi }, v_{\pi })\) operation \(\phi\). To show that \(\pi\) cannot complete before every such \(\phi\) is invoked, we argue by contradiction. Suppose \(\pi\) completes before any \({\sf put-data}(t_{\pi }, v_{\pi })\) operation begins; then no server holds \(t_{\pi }\) during \(\pi\), so clearly \(t_{\pi }\) cannot be returned by \(\pi\), a contradiction.□
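For reference, the intersection bound \(| S_{\phi } \cap S_{\pi } | \ge k\) used in both cases can be derived by inclusion-exclusion, using only \(|S_{\phi }| = |S_{\pi }| = \lceil \frac{n+k}{2} \rceil\) and the fact that both sets are drawn from the n servers of the configuration:

```latex
\[
\begin{aligned}
|S_{\phi} \cap S_{\pi}|
  &= |S_{\phi}| + |S_{\pi}| - |S_{\phi} \cup S_{\pi}| \\
  &\ge 2\left\lceil \frac{n+k}{2} \right\rceil - n \\
  &\ge 2 \cdot \frac{n+k}{2} - n \;=\; k .
\end{aligned}
\]
```

So any two response sets share at least k servers, which is exactly the number of coded elements an \([n,k]\) MDS code needs in order to decode.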
Liveness. To reason about the liveness of the proposed DAPs, we define a concurrency parameter \(\delta\) which captures all the \({\sf put-data}\) operations that overlap with the \({\sf get-data}\), until the time the client has all data needed to attempt decoding a value. However, we ignore those \({\sf put-data}\) operations that started in the past and have not yet completed, if their tags are less than that of any \({\sf put-data}\) that completed before the \({\sf get-data}\) started. This allows us to ignore \({\sf put-data}\) operations due to failed clients while counting concurrency, as long as the failed \({\sf put-data}\) operations are followed by a successful \({\sf put-data}\) that completed before the \({\sf get-data}\) started. To define the amount of concurrency in our specific implementation of the DAPs presented in this section, the following definitions capture the \({\sf put-data}\) operations that overlap with the \({\sf get-data}\) until the client has all data required to decode the value.
Definition 3 (Valid get-data Operations)
A \({\sf get-data}\) operation \(\pi\) from a process p is valid if p does not crash until the reception of \(\lceil \frac{n+k}{2} \rceil\) responses during the get-data phase.
Definition 4 (put-data Concurrent With a Valid get-data)
Consider a valid \({\sf get-data}\) operation \(\pi\) from a process p. Let \(T_1\) denote the point of initiation of \(\pi\). For \(\pi\), let \(T_2\) denote the earliest point of time during the execution when p receives all the \(\lceil \frac{n+k}{2} \rceil\) responses. Consider the set \(\Sigma = \lbrace \phi : \phi\) is any \({\sf put-data}\) operation that completes before \(\pi \text{ is initiated} \rbrace\), and let \(\phi ^* = \arg \max _{\phi \in \Sigma }tag(\phi)\). Next, consider the set \(\Lambda = \lbrace \lambda : \lambda\) is any \({\sf put-data}\) operation that starts before \(T_2 \text{ such that } tag(\lambda) \gt tag(\phi ^*)\rbrace\). We define the number of \({\sf put-data}\) concurrent with the valid \({\sf get-data}\) \(\pi\) to be the cardinality of the set \(\Lambda\).
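Definition 4 can be made concrete with a small counting function. The sketch below assumes integer tags and real-valued timestamps; the record type `PutData`, the function `concurrency`, and the treatment of an empty \(\Sigma\) (falling back to an initial tag `t0`) are assumptions introduced here for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PutData:
    """Hypothetical record of one put-data operation."""
    tag: int
    start: float
    end: Optional[float]  # None if the operation never completes


def concurrency(puts: List[PutData], t1: float, t2: float,
                t0: int = 0) -> int:
    """Size of the set Lambda for a valid get-data initiated at t1 whose
    quorum of responses has arrived by t2."""
    # Sigma: put-data operations that complete before the get-data starts.
    completed_tags = [p.tag for p in puts
                      if p.end is not None and p.end < t1]
    # tag(phi*); assumption: use the initial tag t0 when Sigma is empty.
    base = max(completed_tags, default=t0)
    # Lambda: operations that start before t2 with a tag above tag(phi*).
    return sum(1 for p in puts if p.start < t2 and p.tag > base)
```

A put-data from a crashed client with a tag below `base` is not counted, which is the effect described above of ignoring failed operations that are superseded by a completed one.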
Termination (and hence liveness) of the DAPs is guaranteed in an execution \({\xi }\), provided that no more than f servers in \(c.Servers\) crash, and no more than \(\delta\) \({\sf put-data}\) operations may be concurrent at any point in \({\xi }\). If the failure model is satisfied, then any operation invoked by a non-faulty client will collect the necessary replies independently of the progress of any other client process in the system. Preserving \(\delta\), on the other hand, ensures that any operation will be able to decode a written value. These are captured in the following theorem:
Theorem 5 (Liveness).
Let \({\xi }\) be a well-formed and fair execution of DAPs, with an \([n, k]\) MDS code, where n is the number of servers out of which no more than \(\frac{n-k}{2}\) may crash, and \(\delta\) be the maximum number of \({\sf put-data}\) operations concurrent with any valid \({\sf get-data}\) operation. Then any \({\sf get-data}\) and \({\sf put-data}\) operation \(\pi\) invoked by a process p terminates in \({\xi }\) if p does not crash between the invocation and response steps of \(\pi\).
Proof.
Note that in the read and write operations, the \({\sf get-tag}\) and \({\sf put-data}\) operations initiated by any non-faulty client always complete. Therefore, the liveness property with respect to any write operation is clear because it uses only \({\sf get-tag}\) and \({\sf put-data}\) operations of the DAP. So, we focus on proving the liveness property of any read operation \(\pi\); specifically, that the \({\sf get-data}\) operation completes. Let \({\xi }\) be an execution of Ares and let \(c_{\omega }\) and \(c_{\pi }\) be the clients that invoke the write operation \(\omega\) and read operation \(\pi\), respectively.
Let \(S_{\omega }\) be the set of \(\lceil \frac{n+k}{2} \rceil\) servers that respond to \(c_{\omega }\) in the \({\sf put-data}\) operations in \(\omega\). Let \(S_{\pi }\) be the set of \(\lceil \frac{n+k}{2} \rceil\) servers that respond to \(c_{\pi }\) during the \({\sf get-data}\) step of \(\pi\). Note that in \({\xi }\), at the execution point \(T_1\) just before the execution of \(\pi\), none of the write operations in \(\Lambda\) is complete. Observe that, by algorithm design, the coded-elements corresponding to \(t_{\omega }\) are garbage-collected from the List variable of a server only if more than \(\delta\) higher tags are introduced by subsequent writes into the server. Since the number of concurrent writes \(|\Lambda |\) satisfies \(\delta \gt | \Lambda |\), the corresponding value of tag \(t_{\omega }\) is not garbage-collected in \({\xi }\), at least until execution point \(T_2\), in any of the servers in \(S_{\omega }\).
Therefore, during the execution fragment between the execution points \(T_1\) and \(T_2\) of the execution \({\xi }\), the tag and coded-element pair is present in the List variable of every server in \(S_{\omega }\) that is active. As a result, the tag and coded-element pair \((t_{\omega }, \Phi _s(v_{\omega }))\) exists in the List received from any \(s \in S_{\omega } \cap S_{\pi }\) during operation \(\pi\). Note that since \(|S_{\omega }| = |S_{\pi }| =\lceil \frac{n+k}{2} \rceil\), we have \(|S_{\omega } \cap S_{\pi } | \ge k\) and hence \(t_{\omega } \in Tags_{dec}^{\ge k}\), the set of decodable tags, i.e., the value \(v_{\omega }\) can be decoded by \(c_{\pi }\) in \(\pi\), which demonstrates that \(Tags_{dec}^{\ge k} \ne \emptyset\). Next we want to argue that \(t_{max}^* = t_{max}^{dec}\) via a contradiction: we assume \(\max Tags_{*}^{\ge k} \gt \max Tags_{dec}^{\ge k}\). Now, consider any tag t, which exists due to our assumption, such that \(t \in Tags_{*}^{\ge k}\), \(t \not\in Tags_{dec}^{\ge k}\) and \(t \gt t_{max}^{dec}\). Let \(S^k_{\pi } \subset S\) be any subset of k servers that respond with \(t^*_{max}\) in their List variables to \(c_{\pi }\). Note that since \(k \gt n/3\), we have \(|S_{\omega } \cap S^k_{\pi }| \ge \lceil \frac{n+k}{2} \rceil + \lceil \frac{n+1}{3} \rceil - n \ge 1\), i.e., \(S_{\omega } \cap S^k_{\pi } \ne \emptyset\). Then t must be in some server in \(S_{\omega }\) at \(T_2\), and \(t \gt t_{max}^{dec} \ge t_{\omega }\). Now since \(|\Lambda | \lt \delta\), \((t, \bot)\) cannot be in any server at \(T_2\), because there are not enough concurrent write operations (i.e., writes in \(\Lambda\)) to garbage-collect the coded-elements corresponding to tag t, which also holds for tag \(t^{*}_{max}\). In that case, t must be in \(Tags_{dec}^{\ge k}\), a contradiction.□

6 Correctness of Ares

In this section, we prove that Ares correctly implements an atomic, read/write, shared storage service. The correctness of Ares highly depends on the way the configuration sequence is constructed at each client process. Also, atomicity is ensured if the DAP implementation in each configuration \(c_i\) satisfies Property 1.
As a roadmap, we begin by showing in Section 6.1 that some critical properties are preserved by the proposed reconfiguration service. In particular, we show that the configuration sequences maintained by any two processes are either the same or one is a prefix of the other. This in turn helps us to prove the correctness of Ares in Section 6.2 by showing that all the properties of atomicity (see Section 2) are satisfied, given that the properties on the configuration sequence hold and that the DAPs used in each configuration satisfy Property 1.
We proceed by first introducing some definitions and notation, that we use in the proofs that follow.
Notations and definitions. For a server s, we use the notation \(s.var|_{\sigma }\) to refer to the value of the state variable var, in s, at a state \(\sigma\) of an execution \({\xi }\). If server s crashes at a state \(\sigma _f\) in an execution \({\xi }\) then \(s.var|_{\sigma }\triangleq s.var|_{\sigma _f}\) for any state variable var and for any state \(\sigma\) that appears after \(\sigma _f\) in \({\xi }\) (i.e., the value of the variable remains unchanged).
We define as the tag of a configuration c the smallest tag among the maximum tags found in each quorum of c. This is essentially the smallest tag that an operation may witness when receiving replies from a single quorum in c. More formally:
Definition 6 (Tag of a Configuration).
Let \(c \in \mathcal {C}\) be a configuration, \(\sigma\) be a state in some execution \({\xi }\) then we define the tag of c at state \(\sigma\) as \(tag(c)|_{\sigma } \triangleq \min _{Q \in c.Quorums} \max _{s \in Q}~(s.tag|_{\sigma }).\) We drop the suffix \(|_\sigma\), and simply denote as \(tag(c)\), when the state is clear from the context.
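As a small illustration of Definition 6, the following sketch computes \(tag(c)\) from an explicit list of quorums and a map from each server to its current tag. The function name `config_tag`, the representation of quorums as sets of server names, and the use of integer tags are assumptions made here.

```python
from typing import Dict, Iterable, Set


def config_tag(quorums: Iterable[Set[str]], server_tag: Dict[str, int]) -> int:
    """Smallest, over all quorums, of the largest tag held inside the quorum."""
    return min(max(server_tag[s] for s in q) for q in quorums)


# Three servers with majority quorums: the quorum {s2, s3} only sees tag 3,
# so an operation that hears from that quorum alone may witness no more than 3.
quorums = [{"s1", "s2"}, {"s1", "s3"}, {"s2", "s3"}]
tags = {"s1": 5, "s2": 3, "s3": 3}
result = config_tag(quorums, tags)
```

Here the per-quorum maxima are 5, 5, and 3, so `result` is 3 even though one server already stores tag 5.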
Next we provide the notation to express the configuration sequence witnessed by a process p in a state \(\sigma\) (as \(p.cseq|_{\sigma }\)), the last finalized configuration in that sequence (as \(\mu (\mathbf {c}^{p}_{\sigma })\)), and the length of that sequence (as \(\nu (\mathbf {c}^{p}_{\sigma })\)). More formally:
Definition 7.
Let \(\sigma\) be any point in an execution of Ares and suppose we use the notation \(\mathbf {c}^{p}_{\sigma }\) for \(p.cseq|_{\sigma }\), i.e., the cseq variable at process p at the state \(\sigma\). Then we define as \(\mu (\mathbf {c}^{p}_{\sigma }) \triangleq \max \lbrace i : \mathbf {c}^{p}_{\sigma }[i].status = F\rbrace\) and \(\nu (\mathbf {c}^{p}_{\sigma }) \triangleq |\mathbf {c}^{p}_{\sigma }|\), where \(|\mathbf {c}^{p}_{\sigma }|\) is the length of the configuration vector \(\mathbf {c}^{p}_{\sigma }\).
Last, we define the prefix operation on two configuration sequences.
Definition 8 (Prefix Order).
Let \(\mathbf {x}\) and \(\mathbf {y}\) be any two configuration sequences. We say that \(\mathbf {x}\) is a prefix of \(\mathbf {y}\), denoted by \(\mathbf {x} \preceq _p \mathbf {y}\), if \(\mathbf {x}[j].cfg=\mathbf {y}[j].cfg\), for all j such that \(\mathbf {x}[j]\ne \bot\).
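A direct reading of Definition 8 can be sketched as below, under the same hypothetical list representation (entries are (cfg, status) pairs, `None` models \(\bot\)). Note that only the cfg member is compared and the status is ignored. Treating a \(\bot\) entry of \(\mathbf {y}\) at a position where \(\mathbf {x}\) is defined as a mismatch is an interpretation made for the example.

```python
from typing import List, Optional, Tuple

Entry = Optional[Tuple[str, str]]  # (cfg, status); None models ⊥


def is_prefix(x: List[Entry], y: List[Entry]) -> bool:
    """Return True if x[j].cfg == y[j].cfg for every j where x[j] is defined."""
    for j, ex in enumerate(x):
        if ex is None:
            continue  # undefined entries of x impose no constraint
        if j >= len(y) or y[j] is None or y[j][0] != ex[0]:
            return False
    return True


x = [("c0", "F"), ("c1", "P")]
y = [("c0", "F"), ("c1", "F"), ("c2", "P")]
x_prefix_of_y = is_prefix(x, y)
y_prefix_of_x = is_prefix(y, x)
```

The sequence x is a prefix of y even though the status of c1 differs, whereas y is not a prefix of x because x has no entry at index 2.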
Table 3 summarizes the new notation for ease of reference.
Table 3. Additional Notation Used in This Section
\(\mathbf {c}^{p}_{\sigma }\): the value of the configuration sequence variable cseq at process p in state \(\sigma\), i.e., a shorthand of \(p.cseq|_{\sigma }\)
\(\mathbf {c}^{p}_{\sigma }[i]\): the \(i^{th}\) element in the configuration sequence \(\mathbf {c}^{p}_{\sigma }\)
\(\mu (\mathbf {c}^{p}_{\sigma })\): last finalized configuration in \(\mathbf {c}^{p}_{\sigma }\)
\(\nu (\mathbf {c}^{p}_{\sigma })\): the length of \(\mathbf {c}^{p}_{\sigma }\), i.e., \(|\mathbf {c}^{p}_{\sigma }|\)

6.1 Reconfiguration Protocol Properties

In this section, we analyze the properties achieved by our reconfiguration algorithm. At a high level, we show that the following properties are preserved:
(i) configuration uniqueness: the configuration sequences at any two processes hold the same configuration at any index i,
(ii) sequence prefix: the configuration sequence witnessed by an operation is a prefix of the sequence witnessed by any succeeding operation, and
(iii) sequence progress: if the configuration with index i is finalized during an operation, then a configuration j, for \(j\ge i\), will be finalized for a succeeding operation.
We begin with two auxiliary lemmas and then show (Lemma 11) that any two configuration sequences have the same configuration identifiers in the same indexes.
Lemma 9.
For any reconfigurer r that invokes a \({\sf reconfig}(c)\) action in an execution \({\xi }\) of the algorithm, if r chooses to install c in index k of its local \(r.cseq\) vector, then r invokes the \(Cons[k-1].propose(c)\) instance over configuration \(r.cseq[k-1].cfg\).
Proof.
It follows directly from the algorithm.□
Lemma 10.
If a server s sets \(s.nextC\) to \(\langle c,F \rangle\) at some state \(\sigma\) in an execution \({\xi }\) of the algorithm, then \(s.nextC = \langle c,F \rangle\) for any state \(\sigma ^{\prime }\) that appears after \(\sigma\) in \({\xi }\).
Proof.
Notice that a server s updates its \(s.nextC\) variable for some specific configuration \(c_k\) in a state \(\sigma\) only when it receives a write-conf message. This is either the first write-conf message received at s for \(c_k\) (and thus \(nextC=\bot\)), or \(s.nextC = \langle *,P \rangle\) and the message received contains a tuple \(\langle c,F \rangle\). Once the tuple becomes equal to \(\langle c,F \rangle\) then s does not satisfy the update condition for \(c_k\), and hence in any state \(\sigma ^{\prime }\) after \(\sigma\) it does not change \(\langle c,F \rangle\).□
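The update condition used in this proof can be sketched as follows. The class `Server` and the handler name `on_write_conf` are hypothetical, and the sketch tracks a single \(nextC\) variable for one configuration \(c_k\) rather than one variable per configuration.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

# (cfg identifier, status "P" or "F"); None models ⊥.
NextC = Optional[Tuple[str, str]]


@dataclass
class Server:
    next_c: NextC = None

    def on_write_conf(self, msg: Tuple[str, str]) -> None:
        """Update nextC only on the first message or on a pending-to-finalized upgrade."""
        first_message = self.next_c is None
        upgrade_to_final = (
            self.next_c is not None and self.next_c[1] == "P" and msg[1] == "F"
        )
        if first_message or upgrade_to_final:
            self.next_c = msg


s = Server()
s.on_write_conf(("c1", "P"))  # first write-conf message: nextC becomes <c1, P>
s.on_write_conf(("c1", "F"))  # pending entry is upgraded to <c1, F>
s.on_write_conf(("c1", "P"))  # ignored: a finalized entry is never overwritten
```

Once the status is F neither branch of the condition can hold again, which is the monotonicity that Lemma 10 states.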
Lemma 11 (Configuration Uniqueness).
For any processes \(p, q\in \mathcal {I}\) and any states \(\sigma _1, \sigma _2\) in an execution \({\xi }\), it must hold that \(\mathbf {c}^{p}_{\sigma _1}[i].cfg=\mathbf {c}^{q}_{\sigma _2}[i].cfg\), \(\forall i\) s.t. \(\mathbf {c}^{p}_{\sigma _1}[i].cfg,\mathbf {c}^{q}_{\sigma _2}[i].cfg\ne \bot\).
Proof.
The lemma holds trivially for \(\mathbf {c}^{p}_{\sigma _1}[0].cfg=\mathbf {c}^{q}_{\sigma _2}[0].cfg=c_0\). So in the rest of the proof we focus on the case where \(i \gt 0\). Let us assume w.l.o.g. that \(\sigma _1\) appears before \(\sigma _2\) in \({\xi }\).
According to our algorithm a process p sets \(p.cseq[i].cfg\) to a configuration identifier c in two cases: (i) either it received c as the result of the consensus instance in configuration \(p.cseq[i-1].cfg\), or (ii) p receives \(s.nextC.cfg = c\) from a server \(s\in p.cseq[i-1].cfg.Servers\). Note here that (i) is possible only when p is a reconfigurer and attempts to install a new configuration. On the other hand (ii) may be executed by any process in any operation that invokes the \({\sf read-config}\) action. We are going to prove this lemma by induction on the configuration index.
Base case: The base case of the lemma is when \(i=1\). Let us first assume that p and q receive \(c_p\) and \(c_q\), as the result of the consensus instance at \(p.cseq[0].cfg\) and \(q.cseq[0].cfg\), respectively. By Lemma 9, since both processes want to install a configuration in \(i=1\), then they have to run \(Cons[0]\) instance over the configuration stored in their local \(cseq[0].cfg\) variable. Since \(p.cseq[0].cfg=q.cseq[0].cfg=c_0\) then both \(Cons[0]\) instances run over the same configuration \(c_0\) and thus by the agreement property they have to decide the same value, say \(c_1\). Hence \(c_p=c_q=c_1\) and \(p.cseq[1].cfg=q.cseq[1].cfg=c_1\).
Let us now examine the case where p or q assign a configuration c they received from some server \(s\in c_0.Servers\). According to the algorithm, only the configuration that has been decided by the consensus instance on \(c_0\) is propagated to the servers in \(c_0.Servers\). If \(c_1\) is the decided configuration, then \(\forall s\in c_0.Servers\) such that \(s.nextC(c_0)\ne \bot\), it holds that \(s.nextC(c_0) = \langle c_1,* \rangle\). So if p or q set \(p.cseq[1].cfg\) or \(q.cseq[1].cfg\) to some received configuration, then \(p.cseq[1].cfg = q.cseq[1].cfg = c_1\) in this case as well.
Hypothesis: We assume that \(\mathbf {c}^{p}_{\sigma _1}[k]=\mathbf {c}^{q}_{\sigma _2}[k]\) for some k, \(k \ge 1\).
Induction Step: We need to show that the lemma holds for \(i=k+1\). If both processes retrieve \(p.cseq[k+1].cfg\) and \(q.cseq[k+1].cfg\) through consensus, then both p and q run consensus over the previous configuration. Since according to our hypothesis \(\mathbf {c}^{p}_{\sigma _1}[k]=\mathbf {c}^{q}_{\sigma _2}[k]\) then both processes will receive the same decided value, say \(c_{k+1}\), and hence \(p.cseq[k+1].cfg=q.cseq[k+1].cfg=c_{k+1}\). Similar to the base case, a server in \(c_k.Servers\) only receives the configuration \(c_{k+1}\) decided by the consensus instance run over \(c_k\). So processes p and q can only receive \(c_{k+1}\) from some server in \(c_k.Servers\), and thus they can only assign \(p.cseq[k+1].cfg=q.cseq[k+1].cfg=c_{k+1}\) at Line A2:8. That completes the proof.□
Lemma 11 showed that any two operations store the same configuration in any cell k of their cseq variable. It is not known however if the two processes discover the same number of configuration ids. In the following lemmas we will show that if a process learns about a configuration in a cell k then it also learns about all configuration ids for every index i, such that \(0\le i\le k-1\).
Lemma 12.
In any execution \({\xi }\) of the algorithm, if for any process \(p\in \mathcal {I}\), \(\mathbf {c}^{p}_{\sigma }[i]\ne \bot\) in some state \(\sigma\) in \({\xi }\), then \(\mathbf {c}^{p}_{\sigma ^{\prime }}[i]\ne \bot\) in any state \(\sigma ^{\prime }\) that appears after \(\sigma\) in \({\xi }\).
Proof.
A value is assigned to \(\mathbf {c}^{p}_{*}[i]\) either after the invocation of a consensus instance, or while executing the \({\sf read-config}\) action. Since any configuration proposed for installation cannot be \(\bot\) (A2:7), and since there is at least one configuration proposed in the consensus instance (the one from p), then by the validity of the consensus service the decision will be a configuration \(c\ne \bot\). Thus, in this case \(\mathbf {c}^{p}_{*}[i]\) cannot be \(\bot\). Also in the \({\sf read-config}\) procedure, \(\mathbf {c}^{p}_{*}[i]\) is assigned a value different from \(\bot\) according to Line A2:8. Hence, if \(\mathbf {c}^{p}_{\sigma }[i]\ne \bot\) at state \(\sigma\) then it cannot become \(\bot\) in any state \(\sigma ^{\prime }\) after \(\sigma\) in execution \({\xi }\).□
Lemma 13.
Let \(\sigma _1\) be some state in an execution \({\xi }\) of the algorithm. Then for any process p, if \(k = \max \lbrace i: \mathbf {c}^{p}_{\sigma _1}[i]\ne \bot \rbrace\), then \(\mathbf {c}^{p}_{\sigma _1}[j]\ne \bot\), for \(0\le j \lt k\).
Proof.
Let us assume, to derive a contradiction, that there exists \(j \lt k\) such that \(\mathbf {c}^{p}_{\sigma _1}[j]=\bot\) and \(\mathbf {c}^{p}_{\sigma _1}[j+1]\ne \bot\). Consider first that \(j = k-1\) and that \(\sigma _1\) is the state immediately after the assignment of a value to \(\mathbf {c}^{p}_{\sigma _1}[k]\), say \(c_k\). Since \(\mathbf {c}^{p}_{\sigma _1}[k]\ne \bot\), then p assigned \(c_k\) to \(\mathbf {c}^{p}_{\sigma _1}[k]\) in one of the following cases: (i) \(c_k\) was the result of the consensus instance, or (ii) p received \(c_k\) from a server during a \({\sf read-config}\) action. The first case is trivially impossible as, according to Lemma 9, p decides for k when it runs consensus over configuration \(\mathbf {c}^{p}_{\sigma _1}[k-1].cfg\). Since this is equal to \(\bot\), p cannot run consensus over a non-existent set of processes. In the second case, p assigns \(\mathbf {c}^{p}_{\sigma _1}[k] = c_k\) in Line A1:8. The value \(c_k\) was however obtained when p invoked \({\sf get-next-config}\) on configuration \(\mathbf {c}^{p}_{\sigma _1}[k-1].cfg\). In that action, p sends read-config messages to the servers in \(\mathbf {c}^{p}_{\sigma _1}[k-1].cfg.Servers\) and waits until a quorum of servers replies. Since we assigned \(\mathbf {c}^{p}_{\sigma _1}[k] = c_k\) it means that \({\sf get-next-config}\) terminated at some state \(\sigma ^{\prime }\) before \(\sigma _1\) in \({\xi }\), and thus: (a) a quorum of servers in \(\mathbf {c}^{p}_{\sigma ^{\prime }}[k-1].cfg.Servers\) replied, and (b) there exists a server s among those that replied with \(c_k\). According to our assumption however, \(\mathbf {c}^{p}_{\sigma _1}[k-1] = \bot\) at \(\sigma _1\). So if state \(\sigma ^{\prime }\) is before \(\sigma _1\) in \({\xi }\), then by Lemma 12, it follows that \(\mathbf {c}^{p}_{\sigma ^{\prime }}[k-1] = \bot\). This however implies that p communicated with an empty configuration, and thus no server replied to p.
This however contradicts the assumption that a server replied with \(c_k\) to p.
Since any process traverses the configuration sequence starting from the initial configuration \(c_0\), then with a simple induction and similar reasoning we can show that \(\mathbf {c}^{p}_{\sigma _1}[j]\ne \bot\), for \(0\le j\le k-1\).□
We can now move to an important lemma that shows that any read-config action returns an extension of the configuration sequence returned by any previous read-config action. First, we show that for every configuration that precedes the last finalized configuration in the sequence of a process, some quorum of that configuration stores the next configuration of the sequence.
Lemma 14.
If at a state \(\sigma\) of an execution \({\xi }\) of the algorithm \(\mu (\mathbf {c}^{p}_{\sigma }) = k\) for some process p, then for any element \(0\le j \lt k\), \(\exists Q\in \mathbf {c}^{p}_{\sigma }[j].cfg.Quorums\) such that \(\forall s\in Q, s.nextC(\mathbf {c}^{p}_{\sigma }[j].cfg)= \mathbf {c}^{p}_{\sigma }[j+1]\).
Proof.
This lemma follows directly from the algorithm. Notice that whenever a process assigns a value to an element of its local configuration (Lines A1:8 and A2:17), it then propagates this value to a quorum of the previous configuration (Lines A1:9 and A2:18). So if a process p assigned \(c_j\) to an element \(\mathbf {c}^{p}_{\sigma ^{\prime }}[j]\) in some state \(\sigma ^{\prime }\) in \({\xi }\), then p may assign a value to the next element \(\mathbf {c}^{p}_{\sigma ^{\prime \prime }}[j+1]\) only after \({\sf put-config}(\mathbf {c}^{p}_{\sigma ^{\prime }}[j-1].cfg,\mathbf {c}^{p}_{\sigma ^{\prime }}[j])\) occurs. During the \({\sf put-config}\) action, p propagates \(\mathbf {c}^{p}_{\sigma ^{\prime }}[j]\) to a quorum \(Q\in \mathbf {c}^{p}_{\sigma ^{\prime }}[j-1].cfg.Quorums\). Hence, if \(\mathbf {c}^{p}_{\sigma }[k]\ne \bot\), then p propagated each \(\mathbf {c}^{p}_{\sigma ^{\prime }}[j]\), for \(0\lt j\le k\), to a quorum of servers \(Q\in \mathbf {c}^{p}_{\sigma ^{\prime }}[j-1].cfg.Quorums\). This completes the proof.□
Lemma 15 (Sequence Prefix).
Let \(\pi _1\) and \(\pi _2\) be two completed read-config actions invoked by processes \(p_1, p_2\in \mathcal {I}\) respectively, such that \(\pi _1\rightarrow \pi _2\) in an execution \({\xi }\). Let \(\sigma _1\) be the state after the response step of \(\pi _1\) and \(\sigma _2\) the state after the response step of \(\pi _2\). Then \(\mathbf {c}^{p_1}_{\sigma _1}\preceq _p\mathbf {c}^{p_2}_{\sigma _2}\).
Proof.
Let \(\nu _1 = \nu (\mathbf {c}^{p_1}_{\sigma _1})\) and \(\nu _2 = \nu (\mathbf {c}^{p_2}_{\sigma _2})\). By Lemma 11 for any i such that \(\mathbf {c}^{p_1}_{\sigma _1}[i]\ne \bot\) and \(\mathbf {c}^{p_2}_{\sigma _2}[i]\ne \bot\), then \(\mathbf {c}^{p_1}_{\sigma _1}[i].cfg=\mathbf {c}^{p_2}_{\sigma _2}[i].cfg\). Also from Lemma 13 we know that for \(0\le j\le \nu _1, \mathbf {c}^{p_1}_{\sigma _1}[j] \ne \bot\), and \(0\le j\le \nu _2, \mathbf {c}^{p_2}_{\sigma _2}[j] \ne \bot\). So if we can show that \(\nu _1\le \nu _2\) then the lemma follows.
Let \(\mu = \mu (\mathbf {c}^{p_2}_{\sigma ^{\prime }})\) be the last finalized element which \(p_2\) established at the beginning of the \({\sf read-config}\) action \(\pi _2\) (Line A2:2) at some state \(\sigma ^{\prime }\) before \(\sigma _2\). It is easy to see that \(\mu \le \nu _2\). If \(\nu _1 \le \mu\) then \(\nu _1\le \nu _2\) and the lemma follows. Thus, it remains to examine the case where \(\mu \lt \nu _1\). Notice that since \(\pi _1\rightarrow \pi _2\) then \(\sigma _1\) appears before \(\sigma ^{\prime }\) in execution \({\xi }\). By Lemma 14, we know that by \(\sigma _1\), \(\exists Q\in \mathbf {c}^{p_1}_{\sigma _1}[j].cfg.Quorums\), for \(0\le j \lt \nu _1\), such that \(\forall s\in Q, s.nextC = \mathbf {c}^{p_1}_{\sigma _1}[j+1]\). Since \(\mu \lt \nu _1\), then it must be the case that \(\exists Q\in \mathbf {c}^{p_1}_{\sigma _1}[\mu ].cfg.Quorums\) such that \(\forall s\in Q, s.nextC = \mathbf {c}^{p_1}_{\sigma _1}[\mu +1]\). But by Lemma 11, we know that \(\mathbf {c}^{p_1}_{\sigma _1}[\mu ].cfg= \mathbf {c}^{p_2}_{\sigma ^{\prime }}[\mu ].cfg\). Let \(Q^{\prime }\) be the quorum that replies to the \({\sf read-next-config}\) invoked by \(p_2\) on configuration \(\mathbf {c}^{p_2}_{\sigma ^{\prime }}[\mu ].cfg\). By definition \(Q\cap Q^{\prime }\ne \emptyset\), thus there is a server \(s\in Q\cap Q^{\prime }\) that sends \(s.nextC = \mathbf {c}^{p_1}_{\sigma _1}[\mu +1]\) to \(p_2\) during \(\pi _2\). Since \(\mathbf {c}^{p_1}_{\sigma _1}[\mu +1]\ne \bot\) then \(p_2\) assigns \(\mathbf {c}^{p_2}_{*}[\mu +1]=\mathbf {c}^{p_1}_{\sigma _1}[\mu +1]\), and repeats the process in the configuration \(\mathbf {c}^{p_2}_{*}[\mu +1].cfg\).
Since every configuration \(\mathbf {c}^{p_1}_{\sigma _1}[j].cfg\), for \(\mu \le j\lt \nu _1\), has a quorum of servers with \(s.nextC\), then by a simple induction it can be shown that the process will be repeated for at least \(\nu _1-\mu\) iterations, and every configuration \(\mathbf {c}^{p_2}_{\sigma ^{\prime \prime }}[j]=\mathbf {c}^{p_1}_{\sigma _1}[j]\), at some state \(\sigma ^{\prime \prime }\) before \(\sigma _2\). Thus, \(\mathbf {c}^{p_2}_{\sigma _2}[j]=\mathbf {c}^{p_1}_{\sigma _1}[j]\), for \(0\le j\le \nu _1\). Hence \(\nu _1\le \nu _2\) and the lemma follows in this case as well.□
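The traversal that this proof relies on can be sketched as follows. This is a simplified, single-threaded model and not the pseudocode of the algorithm: the dictionary `store` stands in for the servers' \(nextC\) variables, `quorum_of` fixes which quorum happens to reply in each configuration, and the rule that a finalized reply is preferred over a pending one is an assumption of the sketch. All function and variable names are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Entry = Tuple[str, str]  # (cfg identifier, status "P" or "F")
# store[cfg][server] models server.nextC(cfg); None models ⊥.
Store = Dict[str, Dict[str, Optional[Entry]]]


def get_next_config(store: Store, cfg: str, quorum: List[str]) -> Optional[Entry]:
    """Gather nextC(cfg) from one quorum; prefer a finalized reply (assumption)."""
    replies = [store[cfg][s] for s in quorum if store[cfg][s] is not None]
    finalized = [r for r in replies if r[1] == "F"]
    if finalized:
        return finalized[0]
    return replies[0] if replies else None


def put_config(store: Store, cfg: str, quorum: List[str], entry: Entry) -> None:
    """Propagate entry to a quorum of cfg; a finalized nextC is never overwritten."""
    for s in quorum:
        cur = store[cfg][s]
        if cur is None or (cur[1] == "P" and entry[1] == "F"):
            store[cfg][s] = entry


def read_config(store: Store, cseq: List[Entry],
                quorum_of: Dict[str, List[str]]) -> List[Entry]:
    """Traverse from the last finalized entry until a quorum reports nextC = ⊥."""
    seq = list(cseq)
    i = max(j for j, e in enumerate(seq) if e[1] == "F")
    while True:
        cfg = seq[i][0]
        nxt = get_next_config(store, cfg, quorum_of[cfg])
        if nxt is None:
            return seq
        if i + 1 < len(seq):
            seq[i + 1] = nxt
        else:
            seq.append(nxt)
        put_config(store, cfg, quorum_of[cfg], nxt)
        i += 1


# State left behind by an earlier action that discovered c0 -> c1 -> c2.
store = {
    "c0": {"a": ("c1", "F"), "b": ("c1", "F"), "c": None},
    "c1": {"a": None, "b": ("c2", "P"), "c": ("c2", "P")},
    "c2": {"a": None, "b": None, "c": None},
}
# Quorums that reply to the later action; each intersects the quorum written earlier.
quorum_of = {"c0": ["b", "c"], "c1": ["a", "b"], "c2": ["a", "b"]}
result = read_config(store, [("c0", "F")], quorum_of)
```

In this run the later action starts with only \(c_0\), meets server b in both intersections, and ends with the sequence c0, c1, c2, so the earlier sequence is a prefix of the one it returns.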
Thus far we focused on the configuration member of each element in cseq. As operations do take into account the status of a configuration, i.e., P or F, in the next lemma we examine the relationship between the last finalized configurations detected by two operations. First, we present a lemma that shows the monotonicity of the finalized configurations.
Lemma 16.
Let \(\sigma\) and \(\sigma ^{\prime }\) be two states in an execution \({\xi }\) such that \(\sigma\) appears before \(\sigma ^{\prime }\) in \({\xi }\). Then for any process p it must hold that \(\mu (\mathbf {c}^{p}_{\sigma })\le \mu (\mathbf {c}^{p}_{\sigma ^{\prime }})\).
Proof.
This lemma follows from the fact that if a configuration k is such that \(\mathbf {c}^{p}_{\sigma }[k].status=F\) at a state \(\sigma\), then p will start any future \({\sf read-config}\) action from a configuration \(\mathbf {c}^{p}_{\sigma ^{\prime }}[j].cfg\) such that \(j\ge k\). But \(\mathbf {c}^{p}_{\sigma ^{\prime }}[j].cfg\) is the last finalized configuration at \(\sigma ^{\prime }\) and hence \(\mu (\mathbf {c}^{p}_{\sigma ^{\prime }})\ge \mu (\mathbf {c}^{p}_{\sigma })\).□
Lemma 17 (Sequence Progress).
Let \(\pi _1\) and \(\pi _2\) be two completed read-config actions invoked by processes \(p_1, p_2\in \mathcal {I}\), respectively, such that \(\pi _1\rightarrow \pi _2\) in an execution \({\xi }\). Let \(\sigma _1\) be the state after the response step of \(\pi _1\) and \(\sigma _2\) the state after the response step of \(\pi _2\). Then \(\mu (\mathbf {c}^{p_1}_{\sigma _1})\le \mu (\mathbf {c}^{p_2}_{\sigma _2})\).
Proof.
By Lemma 15 it follows that \(\mathbf {c}^{p_1}_{\sigma _1}\) is a prefix of \(\mathbf {c}^{p_2}_{\sigma _2}\). Thus, if \(\nu _1 = \nu (\mathbf {c}^{p_1}_{\sigma _1})\) and \(\nu _2 = \nu (\mathbf {c}^{p_2}_{\sigma _2})\), \(\nu _1\le \nu _2\). Let \(\mu _1=\mu (\mathbf {c}^{p_1}_{\sigma _1})\), such that \(\mu _1\le \nu _1\), be the last element in \(\mathbf {c}^{p_1}_{\sigma _1}\), where \(\mathbf {c}^{p_1}_{\sigma _1}[\mu _1].status = F\). Let now \(\mu _2=\mu (\mathbf {c}^{p_2}_{\sigma ^{\prime }})\), be the last element which \(p_2\) obtained in Line A1:2 during \(\pi _2\) such that \(\mathbf {c}^{p_2}_{\sigma ^{\prime }}[\mu _2].status = F\) in some state \(\sigma ^{\prime }\) before \(\sigma _2\). If \(\mu _2\ge \mu _1\), and since \(\sigma _2\) is after \(\sigma ^{\prime }\), then by Lemma 16, \(\mu _2\le \mu (\mathbf {c}^{p_2}_{\sigma _2})\) and hence \(\mu _1\le \mu (\mathbf {c}^{p_2}_{\sigma _2})\) as well.
It remains to examine the case where \(\mu _2\lt \mu _1\). Process \(p_1\) sets the status of \(\mathbf {c}^{p_1}_{\sigma _1}[\mu _1]\) to F in two cases: (i) either when finalizing a reconfiguration, or (ii) when receiving an \(s.nextC = \langle \mathbf {c}^{p_1}_{\sigma _1}[\mu _1].cfg, F \rangle\) from some server s during a \({\sf read-config}\) action. In both cases \(p_1\) propagates the \(\langle \mathbf {c}^{p_1}_{\sigma _1}[\mu _1].cfg, F \rangle\) to a quorum of servers in \(\mathbf {c}^{p_1}_{\sigma _1}[\mu _1-1].cfg\) before completing. We know by Lemma 15 that since \(\pi _1\rightarrow \pi _2\) then \(\mathbf {c}^{p_1}_{\sigma _1}\) is a prefix in terms of configurations of the \(\mathbf {c}^{p_2}_{\sigma _2}\). So it must be the case that \(\mu _2 \lt \mu _1 \le \nu (\mathbf {c}^{p_2}_{\sigma _2})\). Thus, during \(\pi _2\), \(p_2\) starts from the configuration at index \(\mu _2\) and in some iteration performs \({\sf get-next-config}\) in configuration \(\mathbf {c}^{p_2}_{\sigma _2}[\mu _1-1]\). According to Lemma 11, \(\mathbf {c}^{p_1}_{\sigma _1}[\mu _1-1].cfg = \mathbf {c}^{p_2}_{\sigma _2}[\mu _1-1].cfg\). Since \(\pi _1\) completed before \(\pi _2\), then it must be the case that \(\sigma _1\) appears before \(\sigma ^{\prime }\) in \({\xi }\). However, \(p_2\) invokes the \({\sf get-next-config}\) operation in a state \(\sigma ^{\prime \prime }\) which is either equal to \(\sigma ^{\prime }\) or appears after \(\sigma ^{\prime }\) in \({\xi }\). Thus, \(\sigma ^{\prime \prime }\) must appear after \(\sigma _1\) in \({\xi }\). From that it follows that when the \({\sf get-next-config}\) is executed by \(p_2\) there is already a quorum of servers in \(\mathbf {c}^{p_2}_{\sigma _2}[\mu _1-1].cfg\), say \(Q_1\), that received \(\langle \mathbf {c}^{p_1}_{\sigma _1}[\mu _1].cfg, F \rangle\) from \(p_1\).
Since \(p_2\) waits for replies from a quorum of servers from the same configuration, say \(Q_2\), and since the nextC variable at each server is monotonic (Lemma 10), then there is a server \(s\in Q_{1}\cap Q_{2}\), such that s replies to \(p_2\) with \(s.nextC = \langle \mathbf {c}^{p_1}_{\sigma _1}[\mu _1].cfg, F \rangle\). So, \(\mathbf {c}^{p_2}_{\sigma _2}[\mu _1]\) gets \(\langle \mathbf {c}^{p_1}_{\sigma _1}[\mu _1].cfg, F \rangle\), and hence \(\mu (\mathbf {c}^{p_2}_{\sigma _2})\ge \mu _1\) in this case as well. This completes our proof.□
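The step \(Q_1\cap Q_2\ne \emptyset\) depends only on the intersection property of the quorum system of a configuration. As a small sanity check, the sketch below builds majority quorums and verifies that every pair intersects. Majority quorums are an assumption chosen for illustration; the proofs in this section use only the fact that any two quorums of the same configuration intersect.

```python
from itertools import combinations
from typing import FrozenSet, List, Sequence


def majority_quorums(servers: Sequence[str]) -> List[FrozenSet[str]]:
    """All subsets of size floor(n/2) + 1 (illustrative choice of quorum system)."""
    k = len(servers) // 2 + 1
    return [frozenset(q) for q in combinations(servers, k)]


def pairwise_intersecting(quorums: Sequence[FrozenSet[str]]) -> bool:
    """True if every two distinct quorums share at least one server."""
    return all(q1 & q2 for q1, q2 in combinations(quorums, 2))


qs = majority_quorums(["s1", "s2", "s3", "s4", "s5"])
ok = pairwise_intersecting(qs)
```

With five servers there are ten majority quorums of size three, and any two of them share a server, so a server such as s in the proof always exists.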
Using the previous lemmas, we can conclude the main result of this section.
Theorem 18.
Let \(\pi _1\) and \(\pi _2\) be two completed read-config actions invoked by processes \(p_1, p_2\in \mathcal {I}\) respectively, such that \(\pi _1\rightarrow \pi _2\) in an execution \({\xi }\). Let \(\sigma _1\) be the state after the response step of \(\pi _1\) and \(\sigma _2\) the state after the response step of \(\pi _2\). Then the following properties hold:
\((a)\) Configuration Consistency: \(\mathbf {c}^{p_2}_{\sigma _2}[i].cfg = \mathbf {c}^{p_1}_{\sigma _1}[i].cfg\), for \(1 \le i \le \nu (\mathbf {c}^{p_1}_{\sigma _1})\),
\((b)\) Sequence Prefix: \(\mathbf {c}^{p_1}_{\sigma _1} \preceq _p \mathbf {c}^{p_2}_{\sigma _2}\), and
\((c)\) Sequence Progress: \(\mu (\mathbf {c}^{p_1}_{\sigma _1}) \le \mu (\mathbf {c}^{p_2}_{\sigma _2})\)
Proof.
Statements \((a)\), \((b)\), and \((c)\) follow from Lemmas 11, 15, and 17, respectively.□

6.2 Atomicity Property of Ares

Given the properties satisfied by the reconfiguration algorithm of Ares in any execution, we can now proceed to examine whether our algorithm satisfies the safety (atomicity) conditions. The propagation of the information of the distributed object is achieved using the get-tag, get-data, and put-data actions. We assume that the DAP used satisfies Property 1 as presented in Section 3, and we will show that, given such an assumption, Ares satisfies atomicity.
We begin with a lemma stating that if a reconfiguration operation retrieves a configuration sequence of length k during its \({\sf read-config}\) action, then it installs/finalizes the \(k + 1\) configuration in the global configuration sequence.
Lemma 19.
Let \(\pi\) be a complete reconfiguration operation by a reconfigurer rc in an execution \({\xi }\) of Ares. If \(\sigma _1\) is the state in \({\xi }\) following the termination of the \({\sf read-config}\) action during \(\pi\), then \(\pi\) invokes a \({\sf finalize-config}(\mathbf {c}^{rc}_{\sigma _2})\) at a state \(\sigma _2\) in \({\xi }\), with \(\nu (\mathbf {c}^{rc}_{\sigma _2}) = \nu (\mathbf {c}^{rc}_{\sigma _1}) + 1\).
Proof.
This lemma follows directly from the implementation of the \({\sf reconfig}\) operation. Let \(\pi\) be a reconfiguration operation \({\sf reconfig}(c)\). At first, \(\pi\) invokes a \({\sf read-config}\) to retrieve the latest value of the global configuration sequence, \(\mathbf {c}^{rc}_{\sigma _1}\), in the state \(\sigma _1\) in \({\xi }\). During the \({\sf add-config}\) action, \(\pi\) proposes the addition of c, and appends at the end of \(\mathbf {c}^{rc}_{\sigma _1}\) the decision d of the consensus protocol. Therefore, \(\mathbf {c}^{rc}_{\sigma _1}\) is extended by \(\langle d, P \rangle\) (Line A2:17), and hence the \({\sf add-config}\) action returns a configuration sequence \(\mathbf {c}^{rc}_{\sigma _1^{\prime }}\) with length \(\nu (\mathbf {c}^{rc}_{\sigma _1^{\prime }})= \nu (\mathbf {c}^{rc}_{\sigma _1}) + 1\). As \(\nu (\mathbf {c}^{rc}_{\sigma _1^{\prime }})\) does not change during the \({\sf update-config}\) action, \(\mathbf {c}^{rc}_{\sigma _1^{\prime }}\) is passed to the \({\sf finalize-config}\) action at state \(\sigma _2\), and hence \(\mathbf {c}^{rc}_{\sigma _2}=\mathbf {c}^{rc}_{\sigma _1^{\prime }}\). Thus, \(\nu (\mathbf {c}^{rc}_{\sigma _2})=\nu (\mathbf {c}^{rc}_{\sigma _1^{\prime }})= \nu (\mathbf {c}^{rc}_{\sigma _1}) + 1\) and the lemma follows.□
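The length argument of this proof can be traced in a minimal sketch of the phases of \({\sf reconfig}\). The consensus service is replaced by a hypothetical callable `consensus` that returns the decided configuration, and the \({\sf update-config}\) phase is reduced to a comment because it does not modify the sequence. Function names mirror the action names but are otherwise assumptions.

```python
from typing import Callable, List, Tuple

Entry = Tuple[str, str]  # (cfg identifier, status "P" or "F")


def add_config(seq: List[Entry], proposal: str,
               consensus: Callable[[str], str]) -> List[Entry]:
    """Propose a configuration and append the decided one with status P."""
    decided = consensus(proposal)
    return seq + [(decided, "P")]


def finalize_config(seq: List[Entry]) -> List[Entry]:
    """Mark the last entry of the sequence as finalized."""
    cfg, _ = seq[-1]
    return seq[:-1] + [(cfg, "F")]


def reconfig(seq_after_read_config: List[Entry], proposal: str,
             consensus: Callable[[str], str]) -> List[Entry]:
    seq = add_config(seq_after_read_config, proposal, consensus)
    # update-config would transfer the object state here; seq is left unchanged.
    return finalize_config(seq)


before = [("c0", "F"), ("c1", "P")]
# Single proposer: by validity the decision is the proposed configuration.
after = reconfig(before, "c2", lambda c: c)
```

The sequence handed to the finalize step is exactly one entry longer than the one returned by the read step, which is the statement of Lemma 19.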
The next lemma states that only some reconfiguration operation \(\pi\) may finalize a configuration c at index j in a configuration sequence \(p.cseq\) at any process p. To finalize c, the lemma shows that \(\pi\) must witness a configuration sequence such that its last finalized configuration appears at an index \(i\lt j\) in the configuration sequence \(p.cseq\). In other words, reconfigurations always finalize configurations that are ahead of their latest observed final configuration, as if “jumping” from one final configuration to the next.
Lemma 20.
Suppose \({\xi }\) is an execution of Ares. For any state \(\sigma\) in \({\xi }\), if \(\mathbf {c}^{p}_{\sigma }[j].status = F\) for some process \(p\in \mathcal {I}\), then there exists a \({\sf reconfig}\) operation \(\pi\) by a reconfigurer \(rc\in \mathcal {G}\), such that (i) rc invokes \({\sf finalize-config}(\mathbf {c}^{rc}_{\sigma ^{\prime }})\) during \(\pi\) at some state \(\sigma ^{\prime }\) in \({\xi }\), (ii) \(\nu (\mathbf {c}^{rc}_{\sigma ^{\prime }}) = j\), and (iii) \(\mu (\mathbf {c}^{rc}_{\sigma ^{\prime }}) \lt j\).
Proof.
A process sets the status of a configuration c to F in two cases: (i) either during a \({\sf finalize-config}(seq)\) action such that \(seq[\nu (seq)] = \langle c,P \rangle\) (Line A2:33), or (ii) when it receives \(\langle c,F \rangle\) from a server s during a \({\sf read-next-config}\) action. Server s sets the status of a configuration c to F only if it receives a message that contains \(\langle c,F \rangle\) (Line A3:10). So, (ii) is possible only if c is finalized during a \({\sf reconfig}\) operation.
Let, w.l.o.g., \(\pi\) be the first reconfiguration operation that finalizes \(\mathbf {c}^{p}_{\sigma }[j].cfg\). To do so, process rc invokes \({\sf finalize-config}(\mathbf {c}^{rc}_{\sigma ^{\prime }})\) during \(\pi\), at some state \(\sigma ^{\prime }\) that appears before \(\sigma\) in \({\xi }\). By Lemma 11, \(\mathbf {c}^{p}_{\sigma }[j].cfg = \mathbf {c}^{rc}_{\sigma ^{\prime }}[j].cfg\). Since rc finalizes \(\mathbf {c}^{rc}_{\sigma ^{\prime }}[j]\), then this is the last entry of \(\mathbf {c}^{rc}_{\sigma ^{\prime }}\) and hence \(\nu (\mathbf {c}^{rc}_{\sigma ^{\prime }}) = j\). Also, by Lemma 19 it follows that the \({\sf read-config}\) action of \(\pi\) returned a configuration sequence \(\mathbf {c}^{rc}_{\sigma ^{\prime \prime }}\) in some state \(\sigma ^{\prime \prime }\) that appeared before \(\sigma ^{\prime }\) in \({\xi }\), such that \(\nu (\mathbf {c}^{rc}_{\sigma ^{\prime \prime }}) \lt \nu (\mathbf {c}^{rc}_{\sigma ^{\prime }})\). Since by definition, \(\mu (\mathbf {c}^{rc}_{\sigma ^{\prime \prime }})\le \nu (\mathbf {c}^{rc}_{\sigma ^{\prime \prime }})\), then \(\mu (\mathbf {c}^{rc}_{\sigma ^{\prime \prime }}) \lt j\). However, since only \(\langle c, P \rangle\) is added to \(\mathbf {c}^{rc}_{\sigma ^{\prime \prime }}\) to result in \(\mathbf {c}^{rc}_{\sigma ^{\prime }}\), then \(\mu (\mathbf {c}^{rc}_{\sigma ^{\prime \prime }}) = \mu (\mathbf {c}^{rc}_{\sigma ^{\prime }})\). Therefore, \(\mu (\mathbf {c}^{rc}_{\sigma ^{\prime }}) \lt j\) as well and the lemma follows.□
We now reach an important lemma of this section. By Ares, before a read/write/reconfig operation completes it propagates the maximum tag it discovered by executing the \({\sf put-data}\) action in the last configuration of its local configuration sequence (Lines A2:18, A4:16, A4:38). When a subsequent operation is invoked, it reads the latest configuration sequence by beginning from the last finalized configuration in its local sequence and invoking \({\sf read-data}\) to all the configurations until the end of that sequence. The lemma shows that the latter operation will retrieve a tag that is at least as high as the tag used in the \({\sf put-data}\) action of any preceding operation.
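The propagation pattern described above can be illustrated with a toy model. The class `ToyDap` keeps one (tag, value) cell per configuration and is only a stand-in for a DAP that satisfies Property 1. The function `propagate_latest`, the tag format (integer, writer identifier), and the default pair returned for an empty configuration are all assumptions of the sketch.

```python
from typing import Dict, List, Tuple

Tag = Tuple[int, str]    # assumption: (integer, writer id), ordered lexicographically
Pair = Tuple[Tag, str]   # (tag, value)
Entry = Tuple[str, str]  # (cfg identifier, status "P" or "F")


class ToyDap:
    """One cell per configuration; a stand-in for any DAP satisfying Property 1."""

    def __init__(self) -> None:
        self.cell: Dict[str, Pair] = {}

    def get_data(self, cfg: str) -> Pair:
        return self.cell.get(cfg, ((0, ""), ""))

    def put_data(self, cfg: str, pair: Pair) -> None:
        # Keep only the pair with the larger tag.
        if pair[0] > self.get_data(cfg)[0]:
            self.cell[cfg] = pair


def propagate_latest(dap: ToyDap, cseq: List[Entry]) -> Pair:
    """Read every configuration from index mu to the end, then write the max to the last."""
    mu = max(i for i, (_, status) in enumerate(cseq) if status == "F")
    best = max((dap.get_data(cfg) for cfg, _ in cseq[mu:]), key=lambda p: p[0])
    dap.put_data(cseq[-1][0], best)
    return best


dap = ToyDap()
dap.put_data("c1", ((5, "w1"), "v5"))  # an earlier operation wrote in c1
cseq = [("c0", "F"), ("c1", "F"), ("c2", "P")]
best = propagate_latest(dap, cseq)
```

After the call, the pair with tag (5, w1) is also stored in c2, so an operation that later starts its traversal at c2 still observes a tag at least as large as the one written in c1.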
Lemma 21.
Let \(\pi _1\) and \(\pi _2\) be two completed read/write/reconfig operations invoked by processes \(p_1\) and \(p_2\) in \(\mathcal {I}\), in an execution \({\xi }\) of Ares, such that, \(\pi _1\rightarrow \pi _2\). If \(c_1.{\sf put-data}(\langle \tau _{\pi _1}, v_{\pi _1} \rangle)\) is the last put-data action of \(\pi _1\) and \(\sigma _2\) is the state in \({\xi }\) after the completion of the first \({\sf read-config}\) action of \(\pi _2\), then there exists a \(c_2.{\sf put-data}(\langle \tau _{},v \rangle)\) action in some configuration \(c_2 = \mathbf {c}^{p_2}_{\sigma _2}[k].cfg\), for \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) \le k \le \nu (\mathbf {c}^{p_2}_{\sigma _2})\), such that (i) it completes in a state \(\sigma ^{\prime }\) before \(\sigma _2\) in \({\xi }\), and (ii) \(\tau _{}\ge \tau _{\pi _1}\).
Proof.
Note that from the definitions of \(\nu (\cdot)\) and \(\mu (\cdot)\), we have \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) \le \nu (\mathbf {c}^{p_2}_{\sigma _2})\). Let \(\sigma _1\) be the state in \({\xi }\) after the completion of \(c_1.{\sf put-data}(\langle \tau _{\pi _1}, v_{\pi _1} \rangle)\) and \(\sigma ^{\prime }_1\) be the state in \({\xi }\) following the response step of \(\pi _1\). Since any operation executes \({\sf put-data}\) on the last discovered configuration then \(c_1\) is the last configuration found in \(\mathbf {c}^{p_1}_{\sigma _1}\), and hence \(c_1 = \mathbf {c}^{p_1}_{\sigma _1}[\nu (\mathbf {c}^{p_1}_{\sigma _1})].cfg\). By Lemma 16 we have \(\mu (\mathbf {c}^{p_1}_{\sigma _1}) \le \mu (\mathbf {c}^{p_1}_{{\sigma }^{\prime }_1})\) and by Lemma 17 we have \(\mu (\mathbf {c}^{p_1}_{{\sigma }^{\prime }_1}) \le \mu (\mathbf {c}^{p_2}_{{\sigma }_2})\), since \(\pi _2\) (and thus its first read-config action) is invoked after \({\sigma }^{\prime }_1\) (and thus after the last read-config action during \(\pi _1\)). Hence, combining the two implies that \(\mu (\mathbf {c}^{p_1}_{\sigma _1}) \le \mu (\mathbf {c}^{p_2}_{{\sigma }_2})\). Now from the last implication and the first statement we have \(\mu (\mathbf {c}^{p_1}_{{\sigma }_1}) \le \nu (\mathbf {c}^{p_2}_{{\sigma }_2})\). Therefore, it remains to examine whether the last finalized configuration witnessed by \(p_2\) appears before or after \(c_1\), i.e., whether \((a)\) \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) \le \nu (\mathbf {c}^{p_1}_{{\sigma }_1})\) or \((b)\) \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) \gt \nu (\mathbf {c}^{p_1}_{{\sigma }_1})\).
Case \((a)\): Since \(\pi _1\rightarrow \pi _2\) then, by Theorem 18, the value \(\mathbf {c}^{p_2}_{\sigma _2}\) returned by \({\sf read-config}\) at \(p_2\) during the execution of \(\pi _2\) satisfies \(\mathbf {c}^{p_1}_{\sigma _1} \preceq _p \mathbf {c}^{p_2}_{\sigma _2}\). Therefore, \(\nu (\mathbf {c}^{p_1}_{{\sigma }_1}) \le \nu (\mathbf {c}^{p_2}_{{\sigma }_2})\), and hence in this case \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) \le \nu (\mathbf {c}^{p_1}_{{\sigma }_1}) \le \nu (\mathbf {c}^{p_2}_{{\sigma }_2})\). Since \(c_1\) is the last configuration in \(\mathbf {c}^{p_{1}}_{\sigma _1}\), then it has index \(\nu (\mathbf {c}^{p_1}_{{\sigma }_1})\). So if we take \(c_2 = c_1\) then the \(c_1.{\sf put-data}(\langle \tau _{\pi _1}, v_{\pi _1} \rangle)\) action trivially satisfies both conditions of the lemma as (i) it completes in state \(\sigma _1\) which appears before \(\sigma _2\), and (ii) it puts a pair \(\langle \tau _{}, v \rangle\) such that \(\tau _{}=\tau _{\pi _1}\).
Case \((b)\): This case is possible if there exists a reconfiguration client rc that invokes a \({\sf reconfig}\) operation \(\rho\), during which it executes the \({\sf finalize-config}(\mathbf {c}^{rc}_{*})\) that finalized the configuration with index \(\nu (\mathbf {c}^{rc}_{*}) = \mu (\mathbf {c}^{p_2}_{\sigma _2})\). Let \(\sigma\) be the state immediately after the \({\sf read-config}\) of \(\rho\). Now, we consider two sub-cases: \((i)\) \(\sigma\) appears before \(\sigma _1\) in \({\xi }\), or \((ii)\) \(\sigma\) appears after \(\sigma _1\) in \({\xi }\).
Subcase \((b)(i)\): Since the \({\sf read-config}\) at \(\sigma\) completes before the invocation of the last \({\sf read-config}\) of operation \(\pi _1\) then, by Lemma 15, either \(\mathbf {c}^{rc}_{\sigma } \prec _p \mathbf {c}^{p_1}_{\sigma _1}\), or \(\mathbf {c}^{rc}_{\sigma } = \mathbf {c}^{p_1}_{\sigma _1}\). Suppose \(\mathbf {c}^{rc}_{\sigma } \prec _p \mathbf {c}^{p_1}_{\sigma _1}\). According to Lemma 19, rc executes \({\sf finalize-config}\) on configuration sequence \(\mathbf {c}^{rc}_{*}\) with \(\nu (\mathbf {c}^{rc}_{*}) = \nu (\mathbf {c}^{rc}_{\sigma }) + 1\). Since \(\nu (\mathbf {c}^{rc}_{*}) = \mu (\mathbf {c}^{p_2}_{\sigma _2})\), then \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) = \nu (\mathbf {c}^{rc}_{\sigma }) + 1\). Moreover, since \(\mathbf {c}^{rc}_{\sigma } \prec _p \mathbf {c}^{p_1}_{\sigma _1}\), we have \(\nu (\mathbf {c}^{rc}_{\sigma }) \lt \nu (\mathbf {c}^{p_1}_{\sigma _1})\) and thus \(\nu (\mathbf {c}^{rc}_{\sigma })+1 \le \nu (\mathbf {c}^{p_1}_{\sigma _1})\). This implies that \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) \le \nu (\mathbf {c}^{p_1}_{\sigma _1})\), which contradicts our initial assumption for this case that \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) \gt \nu (\mathbf {c}^{p_1}_{\sigma _1})\). So this sub-case is impossible.
Now suppose, that \(\mathbf {c}^{rc}_{\sigma } = \mathbf {c}^{p_1}_{\sigma _1}\). Then it follows that \(\nu (\mathbf {c}^{rc}_{\sigma }) = \nu (\mathbf {c}^{p_1}_{\sigma _1})\), and that \(\mu (\mathbf {c}^{p_2}_{\sigma _2}) = \nu (\mathbf {c}^{p_1}_{\sigma _1}) + 1\) in this case. Since \(\sigma _1\) is the state after the last \({\sf put-data}\) during \(\pi _1\), then if \(\sigma _1^{\prime }\) is the state after the completion of the last \({\sf read-config}\) of \(\pi _1\) (which follows the put-data), it must be the case that \(\mathbf {c}^{p_1}_{\sigma _1} = \mathbf {c}^{p_1}_{\sigma _1^{\prime }}\). So, during its last \({\sf read-config}\) process \(p_1\) does not read the configuration indexed at \(\nu (\mathbf {c}^{p_1}_{\sigma _1}) + 1\). This means that the \({\sf put-config}\) completes in \(\rho\) at state \({\sigma }_{\rho }\) after \({\sigma }^{\prime }_1\) and the \({\sf update-config}\) operation is invoked at state \(\sigma _{\rho }^{\prime }\) after \(\sigma _{\rho }\) with a configuration sequence \(\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }}\). During the update operation \(\rho\) invokes \({\sf get-data}\) operation in every configuration \(\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }}[i].cfg\), for \(\mu (\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }})\le i\le \nu (\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }})\). Notice that \(\nu (\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }})= \mu (\mathbf {c}^{p_2}_{\sigma _2}) = \nu (\mathbf {c}^{p_1}_{\sigma _1}) + 1\) and moreover the last configuration of \(\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }}\) was just added by \(\rho\) and it is not finalized. From this it follows that \(\mu (\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }})\lt \nu (\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }})\), and hence \(\mu (\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }})\le \nu (\mathbf {c}^{p_1}_{\sigma _1})\). 
Therefore, \(\rho\) executes \({\sf get-data}\) in configuration \(\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }}[j].cfg\) for \(j = \nu (\mathbf {c}^{p_1}_{\sigma _1})\). Since \(p_1\) invoked \({\sf put-data}(\langle \tau _{\pi _1},v_{\pi _1} \rangle)\) at the same configuration \(c_1\), and completed in a state \(\sigma _1\) before \(\sigma _{\rho }^{\prime }\), then by C1 of Property 1, it follows that the \({\sf get-data}\) action will return a tag \(\tau _{}\ge \tau _{\pi _1}\). Therefore, the maximum tag that \(\rho\) discovers is \(\tau _{max}\ge \tau _{}\ge \tau _{\pi _1}\). Before invoking the \({\sf finalize-config}\) action, \(\rho\) invokes \(c_1.{\sf put-data}(\langle \tau _{max}, v_{max} \rangle)\). Since \(\nu (\mathbf {c}^{rc}_{\sigma _{\rho }^{\prime }})= \mu (\mathbf {c}^{p_2}_{\sigma _2})\), then by Lemma 11 the action \({\sf put-data}\) is invoked in a configuration \(c_2 = \mathbf {c}^{p_2}_{\sigma _2}[j].cfg\) such that \(j=\mu (\mathbf {c}^{p_2}_{\sigma _2})\). Since the \({\sf read-config}\) action of \(\pi _2\) observed configuration \(\mu (\mathbf {c}^{p_2}_{\sigma _2})\), then it must be the case that \(\sigma _2\) appears after the state where the \({\sf finalize-config}\) was invoked and, therefore, after the state of the completion of the \({\sf put-data}\) action during \(\rho\). Thus, in this case, both properties are satisfied and the lemma follows.
Subcase \((b)(ii)\): Suppose in this case that \(\sigma\) occurs in \({\xi }\) after \(\sigma _1\). In this case, the last \({\sf put-data}\) in \(\pi _1\) completes before the invocation of the \({\sf read-config}\) in \(\rho\) in execution \({\xi }\). Now we can argue recursively, \(\rho\) taking the place of operation \(\pi _2\), that \(\mu (\mathbf {c}^{rc}_{{\sigma }}) \le \nu (\mathbf {c}^{rc}_{{\sigma }})\) and therefore, we consider two cases: \((a)\) \(\mu (\mathbf {c}^{rc}_{\sigma }) \le \nu (\mathbf {c}^{p_1}_{{\sigma }_1})\) and \((b)\) \(\mu (\mathbf {c}^{rc}_{\sigma }) \gt \nu (\mathbf {c}^{p_1}_{{\sigma }_1})\). Note that there are a finite number of operations invoked in \({\xi }\) before \(\pi _2\) is invoked, and hence the statement of the lemma can be shown to hold by a sequence of inequalities.□
The following lemma shows the consistency of operations as long as the DAP used satisfies Property 1.
Lemma 22.
Let \(\pi _1\) and \(\pi _2\) denote completed read/write operations in an execution \({\xi }\), from processes \(p_1,p_2\in \mathcal {I}\), respectively, such that \(\pi _1\rightarrow \pi _2\). If \(\tau _{\pi _1}\) and \(\tau _{\pi _2}\) are the local tags at \(p_1\) and \(p_2\) after the completion of \(\pi _1\) and \(\pi _2\) respectively, then \(\tau _{\pi _1} \le \tau _{\pi _2}\); if \(\pi _1\) is a write operation then \(\tau _{\pi _1} \lt \tau _{\pi _2}\).
Proof.
Let \(\langle \tau _{\pi _1}, v_{\pi _1} \rangle\) be the pair passed to the last \({\sf put-data}\) action of \(\pi _1\). Also, let \(\sigma _2\) be the state in \({\xi }\) that follows the completion of the first \({\sf read-config}\) action during \(\pi _2\). Notice that \(\pi _2\) executes a loop after the first \({\sf read-config}\) operation and performs \(c.{\sf get-data}\) (if \(\pi _2\) is a read) or \(c.{\sf get-tag}\) (if \(\pi _2\) is a write) from all \(c =\mathbf {c}^{p_2}_{\sigma _2}[i].cfg\), for \(\mu (\mathbf {c}^{p_2}_{\sigma _2})\le i \le \nu (\mathbf {c}^{p_2}_{\sigma _2})\). By Lemma 21, there exists a \(c^{\prime }.{\sf put-data}(\langle \tau _{},v \rangle)\) action by some operation \(\pi ^{\prime }\) on some configuration \(c^{\prime }=\mathbf {c}^{p_2}_{\sigma _2}[j].cfg\), for \(\mu (\mathbf {c}^{p_2}_{\sigma _2})\le j \le \nu (\mathbf {c}^{p_2}_{\sigma _2})\), that completes in some state \(\sigma ^{\prime }\) that appears before \(\sigma _2\) in \({\xi }\). Thus, the \({\sf get-data}\) or \({\sf get-tag}\) invoked by \(p_2\) on \(\mathbf {c}^{p_2}_{\sigma _2}[j].cfg\), occurs after state \(\sigma _2\) and thus after \(\sigma ^{\prime }\). Since the DAP primitives used satisfy C1 and C2 of Property 1, then the \({\sf get-tag}\) action will return a tag \(\tau _{\pi _2}^{\prime }\) or a \({\sf get-data}\) action will return a pair \(\langle \tau _{\pi _2}^{\prime }, v_{\pi _2}^{\prime } \rangle\), with \(\tau _{\pi _2}^{\prime }\ge \tau _{}\). As \(p_2\) gets the maximum of all the tags returned, then by the end of the loop \(p_2\) will retrieve a tag \(\tau _{max}\ge \tau _{\pi _2}^{\prime }\ge \tau _{}\ge \tau _{\pi _1}\).
If now \(\pi _2\) is a read, it returns \(\langle \tau _{max}, v_{max} \rangle\) after propagating that value to the last discovered configuration. Thus, \(\tau _{\pi _2}\ge \tau _{\pi _1}\). If however \(\pi _2\) is a write, then before propagating the new value the writer increments the maximum timestamp discovered (Line A4:13) generating a tag \(\tau _{\pi _2}\gt \tau _{max}\). Therefore the operation \(\pi _2\) propagates a tag \(\tau _{\pi _2}\gt \tau _{\pi _1}\) in this case.□
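The tag-collection step used in this proof can be illustrated with a short sketch. It is not the Ares pseudocode: it assumes, for illustration only, that a tag is a pair (integer, writer id) compared lexicographically, and `FixedTagConfig`, `max_tag_over_configs`, and `next_write_tag` are hypothetical names.

```python
from typing import List, Tuple

# Assumption: a tag is (integer part, writer id), ordered lexicographically.
Tag = Tuple[int, str]


class FixedTagConfig:
    """Hypothetical stand-in for a configuration whose get-tag returns a fixed tag."""

    def __init__(self, tag: Tag) -> None:
        self._tag = tag

    def get_tag(self) -> Tag:
        return self._tag


def max_tag_over_configs(cseq: List[FixedTagConfig], mu: int, nu: int) -> Tag:
    """Query every configuration with index in [mu, nu] and keep the maximum tag."""
    return max(cseq[i].get_tag() for i in range(mu, nu + 1))


def next_write_tag(max_tag: Tag, writer_id: str) -> Tag:
    """A writer increments the integer part of the maximum tag it discovered."""
    z, _ = max_tag
    return (z + 1, writer_id)
```

Because the writer's tag has a strictly larger integer part than every tag it collected, it is strictly larger than the tag of any operation that completed earlier, which is the inequality the lemma establishes for writes.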
And the main result of this section follows:
Theorem 23 (Atomicity).
In any execution \({\xi }\) of Ares, if in every configuration \(c\in {\mathcal {G}_L}\), \({c}.{{\sf get-data}()}\), \({c}.{{\sf put-data}()}\), and \({c}.{{\sf get-tag}()}\) satisfy Property 1, then Ares satisfies atomicity.
As algorithm Ares handles each configuration separately, we can observe that the algorithm may utilize a different mechanism for the put and get primitives in each configuration. This leads to the following remark:
Remark 24.
Algorithm Ares satisfies atomicity even when the implementations of the DAPs in two different configurations \(c_1\) and \(c_2\) are not the same, given that the \(c_i.{\sf get-tag}\), \(c_i.{\sf get-data}\), and \(c_i.{\sf put-data}\) primitives in each \(c_i\) satisfy Property 1.

7 Performance Analysis of Ares

A major challenge in reconfigurable atomic services is to examine the latency of terminating read and write operations, especially when those are invoked concurrently with reconfiguration operations. In this section, we provide an in-depth analysis of the latency of operations in Ares. Additionally, a storage and communication analysis is shown when Ares utilizes the erasure-coding algorithm presented in Section 5, in each configuration.

7.1 Latency Analysis

The idea behind our latency analysis is quite straightforward: we construct the worst-case execution that would allow all concurrent reconfigurations to add their proposed configuration. This leads to the longest configuration sequence that a read/write operation needs to traverse before completing. Thus, given a bounded delay, we compute the delay for each operation and we finally compute how long it is going to take for a read/write operation to catch up in the worst-case and complete.
Liveness (termination) properties cannot be guaranteed for Ares without restricting asynchrony or the rate of arrival of reconfig operations, or when the consensus protocol never terminates. Here, we provide a conditional performance analysis of the operations, based on latency bounds on message delivery. We assume that local computations take negligible time and that the latency of an operation is due to the delays in the messages exchanged during the execution. We measure delays in time units of some global clock, which is visible only to an external viewer. No process has access to the clock. Let d and D be the minimum and maximum durations taken by messages, sent during an execution of Ares, to reach their destinations. Also, let \(T(\pi)\) denote the duration of an operation (or action) \(\pi\). In the statements that follow, we consider any execution \({\xi }\) of Ares, which contains k reconfig operations. For any configuration c in an execution of Ares, we assume that any \(c.Con.{\sf propose}\) operation takes at least \(T_{min}(CN)\) time units.
Let us first examine the action delays under the bounds we assume. It is easy to see that the actions put-config and get-next-config each perform two message exchanges and thus take time \(2d\le T(\phi)\le 2D\). From this we can derive the delay of a read-config action.
Lemma 25.
Let \(\phi\) be a \({\sf read-config}\) operation invoked by a non-faulty reconfiguration client rc, with the input argument and returned values of \(\phi\) as \(\mathbf {c}^{rc}_{\sigma }\) and \(\mathbf {c}^{rc}_{\sigma ^{\prime }}\) respectively. Then the delay of \(\phi\) is: \(4d(\nu (\mathbf {c}^{rc}_{\sigma ^{\prime }})-\mu (\mathbf {c}^{rc}_{\sigma })+1)\le T(\phi)\le 4D(\nu (\mathbf {c}^{rc}_{\sigma ^{\prime }})-\mu (\mathbf {c}^{rc}_{\sigma })+1)\).
From Lemma 25 it is clear that the latency of a \({\sf read-config}\) action depends on the number of configurations installed since the last finalized configuration known to the recon client.
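As a small numeric illustration of Lemma 25, the sketch below evaluates the two bounds for given sequence indices. The function name and argument names are hypothetical; only the formula is taken from the lemma.

```python
from __future__ import annotations


def read_config_bounds(mu_start: int, nu_end: int, d: float, D: float) -> tuple[float, float]:
    """Return (lower, upper) bounds on T(read-config) as stated in Lemma 25.

    mu_start: index of the last finalized configuration in the input sequence.
    nu_end:   index of the last configuration in the returned sequence.
    d, D:     minimum and maximum message delay.
    """
    if nu_end < mu_start:
        raise ValueError("nu_end must be at least mu_start")
    if not 0 <= d <= D:
        raise ValueError("expected 0 <= d <= D")
    traversed = nu_end - mu_start + 1  # configurations visited by read-config
    return 4 * d * traversed, 4 * D * traversed
```

For instance, `read_config_bounds(3, 5, 1.0, 2.0)` covers three configurations and yields the interval from 12 to 24 time units.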
Given the latency of a read-config, we can compute the minimum amount of time it takes for k configurations to be installed.
The following lemma bounds the latency of the DAP actions on which read and write operations rely. In the Ares algorithm, the latency of a read/write operation depends on the delays of the DAP operations. For our analysis we assume that all \({\sf get-data}\), \({\sf get-tag}\) and \({\sf put-data}\) primitives use two phases of communication. Each phase consists of a communication between the client and the servers.
Lemma 26.
Suppose \(\pi\), \(\phi\) and \(\psi\) are operations of the type put-data, get-tag and get-data, respectively, invoked by some non-faulty reconfiguration clients. Then the latencies of these operations are bounded as follows: \((i)\) \(2d\le T(\pi)\le 2D\); \((ii)\) \(2d\le T(\phi)\le 2D\); and \((iii)\) \(2d\le T(\psi)\le 2D\).
In the following lemma, we estimate the time taken for a read or a write operation to complete, when it discovers k configurations between its invocation and response steps.
Lemma 27.
Consider any execution of Ares where at most k reconfiguration operations are invoked. Let \(\sigma _s\) and \(\sigma _e\) be the states before the invocation and after the completion step of a read/write operation \(\pi\), in some fair execution \({\xi }\) of Ares. Then \(\pi\) takes time \(T(\pi)\le 6D\left(k+2\right)\) to complete.
Proof.
Let \(\sigma _s\) and \(\sigma _e\) be the states before the invocation and after the completion step of a read/write operation \(\pi\) by p, respectively, in some execution \({\xi }\) of Ares. By algorithm examination we can see that any read/write operation performs the following actions in this order: \((i)\) read-config, \((ii)\) get-data (or get-tag), \((iii)\) put-data, and \((iv)\) read-config. Let \(\sigma _1\) be the state when the first read-config, denoted by \({\sf read-config}_1\), action terminates. By Lemma 25 the action will take time:
\begin{equation*} T({\sf read-config}_1) \le 4D(\nu (\mathbf {c}^{p}_{\sigma _1})-\mu (\mathbf {c}^{p}_{\sigma _s})+1). \end{equation*}
The \({\sf get-data}\) action that follows the read-config (Lines Algorithm 4:34-35) takes at most \(2D(\nu (\mathbf {c}^{p}_{\sigma _1})-\mu (\mathbf {c}^{p}_{\sigma _s})+1)\) time units, given that no new finalized configuration was discovered by the read-config action. Finally, the put-data and the second read-config actions of \(\pi\) may be invoked at most \((\nu (\mathbf {c}^{p}_{\sigma _e})-\nu (\mathbf {c}^{p}_{\sigma _1})+1)\) times, given that the read-config action discovers one new configuration every time it runs. Merging all the outcomes, the total time of \(\pi\) can be at most:
\[\begin{eqnarray*} T(\pi) & \le & 4D(\nu (\mathbf {c}^{p}_{\sigma _1})-\mu (\mathbf {c}^{p}_{\sigma _s})+1) + 2D(\nu (\mathbf {c}^{p}_{\sigma _1})-\mu (\mathbf {c}^{p}_{\sigma _s})+1) + (4D+2D)(\nu (\mathbf {c}^{p}_{\sigma _e})-\nu (\mathbf {c}^{p}_{\sigma _1})+1) \\ & \le & 6D\left[\nu (\mathbf {c}^{p}_{\sigma _e}) - \mu (\mathbf {c}^{p}_{\sigma _s})+2\right] \le 6D(k+2), \end{eqnarray*}\]
where \(\nu (\mathbf {c}^{p}_{\sigma _e}) - \mu (\mathbf {c}^{p}_{\sigma _s})\le k\) since there can be at most k new configurations installed, and the result of the lemma follows.□
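The three terms of the displayed bound can be checked numerically. The following sketch simply adds them up; the function and parameter names are hypothetical and the indices are plain integers standing in for \(\mu (\mathbf {c}^{p}_{\sigma _s})\), \(\nu (\mathbf {c}^{p}_{\sigma _1})\) and \(\nu (\mathbf {c}^{p}_{\sigma _e})\).

```python
def rw_latency_bound(mu_s: int, nu_1: int, nu_e: int, D: float) -> float:
    """Sum the three terms bounding T(pi) for a read/write operation.

    mu_s: last finalized index known at invocation.
    nu_1: last index known after the first read-config.
    nu_e: last index known when the operation completes.
    D:    maximum message delay.
    """
    if not mu_s <= nu_1 <= nu_e:
        raise ValueError("expected mu_s <= nu_1 <= nu_e")
    first_read_config = 4 * D * (nu_1 - mu_s + 1)
    get_phase = 2 * D * (nu_1 - mu_s + 1)
    # Each repetition pairs a put-data (2D) with a read-config step (4D).
    put_and_read_config = (4 * D + 2 * D) * (nu_e - nu_1 + 1)
    return first_read_config + get_phase + put_and_read_config
```

With `mu_s=0`, `nu_1=2`, `nu_e=5` and `D=1.0` the terms are 12, 6 and 24, and their sum 42 coincides with \(6D(\nu (\mathbf {c}^{p}_{\sigma _e}) - \mu (\mathbf {c}^{p}_{\sigma _s})+2)\).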
It remains now to examine the conditions under which a read/write operation may “catch up” with an infinite number of reconfiguration operations. For the sake of a worst-case analysis, we will assume that reconfiguration operations suffer the minimum delay d, whereas read and write operations suffer the maximum delay D in each message exchange. We first show how long it takes for k configurations to be installed.
Lemma 28.
Let \(\sigma\) be the last state of a fair execution of Ares, \({\xi }\). Then k configurations can be installed to \(\mathbf {c}^{}_{\sigma }\), in time \(T(k) \ge 4d\sum _{i=1}^{k}i+ k\left(T_{min}(CN)+2d\right)\) time units.
Proof.
In Ares a reconfig operation has four phases: \((i)\) \({\sf read-config}(cseq)\), reads the latest configuration sequence, \((ii)\) \({\sf add-config}(cseq, c)\), attempts to add the new configuration at the end of the global sequence \(\mathcal {G}_L\), \((iii)\) \({\sf update-config}(cseq)\), transfers the knowledge to the added configuration, and \((iv)\) \({\sf finalize-config}(cseq)\) finalizes the added configuration. So, a new configuration is appended to the end of the configuration sequence (and it becomes visible to any operation) during the add-config action. In turn, the add-config action runs a consensus algorithm to decide on the added configuration and then invokes a put-config action to add the decided configuration. Any operation that is invoked after the put-config action observes the newly added configuration.
Notice that when multiple reconfigurations are invoked concurrently, it might be the case that all participate in the same consensus instance and the configuration sequence is extended by a single configuration. The worst-case scenario happens when all concurrent reconfigurations manage to extend the configuration sequence by their own configuration. In brief, this is possible when the read-config action of each reconfig operation appears after the put-config action of another reconfig operation.
More formally we can build an execution where all reconfig operations append their configuration in the configuration sequence. Consider the partial execution \({\xi }\) that ends in a state \(\sigma\). Suppose that every process \(p\in \mathcal {I}\) knows the same configuration sequence, \(\mathbf {c}^{p}_{\sigma }=\mathbf {c}^{}_{\sigma }\). Also, let the last finalized configuration in \(\mathbf {c}^{}_{\sigma }\) be the last configuration of the sequence, i.e., \(\mu (\mathbf {c}^{}_{\sigma }) = \nu (\mathbf {c}^{}_{\sigma })\). Notice that \(\mathbf {c}^{}_{\sigma }\) can also be the initial configuration sequence \(\mathbf {c}^{p}_{\sigma _0}\). We extend \({\xi }\) by a series of reconfig operations, such that each reconfiguration \(rc_i\) is invoked by a reconfigurer \(r_i\) and attempts to add a configuration \(c_i\). Let \(rc_1\) be the first reconfiguration that performs the following actions without being concurrent with any other reconfig operation:
read-config starting from \(\mu (\mathbf {c}^{}_{\sigma })\)
add-config completing both the consensus proposing \(c_1\) and the \({\sf put-config}\) action writing the decided configuration
Since \(rc_1\) is not concurrent with any other \({\sf reconfig}\) operation, it is the only process to propose a configuration in \(\mu (\mathbf {c}^{}_{\sigma })\), and hence by the consensus algorithm properties, \(c_1\) is decided. Thus, \(\mathbf {c}^{}_{\sigma }\) is extended by a tuple \(\langle c_1,P \rangle\).
Let now reconfiguration \(rc_2\) be invoked immediately after the completion of the \({\sf add-config}\) action from \(rc_1\). Since the local sequence at the beginning of \(rc_2\) is equal to \(\mathbf {c}^{}_{\sigma }\), then the \({\sf read-config}\) action of \(rc_2\) will also start from \(\mu (\mathbf {c}^{}_{\sigma })\). Since \(rc_1\) already propagated \(c_1\) to \(\mu (\mathbf {c}^{}_{\sigma })\) during its \({\sf put-config}\) action, then \(rc_2\) will discover \(c_1\) during the first iteration of its \({\sf read-config}\) action, and thus it will repeat the iteration on \(c_1\). Configuration \(c_1\) is the last in the sequence and thus the \({\sf read-config}\) action of \(rc_2\) will terminate after the second iteration. Following the \({\sf read-config}\) action, \(rc_2\) attempts to add \(c_2\) in the sequence. Since \(rc_1\) is the only reconfiguration that might be concurrent with \(rc_2\), and since \(rc_1\) already completed consensus in \(\mu (\mathbf {c}^{}_{\sigma })\), then \(rc_2\) is the only operation to run consensus in \(c_1\). Therefore, \(c_2\) is accepted and \(rc_2\) propagates \(c_2\) in \(c_1\) using a \({\sf put-config}\) action.
So in general we let reconfiguration \(rc_i\) be invoked after the completion of the \({\sf add-config}\) action from \(rc_{i-1}\). As a result, the \({\sf read-config}\) action of \(rc_i\) performs i iterations, and the configuration \(c_i\) is added immediately after configuration \(c_{i-1}\) in the sequence. Figure 2 illustrates our execution construction for the reconfiguration operations.
Fig. 2. Successful reconfig operations.
It is easy to notice that such execution results in the worst-case latency for all the reconfiguration operations \(rc_1, rc_2,\ldots , rc_i\). As by Lemma 25, a read-config action takes at least 4d time to complete, then as also seen in Figure 2, k reconfigs may take time \(T(k) \ge \sum _{i=1}^{k}\left[4d*i+ \left(T_{min}(CN)+2d\right)\right]\). Therefore, it will take time \(T(k) \ge 4d\sum _{i=1}^{k}i+ k\left(T_{min}(CN)+2d\right)\) and the lemma follows.□
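The bound on \(T(k)\) can be evaluated directly, and the sum over i also has a simple closed form. The sketch below computes both; the function names are hypothetical, and `t_min_cn` stands for \(T_{min}(CN)\).

```python
def min_install_time(k: int, d: float, t_min_cn: float) -> float:
    """Lower bound on the time to install k configurations, term by term.

    The i-th reconfiguration contributes 4d*i for its read-config (i iterations)
    plus T_min(CN) + 2d for consensus and put-config.
    """
    return sum(4 * d * i + (t_min_cn + 2 * d) for i in range(1, k + 1))


def min_install_time_closed(k: int, d: float, t_min_cn: float) -> float:
    """Same bound using sum_{i=1..k} i = k(k+1)/2."""
    return 2 * d * k * (k + 1) + k * (t_min_cn + 2 * d)
```

For `k=3`, `d=1.0` and `t_min_cn=10.0` the three reconfigurations contribute 16, 20 and 24 time units, so both functions return 60.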
The following theorem is the main result of this section, in which we define the relationship between \(T_{min}(CN)\), \(d,\) and D so to guarantee that any read or write issued by a non-faulty client always terminates.
Theorem 29.
Suppose \(T_{min}(CN) \ge 3(6D-d)\), then any read or write operation \(\pi\) completes in any execution \({\xi }\) of Ares for any number of reconfiguration operations in \({\xi }\).
Proof.
By Lemma 28, k configurations may be installed in: \(T(k) \ge 4d\sum _{i=1}^{k}i+ k\left(T_{min}(CN)+2d\right)\). Also by Lemma 27, we know that operation \(\pi\) takes at most \(T(\pi)\le 6D\left(\nu (\mathbf {c}^{p}_{\sigma _e}) - \mu (\mathbf {c}^{p}_{\sigma _s})+2\right)\). Assume that \(k=\nu (\mathbf {c}^{p}_{\sigma _e}) - \mu (\mathbf {c}^{p}_{\sigma _s})\) is the total number of configurations observed during \(\pi\). Then \(\pi\) may terminate before a \((k+1)\)-st configuration is added in the configuration sequence if \(6D(k+2) \le 4d\sum _{i=1}^{k}i+ k\left(T_{min}(CN)+2d\right)\), which is equivalent to \(d \ge \frac{3D}{k}-\frac{T_{min}(CN)}{2(k+2)}\). And that completes the proof.□
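The algebra behind the last step, and its relation to the hypothesis of Theorem 29, can be spelled out as follows. This is a worked derivation that uses only the quantities defined above; it is one way to connect the two conditions, not necessarily the authors' own argument. Using \(\sum _{i=1}^{k}i = k(k+1)/2\):
\[\begin{eqnarray*} 4d\sum _{i=1}^{k}i+ k\left(T_{min}(CN)+2d\right) & = & 2dk(k+1) + 2dk + k\,T_{min}(CN) = 2dk(k+2) + k\,T_{min}(CN), \\ 6D(k+2) \le 2dk(k+2) + k\,T_{min}(CN) & \Longleftrightarrow & T_{min}(CN) \ge (k+2)\left(\frac{6D}{k} - 2d\right) \Longleftrightarrow d \ge \frac{3D}{k}-\frac{T_{min}(CN)}{2(k+2)}. \end{eqnarray*}\]
The quantity \((k+2)\left(\frac{6D}{k} - 2d\right) = 6D + \frac{12D}{k} - 2dk - 4d\) is decreasing in k for \(k\ge 1\), so the most demanding requirement arises at \(k=1\), where it reads \(T_{min}(CN) \ge 18D - 6d\). Since \(3(6D-d) = 18D - 3d \ge 18D - 6d\), the hypothesis \(T_{min}(CN) \ge 3(6D-d)\) implies the condition for every \(k\ge 1\).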

7.2 Storage and Communication Costs for Ares

Storage and Communication costs for Ares highly depend on the DAP that we use in each configuration. For our analysis, we assume that each configuration utilizes the algorithms and the DAPs presented in Section 5.
Recall that by our assumption, the storage cost counts the size (in bits) of the coded elements stored in the variable List at each server. We ignore the storage cost due to meta-data. For communication cost, we measure the bits sent on the wire between the nodes.
Lemma 30.
The worst-case total storage cost of Algorithm 5 is \((\delta +1)\frac{n}{k}\).
Proof.
The maximum number of (tag, coded-element) pairs in the List at each server is \(\delta +1\), and the size of each coded element is \(\frac{1}{k}\), while the tag variable is metadata and therefore not counted. So, summing over the n servers, the total storage cost is \((\delta +1)\frac{n}{k}\).□
We next state the communication cost for the write and read operations in Algorithm 5. Once again, note that we ignore the communication cost arising from the exchange of meta-data.
Lemma 31.
The communication cost associated with a successful write operation in Algorithm 5 is at most \(\frac{n}{k}\).
Proof.
During a write operation, in the \({\sf get-tag}\) phase, the servers respond with their highest tag variables, which are metadata. However, in the \({\sf put-data}\) phase, the writer sends each server a coded element of size \(\frac{1}{k}\), and hence the total cost of communication for this phase is \(\frac{n}{k}\). Therefore, the worst-case communication cost of a write operation is \(\frac{n}{k}\).□
Lemma 32.
The communication cost associated with a successful read operation in Algorithm 5 is at most \((\delta +2)\frac{n}{k}\).
Proof.
During a read operation, in the \({\sf get-data}\) phase the servers respond with their List variables; each such list is of size at most \((\delta +1)\frac{1}{k}\), and counting all such responses gives us \((\delta +1)\frac{n}{k}\). In the \({\sf put-data}\) phase, the reader sends each server a coded element of size \(\frac{1}{k}\), and hence the total cost of communication for this phase is \(\frac{n}{k}\). Therefore, the worst-case communication cost of a read operation is \((\delta +2) \frac{n}{k}\).□
From the above lemmas we get:
Theorem 33.
The Ares algorithm has: (i) storage cost \((\delta +1)\frac{n}{k}\), (ii) communication cost for each write at most \(\frac{n}{k}\), and (iii) communication cost for each read at most \((\delta +2)\frac{n}{k}\).
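The three costs of Theorem 33 are easy to tabulate for concrete parameters. The sketch below is illustrative only: costs are expressed as multiples of the object size (a coded element has size 1/k), the function names are hypothetical, and no constraint between n, k and \(\delta\) from Section 5 is enforced.

```python
from fractions import Fraction


def storage_cost(n: int, k: int, delta: int) -> Fraction:
    """Worst-case total storage: (delta + 1) coded elements of size 1/k on each of n servers."""
    return Fraction((delta + 1) * n, k)


def write_comm_cost(n: int, k: int) -> Fraction:
    """Write: one coded element of size 1/k sent to each of n servers."""
    return Fraction(n, k)


def read_comm_cost(n: int, k: int, delta: int) -> Fraction:
    """Read: n lists of up to (delta + 1) elements received, plus n elements sent back."""
    return Fraction((delta + 2) * n, k)
```

For example, with `n=9`, `k=6` and `delta=2` the storage cost is 9/2, a write costs 3/2 and a read costs 6, all in units of the object size.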

8 Flexibility of DAPs

In this section, we argue that various implementations of DAPs can be used in Ares. In fact, via reconfig operations, one can implement a highly adaptive atomic DSS: a replication-based DSS can be transformed into an erasure-code-based one; the number of storage servers can be increased or decreased; the performance of the DSS can be studied under various code parameters; and so on. The insight to implementing various DAPs comes from the observation that the simple algorithmic template A (see Algorithm 1) for the read and write protocols, combined with any implementation of DAPs satisfying Property 1, gives rise to an MWMR atomic memory service. Moreover, the read and write operations terminate as long as the implemented DAPs complete.
A read operation in A performs \({c}.{{\sf get-data}()}\) to retrieve a tag-value pair, \(\langle \tau _{},v \rangle\) from a configuration c, and then it performs a \(c.{\sf put-data}(\langle \tau _{},v \rangle)\) to propagate that pair to the configuration c. A write operation is similar to the read but before performing the \({\sf put-data}\) action it generates a new tag which it associates with the value to be written. The following result shows that A is atomic and live, if the DAPs satisfy Property 1 and are live.
Theorem 34 (Atomicity of Template A)
Suppose the DAP implementation satisfies the consistency properties C1 and C2 of Property 1 for a configuration \(c\in \mathcal {C}\). Then any execution \({\xi }\) of algorithm A in configuration c is atomic and live if each DAP invocation terminates in \({\xi }\) under the failure model \(c.\mathcal {F}\).
Proof.
We prove the atomicity by proving properties A1, A2 and A3 presented in Section 2 for any execution of the algorithm.
Property A1: Consider two operations \(\phi\) and \(\pi\) such that \(\phi\) completes before \(\pi\) is invoked. We need to show that it cannot be the case that \(\pi \prec \phi\). We break our analysis into the following four cases:
Case \((a)\): Both \(\phi\) and \(\pi\) are writes. The \({c}.{{\sf put-data}(*)}\) of \(\phi\) completes before \(\pi\) is invoked. By property C1 the tag \(\tau _{\pi }\) returned by the \({c}.{{\sf get-tag}()}\) at \(\pi\) is at least as large as \(\tau _{\phi }\). Now, since \(\tau _{\pi }\) is incremented by the write operation then \(\pi\) puts a tag \(\tau _{\pi }^{\prime }\) such that \(\tau _{\phi } \lt \tau _{\pi }^{\prime }\) and hence we cannot have \(\pi \prec \phi\).
Case \((b)\): \(\phi\) is a write and \(\pi\) is a read. In execution \({\xi }\) since \({c}.{{\sf put-data}(\langle \tau _{\phi }, * \rangle)}\) of \(\phi\) completes before the \({c}.{{\sf get-data}()}\) of \(\pi\) is invoked, by property C1 the tag \(\tau _{\pi }\) obtained from the above \({c}.{{\sf get-data}()}\) is at least as large as \(\tau _{\phi }\). Now \(\tau _{\phi } \le \tau _{\pi }\) implies that we cannot have \(\pi \prec \phi\).
Case \((c)\): \(\phi\) is a read and \(\pi\) is a write. Let the id of the writer that invokes \(\pi\) be \(w_{\pi }\). The \({c}.{{\sf put-data}(\langle \tau _{\phi }, * \rangle)}\) call of \(\phi\) completes before \({c}.{{\sf get-tag}()}\) of \(\pi\) is initiated. Therefore, by property C1, \({c}.{{\sf get-tag}()}\) returns \(\tau _{}\) such that \(\tau _{\phi } \le \tau _{}\). Since \(\tau _{\pi }\) is equal to \(inc(\tau _{})\) by design of the algorithm, \(\tau _{\pi } \gt \tau _{\phi }\) and we cannot have \(\pi \prec \phi\).
Case \((d)\): Both \(\phi\) and \(\pi\) are reads. In execution \({\xi }\) the \({c}.{{\sf put-data}(\langle \tau _{\phi }, * \rangle)}\) is executed as a part of \(\phi\) and completes before \({c}.{{\sf get-data}()}\) is called in \(\pi\). By property C1 of the data-primitives, we have \(\tau _{\phi } \le \tau _{\pi }\) and hence we cannot have \(\pi \prec \phi\).
Property A2: Note that because the tag set \({\mathcal {T}}\) is well-ordered we can show that A2 holds by first showing that every write has a unique tag. This means that any pair of writes can be ordered. Note that a read can be ordered w.r.t. any write operation trivially if the respective tags are different, and by definition, if the tags are equal the write is ordered before the read.
Observe that two tags generated from different writers are necessarily distinct because of the id component of the tag. Now if the operations, say \(\phi\) and \(\pi\), are writes from the same writer then, by the well-formedness property, one of them, say \(\phi\), completes before the other is invoked. Since the \({c}.{{\sf get-tag}()}\) of \(\pi\) follows the \({c}.{{\sf put-data}(*)}\) of \(\phi\), by property C1 the second operation will witness a tag at least as large as that of \(\phi\) and will generate a tag with a higher integer part. Hence \(\pi\) is ordered after \(\phi\).
Property A3: By C2 the \({c}.{{\sf get-data}()}\) may return a tag \(\tau _{}\), only when there exists an operation \(\pi\) that invoked a \({c}.{{\sf put-data}(\langle \tau _{},* \rangle)}\). Otherwise, it returns the initial value. Since a write is the only operation to put a new tag \(\tau _{}\) in the system then Property A3 follows from C2.□
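To make the shape of template A concrete, here is a minimal sketch of its read and write operations over an abstract DAP interface. It is not Algorithm 1 itself: the `DAP` protocol, the `SingleCopyDAP` class (a trivial, non-fault-tolerant stand-in that keeps one tag-value pair), and the representation of a tag as an (integer, writer id) pair ordered lexicographically are all assumptions made for illustration.

```python
from typing import Any, Optional, Protocol, Tuple

# Assumption: a tag is (integer part, writer id), ordered lexicographically.
Tag = Tuple[int, str]


class DAP(Protocol):
    def get_tag(self) -> Tag: ...
    def get_data(self) -> Tuple[Tag, Any]: ...
    def put_data(self, tag: Tag, value: Any) -> None: ...


class SingleCopyDAP:
    """Hypothetical DAP holding a single tag-value pair (no replication, no failures)."""

    def __init__(self, initial: Optional[Any] = None) -> None:
        self._tag: Tag = (0, "")
        self._value = initial

    def get_tag(self) -> Tag:
        return self._tag

    def get_data(self) -> Tuple[Tag, Any]:
        return self._tag, self._value

    def put_data(self, tag: Tag, value: Any) -> None:
        # Only a strictly higher tag overwrites the stored pair.
        if tag > self._tag:
            self._tag, self._value = tag, value


def template_read(c: DAP) -> Any:
    """Read: get the pair with the highest tag, propagate it, then return the value."""
    tag, value = c.get_data()
    c.put_data(tag, value)
    return value


def template_write(c: DAP, writer_id: str, value: Any) -> Tag:
    """Write: discover the highest tag, increment it, and propagate the new pair."""
    z, _ = c.get_tag()
    new_tag = (z + 1, writer_id)
    c.put_data(new_tag, value)
    return new_tag
```

Any object providing the three methods can be passed as `c`, which mirrors the claim that the template is independent of how the DAPs are implemented as long as they satisfy Property 1.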

8.1 Representing Known Algorithms in Terms of DAPs

A number of known tag-based algorithms that implement atomic read/write objects (e.g., ABD [11] and Fast [21]) can be expressed in terms of DAPs. In this subsection, we demonstrate how we can transform the celebrated ABD algorithm [11].
mwABD Algorithm. The multi-writer version of ABD can be transformed to the generic algorithm Template A. Algorithm 8 illustrates the three DAPs for the ABD algorithm. The \({\sf get-data}\) primitive encapsulates the query phase of mwABD, while the \({\sf put-data}\) primitive encapsulates the propagation phase of the algorithm.
Let us now examine if the primitives satisfy properties C1 and C2 of Property 1. We begin with a lemma that shows the monotonicity of the tags at each server.
Lemma 35.
Let \(\sigma\) and \(\sigma ^{\prime }\) be two states in an execution \({\xi }\) such that \(\sigma\) appears before \(\sigma ^{\prime }\) in \({\xi }\). Then for any server \(s\in \mathcal {S}\) it must hold that \(s.tag|_{\sigma }\le s.tag|_{\sigma ^{\prime }}\).
Proof.
According to the algorithm, a server s updates its local tag-value pairs when it receives a message with a higher tag. So if \(s.tag|_{\sigma }=\tau\) then in a state \(\sigma ^{\prime }\) that appears after \(\sigma\) in \({\xi }\), \(s.tag|_{\sigma ^{\prime }}\ge \tau\).□
In the following two lemmas we show that property C1 is satisfied, that is, if a \({\sf put-data}\) action completes, then any subsequent \({\sf get-data}\) and \({\sf get-tag}\) actions will discover a tag at least as high as the one propagated by that \({\sf put-data}\) action.
Lemma 36.
Let \(\phi\) be a \({c}.{{\sf put-data}(\langle \tau ,v \rangle)}\) action invoked by \(p_1\) and \(\gamma\) be a \({c}.{{\sf get-tag}()}\) action invoked by \(p_2\) in a configuration c, such that \(\phi \rightarrow \gamma\) in an execution \({\xi }\) of the algorithm. Then \(\gamma\) returns a tag \(\tau _\gamma \ge \tau\).
Proof.
The lemma follows from the intersection property of quorums. In particular, during the \({c}.{{\sf put-data}(\langle \tau ,v \rangle)}\) action, \(p_1\) sends the pair \(\langle \tau ,v \rangle\) to all the servers in \(c.Servers\) and waits until all the servers in a quorum \(Q_i\in c.Quorums\) reply. When those replies are received then the action completes.
During a \({c}.{{\sf get-tag}()}\) action, on the other hand, \(p_2\) sends query messages to all the servers in \(c.Servers\) and waits until all servers in a quorum \(Q_j\in c.Quorums\) (not necessarily different from \(Q_i\)) reply. By definition \(Q_i \cap Q_j\ne \emptyset\), thus any server \(s\in Q_i\cap Q_j\) replies to both \(\phi\) and \(\gamma\) actions. By Lemma 35 and since s received a tag \(\tau\), then s replies to \(p_2\) with a tag \(\tau _s\ge \tau\). Since \(\gamma\) returns the maximum tag it discovers then \(\tau _\gamma \ge \tau _s\). Therefore \(\tau _\gamma \ge \tau\) and this completes the proof.□
With similar arguments and given that each value is associated with a unique tag then we can show the following lemma.
Lemma 37.
Let \(\pi\) be a \({c}.{{\sf put-data}(\langle \tau ,v \rangle)}\) action invoked by \(p_1\) and \(\phi\) be a \({c}.{{\sf get-data}()}\) action invoked by \(p_2\) in a configuration c, such that \(\pi \rightarrow \phi\) in an execution \({\xi }\) of the algorithm. Then \(\phi\) returns a tag-value \(\langle \tau _\phi , v_\phi \rangle\) such that \(\tau _\phi \ge \tau\).
Finally we can now show that property C2 also holds.
Lemma 38.
If \(\phi\) is a \({c}.{{\sf get-data}()}\) that returns \(\langle \tau _{\pi }, v_\pi \rangle \in {\mathcal {T}}\times {\mathcal {V}}\), then there exists \(\pi\) such that \(\pi\) is a \({c}.{{\sf put-data}(\langle \tau _{\pi }, v_{\pi } \rangle)}\) and \(\phi \not\rightarrow \pi\).
Proof.
This follows from the facts that (i) servers set their tag-value pair to a pair received by a \({\sf put-data}\) action, and (ii) a \({\sf get-data}\) action returns a tag-value pair that it received from a server. So if a \({c}.{{\sf get-data}()}\) operation \(\phi\) returns a tag-value pair \(\langle \tau _{\pi }, v_\pi \rangle\), there must be a server s that replied to that operation with \(\langle \tau _{\pi }, v_\pi \rangle\), and s received \(\langle \tau _{\pi }, v_\pi \rangle\) from some \({c}.{{\sf put-data}(\langle \tau _{\pi }, v_\pi \rangle)}\) action, \(\pi\). Thus, \(\pi\) can precede or be concurrent with \(\phi\), and hence \(\phi \not\rightarrow \pi\).□

9 Experimental Evaluation

The theoretical findings suggest that Ares provides robustness and flexibility to shared memory implementations without sacrificing strong consistency. In this section, we present a PoC implementation of Ares and run preliminary experiments to gain better insight into the efficiency and adaptiveness of Ares. In particular, our experiments measure the latency of read, write, and reconfig operations, and examine whether consistency persists even when the service is reconfigured between configurations that add/remove servers and utilize different shared memory algorithms.

9.1 Experimental Testbed

We ran experiments on two different setups: (i) simulated locally on a single machine, and (ii) on a LAN. Both types of experiments ran on Emulab [2], an emulated WAN environment testbed used for developing, debugging, and evaluating systems. We used nodes with two 2.4 GHz 64-bit 8-Core E5-2630 “Haswell” processors, \(64 \,\mathrm{G}B\) RAM, with \(1 \,\mathrm{G}B\) and \(10 \,\mathrm{G}B\) NICs. In both setups we used an external implementation of the Raft [45] consensus algorithm, which handled the service reconfiguration (line 16 of Algorithm 2) and was deployed on top of small RPi devices. These small devices introduced further delays in the system, slowing down reconfigurations and creating harsh conditions for longer periods in the service. The Python implementation of Raft used for consensus is PySyncObj [5], with some modifications to allow the execution of Raft in the Ares environment. We built an HTTP API for the management of the Raft subsystem. A reconfigurer can propose a configuration at a particular index in the configuration sequence by sending a POST request to the URL of each Raft node, and it receives a response from Raft stating which configuration is decided for that index.
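The interface that a reconfigurer relies on is small: propose a configuration for an index, and learn the configuration decided for that index. As a minimal sketch of that contract, the class below is an in-memory stand-in in which the first proposal for an index wins. It is neither Raft nor the HTTP API of our implementation, and the names `ConsensusStub` and `propose` are hypothetical.

```python
class ConsensusStub:
    """In-memory stand-in for the external consensus service (not Raft itself)."""

    def __init__(self):
        self._decided = {}  # index in the configuration sequence -> decided configuration

    def propose(self, index, config):
        """Propose `config` for `index` and return the configuration decided for that index."""
        # The first proposal for an index is decided; later proposals learn that decision.
        return self._decided.setdefault(index, config)


service = ConsensusStub()
first = service.propose(1, "c1: ABD on servers 1-10")
second = service.propose(1, "c1': EC on servers 1-10")
print(first == second)  # both reconfigurers learn the same decision
```

Whatever mechanism decides the value, all reconfigurers that propose for the same index must receive the same configuration, which is what the sketch makes explicit.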
Local Experimental Setup: The local setup was used to have access to a globally synchronized clock (the clock of the local machine) in order to examine whether our algorithm preserves global ordering, and hence atomicity, even when using different algorithms between configurations. Therefore, all the instances are hosted on the same physical machine, avoiding the skew between computer clocks in a distributed system. Furthermore, the use of a single clock guarantees that when one event occurs after another, it is assigned a later time.
Distributed Experimental Setup: The distributed experiments in Emulab enabled us to examine the performance of the algorithm in a close-to-real environment. For the deployment and remote execution of the experimental tasks on Emulab, we used Ansible Playbooks [1]. All physical nodes were placed on a single LAN using a DropTail queue without delay or packet loss. Each physical machine runs one server or client process. This guarantees a fair communication delay between a client and a server node.
Node Types: In all experiments, we use four distinct types of nodes: writers, readers, reconfigurers, and servers. Their main roles are listed below:
writer \(w \in W \subseteq C\): a client that sends write requests to all servers and waits for a quorum of the servers to reply.
reader \(r \in R \subseteq C\): a client that sends read requests to servers and waits for a quorum of the servers to reply.
reconfigurer \(g \in G \subseteq C\): a client that sends reconfiguration requests to servers and waits for a quorum of the servers to reply.
server \(s \in S\): a server listens for read, write, and reconfiguration requests, updates its object replica according to the atomic shared memory algorithm, and replies to the process that originated the request.
Performance Metric: The metric for evaluating the algorithms is the operation latency, which includes both communication and computational delays. The operation latency is computed as the average of all clients’ average operation latencies. For better estimates, each experiment in every scenario was repeated six times. In the graphs, we use error bars to illustrate the standard error of the mean (SEM) over the six repeated experiments.
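The two quantities plotted in the graphs can be computed as follows. This is a small sketch using Python's standard `statistics` module; the function names and the sample numbers are illustrative assumptions, not measurements from our experiments.

```python
from math import sqrt
from statistics import mean, stdev

def operation_latency(latencies_by_client):
    """Average of the per-client average latencies (same unit as the samples)."""
    return mean(mean(samples) for samples in latencies_by_client.values())

def standard_error(run_means):
    """Standard error of the mean: sample standard deviation divided by sqrt(n)."""
    return stdev(run_means) / sqrt(len(run_means))

# Hypothetical latencies (in seconds) of two clients in one run.
one_run = {"w1": [1.0, 3.0], "r1": [2.0, 2.0, 2.0]}
print(operation_latency(one_run))

# Hypothetical operation latencies of six repeated runs of one experiment.
six_runs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
print(standard_error(six_runs))
```

Averaging per client first gives every client the same weight, regardless of how many operations it completed.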

9.2 Experimental Scenarios

In this section, we describe the scenarios we constructed and the settings for each of them. In our scenarios, we implemented the DAPs and used two different atomic storage algorithms in Ares: (i) the erasure coding-based algorithm presented in Section 5, and (ii) the ABD algorithm (see Section 8.1).
Implementation of DAPs: Clients initialize the appropriate configuration objects to handle any request. A client creates a configuration object the first time it requests an operation, or when it performs a reconfiguration operation. Once the configuration object is initialized, it is stored in the client's cseq and is retrieved directly on any subsequent request from the client. Therefore, the DAP procedures are called from a configuration object. The asynchronous communication between components is achieved by using DEALER and ROUTER sockets from the ZeroMQ Python library [6].
Erasure Coding: The type of erasure coding we use is an (\(n,k\))-Reed-Solomon code, which guarantees that any k of the n coded fragments are enough to reassemble the original object. The parameter k is the number of encoded data fragments, n is the total number of servers, and m is the number of parity fragments, i.e., \(n-k\). A large k, and consequently a small m, means less redundancy, with the system tolerating fewer failures. When \(k=1,\) we essentially converge to replication. In practice, the \({\sf get-data}\) and \({\sf put-data}\) functions from Algorithm 5 integrate the standard Reed–Solomon implementation provided by liberasurecode from the PyEClib Python library [4].
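To make the "any k of n fragments" property concrete without depending on PyEClib, the sketch below uses a much simpler code than Reed–Solomon: a single XOR parity fragment, i.e., an \((k+1,k)\) code with \(m=1\). It is a stand-in chosen for brevity, not the code used in our implementation, and the helper names `encode` and `decode` are hypothetical.

```python
from functools import reduce
from typing import Dict, List

def xor_bytes(a: bytes, b: bytes) -> bytes:
    """Bytewise XOR of two equal-length byte strings."""
    return bytes(x ^ y for x, y in zip(a, b))

def encode(data: bytes, k: int) -> List[bytes]:
    """Split `data` into k equal data fragments and append one XOR parity fragment."""
    frag_len = -(-len(data) // k)                  # ceil(len(data) / k)
    padded = data.ljust(frag_len * k, b"\0")       # zero-pad to a multiple of k
    frags = [padded[i * frag_len:(i + 1) * frag_len] for i in range(k)]
    return frags + [reduce(xor_bytes, frags)]      # parity has index k

def decode(fragments: Dict[int, bytes], k: int, length: int) -> bytes:
    """Rebuild the object from any k of the k+1 fragments, keyed by fragment index."""
    if len(fragments) < k:
        raise ValueError("need at least k fragments")
    have = dict(fragments)
    missing = [i for i in range(k) if i not in have]
    if missing:
        # One data fragment is lost: the XOR of the k surviving fragments restores it.
        have[missing[0]] = reduce(xor_bytes, have.values())
    return b"".join(have[i] for i in range(k))[:length]

data = b"atomic storage!"
coded = encode(data, k=4)                          # 4 data fragments + 1 parity
survivors = {i: f for i, f in enumerate(coded) if i != 2}
print(decode(survivors, k=4, length=len(data)) == data)
```

With `k=1` the parity fragment equals the object itself, which matches the observation that \(k=1\) converges to replication.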
Fixed Parameters: In all scenarios, the number of servers is fixed to 10. The number of writers and the value of delta are both set to 5, where delta is the maximum number of concurrent put-data operations. The parity value of the EC is set to 2 in order to minimize the redundancy, leading to 8 data servers and 2 parity servers. It is worth mentioning that the quorum size of the EC algorithm is \(\lceil \frac{10+8}{2} \rceil =9\), while the quorum size of the ABD algorithm is \(\lfloor \frac{10}{2} \rfloor +1=6\). For the EC algorithm, the quorum size therefore grows with the parameter k, while the size of the coded elements decreases as k increases.
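The two quorum sizes follow directly from the formulas above. The short sketch below evaluates them with integer arithmetic; the function names are ours and chosen only for illustration. It also reports how many server crashes leave a full quorum available, which is simply the number of servers minus the quorum size.

```python
def ec_quorum_size(n: int, k: int) -> int:
    """ceil((n + k) / 2), computed with integer arithmetic."""
    return (n + k + 1) // 2

def abd_quorum_size(n: int) -> int:
    """floor(n / 2) + 1, i.e., a majority of the servers."""
    return n // 2 + 1

def crashes_with_live_quorum(n: int, quorum_size: int) -> int:
    """Largest number of crashed servers that still leaves a full quorum alive."""
    return n - quorum_size

n, k = 10, 8
print(ec_quorum_size(n, k), crashes_with_live_quorum(n, ec_quorum_size(n, k)))
print(abd_quorum_size(n), crashes_with_live_quorum(n, abd_quorum_size(n)))
```

For the fixed parameters, an EC operation waits for 9 of the 10 servers while an ABD operation waits for 6, so the chosen EC configuration trades availability under crashes for smaller messages.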
Distributed Experiments: For the distributed experiments we use a stochastic invocation scheme in which readers and writers pick a random (discrete) time, uniformly distributed within an interval, to invoke their next operation. The intervals are \([1..rInt]\) for readers and \([1..wInt]\) for writers, where \(rInt = wInt = 2\) sec. In total, each writer performs 60 writes and each reader 60 reads. The reconfigurer invokes its next operation every 15 sec and performs a total of six reconfigurations. The intervals are set to these values in order to generate a continuous flow of operations and stress the concurrency in the system. Note that these values are not based on any real-world scenario.
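The invocation scheme can be sketched as follows. The sketch assumes whole-second granularity for the discrete uniform draw and ignores the duration of the operations themselves; both are simplifying assumptions, and `invocation_times` is a hypothetical helper rather than code from our implementation.

```python
import random

def invocation_times(num_ops: int, max_interval: int, rng: random.Random):
    """Invocation times (in seconds) with gaps drawn uniformly from 1..max_interval."""
    now = 0
    times = []
    for _ in range(num_ops):
        now += rng.randint(1, max_interval)  # randint includes both endpoints
        times.append(now)
    return times

# One reader performing 60 reads with rInt = 2 sec; the seed only makes runs repeatable.
schedule = invocation_times(num_ops=60, max_interval=2, rng=random.Random(0))
print(len(schedule), schedule[-1])
```

With these parameters a client finishes issuing its 60 operations after 60 to 120 seconds of waiting, so the six reconfigurations issued 15 sec apart overlap with the read and write traffic.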
In particular, we present six types of scenarios:
File Size Scalability (Emulab): The first scenario evaluates how the read and write latencies are affected by the size of the shared object. There are two separate runs, one for each examined storage algorithm. The file size is repeatedly doubled, from \(1 \,\mathrm{M}B\) to \(128 \,\mathrm{M}B\). The number of readers is fixed to 5, without any reconfigurers.
Reader Scalability (Emulab): This scenario compares the read and write latencies of the system under the two different storage algorithms as the number of readers increases. In particular, we execute two separate runs, one for each storage algorithm. We use only one reconfigurer, which requests recon operations that lead to the same shared memory emulation and server nodes. The size of the file used is \(4 \,\mathrm{M}B\).
Changing Reconfigurations (Emulab): In this scenario, we evaluate how the read and write latencies are affected when increasing the number of readers while also changing the storage algorithm. We perform two runs, which differ in the way the reconfigurer chooses the next storage algorithm: (i) the reconfigurer chooses randomly between the two storage algorithms, and (ii) the reconfigurer alternates between the two storage algorithms. The size of the file used in both runs is \(4 \,\mathrm{M}B\).
k Scalability (Emulab, EC only): In this scenario, we examine the read and write latencies for different values of k (a parameter of Reed-Solomon). We increase the k of the EC algorithm from 1 to 9. The number of readers is fixed to 5, without any reconfigurers. The size of the file used is \(4 \,\mathrm{M}B\).
Changing the number of Reads/Writes (Emulab): In these scenarios, we examine the read and write latencies for different numbers of read and write operations, respectively. We change the number of reads/writes that each reader/writer performs from 10 to 60, in steps of 10. We evaluate all possible pairs of write and read counts. The number of readers is fixed to 5. The reconfigurer alternates between the two storage algorithms. The size of the file used is \(4 \,\mathrm{M}B\).
Consistency Persistence (Local): In this scenario, we run multiple client operations in order to check whether the data is consistent across servers. The number of readers is set to 5. The readers and writers invoke their next operations without any time delay, while the reconfigurer waits 15 sec before its next invocation. We run two different scenarios, which differ in the reconfigurations. In both scenarios, the reconfigurer alternates between the two storage algorithms. In the second scenario, the reconfigurer also changes the quorum of servers at the same time. In total, each writer performs 500 writes, each reader 500 reads, and the reconfigurer 50 reconfigurations. The size of the file used is \(4 \,\mathrm{M}B\).

9.3 Experimental Results

In this section, we present and explain the evaluation results of each scenario. As a general observation, the Ares algorithm with the EC storage provides data redundancy at a lower communication and storage cost than the ABD storage, which uses a strict replication technique.
File Size Scalability Results: Figure 3(a) shows the results of the file size scalability experiments. The read and write latencies of both storage algorithms remain low up to \(16 \,\mathrm{M}B\). At larger sizes, the latencies of all operations grow significantly. It is worth noting that the fragmentation applied by the EC algorithm benefits its write operations, which follow a more slowly increasing curve than the rest of the operations. Among the rest, the reads seem to suffer the worst delay hit, as they are engaged in more communication phases. Nevertheless, the larger messages sent by ABD result in slower read operations. We noticed that EC has lower SEM values than ABD, which indicates that the calculated mean latencies of EC align very closely across the repeated experiments. Since EC breaks each file into smaller fragments, and since the variation in ABD is also smaller for smaller files, the file size appears to have a significant impact on the error variation: larger file sizes introduce a higher variation in the delivery times of the file and hence higher statistical errors.
Fig. 3. Simulation results.
Reader Scalability Results: The results of the reader scalability experiments can be found in Figure 3(b). The read and write latencies of both algorithms remain almost unchanged as the number of readers increases. This indicates that the system does not reach a state where it cannot handle the concurrent read operations. Still, the reduced message size of read and write operations in EC keeps their latencies lower than the corresponding latencies of ABD. On the other hand, the reconfiguration latency in both algorithms fluctuates widely, between about 1 sec and 4 sec. This is probably due to the unstable connection to the external service that handles the reconfigurations. Notice that the number of readers does not have a great impact on the results we obtain in each experiment, as the SEM error bars are small. The same holds for the next scenario, where the number of readers changes while switching algorithms between reconfigurations.
Changing Reconfigurations Results: Figure 3(c) illustrates the results of the experiments with the random storage change, while Figure 3(d) shows the results of the experiments in which the reconfigurer alternates between storage algorithms. During both experiments, there are cases where a single read/write operation may access configurations that implement both the ABD and EC algorithms, when it is concurrent with a recon operation. The latencies of such operations are therefore counted in both the ABD and EC latencies. As we mentioned earlier, our choice of k minimizes the coded fragment size but introduces bigger quorums and thus larger communication overhead. As a result, for smaller file sizes, Ares may not benefit from the coding, bringing the delays of the two algorithms closer to each other. It is again evident that the reconfiguration delays are higher than the delays of all other operations.
k Scalability Results: From Figure 3(e) we can infer that the write and read latencies reach their highest values when a smaller k is used. In both cases, a small k results in a smaller number of data fragments and thus in bigger fragments and higher redundancy. For example, RS(10,8) and RS(10,7) have the same quorum size, equal to 9, whereas the latter carries more redundant information. As a result, a higher m (i.e., a smaller k) achieves higher levels of fault-tolerance at the expense of storage efficiency. The write latency seems to be less affected by k since the encoding is considerably faster, as it requires less computation. In conclusion, there appears to be a tradeoff between operation latency and fault-tolerance in the system: the larger the k (and thus the lower the fault-tolerance), the smaller the latency of read/write operations. This experiment also indicates that the object size plays a significant role in the error variation. Notice that while k is small, and thus the object we send out is bigger, the error is higher. As k grows and the fragments get smaller, the SEM shrinks. This is an indication that communicating larger data over the wire may cause the delivery times to fluctuate (as also seen in the file size scenario).
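The tradeoff can be tabulated for the parameters of this scenario (\(n=10\), a \(4 \,\mathrm{M}B\) object, k from 1 to 9). The sketch below only evaluates the arithmetic implied by the text, namely fragment size \(1/k\) of the object and quorum size \(\lceil (n+k)/2 \rceil\); it does not reproduce any measured latency, and `ec_profile` is a hypothetical helper.

```python
def ec_profile(n: int, k: int, object_mb: float) -> dict:
    """Static properties of an (n, k) code for an object of `object_mb` megabytes."""
    return {
        "k": k,
        "m": n - k,                         # parity fragments
        "quorum": (n + k + 1) // 2,         # ceil((n + k) / 2)
        "fragment_mb": object_mb / k,       # size of one coded fragment
        "stored_mb": n * object_mb / k,     # total size of all n fragments
    }

for k in range(1, 10):
    p = ec_profile(n=10, k=k, object_mb=4)
    print(f"k={p['k']} m={p['m']} quorum={p['quorum']} "
          f"fragment={p['fragment_mb']:.2f}MB total={p['stored_mb']:.2f}MB")
```

The table shows why RS(10,7) and RS(10,8) share a quorum size of 9 while RS(10,7) ships larger fragments, and why \(k=1\) stores a full copy of the object on every server.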
Changing the number of Reads/Writes Results: Figure 3(f) shows a subset of the results of the experiments where the number of read operations changes and the number of write operations is fixed to 60. The experiments show that the total read/write latency (both EC and ABD) has very similar values for all the combinations of writes and reads, which indicates that the system performance is not affected by the number of reads and writes. This is expected since the number of participants is the same in all cases and by well-formedness (i.e., each participant invokes a single operation at a time) at most 10 operations will be concurrent in the execution at any given state. Higher concurrency can be captured by the scalability scenario. Note again that the read latency is higher than the write one, since the read operation actually transfers data twice: once to fetch the data from the servers, and once during the propagation phase.
Consistency Persistence Results: Though the Ares protocol is provably strongly consistent, it is important to ensure that our implementation is correct. Validating strong consistency of an execution requires precise clock synchronization across all processes, so that one can track operations with respect to a global time. This is impossible to achieve in a distributed system where clock drift is inevitable. To circumvent this, we deploy all the processes on a single powerful machine so that every process observes the same clock, running on the same physical machine.
Our checker gathers data regarding an execution, and this data includes the start and end times of all the operations, as well as other parameters such as the logical timestamps used by the protocol. The checker logic is based on the conditions appearing in Lemma 13.16 [38], which provide a set of sufficient conditions for guaranteeing strong consistency. The checker validates the strong consistency property for every atomic object individually for the execution under consideration. Note that consistency holds despite the existence of concurrent read/write and reconfiguration operations that may add/remove servers and switch the storage algorithm in the system.
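A checker of this kind can be sketched in a few lines. The sketch below is not our checker: it assumes a tag-based formulation of the sufficient conditions (distinct tags for writes, each read returns the value written under the tag it returns, and tags respect the real-time order of non-overlapping operations), and the record type `Op` and function `check_history` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

Tag = Tuple[int, str]  # (sequence number, writer id), ordered lexicographically

@dataclass(frozen=True)
class Op:
    kind: str              # "read" or "write"
    start: float           # invocation time on the shared clock
    end: float             # response time on the shared clock
    tag: Tag               # tag written, or tag returned by the read
    value: Optional[str]   # value written, or value returned by the read

def check_history(ops: List[Op],
                  initial: Tuple[Tag, Optional[str]] = ((0, ""), None)) -> List[str]:
    """Return the violations found in one object's history; empty means it passed."""
    violations = []
    written = {initial[0]: initial[1]}
    # Every write must carry a distinct tag.
    for w in (o for o in ops if o.kind == "write"):
        if w.tag in written:
            violations.append(f"duplicate write tag {w.tag}")
        written[w.tag] = w.value
    # A read must return the value that was written under the tag it returns.
    for r in (o for o in ops if o.kind == "read"):
        if r.tag not in written:
            violations.append(f"read returned unknown tag {r.tag}")
        elif written[r.tag] != r.value:
            violations.append(f"read returned wrong value for tag {r.tag}")
    # If a completes before b starts, b's tag is not smaller than a's,
    # and it is strictly larger when b is a write.
    for a in ops:
        for b in ops:
            if a.end < b.start:
                if b.tag < a.tag or (b.kind == "write" and b.tag == a.tag):
                    violations.append(f"order violation: {a} precedes {b}")
    return violations

history = [
    Op("write", 0.0, 1.0, (1, "w1"), "a"),
    Op("read", 2.0, 3.0, (1, "w1"), "a"),
    Op("write", 2.5, 4.0, (2, "w1"), "b"),
    Op("read", 5.0, 6.0, (2, "w1"), "b"),
]
print(check_history(history))
```

The pairwise comparison is quadratic in the number of operations, which is acceptable for histories of a few thousand operations such as those of the local scenario; a read that returns an older tag after a newer write has completed is reported as an order violation.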

10 Conclusions

We presented an algorithmic framework suitable for a reconfigurable, erasure code-based atomic memory service in asynchronous, message-passing environments. In particular, we provide a new modular framework, called Ares, which abstracts the implementation of the underlying shared memory within a set of DAPs with specific correctness properties. Using these structures, Ares may implement a large class of atomic shared memory algorithms (those that can be expressed using the proposed DAPs), allowing any such algorithm to work in a reconfigurable environment. A set of erasure code-based atomic memory algorithms is included in this class. To demonstrate the use of our framework, we provided a new two-round erasure code-based algorithm that has near-optimal storage cost, implemented in terms of the proposed DAPs. This implementation gave rise to the first (to our knowledge) reconfigurable erasure-coded atomic shared memory object. We provided a PoC implementation of our framework and obtained initial experimental results that indicate the feasibility of the presented approach, check the correctness of the implementation, and compare its performance with traditional approaches.
Ares is designed to address the real-world problem of system migration from a replicated system to a system that uses erasure codes and vice versa. Ares can also enable the replacement of failed nodes with new non-failed nodes. Its key difference from the existing state-of-the-art systems is that it can perform such reconfigurations with relatively minimal interruption of service, unlike current implementations that would block ongoing operations for reconfiguration. We anticipate that Ares will be very useful for workloads that are prone to fast changes in properties and have stringent constraints on latencies. For such workloads, Ares enables the system to adapt itself to the changes in the workload in an agile manner, utilizing the full flexibility that EC brings to the system, without causing violations of latency constraints or consistency due to interruptions.
Our main goal was to establish that non-blocking reconfiguration is feasible and compatible with EC-based atomic data storage. Our experimental study is designed as a PoC prototype to verify the correctness properties we have developed and to show some benefits. It must be emphasized that a full-fledged system study of our algorithms, albeit an interesting area of future work that is motivated by our article, is outside our current scope. In particular, although our study provides some initial hints, we anticipate that such a future study would examine the following questions in more detail:
Real-world applications that would indeed benefit from our design.
Workloads generated from these real-world applications to test our algorithms, and competing ones.
The synergies between our reconfiguration algorithm and existing failure detection and recovery mechanisms.
Adding efficient repair and reconfiguration using regenerating codes.

Footnotes

In practice, these processes can be on the same node or different nodes.
Cassandra [36] offers tuneable consistency; it uses a protocol that is essentially ABD [11] for what they refer to as level 3 consistency (i.e., atomicity).

References

[1]
Ansible. 2022. Retrieved 31 October 2022 from https://www.ansible.com/overview/how-ansible-works.
[2]
Emulab network testbed. 2022. Retrieved 31 October 2022 from https://www.emulab.net/.
[3]
Intel Storage Acceleration Library (Open Source Version). 2022. Retrieved 31 October 2022 from https://goo.gl/zkVl4N.
[4]
PyEClib. 2022. Retrieved 31 October 2022 from https://github.com/openstack/pyeclib.
[5]
PySyncObj. 2022. Retrieved 31 October 2022 from https://github.com/bakwc/PySyncObj.
[6]
ZeroMQ. 2022. Retrieved 31 October 2022 from https://zeromq.org.
[7]
M. Abebe, K. Daudjee, B. Glasbergen, and Y. Tian. 2018. EC-store: Bridging the gap between storage and latency in distributed erasure coded systems. In Proceedings of the 2018 IEEE 38th International Conference on Distributed Computing Systems. 255–266.
[8]
Marcos Kawazoe Aguilera, Idit Keidar, Dahlia Malkhi, and Alexander Shraer. 2009. Dynamic atomic storage without consensus. In Proceedings of the 28th ACM Symposium on Principles of Distributed Computing. ACM, 17–25.
[9]
Marcos K. Aguilera, Idit Keidar, Dahlia Malkhi, Jean-Philippe Martin, and Alexander Shraer. 2010. Reconfiguring replicated atomic storage: A tutorial. Bulletin of the EATCS 102 (2010), 84–108.
[10]
Antonio Fernández Anta, Nicolas Nicolaou, and Alexandru Popa. 2015. Making “fast” atomic operations computationally tractable. In Proceedings of the International Conference on Principles Of Distributed Systems.
[11]
H. Attiya, A. Bar-Noy, and D. Dolev. 1995. Sharing memory robustly in message passing systems. Journal of the ACM 42, 1 (1995), 124–142.
[12]
Dorian Burihabwa, Pascal Felber, Hugues Mercier, and Valerio Schiavoni. 2016. A performance evaluation of erasure coding libraries for cloud-based data stores. In Proceedings of the Distributed Applications and Interoperable Systems. Springer, 160–173.
[13]
Christian Cachin and Stefano Tessaro. 2006. Optimal resilience for erasure-coded byzantine distributed storage. In Proceedings of the International Conference on Dependable Systems and Networks. IEEE Computer Society, 115–124.
[14]
V. R. Cadambe, N. Lynch, M. Médard, and P. Musial. 2014. A coded shared atomic memory algorithm for message passing architectures. In Proceedings of the 2014 IEEE 13th International Symposium on Network Computing and Applications. 253–260.
[15]
Viveck R. Cadambe, Nancy A. Lynch, Muriel Médard, and Peter M. Musial. 2017. A coded shared atomic memory algorithm for message passing architectures. Distributed Computing 30, 1 (2017), 49–73.
[16]
Yu Lin Chen, Shuai Mu, and Jinyang Li. 2017. Giza: Erasure coding objects across global data centers. In Proceedings of the 2017 USENIX Annual Technical Conference. 539–551.
[17]
Gregory Chockler, Seth Gilbert, Vincent Gramoli, Peter M. Musial, and Alexander A. Shvartsman. 2009. Reconfigurable distributed storage for dynamic networks. Journal of Parallel and Distributed Computing 69, 1 (2009), 100–116.
[18]
Gregory Chockler and Dahlia Malkhi. 2005. Active disk paxos with infinitely many processes. Distributed Computing 18, 1 (2005), 73–84.
[19]
Dan Dobre, Ghassan O. Karame, Wenting Li, Matthias Majuntke, Neeraj Suri, and Marko Vukolić. 2019. Proofs of writing for robust storage. IEEE Transactions on Parallel and Distributed Systems 30, 11 (2019), 2547–2566.
[20]
Partha Dutta, Rachid Guerraoui, and Ron R. Levy. 2008. Optimistic erasure-coded distributed storage. In DISC’08: Proceedings of the 22nd International Symposium on Distributed Computing. Springer-Verlag, 182–196.
[21]
Partha Dutta, Rachid Guerraoui, Ron R. Levy, and Arindam Chakraborty. 2004. How fast can a distributed atomic read be? In Proceedings of the 23rd ACM Symposium on Principles of Distributed Computing. 236–245.
[22]
Rui Fan and Nancy Lynch. 2003. Efficient replication of large data objects. In Proceedings of the Distributed Algorithms. Faith Ellen Fich (Ed.), Lecture Notes in Computer Science, Vol. 2848. 75–91.
[23]
Antonio Fernández Anta, Theophanis Hadjistasi, and Nicolas Nicolaou. 2016. Computationally light “multi-speed” atomic memory. In Proceedings of the International Conference on Principles Of Distributed Systems.
[24]
Eli Gafni and Dahlia Malkhi. 2015. Elastic configuration maintenance via a parsimonious speculating snapshot solution. In Proceedings of the International Symposium on Distributed Computing. Springer, 140–153.
[25]
Chryssis Georgiou, Nicolas C. Nicolaou, and Alexander A. Shvartsman. 2008. On the robustness of (semi) fast quorum-based implementations of atomic shared memory. In DISC ’08: Proceedings of the 22nd International Symposium on Distributed Computing. Springer-Verlag, 289–304.
[26]
Chryssis Georgiou, Nicolas C. Nicolaou, and Alexander A. Shvartsman. 2009. Fault-tolerant semifast implementations of atomic read/write registers. Journal of Parallel and Distributed Computing 69, 1 (2009), 62–79.
[27]
Seth Gilbert. 2003. RAMBO II: Rapidly Reconfigurable Atomic Memory for Dynamic Networks. Master’s thesis. MIT.
[28]
S. Gilbert, N. Lynch, and A. A. Shvartsman. 2003. RAMBO II: Rapidly reconfigurable atomic memory for dynamic networks. In Proceedings of the International Conference on Dependable Systems and Networks. 259–268.
[29]
Maurice P. Herlihy and Jeannette M. Wing. 1990. Linearizability: A correctness condition for concurrent objects. ACM Transactions on Programming Languages and Systems 12, 3 (1990), 463–492.
[30]
Jianzhong Huang, Xianhai Liang, Xiao Qin, Ping Xie, and Changsheng Xie. 2015. Scale-RS: An efficient scaling scheme for RS-coded storage clusters. IEEE Transactions on Parallel and Distributed Systems 26, 6 (2015), 1704–1717.
[31]
W. C. Huffman and V. Pless. 2003. Fundamentals of Error-correcting Codes. Cambridge University Press.
[32]
Leander Jehl, Roman Vitenberg, and Hein Meling. 2015. Smartmerge: A new approach to reconfiguration for atomic storage. In Proceedings of the International Symposium on Distributed Computing. Springer, 154–169.
[33]
Gauri Joshi, Emina Soljanin, and Gregory Wornell. 2017. Efficient redundancy techniques for latency reduction in cloud systems. ACM Transactions on Modeling and Performance Evaluation of Computing Systems 2, 2 (2017), 12.
[34]
K. M. Konwar, N. Prakash, E. Kantor, N. Lynch, M. Médard, and A. A. Schwarzmann. 2016. Storage-optimized data-atomic algorithms for handling erasures and errors in distributed storage systems. In Proceedings of the 2016 IEEE International Parallel and Distributed Processing Symposium. 720–729.
[35]
Kishori M. Konwar, N. Prakash, Nancy Lynch, and Muriel Médard. 2016. RADON: Repairable atomic data object in networks. In Proceedings of the International Conference on Distributed Systems.
[36]
Avinash Lakshman and Prashant Malik. 2010. Cassandra: A decentralized structured storage system. ACM SIGOPS Operating Systems Review 44, 2 (2010), 35–40.
[37]
Leslie Lamport. 1998. The part-time parliament. ACM Transactions on Computer Systems 16, 2 (1998), 133–169.
[38]
N. A. Lynch. 1996. Distributed Algorithms. Morgan Kaufmann Publishers.
[39]
N. Lynch and A. A. Shvartsman. 2002. RAMBO: A reconfigurable atomic memory service for dynamic networks. In Proceedings of the 16th International Symposium on Distributed Computing. 173–190.
[40]
Nancy A. Lynch and Alexander A. Shvartsman. 1997. Robust emulation of shared memory using dynamic quorum-acknowledged broadcasts. In Proceedings of the Symposium on Fault-Tolerant Computing. 272–281.
[41]
Ellis Michael, Dan R. K. Ports, Naveen Kr. Sharma, and Adriana Szekeres. 2017. Recovering shared objects without stable storage. In Proceedings of the 31st International Symposium on Distributed Computing. Andréa W. Richa (Ed.), Leibniz International Proceedings in Informatics, Vol. 91. Schloss Dagstuhl–Leibniz-Zentrum fuer Informatik, Dagstuhl, Germany, 36:1–36:16.
[42]
Satoshi Nakamoto. 2008. Bitcoin: A peer-to-peer electronic cash system. https://bitcoin.org/bitcoin.pdf.
[43]
Nicolas Nicolaou, Viveck Cadambe, Kishori Konwar, N. Prakash, Nancy Lynch, and Muriel Médard. 2018. ARES: Adaptive, reconfigurable, erasure coded, atomic storage. arXiv:1805.03727. Retrieved from https://arxiv.org/abs/1805.03727.
[44]
Nicolas Nicolaou, Viveck Cadambe, N. Prakash, Kishori Konwar, Muriel Medard, and Nancy Lynch. 2019. ARES: Adaptive, reconfigurable, erasure coded, atomic storage. In Proceedings of the 2019 IEEE 39th International Conference on Distributed Computing Systems. 2195–2205.
[45]
Diego Ongaro and John Ousterhout. 2014. In search of an understandable consensus algorithm. In Proceedings of the 2014 USENIX Conference on USENIX Annual Technical Conference. USENIX Association, 305–320.
[46]
K. V. Rashmi, Mosharaf Chowdhury, Jack Kosaian, Ion Stoica, and Kannan Ramchandran. 2016. EC-cache: Load-balanced, low-latency cluster caching with online erasure coding. In Proceedings of the OSDI. 401–417.
[47]
Alexander Shraer, Jean-Philippe Martin, Dahlia Malkhi, and Idit Keidar. 2010. Data-centric reconfiguration with network-attached disks. In Proceedings of the 4th International Workshop on Large Scale Distributed System and Middleware. 22–26.
[48]
Alexander Spiegelman, Idit Keidar, and Dahlia Malkhi. 2017. Dynamic reconfiguration: Abstraction and optimal asynchronous solution. In Proceedings of the 31st International Symposium on Distributed Computing. 40:1–40:15.
[49]
Shuang Wang, Jianzhong Huang, Xiao Qin, Qiang Cao, and Changsheng Xie. 2017. WPS: A workload-aware placement scheme for erasure-coded in-memory stores. In Proceedings of the International Conference on Networking, Architecture, and Storage. IEEE, 1–10.
[50]
Chentao Wu and Xubin He. 2012. GSR: A global stripe-based redistribution approach to accelerate RAID-5 scaling. In Proceedings of the 2012 41st International Conference on Parallel Processing. 460–469.
[51]
Chentao Wu and Xubin He. 2013. A flexible framework to enhance RAID-6 scalability via exploiting the similarities among MDS codes. In Proceedings of the 2013 42nd International Conference on Parallel Processing. 542–551.
[52]
Yu Xiang, Tian Lan, Vaneet Aggarwal, and Yih-Farn R. Chen. 2015. Multi-tenant latency optimization in erasure-coded storage with differentiated services. In Proceedings of the 2015 IEEE 35th International Conference on Distributed Computing Systems. IEEE, 790–791.
[53]
Yu Xiang, Tian Lan, Vaneet Aggarwal, and Yih-Farn R. Chen. 2016. Joint latency and cost optimization for erasure-coded data center storage. IEEE/ACM Transactions on Networking 24, 4 (2016), 2443–2457.
[54]
Yinghao Yu, Renfei Huang, Wei Wang, Jun Zhang, and Khaled Ben Letaief. 2018. SP-cache: Load-balanced, redundancy-free cluster caching with selective partition. In Proceedings of the International Conference for High Performance Computing, Networking, Storage, and Analysis. IEEE, 1–13.
[55]
Guangyan Zhang, Keqin Li, Jingzhe Wang, and Weimin Zheng. 2015. Accelerate RDP RAID-6 scaling by reducing disk I/Os and XOR operations. IEEE Transactions on Computers 64, 1 (2015), 32–44.
[56]
Heng Zhang, Mingkai Dong, and Haibo Chen. 2016. Efficient and available in-memory KV-store with hybrid erasure coding and replication. In Proceedings of the 14th USENIX Conference on File and Storage Technologies. USENIX Association, Santa Clara, CA, 167–180.
[57]
Xiaoyang Zhang, Yuchong Hu, Patrick P. C. Lee, and Pan Zhou. 2018. Toward optimal storage scaling via network coding: From theory to practice. In Proceedings of the IEEE INFOCOM 2018 - IEEE Conference on Computer Communications. 1808–1816.
[58]
P. Zhou, J. Huang, X. Qin, and C. Xie. 2019. PaRS: A popularity-aware redundancy scheme for in-memory stores. IEEE Transactions on Computers 68, 4 (2019), 556–569. Retrieved from https://dblp.org/rec/journals/tc/ZhouHQX19.html?view=bibtex.

Published In

ACM Transactions on Storage, Volume 18, Issue 4
November 2022
255 pages
ISSN:1553-3077
EISSN:1553-3093
DOI:10.1145/3570642

Publisher

Association for Computing Machinery

New York, NY, United States

Publication History

Published: 12 November 2022
Online AM: 27 September 2022
Accepted: 06 January 2022
Revised: 19 November 2021
Received: 27 April 2021
Published in TOS Volume 18, Issue 4

Author Tags

  1. Atomicity
  2. distributed storage
  3. reconfiguration
  4. fault-tolerance
  5. erasure-codes

Qualifiers

  • Research-article
  • Refereed

Funding Sources

  • Center for Science of Information NSF
  • AFOSR
