@@ -103,25 +103,30 @@ impl MithrilInfrastructure {
103
103
let relay_signer_registration_mode = & config. relay_signer_registration_mode ;
104
104
let relay_signature_registration_mode = & config. relay_signature_registration_mode ;
105
105
106
- let aggregators =
107
- Self :: start_aggregators ( config, aggregator_cardano_nodes, chain_observer_type) . await ?;
108
- let aggregator_endpoints = aggregators
106
+ let ( leader_aggregator, follower_aggregators) =
107
+ Self :: prepare_aggregators ( config, aggregator_cardano_nodes, chain_observer_type)
108
+ . await ?;
109
+
110
+ Self :: register_startup_era ( & leader_aggregator, config) . await ?;
111
+ leader_aggregator. serve ( ) . await ?;
112
+
113
+ let follower_aggregator_endpoints = follower_aggregators
109
114
. iter ( )
110
115
. map ( |aggregator| aggregator. endpoint ( ) )
111
116
. collect :: < Vec < _ > > ( ) ;
112
- let leader_aggregator_endpoint = aggregator_endpoints[ 0 ] . to_owned ( ) ;
113
117
114
118
let ( relay_aggregators, relay_signers, relay_passives) = Self :: start_relays (
115
119
config,
116
- & aggregator_endpoints,
120
+ leader_aggregator. endpoint ( ) ,
121
+ & follower_aggregator_endpoints,
117
122
& signer_party_ids,
118
123
relay_signer_registration_mode. to_owned ( ) ,
119
124
relay_signature_registration_mode. to_owned ( ) ,
120
125
) ?;
121
126
122
127
let signers = Self :: start_signers (
123
128
config,
124
- leader_aggregator_endpoint ,
129
+ leader_aggregator . endpoint ( ) ,
125
130
signer_cardano_nodes,
126
131
& relay_signers,
127
132
)
@@ -132,11 +137,14 @@ impl MithrilInfrastructure {
132
137
CardanoNetwork :: DevNet ( DEVNET_MAGIC_ID ) ,
133
138
) ) ;
134
139
140
+ let mut all_aggregators = vec ! [ leader_aggregator] ;
141
+ all_aggregators. extend ( follower_aggregators) ;
142
+
135
143
Ok ( Self {
136
144
bin_dir : config. bin_dir . to_path_buf ( ) ,
137
145
artifacts_dir : config. artifacts_dir . to_path_buf ( ) ,
138
146
devnet : config. devnet . clone ( ) ,
139
- aggregators,
147
+ aggregators : all_aggregators ,
140
148
signers,
141
149
relay_aggregators,
142
150
relay_signers,
@@ -176,90 +184,110 @@ impl MithrilInfrastructure {
176
184
+ 1 ;
177
185
if self . era_reader_adapter == "cardano-chain" {
178
186
let devnet = self . devnet . clone ( ) ;
179
- assertions:: register_era_marker ( self . aggregator ( 0 ) , & devnet, next_era, next_era_epoch)
180
- . await ?;
187
+ assertions:: register_era_marker (
188
+ self . leader_aggregator ( ) ,
189
+ & devnet,
190
+ next_era,
191
+ next_era_epoch,
192
+ )
193
+ . await ?;
181
194
}
182
195
let mut current_era = self . current_era . write ( ) . await ;
183
196
* current_era = next_era. to_owned ( ) ;
184
197
185
198
Ok ( ( ) )
186
199
}
187
200
188
- async fn start_aggregators (
201
+ async fn prepare_aggregators (
189
202
config : & MithrilInfrastructureConfig ,
190
- pool_nodes : & [ FullNode ] ,
203
+ full_nodes : & [ FullNode ] ,
191
204
chain_observer_type : & str ,
192
- ) -> StdResult < Vec < Aggregator > > {
193
- let mut aggregators = vec ! [ ] ;
194
- let mut leader_aggregator_endpoint: Option < String > = None ;
195
- for ( index, full_node) in pool_nodes. iter ( ) . enumerate ( ) {
196
- let aggregator_name = Aggregator :: name_suffix ( index) ;
197
- let aggregator_artifacts_dir = config
198
- . artifacts_dir
199
- . join ( format ! ( "mithril-aggregator-{aggregator_name}" ) ) ;
200
- let aggregator_store_dir =
201
- config. store_dir . join ( format ! ( "aggregator-{aggregator_name}" ) ) ;
202
- let aggregator = Aggregator :: new ( & AggregatorConfig {
203
- index,
204
- name : & aggregator_name,
205
- server_port : config. server_port + index as u64 ,
205
+ ) -> StdResult < ( Aggregator , Vec < Aggregator > ) > {
206
+ let [ leader_node, follower_nodes @ ..] = full_nodes else {
207
+ panic ! ( "Can't prepare Aggregators: No full nodes found" ) ;
208
+ } ;
209
+ let leader_aggregator =
210
+ Self :: prepare_aggregator ( 0 , leader_node, config, chain_observer_type, None ) . await ?;
211
+
212
+ let mut follower_aggregators = vec ! [ ] ;
213
+ for ( index, full_node) in follower_nodes. iter ( ) . enumerate ( ) {
214
+ let aggregator = Self :: prepare_aggregator (
215
+ index + 1 ,
206
216
full_node,
207
- cardano_cli_path : & config. devnet . cardano_cli_path ( ) ,
208
- work_dir : & config. work_dir ,
209
- store_dir : & aggregator_store_dir,
210
- artifacts_dir : & aggregator_artifacts_dir,
211
- bin_dir : & config. bin_dir ,
212
- cardano_node_version : & config. cardano_node_version ,
213
- mithril_run_interval : config. mithril_run_interval ,
214
- mithril_era : & config. mithril_era ,
215
- mithril_era_reader_adapter : & config. mithril_era_reader_adapter ,
216
- mithril_era_marker_address : & config. devnet . mithril_era_marker_address ( ) ?,
217
- signed_entity_types : & config. signed_entity_types ,
217
+ config,
218
218
chain_observer_type,
219
- leader_aggregator_endpoint : & leader_aggregator_endpoint. clone ( ) ,
220
- } ) ?;
221
-
222
- aggregator
223
- . set_protocol_parameters ( & ProtocolParameters {
224
- k : 70 ,
225
- m : 105 ,
226
- phi_f : 0.95 ,
227
- } )
228
- . await ;
229
-
230
- if leader_aggregator_endpoint. is_none ( )
231
- && config. has_leader_follower_signer_registration ( )
232
- {
233
- leader_aggregator_endpoint = Some ( aggregator. endpoint ( ) ) ;
234
- }
235
-
236
- aggregators. push ( aggregator) ;
219
+ Some ( leader_aggregator. endpoint ( ) ) ,
220
+ )
221
+ . await ?;
222
+ follower_aggregators. push ( aggregator) ;
237
223
}
238
224
239
- Self :: register_startup_era ( & aggregators[ 0 ] , config) . await ?;
240
-
241
- for aggregator in & aggregators {
242
- aggregator. serve ( ) . await ?;
243
- }
225
+ Ok ( ( leader_aggregator, follower_aggregators) )
226
+ }
244
227
245
- Ok ( aggregators)
228
+ async fn prepare_aggregator (
229
+ index : usize ,
230
+ full_node : & FullNode ,
231
+ config : & MithrilInfrastructureConfig ,
232
+ chain_observer_type : & str ,
233
+ leader_aggregator_endpoint : Option < String > ,
234
+ ) -> StdResult < Aggregator > {
235
+ let aggregator_name = Aggregator :: name_suffix ( index) ;
236
+ let aggregator_artifacts_dir = config
237
+ . artifacts_dir
238
+ . join ( format ! ( "mithril-aggregator-{aggregator_name}" ) ) ;
239
+ let aggregator_store_dir = config. store_dir . join ( format ! ( "aggregator-{aggregator_name}" ) ) ;
240
+ let aggregator = Aggregator :: new ( & AggregatorConfig {
241
+ index,
242
+ name : & aggregator_name,
243
+ server_port : config. server_port + index as u64 ,
244
+ full_node,
245
+ cardano_cli_path : & config. devnet . cardano_cli_path ( ) ,
246
+ work_dir : & config. work_dir ,
247
+ store_dir : & aggregator_store_dir,
248
+ artifacts_dir : & aggregator_artifacts_dir,
249
+ bin_dir : & config. bin_dir ,
250
+ cardano_node_version : & config. cardano_node_version ,
251
+ mithril_run_interval : config. mithril_run_interval ,
252
+ mithril_era : & config. mithril_era ,
253
+ mithril_era_reader_adapter : & config. mithril_era_reader_adapter ,
254
+ mithril_era_marker_address : & config. devnet . mithril_era_marker_address ( ) ?,
255
+ signed_entity_types : & config. signed_entity_types ,
256
+ chain_observer_type,
257
+ leader_aggregator_endpoint : & leader_aggregator_endpoint,
258
+ } ) ?;
259
+
260
+ aggregator
261
+ . set_protocol_parameters ( & ProtocolParameters {
262
+ k : 70 ,
263
+ m : 105 ,
264
+ phi_f : 0.95 ,
265
+ } )
266
+ . await ;
267
+
268
+ Ok ( aggregator)
246
269
}
247
270
248
271
fn start_relays (
249
272
config : & MithrilInfrastructureConfig ,
250
- aggregator_endpoints : & [ String ] ,
273
+ leader_aggregator_endpoint : String ,
274
+ follower_aggregator_endpoints : & [ String ] ,
251
275
signers_party_ids : & [ PartyId ] ,
252
276
relay_signer_registration_mode : String ,
253
277
relay_signature_registration_mode : String ,
254
278
) -> StdResult < ( Vec < RelayAggregator > , Vec < RelaySigner > , Vec < RelayPassive > ) > {
255
279
if !config. use_relays {
256
280
return Ok ( ( vec ! [ ] , vec ! [ ] , vec ! [ ] ) ) ;
257
281
}
282
+ let aggregator_endpoints = [
283
+ vec ! [ leader_aggregator_endpoint. clone( ) ] ,
284
+ follower_aggregator_endpoints. to_vec ( ) ,
285
+ ]
286
+ . concat ( ) ;
258
287
259
288
let mut relay_aggregators: Vec < RelayAggregator > = vec ! [ ] ;
260
289
let mut relay_signers: Vec < RelaySigner > = vec ! [ ] ;
261
290
let mut relay_passives: Vec < RelayPassive > = vec ! [ ] ;
262
- let leader_aggregator_endpoint = & aggregator_endpoints[ 0 ] ;
263
291
264
292
info ! ( "Starting the Mithril infrastructure in P2P mode (experimental)" ) ;
265
293
@@ -287,7 +315,7 @@ impl MithrilInfrastructure {
287
315
dial_to : bootstrap_peer_addr. clone ( ) ,
288
316
relay_signer_registration_mode : relay_signer_registration_mode. clone ( ) ,
289
317
relay_signature_registration_mode : relay_signature_registration_mode. clone ( ) ,
290
- aggregator_endpoint : leader_aggregator_endpoint,
318
+ aggregator_endpoint : & leader_aggregator_endpoint,
291
319
party_id : party_id. clone ( ) ,
292
320
work_dir : & config. work_dir ,
293
321
bin_dir : & config. bin_dir ,
@@ -377,7 +405,7 @@ impl MithrilInfrastructure {
377
405
signer. stop ( ) . await ?;
378
406
}
379
407
380
- for aggregator in & self . aggregators {
408
+ for aggregator in self . aggregators ( ) {
381
409
aggregator. stop ( ) . await ?;
382
410
}
383
411
@@ -396,6 +424,18 @@ impl MithrilInfrastructure {
396
424
& self . aggregators [ index]
397
425
}
398
426
427
+ pub fn leader_aggregator ( & self ) -> & Aggregator {
428
+ & self . aggregators [ 0 ]
429
+ }
430
+
431
+ pub fn follower_aggregators ( & self ) -> & [ Aggregator ] {
432
+ & self . aggregators [ 1 ..]
433
+ }
434
+
435
+ pub fn follower_aggregator ( & self , index : usize ) -> & Aggregator {
436
+ & self . aggregators [ index + 1 ]
437
+ }
438
+
399
439
pub fn signers ( & self ) -> & [ Signer ] {
400
440
& self . signers
401
441
}
0 commit comments