@@ -49,6 +49,8 @@ MessageType == {
4949 \* recovery_step_by_step().
5050 "RECOVERY_BUCKET_STAT" ,
5151 "RECOVERY_BUCKET_STAT_RESPONSE" ,
52+ "RECOVERY_BUCKET_FULLSCAN" ,
53+ "RECOVERY_BUCKET_FULLSCAN_RESPONSE" ,
5254 \* gc_bucket_process_sent_one_batch_xc().
5355 "BUCKET_TEST_GC" ,
5456 "BUCKET_TEST_GC_RESPONSE" ,
@@ -63,7 +65,11 @@ BucketRecvResponseContent ==
6365RecoveryBucketStatContent ==
6466 [ bucket : BucketIds ]
6567RecoveryBucketStatResponseContent ==
66- [ bucket : BucketIds , status : BucketState , transferring : BOOLEAN ]
68+ [ bucket : BucketIds , status : BucketState , transferring : BOOLEAN , generation : Nat ]
69+ RecoveryBucketFullscanContent ==
70+ [ bucket : BucketIds ]
71+ RecoveryBucketFullscanResponseContent ==
72+ [ bucket : BucketIds , status : BucketState , generation : Nat ]
6773BucketTestGcContent ==
6874 [ bucket : BucketIds ]
6975BucketTestGcResponseContent ==
@@ -77,6 +83,8 @@ MessageContent ==
7783 BucketRecvResponseContent \union
7884 RecoveryBucketStatContent \union
7985 RecoveryBucketStatResponseContent \union
86+ RecoveryBucketFullscanContent \union
87+ RecoveryBucketFullscanResponseContent \union
8088 BucketTestGcContent \union
8189 BucketTestGcResponseContent \union
8290 ReplicationBucketContent
@@ -106,6 +114,8 @@ StorageType == [
106114 sendWaitTarget : [ BucketIds -> [ Storages -> Nat ] ] ,
107115 sendingBuckets : SUBSET BucketIds ,
108116 masterWaitTarget : [ Storages -> Nat ] ,
117+ recoveryAck : [ BucketIds ->
118+ [ ReplicaSets -> [ status : BucketState , generation : Int ] ] ] ,
109119 \* For limiting tests, it cannot be done outside of the module, since
110120 \* a test doesn't know, whether the bucket send or ref actually happened.
111121 errinj : [
@@ -162,6 +172,8 @@ StorageInit ==
162172 sendWaitTarget |-> [ b \in BucketIds |-> [ s \in Storages |-> 0 ] ] ,
163173 sendingBuckets |-> { } ,
164174 masterWaitTarget |-> [ s \in Storages |-> 0 ] ,
175+ recoveryAck |-> [ b \in BucketIds |->
176+ [ rs \in ReplicaSets |-> [ status |-> NULL , generation |-> - 1 ] ] ] ,
165177 errinj |-> [
166178 bucketSendCount |-> 0 ,
167179 bucketRWRefCount |-> 0 ,
@@ -193,6 +205,7 @@ StorageState(i) == [
193205 sendWaitTarget |-> storages [ i ] . sendWaitTarget ,
194206 sendingBuckets |-> storages [ i ] . sendingBuckets ,
195207 masterWaitTarget |-> storages [ i ] . masterWaitTarget ,
208+ recoveryAck |-> storages [ i ] . recoveryAck ,
196209 errinj |-> storages [ i ] . errinj
197210]
198211
@@ -206,7 +219,8 @@ StorageStateApply(i, state) ==
206219 VarSet ( i , "sendWaitTarget" , state . sendWaitTarget ,
207220 VarSet ( i , "sendingBuckets" , state . sendingBuckets ,
208221 VarSet ( i , "masterWaitTarget" , state . masterWaitTarget ,
209- VarSet ( i , "errinj" , state . errinj , storages ) ) ) ) ) ) ) ) ) )
222+ VarSet ( i , "recoveryAck" , state . recoveryAck ,
223+ VarSet ( i , "errinj" , state . errinj , storages ) ) ) ) ) ) ) ) ) ) )
210224 /\ network ' =
211225 [ s \in Storages |->
212226 [ t \in Storages |->
@@ -466,6 +480,7 @@ RecoverySendStatRequest(state, b) ==
466480 /\ ~ ( b \in state . transferingBuckets )
467481 /\ dest_rs # NULL
468482 /\ MasterSyncDone ( state )
483+ /\ ~ ( \E rs \in ReplicaSets : state . recoveryAck [ b ] [ rs ] . generation # - 1 )
469484 THEN
470485 \* Choose any storage in the destination replicaset.
471486 LET candidates == { s \in Storages :
@@ -490,12 +505,32 @@ ProcessRecoveryStatRequest(state, j) ==
490505 content |-> [
491506 bucket |-> b ,
492507 status |-> state . buckets [ b ] . status ,
493- transferring |-> ( b \in state . transferingBuckets )
508+ transferring |-> ( b \in state . transferingBuckets ) ,
509+ generation |-> state . buckets [ b ] . generation
494510 ] ]
495511 IN [ state EXCEPT
496512 ! . networkReceive [ j ] = Tail ( @ ) ,
497513 ! . networkSend [ j ] = Append ( @ , reply ) ]
498514
515+ SendRecoveryFullscanRequests ( state , b ) ==
516+ LET msg == [ type |-> "RECOVERY_BUCKET_FULLSCAN" ,
517+ content |-> [ bucket |-> b ] ]
518+ currentRs == ReplicasetOf ( state . id )
519+ chosen == [ rs \in ReplicaSets \ { currentRs } |->
520+ CHOOSE s \in Storages :
521+ storageToReplicaset [ s ] = rs ]
522+ localGen == state . buckets [ b ] . generation
523+ localStatus == state . buckets [ b ] . status
524+ IN [ state EXCEPT
525+ ! . networkSend =
526+ [ j \in Storages |->
527+ IF \E rs \in ReplicaSets \ { currentRs } : j = chosen [ rs ]
528+ THEN Append ( state . networkSend [ j ] , msg )
529+ ELSE state . networkSend [ j ] ] ,
530+ ! . recoveryAck = [ state . recoveryAck EXCEPT ! [ b ] [ currentRs ] =
531+ [ status |-> localStatus , generation |-> localGen ] ] ]
532+
533+
499534ProcessRecoveryStatResponse ( state , j ) ==
500535 LET msg == Head ( state . networkReceive [ j ] ) IN
501536 IF msg . type # "RECOVERY_BUCKET_STAT_RESPONSE" THEN state
@@ -507,28 +542,87 @@ ProcessRecoveryStatResponse(state, j) ==
507542 LET b == msg . content . bucket
508543 remoteStatus == msg . content . status
509544 remoteTransf == msg . content . transferring
545+ remoteGen == msg . content . generation
510546 localStatus == state . buckets [ b ] . status
547+ localGen == state . buckets [ b ] . generation
548+ state1 == [ state EXCEPT ! . networkReceive [ j ] = Tail ( @ ) ]
511549 IN
512550 IF ~ ( localStatus \in TransferStates ) THEN
513- [ state EXCEPT ! . networkReceive [ j ] = Tail ( @ ) ]
514- \* Recovery policy: sender adjusts state after getting peer's status.
515- ELSE IF localStatus = "SENDING" /\ remoteStatus \in { "ACTIVE" } THEN
516- LET state1 == [ state EXCEPT ! . networkReceive [ j ] = Tail ( @ ) ] IN
517- BucketStatusChange ( state1 , state . id , b , "SENT" ,
518- state . buckets [ b ] . destination , state1 . buckets [ b ] . generation )
519- ELSE IF localStatus = "RECEIVING" /\
520- ( remoteStatus \in { "ACTIVE" , "PINNED" }
521- \/ ( remoteStatus = "SENDING" /\ ~ remoteTransf ) ) THEN
522- LET state1 == [ state EXCEPT ! . networkReceive [ j ] = Tail ( @ ) ] IN
551+ state1
552+ ELSE IF remoteGen > localGen THEN
523553 BucketStatusChange ( state1 , state . id , b , "GARBAGE" , NULL ,
524554 state1 . buckets [ b ] . generation )
525- ELSE IF ( b \notin state . transferingBuckets )
526- /\ ( remoteStatus \in { "SENT" , "GARBAGE" } \/ remoteStatus = NULL ) THEN
527- LET state1 == [ state EXCEPT ! . networkReceive [ j ] = Tail ( @ ) ] IN
528- BucketStatusChange ( state1 , state . id , b , "ACTIVE" , NULL ,
529- state1 . buckets [ b ] . generation )
530- ELSE
531- [ state EXCEPT ! . networkReceive [ j ] = Tail ( @ ) ]
555+ ELSE IF remoteGen = localGen THEN
556+ \* Recovery policy: sender adjusts state after getting peer's status.
557+ IF localStatus = "SENDING" /\ remoteStatus \in { "ACTIVE" } THEN
558+ BucketStatusChange ( state1 , state . id , b , "SENT" ,
559+ state . buckets [ b ] . destination , state1 . buckets [ b ] . generation )
560+ ELSE IF localStatus = "RECEIVING" /\
561+ ( remoteStatus \in { "ACTIVE" , "PINNED" }
562+ \/ ( remoteStatus = "SENDING" /\ ~ remoteTransf ) ) THEN
563+ BucketStatusChange ( state1 , state . id , b , "GARBAGE" , NULL ,
564+ state1 . buckets [ b ] . generation )
565+ ELSE IF ( b \notin state . transferingBuckets )
566+ /\ ( remoteStatus \in { "SENT" , "GARBAGE" } ) THEN
567+ BucketStatusChange ( state1 , state . id , b , "ACTIVE" , NULL ,
568+ state1 . buckets [ b ] . generation )
569+ ELSE
570+ state1
571+ ELSE IF remoteGen = 0 THEN
572+ SendRecoveryFullscanRequests ( state1 , b )
573+ ELSE LET a == Assert ( FALSE , "remote gen < local gen" )
574+ \* This should never happen.
575+ IN state1
576+
577+ ProcessRecoveryFullscanRequest ( state , j ) ==
578+ LET msg == Head ( state . networkReceive [ j ] ) IN
579+ IF msg . type # "RECOVERY_BUCKET_FULLSCAN" THEN state
580+ ELSE
581+ IF ~ IsMaster ( state ) \/ ~ MasterSyncDone ( state ) THEN
582+ [ state EXCEPT ! . networkReceive [ j ] = Tail ( @ ) ]
583+ ELSE
584+ LET b == msg . content . bucket
585+ reply == [ type |-> "RECOVERY_BUCKET_FULLSCAN_RESPONSE" ,
586+ content |-> [
587+ bucket |-> b ,
588+ status |-> state . buckets [ b ] . status ,
589+ generation |-> state . buckets [ b ] . generation ] ]
590+ IN [ state EXCEPT
591+ ! . networkReceive [ j ] = Tail ( @ ) ,
592+ ! . networkSend [ j ] = Append ( @ , reply ) ]
593+
594+ BucketClearRecoveryAck ( state , b ) ==
595+ [ state EXCEPT ! . recoveryAck [ b ] =
596+ [ rs \in ReplicaSets |-> [ status |-> NULL , generation |-> - 1 ] ] ]
597+
598+ ProcessRecoveryFullscanResponse ( state , j ) ==
599+ LET msg == Head ( state . networkReceive [ j ] ) IN
600+ IF msg . type # "RECOVERY_BUCKET_FULLSCAN_RESPONSE" THEN state
601+ ELSE
602+ LET b == msg . content . bucket
603+ rs == storageToReplicaset [ j ]
604+ remoteStatus == msg . content . status
605+ remoteGen == msg . content . generation
606+ localGen == state . buckets [ b ] . generation
607+ localStatus == state . buckets [ b ] . status
608+ ack_after == [ state . recoveryAck EXCEPT
609+ ! [ b ] [ rs ] = [ status |-> remoteStatus , generation |-> remoteGen ] ]
610+ state1 == [ state EXCEPT
611+ ! . networkReceive [ j ] = Tail ( @ ) ,
612+ ! . recoveryAck = ack_after ]
613+ IN LET receivedRS == { r \in ReplicaSets : ack_after [ b ] [ r ] . generation # - 1 }
614+ allResponded == receivedRS = ReplicaSets
615+ IN
616+ IF ~ allResponded THEN state1
617+ ELSE
618+ LET gens == [ r \in ReplicaSets |-> ack_after [ b ] [ r ] . generation ]
619+ higherGen == \E r \in ReplicaSets : gens [ r ] > localGen IN
620+ IF higherGen THEN
621+ BucketClearRecoveryAck (
622+ BucketStatusChange ( state1 , state . id , b , "GARBAGE" , NULL , localGen ) , b )
623+ ELSE
624+ BucketClearRecoveryAck (
625+ BucketStatusChange ( state1 , state . id , b , "ACTIVE" , NULL , localGen ) , b )
532626
533627(* **************************************************************************)
534628(* Garbage Collector *)
@@ -707,6 +801,8 @@ Next ==
707801 \/ StorageStateApply ( i , BucketRecvFinish ( state , j ) )
708802 \/ StorageStateApply ( i , ProcessRecoveryStatRequest ( state , j ) )
709803 \/ StorageStateApply ( i , ProcessRecoveryStatResponse ( state , j ) )
804+ \/ StorageStateApply ( i , ProcessRecoveryFullscanRequest ( state , j ) )
805+ \/ StorageStateApply ( i , ProcessRecoveryFullscanResponse ( state , j ) )
710806 \/ StorageStateApply ( i , ProcessGcTestRequest ( state , j ) )
711807 \/ StorageStateApply ( i , ProcessGcTestResponse ( state , j ) )
712808 \/ \E b \in BucketIds , mode \in { "read" , "write" } :
@@ -721,5 +817,6 @@ Next ==
721817 \/ StorageStateApply ( i , GcSendTestRequest ( state , b ) )
722818 \/ StorageStateApply ( i , BucketDropFromTransfering ( state , b ) )
723819 \/ StorageStateApply ( i , BucketDropFromSending ( state , b ) )
820+ \/ StorageStateApply ( i , BucketClearRecoveryAck ( state , b ) )
724821
725822================================================================================
0 commit comments