• Non ci sono risultati.

Set Objects in Eventually Synchronous Distributed Systems with Churn and Continuous Accesses

N/A
N/A
Protected

Academic year: 2021

Condividi "Set Objects in Eventually Synchronous Distributed Systems with Churn and Continuous Accesses"

Copied!
21
0
0

Testo completo

(1)

Set Objects in Eventually Synchronous Distributed Systems

with Churn and Continuous Accesses

Roberto Baldoni, Silvia Bonomi, Michel Raynal

Sapienza Universit`a di Roma, Via Ariosto 25, 00185 Roma, Italy

IRISA, Univerist´e de Rennes 1 Campus de Beaulieu, 35042 Rennes Cedex, France {baldoni,bonomi}@dis.uniroma1.it, raynal@irisa.fr

MIDLAB - Technical Report 7/2010

Abstract

Shared objects are one of the main abstractions provided to the developers of applications on the top of message-passing distributed systems. Recent large-scale long-live applications such as data-mining, network analytics, event processing etc., change drastically both the setting where these objects have to be deployed and the requirements on their implementation. Applications and objects run indeed continuously in an unmanaged (or partially managed) environment (e.g. networked datacenters). This means that an object will be continuously and frequently accessed (i.e., object accesses are not quiescent) and the set of nodes implementing objects may change continuously due to their continuous arrival and departure (i.e., churn is not quiescent). Additionally to sustain high throughput objects’ implementation has to be in-memory. This paper considers a ”set object”, namely a shared object that allows processes to add and remove values as well as take a snapshot of its content. We show that implementing such a set object under continuous accesses and churn in an eventually synchronous distributed systems using finite memory is impossible. Then we introduce a k-bounded set object, namely a set that has limited memory of the execution history. We present an implementation of this object and prove its correctness. The interest of the k-bounded set lies in the fact that it can be used to build useful abstractions for dynamic distributed systems, such as an eventual participant detector, as well as k can be used as a parameter to trade the in-memory availability on the nodes used to implement the object with the need to approximate precisely a set object.

Keywords : Set object, k-bounded set object, Continuous accesses, Churn, Consistency condition, Dynamic distributed system, Infinite arrival model, Eventually Synchronous system.

1 Introduction

Context and Motivation. One of the main aspects of `a-la-google distributed computing is to help non-stop in-memory processing of data objects, possibly residing on a large number of machines deployed on the top of a message-passing distributed system (e.g. datacenters). These data are continuously and frequently updated and retrieved by hundreds or thousands of users (either processes or software modules) for applica- tive, profiling, mining or accounting purposes. The traffic accessing such a data object is continuous and may reach hundreds of thousands of requests per second (data object accesses are non-quiescent) and, to sustain such a high throughput an object implementation has to be in-memory. Additionally, these objects are implemented in a setting with an high degree of self-management due to the huge number of software

(2)

and hardware components. As a consequence, data objects must tolerate continuous arrivals and departures of processes due to failures, maintenance procedures, etc. Said differently, the object implementation must resist to continuous churn (churn is non-quiescent). Hence, there is the need to define suitable data structures that support concurrent accesses guaranteeing, at the same time, a degree of consistency which is adequate to solve a given problem and whose implementation is both done using finite memory and working in the presence of continuous churn and without assumption on a bound on the number of accesses.

The set Object S. A set object S is a shared object that stores a (possibly empty) finite set of values.

A process can acquire the content of S through a get operation while it can add (remove) an element to S through an add (remove) operation. S guarantees the value-based sequential consistency introduced in [4]. Informally this condition allows concurrent get operations to return the same output in the absence of concurrent add and/or remove operation. This condition is weaker than linearizability [12]. This is because processes are required to see the same order only for concurrent add and remove operations that are on a same value. Concurrent operations executed on distinct values can be perceived in different order by distinct processes.

Contribution of the paper. The first contribution is an impossibility result. Consider a class of message passing protocols implementing a set S using finite memory, we show that it does not exists any algorithm, belonging to such a class, in an eventually synchronous distributed system with non-quiescent churn and continuous accesses.

Then the paper presents a weaker form of set called k-bounded set object, which is implementable in an eventually synchronous distributed system with churn, continuous accesses and finite memory. Infor- mally, a k-bounded set object is a set that has limited memory of the execution history. In particular, given a get()operation, a k-bounded set behaves as a set where the execution history is limited only to the k most recent update operations. In this way, k can be used as a parameter to trade in-memory availability on the node, used to implement the object, with the need to approximate precisely a set object. We show addition- ally how a k-bounded set can be used to implement interesting abstractions such as participant detectors in dynamic distributed systems [6].

2 Related Work

Several works have been done recently in the general context of dynamic distributed systems. Some of them related to the modeling of churn (e.g., [13], [15]) others address the implementation of concurrent data structures on wired message passing dynamic systems (eg., [1], [2], [17]). All the latter works address the problem of implementing a register data structure.

Regular Registers in Dynamic Distributed Systems. In [17], the authors deal with the implementation of an atomic R/W object where the churn is abstracted by the notion of system reconfiguration. A reconfigura- tion could happen every time there is a process joining or leaving the system. To be valid a reconfiguration requires processes to agree upon an unique sequence of configuration changes. This agreement implies the need of consensus and thus cannot be implemented in a fully asynchronous system. In [1] Aguilera et al. show that an atomic R/W object can be realized without consensus and, thus, on a fully asynchronous distributed system provided that the number of reconfiguration operations is finite and thus the churn is qui-

(3)

escent1. Finally in [2] we proved that if the churn is not quiescent, it is not possible to implement a regular register in a fully asynchronous system.

From Registers to Sets. In [2], we implemented a regular register in an eventually synchronous distributed systems with continuous churn while we show in this paper that a set object is impossible to be realized in the same churn condition. The impossibility stems from the non-quiescence of the set accesses. In a register implementation, such a dimension does not play any role because the register does not have to store anything else that the last written value. This is not true for a set that has to be able to reconstruct all the update operation history. If the update operations are continuous, this implies that the set implementation needs infinite memory.

Let us note that a weaker notion of set, namely weak set, has been introduced by Delporte and Foucon- nier in [8]. A weak set is a non-linearizable object representing a restricted form of set that does not include remove operations. The authors show how it is possible to implement a weak-set in a system with no churn, by using a finite number of atomic registers assuming either (i) the number of processes is finite and known or (ii) the domain of the weak set is bounded and known by processes. The system model considered in this paper assumes (i) the domain of a set is finite but not bounded and (ii) the presence of continuous churn. In both cases, it is not possible to define a finite number of registers necessary to implement the weak set.

3 The System Model

Distributed System. The distributed system is composed, at each time, by a bounded number of processes that communicate by exchanging messages. Processes are uniquely identified (with their indexes), has finite memory space for local computation, and they may join and leave the system at any point in time. The processing times of local computations are negligible with respect to communication delays, so they are assumed to be equal to 0. Contrarily, messages take time to travel to their destination processes. The dis- tributed system is eventually synchronous, that is after an unknown but finite time, it behaves synchronously [7, 10]. Similarly to [11], we consider:

(i) there is a time notion (whose domain is the set of integers), but this time notion is inaccessible to the processes.

(ii) Eventual timely delivery: There is a time t and a bound δ such that any message sent (broadcast) at time t0 ≥ t, is received (delivered) by time t0+ δ to the processes that are in the system during the interval [t0, t0+ δ]. It is important to observe that, while t and δ exist, they are not known by processes.

Distributed Computation. A distributed computation is formed, at each instant of time, by a subset of processes of the distributed system. A process p, belonging to the system, that wants to participate to the distributed computation has to execute a join() operation. Such an operation, invoked at some time t, is not instantaneous: it consumes time. But, from time t, the process p can receive and process messages related to the computation.

A process leaves the computation in an implicit way. When it does, it leaves the computation forever and does no send computation messages anymore. From a practical point of view, if a process wants to re-enter the system, it has to enter it as a new process (i.e., with a new name). We assume that a process does not crash during the distributed computation (i.e. it does not crash from the time it joins the system until it leaves)2. The set of processes that actively participate to the computation is defined as follows.

1This assumption has been also employed in [19] and in [17].

2Actually in the proof we just need processes do not fail during the execution of add and remove operations.

(4)

Definition 1 A process is active from the time it returns from the join() operation until the time it leaves the system. A(t) denotes the set of processes that are active at time t, while A([t1, t2]) denotes the set of processesp such that p ∈ A(τ ) for each time τ in the interval [t1, t2].

Continuous Churn Model As in [3], we model the churn of the system by means of the join distribution λ(t), the leave distribution µ(t) and the node distribution N (t) [3]. The join and the leave distribution are deterministic discrete functions of the time that return, for any time t, respectively the number of processes that have invoked the join operation at time t and the number of processes that have left the system at time t. The node distribution returns, for every time t, the number of processes inside the system and it is defined as N (t) = N (t − 1) + λ(t) − µ(t). We assume, at the beginning, n0processes are inside the system (i.e.

N (t0) = n0) and we assume that, for each time t, λ(t) = µ(t) = c × n0 (where c ∈ [0, 1] is a percentage of node of the system) meaning that, at each time unit, the number of processes that join the system is the same as the number of process that leave it, i.e. the number of processes inside the system N (t) is always equal to n0.

Continuous set access. In our model, an active process does not stop doing operations on the set (accesses to the set are non-quiescent). I.e., it does not exists a time t after that an active process stops issuing operations on the set. This implies that the number of operations issued by active processes on the set during an execution cannot be bounded.

4 The set object S

A set object S is a shared object used to store values. Without loss of generality, we assume that (i) S contains only values taken from an arbitrary finite domain (i.e. a subset of integers) and (ii) at the beginning of the computation S is empty. Given a set object S, any process can add, remove or get values to/from the set. More precisely: The add operation, denoted add(v), takes an input parameter v and returns a confirmation that the operation has been executed. It adds v to S. If v is already in the set, the add operation has no effect. The remove operation, denoted remove(v), takes an input parameter v and returns a confirmation that the operation has been executed. If v belongs to S, it suppresses it from S. Otherwise it has no effect. The get operation, denoted get(), takes no input parameter. It returns the current content of S without modifying its.

Generally, none of these operation is instantaneous. We assume that every process executes operations sequentially (i.e., a process does not invoke any operation before it has received a response from its previous invocation). Hence, given two operations executed by two different processes, they may overlap and the current content of the set may be not univocally defined.

Every operation can be characterized by two events occurring at its boundary: an invocation event and a reply event. These events occur at two time instants (invocation time and reply time). According to these time instants, it is possible to state when two operations are concurrent with respect to the real time execution. For ease of presentation we assume the existence of a fictional global clock (not accessible by processes). The invocation time and response time of every operation are defined with respect to that clock.

Given two operations op and op0, having respectively invocation times tB(op) and tB(op0) and return times tE(op) and tE(op0), we say that op precedes op0 (op ≺ op0) iff tE(op) < tB(op0). If op does not precede op0and op0 does not precede op then they are concurrent (op||op0).

Considering all the operations invoked on the set object and the previous precedence relation, it is possible to define the history of the computation as a partial order between the operations, induced by the precedence relation. More formally an execution history can be defined as follow:

(5)

Definition 2 (Execution History) Let H be the set of all the operations issued on the set object S. An execution history bH = (H, ≺) is a partial order on H satisfying the relation ≺.

As a get()operation does not modify the content of a set, in order to define which are the admissible values that can be returned by each get(), it is possible to consider the sub-history of the execution con- taining only the operations that update the set (i.e. add() and remove() operations) and the considered get()instead of the entire history bH. To this aim, let us introduce the concept of update sub-history induced by aget()operation.

Definition 3 (Update sub-history induced by a get()operation) Let bH = (H, ≺) be the execution history of all theadd(v), remove(v) and get()operations invoked on the shared object. The update sub-history of H induced by a get()operation op, denoted as bb U

H,opb = (U, ≺), is defined as follows:

• U ⊆ H;

• U = {o ∈ H|(o =add(v) ∨ o =remove(v)) ∧ tB(o) < tE(op)} ∪ {op};

• for each pair of operations o, o0such thato, o0 ∈ U , if o precedes o0in bH then o precedes o0in bU

H,opb .

Add(2)

Add(1) Remove(2)

Remove(1) Get() pi

pj

pk

tE(op) Add(4)

Add(3)

Figure 1: Execution Histoy bH and update sub-history bU

H,opb of a set object.

As an example, Figure 1 shows the updated sub-history induced by the get()operation op issued by pj

on the history bH. In the following, when it is clear from the context which are the execution history bH and the operation op the updated sub-history refers to, the notation bU is used instead of bU

H,opb .

Note that, for a given execution history containing concurrent operations, it is possible to find several linearizations of the operations, that differ one from the other for the order given to concurrent operations.

In the following we introduce the concepts of consistent permutation to identify each of these linearizations and permutation set to identify all the consistent permutations following from a given execution history.

Definition 4 (Permutation π Consistent with bH) Given an execution history bH = (H, ≺), a permutation π of all the operations belonging to H is consistent with bH if, for any pair of operations op, op0 inπ, op precedesop0inπ whenever op precedes op0in bH.

As an example, consider the execution history bH shown in Figure 1. One of the possible permutations consistent with bH is π = (add(1)i, add(2)j, add(3)k, remove(2)i, remove(1)k, get()j, add(4)i)3. Definition 5 (Permutation set) Let bH = (H, ≺) be the execution history of the set object. A permutation set of an execution history bH, denoted as Π

Hb, is the set of all the permutationsπ that are consistent with bH.

(6)

Π

Hc1= (add(1)i, add(2)j, add(3)k, remove(2)i, remove(1)k, get()j, add(4)i) π2= (add(1)i, add(2)j, add(3)k, remove(2)i, remove(1)k, add(4)i, get()j) π3= (add(1)i, add(2)j, add(3)k, remove(2)i, get()j, add(4)i, remove(1)k) π4= (add(1)i, add(2)j, add(3)k, remove(2)i, get()j, remove(1)k, add(4)i) π5= (add(1)i, add(2)j, add(3)k, remove(2)i, add(4)i, get()j, remove(1)k) π6= (add(1)i, add(2)j, add(3)k, remove(2)i, add(4)i, remove(1)k, get()j) }

(a) Permutation Set

Π3, cH(op) = {π1= (add(3)k, remove(2)i, remove(1)k, get()j) π2= (remove(2)i, remove(1)k, add(4)i), get()j) π3= (add(1)i, add(2)j, remove(2)i, get()j) π4= (add(3)k, remove(2)i, add(4)i, get()j) π5= (remove(2)i, add(4)i, remove(1)k), get()j) }

(b) 3-Permutation Set

Figure 2: Permutation Set and 3-permutation set of the History bH of Figure 1

As an example, consider the execution history bH shown in Figure 1. The permutation set Π

Hb of the execution history bH is shown in Figure 2(a). Let us now define what is an admissible set for a given get()operation op.

Definition 6 (Set Generated by a Permutation) Let bH = (H, ≺) be an execution history and let op be a get()operation of H. Given the update sub-history bU

H,opb = (U, ≺) induced by op on bH, let π be a permutation consistent with bU

H,opb , then the set of valuesV generated by π for op contains all the values v such that:

• ∃ add(v) ∈ π : add(v) ≺ op and

• @ remove(v) ∈ π : add(v) ≺ remove(v) ≺ op;

As an example, consider the execution history bH shown in Figure 1 and its update sub-history bU in- duced by the operation op =get()j. Given a permutation π1 = (add(1)i, add(2)j, add(3)k, remove(2)i, remove(1)k, get()j, add(4)i) (belonging to Π

Ub consistent with bU ), the set of values V generated by π1is V = {3}.

Definition 7 (Admissible set for a get()operation) Given an execution history bH = (H, ≺) of a set object S, let op be a get()operation of H. Let bU

H,opb be the update sub-history induced byop on bH. An admissible set forop, denoted as Vad(op), is any set generated by any permutation π belonging to the permutation set ofΠ

Ub.

As an example, consider the execution history bH of Figure 1 and consider the get()operation op issued by pj. Given the permutation set Π

Ub, all the possible admissible sets for op are: Vad1= {3}, Vad2= {3, 4}

Vad3= {1, 3} Vad4= {1, 3} Vad5= {1, 3, 4} Vad6= {3, 4}.

4.1 Impossibility Proof

An algorithm A is an I/O automaton, composed by: the signature, the set of the states, an initial state, state-transition relations and a task partition [16].

An algorithm Aset implementing the set object, is a finite state automata, executed by each process, where we denote as V the arbitrary bounded domain of the set, and having the following signature:

3With the notation op()id it is represented the operation op issued by the process with identifier id

(7)

Input: Output:

send(v)i, v ∈ V receive(v)i, v ∈ V

get()i get return(V )i, V ∈ 2|V|

add(v)i, v ∈ V remove(v)i, v ∈ V join()i

Invoking a get()input, Asetshould be able to return a set of values that is admissible for the execution history generated by the operations. To this aim, the state of the automata implementing a set object can be represented either by (i) the list of operations invoked on the set or by (ii) a subset of values of the set domain. Figure 3 represents the automaton, implementing a set object, executed by each process pi.

pi

Get() Get_return(V)

Add(v) Remove(v)

Join()

receive(v) send(v)

Figure 3: A set I/O Automaton

In the following we will show that it is not possible to define an automata for the set object that uses finite memory if (i) the accesses to the set are continuos, (ii) the churn is continuos and (iii) the system is eventually synchronous.

Lemma 1 Let Aset be an automata implementing the set object. If the accesses to the set are continuos and the states of the automata are represented by the list of operations issued on the object thenAset uses infinite memory.

Proof The lemma trivially follows by considering that accesses to the set are continuos and then the list of operations tends to infinite thus requiring infinite memory to be stored. 2Lemma 1

Lemma 2 Let Aset be an automata implementing the set object. If the churn is continuos, the system is eventually synchronous and the states of the automata are represented by subset of values of the domain of the set thenAsetis not able to output an admissible set.

Proof Let consider a get()input occurring at time t and let us assume that there exists in the computation at least one process piwhose state V ∈ 2|V|is an admissible set.

In order that the output action get return() might return V , the following necessary condition must hold:

”pi has to belong to the computation for all the time required by the state-transition generated by the input get().”

The combined effect of the continuous churn and the eventual synchrony of the system can prevent this condition to hold for every input get(). The eventual synchrony indeed implies that no finite bound can be

(8)

established on the time taken by a transition to be completed (finite bounds can be established only when the synchrony period starts). This implies that for any value of churn c greater than 0 (see Section 4) the member of the computation at the time the transition starts could be completely replaced at the time the transition ends. As a consequence, pi could leave the computation before the transition be completed and the necessary condition is not satisfied.

2Lemma 2

Theorem 1 Let Aset be an automata implementing the set object. If (i) the accesses to the ser are con- tinuous, (ii) the churn is continuos and (iii) the system is eventually synchronous then there not exists any algorithmAsetable to implement the set object using finite memory.

Proof It follows by Lemma 1 and Lemma 2. 2T heorem 1

5 k-bounded Set Object

Due to the impossibility shown in Theorem 1, the specification of the set object is weakened and the notion of k-bounded set object is introduced. Informally, a k-bounded set object is a set that has limited memory of the execution history. In particular, given a get()operation, a k-bounded set behaves as a set where the execution history is limited only to the k most recent update operations defining thus a “window” on the execution history.

Add(2)

Add(1) Remove(2)

Remove(1) Get() pi

pj

pk Add(3) Get()

Figure 4: Examples of 3-bounded set wrt two get()operations.

As an example consider the execution shown in Figure 4 for a k-bounded set object where k = 3.

For each of the two get()operations, the execution history is restricted to the last 3 update operations; the set returned by the the get()invoked by pk is {1, 2, 3} (as the one that would be returned by the same get()invoked on a set) while the set returned by the get()invoked by pj will be ∅, instead of {3} that would be returned by the same get()invoked on a set (this is actually due to the add(1) and add(3) operations that are out of the window for such get()and then they are forgot).

In order to specify the correct behavior of the k-bounded set object in case of concurrency, let us intro- duce the notions of k-cut permutation and k-cut permutation set induced by a get()operation. Informally, a k-cut permutation induced by an operation opiis a subset of a permutation (induced by opi) consistent with the execution history. This subset contains k operations preceding opiin the permutation. The k-cut permu- tation set, is the set of permutations induced by opiobtained by the permutation set ΠHˆ of the execution by considering for each permutation its k-cut.

Definition 8 (k-cut permutation induced by opi) Given an execution history bH = (H, ≺) let π = (op1, op2, . . . , opn) a permutation consistent with bH. Given an operation opi ofπ and an integer k, the k-cut

(9)

permutation induced by opi onπ, denoted as πk(opi), is the sub-set of π ending with opiand including the k operations twidehat precedeopiinπ (i.e. πk(opi) = (opi−k, . . . , opi−1, opi)).

As an example, consider the execution history bH shown in Figure 1 and consider the permutation π1 = (add(1)i, add(2)j, add(3)k, remove(2)i, remove(1)k, get()j, add(4)i) consistent with bH. If op =get()j and k = 3, the 3-cut permutation induced by op on π1 is π13(op)=(add(3)k, remove(2)i, remove(1)k, get()j).

Definition 9 (k-cut permutation set induced by opi) Given an execution history bH = (H, ≺) let Π

Hb its permutation set. Given an operationop of H and an integer k, the k-cut permutation set induced by op on ΠHb, denoted asΠk, bH(op), is the set of all the k-cut permutations induced by op on each permutation π of ΠHb.

Consider the execution history bH depicted in Figure 1 and the permutation set Π

Hb of the execution history bH shown above. If op =get()j and k = 3, the 3-cut permutation set induced by op is shown in Figure 2(b).

Definition 10 (Admissible set for a get()operation) Given an execution history bH = (H, ≺) of a k-bounded set object, letop=get()be an operation of H. Let bUH,opb be the update sub-history induced byop on bH and letk be an integer. An admissible set for op, denoted as Vad(op), is any set generated by any permutation π belonging to thek-cut permutation set of Πk, bU

H,opb

.

As an example, consider the execution history bH shown in Figure 1 and its update sub-history bU induced by the operation op =get()j. Given its 3-cut permutation set Π3, bU, all the possible admissible sets for op are Vad 1= {3}, Vad 2= {4}, Vad 3= {1}, Vad 4= {3, 4}, Vad 5= {4}.

It is interesting to remark that if k is equal to ∞, we get a set as specified in Section 2. If k is equal to 1, the k-bounded set boils down to a one-bit regular register where add() and remove() operations set and reset the memory bit respectively. Any value of k in the between allows to remember only the recent history.

This makes possible a finite memory implementation of the k-bounded set in an eventually synchronous distributed systems with continuous accesses and churn as shown in the next section.

6 A distributed protocol implementing k-bounded set

This section presents an algorithm of the class MP where each quorum has a cardinality of at least dn/2e processes and we assume dn/2e active processes at any time present in the system where n = n0is the size of the system. Each process pihas the following local variables:

A set variable denoted seticontaining the local copy of the set.

Two integers update sniand get snithat represent the sequence numbers used by pito distinguish, respec- tively, its successive update operations (add() and remove()) and get() operations.

A variable runningi that stores the tuple corresponding to the update operation (add() or remove()) cur- rently executed by the process. It is initialized to a default value < null, ⊥, 0, i >.

A set variable last opsi, that can contain at most k entries, used to maintain an history of recent update operations executed by pi. Such variable contains 4-uples < type, val, sn, id > each one characterizing an operation of type type = {A or R} (respectively for add() and remove()) of the value val, with a sequence number sn, issued by a process with identity id.

(10)

A boolean activei, initialized to false, that is switched to true just after pihas joined the system.

Two boolean variables gettingiand updatingiwhose value is true when piis executing respectively a get() operation or an add()/remove() operation.

Four set variables, denoted repliesi, reply toi, pendingi and dl previ that are used in the period during which pi joins the system. The local variable repliesicontains the pairs < sn, ops > that pi has received from other processes during its join period, while reply toicontains the processes that are joining the system concurrently with pi(as far as piknows). The set pendingicontains the 4-uples < type, val, sn, id > each one characterizes an update operation executed concurrently with the join. The set dl previ is a variable where pi stores the processes that have acknowledged its inquiry message while they were still joining or while they were accessing the set by issuing the get(). Once pibecomes active, it has to send a reply to all the processes in the dl previset to prevent them to be blocked forever.

An update ackiset used by piwhile adding or removing a value. The update ackiset is used to remember the processes that have acknowledged its operation.

The join()operation (Figure 5). The algorithm involves all the processes present in the distributed com- putation. Initially, pi broadcasts an INQUIRY(i, get sni) message to inform processes belonging to the computation that it wish joining the k-bounded set computation and thus wants to obtain the history of most recent operations (line 02). Then, pireceives and orders operations seen from a majority of other processes in the system. The sort function simply orders the set variable historyiusing the lexicographic order on the pairs (sn, id) (line 03-06) . Now, for each operation in the historyiset, piexecute theUPDATE() procedure (line 07). Then, pi becomes active (line 08), which means that it can answer the inquiries it has received from other processes, and does it if reply to 6= ∅. pisends such a reply message also to the processes in its set dl previto prevents them from waiting forever (line 09-11). Finally, pireturns ok to indicate the end of the join() operation (line 12).

When pireceives a messageINQUIRY(j, snj), it answers pjby sending back aREPLY(< update sni, last opsi>

, snj) message containing its local variables if it is active and, in case it is accessing the set by means of a get() operation, it sends also aDL PREV() message or in case it is accessing the set by means of an add() or remove() operation, it sends also anUPDATE() message (line 14-16). If pi is not active, it postpones its answer until it becomes active (line 17 and line 10) and sends aDL PREV() message (line 18).

When pi receives a messageREPLY(< sn, ops >, sn) from pj, if the reply message is an answer to its

INQUIRY(j, snj) it adds the corresponding pair to its set repliesi(line 21).

Finally, when pi receives a messageDL PREV(j, snj), it adds its content to the set dl previ(line 22), in order to remember that it has to send a reply to pjwhen it will become active (lines 09-11).

The get()operation (Figure 6). A get() operation is a simplified version of the join(). Each get invocation is identified by a pair (id, sn) where id is the process index and sn is the sequence number of the get get sni

So, pibroadcasts a get requestGET(i, get sni) and waits for a quorum.

Then pirecomputes and order its history by making the union of all the partial history received so far and the operations received during the waiting period (lines 03-04). Afterwards piexecutes all the operations by invoking theUPDATEprocedure (line 05) and returns the set (line 06).

When a process pireceives a messageGET(j, snj), it answers pjby sending back aREPLY(< update sni, last opsi >, snj) message containing its local variables if it is active (line 08). Otherwise, pipostpones its answer until it becomes active (line 09).

The add(v) and remove(v) operations (Figure 7). The structure of the two protocols is basically the same and in the following text the term adjournment operation is used as synonym for both the operations.

(11)

operation join(i):

(01) seti← ∅; update sni← 0; get sni← 0; runningi← < null, ⊥, 0, i >; last opsi← ∅; pending i ← ∅; repliesi← ∅;

update acki← ∅; reply toi← ∅; dl previ← ∅; active i ← false; gettingi← false; updatingi← false;

(02) broadcastINQUIRY(i, get sni);

(03) wait until(|repliesi| >n2);

(04) for each < −, ls >∈ repliesido historyi← historyi∪ ls endfor (05) historyi← historyi∪ pendingi;

(06) sort (historyi);

(07) for each < type, val, sn, id >∈ historyido executeUPDATE(type, val, sn, id) end for;

(08) activei← true;

(09) for each < j, snj>∈ (reply toi∪ dl previ) do (10) sendREPLY(< update sni, lastopi>, snj) to pj; (11) end for;

(12) return(ok).

—————————————————————————————————————————————————————

(13) whenINQUIRY(j, snj) is delivered:

(14) if (activei) then sendREPLY(< update sni, lastopi>, snj) to pj

(15) if (gettingi) then sendDL PREV(j, snj) endif (16) if (updatingi) then sendUPDATE(runningi) endif (17) else reply toi← reply toi∪ {j, snj};

(18) sendDL PREV(j, get sn);

(19) end if;

(20) whenREPLY(< usn, ops >, sn) is received:

(21) if (get sni= sn) then repliesi← repliesi ∪ {< usn, ops >} endif

(22) whenDL PREV(j, snj) is received: dl previ← dl previ ∪ {< j, snj>}.

Figure 5: The join() protocol for a k-bounded set object (code for pi)

When a process piwants to execute an adjournment, it first performs a get()operation, in order to obtain the most updated sequence number (line 01, line 07), updates the local variables (line 02, line 08) and then it sends anUPDATE() message to perform the current adjournment (line 03, line 09) and waits until it receives enough ack (line 04, line 10). Once piis unblocked from the wait statement, it resets its state by setting its variable updateito false and its runningivariable to a default value (line 05, line 11).

When an UPDATE(< type, val, snj, j >) message is delivered to pi from some process pj, if pi is not active, it puts the 4-uple corresponding to the current operation into its pendingiset (line 14) and will process the operation at the end of the join(), once it will be active (line 05, Figure 5) otherwise, pi execute theUPDATEprocedure for the current operation (line 15).

When an ACK(sn, j) message is delivered to pi from some process pj, if the sequence number sn attached to the message is the same as the current operation then piadds j to the set of processes that have

operation get(): % issued by any process pi%

(01) get sni← get sn1+ 1; repliesi← ∅; pendingi← ∅; gettingi← true;

(02) broadcastGET(i, get sni); wait until(|repliesi| >n

2); historyi← ∅;

(03) for each < −, ls >∈ repliesido historyi← historyi∪ ls endfor;

(04) historyi← historyi∪ pendingi; sort (historyi);

(05) for each < type, val, sn, id >∈ historyido executeUPDATE(type, val, sn, id) end for;

(06) gettingi← f alse; return(seti).

————————————————————————————————–

(07) whenGET(j, snj) is delivered: % at any process pi%

(08) if (activei) then sendREPLY(< update sni, lastopi>, snj) to pj

(09) else reply toi← reply toi∪ {j, snj} endif

Figure 6: The get() protocol for a k-bounded set object (code for pi)

(12)

operation add(v): % issued by any process pi% (01) get();

(02) update sni← update sni+ 1; update acki← ∅; updatingi← true; runningi← < A, v, update sni, i >;

(03) broadcastUPDATE(< A, v, update sni, i >);

(04) wait until(|update acki| >n2);

(05) updatingi← false; runningi←< null, ⊥, 0, i >;

(06) return(ok).

————————————————————————————————————————————————–

operation remove(v): % issued by any process pi% (07) get();

(08) update sni← update sni+ 1; update acki← ∅; updatingi← true; runningi← < R, v, update sni, i >;

(09) broadcastUPDATE(< R, v, update sni, i >);

(10) wait until(|update acki| >n2);

(11) updatingi← false; runningi←< null, ⊥, 0, i >;

(12) return(ok).

————————————————————————————————————————————————–

(13) whenUPDATE(< type, val, snj, j >) is delivered: % at any process pi% (14) if (¬activei) then pendingi← pendingi∪ {< type, val, snj, j >}

(15) else executeUPDATE(< type, val, snj, j >) endif.

(16) whenACK(sn, j) is delivered: % at any process pi%

(17) if (sn = update sni) then update acki← update acki∪ {j} endif.

Figure 7: The add() and remove() protocol for a k-bounded set object (code for pi)

acknowledged its operation (line 17).

TheUPDATE() procedure (Figure 8). Such procedure is called by every operation and its aim is to keep updated the local variables of a process pito the most recent k operations. Specifically, when a process pi

executes the UPDATE(< type, val, snj, j >) procedure, it first sends an ACK(snj, i) message to prevent the process pito be blocked forever (line 01) and then checks if the current operation is one of the k most recent ones (lines 02 - 07). Once pi has the set of the most recent k operations, it sorts the set of recent operations according the lexicographic order of pairs (sn, id) (line 08) and then executes the operations contained in the last opsiset according to this order (lines 09 - 11). Finally piupdates its sequence number to the maximum between its own and the one received with the update (line 12).

procedureUPDATE(< type, val, snj, j >) % at any process pi% (01) sendACK(snj, i) to pj;

(02) select < −, −, sn, id >∈ last opsi

such that (∀ < −, −, sn0, id0>∈ last opsi: (sn, id) ≤ (sn0, id0));

(03) if (|last opsi| < k) then last opsi← last opsi∪ {< type, val, snj, j >};

(04) else if ((snj, j) > (sn, id))

(05) then last opsi← last opsi/{< −, −, sn, id >};

(06) last opsi← last opsi∪ {< type, val, snj, j >} endif (07) endif

(08) seti← ∅; sort (last opsi);

(09) for each < type, val, sn, id >∈ last opsi

(10) if (type = A) then seti← seti∪ {val}; else seti← seti/{val} endif (11) endfor

(12) update sni← max(update sni, snj).

Figure 8: TheUPDATE() protocol (code for pi)

(13)

6.1 Correctness Proofs

We first prove that any set operation terminates and that the k-bounded set returned by any get()operation is admissible.

Lemma 3 Let n be the number of processes belonging to the computation at time t0. If (i) |A(t)| > dn2e ∀t and (ii) a process piinvokes ajoin() operation and does not leave the system for at least 3δ time units then it eventually returns from thejoin() operation.

Proof Let us first observe that, in order to terminate its join() operation, a process pihas to wait until its set repliesi contains, at least, dn2e elements (line 03, Figure 5). This set is filled in by pi when it receives the

REPLY() messages for the current operation (line 21 of Figure 5). A process pj sends aREPLY() message to pi if (i) either it is active and has received an INQUIRY message from pi, (line 14, Figure 5), or (ii) it terminates its join() operation and < i, − > ∈ reply toj∪ dl prevj (lines 09-11, Figure 5).

Let us suppose by contradiction that |repliesi| remains smaller than dn2e. This means that pi does not receive enoughREPLY() carrying the appropriate sequence number. Let t be the time at which the system becomes synchronous and let us consider a time t0 > t at which a new process pj invokes the join operation.

At time t0, pj broadcasts anINQUIRYmessage (line 02, Figure 5). As the system is synchronous from time t, every process present in the system during [t0, t0+ δ] receives suchINQUIRYmessage by time t0+ δ.

As piis not active yet when it receives pj’sINQUIRYmessage, the process piexecutes line 18 of Figure 5 and sends back aDL PREVmessage to pj. Due to the assumption that every process that joins the system remains inside for at least 3δ time units, pjreceives pi’sDL PREVand executes consequently line 22 (Figure 5) adding < i, − > to dl prevj. Due to the assumption that there are always at least dn2e active processes in the system, we have that at time τ0+ δ at least dn2e processes receive the INQUIRYmessage of pj, and each of them will execute line 14 (Figure 5) and send a REPLY message to pj. Due to the synchrony of the system, pj receives these messages by time t0+ 2δ and then stops waiting and becomes active (line 08, Figure 5). Consequently (lines 09-11) pj sends aREPLYto pias i ∈ reply toj∪ dl prevj. By δ time units, pi receives that REPLY message and executes line 21, Figure 5. Due to churn rate, there are an infinity of processes invoking the join after time t and piwill receive a reply from any of them so piwill fill in its set

repliesiand terminate its join operation. 2Lemma 3

Lemma 4 Let n be the number of processes belonging to the computation at time t0. If (i) |A(t)| > dn2e ∀t and (ii) each process that invokes a join() operation does not leave the system for at least 3δ time units then a processpithat invokes aget() operation and does not leave the system eventually returns from such operation.

Proof Since the get() is a simplified case of a join(), the proof is the same of Lemma 3. 2Lemma 4

Lemma 5 Let n be the number of processes belonging to the computation at time t0. If (i) |A(t)| > dn2e ∀t and (ii) each process that invokes a join() operation does not leave the system for at least 3δ time units then a processpi that invokes anadd() operation or a remove() operation and does not leave the system eventually returns from such operation.

Proof Let us first assume that the get() operation invoked at line 01 and line 07 terminates (this is proved in Lemma 4). Before terminating the add() (or remove()) of a value v with an update sequence number usn

(14)

a process pihas to wait until its set update acki contains at least dn2e elements (line 04 and line 10, Figure 7).

Empty at the beginning of the add(v) operation (line 02, Figure 7) and at the beginning of the remove(v) operation (line 08, Figure 7), this set is filled in when theACK(usn, −)) messages are delivered to pi (line 16, Figure 7).

Such an ack message is sent by every process pjwhen theUPDATEprocedure is activated due to (i) the receipt of anUPDATEmessage from a process pi(line 15, Figure 7) or (ii) the termination of a join()operation and < −, −, usn, i > ∈ historyj∪ pendingj (lines 09 - 11, Figure 5).

Suppose by contradiction that pinever fills in update acki. This means that pimissesACK() messages carrying the sequence number usn. Let us consider the time t at which the system becomes synchronous, i.e., every message sent by any process pj at time t0 > t is delivered by time t0+ δ. Due to the assumption that the process issuing the update does not leave before the termination of its operation, it follows that pi

will receive all theINQUIRYmessages sent by processes joining after time t.

When it receives an INQUIRY() message from some joining process pj, pi executes lines 14- 16 of Figure 54and sends aREPLY message to pj as reply to its request and forwards an UPDATEmessage with the sequence number usn.

Since, after t, the system is synchronous, pjreceives both theREPLYmessage and theUPDATEmessage in at most δ time units. When pj receives theUPDATEmessage, since it is not yet active, it puts the current update < −, −, usn, i > in the pendingj set (line 14, Figure 7). Due to Lemma 3 pj will terminate the join executing lines 05-07 by processing the pending update (and also the one of pi) and sending the

ACK(ubn, −) message to pi(line 01, Figure 8).

As (1) by assumption a process that joins the system does not leave for at least 3δ time units and (2) the system is now synchronous, such anACK(usn, −) message is received by piin at most δ time units and consequently piexecutes line 17, Figure 7 and adds pj to the set update acki.

Due to the dynamicity of the system, processes continuously join the system. Due to the chain of messagesINQUIRY(),UPDATE(),ACK(), the reception of each message triggering the sending of the next one, it follows that pieventually receives dn2eACK(usn, −) messages and terminates its update operation.

2Lemma 5

Theorem 2 Let n be the number of processes belonging to the computation at time t0. If (i) |A(t)| > dn2e ∀t and (ii) each process that invokes a join() operation does not leave the system for at least 3δ time units then a processpithat invokes ajoin(), get(), add() or remove() operation and does not leave the system eventually terminates its operation.

Proof It follows from Lemma 3, Lemma 4 and Lemma 5. 2T heorem 2

Lemma 6 Let bH = (H, ≺) an execution history of a k-bounded set object S. Given the algorithms shown in Figures 5-8, there exists a total order of the pairs (sn, id) that identify each add(v) and remove(v) operation such that the total order is consistent with the partial order given by the execution history bH.

Proof

Let op and op0two update operations and let sn(op) and sn(op0) the corresponding sequence numbers given to the operations by the algorithm.

4The process pithat issues an update operation always executes lines 14-16 of Figure 5 because it is always in the active mode.

(15)

Case 1: op ≺ op0 ⇒ sn(op) < sn(op0). Let us suppose by contradiction that the two sequence numbers are the same. Without loss of generality, let pi be the process that issues op, pj be the process issuing op0 and let x be the sequence number associated by pi to its operation.

• @ op00: op ≺ op00≺ op0: when pistarts an update operation (both add(v) or remove(v)), it sends an

UPDATE(< −, −, x, i >) message and waits until it receives at least (n2 + 1)ACKmessages carrying the sequence number x (lines 09-10, Figure 7). This means that in the system there exists at least (n2+ 1) processes that executes line 12, Figure 8 and update their update snkto a value that is greater equal to x. When pj starts its operation, it first issues a get()to retrieve the most up-to-date sequence number (line 07, Figure 7). During the get()operation, processes sent to pjtheir local copies of the list last opskand to terminate the get(), pj executes theUPDATEprocedure by updating its update snj

variable. Since |A(t)| > dn2e ∀t there exists at least one process that has seen the operation of pi and that send a sequence number that is at least x. Doing the update, pj always select the maximum sequence number and at the end of the get()it has in its update snjvariable a value grater equal than x. Since before sending theUPDATEmessage pj increments its sequence number (line 08, Figure 7) the operation of pj will have a sequence number greater that pi and there is a contraddiction.

• ∃op00, . . . , opi : op ≺ op00 ≺ · · · ≺ opi ≺ op0: in this case, the statement simply follows by the point above observing that the operator “<” is transitive (i.e. if sn(op) < sn(op00) and sn(op00) < sn(op0) then sn(op) < sn(op0)).

Case 2: op||op0. In this case, the two sequence numbers assigned by the algorithm to the operation, depend by the delivery order of the broadcast messages sent during the operation. In case of concurrency of the broadcasts, sequence numbers can be the same. However, in the UPDATEprocedure, the operations are ordered and executed according to the pairs (sn, id) and since the process identifiers are unique in the system, there exists a total order also among concurrent operations.

2Lemma 6

Lemma 7 Let bH = (H, ≺) an execution history of a k-bounded set object S and let pi be a process that invokes ajoin()operation at time t. Let op be an instantaneous get()operation issued at time tE(join) and U be the update sub-history of bb H induced by op. Given an integer k, if |A(t)| > dn2e ∀t, then at the end of thejoin()operation, pi maintains in itslast opsivariable ak-cut permutation induced by op consistent with bU .

Proof

In order to terminate the join(), a process pihas to wait until it receives at least (dn2e + 1) replies from other processes (line 03, Figure 5). Each reply received by pifrom a process pj contains the list of the most recent operations saw by pj.

Step 1. Let pi be the first process that terminates the join()operation. Let O be the set of operations started before the end of the join()of pi.

• |O|≤ k. Since pi is the first process that concludes the the join()operation, the set of active processes at the end of the join()is a subset of the processes that were active at time t0 (i.e. A(tE(join)) ⊂ A(t0)). Therefore, for each update operation op belonging to O, there exists at least one process pk that has sent anACK() message for op (line 01, Figure 8), that has inserted a tuple < −, −, sn, id > in

(16)

its last opskvariable (line 03, Figure 8) and that has sent aREPLY() message to pi5. When piexecutes line 04 of Figure 5, it obtains an historyivariable containing all the operations terminated before the join()invocation, and possibly some of the concurrent ones. Executing line 06 of Figure 5, pi will obtain an history totally ordered by the pairs (sn, id) and due to Lemma 6 this total order is consistent with the partial order of the execution history bU . Since theUPDATEprocedure is executed following such total order and considering that all the operations will be stored in the last opsi variable, it is possible to conclude that at the end of the join()operation, the last opsi variable contains a k-cut permutation consistent with bU .

• |O|> k. Let consider the worse case where there exists a process pj that replies to all the |O| opera- tions. Note that, for the first kUPDATEmessages received by pj, it always executes line 03 of Figure 8 because it has empty slots in its last opsj variable. When pjreceives the (k + 1)-thUPDATEmes- sage, it executes line 04, Figure 8 and checks if the current update is “new” with respect to the ones contained in its last opsj variable, and if it is so, it substitutes the one with the smaller pair (sn, id) with the new one (line 05-06, Figure 8). Note that substituting the update operation identified by the smaller pairs (sn, id), the total order is preserved. Since piis the first process that concludes the the join()operation, the set of active processes at the end of the join()is a subset of the processes that were active at time t0 (i.e. A(tE(join)) ⊂ A(t0)). Therefore, for each update operation op belonging to O, there exists at least one process pkthat has sent anACK() message for op (line 01, Figure 8), that has inserted a tuple < −, −, sn, id > in its last opsk variable (line 03, Figure 8). Considering that an operation op is deleted by the last opskvariable from some process pkiff there exist k operations that follows op in the total order, and that all the operations have been acknowledged by at least one process that replies to pi, follows that even in this case at the end of the join()operation, the last opsi

variable contains a k-cut permutation consistent with bU .

Step i. Let pi be the i-th process that terminates the join()operation. Let P be the set of processes that send a REPLYmessage to pi. The set P can be partitioned in the set of processes that were active at time t0 and the set of processes that became active at some t > t0. Note that all the processes that were active at time t0 will send to pi a copy of their last opsk variable where, for each new update, they substitute the “older” operation and then, their list represents k-cut permutation consistent with bU . Each process that became active after t0 has received a list from some other processes. Iterating the reasoning we come back to the situation of step 1 and the Lemma follows.

2Lemma 7

Lemma 8 Let bH = (H, ≺) an execution history of a k-bounded set object S and let pi be a process that invokes a get()operation op. Let bU be the update sub-history of bH induced by op. Given an integer k, if |A(t)| > dn2e ∀t, then at the end of the get()operation, pi maintains in its last opsi variable a k-cut permutation induced byop consistent with bU .

Proof

In order to terminate the get(), a process pi has to wait until it receives at least (n2 + 1) replies from other processes (line 02, Figure 6). Each reply received by pifrom a process pj contains the list of the most recent operations saw by pj. Let P be the set of processes that send aREPLYmessage to pi. The set P can

5Note that, since the total number of operations is smaller than k, processing theUPDATEmessage, each process always executes line 03, Figure 8.

Riferimenti

Documenti correlati

[External memory implicit searching] Given a static input array A of N keys in EM, describe how to organize the keys inside A by suitably permuting them during a prepro- cessing

Show that the problem cannot be solved deterministically under the following rules: the algorithm can access only O(log c n) information for each of the k items that it can store,

When a process p i wants to execute an update, it first per- forms a get()operation, in order to obtain the most updated sequence number, updates the local variables and then it

Considering that (i) active processes participating in the computation at time t B (join i ) can be replaced along time by processes joining after t B (join i ), (ii) all these

Then to show that even in presence of failures the protocol assures eventual strong connectivity it is also shown that: any restore occurs in a finite time (Lemma 2) and at the end

number attached to the message was greater than the one maintained locally by p i , then it executes immediately the update by adding v to its local copy of the set (lines 01-04

Cartesian reference system belonging to the triangle

The description will be based on the review of existing literature on trust and credibility and on information diffusion models in contemporary social media applied to a case