diff --git a/src/flamenco/gossip/Local.mk b/src/flamenco/gossip/Local.mk index 5fe39e39b1e..241b54857d8 100644 --- a/src/flamenco/gossip/Local.mk +++ b/src/flamenco/gossip/Local.mk @@ -3,7 +3,7 @@ $(call add-objs,fd_gossip fd_gossip_msg_parse fd_gossip_msg_ser fd_gossip_out fd $(call add-hdrs,fd_bloom.h) $(call add-hdrs,fd_gossip_types.h) -$(call add-objs,fd_bloom fd_active_set fd_ping_tracker,fd_flamenco) +$(call add-objs,fd_bloom fd_active_set fd_ping_tracker fd_gossip_wpeer_sampler,fd_flamenco) $(call make-unit-test,test_bloom,test_bloom,fd_flamenco fd_util) $(call run-unit-test,test_bloom) diff --git a/src/flamenco/gossip/crds/fd_crds.c b/src/flamenco/gossip/crds/fd_crds.c index 29bc2354c45..58971f15e44 100644 --- a/src/flamenco/gossip/crds/fd_crds.c +++ b/src/flamenco/gossip/crds/fd_crds.c @@ -1,6 +1,7 @@ #include "fd_crds.h" #include "fd_crds_contact_info.c" #include "../fd_gossip_types.h" +#include "../fd_gossip_wpeer_sampler.h" #include "../../../ballet/sha256/fd_sha256.h" #include "../../../funk/fd_funk_base.h" /* no link dependency, only using hash */ @@ -40,7 +41,6 @@ struct fd_crds_entry_private { fd_crds_contact_info_entry_t * ci; long instance_creation_wallclock_nanos; uchar is_active; - ulong sampler_idx; /* A list of "fresh" contact info entries is maintained, holding entries that have been refreshed/inserted in the last 60s in @@ -269,8 +269,6 @@ lookup_eq( fd_crds_key_t const * key0, #include "../../../util/tmpl/fd_map_chain.c" -#include "fd_crds_peer_samplers.c" - struct fd_crds_purged { uchar hash[ 32UL ]; struct { @@ -340,6 +338,8 @@ typedef struct fd_crds_purged fd_crds_purged_t; struct fd_crds_private { fd_gossip_out_ctx_t * gossip_update; + fd_crds_ci_change_fn change_fn; + void * change_fn_ctx; fd_sha256_t sha256[1]; @@ -366,7 +366,7 @@ struct fd_crds_private { crds_contact_info_evict_dlist_t * evict_dlist; } contact_info; - crds_samplers_t samplers[1]; + wpeer_sampler_t * crds_sampler; fd_crds_metrics_t metrics[1]; @@ -397,15 +397,18 @@ fd_crds_footprint( ulong ele_max, l = FD_LAYOUT_APPEND( l, crds_contact_info_pool_align(), crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) ); l = FD_LAYOUT_APPEND( l, crds_contact_info_fresh_list_align(), crds_contact_info_fresh_list_footprint() ); l = FD_LAYOUT_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() ); + l = FD_LAYOUT_APPEND( l, wpeer_sampler_align(), wpeer_sampler_footprint( CRDS_MAX_CONTACT_INFO ) ); return FD_LAYOUT_FINI( l, FD_CRDS_ALIGN ); } void * -fd_crds_new( void * shmem, - fd_rng_t * rng, - ulong ele_max, - ulong purged_max, - fd_gossip_out_ctx_t * gossip_update_out ) { +fd_crds_new( void * shmem, + fd_rng_t * rng, + ulong ele_max, + ulong purged_max, + fd_gossip_out_ctx_t * gossip_update_out, + fd_crds_ci_change_fn change_fn, + void * change_fn_ctx ) { if( FD_UNLIKELY( !shmem ) ) { FD_LOG_WARNING(( "NULL shmem" )); return NULL; @@ -451,6 +454,7 @@ fd_crds_new( void * shmem, void * _ci_pool = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_pool_align(), crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) ); void * _ci_dlist = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_fresh_list_align(), crds_contact_info_fresh_list_footprint() ); void * _ci_evict_dlist = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() ); + void * _wpeer_sampler = FD_SCRATCH_ALLOC_APPEND( l, wpeer_sampler_align(), wpeer_sampler_footprint( CRDS_MAX_CONTACT_INFO ) ); FD_TEST( FD_SCRATCH_ALLOC_FINI( l, FD_CRDS_ALIGN ) == (ulong)shmem + fd_crds_footprint( ele_max, purged_max ) ); crds->pool = crds_pool_join( crds_pool_new( _pool, ele_max ) ); @@ -495,13 +499,17 @@ fd_crds_new( void * shmem, crds->contact_info.evict_dlist = crds_contact_info_evict_dlist_join( crds_contact_info_evict_dlist_new( _ci_evict_dlist ) ); FD_TEST( crds->contact_info.evict_dlist ); + crds->crds_sampler = wpeer_sampler_join( wpeer_sampler_new( _wpeer_sampler, CRDS_MAX_CONTACT_INFO ) ); + FD_TEST( crds->crds_sampler ); + FD_TEST( fd_sha256_join( fd_sha256_new( crds->sha256 ) ) ); - crds_samplers_new( crds->samplers ); memset( crds->metrics, 0, sizeof(fd_crds_metrics_t) ); crds->gossip_update = gossip_update_out; + crds->change_fn = change_fn; + crds->change_fn_ctx = change_fn_ctx; crds->has_staked_node = 0; FD_COMPILER_MFENCE(); @@ -551,6 +559,10 @@ remove_contact_info( fd_crds_t * crds, msg->contact_info_remove.idx = crds_contact_info_pool_idx( crds->contact_info.pool, ci->contact_info.ci ); fd_gossip_tx_publish_chunk( crds->gossip_update, stem, (ulong)msg->tag, FD_GOSSIP_UPDATE_SZ_CONTACT_INFO_REMOVE, now ); + ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, ci->contact_info.ci ); + crds->change_fn( crds->change_fn_ctx, pool_idx, ci->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_REMOVED, stem, now ); + wpeer_sampler_upd( crds->crds_sampler, 0, pool_idx ); + if( FD_LIKELY( ci->stake ) ) crds->metrics->peer_staked_cnt--; else crds->metrics->peer_unstaked_cnt--; @@ -561,13 +573,6 @@ remove_contact_info( fd_crds_t * crds, } crds_contact_info_evict_dlist_ele_remove( crds->contact_info.evict_dlist, ci, crds->pool ); crds_contact_info_pool_ele_release( crds->contact_info.pool, ci->contact_info.ci ); - - /* FIXME: If the peer is in any active set bucket, it is NOT removed - here. If the peer is re-inserted into the CRDS table in the future, - it is added back into the bucket's sampler. This means a peer can - be sampled in a bucket (at least) twice during - fd_active_set_rotate. */ - crds_samplers_rem_peer( crds->samplers, ci ); } ulong @@ -658,6 +663,24 @@ expire( fd_crds_t * crds, } } +/* Freshness is based on how recent a peer's contact info was last + updated. + + Activity is based on whether a peer responds to a ping-pong request, + tracked by fd_ping_tracker. */ + +#define BASE_WEIGHT 256UL /* TODO: test this out!! */ +ulong +peer_sampler_score( ulong stake, + int is_active, + int is_fresh ) { + if( FD_UNLIKELY( !is_active ) ) return 0UL; + + ulong score = BASE_WEIGHT + stake; + if( FD_UNLIKELY( !is_fresh ) ) score>>=7; + return score; +} + void unfresh( fd_crds_t * crds, long now ) { @@ -668,7 +691,11 @@ unfresh( fd_crds_t * crds, head = crds_contact_info_fresh_list_ele_pop_head( crds->contact_info.fresh_dlist, crds->pool ); head->contact_info.fresh_dlist.in_list = 0; - crds_samplers_upd_peer_at_idx( crds->samplers, head, head->contact_info.sampler_idx, now ); + + ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, head->contact_info.ci ); + ulong score = peer_sampler_score( head->stake, head->contact_info.is_active, 0 ); + + wpeer_sampler_upd( crds->crds_sampler, score, pool_idx ); } } @@ -762,8 +789,6 @@ crds_entry_init( fd_gossip_view_crds_value_t const * view, out_value->node_instance.token = view->node_instance->token; } else if( FD_UNLIKELY( key->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) { out_value->contact_info.instance_creation_wallclock_nanos = view->ci_view->contact_info->instance_creation_wallclock_nanos; - /* Contact Info entry will be added to sampler upon successful insertion */ - out_value->contact_info.sampler_idx = SAMPLE_IDX_SENTINEL; } } @@ -1022,13 +1047,11 @@ fd_crds_insert( fd_crds_t * crds, and is used in sampler score calculations. So we inherit the incumbent's setting. */ candidate->contact_info.is_active = incumbent->contact_info.is_active; - if( FD_LIKELY( !is_from_me ) ) { - if( FD_UNLIKELY( candidate->stake!=incumbent->stake ) ) { - /* Perform a rescore here (expensive) */ - crds_samplers_upd_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx, now ); - } else { - crds_samplers_swap_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx ); - } + if( FD_UNLIKELY( candidate->stake!=incumbent->stake && !is_from_me ) ) { + ulong score = peer_sampler_score( origin_stake, candidate->contact_info.is_active, 0 ); + ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, candidate->contact_info.ci ); + wpeer_sampler_upd( crds->crds_sampler, score, pool_idx ); + crds->change_fn( crds->change_fn_ctx, pool_idx, candidate->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_STAKE_CHANGED, stem, now ); } if( FD_LIKELY( incumbent->stake ) ) crds->metrics->peer_staked_cnt--; @@ -1088,7 +1111,7 @@ fd_crds_insert( fd_crds_t * crds, crds_contact_info_evict_dlist_ele_push_tail( crds->contact_info.evict_dlist, candidate, crds->pool ); - if( FD_LIKELY( !is_from_me ) ){ + if( FD_LIKELY( !is_from_me ) ) { crds_contact_info_fresh_list_ele_push_tail( crds->contact_info.fresh_dlist, candidate, crds->pool ); candidate->contact_info.fresh_dlist.in_list = 1; } else { @@ -1096,7 +1119,10 @@ fd_crds_insert( fd_crds_t * crds, } if( FD_UNLIKELY( !is_replacing && !is_from_me ) ) { - crds_samplers_add_peer( crds->samplers, candidate, now); + ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, candidate->contact_info.ci ); + ulong score = peer_sampler_score( origin_stake, candidate->contact_info.is_active, 1 ); + wpeer_sampler_upd( crds->crds_sampler, score, pool_idx ); + crds->change_fn( crds->change_fn_ctx, pool_idx, candidate->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_NEW, stem, now ); } if( FD_LIKELY( candidate->stake ) ) crds->metrics->peer_staked_cnt++; @@ -1153,6 +1179,13 @@ fd_crds_contact_info_lookup( fd_crds_t const * crds, return peer_ci->contact_info.ci->contact_info; } +fd_contact_info_t const * +fd_crds_contact_info_idx_lookup( fd_crds_t const * crds, + ulong idx ) { + fd_crds_contact_info_entry_t const * ci = crds_contact_info_pool_ele_const( crds->contact_info.pool, idx ); + return ci->contact_info; +} + ulong fd_crds_peer_count( fd_crds_t const * crds ){ return crds_contact_info_pool_used( crds->contact_info.pool ); @@ -1161,88 +1194,61 @@ fd_crds_peer_count( fd_crds_t const * crds ){ static inline void set_peer_active_status( fd_crds_t * crds, uchar const * peer_pubkey, - uchar status, - long now ) { + uchar status ) { fd_crds_key_t key[1]; make_contact_info_key( peer_pubkey, key ); fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool ); - /* TODO: error handling? This technically should never hit */ + + /* In its first inactive->active transition, a peer will not have a + corresponding contact info entry since their contact infos are + dropped on ingress until marked active by ping tracker. */ if( FD_UNLIKELY( !peer_ci ) ) return; - uchar old_status = peer_ci->contact_info.is_active; + + uchar old_status = peer_ci->contact_info.is_active; peer_ci->contact_info.is_active = status; if( FD_UNLIKELY( old_status!=status ) ) { /* Trigger sampler update */ - crds_samplers_upd_peer_at_idx( crds->samplers, - peer_ci, - peer_ci->contact_info.sampler_idx, - now ); + ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, peer_ci->contact_info.ci ); + ulong score = peer_sampler_score( peer_ci->stake, status, !!peer_ci->contact_info.fresh_dlist.in_list ); + wpeer_sampler_upd( crds->crds_sampler, score, pool_idx ); } } void fd_crds_peer_active( fd_crds_t * crds, - uchar const * peer_pubkey, - long now ) { - set_peer_active_status( crds, peer_pubkey, 1 /* active */, now ); + uchar const * peer_pubkey ) { + set_peer_active_status( crds, peer_pubkey, 1 /* active */ ); } void fd_crds_peer_inactive( fd_crds_t * crds, - uchar const * peer_pubkey, - long now ) { - set_peer_active_status( crds, peer_pubkey, 0 /* inactive */, now ); + uchar const * peer_pubkey ) { + set_peer_active_status( crds, peer_pubkey, 0 /* inactive */); } fd_contact_info_t const * fd_crds_peer_sample( fd_crds_t const * crds, fd_rng_t * rng ) { - ulong idx = wpeer_sampler_sample( crds->samplers->pr_sampler, - rng, - crds->samplers->ele_cnt ); + ulong idx = wpeer_sampler_sample( crds->crds_sampler, rng ); if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL; - return fd_crds_entry_contact_info( crds->samplers->ele[idx] ); + return crds_contact_info_pool_ele( crds->contact_info.pool, idx )->contact_info; } -fd_contact_info_t const * -fd_crds_bucket_sample_and_remove( fd_crds_t * crds, - fd_rng_t * rng, - ulong bucket ) { - ulong idx = wpeer_sampler_sample( &crds->samplers->bucket_samplers[bucket], - rng, - crds->samplers->ele_cnt ); - if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL; - /* Disable peer to prevent future sampling until added back with - fd_crds_bucket_add */ - wpeer_sampler_disable( &crds->samplers->bucket_samplers[bucket], - idx, - crds->samplers->ele_cnt ); +void +fd_crds_handle_identity_change( fd_crds_t * crds, + uchar const * new_identity_pubkey ) { + fd_crds_key_t old_key[1]; + make_contact_info_key( new_identity_pubkey, old_key ); + fd_crds_entry_t * new_ci = lookup_map_ele_query( crds->lookup_map, old_key, NULL, crds->pool ); + if( FD_UNLIKELY( !new_ci ) ) return; - return fd_crds_entry_contact_info( crds->samplers->ele[idx] ); -} + ulong new_idx = crds_contact_info_pool_idx( crds->contact_info.pool, new_ci->contact_info.ci ); + + wpeer_sampler_upd( crds->crds_sampler, 0UL, new_idx ); + crds->change_fn( crds->change_fn_ctx, new_idx, new_ci->stake, FD_CRDS_CONTACT_INFO_CHANGE_IDENTITY_CHANGED, NULL, 0L ); -void -fd_crds_bucket_add( fd_crds_t * crds, - ulong bucket, - uchar const * pubkey ) { - fd_crds_key_t key[1]; - make_contact_info_key( pubkey, key ); - fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool ); - if( FD_UNLIKELY( !peer_ci ) ) { - FD_LOG_DEBUG(( "Sample peer not found in CRDS. Likely dropped." )); - return; - } - wpeer_sampler_t * bucket_sampler = &crds->samplers->bucket_samplers[bucket]; - wpeer_sampler_enable( bucket_sampler, - peer_ci->contact_info.sampler_idx, - crds->samplers->ele_cnt ); - - ulong score = wpeer_sampler_bucket_score( peer_ci, bucket ); - wpeer_sampler_upd( bucket_sampler, - score, - peer_ci->contact_info.sampler_idx, - crds->samplers->ele_cnt ); } struct fd_crds_mask_iter_private { diff --git a/src/flamenco/gossip/crds/fd_crds.h b/src/flamenco/gossip/crds/fd_crds.h index 4d1b44222aa..17be706d955 100644 --- a/src/flamenco/gossip/crds/fd_crds.h +++ b/src/flamenco/gossip/crds/fd_crds.h @@ -16,6 +16,33 @@ typedef struct fd_crds_private fd_crds_t; struct fd_crds_mask_iter_private; typedef struct fd_crds_mask_iter_private fd_crds_mask_iter_t; +/* fd_crds_ci_change_fn provides a callback that tracks events + related to the contact info table, which is a separate sidetable + maintained by fd_crds. + + A crds_pool_idx value is guaranteed to be tagged to a specific + contact info entry from the moment the contact info is inserted into + the table (denoted by a TYPE_NEW event) to when it is + dropped (TYPE_REMOVED event). + + The contact info can be retrieved with + fd_crds_contact_info_idx_lookup( crds, crds_pool_idx ) within this + lifetime. In a TYPE_REMOVED event, crds_pool_idx is valid for lookup + within the scope of the callback for user-side cleanup, but is + invalid after the callback is completed. */ + +#define FD_CRDS_CONTACT_INFO_CHANGE_TYPE_NEW (0) +#define FD_CRDS_CONTACT_INFO_CHANGE_TYPE_REMOVED (1) +#define FD_CRDS_CONTACT_INFO_CHANGE_TYPE_STAKE_CHANGED (2) +#define FD_CRDS_CONTACT_INFO_CHANGE_IDENTITY_CHANGED (3) + +typedef void (*fd_crds_ci_change_fn)( void * ctx, + ulong crds_pool_idx, + ulong stake, + int change_type, + fd_stem_context_t * stem, + long now ); + #define FD_CRDS_ALIGN 128UL #define FD_CRDS_MAGIC (0xf17eda2c37c7d50UL) /* firedancer crds version 0*/ @@ -56,7 +83,9 @@ fd_crds_new( void * shmem, fd_rng_t * rng, ulong ele_max, ulong purged_max, - fd_gossip_out_ctx_t * gossip_update_out ); + fd_gossip_out_ctx_t * gossip_update_out, + fd_crds_ci_change_fn change_fn, + void * change_fn_ctx ); fd_crds_t * fd_crds_join( void * shcrds ); @@ -229,6 +258,16 @@ fd_contact_info_t const * fd_crds_contact_info_lookup( fd_crds_t const * crds, uchar const * pubkey ); +/* fd_crds_contact_info_idx_lookup returns a pointer to the contact + info structure corresponding to the contact info pool index. Assumes + idx points to a valid contact info pool entry. + + Valid idx (including lifetimes) are described in the + fd_crds_ci_change_fn callback documentation. */ +fd_contact_info_t const * +fd_crds_contact_info_idx_lookup( fd_crds_t const * crds, + ulong idx ); + /* fd_crds_peer_count returns the number of Contact Info entries present in the sidetable. The lifetime of a Contact Info entry tracks the lifetime of the corresponding CRDS entry. */ @@ -243,39 +282,16 @@ fd_crds_peer_count( fd_crds_t const * crds ); A peer's active state is typicallly determined by its ping/pong status. */ void fd_crds_peer_active( fd_crds_t * crds, - uchar const * peer_pubkey, - long now ); + uchar const * peer_pubkey ); void fd_crds_peer_inactive( fd_crds_t * crds, - uchar const * peer_pubkey, - long now ); - -/* The CRDS Table also maintains a set of peer samplers for use in various - Gossip tx cases. Namely - - Rotating the active push set (bucket_samplers) - - Selecting a pull request target (pr_sampler) */ - - -/* fd_crds_bucket_* sample APIs are meant to be used by fd_active_set. - Each bucket has a unique sampler. */ -fd_contact_info_t const * -fd_crds_bucket_sample_and_remove( fd_crds_t * crds, - fd_rng_t * rng, - ulong bucket ); - -/* fd_crds_bucket adds back in a peer that was previously - sampled with fd_crds_bucket_sample_and_remove. */ -void -fd_crds_bucket_add( fd_crds_t * crds, - ulong bucket, - uchar const * pubkey ); - + uchar const * peer_pubkey ); /* fd_crds_sample_peer randomly selects a peer node from the CRDS based weighted by stake. Peers with a ContactInfo that hasn't been refreshed in more than 60 seconds are considered offline, and are - downweighted in the selection by a factor of 100. They are still + downweighted in the selection by a factor of 128. They are still included to mitigate eclipse attacks. Peers with no ContactInfo in the CRDS are not included in the selection. The current node is also excluded from the selection. Low stake peers which are not @@ -293,6 +309,10 @@ fd_contact_info_t const * fd_crds_peer_sample( fd_crds_t const * crds, fd_rng_t * rng ); +void +fd_crds_handle_identity_change( fd_crds_t * crds, + uchar const * new_identity_pubkey ); + /* fd_crds_mask_iter_{init,next,done,entry} provide an API to iterate over the CRDS values in the table that whose hashes match a given mask. In the Gossip CRDS filter, the mask is applied on diff --git a/src/flamenco/gossip/crds/fd_crds_peer_samplers.c b/src/flamenco/gossip/crds/fd_crds_peer_samplers.c deleted file mode 100644 index b02a2f89344..00000000000 --- a/src/flamenco/gossip/crds/fd_crds_peer_samplers.c +++ /dev/null @@ -1,282 +0,0 @@ -/* Owned by CRDS table, tightly coupled with contact info side table. - - The gossip table (currently) needs 25 (active set rotation) - + 1 (tx pull req) stake-weighted peer samplers. - - Each sampler needs a different weight scoring implementation. This - compile unit: - - defines weight scoring implementations for each sampler. - - defines functions to insert, remove, and update all samplers in - one go - - The sample sets are designed to closely track modifications to the - CRDS contact info table. */ - -#include "fd_crds.h" -#include "../fd_active_set_private.h" - -#define SAMPLE_IDX_SENTINEL ULONG_MAX - -#define SET_NAME peer_enabled -#define SET_MAX CRDS_MAX_CONTACT_INFO -#include "../../../util/tmpl/fd_set.c" - -/* This is a very rudimentary implementation of a weighted sampler. We will - eventually want to use a modified fd_wsample. Using this until then. */ -struct wpeer_sampler { - /* Cumulative weight for each peer. - Individual peer weight can be derived with cumul_weight[i]-cumul_weight[i-1] */ - ulong cumul_weight[ CRDS_MAX_CONTACT_INFO ]; - - /* peer_enabled_test( peer_enabled, i ) determines if peer i is enabled. A - disabled peer should not be scored (e.g., during an update) and should - have a peer weight of 0 (i.e,. cumul_weight[i] - cumul_weight[i-1]==0). - - Why not just remove the peer? - Adding/removing should strictly track the CRDS Contact Info - table. That is to say a peer must have an entry in the sampler - if it has an entry in the table, and vice versa. This simplifies - the crds_samplers management logic which ties the peer to the - same index across all samplers. A single sampler can "disable" a - peer without affecting the other samplers. - - Why not just set the peer's weight to 0? - 0 is a legitimate score for a peer (wpeer_sampler_peer_score). - A futre upsert into the CRDS table might trigger a re-score, which - might inadvertently re-enable the peer in the sampler. We want to - distinguish peers that are "removed" from the sampler from peers - that have a score of 0 based on the parameters provided. */ - peer_enabled_t peer_enabled[ peer_enabled_word_cnt ]; -}; - -typedef struct wpeer_sampler wpeer_sampler_t; - - -struct crds_samplers { - wpeer_sampler_t pr_sampler[1]; - wpeer_sampler_t bucket_samplers[25]; - fd_crds_entry_t * ele[ CRDS_MAX_CONTACT_INFO ]; - ulong ele_cnt; -}; - -typedef struct crds_samplers crds_samplers_t; - -#define PREV_PEER_WEIGHT( ps, idx ) \ - ( (idx) ? (ps)->cumul_weight[(idx)-1] : 0UL ) -int -wpeer_sampler_init( wpeer_sampler_t * ps ) { - if( FD_UNLIKELY( !ps ) ) return -1; - for( ulong i = 0UL; i < CRDS_MAX_CONTACT_INFO; i++ ) { - ps->cumul_weight[i] = 0UL; - } - /* All peers are "enabled" by default as they get added. */ - peer_enabled_full( ps->peer_enabled ); - return 0; -} - -ulong -wpeer_sampler_sample( wpeer_sampler_t const * ps, - fd_rng_t * rng, - ulong ele_cnt ) { - if( FD_UNLIKELY( !ele_cnt || !ps->cumul_weight[ele_cnt-1] ) ) { - return SAMPLE_IDX_SENTINEL; - } - - ulong sample = fd_rng_ulong_roll( rng, ps->cumul_weight[ele_cnt-1] ); - /* avoid sampling 0 */ - sample = fd_ulong_min( sample+1UL, ps->cumul_weight[ele_cnt-1] ); - - /* Binary search for the smallest cumulative weight >= sample */ - ulong left = 0UL; - ulong right = ele_cnt; - while( left < right ) { - ulong mid = left + (right - left) / 2UL; - if( ps->cumul_weight[mid]peer_enabled, idx ); - - ulong old_weight = ps->cumul_weight[idx] - PREV_PEER_WEIGHT( ps, idx ); - if( FD_UNLIKELY( old_weight==weight ) ) return 0; - - if( weight>old_weight ) { - for( ulong i=idx; icumul_weight[i] += (weight - old_weight); - } - } else { - for( ulong i=idx; icumul_weight[i] -= (old_weight - weight); - } - } - return 0; -} - -int -wpeer_sampler_disable( wpeer_sampler_t * ps, - ulong idx, - ulong ele_cnt ) { - if( FD_UNLIKELY( !ps || idx>=ele_cnt ) ) return -1; - - /* Set the peer weight to zero */ - if( FD_UNLIKELY( wpeer_sampler_upd( ps, 0UL, idx, ele_cnt )<0 ) ) return -1; - - /* Disable the peer in the enabled set */ - peer_enabled_remove( ps->peer_enabled, idx ); - return 0; -} - -int -wpeer_sampler_enable( wpeer_sampler_t * ps, - ulong idx, - ulong ele_cnt ) { - if( FD_UNLIKELY( !ps || idx>=ele_cnt ) ) return -1; - peer_enabled_insert( ps->peer_enabled, idx ); - return 0; -} - -/* NOTE: this should only be called if the peer is dropped from the - Contact Info table. Use wpeer_sampler_disable otherwise */ -int -wpeer_sampler_rem( wpeer_sampler_t * ps, - ulong idx, - ulong ele_cnt ) { - ulong score = ps->cumul_weight[idx] - PREV_PEER_WEIGHT( ps, idx ); - - for( ulong i = idx+1; i < ele_cnt; i++ ) { - ps->cumul_weight[i] -= score; - ps->cumul_weight[i-1] = ps->cumul_weight[i]; - - peer_enabled_insert_if( ps->peer_enabled, peer_enabled_test( ps->peer_enabled, i ), i-1UL ); - peer_enabled_remove_if( ps->peer_enabled, !peer_enabled_test( ps->peer_enabled, i ), i-1UL ); - } - - peer_enabled_insert( ps->peer_enabled, ele_cnt-1UL ); - return 0; -} - -#define BASE_WEIGHT 100UL /* TODO: figure this out!! */ -ulong -wpeer_sampler_peer_score( fd_crds_entry_t * peer, - long now ) { - if( FD_UNLIKELY( !peer->contact_info.is_active ) ) return 0; - ulong score = BASE_WEIGHT; - score += peer->stake; - if( FD_UNLIKELY( peer->wallclock_nanosstake ); - ulong score = fd_ulong_sat_add( fd_ulong_min( bucket, peer_bucket ), 1UL ); - - return score*score; -} - - - -void -crds_samplers_new( crds_samplers_t * ps ) { - if( FD_UNLIKELY( !ps ) ) return; - - wpeer_sampler_init( ps->pr_sampler ); - for( ulong i=0UL; i<25UL; i++ ) { - wpeer_sampler_init( &ps->bucket_samplers[i] ); - } - ps->ele_cnt = 0UL; - for( ulong i=0UL; iele[i] = NULL; - } -} - -int -crds_samplers_upd_peer_at_idx( crds_samplers_t * ps, - fd_crds_entry_t * peer, - ulong idx, - long now ) { - if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) { - FD_LOG_WARNING(( "Bad peer idx supplied in sample update" )); - return -1; - } - ps->ele[idx] = peer; - peer->contact_info.sampler_idx = idx; - ulong peer_score = wpeer_sampler_peer_score( peer, now ); - if( FD_UNLIKELY( wpeer_sampler_upd( ps->pr_sampler, peer_score, idx, ps->ele_cnt )<0 ) ) return -1; - - for( ulong i=0UL; i<25UL; i++ ) { - ulong bucket_score = wpeer_sampler_bucket_score( peer, i ); - if( FD_UNLIKELY( !bucket_score ) ) FD_LOG_ERR(( "0-weighted peer in bucket, should not be possible" )); - if( FD_UNLIKELY( wpeer_sampler_upd( &ps->bucket_samplers[i], bucket_score, idx, ps->ele_cnt )<0 ) ) return -1; - } - - return 0; -} - -int -crds_samplers_swap_peer_at_idx( crds_samplers_t * ps, - fd_crds_entry_t * new_peer, - ulong idx ) { - if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) { - FD_LOG_WARNING(( "Bad peer idx supplied in sample update" )); - return -1; - } - fd_crds_entry_t * old_peer = ps->ele[idx]; - if( FD_UNLIKELY( !old_peer ) ) { - FD_LOG_ERR(( "No peer at index %lu in samplers" , idx )); - } - - ps->ele[idx] = new_peer; - new_peer->contact_info.sampler_idx = idx; - old_peer->contact_info.sampler_idx = SAMPLE_IDX_SENTINEL; - return 0; -} - -int -crds_samplers_add_peer( crds_samplers_t * ps, - fd_crds_entry_t * peer, - long now ) { - ulong idx = fd_ulong_min( ps->ele_cnt, (CRDS_MAX_CONTACT_INFO)-1UL ); - ps->ele_cnt++; - if( FD_UNLIKELY( !!crds_samplers_upd_peer_at_idx( ps, peer, idx, now ) ) ){ - FD_LOG_WARNING(( "Failed to update peer in samplers" )); - ps->ele_cnt--; - ps->ele[idx] = NULL; - return -1; - } - return 0; -} - -int -crds_samplers_rem_peer( crds_samplers_t * ps, - fd_crds_entry_t * peer ) { - ulong idx = peer->contact_info.sampler_idx; - if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) return -1; - if( FD_UNLIKELY( wpeer_sampler_rem( ps->pr_sampler, idx, ps->ele_cnt )<0 ) ) return -1; - for( ulong i=0UL; i<25UL; i++ ) { - if( FD_UNLIKELY( wpeer_sampler_rem( &ps->bucket_samplers[i], idx, ps->ele_cnt )<0 ) ) return -1; - } - - // Shift the elements down in elems array - for( ulong i = idx+1; i < ps->ele_cnt; i++ ) { - ps->ele[i-1] = ps->ele[i]; - ps->ele[i-1]->contact_info.sampler_idx = i-1; - } - ps->ele_cnt--; - return 0; -} diff --git a/src/flamenco/gossip/crds/test_crds.c b/src/flamenco/gossip/crds/test_crds.c index bbd7ddf6cda..3a26408c815 100644 --- a/src/flamenco/gossip/crds/test_crds.c +++ b/src/flamenco/gossip/crds/test_crds.c @@ -3,6 +3,16 @@ #include +void +stub_ci_change( void * ctx, + ulong crds_pool_idx, + ulong stake, + int change_type, + fd_stem_context_t * ctx_unused, + long now ) { + (void)ctx; (void)crds_pool_idx; (void)stake; (void)change_type; (void)ctx_unused; (void)now; +} + static void test_crds_new_basic( void ) { ulong ele_max = 1024UL; @@ -17,7 +27,7 @@ test_crds_new_basic( void ) { static fd_gossip_out_ctx_t gossip_out = {0}; - void * shcrds = fd_crds_new( mem, rng, ele_max, purged_max, &gossip_out ); + void * shcrds = fd_crds_new( mem, rng, ele_max, purged_max, &gossip_out, stub_ci_change, NULL ); FD_TEST( shcrds ); fd_crds_t * crds = fd_crds_join( shcrds ); diff --git a/src/flamenco/gossip/fd_active_set.c b/src/flamenco/gossip/fd_active_set.c index d85f06fb72f..517d14ba942 100644 --- a/src/flamenco/gossip/fd_active_set.c +++ b/src/flamenco/gossip/fd_active_set.c @@ -1,5 +1,91 @@ #include "fd_active_set.h" +#include "fd_bloom.h" #include "fd_active_set_private.h" +#include "fd_gossip_txbuild.h" +#include "fd_gossip_types.h" +#include "fd_gossip_wpeer_sampler.h" +#include "crds/fd_crds.h" + + +struct fd_active_set_bucket_entry { + uchar pubkey[ 32UL ]; + fd_bloom_t * bloom; + ulong entry_idx; + fd_gossip_txbuild_t push_state[ 1UL ]; + + ulong pool_next; + + struct { + ulong prev; + ulong next; + } insert_dlist; + + struct { + long wallclock_nanos; + ulong prev; + ulong next; + } last_hit; + +}; + +typedef struct fd_active_set_bucket_entry fd_active_set_bucket_entry_t; + +#define POOL_NAME entry_pool +#define POOL_T fd_active_set_bucket_entry_t +#define POOL_NEXT pool_next +#include "../../util/tmpl/fd_pool.c" + +#define DLIST_NAME bucket_insert_dlist +#define DLIST_ELE_T fd_active_set_bucket_entry_t +#define DLIST_PREV insert_dlist.prev +#define DLIST_NEXT insert_dlist.next +#include "../../util/tmpl/fd_dlist.c" + +#define DLIST_NAME last_hit +#define DLIST_ELE_T fd_active_set_bucket_entry_t +#define DLIST_PREV last_hit.prev +#define DLIST_NEXT last_hit.next + +#include "../../util/tmpl/fd_dlist.c" + +struct fd_active_set_bucket_private { + bucket_insert_dlist_t * insert_dlist; + wpeer_sampler_t * sampler; + ulong cnt; +}; + +typedef struct fd_active_set_bucket_private fd_active_set_bucket_t; + + +#define BUCKET_ENTRY_IDX_SENTINEL (USHORT_MAX) +FD_STATIC_ASSERT( FD_ACTIVE_SET_PEERS_PER_BUCKETentry_pool = entry_pool_join( entry_pool_new( _entry_pool, FD_ACTIVE_SET_MAX_PEERS ) ); + FD_TEST( as->entry_pool ); + + as->last_hit = last_hit_join( last_hit_new( _last_hit ) ); + FD_TEST( as->last_hit ); as->rng = rng; - for( ulong i=0UL; i<25UL; i++ ) { - fd_active_set_entry_t * entry = as->entries[ i ]; - entry->nodes_idx = 0UL; - entry->nodes_len = 0UL; - - for( ulong j=0UL; j<12UL; j++ ) { - fd_active_set_peer_t * peer = entry->nodes[ j ]; - peer->bloom = fd_bloom_join( fd_bloom_new( _blooms, rng, 0.1, 32768UL ) ); - if( FD_UNLIKELY( !peer->bloom ) ) { - FD_LOG_WARNING(( "failed to create bloom filter" )); - return NULL; - } - _blooms += bloom_footprint; + for( ulong i=0UL; ibuckets[i] = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_active_set_bucket_t), sizeof(fd_active_set_bucket_t) ); + void * _insert_dlist = FD_SCRATCH_ALLOC_APPEND( l, bucket_insert_dlist_align(), bucket_insert_dlist_footprint() ); + void * _bucket_sampler = FD_SCRATCH_ALLOC_APPEND( l, wpeer_sampler_align(), wpeer_sampler_footprint( CRDS_MAX_CONTACT_INFO ) ); + + fd_active_set_bucket_t * bucket = as->buckets[ i ]; + bucket->cnt = 0UL; + + bucket->insert_dlist = bucket_insert_dlist_join( bucket_insert_dlist_new( _insert_dlist ) ); + FD_TEST( bucket->insert_dlist ); + + bucket->sampler = wpeer_sampler_join( wpeer_sampler_new( _bucket_sampler, CRDS_MAX_CONTACT_INFO ) ); + FD_TEST( bucket->sampler ); + + } + + for( ulong i=0UL; ientry_pool, i ); + + peer->bloom = fd_bloom_join( fd_bloom_new( _blooms, rng, 0.1, 32768UL ) ); + if( FD_UNLIKELY( !peer->bloom ) ) { + FD_LOG_WARNING(( "failed to create bloom filter" )); + return NULL; } + _blooms += bloom_footprint; } + FD_TEST( FD_SCRATCH_ALLOC_FINI( l, FD_ACTIVE_SET_ALIGN ) == (ulong)shmem + fd_active_set_footprint() ); FD_COMPILER_MFENCE(); FD_VOLATILE( as->magic ) = FD_ACTIVE_SET_MAGIC; @@ -82,43 +197,39 @@ fd_active_set_join( void * shas ) { } ulong -fd_active_set_nodes( fd_active_set_t * active_set, - uchar const * identity_pubkey, - ulong identity_stake, - uchar const * origin, - ulong origin_stake, - int ignore_prunes_if_peer_is_origin, - ulong out_nodes[ static 12UL ] ) { - ulong stake_bucket = fd_active_set_stake_bucket( fd_ulong_min( identity_stake, origin_stake ) ); - fd_active_set_entry_t * entry = active_set->entries[ stake_bucket ]; +fd_active_set_nodes( fd_active_set_t * active_set, + uchar const * identity_pubkey, + ulong identity_stake, + uchar const * origin, + ulong origin_stake, + int ignore_prunes_if_peer_is_origin, + long now, + fd_active_set_push_state_t out_push_states[ static FD_ACTIVE_SET_PEERS_PER_BUCKET ] ) { + ulong bucket_idx = fd_active_set_stake_bucket( fd_ulong_min( identity_stake, origin_stake ) ); + fd_active_set_bucket_t * bucket = active_set->buckets[ bucket_idx ]; int identity_eq_origin = !memcmp( identity_pubkey, origin, 32UL ); ulong out_idx = 0UL; - for( ulong i=0UL; inodes_len; i++ ) { - fd_active_set_peer_t * peer = entry->nodes[ (entry->nodes_idx+i) % 12UL ]; + bucket_insert_dlist_iter_t it = bucket_insert_dlist_iter_fwd_init( bucket->insert_dlist, active_set->entry_pool ); + for( ; !bucket_insert_dlist_iter_done ( it, bucket->insert_dlist, active_set->entry_pool ); + it = bucket_insert_dlist_iter_fwd_next( it, bucket->insert_dlist, active_set->entry_pool ) ) { + fd_active_set_bucket_entry_t * peer = bucket_insert_dlist_iter_ele( it, bucket->insert_dlist, active_set->entry_pool ); int must_push_if_peer_is_origin = ignore_prunes_if_peer_is_origin && !memcmp( peer->pubkey, origin, 32UL ); - int must_push_own_values = identity_eq_origin && !memcmp( peer->pubkey, identity_pubkey, 32UL ); /* why ? */ + int must_push_own_values = identity_eq_origin && !memcmp( peer->pubkey, identity_pubkey, 32UL ); /* why ? */ if( FD_UNLIKELY( fd_bloom_contains( peer->bloom, origin, 32UL ) && !must_push_own_values && !must_push_if_peer_is_origin ) ) continue; - out_nodes[ out_idx++ ] = stake_bucket*12UL + i; - } - return out_idx; -} -uchar const * -fd_active_set_node_pubkey( fd_active_set_t * active_set, - ulong peer_idx ){ - ulong bucket = peer_idx / FD_ACTIVE_SET_PEERS_PER_ENTRY; - ulong idx = peer_idx % FD_ACTIVE_SET_PEERS_PER_ENTRY; - if( FD_UNLIKELY( bucket>=FD_ACTIVE_SET_STAKE_ENTRIES ) ) { - FD_LOG_ERR(( "peer_idx out of range" )); - } - if( FD_UNLIKELY( active_set->entries[ bucket ]->nodes_len<=idx ) ) { - FD_LOG_ERR(( "peer_idx out of range within bucket" )); - } + out_push_states[ out_idx++ ] = (fd_active_set_push_state_t){ + .txbuild = peer->push_state, + .crds_idx = peer->entry_idx + }; - return active_set->entries[ bucket ]->nodes[ idx ]->pubkey; + peer->last_hit.wallclock_nanos = now; + last_hit_ele_remove ( active_set->last_hit, peer, active_set->entry_pool ); + last_hit_ele_push_tail( active_set->last_hit, peer, active_set->entry_pool ); + } + return out_idx; } void @@ -130,43 +241,152 @@ fd_active_set_prune( fd_active_set_t * active_set, ulong identity_stake ) { if( FD_UNLIKELY( !memcmp( identity_pubkey, origin, 32UL ) ) ) return; - ulong bucket = fd_active_set_stake_bucket( fd_ulong_min( identity_stake, origin_stake ) ); - for( ulong i=0UL; i<12UL; i++ ) { - if( FD_UNLIKELY( !memcmp( active_set->entries[ bucket ]->nodes[ i ]->pubkey, push_dest, 32UL ) ) ) { - fd_bloom_insert( active_set->entries[ bucket ]->nodes[ i ]->bloom, origin, 32UL ); + ulong bucket_idx = fd_active_set_stake_bucket( fd_ulong_min( identity_stake, origin_stake ) ); + fd_active_set_bucket_t * bucket = active_set->buckets[ bucket_idx ]; + + bucket_insert_dlist_iter_t it = bucket_insert_dlist_iter_fwd_init( bucket->insert_dlist, active_set->entry_pool ); + for( ; !bucket_insert_dlist_iter_done ( it, bucket->insert_dlist, active_set->entry_pool ); + it = bucket_insert_dlist_iter_fwd_next( it, bucket->insert_dlist, active_set->entry_pool ) ) { + fd_active_set_bucket_entry_t * peer = bucket_insert_dlist_iter_ele( it, bucket->insert_dlist, active_set->entry_pool); + if( FD_UNLIKELY( !memcmp( peer->pubkey, push_dest, 32UL ) ) ) { + fd_bloom_insert( peer->bloom, origin, 32UL ); return; } } } -ulong -fd_active_set_rotate( fd_active_set_t * active_set, - fd_crds_t * crds ) { +void +fd_active_set_rotate( fd_active_set_t * active_set, + fd_crds_t * crds, + long now, + fd_active_set_push_state_t * out_maybe_flush ) { ulong num_bloom_filter_items = fd_ulong_max( fd_crds_peer_count( crds ), 512UL ); - ulong bucket = fd_rng_ulong_roll( active_set->rng, 25UL ); - fd_active_set_entry_t * entry = active_set->entries[ bucket ]; + ulong bucket_idx = fd_rng_ulong_roll( active_set->rng, 25UL ); + fd_active_set_bucket_t * bucket = active_set->buckets[ bucket_idx ]; + + ulong replace_idx; + fd_active_set_bucket_entry_t * replace; - ulong replace_idx; + out_maybe_flush->txbuild = NULL; + out_maybe_flush->crds_idx = ULONG_MAX; - if( FD_LIKELY( entry->nodes_len==12UL ) ) { - replace_idx = entry->nodes_idx; - entry->nodes_idx = (entry->nodes_idx+1UL) % 12UL; - fd_crds_bucket_add( crds, bucket, entry->nodes[ replace_idx ]->pubkey ); + ulong crds_idx = wpeer_sampler_sample( bucket->sampler, active_set->rng ); + if( FD_UNLIKELY( crds_idx==SAMPLE_IDX_SENTINEL ) ) return; + + if( FD_LIKELY( bucket->cnt==FD_ACTIVE_SET_PEERS_PER_BUCKET ) ) { + replace_idx = bucket_insert_dlist_idx_pop_head( bucket->insert_dlist, active_set->entry_pool ); + replace = entry_pool_ele( active_set->entry_pool, replace_idx ); + + ulong crds_idx = replace->entry_idx; + + out_maybe_flush->txbuild = replace->push_state; + out_maybe_flush->crds_idx = crds_idx; + last_hit_idx_remove( active_set->last_hit, replace_idx, active_set->entry_pool ); + + /* Replaced peer needs to be reinserted into bucket's sampler */ + peer_meta_t * e = &active_set->peer_metas[ crds_idx ]; + ulong score = peer_bucket_score( e->stake, bucket_idx ); + e->bucket_idx[ bucket_idx ] = BUCKET_ENTRY_IDX_SENTINEL; + wpeer_sampler_upd( bucket->sampler, score, crds_idx ); } else { - replace_idx = entry->nodes_len; + FD_TEST( !!entry_pool_free( active_set->entry_pool ) ); + replace_idx = entry_pool_idx_acquire( active_set->entry_pool ); + replace = entry_pool_ele( active_set->entry_pool, replace_idx ); + fd_gossip_txbuild_init( replace->push_state, FD_GOSSIP_MESSAGE_PUSH ); } - fd_active_set_peer_t * replace = entry->nodes[ replace_idx ]; + wpeer_sampler_upd( bucket->sampler, 0UL, crds_idx ); + + peer_meta_t * e = &active_set->peer_metas[ crds_idx ]; + e->bucket_idx[ bucket_idx ] = (ushort)replace_idx; + + fd_contact_info_t const * new_peer = fd_crds_contact_info_idx_lookup( crds, crds_idx ); + + replace->entry_idx = crds_idx; + replace->last_hit.wallclock_nanos = now; + fd_bloom_initialize( replace->bloom, num_bloom_filter_items ); + fd_bloom_insert ( replace->bloom, new_peer->pubkey.uc, 32UL ); + fd_memcpy ( replace->pubkey, new_peer->pubkey.uc, 32UL ); + + bucket->cnt = fd_ulong_min( bucket->cnt+1UL, FD_ACTIVE_SET_PEERS_PER_BUCKET ); + bucket_insert_dlist_idx_push_tail( bucket->insert_dlist, replace_idx, active_set->entry_pool ); + + last_hit_idx_push_tail( active_set->last_hit, replace_idx, active_set->entry_pool ); +} - fd_contact_info_t const * new_peer = fd_crds_bucket_sample_and_remove( crds, active_set->rng, bucket ); - if( FD_UNLIKELY( !new_peer ) ) { - return ULONG_MAX; +void +fd_active_set_peer_insert( fd_active_set_t * active_set, ulong idx, ulong stake ) { + /* if IDX is reused, we assumed it was cleaned up with remove peer */ + peer_meta_t * e = &active_set->peer_metas[idx]; + + e->stake = stake; + for( ulong j=0UL; jbucket_idx[j] = BUCKET_ENTRY_IDX_SENTINEL; + wpeer_sampler_t * bucket_sampler = active_set->buckets[j]->sampler; + ulong score = peer_bucket_score( stake, j ); + FD_TEST( !wpeer_sampler_upd( bucket_sampler, score, idx ) ); + } +} + +ulong +fd_active_set_peer_remove( fd_active_set_t * active_set, + ulong idx, + fd_active_set_push_state_t out_evicted_states[ static FD_ACTIVE_SET_STAKE_BUCKETS ] ) { + ulong flush_cnt = 0UL; + peer_meta_t * e = &active_set->peer_metas[idx]; + for( ulong j=0UL; jbucket_idx[j]==BUCKET_ENTRY_IDX_SENTINEL ) ) continue; + fd_active_set_bucket_t * bucket = active_set->buckets[j]; + fd_active_set_bucket_entry_t * entry = entry_pool_ele( active_set->entry_pool, e->bucket_idx[j] ); + + fd_active_set_push_state_t * must_flush = &out_evicted_states[flush_cnt++]; + must_flush->txbuild = entry->push_state; + must_flush->crds_idx = idx; + + bucket_insert_dlist_idx_remove( bucket->insert_dlist, e->bucket_idx[j], active_set->entry_pool ); + FD_TEST( !!bucket->cnt ); + bucket->cnt--; + + last_hit_idx_remove ( active_set->last_hit, e->bucket_idx[j], active_set->entry_pool ); + entry_pool_idx_release( active_set->entry_pool, e->bucket_idx[j] ); + + wpeer_sampler_upd( bucket->sampler, 0UL, idx ); + } + + return flush_cnt; +} + +void +fd_active_set_peer_update_stake( fd_active_set_t * active_set, ulong idx, ulong new_stake ) { + peer_meta_t * e = &active_set->peer_metas[idx]; + + e->stake = new_stake; + for( ulong j=0UL; jbucket_idx[j]!=BUCKET_ENTRY_IDX_SENTINEL ) ) continue; + ulong score = peer_bucket_score( new_stake, j ); + wpeer_sampler_t * bucket_sampler = active_set->buckets[j]->sampler; + FD_TEST( !wpeer_sampler_upd( bucket_sampler, score, idx ) ); } +} + +int +fd_active_set_flush_stale_advance( fd_active_set_t * active_set, + long stale_if_before, + long now, + fd_active_set_push_state_t * maybe_flush ) { + if( FD_LIKELY( last_hit_is_empty( active_set->last_hit, active_set->entry_pool ) ) ) return 0; + + fd_active_set_bucket_entry_t * peer = last_hit_ele_peek_head( active_set->last_hit, active_set->entry_pool ); + if( FD_LIKELY( peer->last_hit.wallclock_nanos>=stale_if_before ) ) return 0; + maybe_flush->txbuild = peer->push_state; + maybe_flush->crds_idx = peer->entry_idx; + peer->last_hit.wallclock_nanos = now; + last_hit_ele_pop_head ( active_set->last_hit, active_set->entry_pool ); + last_hit_ele_push_tail( active_set->last_hit, peer, active_set->entry_pool ); + return 1; - fd_bloom_initialize( replace->bloom, num_bloom_filter_items ); - fd_bloom_insert( replace->bloom, new_peer->pubkey.uc, 32UL ); - fd_memcpy( replace->pubkey, new_peer->pubkey.uc, 32UL ); - entry->nodes_len = fd_ulong_min( entry->nodes_len+1UL, 12UL ); - return bucket*12UL+replace_idx; } diff --git a/src/flamenco/gossip/fd_active_set.h b/src/flamenco/gossip/fd_active_set.h index 1a9e67814cb..d2e25d8e965 100644 --- a/src/flamenco/gossip/fd_active_set.h +++ b/src/flamenco/gossip/fd_active_set.h @@ -1,11 +1,13 @@ #ifndef HEADER_fd_src_flamenco_gossip_fd_active_set_h #define HEADER_fd_src_flamenco_gossip_fd_active_set_h -#include "fd_bloom.h" +#include "fd_gossip_txbuild.h" #include "crds/fd_crds.h" /* fd_active_set provides APIs for tracking the active set of nodes we - should push messages to in a gossip network. + should push messages to in a gossip network. It is tightly coupled + with the contact info sidetable in fd_crds, making use of the + index into the sidetable as a stable identifier for peers. In the Solana gossip protocol, each node selects a random set of up to 300 peers to send messages to, and then rotates one of the nodes @@ -14,7 +16,7 @@ This is simple enough: just keep a list of the peer pubkeys, and occasionally replace one? - There's two complications: + There's three complications: (1) We want to select peers with a good distribution of stakes, so that we don't end up sending to a lot of low-stake peers if @@ -24,46 +26,42 @@ other originating (origin) nodes to them, because they already have a lot of paths from that node. This is called a prune. - Complication (1) is handled by keeping a list of the top 12 peers - (sorted by stake) for each of 25 buckets of stakes. These buckets - are all rotated together. + (3) We need to gracefully update the active set if a peer either + changes its stake or enters/leaves the network. - And problem (2) is solved by keeping a bloom filter for each of the + Complication (1) is handled by keeping a list of 12 peers for each + of 25 buckets of stakes. These buckets are rotated with a weighted + shuffle specific to the stake bucket. Note that a single peer can + appear in multiple buckets, but each bucket has a unique set of + peers. + + Problem (2) is solved by keeping a bloom filter for each of the 12 peers in each bucket. The bloom filter is used to track which - origins the peer has pruned. */ - -#define FD_ACTIVE_SET_STAKE_ENTRIES (25UL) -#define FD_ACTIVE_SET_PEERS_PER_ENTRY (12UL) -#define FD_ACTIVE_SET_MAX_PEERS (FD_ACTIVE_SET_STAKE_ENTRIES*FD_ACTIVE_SET_PEERS_PER_ENTRY) /* 300 */ -struct fd_active_set_peer { - uchar pubkey[ 32UL ]; - fd_bloom_t * bloom; -}; + origins the peer has pruned. -typedef struct fd_active_set_peer fd_active_set_peer_t; + A set of peer update APIs are provided to handle the peer's changes + described in (3). The supplied index maps to the peer's entry in + contact info sidetable in fd_crds. */ -struct fd_active_set_entry { - ulong nodes_idx; /* points to oldest entry in set */ - ulong nodes_len; - fd_active_set_peer_t nodes[ FD_ACTIVE_SET_PEERS_PER_ENTRY ][ 1UL ]; -}; +#define FD_ACTIVE_SET_STAKE_BUCKETS (25UL) +#define FD_ACTIVE_SET_PEERS_PER_BUCKET (12UL) +#define FD_ACTIVE_SET_MAX_PEERS (FD_ACTIVE_SET_STAKE_BUCKETS*FD_ACTIVE_SET_PEERS_PER_BUCKET) /* 300 */ -typedef struct fd_active_set_entry fd_active_set_entry_t; - -#define FD_ACTIVE_SET_ALIGN (64UL) +/* fd_active_set_push_state holds the state for a particular + (bucket, peer) pair that is in rotation. */ +struct fd_active_set_push_state { + fd_gossip_txbuild_t * txbuild; + ulong crds_idx; /* index into the CRDS contact info sidetable. */ +}; -struct __attribute__((aligned(FD_ACTIVE_SET_ALIGN))) fd_active_set_private { - fd_active_set_entry_t entries[ FD_ACTIVE_SET_STAKE_ENTRIES ][ 1UL ]; +typedef struct fd_active_set_push_state fd_active_set_push_state_t; - fd_rng_t * rng; - ulong magic; /* ==FD_ACTIVE_SET_MAGIC */ -}; +#define FD_ACTIVE_SET_ALIGN (128UL) +struct fd_active_set_private; typedef struct fd_active_set_private fd_active_set_t; -#define FD_ACTIVE_SET_FOOTPRINT (sizeof(fd_active_set_t)) - #define FD_ACTIVE_SET_MAGIC (0xF17EDA2CEA5E1000) /* FIREDANCE ASET V0 */ FD_PROTOTYPES_BEGIN @@ -87,45 +85,104 @@ fd_active_set_join( void * shas ); is non-zero, in which case the list will include a peer if its pubkey matches the origin pubkey. - Up to 12 peer nodes will be returned in out_nodes. The values - returned in out_nodes are an internal peer index of the active set - and should not be used for anything other than calling - fd_active_set_node_pubkey to get the pubkey of the peer. The - peer index is only valid for the current active set and should not be - used after a call to fd_active_set_rotate or fd_active_set_prune. */ + Up to 12 peer push states will be returned in out_push_states. The + states are expected to be used (for appending) immediately, + and the user is expected to flush and reset (with + fd_gossip_txbuild_init) the push states if they are too full to fit + a new CRDS value. The states returned in out_push_states are only + valid for the current active set and should not be used after a call + to any of the fd_active_set APIs below. */ ulong -fd_active_set_nodes( fd_active_set_t * active_set, - uchar const * identity_pubkey, - ulong identity_stake, - uchar const * origin, - ulong origin_stake, - int ignore_prunes_if_peer_is_origin, - ulong out_nodes[ static 12UL ] ); - -uchar const * -fd_active_set_node_pubkey( fd_active_set_t * active_set, - ulong peer_idx ); +fd_active_set_nodes( fd_active_set_t * active_set, + uchar const * identity_pubkey, + ulong identity_stake, + uchar const * origin, + ulong origin_stake, + int ignore_prunes_if_peer_is_origin, + long now, + fd_active_set_push_state_t out_push_states[ static FD_ACTIVE_SET_PEERS_PER_BUCKET ] ); + +/* fd_active_set_prune adds origin to a peer's pruned bloom filter. The + prune record persists for the time in which the peer is in the active + set. */ void fd_active_set_prune( fd_active_set_t * active_set, - uchar const * push_dest, + uchar const * peer, uchar const * origin, ulong origin_stake, uchar const * identity_pubkey, ulong identity_stake ); -/* fd_active_set_rotate chooses a random active set entry to swap/introduce - a peer into. The peer is sampled from a distribution - (provided by crds) specific to the active set bucket. +/* fd_active_set_rotate chooses a random active bucket entry to + swap/introduce a peer into. The peer is sampled from a distribution + (provided by crds) specific to the active set bucket. If there are + no peers available to sample from, the function is a no-op. + + If a peer is swapped out of the bucket, its push state + will be supplied in out_maybe_flush. out_maybe_flush->txbuild is NULL + otherwise. The push state is valid until the next call to any + fd_active_set API. If a state is supplied, the user is expected to + flush and reset (with fd_gossip_txbuild_init) the state immediately. + */ - returns the index that is being replaced within the - 300 peer set. This allows users to maintain data structures that track the - active set. Returns ULONG_MAX if no peer replacement is found. */ +void +fd_active_set_rotate( fd_active_set_t * active_set, + fd_crds_t * crds, + long now, + fd_active_set_push_state_t * out_maybe_flush ); + +/* fd_active_set_flush_stale_advance checks the least recently hit push + state in the active set and determines if it should be flushed. A + push state is considered stale if its last-updated timestamp is older + than stale_if_before. + + If the least recently hit push state is stale, it is is extracted + into maybe_flush, its timestamp is refreshed to now, and it is moved + to the back of the LRU queue. The caller is expected to flush and + reset (with fd_gossip_txbuild_init) the push state immediately. + + This function processes at most one push state per call, allowing + the caller to interleave flushing with other operations. Since push + states are ordered by last_hit timestamp, if the least recently hit + push state is not stale, no other states will be stale either. + + Returns 1 if a stale state was found and maybe_flush contains valid + state to flush. Returns 0 if no stale states exist or the active + set is empty. The push state in maybe_flush is valid until the next + call to any fd_active_set API. */ + +int +fd_active_set_flush_stale_advance( fd_active_set_t * active_set, + long stale_if_before, + long now, + fd_active_set_push_state_t * maybe_flush ); + +/* The fd_active_set_peer_{insert, remove, update_stake} APIs track + the relevant changes to the fd_crds contact info sidetable. crds_idx + refers to the index to the peer in the sidetable, and should be + retrieved by the callback API provided in fd_crds. */ + +void +fd_active_set_peer_insert( fd_active_set_t * active_set, ulong crds_idx, ulong stake ); +/* When a peer is removed from the active set entirely, + fd_active_set_peer_remove returns the number of active push states + (across all buckets) belonging to the evicted peer. These states are + populated in out_evicted_states, and are only valid until the next + call to any fd_active_set API. + + The user is expected to reset these push states + (with fd_gossip_txbuild_init). Flushing these states is optional. */ ulong -fd_active_set_rotate( fd_active_set_t * active_set, - fd_crds_t * crds ); +fd_active_set_peer_remove( fd_active_set_t * active_set, + ulong crds_idx, + fd_active_set_push_state_t out_evicted_states[ static FD_ACTIVE_SET_STAKE_BUCKETS ] ); + +void +fd_active_set_peer_update_stake( fd_active_set_t * active_set, ulong crds_idx, ulong new_stake ); + FD_PROTOTYPES_END diff --git a/src/flamenco/gossip/fd_gossip.c b/src/flamenco/gossip/fd_gossip.c index 859d5e6ddc5..97f4f719853 100644 --- a/src/flamenco/gossip/fd_gossip.c +++ b/src/flamenco/gossip/fd_gossip.c @@ -3,6 +3,7 @@ #include "fd_gossip_private.h" #include "fd_gossip_txbuild.h" #include "fd_active_set.h" +#include "fd_gossip_types.h" #include "fd_ping_tracker.h" #include "crds/fd_crds.h" #include "../../disco/keyguard/fd_keyguard.h" @@ -57,8 +58,6 @@ typedef struct stake stake_t; #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1 #include "../../util/tmpl/fd_map_chain.c" -#include "fd_push_set_private.c" - struct fd_gossip_private { uchar identity_pubkey[ 32UL ]; ulong identity_stake; @@ -106,10 +105,6 @@ struct fd_gossip_private { fd_contact_info_t ci[1]; } my_contact_info; - /* Push state for each peer in the active set. Tracks the active set, - and must be flushed prior to a call to fd_active_set_rotate or - fd_active_set_prune. */ - push_set_t * active_pset; fd_gossip_out_ctx_t * gossip_net_out; }; @@ -129,7 +124,6 @@ fd_gossip_footprint( ulong max_values, l = FD_LAYOUT_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) ); l = FD_LAYOUT_APPEND( l, stake_pool_align(), stake_pool_footprint( CRDS_MAX_CONTACT_INFO ) ); l = FD_LAYOUT_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt_est( CRDS_MAX_CONTACT_INFO ) ) ); - l = FD_LAYOUT_APPEND( l, push_set_align(), push_set_footprint( FD_ACTIVE_SET_MAX_PEERS ) ); l = FD_LAYOUT_FINI( l, fd_gossip_align() ); return l; } @@ -145,8 +139,8 @@ ping_tracker_change( void * _ctx, if( FD_UNLIKELY( !memcmp( peer_pubkey, ctx->identity_pubkey, 32UL ) ) ) return; switch( change_type ) { - case FD_PING_TRACKER_CHANGE_TYPE_ACTIVE: fd_crds_peer_active( ctx->crds, peer_pubkey, now ); break; - case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE: fd_crds_peer_inactive( ctx->crds, peer_pubkey, now ); break; + case FD_PING_TRACKER_CHANGE_TYPE_ACTIVE: fd_crds_peer_active( ctx->crds, peer_pubkey ); break; + case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE: fd_crds_peer_inactive( ctx->crds, peer_pubkey ); break; case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE_STAKED: break; default: FD_LOG_ERR(( "Unknown change type %d", change_type )); return; } @@ -154,6 +148,14 @@ ping_tracker_change( void * _ctx, ctx->ping_tracker_change_fn( ctx->ping_tracker_change_fn_ctx, peer_pubkey, peer_address, now, change_type ); } +static void +ci_change( void * _ctx, + ulong crds_pool_idx, + ulong stake, + int change_type, + fd_stem_context_t * stem, + long now ); + void * fd_gossip_new( void * shmem, fd_rng_t * rng, @@ -198,7 +200,6 @@ fd_gossip_new( void * shmem, void * ping_tracker = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) ); void * stake_pool = FD_SCRATCH_ALLOC_APPEND( l, stake_pool_align(), stake_pool_footprint( CRDS_MAX_CONTACT_INFO ) ); void * stake_weights = FD_SCRATCH_ALLOC_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt ) ); - void * active_ps = FD_SCRATCH_ALLOC_APPEND( l, push_set_align(), push_set_footprint( FD_ACTIVE_SET_MAX_PEERS ) ); FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_gossip_align() ) == (ulong)shmem + fd_gossip_footprint( max_values, entrypoints_len ) ); gossip->gossip_net_out = gossip_net_out; @@ -206,7 +207,7 @@ fd_gossip_new( void * shmem, gossip->entrypoints_cnt = entrypoints_len; fd_memcpy( gossip->entrypoints, entrypoints, entrypoints_len*sizeof(fd_ip4_port_t) ); - gossip->crds = fd_crds_join( fd_crds_new( crds, rng, max_values, max_values, gossip_update_out ) ); + gossip->crds = fd_crds_join( fd_crds_new( crds, rng, max_values, max_values, gossip_update_out, ci_change, gossip ) ); FD_TEST( gossip->crds ); gossip->active_set = fd_active_set_join( fd_active_set_new( active_set, rng ) ); @@ -222,9 +223,6 @@ fd_gossip_new( void * shmem, gossip->stake.map = stake_map_join( stake_map_new( stake_weights, stake_map_chain_cnt, fd_rng_ulong( rng ) ) ); FD_TEST( gossip->stake.map ); - gossip->active_pset = push_set_join( push_set_new( active_ps, FD_ACTIVE_SET_MAX_PEERS ) ); - FD_TEST( gossip->active_pset ); - FD_TEST( fd_sha256_join( fd_sha256_new( gossip->sha256 ) ) ); FD_TEST( fd_sha512_join( fd_sha512_new( gossip->sha512 ) ) ); @@ -293,6 +291,7 @@ txbuild_flush( fd_gossip_t * gossip, long now ) { if( FD_UNLIKELY( !txbuild->crds_len ) ) return; + fd_gossip_txbuild_stamp_pubkey( txbuild, gossip->identity_pubkey ); gossip->send_fn( gossip->send_ctx, stem, txbuild->bytes, txbuild->bytes_len, &dest_addr, (ulong)now ); gossip->metrics->message_tx[ txbuild->tag ]++; @@ -307,28 +306,17 @@ txbuild_flush( fd_gossip_t * gossip, } } - fd_gossip_txbuild_init( txbuild, gossip->identity_pubkey, txbuild->tag ); + fd_gossip_txbuild_init( txbuild, txbuild->tag ); } -/* Note: NOT a no-op in the case contact info does not exist. We - reset and push it back to the last-hit queue instead. - - TODO: Is this desired behavior? */ - static void -active_push_set_flush( fd_gossip_t * gossip, - push_set_t * pset, - ulong idx, - fd_stem_context_t * stem, - long now ) { - fd_contact_info_t const * ci = fd_crds_contact_info_lookup( gossip->crds, fd_active_set_node_pubkey( gossip->active_set, idx ) ); - push_set_entry_t * state = pset_entry_pool_ele( pset->pool, idx ); - if( FD_LIKELY( ci ) ) { - txbuild_flush( gossip, state->txbuild, stem, fd_contact_info_gossip_socket( ci ), now ); - } else { - fd_gossip_txbuild_init( state->txbuild, gossip->identity_pubkey, state->txbuild->tag ); - } - push_set_pop_append( pset, state, now ); +active_push_set_flush( fd_gossip_t * gossip, + fd_active_set_push_state_t * push_state, + fd_stem_context_t * stem, + long now ) { + fd_contact_info_t const * ci = fd_crds_contact_info_idx_lookup( gossip->crds, push_state->crds_idx ); + FD_TEST( ci ); + txbuild_flush( gossip, push_state->txbuild, stem, fd_contact_info_gossip_socket( ci ), now ); } static void @@ -340,29 +328,63 @@ active_push_set_insert( fd_gossip_t * gossip, fd_stem_context_t * stem, long now, int flush_immediately ) { - ulong out_nodes[ 12UL ]; - ulong out_nodes_cnt = fd_active_set_nodes( gossip->active_set, - gossip->identity_pubkey, - gossip->identity_stake, - origin_pubkey, - origin_stake, - 0UL, /* ignore_prunes_if_peer_is_origin TODO */ - out_nodes ); - for( ulong j=0UL; jactive_pset->pool, idx ); + fd_active_set_push_state_t push_states[ FD_ACTIVE_SET_PEERS_PER_BUCKET ]; + ulong states_cnt = fd_active_set_nodes( gossip->active_set, + gossip->identity_pubkey, + gossip->identity_stake, + origin_pubkey, + origin_stake, + 0UL, /* ignore_prunes_if_peer_is_origin TODO */ + now, + push_states ); + for( ulong j=0UL; jtxbuild, crds_sz ) ) ) { - active_push_set_flush( gossip, gossip->active_pset, idx, stem, now ); + active_push_set_flush( gossip, entry, stem, now ); } - fd_gossip_txbuild_append( entry->txbuild, crds_sz, crds_val ); - push_set_pop_append( gossip->active_pset, entry, now ); if( FD_UNLIKELY( !!flush_immediately ) ) { - active_push_set_flush( gossip, gossip->active_pset, idx, stem, now ); + active_push_set_flush( gossip, entry, stem, now ); } } } + +static void +ci_change( void * _ctx, + ulong crds_pool_idx, + ulong stake, + int change_type, + fd_stem_context_t * stem, + long now ) { + fd_gossip_t * ctx = (fd_gossip_t *)_ctx; + switch( change_type ) { + case FD_CRDS_CONTACT_INFO_CHANGE_TYPE_NEW: fd_active_set_peer_insert( ctx->active_set, crds_pool_idx, stake ); break; + case FD_CRDS_CONTACT_INFO_CHANGE_TYPE_REMOVED: + { + fd_active_set_push_state_t states_to_flush[ FD_ACTIVE_SET_STAKE_BUCKETS ]; + ulong state_cnt = fd_active_set_peer_remove( ctx->active_set, crds_pool_idx, states_to_flush ); + for( ulong i=0UL; iactive_set, crds_pool_idx, stake ); break; + case FD_CRDS_CONTACT_INFO_CHANGE_IDENTITY_CHANGED: + { + FD_TEST( stem==NULL ); + fd_active_set_push_state_t states_to_reset[ FD_ACTIVE_SET_STAKE_BUCKETS ]; + ulong state_cnt = fd_active_set_peer_remove( ctx->active_set, crds_pool_idx, states_to_reset ); + for( ulong i=0UL; iidentity_pubkey, contact_info->pubkey.uc, 32UL ) ) ){ + FD_LOG_WARNING(( "Changing identity pubkey" )); + fd_crds_handle_identity_change( gossip->crds, contact_info->pubkey.uc ); + + } fd_memcpy( gossip->identity_pubkey, contact_info->pubkey.uc, 32UL ); *gossip->my_contact_info.ci = *contact_info; @@ -457,7 +484,7 @@ rx_pull_request( fd_gossip_t * gossip, filter->bits = (ulong *)( payload + pr_view->bloom_bits_offset ); fd_gossip_txbuild_t pull_resp[1]; - fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE ); + fd_gossip_txbuild_init( pull_resp, FD_GOSSIP_MESSAGE_PULL_RESPONSE ); uchar iter_mem[ 16UL ]; @@ -872,36 +899,20 @@ static inline void rotate_active_set( fd_gossip_t * gossip, fd_stem_context_t * stem, long now ) { - push_set_t * pset = gossip->active_pset; - ulong replaced_idx = fd_active_set_rotate( gossip->active_set, gossip->crds ); - if( FD_UNLIKELY( replaced_idx==ULONG_MAX ) ) { - return; - } - push_set_entry_t * entry = pset_entry_pool_ele( pset->pool, replaced_idx ); - if( FD_LIKELY( !!entry->pool.in_use ) ) { - active_push_set_flush( gossip, pset, replaced_idx, stem, now ); - } else { - entry->pool.in_use = 1U; - entry->last_hit.wallclock_nanos = now; - pset_last_hit_ele_push_tail( pset->last_hit, entry, pset->pool ); - } - - fd_gossip_txbuild_init( entry->txbuild, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PUSH ); + fd_active_set_push_state_t maybe_flush[ 1UL ]; + fd_active_set_rotate( gossip->active_set, gossip->crds, now, maybe_flush ); + if( FD_LIKELY( maybe_flush->txbuild ) ) active_push_set_flush( gossip, maybe_flush, stem, now ); } static inline void flush_stale_push_states( fd_gossip_t * gossip, fd_stem_context_t * stem, long now ) { - long stale_if_before = now-1*1000L*1000L; - push_set_t * push_set = gossip->active_pset; - if( FD_UNLIKELY( pset_last_hit_is_empty( push_set->last_hit, push_set->pool ) ) ) return; - + long stale_if_before = now-1*1000L*1000L; + fd_active_set_push_state_t state[ 1UL ]; for(;;) { - push_set_entry_t * entry = pset_last_hit_ele_peek_head( push_set->last_hit, push_set->pool ); - ulong entry_idx = pset_entry_pool_idx( push_set->pool, entry ); - if( FD_UNLIKELY( entry->last_hit.wallclock_nanos>stale_if_before ) ) break; - active_push_set_flush( gossip, push_set, entry_idx, stem, now ); + if( FD_UNLIKELY( !fd_active_set_flush_stale_advance( gossip->active_set, stale_if_before, now, state ) ) ) break; + active_push_set_flush( gossip, state, stem, now ); } } diff --git a/src/flamenco/gossip/fd_gossip_txbuild.c b/src/flamenco/gossip/fd_gossip_txbuild.c index c2ddd9e541d..d311ab133a9 100644 --- a/src/flamenco/gossip/fd_gossip_txbuild.c +++ b/src/flamenco/gossip/fd_gossip_txbuild.c @@ -20,7 +20,6 @@ typedef struct crds_msg crds_msg_t; void fd_gossip_txbuild_init( fd_gossip_txbuild_t * txbuild, - uchar const * identity_pubkey, uchar msg_type ) { txbuild->tag = msg_type; txbuild->bytes_len = 44UL; /* offsetof( crds_msg_t, crds ) */ @@ -28,10 +27,16 @@ fd_gossip_txbuild_init( fd_gossip_txbuild_t * txbuild, crds_msg_t * msg = (crds_msg_t *)txbuild->bytes; msg->msg_type = msg_type; - fd_memcpy( msg->identity_pubkey, identity_pubkey, 32UL ); msg->crds_len = 0UL; } +void +fd_gossip_txbuild_stamp_pubkey( fd_gossip_txbuild_t * txbuild, + uchar const * pubkey ) { + crds_msg_t * msg = (crds_msg_t *)txbuild->bytes; + fd_memcpy( msg->identity_pubkey, pubkey, 32UL ); +} + int fd_gossip_txbuild_can_fit( fd_gossip_txbuild_t const * txbuild, ulong crds_len ) { diff --git a/src/flamenco/gossip/fd_gossip_txbuild.h b/src/flamenco/gossip/fd_gossip_txbuild.h index ceed5321608..9ec08d8f7ed 100644 --- a/src/flamenco/gossip/fd_gossip_txbuild.h +++ b/src/flamenco/gossip/fd_gossip_txbuild.h @@ -6,7 +6,7 @@ /* fd_gossip_txbuild_t provides a set of APIs to incrementally build a push or pull response message from CRDS values. The caller is responsible for checking there is space before appending a new value, - and flushing the final message. */ + and flushing the final message with the identity pubkey stamped. */ struct fd_gossip_txbuild { uchar tag; @@ -26,15 +26,21 @@ typedef struct fd_gossip_txbuild fd_gossip_txbuild_t; FD_PROTOTYPES_BEGIN -/* fd_gossip_txbuild_init() initializes the builder with the identity - pubkey and message type (FD_GOSSIP_MESSAGE_PULL_RESPONSE or +/* fd_gossip_txbuild_init() initializes the builder with the + message type (FD_GOSSIP_MESSAGE_PULL_RESPONSE or FD_GOSSIP_MESSAGE_PUSH). */ void fd_gossip_txbuild_init( fd_gossip_txbuild_t * txbuild, - uchar const * identity_pubkey, uchar tag ); +/* fd_gossip_txbuild_stamp_pubkey stamps the identity pubkey into the + message. This must be called in order to form a valid + push or pull response message. */ +void +fd_gossip_txbuild_stamp_pubkey( fd_gossip_txbuild_t * txbuild, + uchar const * pubkey ); + /* fd_gossip_txbuild_can_fit() returns 1 if the outgoing message can fit an additional CRDS value of size crds_len, or 0 otherwise. If the message cannot fit it is undefined behavior to append it. */ diff --git a/src/flamenco/gossip/fd_gossip_wpeer_sampler.c b/src/flamenco/gossip/fd_gossip_wpeer_sampler.c new file mode 100644 index 00000000000..1cdc370fb67 --- /dev/null +++ b/src/flamenco/gossip/fd_gossip_wpeer_sampler.c @@ -0,0 +1,125 @@ +#include "fd_gossip_wpeer_sampler.h" +#include "crds/fd_crds.h" + +/* This is a very rudimentary implementation of a weighted sampler. We + will eventually want to use a modified fd_wsample. */ +struct wpeer_sampler_private { + /* Cumulative weight for each peer. + Individual peer weight can be derived with + cumul_weight[i]-cumul_weight[i-1] */ + ulong * cumul_weight; + ulong max_idx; +}; + +#define PREV_PEER_WEIGHT( ps, idx ) \ + ( (idx) ? (ps)->cumul_weight[(idx)-1] : 0UL ) + + +FD_FN_CONST ulong +wpeer_sampler_footprint( ulong max_peers ) { + ulong l; + l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, wpeer_sampler_align(), sizeof(wpeer_sampler_t) ); + l = FD_LAYOUT_APPEND( l, alignof(ulong), sizeof(ulong)*max_peers ); + return FD_LAYOUT_FINI( l, wpeer_sampler_align() ); +} + +void * +wpeer_sampler_new( void * shmem, ulong max_peers ) { + if( FD_UNLIKELY( !shmem ) ){ + FD_LOG_WARNING(( "NULL shmem" )); + return NULL; + } + + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, wpeer_sampler_align() ) ) ) { + FD_LOG_WARNING(( "misaligned shmem" )); + return NULL; + } + + FD_SCRATCH_ALLOC_INIT( l, shmem ); + wpeer_sampler_t * ws = FD_SCRATCH_ALLOC_APPEND( l, wpeer_sampler_align(), sizeof(wpeer_sampler_t) ); + ws->cumul_weight = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), sizeof(ulong)*max_peers ); + FD_TEST( FD_SCRATCH_ALLOC_FINI( l, wpeer_sampler_align() )==(ulong)shmem + wpeer_sampler_footprint( max_peers ) ); + + fd_memset( ws->cumul_weight, 0, sizeof(ulong)*max_peers ); + ws->max_idx = ULONG_MAX; + return (void *)ws; +} + +wpeer_sampler_t * +wpeer_sampler_join( void * shmem ) { + + if( FD_UNLIKELY( !shmem ) ){ + FD_LOG_WARNING(( "null shmem" )); + return NULL; + } + + if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, wpeer_sampler_align() ) ) ) { + FD_LOG_WARNING(( "misaligned shmem" )); + return NULL; + } + + wpeer_sampler_t * ws = (wpeer_sampler_t *) shmem; + + return ws; + +} + +ulong +wpeer_sampler_sample( wpeer_sampler_t const * ps, + fd_rng_t * rng ) { + if( FD_UNLIKELY( ps->max_idx == ULONG_MAX || !ps->cumul_weight[ps->max_idx] ) ) { + return SAMPLE_IDX_SENTINEL; + } + + ulong sample = fd_rng_ulong_roll( rng, ps->cumul_weight[ps->max_idx] ); + /* avoid sampling 0 */ + sample = fd_ulong_min( sample+1UL, ps->cumul_weight[ps->max_idx] ); + + /* Binary search for the smallest cumulative weight >= sample */ + ulong left = 0UL; + ulong right = ps->max_idx+1; + while( left < right ) { + ulong mid = left + (right - left) / 2UL; + if( ps->cumul_weight[mid]max_idx ) ) { + ps->cumul_weight[idx] = 0UL; + ps->max_idx = ( idx==0UL ) ? ULONG_MAX : idx-1; + return 0; + } + /* Handle edge case where ps->max_idx is ULONG_MAX (sampler is empty) */ + if( FD_UNLIKELY( ps->max_idx == ULONG_MAX ) ) { + ps->max_idx = idx; + } else { + ps->max_idx = fd_ulong_max( ps->max_idx, idx ); + } + + ulong old_weight = ps->cumul_weight[idx] - PREV_PEER_WEIGHT( ps, idx ); + if( FD_UNLIKELY( old_weight==weight ) ) return 0; + + if( weight>old_weight ) { + for( ulong i=idx; imax_idx+1; i++ ) { + ps->cumul_weight[i] += (weight - old_weight); + } + } else { + for( ulong i=idx; imax_idx+1; i++ ) { + ps->cumul_weight[i] -= (old_weight - weight); + } + } + return 0; +} diff --git a/src/flamenco/gossip/fd_gossip_wpeer_sampler.h b/src/flamenco/gossip/fd_gossip_wpeer_sampler.h new file mode 100644 index 00000000000..dfb35dd1ba3 --- /dev/null +++ b/src/flamenco/gossip/fd_gossip_wpeer_sampler.h @@ -0,0 +1,60 @@ +#ifndef HEADER_fd_src_flamenco_gossip_fd_gossip_wpeer_sampler_h +#define HEADER_fd_src_flamenco_gossip_fd_gossip_wpeer_sampler_h + +#include "../../util/fd_util.h" + +/* wpeer_sampler provides a set of APIs to maintain a weighted sampler + with the ability to change weights dynamically on a runtime-bounded + element set. The sampler is designed to be used for sampling peers + in various parts of the gossip protocol. Users supply weights/score + updates in wpeer_sampler_upd and sample with wpeer_sampler_sample. + + The sampler works in terms of indices into an array of peers. The + user is responsible for maintaining a mapping between peers and + indices. The sampler does not store any information about the peers + themselves. fd_crds provides a set of APIs to track a peer's contact + info with an index to the contact info sidetable. These are provided + by fd_crds_ci_change_fn callbacks, with an API to lookup the + corresponding Contact Info. + + Why not use fd_wsample? The peer population constantly changes + throughout the epoch, as nodes enter, leave, or become + (un)responsive in the cluster. The fd_wsample APIs (currenlty) + do not provide the ability to change individual peer weights without + clearing the sampler and recalculating scores. */ + +#define SAMPLE_IDX_SENTINEL ULONG_MAX + +struct wpeer_sampler_private; +typedef struct wpeer_sampler_private wpeer_sampler_t; + +FD_FN_CONST static inline ulong +wpeer_sampler_align( void ) { + return 8UL; +} + +FD_FN_CONST ulong +wpeer_sampler_footprint( ulong max_peers ); + +void * +wpeer_sampler_new( void * shmem, ulong max_peers ); + +wpeer_sampler_t * +wpeer_sampler_join( void * shmem ); + +/* wpeer_sampler_sample returns the index of the entry sampled + by the weighted sampler. Does not sample entries with 0 weight. */ +ulong +wpeer_sampler_sample( wpeer_sampler_t const * ps, + fd_rng_t * rng ); + +/* wpeer_sampler_upd updates the weight/score of the entry at index idx + to weight. idx must be in [0, max_peers). weight can be 0, which + effectively disables the entry in the sampler population. */ +int +wpeer_sampler_upd( wpeer_sampler_t * ps, + ulong weight, + ulong idx ); + + +#endif diff --git a/src/flamenco/gossip/fd_push_set_private.c b/src/flamenco/gossip/fd_push_set_private.c deleted file mode 100644 index aedebf3fc3a..00000000000 --- a/src/flamenco/gossip/fd_push_set_private.c +++ /dev/null @@ -1,102 +0,0 @@ -#include "fd_gossip_txbuild.h" - -struct push_set_entry { - fd_gossip_txbuild_t txbuild[1]; - - /* We use fd_pool APIs because fd_dlist doesn't work without it and - there are no suitable linked list APIs. However, because we want - to track the active set indices, we also need to bypass the native - acquire/release mechanism provided by fd_pool and use it as an - array instead. */ - - struct { - ulong next; - uchar in_use; - } pool; - - struct{ - long wallclock_nanos; - ulong prev; - ulong next; - } last_hit; -}; - -typedef struct push_set_entry push_set_entry_t; - -#define POOL_NAME pset_entry_pool -#define POOL_T push_set_entry_t -#define POOL_NEXT pool.next -#include "../../util/tmpl/fd_pool.c" - -#define DLIST_NAME pset_last_hit -#define DLIST_ELE_T push_set_entry_t -#define DLIST_PREV last_hit.prev -#define DLIST_NEXT last_hit.next - -#include "../../util/tmpl/fd_dlist.c" - -struct push_set { - push_set_entry_t * pool; - pset_last_hit_t * last_hit; -}; - -typedef struct push_set push_set_t; - -ulong -push_set_align( void ) { - return pset_entry_pool_align(); -} - -ulong -push_set_footprint( ulong ele_max ) { - ulong l; - l = FD_LAYOUT_INIT; - l = FD_LAYOUT_APPEND( l, pset_entry_pool_align(), pset_entry_pool_footprint( ele_max ) ); - l = FD_LAYOUT_APPEND( l, pset_last_hit_align(), pset_last_hit_footprint() ); - l = FD_LAYOUT_APPEND( l, alignof(push_set_t), sizeof(push_set_t) ); - l = FD_LAYOUT_FINI( l, push_set_align() ); - return l; -} - -void * -push_set_new( void * shmem, - ulong ele_max ) { - if( FD_UNLIKELY( !shmem ) ) { - FD_LOG_ERR(( "NULL shmem" )); - } - if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, push_set_align() ) ) ) { - FD_LOG_ERR(( "misaligned shmem" )); - } - - FD_SCRATCH_ALLOC_INIT( l, shmem ); - void * _pool = FD_SCRATCH_ALLOC_APPEND( l, pset_entry_pool_align(), pset_entry_pool_footprint( ele_max ) ); - void * _last_appended = FD_SCRATCH_ALLOC_APPEND( l, pset_last_hit_align(), pset_last_hit_footprint() ); - push_set_t * push_set = FD_SCRATCH_ALLOC_APPEND( l, alignof(push_set_t), sizeof(push_set_t) ); - - push_set->pool = pset_entry_pool_join( pset_entry_pool_new( _pool, ele_max ) ); - push_set->last_hit = pset_last_hit_join( pset_last_hit_new( _last_appended ) ); - - for( ulong i=0UL; ipool, i ); - entry->pool.in_use = 0U; - } - - return (void *)push_set; -} - -push_set_t * -push_set_join( void * shpool ) { - if( FD_UNLIKELY( !shpool ) ) { - FD_LOG_ERR(( "NULL shpool" )); - } - return (push_set_t *)shpool; -} - -void -push_set_pop_append( push_set_t * pset, - push_set_entry_t * state, - long now ) { - state->last_hit.wallclock_nanos = now; - pset_last_hit_ele_remove( pset->last_hit, state, pset->pool ); - pset_last_hit_ele_push_tail( pset->last_hit, state, pset->pool ); -} diff --git a/src/flamenco/gossip/test_active_set.c b/src/flamenco/gossip/test_active_set.c index f6ee7981705..0554f9d52da 100644 --- a/src/flamenco/gossip/test_active_set.c +++ b/src/flamenco/gossip/test_active_set.c @@ -1,81 +1,409 @@ #include "fd_active_set.h" #include "fd_active_set_private.h" +#include "fd_gossip_types.h" +#include "crds/fd_crds.h" + +#include "../../util/fd_util.h" + #include "test_crds_utils.c" +FD_STATIC_ASSERT( FD_ACTIVE_SET_ALIGN==128UL, unit_test ); + +/* Helper to generate a random pubkey */ +static void +generate_random_pubkey( fd_rng_t * rng, uchar pubkey[ static 32UL ] ) { + for( ulong i=0UL; i<32UL; i++ ) pubkey[ i ] = fd_rng_uchar( rng ); +} + +static ulong +generate_stake_for_bucket( ulong bucket ){ + return bucket==0UL ? 1UL : (1UL<<(bucket-1))*1000000000UL; +} + +/* Test basic initialization and join */ +void +test_basic_init( void ) { + fd_rng_t _rng[1]; + fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) ); + FD_TEST( rng ); + + void * bytes = aligned_alloc( fd_active_set_align(), fd_active_set_footprint() ); + FD_TEST( bytes ); + + fd_active_set_t * active_set = fd_active_set_join( fd_active_set_new( bytes, rng ) ); + FD_TEST( active_set ); + + free( bytes ); +} + +/* Test peer insertion and removal */ +void +test_peer_lifecycle( void ) { + fd_rng_t _rng[1]; + fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) ); + FD_TEST( rng ); + + void * bytes = aligned_alloc( fd_active_set_align(), fd_active_set_footprint() ); + FD_TEST( bytes ); + + fd_active_set_t * active_set = fd_active_set_join( fd_active_set_new( bytes, rng ) ); + FD_TEST( active_set ); + + /* Insert a peer */ + ulong test_idx = 100UL; + ulong test_stake = 1000000000UL; + fd_active_set_peer_insert( active_set, test_idx, test_stake ); + + /* Remove the peer */ + fd_active_set_push_state_t evicted_states[ FD_ACTIVE_SET_STAKE_BUCKETS ]; + ulong flush_cnt = fd_active_set_peer_remove( active_set, test_idx, evicted_states ); + + /* Should have no evicted states since peer was never rotated into active set */ + FD_TEST( flush_cnt == 0UL ); -FD_STATIC_ASSERT( FD_ACTIVE_SET_ALIGN==64UL, unit_test ); -FD_STATIC_ASSERT( FD_ACTIVE_SET_ALIGN==alignof(fd_active_set_t), unit_test ); + free( bytes ); +} +/* Test stake bucket calculation */ void -test_get_stake_bucket( void ) { - ulong buckets[] = { 0, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5 }; - for( ulong i=0UL; i<16UL; i++ ) { - FD_TEST( fd_active_set_stake_bucket( i*1000000000UL )==buckets[ i ] ); +test_stake_buckets( void ) { + /* Test stake bucket boundaries */ + FD_TEST( fd_active_set_stake_bucket( 0UL ) == 0UL ); + FD_TEST( fd_active_set_stake_bucket( 999999999UL ) == 0UL ); + FD_TEST( fd_active_set_stake_bucket( 1000000000UL ) == 1UL ); + FD_TEST( fd_active_set_stake_bucket( 2000000000UL ) == 2UL ); + FD_TEST( fd_active_set_stake_bucket( 1UL << 40 ) == 11UL ); + FD_TEST( fd_active_set_stake_bucket( 1UL << 50 ) == 21UL ); + FD_TEST( fd_active_set_stake_bucket( ULONG_MAX ) == 24UL ); +} + +/* Test rotation with CRDS */ +void +test_rotation( void ) { + fd_rng_t _rng[1]; + fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) ); + FD_TEST( rng ); + + /* Create CRDS with test peers */ + fd_crds_t * crds = create_test_crds_with_ci( rng, 10UL ); + FD_TEST( crds ); + + void * bytes = aligned_alloc( fd_active_set_align(), fd_active_set_footprint() ); + FD_TEST( bytes ); + + fd_active_set_t * active_set = fd_active_set_join( fd_active_set_new( bytes, rng ) ); + FD_TEST( active_set ); + + long now = fd_log_wallclock(); + + /* Insert peers into active set */ + for( ulong i=0UL; i<10UL; i++ ) { + ulong stake = 1000000000UL * (i + 1UL); + fd_active_set_peer_insert( active_set, i, stake ); } - ulong stake1[] = { 4194303UL, 4194304UL, 8388607UL, 8388608UL }; - ulong buckets1[] = { 22UL, 23UL, 23UL, 24UL }; - for( ulong i=0UL; i<4UL; i++ ) { - FD_TEST( fd_active_set_stake_bucket( stake1[ i ]*1000000000UL )==buckets1[ i ] ); + /* Perform rotations */ + for( ulong i=0UL; i<100UL; i++ ) { + fd_active_set_push_state_t maybe_flush; + fd_active_set_rotate( active_set, crds, now, &maybe_flush ); + + /* If a peer was flushed, verify the state */ + if( maybe_flush.txbuild != NULL ) { + FD_TEST( maybe_flush.crds_idx<10UL ); + } } - FD_TEST( fd_active_set_stake_bucket( ULONG_MAX )==24UL ); + free_test_crds( crds ); + free( bytes ); } void -test_push_active_set( void ) { +test_single_peer( void ) { + fd_rng_t _rng[1]; + fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) ); + FD_TEST( rng ); + + /* Create CRDS with test peers */ + fd_crds_t * crds = create_test_crds_with_ci( rng, 1UL ); + FD_TEST( crds ); + void * bytes = aligned_alloc( fd_active_set_align(), fd_active_set_footprint() ); FD_TEST( bytes ); - fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) ); + fd_active_set_t * active_set = fd_active_set_join( fd_active_set_new( bytes, rng ) ); + FD_TEST( active_set ); + + long now = fd_log_wallclock(); + + /* Insert a single peer into active set */ + fd_active_set_peer_insert( active_set, 0UL, 1000000000UL ); + + /* Perform multiple rotations */ + for( ulong i=0UL; i<10UL; i++ ) { + fd_active_set_push_state_t maybe_flush; + fd_active_set_rotate( active_set, crds, now, &maybe_flush ); + } + uchar identity_pubkey[ 32UL ]; + uchar origin_pubkey[ 32UL ]; + generate_random_pubkey( rng, identity_pubkey ); + generate_random_pubkey( rng, origin_pubkey ); + fd_active_set_push_state_t out_push_states[ FD_ACTIVE_SET_PEERS_PER_BUCKET ]; + + /* Check across all bucket levels */ + ulong num_send = 0UL; + + for( ulong i=0UL; ientries[ i ]->nodes_len==0UL ); + long now = fd_log_wallclock(); + + /* Insert and rotate peers into active set */ + for( ulong i=0UL; i<20UL; i++ ) { + ulong stake = 1000000000UL * (i + 1UL); + fd_active_set_peer_insert( active_set, i, stake ); + } + + /* Rotate to populate buckets */ + for( ulong i=0UL; i<50UL; i++ ) { + fd_active_set_push_state_t maybe_flush; + fd_active_set_rotate( active_set, crds, now, &maybe_flush ); } - /* Test fd_active_set_rotate with the CRDS */ - fd_active_set_rotate( active_set, crds ); + /* Test getting nodes */ + uchar identity_pubkey[ 32UL ]; + uchar origin_pubkey[ 32UL ]; + generate_random_pubkey( rng, identity_pubkey ); + generate_random_pubkey( rng, origin_pubkey ); + + for( ulong i=0UL; ientries[ i ]->nodes_len<=12UL ); - for( ulong j=0UL; jentries[ i ]->nodes_len; j++ ) { - FD_TEST( fd_bloom_contains( active_set->entries[ i ]->nodes[ j ]->bloom, active_set->entries[ i ]->nodes[ j ]->pubkey, 32UL ) ); + /* Now prune the origin from one of the returned peers */ + ulong crds_idx = out_push_states[0].crds_idx; + fd_contact_info_t const * ci = fd_crds_contact_info_idx_lookup( crds, crds_idx ); + FD_TEST( ci ); + fd_active_set_prune( active_set, ci->pubkey.uc, origin_pubkey, stake, + identity_pubkey, stake ); + + ulong node_cnt2 = fd_active_set_nodes( active_set, identity_pubkey, stake, + origin_pubkey, stake, 0, now, out_push_states ); + FD_TEST( node_cnt2==node_cnt-1UL ); + for( ulong j=0UL; j