@@ -37,7 +37,9 @@ const TASK_NAME: &str = "beacon_processor_reprocess_queue";
3737const GOSSIP_BLOCKS : & str = "gossip_blocks" ;
3838const RPC_BLOCKS : & str = "rpc_blocks" ;
3939const ATTESTATIONS : & str = "attestations" ;
40+ const ATTESTATIONS_PER_ROOT : & str = "attestations_per_root" ;
4041const LIGHT_CLIENT_UPDATES : & str = "lc_updates" ;
42+ const LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT : & str = "lc_updates_per_parent_root" ;
4143
4244/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
4345/// This is to account for any slight drift in the system clock.
@@ -829,10 +831,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
829831 ) ;
830832 }
831833
832- if let Some ( queued_atts) = self . awaiting_attestations_per_root . get_mut ( & root)
833- && let Some ( index) = queued_atts. iter ( ) . position ( |& id| id == queued_id)
834+ if let Entry :: Occupied ( mut queued_atts) =
835+ self . awaiting_attestations_per_root . entry ( root)
836+ && let Some ( index) =
837+ queued_atts. get ( ) . iter ( ) . position ( |& id| id == queued_id)
834838 {
835- queued_atts. swap_remove ( index) ;
839+ let queued_atts_mut = queued_atts. get_mut ( ) ;
840+ queued_atts_mut. swap_remove ( index) ;
841+
842+ // If the vec is empty after this attestation's removal, we need to delete
843+ // the entry to prevent bloating the hashmap indefinitely.
844+ if queued_atts_mut. is_empty ( ) {
845+ queued_atts. remove_entry ( ) ;
846+ }
836847 }
837848 }
838849 }
@@ -853,13 +864,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
853864 error ! ( "Failed to send scheduled light client optimistic update" ) ;
854865 }
855866
856- if let Some ( queued_lc_updates) = self
857- . awaiting_lc_updates_per_parent_root
858- . get_mut ( & parent_root)
859- && let Some ( index) =
860- queued_lc_updates. iter ( ) . position ( |& id| id == queued_id)
867+ if let Entry :: Occupied ( mut queued_lc_updates) =
868+ self . awaiting_lc_updates_per_parent_root . entry ( parent_root)
869+ && let Some ( index) = queued_lc_updates
870+ . get ( )
871+ . iter ( )
872+ . position ( |& id| id == queued_id)
861873 {
862- queued_lc_updates. swap_remove ( index) ;
874+ let queued_lc_updates_mut = queued_lc_updates. get_mut ( ) ;
875+ queued_lc_updates_mut. swap_remove ( index) ;
876+
877+ if queued_lc_updates_mut. is_empty ( ) {
878+ queued_lc_updates. remove_entry ( ) ;
879+ }
863880 }
864881 }
865882 }
@@ -929,11 +946,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
929946 & [ ATTESTATIONS ] ,
930947 self . attestations_delay_queue . len ( ) as i64 ,
931948 ) ;
949+ metrics:: set_gauge_vec (
950+ & metrics:: BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL ,
951+ & [ ATTESTATIONS_PER_ROOT ] ,
952+ self . awaiting_attestations_per_root . len ( ) as i64 ,
953+ ) ;
932954 metrics:: set_gauge_vec (
933955 & metrics:: BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL ,
934956 & [ LIGHT_CLIENT_UPDATES ] ,
935957 self . lc_updates_delay_queue . len ( ) as i64 ,
936958 ) ;
959+ metrics:: set_gauge_vec (
960+ & metrics:: BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL ,
961+ & [ LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT ] ,
962+ self . awaiting_lc_updates_per_parent_root . len ( ) as i64 ,
963+ ) ;
937964 }
938965
939966 fn recompute_next_backfill_batch_event ( & mut self ) {
@@ -979,6 +1006,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
9791006#[ cfg( test) ]
9801007mod tests {
9811008 use super :: * ;
1009+ use crate :: BeaconProcessorConfig ;
9821010 use logging:: create_test_tracing_subscriber;
9831011 use slot_clock:: { ManualSlotClock , TestingSlotClock } ;
9841012 use std:: ops:: Add ;
@@ -1101,4 +1129,97 @@ mod tests {
11011129 Duration :: from_secs ( slot_duration) ,
11021130 )
11031131 }
1132+
1133+ fn test_queue ( ) -> ReprocessQueue < ManualSlotClock > {
1134+ create_test_tracing_subscriber ( ) ;
1135+
1136+ let config = BeaconProcessorConfig :: default ( ) ;
1137+ let ( ready_work_tx, _) = mpsc:: channel :: < ReadyWork > ( config. max_scheduled_work_queue_len ) ;
1138+ let ( _, reprocess_work_rx) =
1139+ mpsc:: channel :: < ReprocessQueueMessage > ( config. max_scheduled_work_queue_len ) ;
1140+ let slot_clock = Arc :: new ( testing_slot_clock ( 12 ) ) ;
1141+
1142+ ReprocessQueue :: new ( ready_work_tx, reprocess_work_rx, slot_clock)
1143+ }
1144+
1145+ // This is a regression test for a memory leak in `awaiting_attestations_per_root`.
1146+ // See: https://github.com/sigp/lighthouse/pull/8065
1147+ #[ tokio:: test]
1148+ async fn prune_awaiting_attestations_per_root ( ) {
1149+ create_test_tracing_subscriber ( ) ;
1150+
1151+ let mut queue = test_queue ( ) ;
1152+
1153+ // Pause time so it only advances manually
1154+ tokio:: time:: pause ( ) ;
1155+
1156+ let beacon_block_root = Hash256 :: repeat_byte ( 0xaf ) ;
1157+
1158+ // Insert an attestation.
1159+ let att = ReprocessQueueMessage :: UnknownBlockUnaggregate ( QueuedUnaggregate {
1160+ beacon_block_root,
1161+ process_fn : Box :: new ( || { } ) ,
1162+ } ) ;
1163+
1164+ // Process the event to enter it into the delay queue.
1165+ queue. handle_message ( InboundEvent :: Msg ( att) ) ;
1166+
1167+ // Check that it is queued.
1168+ assert_eq ! ( queue. awaiting_attestations_per_root. len( ) , 1 ) ;
1169+ assert ! (
1170+ queue
1171+ . awaiting_attestations_per_root
1172+ . contains_key( & beacon_block_root)
1173+ ) ;
1174+
1175+ // Advance time to expire the attestation.
1176+ advance_time ( & queue. slot_clock , 2 * QUEUED_ATTESTATION_DELAY ) . await ;
1177+ let ready_msg = queue. next ( ) . await . unwrap ( ) ;
1178+ assert ! ( matches!( ready_msg, InboundEvent :: ReadyAttestation ( _) ) ) ;
1179+ queue. handle_message ( ready_msg) ;
1180+
1181+ // The entry for the block root should be gone.
1182+ assert ! ( queue. awaiting_attestations_per_root. is_empty( ) ) ;
1183+ }
1184+
1185+ // This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`.
1186+ // See: https://github.com/sigp/lighthouse/pull/8065
1187+ #[ tokio:: test]
1188+ async fn prune_awaiting_lc_updates_per_parent_root ( ) {
1189+ create_test_tracing_subscriber ( ) ;
1190+
1191+ let mut queue = test_queue ( ) ;
1192+
1193+ // Pause time so it only advances manually
1194+ tokio:: time:: pause ( ) ;
1195+
1196+ let parent_root = Hash256 :: repeat_byte ( 0xaf ) ;
1197+
1198+ // Insert an attestation.
1199+ let msg =
1200+ ReprocessQueueMessage :: UnknownLightClientOptimisticUpdate ( QueuedLightClientUpdate {
1201+ parent_root,
1202+ process_fn : Box :: new ( || { } ) ,
1203+ } ) ;
1204+
1205+ // Process the event to enter it into the delay queue.
1206+ queue. handle_message ( InboundEvent :: Msg ( msg) ) ;
1207+
1208+ // Check that it is queued.
1209+ assert_eq ! ( queue. awaiting_lc_updates_per_parent_root. len( ) , 1 ) ;
1210+ assert ! (
1211+ queue
1212+ . awaiting_lc_updates_per_parent_root
1213+ . contains_key( & parent_root)
1214+ ) ;
1215+
1216+ // Advance time to expire the update.
1217+ advance_time ( & queue. slot_clock , 2 * QUEUED_LIGHT_CLIENT_UPDATE_DELAY ) . await ;
1218+ let ready_msg = queue. next ( ) . await . unwrap ( ) ;
1219+ assert ! ( matches!( ready_msg, InboundEvent :: ReadyLightClientUpdate ( _) ) ) ;
1220+ queue. handle_message ( ready_msg) ;
1221+
1222+ // The entry for the block root should be gone.
1223+ assert ! ( queue. awaiting_lc_updates_per_parent_root. is_empty( ) ) ;
1224+ }
11041225}
0 commit comments