• Non ci sono risultati.

Implementing Distributed Computing Abstractions in the presence of Churn: the Case of the Regular Register

N/A
N/A
Protected

Academic year: 2021

Condividi "Implementing Distributed Computing Abstractions in the presence of Churn: the Case of the Regular Register"

Copied!
23
0
0

Testo completo

(1)

Implementing Distributed Computing Abstractions in the presence

of Churn: the Case of the Regular Register

Roberto BALDONI Silvia BONOMI Michel RAYNAL

Universit´a La Sapienza, via Ariosto 25, I-00185 Roma, Italy {baldoni, bonomi}@dis.uniroma1.it

IRISA, Universit´e de Rennes, Campus de Beaulieu, F-35042 Rennes, France raynal@irisa.fr

Abstract

Due to their capability to hide the complexity generated by the messages exchanged between pro- cesses, shared objects are one of the main abstractions provided to the developers of distributed applica- tions. Implementations of such objects, in modern distributed systems, have to take into account the fact that almost all services implemented on the top of distributed infrastructures are no longer fully man- aged due to either their size or their maintainance cost. Therefore these infrastructures exhibit several autonomic behaviors in order to, for example, tolerate failures and continuous arrival and departure of nodes (churn phenomenon). Among all the shared objects, the register object is fundamental. Several protocols have been proposed to build fault resilient registers on top of message-passing system, but, unfortunately, failure are not the only challenge in modern distributed systems. New issues arise from the dynamicity introduced in the system by the continuous arrival and departure of nodes (churn phe- nomenon). This paper addresses the construction of a single writer/multiple readers regular register in a distributed system affected by the continuous arrival/departure of participants. In particular, a general protocol implementing a regular register is proposed and feasibility conditions on the arrival and depar- ture of the processes are given. Interestingly, the protocol is proved correct under the assumption that the constraint on the churn is satisfied.

1 Introduction

Context. Dealing with failure has been one of the main challenge in the construction of real reliable applica- tions able to work in a distributed system. These applications are inherently managed, in the sense that, they need the existence of a superior manager (i.e., the application/service provider) that has the control on the processes running the application. The provider does its best to guarantee along the time the verification of the assumptions of the underlying distributed system (e.g. a majority of correct processes) by, for example, adding new processes when some crashes and by keeping the required degree of synchrony on the underly- ing distributed platform in terms of processes and communication links. Air traffic control, telco systems, banking systems, e-government are examples of such application areas. Under this context, robust abstrac- tions have been defined (shared memory, communication, agreement, etc.) that behave correctly despite asynchrony and failures and that simplify application design and development. When considering protocols implementing such abstractions, in nearly all the cases, the system is always “well defined” in the sense that the whole set of participating processes is finite and known (directly or transitively) by each process. The

(2)

system composition is modified only when either a process crashes or new process is added to the system.

Therefore if a process does not crash, this process lives for the entire time being of the computation.

Motivation. A new challenge is emerging due to the advent of new classes of applications and technologies such as smart environment, sensors networks, mobile systems, peer-to-peer systems, cloud computing etc.

In these settings the environment cannot be fully managed but it needs some degree of self-management being the type and the quantity of this self management specific of each setting. However, we can delineate some common consequences of the presence of such self management: first, there is no entity that can fully ensure the validity of the underlying system assumptions along the time and, second, no one knows accurately who joins and who leaves the system at any time bringing to an unpredictability of the system composition (this phenomenon of arrival departure of processes in a system is also known as churn).

As consequence of such unpredictability, distributed computing abstractions have to deal not only with asynchrony and failures, but also with this dynamic dimension where a process that does not crash can leave the system at any time implying that membership can fully change several times along the time being of the same computation. Moreover, this dynamic behavior means each process does not have a either a precise membership nor the knowledge on the number of processes forming the system at any given time.

Thus, it becomes of primary importance to check under which churn assumption a protocol implementing a distributed computing abstraction is correct. Hence, the abstractions and protocols implementing them have to be reconsidered to take into account this new “adversary” setting.

This self-defined and continuously evolving distributed system, that we will name in the following dy- namic distributed system, makes abstractions more difficult to understand and master than in distributed systems where the set of processes is fixed and known by all participants. The churn notion becomes thus a system parameter the aim of which is to make tractable systems whose composition evolves with time (e.g., [14, 19, 21]). Although already some protocols have been designed for dynamic distributed message-passing systems, few papers (such as [1, 4, 25]) strive to present models suited to such systems, and extremely few protocols have been proved correct. While up to now the most common approach used to address dynamic systems is mainly experimental

Contribution and roadmap. In this paper, we first introduce a very generic model of churn, based on a joining and a departure distributions, in which the number of processes in the system remains, at any time, in a given range [n0− k1, n0 + k2] where n0 is the initial number of processes at time t0 and k1 and k2 represent two positive integers. Once the range is set, this turns out in constraints on joining and departure distributions. Interestingly, this churn model is able to capture the infinite arrival model presented in [26]

and the constant churn model presented in [14, 5] as specific cases. Secondly, we take the implementation of a single writer/multiple reader regular register and we prove that:

1. a regular register cannot be build in an asynchronous system in the presence of churn;

2. a regular register can be implemented in a synchronous distributed system if at any time there is at least one active process (i.e., a process that terminated a join operation successfully) and the churn is below a certain threshold. This threshold depends on the underlying communication systems and on the implementation itself. The presence of one active process is necessary to propagate the value of the register along the time;

3. a regular register can be implemented in an eventually synchronous distributed system if at any time there is a number of active processes greater than d(n0+ k2)/2e. Interestingly these results point out the price payed for loosing synchrony. The paper indeed shows that the presence at any time in the system of at least d(n0+ k2)/2e active processes is necessary to terminate correctly any operation.

(3)

The paper is structured as follows: Section 2 introduces the notion of register and the dynamic distributed system model while Section 3 defines the churn model. In section 4 an implementation of regular register in a synchronous system and its correctness proof is given, Section 5 show the impossibility to build a regular register in a fully asynchronous systems and Section 6 show a protocol for a regular register and its correctness proof in an eventually synchronous system. Finally, a related work and a concluding remark sections conclude the paper.

2 Regular Register in Dynamic Distributed Systems

Registers. A register is a shared object by a set of processes. Such an object provides processes with two operations, namely read() and write(), that allow them to read the value contained in the object or to modify such a value.

According to the value domain of the register, the set of processes that are allowed to read it, the ones that are allowed to write it, and the specification of which value is the value returned by a read operation, a family of types of registers can be defined. As far as the last point (the value returned by a read operation) is concerned, Lamport has defined three types of register [17]:

• a safe register can be written by one writer only. Moreover, a read operation on such a register returns its current value if no write operation is concurrent with that read. In case of concurrency, the read can return any value of the value domain of the register (which means that a read concurrent with a write can return a value that has never been written). This type of register is very weak.

• A regular register can have any number of writers and any number of readers [24]. The writes appear as if they were executed sequentially, complying with their real time order (i.e., if two writes w1and w2are concurrent they can appear in any order, but if w1terminates before w2starts, w1has to appear as being executed before w2). As far as a read operation is concerned we have the following. If no write operation is concurrent with a read operation, that read operation returns the current value kept in the register. Otherwise, the read operation returns any value written by a concurrent write operation or the last value of the register before these concurrent writes. A regular register is stronger than a safe register, as the value returned in presence of concurrent write operations is no longer arbitrary.

Nevertheless, a regular register can exhibit what is called a new/old inversion. In Figure 1 are depicted two write operations w1and w2and two read operations r1and r2that are concurrent (r1is concurrent with w1and w2, while r2is concurrent with w2only).

w1

r1

w2

r2

Figure 1: New/old inversion

According to the definition of register regularity, it is possible that r1 returns the value written by w2

while r2returns the value written by w1.

• An atomic register is a regular register without new/old inversion. This means that an atomic register is such that all its read and write operations appear as if they have been executed sequentially, this

(4)

sequential total order respecting the real time order of the operations. (Linearizability [13] is nothing else than atomicity when we consider objects defined by a sequential specification).

In this paper, we focus our attention on a one-writer/multi-reader regular register1.

Dynamic system model. The distributed system is composed, at each time, by a bounded number of processes (also called nodes) that communicate by exchanging messages. Processes are uniquely identified (with their indexes) and they may join and leave the system at any point in time.

Processes belonging to the system may decide autonomously to participate or not to a distributed com- putation running on the distributed system. Hence, a distributed computation is formed, at each instant of time, by a subset of processes of the distributed system. A process pi, belonging to the system, that wants to participate to the distributed computation has to execute the join() operation. Such operation, invoked at some time t, is not instantaneous: it consumes time. But, from time t, the process pi can receive and pro- cess messages sent by any other process that belongs to the system and that participate to the computation.

Processes participating to the distributed computation implements a regular register abstraction.

When a process pj, belonging to the distributed computation, does no more want to participate, it leaves the computation. The leave operation is performed in an implicit way and pj leaves the computation forever and does not longer send messages. From a practical point of view, if a process that has left the computation wants to re-enter, it has to do it as a new process (i.e., with a new name).

We assume that a process does not crash during the distributed computation (i.e. it does not crash from the time it joins the system until it leaves).

In order to formalize the set of processes that participate actively to the computation we give the follow- ing definition.

Definition 1 A process is active from the time it returns from the join() operation until the time it leaves the system. A(t) denotes the set of processes that are active at time t, while A([t1, t2]) denotes the set of processes that are active during the interval[t1, t2].

Regular register specification for a dynamic system. The notion of a regular register, as specified by Lamport in [17], is not directly applicable in a dynamic distributed system because it does not consider the possibility that processes join or leave the computation and then it has to be adapted to dynamic systems.

We consider that a protocol implements a regular register in a dynamic system if the following properties are satisfied.

• Liveness: If a process invokes a read or a write operation and does not leave the system, it eventually returns from that operation.

• Safety: A read operation returns the last value written before the read invocation, or a value written by a write operation concurrent with it.

Moreover, it is assumed that a process invokes the read or write operation only after it has returned from its join() invocation [5].

1Actually, the protocols proposed in Section 4 and Section 6 work for any number of writers as long as the writes are not concurrent. Considering a single writer makes the exposition easier.

(5)

3 Churn Model

3.1 Definitions

The dynamicity due to the continuous join and leave of nodes in the system is a phenomenon identified under the name churn. This section introduces a general model able to characterize such dynamicity. The model proposed here is based mainly on the definition of two distributions (i) the join distribution λ(t) that defines the join of new processes to the system with respect to time and (ii) the leave distribution µ(t) that defines the leave of processes from the system with respect to time. Such distributions are discrete function of time:

Definition 2 (Join distribution) The join distribution λ(t) is a discrete time function that returns the number of processes that invoke the join operation at timet.

Definition 3 (Leave distribution) The leave distribution µ(t) is a discrete time function that returns the number of processes that have left the system at timet.

Let t0 be the starting time of the system. We assume that at time t0 no process joins or leaves the system then λ(t0) = 0 and µ(t0) = 0 therefore we can say that in t0 the system is composed by a set Π0 of processes and the size of the system is n0 (i.e. |Π0| = n0). Moreover, for any time t < t0 we say that λ(t) = µ(t) = 0.

Note that a static system is a system characterized by a join distribution and a leave distribution that are always equal to 0 for any time t (i.e. λ(t) = µ(t) = 0 ∀t).

As soon as the dynamicity introduced by the joins and the leaves appears in the system, it is possible to observe that the size of network and its composition can change.

Let N (t) be a discrete time function that returns the number of processes inside the system at time t.

Depending on the values of λ(t) and µ(t) we have:

Definition 4 (Node distribution) Let n0be the number of processes in the system at start timet0. N (t) is the number of processes within the system at timet for every t ≥ t0(i.e. N (t) = N (t − 1) + λ(t) − µ(t), withN (t0) = n0).

Composition of the System. The variation of the network size is not the only effect generated by the dynamicity introduced by the join and leave of nodes. The composition of network is also affected. As an example consider a network of size n, and a join distribution and a leave distribution that are constant and equal (i.e λ(ti) = µ(ti) = c, ∀i with c ≤ n ∈ N). In such a system, the dynamicity does not affect the network size, it affect only its composition. In order to capture this dynamicity effect, we define the leave percentageof the network in a given interval.

Definition 5 (LeavePercentage) Let n0 be the number of processes in the system at start timet0, letλ(t) the join distribution andµ(t) the leave distribution of the system. Let ∆t = [t, t + ∆] be a time interval.

L(∆t) is the percentage of processes that have been refreshed (i.e., that have left the system) during the interval∆t, i.e. L(∆t) =

Pt+∆

τ =tµ(τ )

N (t−1) (withN (t0) = n0).

This metric allows to state a simple lemma on the composition of the system

Lemma 1 Let ∆t = [t, t + ∆] be a time interval. If there exists a process p that is in the system at time t and does not leave for all the interval∆t then L(∆t) < 1.

(6)

3.2 Churn constraint on the upper and lower bounds on the network size

Based on the previous definitions, this section derives the constraint that a join distribution and a leave distribution have to satisfy in order the network size remains in a given interval. Let n0 be the number of processes inside the system at the start time t0 and k1, k2 be two positive integers. The aim is to model a network affected by the dynamicity whose effect is the variation of the system size in an interval ∆N that is defined by the upper bound n0+ k2and the lower bound n0− k1(i.e. ∆N = [n0− k1, n0+ k2]).

Lemma 2 Let k1andk2be two integers such thatk1, k2 ≥ 0 and let n0 be the number of processes in the system at the starting timet0. Given the join and the leave distributionλ(t) and µ(t), the node distribution N (t) is always comprised inside the interval ∆N = [n0− k1, n0+ k2] if and only if:

(c1) Pt

τ =t0µ(τ ) ≤Pt

τ =t0λ(τ ) + k1∀t, (c2) Pt

τ =t0µ(τ ) ≥Pt

τ =t0λ(τ ) − k2∀t.

Proof Due to Definition 4, we have that for each time t, N (t) = N (t − 1) + λ(t) − µ(t) and iterating, we can express the node distribution as function of n0as N (t) = n0+Pt

τ =t0λ(τ )−Pt

τ =t0µ(τ ). Constraining N (t) to be in the interval ∆N we obtain, for every t, for the lower bound

N (t) ≥ n0− k1 n0+Pt

τ =t0λ(τ ) −Pt

τ =t0µ(τ ) ≥ n0− k1 Pt

τ =t0µ(τ ) ≤ Pt

τ =t0λ(τ ) + k1, and we obtain, for every t, for the upper bound

N (t) ≤ n0+ k2 n0+Pt

τ =t0λ(τ ) −Pt

τ =t0µ(τ ) ≤ n0+ k2 Pt

τ =t0µ(τ ) ≥ Pt

τ =t0λ(τ ) − k2.

2Lemma 2

4 Regular Register in a Synchronous Systems

In this section we show an algorithm implementing a regular register in a synchronous dynamic distributed system where the join distribution and the leave distribution satisfy the two constraints of Lemma 2 (i.e. the node distribution is always upper-bounded by a constant n0+ k2and lower-bounded by a constant n0− k1).

4.1 Synchronous System Model

The system is synchronous in the following sense. The processing times of local computations (but the wait statement) are negligible with respect to communication delays, so they are assumed to be equal to 0.

Contrarily, messages take time to travel to their destination processes. Each process can communicate by means of a point-to-point communication channel or by means of a broadcast primitive.

Point-to-point communication. The network allows a process pi to send a message to another process pj as soon as pi knows that pj has entered the system. The network is reliable in the sense that it does not loose, create or modify messages. Moreover, the synchrony assumption guarantees that if pi invokes

(7)

“send m to pj” at time t, then pj receives that message by time t + δ0 (if it has not left the system by that time). In that case, the message is said to be “sent” and “received”.

Broadcast. It is assumed that the system is equipped with an appropriate broadcast communication sub- system that provides the processes with two operations, denoted broadcast() and deliver(). The former allows a process to send a message to all the processes in the system, while the latter allows a process to de- liver a message. Consequently, we say that such a message is “broadcast” and “delivered”. These operations satisfy the following property.

• Timely delivery: Let t be the time at which a process p invokes broadcast(m). There is a constant δ (δ ≥ δ0) (known by the processes) such that if p does not leave the system by time t + δ, then all the processes that are in the system at time t and do not leave by time t + δ, deliver m by time t + δ.

Such a pair of broadcast operations has first been formalized in [10] in the context of systems where process can commit crash failures. It has been extended to the context of dynamic systems in [9].

4.2 Protocol

The principle that underlies the design of the protocol is to have fast reads operations: a process willing to read has to do it locally. From an operational point of view, this means that a read is not allowed to use a wait() statement, or to send messages and wait for associated responses. Hence, albeit the proposed protocol works in all cases, it is targeted for applications where the number of reads outperforms the number of writes.

Local variables at a process pi. Each process pihas the following local variables.

• Two variables denoted registeri and sni; registeri contains the local copy of the regular register, while sniis the associated sequence number.

• A boolean activei, initialized to false, that is switched to true just after pihas joined the system.

• Two set variables, denoted repliesi and reply toi, that are used during the period during which pi

joins the system. The local variable repliesi contains the 3-uples < id, value, sn > that pi has received from other processes during its join period, while reply toi contains the processes that are joining the system concurrently with pi(as far as piknows).

Initially, n0 processes compose the system. The local variables of each of these processes pkare such that registerkcontains the initial value of the regular register2, snk = 0, activek= true, and repliesk = reply tok= ∅.

The join() operation. When a process pienters the system, it first invokes the join operation. The algorithm implementing that operation, described in Figure 2, involves all the processes that are currently present (be them active or not).

First, piinitializes its local variables (line 01), and waits for a period of δ time units (line 02); this waiting period is explained later. If registerihas not been updated during this waiting period (line 03), pibroadcasts (with the broadcast() operation) an INQUIRY(i) message to the processes that are in the system (line 05)

2Without loss of generality, we assume that at the beginning every process pkhas in its variable registerkthe value 0

(8)

and waits for 2δ time units, i.e., the maximum round trip delay (line 06)3. When this period terminates, pi updates its local variables registeri and sni to the most uptodate values it has received (lines 07-08).

Then, pi becomes active (line 10), which means that it can answer the inquiries it has received from other processes, and does it if reply to 6= ∅ (line 11). Finally, pi returns ok to indicate the end of the join() operation (line 12).

operation join(i):

(01) registeri← ⊥; sni← −1; active i ← false; repliesi← ∅; reply toi← ∅;

(02) wait(δ);

(03) if (registeri= ⊥) then (04) repliesi← ∅;

(05) broadcastINQUIRY(i);

(06) wait(2δ);

(07) let < id, val, sn >∈ repliesisuch that (∀ < −, −, sn0>∈ repliesi: sn ≥ sn0);

(08) if (sn > sni) then sni← sn; registeri← val end if (09) end if;

(10) activei← true;

(11) for each j ∈ reply toido sendREPLY(< i, registeri, sni>) to pj; (12) return(ok).

————————————————————————————————————–

(13) whenINQUIRY(j) is delivered:

(14) if (activei) then sendREPLY(< i, registeri, sni>) to pj

(15) else reply toi← reply toi∪ {j}

(16) end if.

(17) whenREPLY(< j, value, sn >) is received: repliesi← repliesi ∪ {< j, value, sn >}.

Figure 2: The join() protocol for a synchronous system (code for pi)

When a process pi receives a message INQUIRY(j), it answers pj by return sending back a REPLY(<

i, registeri, sni >) message containing its local variable if it is active (line 14). Otherwise, pi postpones its answer until it becomes active (line 15 and lines 10-11). Finally, when pireceives a messageREPLY(<

j, value, sn >) from a process pjit adds the corresponding 3-uple to its set repliesi(line 17).

The read() and write(v) operations. The algorithms for the read and write operations associated with the regular register are described in Figure 3. The read is purely local (i.e., fast): it consists in returning the current value of the local variable registeri.

The write consists in disseminating the new value v (together with its sequence number) to all the processes that are currently in the system (line 01). In order to guarantee the correct delivery of that value, the writer is required to wait for δ time units before terminating the write operation (line 02).

Why the wait(δ) statement at line 02 of the join() operation? To motivate the wait(δ) statement at line 02, let us consider the execution of the join() operation depicted in Figure 4(a). At time t, the processes pj, phand pkare the three processes composing the system, and pj is the writer. Moreover, the process pi

executes join() just after t. The value of the copies of the regular register is 0 (square on the left of pj, ph

and pk), while registeri = ⊥ (square on its left). The ‘timely delivery” property of the broadcast invoked by the writer pj ensures that pj and pkdeliver the new value v = 1 by t + δ. But, as it entered the system after t, there is no such a guarantee for pi. Hence, if pidoes not execute the wait(δ) statement at line 02, its execution of the lines 03-09 can provide it with the previous value of the regular register, namely 0. If after

3The statement wait(2δ) can be replaced by wait(δ + δ0), which provides a more efficient join operation; δ is the upper bound for the dissemination of the message sent by the reliable broadcast that is a one-to-many communication primitive, while δ0is the upper bound for a response that is sent to a process whose id is known, using a one-to-one communication primitive. So, wait(δ) is related to the broadcast, while wait(δ0) is related to point-to-point communication. We use the wait(2δ) statement to make easier the presentation.

(9)

operation read(): return(registeri). % issued by any process pi%

————————————————————————————–

operation write(v): % issued only by the writer pw%

(01) snw← snw+ 1; registerw← v broadcastWRITE(v, snw);

(02) wait(δ); return(ok).

(03) whenWRITE(< val, sn >) is delivered: % at any process pi% (04) if (sn > sni) then registeri← val; sni← sn end if.

Figure 3: The read() and write() protocols for a synchronous system

write (1)

Join()

Join Write Reply Join Write Reply read()

δ

δ δ

0 0 0

1 1 1

0 pi

pj ph pk

(a) Without wait(δ)

write (1)

Join() read()

δ

δ δ 0 0 0

1 1 1

1 pi

pj ph pk

δ

Join Write Reply Join Write Reply

(b) With wait(δ)

Figure 4: Why wait(δ) is required

obtaining 0, piissues another read it obtains again 0, while it should obtain the new value v = 1 (because 1 is the last value written and there is no write concurrent with this second read issued by pi).

The execution depicted in Figure 4(b) shows that this incorrect scenario cannot occur if pi is forced to wait for δ time units before inquiring to obtain the last value of the regular register.

4.3 Correctness

Correctness of the register has to be proved by showing liveness and safety of the implementation.

Lemma 3 Termination. If a process invokes thejoin() operation and does not leave the system for at least 3δ time units, or invokes the read() operation, or invokes the write () operation and does not leave the system for at leastδ time units, it does terminates the invoked operation.

Proof The read() operation trivially terminates. The termination of the join() and write() operations follows

from the fact that the wait() statement terminates. 2Lemma 3

Lemma 4 Let t0 be the starting time of the system and letn0be the number of processes inside the system at timet0. LetPt

τ =t−3δ+1µ(τ ) < N (t − 3δ) ∀t, then |A(t)| > 0 ∀t.

Proof At time t0 the system is composed of n0 processes and each of these process maintains the value of the register so we have that |A(t0)| = n0. By the definition of the join and leave distribution, we have that at time t0 + 1 λ(t0 + 1) processes start the join operation and µ(t0 + 1) processes leave the system then |A(t0 + 1)| = n0− µ(t0 + 1). At time t0 + 2 we have that λ(t0 + 2) more processes invoke the join and µ(t0+ 2) more processes leave the system. In the worse case, all the processes leaving the system are active hence |A(t0 + 2)| ≥ n0 − µ(t0 + 1) − µ(t0 + 2). To be completed, a join operation needs, in the worse case, 3δ time then until time t0+ 3δ the number of the active processes always decrease and

(10)

|A(t0+ 3δ)| ≥ n0−Pt0+3δ

τ =t0+1µ(τ ). At time t0+ 3δ + 1, the joins invoked at time t0+ 1 terminate and such joining processes become active so |A(t0+ 3δ + 1)| ≥ n0−Pt0+3δ+1

τ =t0+1 µ(τ ) + λ(t0+ 1). At time t0+ 3δ + 2, also the join operations invoked at time t0+ 2 terminate and λ(t0+ 2) more processes become active. By iteration, for a given t, |A(t)| ≥ n0−Pt

τ =t0+1µ(τ ) +Pt−3δ

τ0=t0+1λ(τ0) = N (t − 3δ) −Pt

τ =t−3δ+1µ(τ ).

Moreover, asPt

τ =t−3δ+1µ(τ ) < N (t − 3δ) ∀t, we have that |A(t)| > 0 for any time t. 2Lemma 4

Lemma 5 Let n0be the number of processes inside the system at timet0. Lett ≥ t0, let∆t = [t, t + ∆] a time interval and letPt+∆

τ =t+∆−3δ+1µ(τ ) < N (t + ∆ − 3δ) ∀t then |A(∆t)| > 0 ∀t.

Proof Considering that processes become active, in the worse case, 3δ time after the invocation of their join and that in every time unit there is always at least one active process (due to Lemma 4), it follows that in every time interval there is always at least one active process. 2Lemma 5

From this two Lemmas it is possible to derive a relation between the number of active processes and the refresh of the system.

Corollary 1 Let t ≥ t0, let∆t = [t, t + ∆] a time interval.

|A(∆t)| > 0 ⇔ L(∆t) < 1.

Proof From Lemma 4 and Lemma 5 we know that |A(∆t)| ≥ N (t + ∆ − 3δ) −Pt+∆

τ =t+∆−3δ+1µ(τ ). In order to have at least one active process inside the system, we have to constraint such quantity to be greater than 0 and then

N (t + ∆ − 3δ) −Pt+∆

τ =t+∆−3δ+1µ(τ ) > 0 N (t + ∆ − 3δ) > Pt+∆

τ =t+∆−3δ+1µ(τ )

Pt+∆

τ =t+∆−3δ+1µ(τ )

N (t+∆−3δ) < 1,

that is exactly the definition of L(∆t). 2Corollary 1

Lemma 6 Let the join function λ(t), the leave function µ(t) such thatPt+3δ

τ =t+1µ(τ ) < N (t) ∀t ≥ t0. When a processpi terminates the execution ofjoin(), its local variable registericontains the last value written in the regular register (i.e., the last value before thejoin() invocation), or a value whose write is concurrent with thejoin() operation.

Proof Let pibe a process that issues a join() operation. It always executes the wait(δ) statement at line 02.

Then, there are two cases according to the value of the predicate registeri = ⊥ evaluated at line 03 of the join operation.

• Case registeri 6= ⊥. We then conclude that pi has received a WRITE(< val, sn >) message and accordingly updated registeri line 04, Figure 3. As (1) the write operation lasts δ time units (line 02 of the write operation, Figure 3), (2) the join operation lasts at least δ time units, and (3) the message

WRITE(< val, sn >) - sent at the beginning of the write - takes at most δ time units, it follows from registeri 6= ⊥ that the join() and the write() operations overlap, i.e., they are concurrent, which proves the lemma for that case.

(11)

• registeri = ⊥. In that case, pibroadcasts anINQUIRY(i) message and waits for 2δ time units (lines 05-06 of the join() operation, Figure 2). Let t be the time at with pi broadcasts the INQUIRY(i) message. At the end of the 2δ round trip upper bound delay, pi updates registeri with the value associated with the highest sequence number it has received (lines 07-08). We consider two sub- cases.

Case 1: No write is concurrent with the join operation. As |A[t, t + 3δ]| > 0 (Lemma 5), A[t, t + 3δ]

is not empty. Consequently, at least one process that has a copy of the last written value answers the inquiry of pi and consequently pi sets registeri to that value by 2δ time units after the broadcast, which proves the lemma.

Case 2: There is (at least) one write issued by a process pj concurrent with the join operation. In that case, pi can receive bothWRITE(< val, sn >) messages andREPLY(< j, val, sn >) messages.

According the values received at time t + 2δ, pi will update registeri to the value written by a concurrent update, or the value written before the concurrent writes.

2Lemma 6

Now we are in the position to prove the safety of the implementation of the regular register. Informally, the following lemma states that to guarantee safety, during the join of a process, the number of processes departing from the system has to be strictly lesser than the ones joining the system.

Lemma 7 Safety. Let the join function λ(t), the leave function µ(t) such that Pt+3δ

τ =t+1µ(τ ) < N (t)

∀t ≥ t0 and consider the protocol described in Section 5. The read operation returns the last value written before the read invocation, or a value written by a write operation concurrent with it.

Proof Let us observe that a read by any process can be invoked only after that process has terminated its join operation. It follows from that observation, Lemma 6, and the lines 03-04 associated with the write operation (Figure 3) that a read always returns a value that has been written.

Let us observe that a write that starts at time t, terminates at time t + δ, and all the processes in A[t, t + δ]

have delivered the value it has written by t + δ. Considering that observation, the proof that a read operation returns the last written value or a value written by a concurrent write is similar to the proof of the previous

lemma. It is left to the reader. 2Lemma 7

Theorem 1 Let the join function λ(t), the leave function µ(t) such thatPt+3δ

τ =t+1µ(τ ) < N (t) ∀t ≥ t0. The protocol described is the Figures 2 and 3 implements a regular register in a synchronous dynamic distributed system.

Proof The proof follows directly from Lemmas 3 and 7. 2T heorem 1

4.4 Discussion

In this section we have proposed a protocol to implement a regular register in a dynamic distributed system where the network size changes in a given range. In order to propagate the value of the register along time, such protocol only requires that at least one process is active in every time unit (i.e. |A(t)| ≥ 1 ∀ t). Note that such active process can change over time: the protocol only requires that at least an active process participates to the computation and it does not require that such a process is always the same.

(12)

Another interesting point of the proposed implementation is that it does not give any constraint on the value of k1and k2. Hence, in a synchronous system it is possible to run this algorithm independently from the size variation.

Note that if we consider the particular case where the network size does not change (i.e. k1 = k2 = 0) and n0c processes (where n0 is the number of processes inside the system at time t0and c is a percentage of nodes) invokes the join operation at each time unit and n0c processes leave the system at each time unit (i.e. λ(t) = µ(t) = n0c) we fall down in the results of [5].

5 Asynchronous System

This section shows that (not surprisingly) it is not possible to implement a regular register in a fully asyn- chronous dynamic system. Such a system is similar to the synchronous system defined in Section 4 with the following difference: there are no bound on message transfer delays such as δ and δ0.

5.1 Impossibility Result

Theorem 2 It is not possible to implement a regular register in a fully asynchronous dynamic system.

Proof The proof is a consequence of the impossibility to implement a register in a fully asynchronous static message-passing system prone to any number of process crashes [3]4. More specifically, if processes enter and leave the system, due to the absence of an upper bound on message transfer delays, it is easy possible a run in which the value obtained by a process is always older than the last value whose write is terminated.

2T heorem 2

6 Eventually Synchronous Systems

6.1 Eventually Synchronous System Model

In static systems, an eventually synchronous distributed system5is a system that after an unknown but finite time behaves synchronously [6, 8]. We adopt the same definition for dynamic systems. More precisely:

• As in an asynchronous system, there is a time notion (whose domain is the set of integers), but this time notion is inaccessible to the processes.

• Eventual timley delivery: There is a time t and a bound δ such that any message sent (broadcast) at time t0 ≥ t, is received (delivered) by time t0 + δ to the processes that are in the system during the interval [t0, t0+ δ].

It is important to observe that the date t and the bound δ do exist but can never be explicitly known by the processes. In this section we show an algorithm implementing a regular register in an eventually syn- chronous dynamic distributed system where the join distribution and the leave distribution satisfy the two constraints of Lemma 2 (i.e. the node distribution is always upper-bounded by a constant n0 + k2 and lower-bounded by a constant n0− k1).

4This paper shows also that such a construction is possible as soon as a majority of processes do not crash.

5Sometime also called partially synchronous system.

(13)

6.2 Protocol

As consequence of the impossibility to know when the system behaves as synchronous, it is not possible to rely on the passage of time combined with a safe known bound on message transfer delay. Hence the protocol has to be based on acknowledgment messages. As in the synchronous case, it is allowed to any process to write under the assumption that no two processes write concurrently (this assumption could be implemented with appropriate quorums, but we do not consider its implementation in this paper).

Local variables at a process pi. Each process pihas to manage local variables, where (as before) registeri

denotes the local copy of the regular register. In order to ease the understanding and the presentation of the protocol, the control variables that have the same meaning as in the synchronous protocol are denoted with the same identifiers (but their management can differ). Those are the variables denoted sni, activei, repliesi and reply toi in Section 4. In addition to these control variables, the protocol uses the following ones.

• read sni is a sequence number used by pi to distinguish its successive read requests. The particular value read sni = 0 is used by the join operation.

• readingiis boolean whose value is true when piis reading.

• write acki is a set used by pi (when it writes a new value) to remember the processes that have acknowledged its last write.

• dl previis a set where (while it is joining the system) pirecords the processes that have acknowledged its inquiry message while they were not yet active (so, these processes were joining the system too) or while they are reading. When it terminates its join operation, pihas to send them a reply to prevent them to be blocked forever.

Remark. In the following, the processing associated with a message reception (if the message has been sent) or a message delivery (if the message has been broadcast) appears in the description of one of the operations join(), read(), or write(). But the sending (or broadcast) of the message that triggers this processing is not necessarily restricted to that operation.

The join() operation. The algorithm implementing this operation is described in Figure 5. (The read algorithm -Figure 6- can be seen as a simplified version of it.)

After having initialized its local variables, the process pibroadcasts anINQUIRY(i, read sni) message to inform the other processes that it enters the system and wants to obtain the value of the regular register (line 04, as indicated read sni is then equal to 0). Then, after it has received “enough” replies (line 05), piproceeds as in the synchronous protocol: it updates its local pair (registeri, sni) (lines 06-07), becomes active (line 08), and sends a reply to the processes in the set reply toi (line 09-11). It sends such a reply message also to the processes in its set dl previ to prevents them from waiting forever. In addition to the tern < i, registeri, sni >, a reply message sent to a process pj, from a process pi, has now to carry also the read sequence number r sn that identifies the corresponding request issued by pj.

The behavior of pi when it receives anINQUIRY(j, r sn) message is similar to one of the synchronous protocol, with two differences. The first is that it always sends back a message to pj. It sends aREPLY() message if it is active (line 14 - 15), and aDL PREV() if it not active yet (line 20). The second difference is that, if piis reading, it also sends aDL PREV() message to pj (line 16); this is required to have pj sends to pithe value it has obtained when it terminates its join operation.

(14)

operation join(i):

(01) registeri← ⊥; sni← −1; activei← false; readingi← false;

(02) repliesi← ∅; reply toi← ∅; write acki← ∅;

(03) dl previ← ∅; read sni← 0;

(04) broadcastINQUIRY(i, read sni);

(05) wait until`|repliesi| > C);

(06) let < id, val, sn >∈ repliesi

such that (∀ < −, −, sn0>∈ replies i : sn ≥ sn0);

(07) if (sn > sni) then sni← sn; registeri← val end if (08) activei← true;

(09) for each < j, r sn >∈ reply toi∪ dl prevido

(10) do sendREPLY(< i, registeri, sni>, r sn) to pj

(11) end for;

(12) return(ok).

—————————————————————————————————

(13) whenINQUIRY(j, r sn) is delivered:

(14) if (activei) then

(15) sendREPLY(< i, registeri, sni>, r sn) to pj

(16) if (readingi) then

(17) sendDL PREV(i, r sn) to pj

(18) end if;

(19) else reply toi← reply toi∪ {< j, r sn >};

(20) sendDL PREV(i, r sn) to pj

(21) end if.

(22) whenREPLY(< j, value, sn >, r sn) is received:

(23) if ((r sn = read sni) then

(24) repliesi← repliesi ∪ {< j, value, sn >};

(25) sendACK(i, r sn) to pj

(26) end if.

(27) whenDL PREV(j, r sn) is received:

(28) dl previ← dl previ ∪ {< j, r sn >}.

Figure 5: The join() protocol for an eventually synchronous system (code for pi)

(15)

When pi receives aREPLY(< j, value, sn >, r sn) message from a process pj, if the reply message is an answer to itsINQUIRY(i, read sn) message (line23), piadds < j, value, sn > to the set of replies it has received so far and sends back anACK(i, r sn) message to pj (line 24 - 25).

Finally, when pireceives a messageDL PREV(j, r sn), it adds its content to the set dl previ (line 28), in order to remember that it has to send a reply to pj when it will become active (lines 09-10).

The read() operation. The read() and join() operations are strongly related. More specifically, a read is a simplified version of the join operation6. Hence, the code of the read() operation, described in Figure 6, is a simplified version of the code of the join() operation.

Each read invocation is identified by a pair made up of the process index i and a read sequence number read sni (line 03). So, pi first broadcasts a read request READ(i, read sni). Then, after it has received

“enough” replies, pi selects the one with the greatest sequence number, updates (if needed) its local pair (registeri, sni), and returns the value of registeri.

When it receives a message READ(j, r sn), pi sends back a reply to pj if it is active (line 09). If it is joining the system, it remembers that it will have to send back a reply to pj when it will terminate its join operation (line 10).

operation read(i):

(01) read sni← read sni+ 1;

(02) repliesi← ∅; readingi← true;

(03) broadcastREAD(i, read sni);

(04) wait until(|repliesi| > C);

(05) let < id, val, sn >∈ repliesi

such that (∀ < −, −, sn0>∈ repliesi: sn ≥ sn0);

(06) if (sn > sni) then sni← sn; registeri← val end if;

(07) readingi← false; return(registeri).

——————————————————————————————————

(08) whenREAD(j, r sn) is delivered:

(09) if (activei) then sendREPLY(< i, registeri, sni>, r sn) to pj

(10) else reply toi← reply toi∪ {< j, r sn >}

(11) end if.

Figure 6: The read() protocol for an eventually synchronous system (code for pi)

The write() operation. The code of the write operation is described in Figure 7. Let us recall that it is assumed that a single process at a time issues a write.

When a process piwants to write, it issues first a read operation in order to obtain the sequence number associated with the last value written (line 01)7. Then, after it has broadcast the message WRITE(i, <

v, sni >) to disseminate the new value and its sequence number to the other processes (line 04), pi waits until it has received “enough” acknowledgments. When this happens, it terminates the write operation by returning the control value ok (line 05).

When it is delivered a messageWRITE(j, < val, sn >), pi takes into account the pair (val, sn) if it is more uptodate than its current pair (line 08). In all cases, it sends back to the sender pj a message ACK

(i, sn) to allow it to terminate its write operation (line 09).

When it receives a messageACK(j, sn), piadds it to its set write ackiif this message is an answer to its last write (line 11).

6As indicated before, the “read” identified (i, 0) is the join() operation issued by pi.

7As the write operations are not concurrent, this read obtains the greatest sequence number. The same strategy to obtain the last sequence number is used in protocols implementing an atomic registers (e.g., [3, 9]).

(16)

operation write(v):

(01) read(i);

(02) sni← sni+ 1; registeri← v;

(03) write acki← ∅;

(04) broadcastWRITE(i, < v, sni>);

(05) wait until(|write acki| > C);

(06) return(ok).

———————————————————————————

(07) whenWRITE(j, < val, sn >) is delivered:

(08) if (sn > sni) then registeri← val; sni← sn end if;

(09) sendACK(i, sn) to pj.

(10) whenACK(j, sn) is received:

(11) if (sn = sni) then write acki← write acki∪ {j} end if.

Figure 7: The write() protocol for an eventually synchronous system (code for pi)

6.3 Correctness

The structure of the proof is as follows: first we compute the necessary number of messages C that a process needs to wait in order to terminate a join() operation or a read() operation (Lemma 8) and then we compute the necessary number of messages C to ensure the validity property of the regular register (Lemma 9).

Lemma 10, Lemma 11 and Lemma 12 show respectively that, waiting a number of messages according with Lemma 8 and Lemma 9, if a ”majority” of active processes is always active then join() operations, read() operations and write() operations terminate. Finally, Theorem 3 and Theorem 4 state respectively the termination and the validity property of a regular register.

Lemma 8 Let op be a join() operation or a read() operation issued on the regular register. Let C the number of replies in the protocols of Figures 5 and 6. Letn0 be the number of processes belonging to the computation at timet0and letk1andk2two positive integers. It is necessary condition to ensure termination ofop that C ≤ n0− k1.

Proof Let t be the time at which the system reaches the minimum size n0− k1and let op be the first join() operation issued by a process piafter time t.

Let us suppose by contradiction that C > n0 − k1 and op terminates. A join() operation terminates when |replyi| > C (line 05, Figure 5). Initially empty, replyi is filled when a reply message is received (line 24, Figure 5). Such a message is sent by an active process pjto piwhen an inquiry message is received (lines 10 and 15, Figure 5). The inquiry message is sent by pi at the beginning of the join operation and, for the broadcast property, it is deterministically delivered only by processes belonging to the system at that time. Since at the time t N (t) = n0− k1, |A(t)| ≤ N (t) and op is the first join after t, we have that at most n0− k1 processes will receive the inquiry and generate the reply. Hence pi will receive at most n0− k1 replies remaining blocked. The same reasoning is applied to all the join operations issued after time t then no more processes become active and all the join will be blocked waiting for ever.

Since a read is only a simplified version of a join, we can repeat the same reasoning also for read operations: all the read operations issued after time t will wait for ever.

Note that the system size still change after time t as effect of the churn but the set of active processes can only decrease (due to leave of processes).

2Lemma 8

(17)

Lemma 9 Let C the number of replies in the protocols of Figures 5, 6 and 7. Let n0 be the number of processes belonging to the computation at timet0 and letk1 andk2 two positive integers. It is necessary condition to ensure validity thatC > n0+k2 2.

Proof Let us suppose by contradiction that C ≤ n0+k2 2 and a read() operation op returns the last value written or a value concurrently written. Let t0 be the time at which the system reaches the maximum size n0+ k2 and let oprbe a read operation issued at time t0 from a process pr. Let opw be the last operation terminated before pr. From the algorithm of Figure 7 opw terminates when pw has received at least C acknowledgment messages (line 05) and C is at most n0+k2 2 meaning that at most n0+k2 2 processes have received the value written and executed the update of the local copy of the register (line 08 Figure 7). In order to terminate the read operation opr, prneeds to receive at least C replies (line 04 Figure 6) and C is at most n0+k2 2. Considering that at the beginning of the read n0+ k2 processes belong to the computation and that at most n0+k2 2 of them stores the new written value, it is possible that all the replies that prwill receives, come from the n0+k2 2 processes that does not store the last value. Hence the read may return a value that is not the last one.

2Lemma 9

Corollary 2 Let C the number of replies in the protocols of Figures 5, 6 and 7. Let n0 be the number of processes belonging to the computation at timet0and letk1 andk2 two positive integers. IfC satisfies the constraints of Lemma 8 and Lemma 9 thenn0 > 2k1+ k2.

Proof The proof trivially follows by combining the two condition on C of Lemma 8 and Lemma 9.

2Corollary 2

Lemma 10 Let n0 > 2k1+ k2and letC = n0+k2 2 + 1. Let us assume that a process that invokes the join() operation remains in the system for at least3δ time units. If ∀ t : |A(t)| > n0+k2 2 then a processpi that invokes thejoin() operation and does not leave the system, this join operation terminates.

Proof Let us first observe that, in order to terminate its join() operation, a process pi has to wait until its set repliesicontains C = n0+k2 2 elements (line 05, Figure 5). Empty at the beginning of the join operation (line 01, Figure 5), this set is filled in by pi when it receives the correspondingREPLY() messages (line 24 of Figure 5).

A process pjsends aREPLY() message to piif (i) either it is active and has received anINQUIRYmessage from pi, (line 14, Figure 5), or (ii) it terminates its join() join operation and < i, − >∈ reply toj∪ dl prevj (lines 09-10, Figure 5).

Let us suppose by contradiction that |repliesi| remains smaller thann0+k2 2. This means that pidoes not receive enoughREPLY() carrying the appropriate sequence number. Let t be the time at which the system becomes synchronous and let us consider a time t0 > t at which a new process pj invokes the join operation.

At time t0, pj broadcasts anINQUIRYmessage (line 04, Figure 5). As the system is synchronous from time t, every process present in the system during [t0, t0+ δ] receives suchINQUIRYmessage by time t0+ δ.

As it is not yet active when it receives pj’sINQUIRYmessage, the process piexecutes line 20 of Figure 5 and sends back aDL PREVmessage to pj. Due to the assumption that every process that joins the system remains inside for at least 3δ time units, pjreceives pi’sDL PREVand executes consequently line 27 (Figure 5) adding < i, − > to dl prevj.

Riferimenti

Documenti correlati

With respect to the actual conditions for larger pillars, the moment of complete destruction of the sample corresponded to the first peak in the pressure gradient measured in

Analysis characteristics determination of electrohydraulic control system operation to reduce the operation time of a powered roof support.. Dawid

If a server invokes the join() operation, and does not leave the system for at least 3δ time units, or a client invokes the read() operation, or invokes the write () operation and

If a process invokes the join() operation and does not leave the system for at least 3δ time units, or invokes the read() operation, or invokes the write () operation and does not

Abbiamo infatti notizia di come, durante la reggenza del Regno da parte di Roberto II conte di Artois (1285-1289), Giovanni de Virgilio, un procuratore del

Dunque, a disegnare - certamente in maniera non esaustiva, ma neppure affidandosi alla casualità - un discorso sulle Alpi del pieno medioevo, furono scelte tre prospettive

The possibility to assess the contribution of brain histamine (HA) to specific behavioral changes is a complex task, taking into consideration the broad modulatory

«Il Campanile della Chiesa di S. Lorenzo insiste sull’ultima crociera di destra e risulta con molta probabilità iniziato con la ricostruzione dell’edificio fatta nella seconda