16
16
//! Node, and so this should not be problematic.
17
17
18
18
use crate :: validators:: { ReconfigurationError , ValidatedReconfigureMsg } ;
19
- use crate :: {
20
- CoordinatorState , Envelope , Epoch , PersistentState , PlatformId , messages:: * ,
21
- } ;
19
+ use crate :: { CoordinatorState , Epoch , NodeHandlerCtx , PlatformId , messages:: * } ;
22
20
use slog:: { Logger , error, o, warn} ;
23
- use std:: time:: Instant ;
24
21
25
22
/// An entity capable of participating in trust quorum
26
23
///
@@ -30,30 +27,16 @@ use std::time::Instant;
30
27
pub struct Node {
31
28
log : Logger ,
32
29
33
- /// The unique hardware ID of a sled
34
- platform_id : PlatformId ,
35
-
36
- /// State that gets persistenly stored in ledgers
37
- persistent_state : PersistentState ,
38
-
39
30
/// In memory state for when this node is coordinating a reconfiguration
40
31
coordinator_state : Option < CoordinatorState > ,
41
32
}
42
33
43
34
impl Node {
44
- pub fn new (
45
- log : Logger ,
46
- platform_id : PlatformId ,
47
- persistent_state : PersistentState ,
48
- ) -> Node {
49
- let id_str = format ! ( "{platform_id:?}" ) ;
35
+ pub fn new ( log : Logger , ctx : & mut impl NodeHandlerCtx ) -> Node {
36
+ let id_str = format ! ( "{:?}" , ctx. platform_id( ) ) ;
50
37
let log =
51
38
log. new ( o ! ( "component" => "trust-quorum" , "platform_id" => id_str) ) ;
52
- Node { log, platform_id, persistent_state, coordinator_state : None }
53
- }
54
-
55
- pub fn platform_id ( & self ) -> & PlatformId {
56
- & self . platform_id
39
+ Node { log, coordinator_state : None }
57
40
}
58
41
59
42
/// Start coordinating a reconfiguration
@@ -64,58 +47,54 @@ impl Node {
64
47
/// For upgrading from LRTQ, use `coordinate_upgrade_from_lrtq`
65
48
pub fn coordinate_reconfiguration (
66
49
& mut self ,
67
- now : Instant ,
68
- outbox : & mut Vec < Envelope > ,
50
+ ctx : & mut impl NodeHandlerCtx ,
69
51
msg : ReconfigureMsg ,
70
- ) -> Result < Option < PersistentState > , ReconfigurationError > {
52
+ ) -> Result < ( ) , ReconfigurationError > {
71
53
let Some ( validated_msg) = ValidatedReconfigureMsg :: new (
72
54
& self . log ,
73
- & self . platform_id ,
55
+ ctx . platform_id ( ) ,
74
56
msg,
75
- ( & self . persistent_state ) . into ( ) ,
57
+ ctx . persistent_state ( ) . into ( ) ,
76
58
self . coordinator_state . as_ref ( ) . map ( |cs| cs. reconfigure_msg ( ) ) ,
77
59
) ?
78
60
else {
79
61
// This was an idempotent (duplicate) request.
80
- return Ok ( None ) ;
62
+ return Ok ( ( ) ) ;
81
63
} ;
82
64
83
- let persistent_state =
84
- self . set_coordinator_state ( now, validated_msg) ?;
85
- self . send_coordinator_msgs ( now, outbox) ;
86
- Ok ( persistent_state)
65
+ self . set_coordinator_state ( ctx, validated_msg) ?;
66
+ self . send_coordinator_msgs ( ctx) ;
67
+ Ok ( ( ) )
87
68
}
88
69
89
70
/// Process a timer tick
90
71
///
91
72
/// Ticks are issued by the caller in order to move the protocol forward.
92
73
/// The current time is passed in to make the calls deterministic.
93
- pub fn tick ( & mut self , now : Instant , outbox : & mut Vec < Envelope > ) {
94
- self . send_coordinator_msgs ( now , outbox ) ;
74
+ pub fn tick ( & mut self , ctx : & mut impl NodeHandlerCtx ) {
75
+ self . send_coordinator_msgs ( ctx ) ;
95
76
}
96
77
97
78
/// Handle a message from another node
98
79
pub fn handle (
99
80
& mut self ,
100
- _now : Instant ,
101
- _outbox : & mut Vec < Envelope > ,
81
+ ctx : & mut impl NodeHandlerCtx ,
102
82
from : PlatformId ,
103
83
msg : PeerMsg ,
104
- ) -> Option < PersistentState > {
105
- if let Some ( rack_id) = self . persistent_state . rack_id ( ) {
84
+ ) {
85
+ if let Some ( rack_id) = ctx . persistent_state ( ) . rack_id ( ) {
106
86
if rack_id != msg. rack_id {
107
87
error ! ( self . log, "Mismatched rack id" ;
108
88
"from" => %from,
109
89
"msg" => msg. kind. name( ) ,
110
90
"expected" => %rack_id,
111
91
"got" => %msg. rack_id) ;
112
- return None ;
92
+ return ;
113
93
}
114
94
}
115
95
match msg. kind {
116
96
PeerMsgKind :: PrepareAck ( epoch) => {
117
97
self . handle_prepare_ack ( from, epoch) ;
118
- None
119
98
}
120
99
_ => todo ! (
121
100
"cannot handle message variant yet - not implemented: {msg:?}"
@@ -154,16 +133,12 @@ impl Node {
154
133
}
155
134
156
135
// Send any required messages as a reconfiguration coordinator
157
- fn send_coordinator_msgs (
158
- & mut self ,
159
- now : Instant ,
160
- outbox : & mut Vec < Envelope > ,
161
- ) {
136
+ fn send_coordinator_msgs ( & mut self , ctx : & mut impl NodeHandlerCtx ) {
162
137
// This function is called unconditionally in `tick` callbacks. In this
163
138
// case we may not actually be a coordinator. We ignore the call in
164
139
// that case.
165
140
if let Some ( c) = self . coordinator_state . as_mut ( ) {
166
- c. send_msgs ( now , outbox ) ;
141
+ c. send_msgs ( ctx ) ;
167
142
}
168
143
}
169
144
@@ -175,53 +150,51 @@ impl Node {
175
150
/// we have a `ValidatedReconfigureMsg`.
176
151
fn set_coordinator_state (
177
152
& mut self ,
178
- now : Instant ,
153
+ ctx : & mut impl NodeHandlerCtx ,
179
154
msg : ValidatedReconfigureMsg ,
180
- ) -> Result < Option < PersistentState > , ReconfigurationError > {
155
+ ) -> Result < ( ) , ReconfigurationError > {
181
156
// We have no committed configuration or lrtq ledger
182
- if self . persistent_state . is_uninitialized ( ) {
157
+ if ctx . persistent_state ( ) . is_uninitialized ( ) {
183
158
let ( coordinator_state, my_config, my_share) =
184
159
CoordinatorState :: new_uninitialized (
185
160
self . log . clone ( ) ,
186
- now,
161
+ ctx . now ( ) ,
187
162
msg,
188
163
) ?;
189
164
self . coordinator_state = Some ( coordinator_state) ;
190
- self . persistent_state . shares . insert ( my_config . epoch , my_share ) ;
191
- self . persistent_state
192
- . configs
193
- . insert_unique ( my_config )
194
- . expect ( "empty state" ) ;
165
+ ctx . update_persistent_state ( move |ps| {
166
+ ps . shares . insert ( my_config . epoch , my_share ) ;
167
+ ps . configs . insert_unique ( my_config ) . expect ( "empty state" ) ;
168
+ true
169
+ } ) ;
195
170
196
- return Ok ( Some ( self . persistent_state . clone ( ) ) ) ;
171
+ return Ok ( ( ) ) ;
197
172
}
198
173
199
174
// We have a committed configuration that is not LRTQ
200
175
let config =
201
- self . persistent_state . latest_committed_configuration ( ) . unwrap ( ) ;
176
+ ctx . persistent_state ( ) . latest_committed_configuration ( ) . unwrap ( ) ;
202
177
203
178
self . coordinator_state = Some ( CoordinatorState :: new_reconfiguration (
204
179
self . log . clone ( ) ,
205
- now,
180
+ ctx . now ( ) ,
206
181
msg,
207
182
& config,
208
183
) ?) ;
209
184
210
- Ok ( None )
185
+ Ok ( ( ) )
211
186
}
212
187
}
213
188
214
189
#[ cfg( test) ]
215
190
mod tests {
216
- use std:: time:: Duration ;
217
-
218
- use crate :: { Epoch , Threshold } ;
219
-
220
191
use super :: * ;
192
+ use crate :: { Epoch , NodeCallerCtx , NodeCommonCtx , NodeCtx , Threshold } ;
221
193
use assert_matches:: assert_matches;
222
194
use omicron_test_utils:: dev:: test_setup_log;
223
195
use omicron_uuid_kinds:: RackUuid ;
224
196
use proptest:: prelude:: * ;
197
+ use std:: time:: Duration ;
225
198
use test_strategy:: { Arbitrary , proptest} ;
226
199
227
200
fn arb_member ( ) -> impl Strategy < Value = PlatformId > {
@@ -259,21 +232,21 @@ mod tests {
259
232
let logctx = test_setup_log ( "initial_configuration" ) ;
260
233
let my_platform_id =
261
234
input. reconfigure_msg . members . first ( ) . unwrap ( ) . clone ( ) ;
262
- let mut node = Node :: new (
263
- logctx. log . clone ( ) ,
264
- my_platform_id. clone ( ) ,
265
- PersistentState :: empty ( ) ,
266
- ) ;
235
+ let mut ctx = NodeCtx :: new ( my_platform_id. clone ( ) ) ;
236
+ let mut node = Node :: new ( logctx. log . clone ( ) , & mut ctx) ;
237
+
238
+ node. coordinate_reconfiguration (
239
+ & mut ctx,
240
+ input. reconfigure_msg . clone ( ) ,
241
+ )
242
+ . expect ( "success" ) ;
243
+
244
+ // An initial configuraration always causes a change to persistent state
245
+ assert ! ( ctx. persistent_state_change_check_and_reset( ) ) ;
246
+ // Checking if the persistent state has changed above cleared the bit
247
+ assert ! ( !ctx. persistent_state_change_check_and_reset( ) ) ;
267
248
268
- let mut outbox = Vec :: new ( ) ;
269
- let persistent_state = node
270
- . coordinate_reconfiguration (
271
- Instant :: now ( ) ,
272
- & mut outbox,
273
- input. reconfigure_msg . clone ( ) ,
274
- )
275
- . expect ( "success" )
276
- . expect ( "persistent state" ) ;
249
+ let persistent_state = ctx. persistent_state ( ) . clone ( ) ;
277
250
278
251
// A PersistentState should always be returned
279
252
// It should include the `PrepareMsg` for this node.
@@ -288,15 +261,15 @@ mod tests {
288
261
persistent_state. configs . get ( & input. reconfigure_msg . epoch ) . unwrap ( ) ;
289
262
290
263
assert_eq ! ( config. epoch, input. reconfigure_msg. epoch) ;
291
- assert_eq ! ( config. coordinator, * node . platform_id( ) ) ;
264
+ assert_eq ! ( config. coordinator, * ctx . platform_id( ) ) ;
292
265
assert_eq ! ( config. members. len( ) , input. reconfigure_msg. members. len( ) ) ;
293
266
assert_eq ! ( config. threshold, input. reconfigure_msg. threshold) ;
294
267
assert ! ( config. encrypted_rack_secrets. is_none( ) ) ;
295
268
296
269
// Ensure that prepare messages are properly put in the outbox to be
297
270
// sent by the I/O parts of the codebase
298
- assert_eq ! ( outbox . len ( ) , config. members. len( ) - 1 ) ;
299
- for envelope in outbox {
271
+ assert_eq ! ( ctx . num_envelopes ( ) , config. members. len( ) - 1 ) ;
272
+ for envelope in ctx . drain_envelopes ( ) {
300
273
assert_matches ! (
301
274
envelope. msg. kind,
302
275
PeerMsgKind :: Prepare { config: prepare_config, .. } => {
0 commit comments