Implementing a Regular Register in an Eventually Synchronous Distributed System prone to Continuous Churn

Academic year: 2021

Roberto Baldoni Silvia Bonomi Michel Raynal

Abstract—Due to their capability to hide the complexity generated by the messages exchanged between processes, shared objects are one of the main abstractions provided to developers of distributed applications. Implementations of such objects, in modern distributed systems, have to take into account the fact that almost all services, implemented on top of distributed infrastructures, are no longer fully managed due to either their size or their maintenance cost. Therefore, these infrastructures exhibit several autonomic behaviors in order to, for example, tolerate failures and continuous arrival and departure of nodes (churn phenomenon).

Among all the shared objects, the register object is a fundamental one. Several protocols have been proposed to build fault-resilient registers on top of message-passing systems but, unfortunately, failures are not the only challenge in modern distributed systems, and new issues arise from the presence of churn. This paper addresses the construction of a multi-writer/multi-reader regular register in an eventually synchronous distributed system affected by the continuous arrival/departure of participants. In particular, a general protocol implementing a regular register is proposed and feasibility conditions associated with the arrival and departure of the processes are given. The protocol is proved correct under the assumption that a constraint on the churn is satisfied.

Index Terms—Regular Register, Dynamic Distributed Systems, Churn, Distributed Algorithms.

1 INTRODUCTION

Context. Dealing with failures has been one of the main challenges in the construction of real, reliable applications able to work in a distributed system. These applications are inherently managed, in the sense that they implicitly assume the existence of a superior manager (i.e., the application/service provider) that controls the processes running the application. The manager does its best to guarantee that the assumptions made on the underlying distributed system (e.g., a majority of correct processes) hold over time by activating appropriate reactive or proactive recovery procedures [26]. As an example, the manager can either add new processes when crashes occur or ensure the required degree of synchrony of the underlying distributed platform in terms of processes and communication links. Air traffic control, telecommunication, banking and e-government systems are just a few examples of such application domains. In this context, robust abstractions have been defined (shared memory, communication, agreement, etc.) that behave correctly despite asynchrony and failures and that simplify application design and development. When considering protocols implementing such abstractions, in nearly all cases the system is “well defined”, in the sense that the whole set of participating processes is finite and known (directly or transitively) by each process.

Università La Sapienza, via Ariosto 25, I-00185 Roma, Italy

Senior Member, Institut Universitaire de France, IRISA, Université de Rennes, Campus de Beaulieu, F-35042 Rennes, France

The system composition is modified only when either a process crashes or a new process is added. Therefore, if a process does not crash, it lives for the entire duration of the computation.

Motivation. A new challenge is emerging due to the advent of new classes of applications and technologies such as smart environments, sensor networks, mobile systems, peer-to-peer systems, cloud computing, etc. In these settings, the underlying distributed system cannot be fully managed; it needs some degree of self-management that depends on the specific application domain. However, it is possible to delineate some common consequences of the presence of such self-management: first, there is no entity that can always ensure the validity of the system assumptions during the entire computation and, second, no one knows accurately who joins and who leaves the system at any time, which introduces a kind of unpredictability in the system composition (this phenomenon of arrival and departure of processes in a system is also known as churn) [6].

As a consequence, distributed computing abstractions have to deal not only with asynchrony and failures, but also with this dynamic dimension, where a process that does not crash can leave the system at any time, implying that the membership can fully change several times during the same computation. Moreover, this dynamic behavior means that a process cannot have precise knowledge of the number of processes composing the system at any given time. Thus, it becomes of primary importance to check under which churn assumption a protocol implementing a distributed computing abstraction is correct. Hence, the abstractions and the protocols implementing them have to be reconsidered to take into account this new “adversary” setting. This self-defined and continuously evolving distributed system, which we will call in the following a dynamic distributed system, makes abstractions more difficult to understand and master than in distributed systems where the set of processes is fixed and known by all participants. The churn notion thus becomes a system parameter whose aim is to make tractable systems whose composition evolves over time (e.g., [15], [19], [23]).

Contribution and roadmap. In this paper, a general churn model that we defined in [5] is considered and used to characterize a dynamic distributed computation where the number of participants changes within a given range and the arrival and departure of processes is a non-quiescent phenomenon that depends on join and leave distributions.

Such a model places constraints on process arrivals and departures. Specifically, the computation size is constrained to a range between n0 − k1 and n0 + k2, where n0 is the number of processes participating in the computation at time t0, while k1 and k2 are two integers greater than or equal to zero that depend on the join and leave distributions. In particular, this paper addresses the problem of deterministically building and maintaining a distributed computation implementing a multiple-writers/multiple-readers regular register. Processes participating in the computation are called active processes.

We provide an implementation of a regular register based on a request/reply message pattern and we prove that:

• any operation issued on the regular register terminates if the number of reply messages needed to perform the operation is at most n0 − k1 (Lemma 1), and

• any operation issued on the regular register is valid if the number of reply messages needed to perform the operation is at least ⌈(n0 + k2)/2⌉ (Lemma 2).

From these two conditions it follows that n0, k1 and k2 cannot be chosen arbitrarily. They are closely related by the condition n0 > 2k1 + k2 (Corollary 1).
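As a rough illustration of the two bounds above, the following Python sketch computes the range of reply counts that satisfies both of them for given n0, k1 and k2. The function name and its return convention are hypothetical and not part of the paper; the sketch only evaluates the two stated inequalities and does not reproduce the proofs or re-derive Corollary 1.

```python
from typing import Optional, Tuple


def reply_threshold_range(n0: int, k1: int, k2: int) -> Optional[Tuple[int, int]]:
    """Return (lowest, highest) reply counts compatible with both bounds.

    lowest  = ceil((n0 + k2) / 2)  -- validity bound quoted in the text
    highest = n0 - k1              -- termination bound quoted in the text
    Returns None when no reply count satisfies both (hypothetical convention).
    """
    if min(n0, k1, k2) < 0:
        raise ValueError("n0, k1 and k2 must be non-negative")
    lowest = (n0 + k2 + 1) // 2  # integer ceiling of (n0 + k2) / 2
    highest = n0 - k1
    return (lowest, highest) if lowest <= highest else None


if __name__ == "__main__":
    print(reply_threshold_range(10, 2, 3))
    print(reply_threshold_range(10, 4, 3))
```

With n0 = 10, k1 = 2 and k2 = 3 the two bounds leave a non-empty range, whereas raising k1 to 4 leaves none, which is the sense in which the three parameters cannot be chosen independently.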

Let us finally remark that the interest in addressing the regular register abstraction lies in the fact that it is a fundamental notion for building storage systems. Up to now, storage systems that cope with churn ensure regular register consistency criteria in a probabilistic way [2].

The result of this paper gives thus a bound on the churn that a storage system can cope with while still providing deterministic regular consistency guarantee.

The paper is structured as follows: Section 2 defines the system model, and in particular Section 2.3 defines the churn model. Section 3 introduces the regular register specification for a dynamic distributed system. Section 4 presents a protocol implementing a regular register and its correctness proof in an eventually synchronous system. Finally, two sections on related work and concluding remarks conclude the paper.

2 SYSTEM MODEL

2.1 Dynamic Distributed System.

In a dynamic distributed system, processes may join and leave the system at will. In order to model processes continuously arriving at and departing from the system, we assume the infinite arrival model (as defined in [24]). The set of processes that can participate in the distributed system, i.e., the distributed system population, is composed of a potentially infinite set of processes Π = {. . . pi, pj, pk . . . }, each one having a unique identifier (i.e., its index). However, the distributed system is composed, at each time, of a finite subset of the population. A process enters the distributed system by executing the join_System() procedure. Such an operation aims at connecting the new process to the processes that already belong to the system. A process leaves the system by means of a leave_System() operation. Processes belonging to the distributed system may fail by crashing before leaving the system; if a process crashes, it stops performing any action. A process that never crashes is said to be correct. In the following we assume the existence of a protocol managing the arrival and the departure of processes from the distributed system; such a protocol is also responsible for maintaining connectivity among the processes that are part of the distributed system. Some examples of topologies and protocols keeping the system connected in a dynamic environment are [16], [17], [20], [27]. The system is eventually synchronous¹, that is, after an unknown but finite time the system behaves synchronously [7], [9].

The passage of time is measured by a fictional global clock, represented by integer values, not accessible by processes. Processes belonging to the distributed system communicate by exchanging messages through either point-to-point reliable channels or broadcast primitives.

Both the communication primitives can be characterized by the following property:

Eventual Time Delivery: there exists a bound δ, known by processes, and a time t such that any message sent (broadcast) at time t′ ≥ t is delivered by time t′ + δ to all the processes that are in the system during the whole interval [t′, t′ + δ].

It is important to notice that processes only know that the time t exists. They neither know nor can deduce or predict when the synchrony period starts.

2.2 Distributed Computation

Processes belonging to the distributed system may decide autonomously to join a distributed computation running on top of the system (e.g. a regular register computation).

Hence, a distributed computation is composed, at each instant of time, of a subset of the processes of the distributed system. A process pi, belonging to the distributed system, that wants to join the distributed computation has to execute the join_Computation() operation. Such an operation, invoked at some time t, is not instantaneous and takes time to be executed; how long it takes depends on the specific implementation provided for the join_Computation() operation. However, from time t, the process pi can receive and process messages sent by any other process that participates in the computation.

1. Sometimes also called a partially synchronous system.

Fig. 1. Distributed System and Distributed Computation (layered diagram: processes enter and leave the distributed system through join_System() and leave_System(), and enter and leave the distributed computation, e.g., a regular register, through join_Computation() and leave_Computation())

When a process pj, participating in the distributed computation, wishes to leave the computation, it executes the leave_Computation() operation. Without loss of generality, we assume that if a process leaves the computation and later wishes to re-join, it executes the join_Computation() operation again with a new identity. Figure 1 shows the distributed system and the distributed computation layers.

It is important to notice that (i) there may exist processes belonging to the distributed system that never join the distributed computation (i.e., they execute the join_System() procedure but they never invoke the join_Computation() operation) and (ii) there may exist processes that, after leaving the distributed computation, remain inside the distributed system (i.e., they are correct but they stop processing messages related to the computation). For this reason, it is important to identify the subset of processes that are actively participating in the distributed computation.

Definition 1: A process is active in the distributed computation from the time it returns from the join_Computation() operation until the time it starts executing the leave_Computation() operation. A(t) denotes the set of processes that are active at time t, while A([t, t′]) denotes the set of processes that are active during the whole interval [t, t′] (i.e., pi ∈ A([t, t′]) iff pi ∈ A(τ) for each τ ∈ [t, t′]).
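A minimal sketch of Definition 1, assuming a hypothetical trace that records, for each process, the time at which join_Computation() returned and the time at which leave_Computation() started. Treating the activity period as the half-open interval between these two instants is an assumption of the sketch; the definition itself does not state which endpoints are included.

```python
from typing import Dict, Set, Tuple

# Hypothetical trace format (not from the paper):
# process id -> (time join_Computation() returned, time leave_Computation() started)
Trace = Dict[str, Tuple[int, int]]


def active_at(trace: Trace, t: int) -> Set[str]:
    """A(t): processes active at time t (assumed half-open activity interval)."""
    return {p for p, (joined, leaving) in trace.items() if joined <= t < leaving}


def active_during(trace: Trace, t: int, t_prime: int) -> Set[str]:
    """A([t, t']): processes active at every instant of [t, t']."""
    return {
        p for p, (joined, leaving) in trace.items()
        if joined <= t and t_prime < leaving
    }


if __name__ == "__main__":
    trace = {"p1": (0, 10), "p2": (3, 6), "p3": (5, 20)}
    print(sorted(active_at(trace, 5)))
    print(sorted(active_during(trace, 4, 8)))
```

In the example trace, all three processes are active at time 5, but only p1 is active during the whole interval [4, 8]: p2 starts leaving at time 6 and p3 becomes active only at time 5.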

2.3 Churn Model

Processes may join and leave the distributed computation at any time. To model this activity, we consider the churn model that we introduced in [5]. The model is based on the definition of two functions: (i) the join function λ(t) (defining the join of new processes to the distributed computation with respect to time) and (ii) the leave function µ(t) (defining the leave of processes from the distributed computation with respect to time). Such functions are discrete functions of time.

Definition 2: (Join function) The join function λ(t) is a discrete time function that returns the number of processes that invoke the join_Computation() operation at time t.

Definition 3: (Leave function) The leave function µ(t) is a discrete time function that returns the number of processes that invoke the leave_Computation() operation at time t.

Let t0 be the starting time of the system. We assume that at time t0 no process joins or leaves the distributed computation (i.e., λ(t0) = 0 and µ(t0) = 0) and therefore we can say that at t0 the computation is composed of a set Π0 of processes and the size of the distributed computation is n0 (i.e., |Π0| = n0). Moreover, for any time t < t0 we have λ(t) = µ(t) = 0.

The churn is continuous, meaning that processes never stop joining and leaving the computation; that is, the following conditions hold:

∄ t : ∀τ > t : λ(τ) = 0, and

∄ t′ : ∀τ > t′ : µ(τ) = 0.

As soon as churn starts, the size of the computation and computation membership change. The number of participants of the computation can be calculated as follows.

Definition 4: (Node function) Let n0 be the number of processes participating in the computation at start time t0. N(t) is the number of processes of the computation at time t for every t ≥ t0 (i.e., N(t) = N(t − 1) + λ(t) − µ(t), with N(t0) = n0).

Based on the previous definitions, let us derive the constraint that a join function and a leave function have to satisfy in order for the distributed computation size to remain in a given interval. Note that such behavior is typical of real applications like peer-to-peer systems, VoIP-based applications, etc. [13], [14].

Let n0 be the number of processes of the distributed computation at the start time t0 and let k1, k2 be two non-negative integers. The following lemma (proved in [5]) states the constraints on the join function and the leave function such that the distributed computation size falls in the interval ∆N = [n0 − k1, n0 + k2].

Lemma 1: Let k1 and k2 be two integers such that k1, k2 ≥ 0 and let n0 be the number of processes in the distributed computation at starting time t0. Given a join and leave function λ(t) and µ(t), the node function N (t) falls in the interval ∆N = [n0− k1, n0+ k2] if and only if:

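(c1) $\sum_{\tau=t_0}^{t} \mu(\tau) \le \sum_{\tau=t_0}^{t} \lambda(\tau) + k_1 \quad \forall t$,

(c2) $\sum_{\tau=t_0}^{t} \mu(\tau) \ge \sum_{\tau=t_0}^{t} \lambda(\tau) - k_2 \quad \forall t$.

Fig. 2. Distributed System Size in an interval ∆N = [n0 − k1, n0 + k2] (plot of N(t) against time t, with N(t) bounded between n0 − k1 and n0 + k2)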

An example of the evolution of the size of a distributed computation over time is shown in Figure 2.

Note that constraints (c1) and (c2) have to be satisfied independently of the computation. In fact, they just follow from the requirement of having the computation size fall in the range defined by n0, k1 and k2.
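The relation between the node function of Definition 4 and constraints (c1) and (c2) can be checked numerically on a finite trace. The sketch below is only an illustration under assumptions: the list-based encoding of λ and µ (index 0 standing for t0) and all function names are hypothetical, and a finite trace cannot represent the continuous churn assumed in the model.

```python
from itertools import accumulate
from typing import List, Sequence


def node_function(n0: int, joins: Sequence[int], leaves: Sequence[int]) -> List[int]:
    """N(t) for t = t0, t0+1, ...; joins[0] and leaves[0] stand for lambda(t0), mu(t0)."""
    sizes, size = [], n0
    for lam, mu in zip(joins, leaves):
        size += lam - mu  # N(t) = N(t-1) + lambda(t) - mu(t)
        sizes.append(size)
    return sizes


def stays_in_range(n0: int, joins: Sequence[int], leaves: Sequence[int],
                   k1: int, k2: int) -> bool:
    """True iff N(t) is in [n0 - k1, n0 + k2] at every step of the trace."""
    return all(n0 - k1 <= n <= n0 + k2 for n in node_function(n0, joins, leaves))


def satisfies_c1_c2(joins: Sequence[int], leaves: Sequence[int],
                    k1: int, k2: int) -> bool:
    """True iff the cumulative sums satisfy (c1) and (c2) at every step."""
    return all(
        cum_join - k2 <= cum_leave <= cum_join + k1
        for cum_join, cum_leave in zip(accumulate(joins), accumulate(leaves))
    )


if __name__ == "__main__":
    joins = [0, 2, 0, 1, 3]   # lambda(t0), lambda(t0+1), ...
    leaves = [0, 1, 3, 0, 1]  # mu(t0), mu(t0+1), ...
    print(node_function(10, joins, leaves))
    print(stays_in_range(10, joins, leaves, 2, 1), satisfies_c1_c2(joins, leaves, 2, 1))
    print(stays_in_range(10, joins, leaves, 1, 1), satisfies_c1_c2(joins, leaves, 1, 1))
```

On this trace the two checks agree in both cases, as the lemma states: with k1 = 2 and k2 = 1 the size stays within [8, 11], while with k1 = 1 the third step violates both the range and (c1).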

3 REGULAR REGISTER IN A DYNAMIC DISTRIBUTED SYSTEM

A register is a shared variable accessed by a set of processes through two operations, namely read() and write(). Informally, the write() operation updates the value stored in the shared variable while the read() operation obtains the value contained in the variable (i.e., the last written value). In case of concurrency while accessing the shared variable, the meaning of last written value becomes ambiguous. Depending on the semantics of the operations, three types of register have been defined by Lamport [18]: safe, regular and atomic.

3.1 Regular Register Computation

Processes participating in the distributed computation implement a regular register abstraction. As a specialization of the generic model of the computation defined in Section 2, in the following we consider the existence of a join_register() operation and of a leave_register() operation. In particular, in the case of a regular register computation, the aim of the join_register() operation is to “transfer” the current value of the register variable to the new process, so as to guarantee the persistence of the value of the register despite churn.

The protocol implementing the join_register() operation is presented in Section 4. We model the leave_register() operation as an implicit operation: when a process pi leaves the computation, it just stops sending and processing messages related to the register computation. In this way, process failures and process leaves can be treated in the same way from the register computation point of view.

Thus, in the following, we do not distinguish between voluntary leaves and failures but refer to both of them as leaves. Moreover, to simplify the notation, whenever not strictly necessary, we refer to the join_register() operation as the join() operation.

3.2 Operation executions

Every operation issued on a register is, generally, not instantaneous and it can be characterized by two events occurring at its boundary: an invocation event and a reply event. These events occur at two time instants (invocation time and reply time) according to the fictional global time.

An operation op is complete if both the invocation event and the reply event occur (i.e. the process executing the operation does not crash between the invocation and the reply).

Conversely, an operation op is said to be failed if it is invoked by a process that crashes before the reply event occurs.

Given two operations op and op′, with invocation times tB(op) and tB(op′) and reply times tE(op) and tE(op′), we say that op precedes op′ (op ≺ op′) iff tE(op) < tB(op′). If op does not precede op′ and op′ does not precede op, then op and op′ are concurrent (op || op′).
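The precedence and concurrency relations can be written down directly. This is a small sketch in which the Op record and its field names are hypothetical; only the two comparisons come from the definition above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Op:
    """Hypothetical record of one operation on the register."""
    name: str
    t_b: int  # invocation time tB(op)
    t_e: int  # reply time tE(op)


def precedes(a: Op, b: Op) -> bool:
    """a precedes b iff tE(a) < tB(b)."""
    return a.t_e < b.t_b


def concurrent(a: Op, b: Op) -> bool:
    """a and b are concurrent iff neither precedes the other."""
    return not precedes(a, b) and not precedes(b, a)


if __name__ == "__main__":
    w = Op("write(v)", 0, 5)
    r1 = Op("read #1", 3, 8)
    r2 = Op("read #2", 6, 9)
    print(concurrent(w, r1), precedes(w, r2))
```

Here the first read overlaps the write and is therefore concurrent with it, while the second read is invoked after the write has replied and is preceded by it.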

Given a write(v) operation, the value v is said to be written when the operation is complete. As a consequence, failed write() operations are incomplete operations. As in [12], we consider that if a process crashes during a write() operation, such write() is concurrent with all the successive operations.

3.3 Multi-reader/Multi-writer Specification

The notion of a regular register, as specified in [18], is not directly applicable in a dynamic distributed system like the one presented in the previous section, because it does not consider failures, process joins and leaves. To this aim, we focus on the multi-writer/multi-reader regular register abstraction as defined in [22] and in [25] and we adapt it to consider arrivals and departures of processes. Before introducing the specification, let us introduce the notion of relevant write.

Definition 5: A write() operation w is relevant for a read() operation r if:

(i) w||r or

(ii) ∄ w′ : w ≺ w′ ≺ r

We are now in the position to specify a regular register for a dynamic distributed system. A protocol implements a regular register in a dynamic distributed system if the following properties are satisfied.

Termination: If a correct process participating in the computation invokes a read or write operation and does not leave the system, it eventually returns from that operation.

Multi-Writer Regularity 1 (MWR1): A read operation op returns any of the values written by some write() that is relevant for op.
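The following sketch shows one way to test MWR1 on a recorded history. It is an illustration under stated assumptions, not the paper's checker: the Op record and function names are hypothetical, and condition (ii) of Definition 5 is read here as additionally requiring w ≺ r, which is an interpretation rather than something the definition states explicitly.

```python
from dataclasses import dataclass
from typing import Any, List, Sequence


@dataclass(frozen=True)
class Op:
    """Hypothetical record: invocation time, reply time, written value (writes only)."""
    t_b: int
    t_e: int
    value: Any = None


def precedes(a: Op, b: Op) -> bool:
    return a.t_e < b.t_b


def concurrent(a: Op, b: Op) -> bool:
    return not precedes(a, b) and not precedes(b, a)


def relevant_writes(writes: Sequence[Op], read: Op) -> List[Op]:
    """Writes relevant for `read`; (ii) is ASSUMED to also require w to precede read."""
    relevant = []
    for w in writes:
        overwritten = any(precedes(w, w2) and precedes(w2, read) for w2 in writes)
        if concurrent(w, read) or (precedes(w, read) and not overwritten):
            relevant.append(w)
    return relevant


def satisfies_mwr1(writes: Sequence[Op], read: Op, returned: Any) -> bool:
    """True iff the returned value was written by some relevant write."""
    return any(w.value == returned for w in relevant_writes(writes, read))


if __name__ == "__main__":
    writes = [Op(0, 2, "a"), Op(3, 5, "b"), Op(6, 10, "c")]
    read = Op(7, 9)
    print([w.value for w in relevant_writes(writes, read)])
    print(satisfies_mwr1(writes, read, "b"), satisfies_mwr1(writes, read, "a"))
```

In this history the write of "c" is concurrent with the read and the write of "b" is the last one that completed before it, so either value is an acceptable result, while "a" has been overwritten.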

We assume that each process pi issues either a read() or a write() operation only after it has returned from its join_register() operation [4].

4 REGULAR REGISTER IN EVENTUALLY SYNCHRONOUS DISTRIBUTED SYSTEMS

In [5] we presented an implementation of the regular register for a synchronous distributed system. Such an implementation is based on the following considerations: (i) the join_register() operation is executed once by each process and (ii) read() and write() operations are executed frequently. This led us to design a protocol having local read and fast write operations, by exploiting the synchrony of the communication. Moving to an eventually synchronous system, read() operations are no longer local: they need to gather information from a certain number of active processes in the system in order to retrieve the last written value. This is the price to pay for not relying on synchrony.

This section presents a protocol implementing a regular register in an eventually synchronous distributed system with continuous churn, where the number of processes participating in the distributed computation is always in the range [n0 − k1, n0 + k2].

To master the absence of synchrony assumptions holding at each time, the protocol implements join_register(), read() and write() operations involving all the processes belonging to the computation. The basic idea behind the join_register() and read() operations is to have two phases: (i) the process issuing the operation broadcasts an INQUIRY message, then waits until it receives “enough” replies to confirm that the operation has been processed by “enough” processes; (ii) the process helps other processes that join the computation concurrently to terminate the operation by sending them the updated value. Concerning the write() operation, the basic idea is that the writer broadcasts a WRITE message and then just waits until it receives “enough” acknowledgments for such an operation.

In the following section, we provide the details of the protocols implementing such operations.

4.1 Protocol

Each process pi maintains the following local variables.

• Two variables denoted register_i and sn_i, such that register_i is the local copy of the register, while sn_i is the sequence number of the last write operation that updated register_i.

• A boolean active_i, initialized to false. It flips to true just after pi has joined the regular register computation.

• Two set variables, denoted replies_i and reply_to_i. The first one is used both in the join_register() operation and in the read() operation, while reply_to_i is used only during the join period. The local variable replies_i contains the triples <id, value, sn> that pi has received from other processes, while reply_to_i contains the processes that are joining the regular register computation concurrently with pi.

• read_sn_i is a sequence number used by pi to timestamp its read requests. The value read_sn_i equal to zero is used by the join operation.

• reading_i is a boolean whose value is true when pi is reading.

• write_ack_i is a set used by pi (when it writes a new value) to store the identifiers of the processes that have acknowledged pi’s last write.

• dl_prev_i is a set in which, while it is joining the distributed computation, pi stores the identifiers of the processes that have acknowledged pi’s inquiry message while these processes were not yet active (so, these processes were joining the computation too) or while they were reading. When it terminates its join operation, pi has to send them a reply to prevent them from being blocked forever.

The join_register() operation. The protocol implementing this operation is described in Figure 3. After having initialized its local variables, pi broadcasts an INQUIRY(i, read_sn_i) message to inform the other processes that it wants to obtain the value of the regular register (line 04; as indicated, read_sn_i is then equal to 0). Then, after it has received a number C of replies (line 05)², pi updates its local pair (register_i, sn_i) (lines 06-07), becomes active (line 08), and sends a reply to the processes in the set reply_to_i (lines 09-11). It sends such a reply message also to the processes in its set dl_prev_i to prevent them from waiting forever (see proof of Lemma 3). In addition to the triple <i, register_i, sn_i>, a reply message sent to a process pj by a process pi also carries the read sequence number r_sn that identifies the corresponding request issued by pj.

When a process pi delivers a message INQUIRY(j, r_sn), it answers pj by sending back a REPLY(<i, register_i, sn_i>, r_sn) message containing its local variables. If pi is active and reading (line 15), it also sends a DL_PREV() message to pj (line 17); this is required so that pj sends to pi the value pj has obtained when it terminates its join operation. If pi is not yet active, it postpones its answer until it becomes active (line 19 and lines 09-11) and it sends a DL_PREV message (line 20).

When pi delivers a REPLY(<j, value, sn>, r_sn) message from a process pj, if the reply message is an answer to its INQUIRY(i, read_sn_i) message (line 23), pi adds <j, value, sn> to the set of replies it has received so far and sends back an ACK(i, r_sn) message to pj (lines 24-25).

Finally, when pi delivers a message DL_PREV(j, r_sn), it adds its content to the set dl_prev_i (line 28), in order to remember that it has to send a reply to pj when it becomes active (lines 09-10).
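The local bookkeeping performed by the join_register() handlers described above can be sketched as follows. This is a hedged illustration, not the authors' implementation: the JoinState class, the tuple-based message format and the method names are hypothetical, and broadcasting, waiting for more than C replies and the ACK exchange are deliberately left out.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Set, Tuple

# Hypothetical wire format: (kind, destination id, payload)
Message = Tuple[str, int, tuple]


@dataclass
class JoinState:
    pid: int
    register: Any = None   # stands for the initial value "bottom"
    sn: int = -1
    active: bool = False
    reading: bool = False
    reply_to: Set[Tuple[int, int]] = field(default_factory=set)
    dl_prev: Set[Tuple[int, int]] = field(default_factory=set)

    def _reply(self, j: int, r_sn: int) -> Message:
        return ("REPLY", j, ((self.pid, self.register, self.sn), r_sn))

    def on_inquiry(self, j: int, r_sn: int) -> List[Message]:
        """Lines 13-21: answer now if active, otherwise defer and send DL_PREV."""
        if self.active:
            out = [self._reply(j, r_sn)]
            if self.reading:
                out.append(("DL_PREV", j, (self.pid, r_sn)))
            return out
        self.reply_to.add((j, r_sn))
        return [("DL_PREV", j, (self.pid, r_sn))]

    def on_dl_prev(self, j: int, r_sn: int) -> None:
        """Lines 27-28: remember that p_j expects a reply once we are active."""
        self.dl_prev.add((j, r_sn))

    def complete_join(self, replies: Iterable[Tuple[int, Any, int]]) -> List[Message]:
        """Lines 06-11: adopt the freshest value, become active, flush replies."""
        _, val, sn = max(replies, key=lambda r: r[2])
        if sn > self.sn:
            self.sn, self.register = sn, val
        self.active = True
        # Sorted only to make the output order deterministic in this sketch.
        return [self._reply(j, r_sn) for j, r_sn in sorted(self.reply_to | self.dl_prev)]


if __name__ == "__main__":
    p = JoinState(pid=1)
    print(p.on_inquiry(2, 0))            # not active yet: deferred, DL_PREV sent
    p.on_dl_prev(3, 0)
    print(p.complete_join({(5, "x", 4), (6, "y", 7)}))
    print(p.on_inquiry(4, 0))            # active now: immediate REPLY
```

Returning the outgoing messages from each handler, rather than sending them, keeps the sketch independent of any particular network layer.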

operation join_register(i):
(01) register_i ← ⊥; sn_i ← −1; active_i ← false;
(02) reading_i ← false; replies_i ← ∅; reply_to_i ← ∅;
(03) write_ack_i ← ∅; dl_prev_i ← ∅; read_sn_i ← 0;
(04) broadcast INQUIRY(i, read_sn_i);
(05) wait until (|replies_i| > C);
(06) let <id, val, sn> ∈ replies_i such that (∀ <−, −, sn′> ∈ replies_i : sn ≥ sn′);
(07) if (sn > sn_i) then sn_i ← sn; register_i ← val end if;
(08) active_i ← true;
(09) for each <j, r_sn> ∈ reply_to_i ∪ dl_prev_i do
(10)    send REPLY(<i, register_i, sn_i>, r_sn) to pj
(11) end for;
(12) return(ok).
----------------------------------------
(13) when INQUIRY(j, r_sn) is delivered:
(14) if (active_i)
(15)    then send REPLY(<i, register_i, sn_i>, r_sn) to pj
(16)         if (reading_i) then
(17)            send DL_PREV(i, r_sn) to pj
(18)         end if;
(19)    else reply_to_i ← reply_to_i ∪ {<j, r_sn>};
(20)         send DL_PREV(i, r_sn) to pj
(21) end if.

(22) when REPLY(<j, value, sn>, r_sn) is delivered:
(23) if (r_sn = read_sn_i) then
(24)    replies_i ← replies_i ∪ {<j, value, sn>};
(25)    send ACK(i, r_sn) to pj
(26) end if.

(27) when DL_PREV(j, r_sn) is delivered:
(28) dl_prev_i ← dl_prev_i ∪ {<j, r_sn>}.

Fig. 3. The join_register() protocol (code for pi)

2. In the correctness proofs section we will compute the value of C that allows any operation to terminate and be valid.

3. As indicated before, the “read” identified by (i, 0) is the join_register() operation issued by pi.

The read() operation. A read is a simplified version of the join operation³. Hence, the code of the read() operation, described in Figure 4, is a simplified version of the code of the join_register() operation.

Each read invocation is identified by a pair made up of the process index i and a sequence number read_sn_i (line 03). pi first broadcasts a read request READ(i, read_sn_i). Then, after it has received C replies, pi selects the one with the greatest sequence number, updates (if needed) its local pair (register_i, sn_i), and returns the value of register_i.

When pi delivers a message READ(j, r_sn) while being active, it sends back a reply to pj (lines 09-10). If it is joining the system, pi stores pj’s identifier to remember that it has to send back a reply to pj when it terminates the join operation (line 11).

operation read(i):
(01) read_sn_i ← read_sn_i + 1;
(02) replies_i ← ∅; reading_i ← true;
(03) broadcast READ(i, read_sn_i);
(04) wait until (|replies_i| > C);
(05) let <id, val, sn> ∈ replies_i such that (∀ <−, −, sn′> ∈ replies_i : sn ≥ sn′);
(06) if (sn > sn_i) then sn_i ← sn; register_i ← val end if;
(07) reading_i ← false; return(register_i).
----------------------------------------
(08) when READ(j, r_sn) is delivered:
(09) if (active_i)
(10)    then send REPLY(<i, register_i, sn_i>, r_sn) to pj
(11)    else reply_to_i ← reply_to_i ∪ {<j, r_sn>}
(12) end if.

Fig. 4. The read() protocol (code for pi)
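The completion step of a read (lines 04-07 of Figure 4) can be illustrated with a small pure function. The function name, the use of None to mean "still waiting" and the triple layout are assumptions of this sketch; the threshold corresponds to the constant C, whose value the paper computes in its correctness proofs.

```python
from typing import Any, Iterable, Optional, Tuple

Reply = Tuple[int, Any, int]  # (sender id, value, sequence number)


def complete_read(register: Any, sn: int, replies: Iterable[Reply],
                  threshold: int) -> Optional[Tuple[Any, int]]:
    """Return the (value, sn) pair the reader ends up with, or None if it must keep waiting.

    The reader proceeds only once it holds more than `threshold` replies
    (line 04), picks the reply with the greatest sequence number (line 05)
    and adopts it only if it is fresher than its local pair (line 06).
    """
    collected = list(replies)
    if len(collected) <= threshold:
        return None
    _, val, best_sn = max(collected, key=lambda r: r[2])
    if best_sn > sn:
        register, sn = val, best_sn
    return register, sn


if __name__ == "__main__":
    replies = [(1, "a", 3), (2, "b", 5)]
    print(complete_read("old", 3, replies, threshold=2))  # not enough replies yet
    print(complete_read("old", 3, replies, threshold=1))
    print(complete_read("new", 9, replies, threshold=1))  # local copy already fresher
```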

The write() operation. The code of the write operation is described in Figure 5. Let us recall that it is assumed that a single process at a time issues a write.

When a process pi wants to write, it first issues a read operation in order to obtain the sequence number associated with the last value written (line 01)⁴. Then, after it has broadcast the WRITE(i, <v, sn_i>) message to disseminate the new value and its sequence number to the other processes (line 04), pi waits until it has received C acknowledgments. When this happens, it terminates the write operation by returning the control value ok (line 05).

When a message WRITE(j, <val, sn>) is delivered, pi takes into account the pair (val, sn) if it is more up-to-date than its current pair (line 08). In all cases, it sends back to the sender pj a message ACK(i, sn) to allow pj to terminate its write operation (line 09).

When an ACK(j, sn) message is delivered, pi adds it to its set write_ack_i if this message is an answer to its last write (line 11).

operation write(v):
(01) read(i);
(02) sn_i ← sn_i + 1; register_i ← v;
(03) write_ack_i ← ∅;
(04) broadcast WRITE(i, <v, sn_i>);
(05) wait until (|write_ack_i| > C);
(06) return(ok).
----------------------------------------
(07) when WRITE(j, <val, sn>) is delivered:
(08) if (sn > sn_i) then register_i ← val; sn_i ← sn end if;
(09) send ACK(i, sn) to pj.

(10) when ACK(j, sn) is delivered:
(11) if (sn = sn_i) then write_ack_i ← write_ack_i ∪ {j} end if.

Fig. 5. The write() protocol (code for pi)
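The write-side bookkeeping of Figure 5 can be sketched in the same style. Again this is only an illustration under assumptions: the WriterState class, its method names and the tuple message format are hypothetical, the preliminary read of line 01 is assumed to have already refreshed the local pair, and the network is not modelled.

```python
from dataclasses import dataclass, field
from typing import Any, Set, Tuple

# Hypothetical wire format: (kind, sender or destination id, payload)
Message = Tuple[str, int, tuple]


@dataclass
class WriterState:
    pid: int
    register: Any = None
    sn: int = -1
    write_ack: Set[int] = field(default_factory=set)

    def start_write(self, value: Any) -> Message:
        """Lines 02-04: bump the sequence number and build the WRITE to broadcast."""
        self.sn += 1
        self.register = value
        self.write_ack = set()
        return ("WRITE", self.pid, (value, self.sn))

    def on_write(self, j: int, value: Any, sn: int) -> Message:
        """Lines 07-09: adopt a fresher pair and always acknowledge the writer p_j."""
        if sn > self.sn:
            self.register, self.sn = value, sn
        return ("ACK", j, (self.pid, sn))

    def on_ack(self, j: int, sn: int) -> None:
        """Lines 10-11: count the ACK only if it answers the last write."""
        if sn == self.sn:
            self.write_ack.add(j)

    def write_done(self, threshold: int) -> bool:
        """Line 05: the write returns once more than `threshold` ACKs arrived."""
        return len(self.write_ack) > threshold


if __name__ == "__main__":
    writer = WriterState(pid=1, register="a", sn=4)
    replica = WriterState(pid=2, register="a", sn=4)
    print(writer.start_write("b"))
    print(replica.on_write(1, "b", 5))
    writer.on_ack(2, 5)
    writer.on_ack(3, 4)  # stale acknowledgment, ignored
    print(writer.write_ack, writer.write_done(0), writer.write_done(1))
```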

Due to lack of space, we omit the correctness proofs, which can be found in the supplementary material.

5 RELATED WORK

Several works have been done recently with the aim to address the implementation of concurrent data structures on wired message passing dynamic systems (e.g., [1], [4], [8], [11], [21]).

In [21], a Reconfigurable Atomic Memory for Basic Object (RAMBO) is presented. RAMBO works in a distributed system where processes can join and fail during the exe- cution of the algorithm. To guarantee reliability of data, in spite of network changes, RAMBO replicates data at several network location and defines configuration to manage small and transient changes. Each configuration is composed by a set of members, a set of read-quorum and a set of write- quorums. In order to manage large changes to the set of participant process, RAMBO defines a reconfiguration pro- cedure whose aim is to move from an existing configuration to a new one where the set of members, read-quorum or write-quorum are modified. In order to ensure atomicity, the reconfiguration procedure is implemented by a distributed consensus algorithm that makes all the processes agree on the same successive configurations. Therefore, RAMBO cannot be implemented in a fully asynchronous system. It

4. In absence of concurrent write operations, this read obtains the great- est sequence number. The same strategy is used in protocols implementing atomic registers (e.g., [3], [10]).

(7)

is important to note that in RAMBO the notion of churn is abstracted by defining a sequence of configurations.

Note that RAMBO poses some constraints on the removal of old configurations. In particular, a configuration S cannot be removed until every operation executed by processes belonging to S has ended; as a consequence, many old configurations may take a long time to be removed.

[11] and [8] present some improvements to the original RAMBO protocol, in particular to its reconfiguration mechanism. In [11], the reconfiguration protocol is changed by parallelizing new configuration installations and the removal of an arbitrary number of old configurations.

In [8], the authors present a mechanism that combines the features of RAMBO and the underlying consensus algorithm to speed up the reconfiguration and reduce the time during which old configurations are accessible.

In [1], Aguilera et al. show that an atomic register can be realized without consensus and, thus, on a fully asynchronous distributed system, provided that the number of reconfiguration operations is finite and thus the churn is quiescent (i.e., there exists a finite time after which there are no more joins or failures). Configurations are managed by taking into account all the changes (i.e., joins and failures of processes) suggested by the participants, and the quorums are represented by any majority of processes. To ensure liveness of read and write operations, the authors assume that the number of reconfigurations is finite and that there is a majority of correct processes in each reconfiguration.

In [4], we presented an implementation of a regular register in an eventually synchronous distributed system prone to continuous churn. In contrast to this paper, [4] assumes that the size of the distributed system is constant (i.e., at any instant of time the same number of processes join and leave the distributed system). In particular, we have shown that if the distributed system size $n$ does not change, a regular register can be implemented if at any time at least $\lceil n/2 \rceil$ active processes participate in the regular register implementation, and no constraint is placed on the value of $n$. The same paper shows that no regular register can be implemented in a fully asynchronous system in the presence of continuous churn. Let us finally remark that the result presented in [4] can be seen as a particular case of the result presented in the previous section when considering $k_1 = k_2 = 0$, with $n_0 c$ processes (where $c$ is a percentage of nodes) invoking the join operation and $n_0 c$ processes leaving the system at every time unit (i.e., $\lambda(t) = \mu(t) = n_0 c$).
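As a quick check of this reduction, one can substitute $k_1 = k_2 = 0$ into the three conditions recalled in the conclusion of this paper. The symbols $N(t)$ for the system size and $|A(t)|$ for the number of active processes at time $t$ are notations assumed here for illustration:

\[
\begin{aligned}
n_0 - k_1 \le N(t) \le n_0 + k_2 \;&\Longrightarrow\; N(t) = n_0,\\
|A(t)| > \left\lceil \tfrac{n_0 + k_2}{2} \right\rceil \;&\Longrightarrow\; |A(t)| > \left\lceil \tfrac{n_0}{2} \right\rceil,\\
n_0 > 2k_1 + k_2 \;&\Longrightarrow\; n_0 > 0.
\end{aligned}
\]

Under this substitution the size is constant, the threshold on active processes becomes a majority-type bound on $n_0$, and the last condition places no real constraint on the system size, which is in line with the constant-size setting of [4].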

Figure 6 summarizes the system model assumptions and the constraints on processes employed by the different algorithms. Note that churn-quiescent implementations (e.g., [1], [8], [11], [21]) do not explicitly use the notion of active processes; they instead use the notion of correct processes. It is however possible to consider an active process as being a correct process. The converse is not true because a correct process does not pass through a join operation. This is a consequence of the fact that churn-quiescent implementations do not separate the distributed system from the distributed computation.











































































Fig. 6. Register in Dynamic Systems prone to Churn (figure content not recovered; surviving citation label: [9], [12], [23])

6 CONCLUSION

In modern distributed systems the notion of processes continuously departing and joining the system (churn) is actually part of the system model and creates additional unpredictability to be mastered by distributed applications.

As an example, churn creates conditions for consistency violations in large-scale storage systems, and the probability of consistency violations usually increases as the churn increases. This is why such storage systems do not provide any deterministic consistency guarantees (e.g., regular or atomic registers). Hence, there is a need to capture the churn of a dynamic system through tractable realistic models in order to pave the way to distributed applications whose correctness can be formally proved.

This paper, based on a generic churn model defined in [5], has presented the implementation of a multi-writer/multi-reader regular register in such a model. It has been formally proved that a regular register can be implemented in an eventually synchronous distributed system if, at any time, the number of active processes is greater than $\lceil (n_0 + k_2)/2 \rceil$, the number of processes in the distributed system remains between $n_0 - k_1$ and $n_0 + k_2$ (where $n_0$ is the number of processes in the system at time $t_0$), and $n_0$ is greater than $2k_1 + k_2$. Interestingly, this implementation has shown in a precise way that, when one wants to implement a shared register in a dynamic system, there is a tradeoff relating the acceptable degree of churn and the synchrony of the underlying system (namely, the churn has to decrease when one wants to go from a synchronous system to an eventually synchronous system).
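The three conditions stated above can be checked mechanically against a recorded run. The following is a minimal sketch and not part of the paper: the function name `churn_constraints_hold` and the representation of a run as two lists of per-instant samples (system sizes and active-process counts) are assumptions made for illustration.

```python
from typing import Iterable


def churn_constraints_hold(n0: int, k1: int, k2: int,
                           system_sizes: Iterable[int],
                           active_counts: Iterable[int]) -> bool:
    """Check the feasibility conditions on a sampled run.

    system_sizes[t]  : number of processes in the system at sample t
    active_counts[t] : number of active processes at sample t
    (both sampling schemes are assumptions of this sketch)
    """
    # Condition on the initial size: n0 > 2*k1 + k2.
    if not n0 > 2 * k1 + k2:
        return False
    # Integer form of ceil((n0 + k2) / 2), avoiding floating point.
    threshold = (n0 + k2 + 1) // 2
    # The system size must stay within [n0 - k1, n0 + k2] at every sample.
    sizes_ok = all(n0 - k1 <= n <= n0 + k2 for n in system_sizes)
    # The number of active processes must stay strictly above the threshold.
    active_ok = all(a > threshold for a in active_counts)
    return sizes_ok and active_ok
```

With `n0 = 10` and `k1 = k2 = 2`, the threshold is 6, so a run whose active counts are always at least 7 and whose sizes stay in [8, 12] satisfies the check, while a single sample with exactly 6 active processes violates it.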

ACKNOWLEDGEMENTS

The authors want to thank the anonymous reviewers for their comments, which greatly improved the content and presentation of the paper. The work has been partially supported by the STREP EU project SM4ALL and the IP EU project SOFIA.


REFERENCES

[1] Aguilera M. K., Keidar I., Malkhi D. and Shraer A., Dynamic Atomic Storage without Consensus. Proc. 28th Annual ACM Symposium on Principles of Distributed Computing (PODC), pp. 17-25, 2009.

[2] Anderson E., Li X., Shah M. A., Tucek J. and Wylie J. J., What Consistency Does Your Key-Value Store Actually Provide? To appear in Proc. 6th Workshop on Hot Topics in System Dependability (HotDep), 2010.

[3] Attiya H., Bar-Noy A. and Dolev D., Sharing Memory Robustly in Message-Passing Systems. JACM, 42(1):129-142, 1995.

[4] Baldoni R., Bonomi S., Kermarrec A.M. and Raynal M., Implementing a Register in a Dynamic Distributed System. Proc. 29th IEEE Int’l Conference on Distributed Computing Systems (ICDCS’09), IEEE Computer Society Press, Montreal (Canada), June 2009.

[5] Baldoni R., Bonomi S. and Raynal M., Regular Register: an Implementation in a Churn Prone Environment. 16th International Colloquium on Structural Information and Communication Complexity (SIROCCO), Springer-Verlag LNCS #5869, pp. 15-29, 2009.

[6] Baldoni R. and Shvartsman A. A., Theoretical Aspects of Dynamic Distributed Systems: Report on the Workshop. SIGACT News, 40(4):87-89, 2009.

[7] Chandra T. and Toueg S., Unreliable Failure Detectors for Reliable Distributed Systems. JACM, 43(2):225-267, 1996.

[8] Chockler G., Gilbert S., Gramoli V., Musial P. M. and Shvartsman A., Reconfigurable Distributed Storage for Dynamic Networks. Journal of Parallel and Distributed Computing, 69(1):100-116, 2009.

[9] Dwork C., Lynch N. and Stockmeyer L., Consensus in the Presence of Partial Synchrony. JACM, 35(2):288-323, 1988.

[10] Friedman R., Raynal M. and Travers C., Abstractions for Implementing Atomic Objects in Distributed Systems. 9th Int’l Conference on Principles of Distributed Systems (OPODIS’05), LNCS #3974, pp. 73-87, 2005.

[11] Gilbert S., Lynch N. and Shvartsman A., RAMBO II: Rapidly Reconfigurable Atomic Memory for Dynamic Networks. Proc. International Conference on Dependable Systems and Networks (DSN), 2003.

[12] Guerraoui R., Levy R. R., Pochon B. and Pugh J., The Collective Memory of Amnesic Processes. ACM Transactions on Algorithms, 4(1), 2008.

[13] Godfrey B., Shenker S. and Stoica I., Minimizing Churn in Distributed Systems. Proc. 2006 Conference on Applications, Technologies, Architectures, and Protocols for Computer Communications (SIGCOMM), pp. 147-158, 2006.

[14] Guha S., Daswani N. and Jain R., An Experimental Study of the Skype Peer-to-Peer VoIP System. Proc. 5th International Workshop on Peer-to-Peer Systems (IPTPS), 2006.

[15] Ko S., Hoque I. and Gupta I., Using Tractable and Realistic Churn Models to Analyze Quiescence Behavior of Distributed Protocols. Proc. 27th IEEE Int’l Symposium on Reliable Distributed Systems (SRDS’08), 2008.

[16] Kuhn F., Schmid S. and Wattenhofer R., A Self-repairing Peer-to-Peer System Resilient to Dynamic Adversarial Churn. Proc. 4th International Workshop on Peer-to-Peer Systems (IPTPS), pp. 13-23, 2005.

[17] Kuhn F., Schmid S., Smit J. and Wattenhofer R., A Blueprint for Constructing Peer-to-Peer Systems Robust to Dynamic Worst-Case Joins and Leaves. Proc. 14th IEEE International Workshop on Quality of Service (IWQoS), 2006.

[18] Lamport L., On Interprocess Communication, Part 1: Models, Part 2: Algorithms. Distributed Computing, 1(2):77-101, 1986.

[19] Liben-Nowell D., Balakrishnan H. and Karger D. R., Analysis of the Evolution of Peer-to-peer Systems. 21st ACM Symp. PODC, ACM Press, pp. 233-242, 2002.

[20] Liben-Nowell D., Karger D. R., Kaashoek M. F., Dabek F., Balakrishnan H., Stoica I. and Morris R., Chord: A Scalable Peer-to-peer Lookup Protocol for Internet Applications. IEEE/ACM Transactions on Networking, 11(1):17-32, 2003.

[21] Lynch N. and Shvartsman A., RAMBO: A Reconfigurable Atomic Memory Service for Dynamic Networks. Proc. 16th Int’l Symposium on Distributed Computing (DISC’02), Springer-Verlag LNCS #2508, pp. 173-190, 2002.

[22] Malkhi D. and Reiter M. K., Byzantine Quorum Systems. Distributed Computing, 11(4):203-213, 1998.

[23] Mostefaoui A., Raynal M., Travers C., Peterson S., El Abbadi and Agrawal D., From Static Distributed Systems to Dynamic Systems. 24th IEEE Symposium on Reliable Distributed Systems (SRDS’05), IEEE Computer Society Press, pp. 109-119, 2005.

[24] Merritt M. and Taubenfeld G., Computing with Infinitely Many Processes. Proc. 14th Int’l Symposium on Distributed Computing (DISC’00), LNCS #1914, pp. 164-178, 2000.

[25] Shao C., Pierce E. and Welch J., Multi-writer Consistency Conditions for Shared Memory Objects. Proc. 17th Int’l Symposium on Distributed Computing (DISC’03), Springer-Verlag LNCS #2848, pp. 106-120, 2003.

[26] Sousa P., Bessani A. N., Correia M., Ferreira Neves N. and Veríssimo P., Highly Available Intrusion-Tolerant Services with Proactive-Reactive Recovery. IEEE Transactions on Parallel and Distributed Systems, 21(4):452-465, 2010.

[27] Voulgaris S., Gavidia D. and van Steen M., CYCLON: Inexpensive Membership Management for Unstructured P2P Overlays. Journal of Network and Systems Management, 13(2), 2005.

Roberto Baldoni is Professor at the University of Rome “La Sapienza”, where he leads the Distributed Systems group and the MIDLAB Laboratory. His research interests include distributed computing, dependable and secure distributed systems, distributed information systems and distributed event-based processing. Roberto’s research at the University of Rome has been funded over the years by the European Commission, the Ministry of Italian Research, IBM, Microsoft, Finmeccanica and Telecom Italia. In 2010, he received the Science2business Award and the IBM Faculty Award. Roberto is the author of around 150 research papers, ranging from the theory to the practice of distributed systems. Roberto belongs to the Steering Committee of ACM DEBS, which he chaired in 2008, and he is a member of ACM, IEEE and the IFIP WG 10.4.

Silvia Bonomi is a PhD in Computer Science at the University of Rome “La Sapienza”. She is doing research in various computer science fields, including dynamic distributed systems and event-based systems. In these research fields, she has published several papers in peer-reviewed scientific forums. As a part of the MIDLAB research group, she is currently involved in an EU-funded project dealing with energy saving in private and public buildings (GreenerBuildings project), and she has worked on dependable distributed systems (ReSIST network of excellence) and on the definition of new semantic tools for e-Government (SemanticGov).

Michel Raynal is a professor of computer science at the University of Rennes, France. His main research interests are the basic principles of distributed computing systems. He is a world-leading researcher in the domain of distributed computing. He is the author of numerous papers on distributed computing (more than 120 in journals and 250 papers in int’l conferences) and is well-known for his distributed algorithms and his books on distributed computing. He has chaired the program committee of the major conferences on the topic (e.g., ICDCS, DISC, SIROCCO, and OPODIS). He has also served on the program committees of many international conferences, and is the recipient of several “Best Paper” awards (ICDCS 1999, 2000 and 2001, SSS 2009, Europar 2010). He has been invited by many universities all over the world to give lectures on distributed computing. His h-index is 45. He has recently written two books published by Morgan & Claypool: “Communication and Agreement Abstractions for Fault-Tolerant Asynchronous Distributed Systems” (June 2010) and “Fault-Tolerant Agreement in Synchronous Distributed Systems” (September 2010). Since 2010, Michel Raynal has been a senior member of the prestigious “Institut Universitaire de France”.
