Skip to content

Commit 4f31206

Browse files
committed
gossip: fix state machine for active set buckets
1 parent 14d10eb commit 4f31206

File tree

12 files changed

+991
-735
lines changed

12 files changed

+991
-735
lines changed

src/flamenco/gossip/crds/fd_crds.c

Lines changed: 77 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "fd_crds.h"
22
#include "fd_crds_contact_info.c"
33
#include "../fd_gossip_types.h"
4+
#include "../fd_gossip_wpeer_sampler.h"
45

56
#include "../../../ballet/sha256/fd_sha256.h"
67
#include "../../../funk/fd_funk_base.h" /* no link dependency, only using hash */
@@ -40,7 +41,6 @@ struct fd_crds_entry_private {
4041
fd_crds_contact_info_entry_t * ci;
4142
long instance_creation_wallclock_nanos;
4243
uchar is_active;
43-
ulong sampler_idx;
4444

4545
/* A list of "fresh" contact info entries is maintained, holding
4646
entries that have been refreshed/inserted in the last 60s in
@@ -269,8 +269,6 @@ lookup_eq( fd_crds_key_t const * key0,
269269

270270
#include "../../../util/tmpl/fd_map_chain.c"
271271

272-
#include "fd_crds_peer_samplers.c"
273-
274272
struct fd_crds_purged {
275273
uchar hash[ 32UL ];
276274
struct {
@@ -340,6 +338,8 @@ typedef struct fd_crds_purged fd_crds_purged_t;
340338

341339
struct fd_crds_private {
342340
fd_gossip_out_ctx_t * gossip_update;
341+
fd_crds_ci_change_fn change_fn;
342+
void * change_fn_ctx;
343343

344344
fd_sha256_t sha256[1];
345345

@@ -366,7 +366,7 @@ struct fd_crds_private {
366366
crds_contact_info_evict_dlist_t * evict_dlist;
367367
} contact_info;
368368

369-
crds_samplers_t samplers[1];
369+
wpeer_sampler_t * crds_sampler;
370370

371371
fd_crds_metrics_t metrics[1];
372372

@@ -397,15 +397,18 @@ fd_crds_footprint( ulong ele_max,
397397
l = FD_LAYOUT_APPEND( l, crds_contact_info_pool_align(), crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
398398
l = FD_LAYOUT_APPEND( l, crds_contact_info_fresh_list_align(), crds_contact_info_fresh_list_footprint() );
399399
l = FD_LAYOUT_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() );
400+
l = FD_LAYOUT_APPEND( l, wpeer_sampler_align(), wpeer_sampler_footprint( CRDS_MAX_CONTACT_INFO ) );
400401
return FD_LAYOUT_FINI( l, FD_CRDS_ALIGN );
401402
}
402403

403404
void *
404-
fd_crds_new( void * shmem,
405-
fd_rng_t * rng,
406-
ulong ele_max,
407-
ulong purged_max,
408-
fd_gossip_out_ctx_t * gossip_update_out ) {
405+
fd_crds_new( void * shmem,
406+
fd_rng_t * rng,
407+
ulong ele_max,
408+
ulong purged_max,
409+
fd_gossip_out_ctx_t * gossip_update_out,
410+
fd_crds_ci_change_fn change_fn,
411+
void * change_fn_ctx ) {
409412
if( FD_UNLIKELY( !shmem ) ) {
410413
FD_LOG_WARNING(( "NULL shmem" ));
411414
return NULL;
@@ -451,6 +454,7 @@ fd_crds_new( void * shmem,
451454
void * _ci_pool = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_pool_align(), crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
452455
void * _ci_dlist = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_fresh_list_align(), crds_contact_info_fresh_list_footprint() );
453456
void * _ci_evict_dlist = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() );
457+
void * _wpeer_sampler = FD_SCRATCH_ALLOC_APPEND( l, wpeer_sampler_align(), wpeer_sampler_footprint( CRDS_MAX_CONTACT_INFO ) );
454458
FD_TEST( FD_SCRATCH_ALLOC_FINI( l, FD_CRDS_ALIGN ) == (ulong)shmem + fd_crds_footprint( ele_max, purged_max ) );
455459

456460
crds->pool = crds_pool_join( crds_pool_new( _pool, ele_max ) );
@@ -495,13 +499,17 @@ fd_crds_new( void * shmem,
495499
crds->contact_info.evict_dlist = crds_contact_info_evict_dlist_join( crds_contact_info_evict_dlist_new( _ci_evict_dlist ) );
496500
FD_TEST( crds->contact_info.evict_dlist );
497501

502+
crds->crds_sampler = wpeer_sampler_join( wpeer_sampler_new( _wpeer_sampler, CRDS_MAX_CONTACT_INFO ) );
503+
FD_TEST( crds->crds_sampler );
504+
498505
FD_TEST( fd_sha256_join( fd_sha256_new( crds->sha256 ) ) );
499506

500-
crds_samplers_new( crds->samplers );
501507

502508
memset( crds->metrics, 0, sizeof(fd_crds_metrics_t) );
503509

504510
crds->gossip_update = gossip_update_out;
511+
crds->change_fn = change_fn;
512+
crds->change_fn_ctx = change_fn_ctx;
505513
crds->has_staked_node = 0;
506514

507515
FD_COMPILER_MFENCE();
@@ -551,6 +559,10 @@ remove_contact_info( fd_crds_t * crds,
551559
msg->contact_info_remove.idx = crds_contact_info_pool_idx( crds->contact_info.pool, ci->contact_info.ci );
552560
fd_gossip_tx_publish_chunk( crds->gossip_update, stem, (ulong)msg->tag, FD_GOSSIP_UPDATE_SZ_CONTACT_INFO_REMOVE, now );
553561

562+
ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, ci->contact_info.ci );
563+
crds->change_fn( crds->change_fn_ctx, pool_idx, ci->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_REMOVED, stem, now );
564+
wpeer_sampler_upd( crds->crds_sampler, 0, pool_idx );
565+
554566
if( FD_LIKELY( ci->stake ) ) crds->metrics->peer_staked_cnt--;
555567
else crds->metrics->peer_unstaked_cnt--;
556568

@@ -561,13 +573,6 @@ remove_contact_info( fd_crds_t * crds,
561573
}
562574
crds_contact_info_evict_dlist_ele_remove( crds->contact_info.evict_dlist, ci, crds->pool );
563575
crds_contact_info_pool_ele_release( crds->contact_info.pool, ci->contact_info.ci );
564-
565-
/* FIXME: If the peer is in any active set bucket, it is NOT removed
566-
here. If the peer is re-inserted into the CRDS table in the future,
567-
it is added back into the bucket's sampler. This means a peer can
568-
be sampled in a bucket (at least) twice during
569-
fd_active_set_rotate. */
570-
crds_samplers_rem_peer( crds->samplers, ci );
571576
}
572577

573578
ulong
@@ -658,6 +663,24 @@ expire( fd_crds_t * crds,
658663
}
659664
}
660665

666+
/* Freshness is based on how recently a peer's contact info was last
667+
updated.
668+
669+
Activity is based on whether a peer responds to a ping-pong request,
670+
tracked by fd_ping_tracker. */
671+
672+
#define BASE_WEIGHT 256UL /* TODO: test this out!! */
673+
ulong
674+
peer_sampler_score( ulong stake,
675+
int is_active,
676+
int is_fresh ) {
677+
if( FD_UNLIKELY( !is_active ) ) return 0UL;
678+
679+
ulong score = BASE_WEIGHT + stake;
680+
if( FD_UNLIKELY( !is_fresh ) ) score>>=7;
681+
return score;
682+
}
683+
661684
void
662685
unfresh( fd_crds_t * crds,
663686
long now ) {
@@ -668,7 +691,11 @@ unfresh( fd_crds_t * crds,
668691

669692
head = crds_contact_info_fresh_list_ele_pop_head( crds->contact_info.fresh_dlist, crds->pool );
670693
head->contact_info.fresh_dlist.in_list = 0;
671-
crds_samplers_upd_peer_at_idx( crds->samplers, head, head->contact_info.sampler_idx, now );
694+
695+
ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, head->contact_info.ci );
696+
ulong score = peer_sampler_score( head->stake, head->contact_info.is_active, 0 );
697+
698+
wpeer_sampler_upd( crds->crds_sampler, score, pool_idx );
672699
}
673700
}
674701

@@ -762,8 +789,6 @@ crds_entry_init( fd_gossip_view_crds_value_t const * view,
762789
out_value->node_instance.token = view->node_instance->token;
763790
} else if( FD_UNLIKELY( key->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
764791
out_value->contact_info.instance_creation_wallclock_nanos = view->ci_view->contact_info->instance_creation_wallclock_nanos;
765-
/* Contact Info entry will be added to sampler upon successful insertion */
766-
out_value->contact_info.sampler_idx = SAMPLE_IDX_SENTINEL;
767792
}
768793
}
769794

@@ -1011,13 +1036,11 @@ fd_crds_insert( fd_crds_t * crds,
10111036
and is used in sampler score calculations. So we inherit the
10121037
incumbent's setting. */
10131038
candidate->contact_info.is_active = incumbent->contact_info.is_active;
1014-
if( FD_LIKELY( !is_from_me ) ) {
1015-
if( FD_UNLIKELY( candidate->stake!=incumbent->stake ) ) {
1016-
/* Perform a rescore here (expensive) */
1017-
crds_samplers_upd_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx, now );
1018-
} else {
1019-
crds_samplers_swap_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx );
1020-
}
1039+
if( FD_UNLIKELY( candidate->stake!=incumbent->stake && !is_from_me ) ) {
1040+
ulong score = peer_sampler_score( origin_stake, candidate->contact_info.is_active, 0 );
1041+
ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, candidate->contact_info.ci );
1042+
wpeer_sampler_upd( crds->crds_sampler, score, pool_idx );
1043+
crds->change_fn( crds->change_fn_ctx, pool_idx, candidate->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_STAKE_CHANGED, stem, now );
10211044
}
10221045

10231046
if( FD_LIKELY( incumbent->stake ) ) crds->metrics->peer_staked_cnt--;
@@ -1085,7 +1108,10 @@ fd_crds_insert( fd_crds_t * crds,
10851108
}
10861109

10871110
if( FD_UNLIKELY( !is_replacing && !is_from_me ) ) {
1088-
crds_samplers_add_peer( crds->samplers, candidate, now);
1111+
ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, candidate->contact_info.ci );
1112+
ulong score = peer_sampler_score( origin_stake, candidate->contact_info.is_active, 1 );
1113+
wpeer_sampler_upd( crds->crds_sampler, score, pool_idx );
1114+
crds->change_fn( crds->change_fn_ctx, pool_idx, candidate->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_NEW, stem, now );
10891115
}
10901116

10911117
if( FD_LIKELY( candidate->stake ) ) crds->metrics->peer_staked_cnt++;
@@ -1142,6 +1168,13 @@ fd_crds_contact_info_lookup( fd_crds_t const * crds,
11421168
return peer_ci->contact_info.ci->contact_info;
11431169
}
11441170

1171+
fd_contact_info_t const *
1172+
fd_crds_contact_info_idx_lookup( fd_crds_t const * crds,
1173+
ulong idx ) {
1174+
fd_crds_contact_info_entry_t const * ci = crds_contact_info_pool_ele_const( crds->contact_info.pool, idx );
1175+
return ci->contact_info;
1176+
}
1177+
11451178
ulong
11461179
fd_crds_peer_count( fd_crds_t const * crds ){
11471180
return crds_contact_info_pool_used( crds->contact_info.pool );
@@ -1150,88 +1183,46 @@ fd_crds_peer_count( fd_crds_t const * crds ){
11501183
static inline void
11511184
set_peer_active_status( fd_crds_t * crds,
11521185
uchar const * peer_pubkey,
1153-
uchar status,
1154-
long now ) {
1186+
uchar status ) {
11551187

11561188
fd_crds_key_t key[1];
11571189
make_contact_info_key( peer_pubkey, key );
11581190

11591191
fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
1160-
/* TODO: error handling? This technically should never hit */
1192+
1193+
/* In its first inactive->active transition, a peer will not have a
1194+
corresponding contact info entry since its contact infos are
1195+
dropped on ingress until marked active by the ping tracker. */
11611196
if( FD_UNLIKELY( !peer_ci ) ) return;
1162-
uchar old_status = peer_ci->contact_info.is_active;
1197+
1198+
uchar old_status = peer_ci->contact_info.is_active;
11631199
peer_ci->contact_info.is_active = status;
11641200

11651201
if( FD_UNLIKELY( old_status!=status ) ) {
11661202
/* Trigger sampler update */
1167-
crds_samplers_upd_peer_at_idx( crds->samplers,
1168-
peer_ci,
1169-
peer_ci->contact_info.sampler_idx,
1170-
now );
1203+
ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, peer_ci->contact_info.ci );
1204+
ulong score = peer_sampler_score( peer_ci->stake, status, !!peer_ci->contact_info.fresh_dlist.in_list );
1205+
wpeer_sampler_upd( crds->crds_sampler, score, pool_idx );
11711206
}
11721207
}
11731208
void
11741209
fd_crds_peer_active( fd_crds_t * crds,
1175-
uchar const * peer_pubkey,
1176-
long now ) {
1177-
set_peer_active_status( crds, peer_pubkey, 1 /* active */, now );
1210+
uchar const * peer_pubkey ) {
1211+
set_peer_active_status( crds, peer_pubkey, 1 /* active */ );
11781212
}
11791213

11801214
void
11811215
fd_crds_peer_inactive( fd_crds_t * crds,
1182-
uchar const * peer_pubkey,
1183-
long now ) {
1184-
set_peer_active_status( crds, peer_pubkey, 0 /* inactive */, now );
1216+
uchar const * peer_pubkey ) {
1217+
set_peer_active_status( crds, peer_pubkey, 0 /* inactive */);
11851218
}
11861219

11871220
fd_contact_info_t const *
11881221
fd_crds_peer_sample( fd_crds_t const * crds,
11891222
fd_rng_t * rng ) {
1190-
ulong idx = wpeer_sampler_sample( crds->samplers->pr_sampler,
1191-
rng,
1192-
crds->samplers->ele_cnt );
1193-
if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL;
1194-
return fd_crds_entry_contact_info( crds->samplers->ele[idx] );
1195-
}
1196-
1197-
fd_contact_info_t const *
1198-
fd_crds_bucket_sample_and_remove( fd_crds_t * crds,
1199-
fd_rng_t * rng,
1200-
ulong bucket ) {
1201-
ulong idx = wpeer_sampler_sample( &crds->samplers->bucket_samplers[bucket],
1202-
rng,
1203-
crds->samplers->ele_cnt );
1223+
ulong idx = wpeer_sampler_sample( crds->crds_sampler, rng );
12041224
if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL;
1205-
/* Disable peer to prevent future sampling until added back with
1206-
fd_crds_bucket_add */
1207-
wpeer_sampler_disable( &crds->samplers->bucket_samplers[bucket],
1208-
idx,
1209-
crds->samplers->ele_cnt );
1210-
1211-
return fd_crds_entry_contact_info( crds->samplers->ele[idx] );
1212-
}
1213-
1214-
void
1215-
fd_crds_bucket_add( fd_crds_t * crds,
1216-
ulong bucket,
1217-
uchar const * pubkey ) {
1218-
fd_crds_key_t key[1];
1219-
make_contact_info_key( pubkey, key );
1220-
fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
1221-
if( FD_UNLIKELY( !peer_ci ) ) {
1222-
FD_LOG_DEBUG(( "Sample peer not found in CRDS. Likely dropped." ));
1223-
return;
1224-
}
1225-
wpeer_sampler_t * bucket_sampler = &crds->samplers->bucket_samplers[bucket];
1226-
wpeer_sampler_enable( bucket_sampler,
1227-
peer_ci->contact_info.sampler_idx,
1228-
crds->samplers->ele_cnt );
1229-
1230-
ulong score = wpeer_sampler_bucket_score( peer_ci, bucket );
1231-
wpeer_sampler_upd( bucket_sampler,
1232-
score,
1233-
peer_ci->contact_info.sampler_idx,
1234-
crds->samplers->ele_cnt );
1225+
return crds_contact_info_pool_ele( crds->contact_info.pool, idx )->contact_info;
12351226
}
12361227

12371228
struct fd_crds_mask_iter_private {

0 commit comments

Comments
 (0)