Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/flamenco/gossip/Local.mk
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ $(call add-objs,fd_gossip fd_gossip_msg_parse fd_gossip_msg_ser fd_gossip_out fd

$(call add-hdrs,fd_bloom.h)
$(call add-hdrs,fd_gossip_types.h)
$(call add-objs,fd_bloom fd_active_set fd_ping_tracker,fd_flamenco)
$(call add-objs,fd_bloom fd_active_set fd_ping_tracker fd_gossip_wpeer_sampler,fd_flamenco)

$(call make-unit-test,test_bloom,test_bloom,fd_flamenco fd_util)
$(call run-unit-test,test_bloom)
Expand Down
172 changes: 89 additions & 83 deletions src/flamenco/gossip/crds/fd_crds.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "fd_crds.h"
#include "fd_crds_contact_info.c"
#include "../fd_gossip_types.h"
#include "../fd_gossip_wpeer_sampler.h"

#include "../../../ballet/sha256/fd_sha256.h"
#include "../../../funk/fd_funk_base.h" /* no link dependency, only using hash */
Expand Down Expand Up @@ -40,7 +41,6 @@ struct fd_crds_entry_private {
fd_crds_contact_info_entry_t * ci;
long instance_creation_wallclock_nanos;
uchar is_active;
ulong sampler_idx;

/* A list of "fresh" contact info entries is maintained, holding
entries that have been refreshed/inserted in the last 60s in
Expand Down Expand Up @@ -269,8 +269,6 @@ lookup_eq( fd_crds_key_t const * key0,

#include "../../../util/tmpl/fd_map_chain.c"

#include "fd_crds_peer_samplers.c"

struct fd_crds_purged {
uchar hash[ 32UL ];
struct {
Expand Down Expand Up @@ -340,6 +338,8 @@ typedef struct fd_crds_purged fd_crds_purged_t;

struct fd_crds_private {
fd_gossip_out_ctx_t * gossip_update;
fd_crds_ci_change_fn change_fn;
void * change_fn_ctx;

fd_sha256_t sha256[1];

Expand All @@ -366,7 +366,7 @@ struct fd_crds_private {
crds_contact_info_evict_dlist_t * evict_dlist;
} contact_info;

crds_samplers_t samplers[1];
wpeer_sampler_t * crds_sampler;

fd_crds_metrics_t metrics[1];

Expand Down Expand Up @@ -397,15 +397,18 @@ fd_crds_footprint( ulong ele_max,
l = FD_LAYOUT_APPEND( l, crds_contact_info_pool_align(), crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
l = FD_LAYOUT_APPEND( l, crds_contact_info_fresh_list_align(), crds_contact_info_fresh_list_footprint() );
l = FD_LAYOUT_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() );
l = FD_LAYOUT_APPEND( l, wpeer_sampler_align(), wpeer_sampler_footprint( CRDS_MAX_CONTACT_INFO ) );
return FD_LAYOUT_FINI( l, FD_CRDS_ALIGN );
}

void *
fd_crds_new( void * shmem,
fd_rng_t * rng,
ulong ele_max,
ulong purged_max,
fd_gossip_out_ctx_t * gossip_update_out ) {
fd_crds_new( void * shmem,
fd_rng_t * rng,
ulong ele_max,
ulong purged_max,
fd_gossip_out_ctx_t * gossip_update_out,
fd_crds_ci_change_fn change_fn,
void * change_fn_ctx ) {
if( FD_UNLIKELY( !shmem ) ) {
FD_LOG_WARNING(( "NULL shmem" ));
return NULL;
Expand Down Expand Up @@ -451,6 +454,7 @@ fd_crds_new( void * shmem,
void * _ci_pool = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_pool_align(), crds_contact_info_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
void * _ci_dlist = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_fresh_list_align(), crds_contact_info_fresh_list_footprint() );
void * _ci_evict_dlist = FD_SCRATCH_ALLOC_APPEND( l, crds_contact_info_evict_dlist_align(), crds_contact_info_evict_dlist_footprint() );
void * _wpeer_sampler = FD_SCRATCH_ALLOC_APPEND( l, wpeer_sampler_align(), wpeer_sampler_footprint( CRDS_MAX_CONTACT_INFO ) );
FD_TEST( FD_SCRATCH_ALLOC_FINI( l, FD_CRDS_ALIGN ) == (ulong)shmem + fd_crds_footprint( ele_max, purged_max ) );

crds->pool = crds_pool_join( crds_pool_new( _pool, ele_max ) );
Expand Down Expand Up @@ -495,13 +499,17 @@ fd_crds_new( void * shmem,
crds->contact_info.evict_dlist = crds_contact_info_evict_dlist_join( crds_contact_info_evict_dlist_new( _ci_evict_dlist ) );
FD_TEST( crds->contact_info.evict_dlist );

crds->crds_sampler = wpeer_sampler_join( wpeer_sampler_new( _wpeer_sampler, CRDS_MAX_CONTACT_INFO ) );
FD_TEST( crds->crds_sampler );

FD_TEST( fd_sha256_join( fd_sha256_new( crds->sha256 ) ) );

crds_samplers_new( crds->samplers );

memset( crds->metrics, 0, sizeof(fd_crds_metrics_t) );

crds->gossip_update = gossip_update_out;
crds->change_fn = change_fn;
crds->change_fn_ctx = change_fn_ctx;
crds->has_staked_node = 0;

FD_COMPILER_MFENCE();
Expand Down Expand Up @@ -551,6 +559,10 @@ remove_contact_info( fd_crds_t * crds,
msg->contact_info_remove.idx = crds_contact_info_pool_idx( crds->contact_info.pool, ci->contact_info.ci );
fd_gossip_tx_publish_chunk( crds->gossip_update, stem, (ulong)msg->tag, FD_GOSSIP_UPDATE_SZ_CONTACT_INFO_REMOVE, now );

ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, ci->contact_info.ci );
crds->change_fn( crds->change_fn_ctx, pool_idx, ci->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_REMOVED, stem, now );
wpeer_sampler_upd( crds->crds_sampler, 0, pool_idx );

if( FD_LIKELY( ci->stake ) ) crds->metrics->peer_staked_cnt--;
else crds->metrics->peer_unstaked_cnt--;

Expand All @@ -561,13 +573,6 @@ remove_contact_info( fd_crds_t * crds,
}
crds_contact_info_evict_dlist_ele_remove( crds->contact_info.evict_dlist, ci, crds->pool );
crds_contact_info_pool_ele_release( crds->contact_info.pool, ci->contact_info.ci );

/* FIXME: If the peer is in any active set bucket, it is NOT removed
here. If the peer is re-inserted into the CRDS table in the future,
it is added back into the bucket's sampler. This means a peer can
be sampled in a bucket (at least) twice during
fd_active_set_rotate. */
crds_samplers_rem_peer( crds->samplers, ci );
}

ulong
Expand Down Expand Up @@ -658,6 +663,24 @@ expire( fd_crds_t * crds,
}
}

/* Freshness is based on how recent a peer's contact info was last
updated.

Activity is based on whether a peer responds to a ping-pong request,
tracked by fd_ping_tracker. */

#define BASE_WEIGHT 256UL /* TODO: test this out!! */
ulong
peer_sampler_score( ulong stake,
int is_active,
int is_fresh ) {
if( FD_UNLIKELY( !is_active ) ) return 0UL;

ulong score = BASE_WEIGHT + stake;
if( FD_UNLIKELY( !is_fresh ) ) score>>=7;
return score;
}

void
unfresh( fd_crds_t * crds,
long now ) {
Expand All @@ -668,7 +691,11 @@ unfresh( fd_crds_t * crds,

head = crds_contact_info_fresh_list_ele_pop_head( crds->contact_info.fresh_dlist, crds->pool );
head->contact_info.fresh_dlist.in_list = 0;
crds_samplers_upd_peer_at_idx( crds->samplers, head, head->contact_info.sampler_idx, now );

ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, head->contact_info.ci );
ulong score = peer_sampler_score( head->stake, head->contact_info.is_active, 0 );

wpeer_sampler_upd( crds->crds_sampler, score, pool_idx );
}
}

Expand Down Expand Up @@ -762,8 +789,6 @@ crds_entry_init( fd_gossip_view_crds_value_t const * view,
out_value->node_instance.token = view->node_instance->token;
} else if( FD_UNLIKELY( key->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
out_value->contact_info.instance_creation_wallclock_nanos = view->ci_view->contact_info->instance_creation_wallclock_nanos;
/* Contact Info entry will be added to sampler upon successful insertion */
out_value->contact_info.sampler_idx = SAMPLE_IDX_SENTINEL;
}
}

Expand Down Expand Up @@ -1011,13 +1036,11 @@ fd_crds_insert( fd_crds_t * crds,
and is used in sampler score calculations. So we inherit the
incumbent's setting. */
candidate->contact_info.is_active = incumbent->contact_info.is_active;
if( FD_LIKELY( !is_from_me ) ) {
if( FD_UNLIKELY( candidate->stake!=incumbent->stake ) ) {
/* Perform a rescore here (expensive) */
crds_samplers_upd_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx, now );
} else {
crds_samplers_swap_peer_at_idx( crds->samplers, candidate, incumbent->contact_info.sampler_idx );
}
if( FD_UNLIKELY( candidate->stake!=incumbent->stake && !is_from_me ) ) {
ulong score = peer_sampler_score( origin_stake, candidate->contact_info.is_active, 0 );
ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, candidate->contact_info.ci );
wpeer_sampler_upd( crds->crds_sampler, score, pool_idx );
crds->change_fn( crds->change_fn_ctx, pool_idx, candidate->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_STAKE_CHANGED, stem, now );
}

if( FD_LIKELY( incumbent->stake ) ) crds->metrics->peer_staked_cnt--;
Expand Down Expand Up @@ -1077,15 +1100,18 @@ fd_crds_insert( fd_crds_t * crds,

crds_contact_info_evict_dlist_ele_push_tail( crds->contact_info.evict_dlist, candidate, crds->pool );

if( FD_LIKELY( !is_from_me ) ){
if( FD_LIKELY( !is_from_me ) ) {
crds_contact_info_fresh_list_ele_push_tail( crds->contact_info.fresh_dlist, candidate, crds->pool );
candidate->contact_info.fresh_dlist.in_list = 1;
} else {
candidate->contact_info.fresh_dlist.in_list = 0;
}

if( FD_UNLIKELY( !is_replacing && !is_from_me ) ) {
crds_samplers_add_peer( crds->samplers, candidate, now);
ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, candidate->contact_info.ci );
ulong score = peer_sampler_score( origin_stake, candidate->contact_info.is_active, 1 );
wpeer_sampler_upd( crds->crds_sampler, score, pool_idx );
crds->change_fn( crds->change_fn_ctx, pool_idx, candidate->stake, FD_CRDS_CONTACT_INFO_CHANGE_TYPE_NEW, stem, now );
}

if( FD_LIKELY( candidate->stake ) ) crds->metrics->peer_staked_cnt++;
Expand Down Expand Up @@ -1142,6 +1168,13 @@ fd_crds_contact_info_lookup( fd_crds_t const * crds,
return peer_ci->contact_info.ci->contact_info;
}

fd_contact_info_t const *
fd_crds_contact_info_idx_lookup( fd_crds_t const * crds,
ulong idx ) {
fd_crds_contact_info_entry_t const * ci = crds_contact_info_pool_ele_const( crds->contact_info.pool, idx );
return ci->contact_info;
}

ulong
fd_crds_peer_count( fd_crds_t const * crds ){
return crds_contact_info_pool_used( crds->contact_info.pool );
Expand All @@ -1150,88 +1183,61 @@ fd_crds_peer_count( fd_crds_t const * crds ){
static inline void
set_peer_active_status( fd_crds_t * crds,
uchar const * peer_pubkey,
uchar status,
long now ) {
uchar status ) {

fd_crds_key_t key[1];
make_contact_info_key( peer_pubkey, key );

fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
/* TODO: error handling? This technically should never hit */

/* In its first inactive->active transition, a peer will not have a
corresponding contact info entry since their contact infos are
dropped on ingress until marked active by ping tracker. */
if( FD_UNLIKELY( !peer_ci ) ) return;
uchar old_status = peer_ci->contact_info.is_active;

uchar old_status = peer_ci->contact_info.is_active;
peer_ci->contact_info.is_active = status;

if( FD_UNLIKELY( old_status!=status ) ) {
/* Trigger sampler update */
crds_samplers_upd_peer_at_idx( crds->samplers,
peer_ci,
peer_ci->contact_info.sampler_idx,
now );
ulong pool_idx = crds_contact_info_pool_idx( crds->contact_info.pool, peer_ci->contact_info.ci );
ulong score = peer_sampler_score( peer_ci->stake, status, !!peer_ci->contact_info.fresh_dlist.in_list );
wpeer_sampler_upd( crds->crds_sampler, score, pool_idx );
}
}
void
fd_crds_peer_active( fd_crds_t * crds,
uchar const * peer_pubkey,
long now ) {
set_peer_active_status( crds, peer_pubkey, 1 /* active */, now );
uchar const * peer_pubkey ) {
set_peer_active_status( crds, peer_pubkey, 1 /* active */ );
}

void
fd_crds_peer_inactive( fd_crds_t * crds,
uchar const * peer_pubkey,
long now ) {
set_peer_active_status( crds, peer_pubkey, 0 /* inactive */, now );
uchar const * peer_pubkey ) {
set_peer_active_status( crds, peer_pubkey, 0 /* inactive */);
}

fd_contact_info_t const *
fd_crds_peer_sample( fd_crds_t const * crds,
fd_rng_t * rng ) {
ulong idx = wpeer_sampler_sample( crds->samplers->pr_sampler,
rng,
crds->samplers->ele_cnt );
ulong idx = wpeer_sampler_sample( crds->crds_sampler, rng );
if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL;
return fd_crds_entry_contact_info( crds->samplers->ele[idx] );
return crds_contact_info_pool_ele( crds->contact_info.pool, idx )->contact_info;
}

fd_contact_info_t const *
fd_crds_bucket_sample_and_remove( fd_crds_t * crds,
fd_rng_t * rng,
ulong bucket ) {
ulong idx = wpeer_sampler_sample( &crds->samplers->bucket_samplers[bucket],
rng,
crds->samplers->ele_cnt );
if( FD_UNLIKELY( idx==SAMPLE_IDX_SENTINEL ) ) return NULL;
/* Disable peer to prevent future sampling until added back with
fd_crds_bucket_add */
wpeer_sampler_disable( &crds->samplers->bucket_samplers[bucket],
idx,
crds->samplers->ele_cnt );
void
fd_crds_handle_identity_change( fd_crds_t * crds,
uchar const * new_identity_pubkey ) {
fd_crds_key_t old_key[1];
make_contact_info_key( new_identity_pubkey, old_key );
fd_crds_entry_t * new_ci = lookup_map_ele_query( crds->lookup_map, old_key, NULL, crds->pool );
if( FD_UNLIKELY( !new_ci ) ) return;

return fd_crds_entry_contact_info( crds->samplers->ele[idx] );
}
ulong new_idx = crds_contact_info_pool_idx( crds->contact_info.pool, new_ci->contact_info.ci );

wpeer_sampler_upd( crds->crds_sampler, 0UL, new_idx );
crds->change_fn( crds->change_fn_ctx, new_idx, new_ci->stake, FD_CRDS_CONTACT_INFO_CHANGE_IDENTITY_CHANGED, NULL, 0L );

void
fd_crds_bucket_add( fd_crds_t * crds,
ulong bucket,
uchar const * pubkey ) {
fd_crds_key_t key[1];
make_contact_info_key( pubkey, key );
fd_crds_entry_t * peer_ci = lookup_map_ele_query( crds->lookup_map, key, NULL, crds->pool );
if( FD_UNLIKELY( !peer_ci ) ) {
FD_LOG_DEBUG(( "Sample peer not found in CRDS. Likely dropped." ));
return;
}
wpeer_sampler_t * bucket_sampler = &crds->samplers->bucket_samplers[bucket];
wpeer_sampler_enable( bucket_sampler,
peer_ci->contact_info.sampler_idx,
crds->samplers->ele_cnt );

ulong score = wpeer_sampler_bucket_score( peer_ci, bucket );
wpeer_sampler_upd( bucket_sampler,
score,
peer_ci->contact_info.sampler_idx,
crds->samplers->ele_cnt );
}

struct fd_crds_mask_iter_private {
Expand Down
Loading
Loading