Skip to content

Commit 89f38dc

Browse files
committed
tla/storage: implement bucket generations
This commit prepares vshard bucket rebalancing algorithm to be resistant for the #214 issue. It introduces the persistent bucket generation, which is incremented on every `SENDING` status. Part of #214 NO_DOC=tla NO_TEST=tla
1 parent 22aa35d commit 89f38dc

File tree

1 file changed

+34
-22
lines changed

1 file changed

+34
-22
lines changed

proofs/tla/src/storage.tla

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ MessageType == {
5757
}
5858

5959
BucketRecvContent ==
60-
[bucket: BucketIds, final: BOOLEAN]
60+
[bucket: BucketIds, final: BOOLEAN, generation : Nat]
6161
BucketRecvResponseContent ==
6262
[bucket: BucketIds, status: BOOLEAN]
6363
RecoveryBucketStatContent ==
@@ -70,7 +70,7 @@ BucketTestGcResponseContent ==
7070
[bucket: BucketIds, can_gc: BOOLEAN]
7171
ReplicationBucketContent ==
7272
[bucket: BucketIds, status: BucketState,
73-
destination: ReplicaSets \union {NULL}]
73+
destination: ReplicaSets \union {NULL}, generation: Nat]
7474

7575
MessageContent ==
7676
BucketRecvContent \union
@@ -97,7 +97,9 @@ StorageType == [
9797
transferingBuckets : SUBSET BucketIds,
9898
buckets :
9999
[BucketIds ->
100-
[status : BucketState, destination: ReplicaSets \union {NULL}]],
100+
[status : BucketState,
101+
destination : ReplicaSets \union {NULL},
102+
generation : Nat]],
101103
bucketRefs : [BucketIds ->
102104
[ro : Nat, rw : Nat, ro_lock : BOOLEAN, rw_lock : BOOLEAN]],
103105
gcAck : [BucketIds -> SUBSET Storages],
@@ -152,8 +154,8 @@ StorageInit ==
152154
LET rs_for_s ==
153155
CHOOSE rs \in ReplicaSets : i \in StorageAssignments[rs]
154156
IN IF b \in BucketAssignments[rs_for_s]
155-
THEN [status |-> "ACTIVE", destination |-> NULL]
156-
ELSE [status |-> NULL, destination |-> NULL]],
157+
THEN [status |-> "ACTIVE", destination |-> NULL, generation |-> 1]
158+
ELSE [status |-> NULL, destination |-> NULL, generation |-> 0]],
157159
bucketRefs |-> [b \in BucketIds |->
158160
[ro |-> 0, rw |-> 0, ro_lock |-> FALSE, rw_lock |-> FALSE]],
159161
gcAck |-> [b \in BucketIds |-> {}],
@@ -225,13 +227,14 @@ StorageStateApply(i, state) ==
225227
(* Replication *)
226228
(***************************************************************************)
227229

228-
BucketStatusChange(state, from, bucket, status, destination) ==
230+
BucketStatusChange(state, from, bucket, status, destination, generation) ==
229231
LET ref_before == state.bucketRefs[bucket]
230232
ref_after == [ref_before EXCEPT
231233
!.ro_lock = ~(status \in ReadableStates),
232234
!.rw_lock = ~(status \in WritableStates)]
233235
state1 == [state EXCEPT
234-
!.buckets[bucket] = [status |-> status, destination |-> destination],
236+
!.buckets[bucket] = [status |-> status, destination |-> destination,
237+
generation |-> generation],
235238
!.bucketRefs[bucket] = ref_after,
236239
!.vclock[from] = @ + 1]
237240
a == Assert(state.status = "master", "Bucket change on non-master") IN
@@ -240,7 +243,7 @@ BucketStatusChange(state, from, bucket, status, destination) ==
240243
\* replicate to all other nodes in replicaset
241244
LET replication_msg == [type |-> "REPLICATION_BUCKET",
242245
content |-> [bucket |-> bucket, status |-> status,
243-
destination |-> destination]] IN
246+
destination |-> destination, generation |-> generation]] IN
244247
[state1 EXCEPT !.networkSend = [
245248
j \in Storages |-> IF j \in OtherReplicasInReplicaset(state.id)
246249
THEN Append(state1.networkSend[j], replication_msg)
@@ -252,8 +255,8 @@ ProcessReplicationBucket(state, j) ==
252255
LET msg == Head(state.networkReceive[j]) IN
253256
IF msg.type = "REPLICATION_BUCKET" THEN
254257
LET stateNew == BucketStatusChange(state, j, msg.content.bucket,
255-
msg.content.status, msg.content.destination) IN
256-
[stateNew EXCEPT !.networkReceive[j] = Tail(@)]
258+
msg.content.status, msg.content.destination, msg.content.generation)
259+
IN [stateNew EXCEPT !.networkReceive[j] = Tail(@)]
257260
ELSE state
258261

259262
(***************************************************************************)
@@ -342,7 +345,8 @@ BucketSendWaitAndSend(state, b, j) ==
342345
/\ MasterSyncDone(state)
343346
THEN
344347
LET msg == [type |-> "BUCKET_RECV",
345-
content |-> [bucket |-> b, final |-> FALSE]]
348+
content |-> [bucket |-> b, final |-> FALSE,
349+
generation |-> state.buckets[b].generation]]
346350
IN [state EXCEPT
347351
!.networkSend[j] = Append(@, msg),
348352
!.sendingBuckets = @ \ {b}]
@@ -364,7 +368,8 @@ BucketSendStart(state, b, j) ==
364368
!.transferingBuckets = @ \union {b},
365369
!.errinj.bucketSendCount = @ + 1] IN
366370
LET state2 == BucketStatusChange(
367-
state1, state.id, b, "SENDING", ReplicasetOf(j))
371+
state1, state.id, b, "SENDING", ReplicasetOf(j),
372+
state1.buckets[b].generation + 1)
368373
state3 == [state2 EXCEPT
369374
!.sendWaitTarget[b] = state2.vclock,
370375
!.sendingBuckets = @ \union {b}]
@@ -382,7 +387,8 @@ BucketRecvStart(state, j) ==
382387
LET state1 == [state EXCEPT
383388
!.networkReceive[j] = Tail(@)] IN
384389
LET state2 == BucketStatusChange(
385-
state1, state.id, b, "RECEIVING", ReplicasetOf(j)) IN
390+
state1, state.id, b, "RECEIVING", ReplicasetOf(j),
391+
msg.content.generation) IN
386392
LET response == [type |-> "BUCKET_RECV_RESPONSE",
387393
content |-> [bucket |-> b, status |-> TRUE]] IN
388394
[state2 EXCEPT !.networkSend[j] = Append(@, response)]
@@ -406,11 +412,12 @@ BucketSendFinish(state, j) ==
406412
IF ok THEN
407413
\* Receiver accepted the bucket — mark as SENT
408414
\* and send final message.
409-
LET state2 == BucketStatusChange(
410-
state1, state.id, b, "SENT", ReplicasetOf(j)) IN
415+
LET state2 == BucketStatusChange(state1, state.id, b, "SENT",
416+
ReplicasetOf(j), state1.buckets[b].generation) IN
411417
LET final_msg ==
412418
[ type |-> "BUCKET_RECV",
413-
content |-> [bucket |-> b, final |-> TRUE] ] IN
419+
content |-> [bucket |-> b, final |-> TRUE,
420+
generation |-> state.buckets[b].generation]] IN
414421
[ state2 EXCEPT
415422
!.transferingBuckets = @ \ {b},
416423
!.networkSend[j] = Append(@, final_msg)
@@ -435,7 +442,8 @@ BucketRecvFinish(state, j) ==
435442
THEN LET state1 == [state EXCEPT
436443
!.networkReceive[j] = Tail(@)] IN
437444
LET state2 ==
438-
BucketStatusChange(state1, state.id, b, "ACTIVE", NULL) IN
445+
BucketStatusChange(state1, state.id, b, "ACTIVE", NULL,
446+
state1.buckets[b].generation) IN
439447
[state2 EXCEPT !.bucketRefs[b].rw_lock = FALSE]
440448
ELSE [state EXCEPT !.networkReceive[j] = Tail(@)] \* Drop if not master
441449
ELSE state \* Leave non-BUCKET_RECV messages in queue
@@ -506,16 +514,19 @@ ProcessRecoveryStatResponse(state, j) ==
506514
\* Recovery policy: sender adjusts state after getting peer's status.
507515
ELSE IF localStatus = "SENDING" /\ remoteStatus \in {"ACTIVE"} THEN
508516
LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN
509-
BucketStatusChange(state1, state.id, b, "SENT", state.buckets[b].destination)
517+
BucketStatusChange(state1, state.id, b, "SENT",
518+
state.buckets[b].destination, state1.buckets[b].generation)
510519
ELSE IF localStatus = "RECEIVING" /\
511520
(remoteStatus \in {"ACTIVE", "PINNED"}
512521
\/ (remoteStatus = "SENDING" /\ ~remoteTransf)) THEN
513522
LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN
514-
BucketStatusChange(state1, state.id, b, "GARBAGE", NULL)
523+
BucketStatusChange(state1, state.id, b, "GARBAGE", NULL,
524+
state1.buckets[b].generation)
515525
ELSE IF (b \notin state.transferingBuckets)
516526
/\ (remoteStatus \in {"SENT", "GARBAGE"} \/ remoteStatus = NULL) THEN
517527
LET state1 == [state EXCEPT !.networkReceive[j] = Tail(@)] IN
518-
BucketStatusChange(state1, state.id, b, "ACTIVE", NULL)
528+
BucketStatusChange(state1, state.id, b, "ACTIVE", NULL,
529+
state1.buckets[b].generation)
519530
ELSE
520531
[state EXCEPT !.networkReceive[j] = Tail(@)]
521532

@@ -543,7 +554,8 @@ TryBucketGarbage(state, b) ==
543554
THEN
544555
\* Reset acks and mark bucket as GARBAGE
545556
LET state1 == [state EXCEPT !.gcAck[b] = {}] IN
546-
BucketStatusChange(state1, state.id, b, "GARBAGE", NULL)
557+
BucketStatusChange(state1, state.id, b, "GARBAGE", NULL,
558+
state1.buckets[b].generation)
547559
ELSE
548560
state
549561

@@ -621,7 +633,7 @@ ProcessGcTestResponse(state, j) ==
621633

622634
GcDropGarbage(state, b) ==
623635
IF IsMaster(state) /\ state.buckets[b].status = "GARBAGE" THEN
624-
BucketStatusChange(state, state.id, b, NULL, NULL)
636+
BucketStatusChange(state, state.id, b, NULL, NULL, 0)
625637
ELSE state
626638

627639
(***************************************************************************)

0 commit comments

Comments
 (0)