Skip to content

Commit 57a301d

Browse files
authored
Merge of #8065
2 parents 521be2b + 2d749ae commit 57a301d

File tree

1 file changed

+130
-9
lines changed

1 file changed

+130
-9
lines changed

beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs

Lines changed: 130 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ const TASK_NAME: &str = "beacon_processor_reprocess_queue";
3737
const GOSSIP_BLOCKS: &str = "gossip_blocks";
3838
const RPC_BLOCKS: &str = "rpc_blocks";
3939
const ATTESTATIONS: &str = "attestations";
40+
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
4041
const LIGHT_CLIENT_UPDATES: &str = "lc_updates";
42+
const LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT: &str = "lc_updates_per_parent_root";
4143

4244
/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
4345
/// This is to account for any slight drift in the system clock.
@@ -829,10 +831,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
829831
);
830832
}
831833

832-
if let Some(queued_atts) = self.awaiting_attestations_per_root.get_mut(&root)
833-
&& let Some(index) = queued_atts.iter().position(|&id| id == queued_id)
834+
if let Entry::Occupied(mut queued_atts) =
835+
self.awaiting_attestations_per_root.entry(root)
836+
&& let Some(index) =
837+
queued_atts.get().iter().position(|&id| id == queued_id)
834838
{
835-
queued_atts.swap_remove(index);
839+
let queued_atts_mut = queued_atts.get_mut();
840+
queued_atts_mut.swap_remove(index);
841+
842+
// If the vec is empty after this attestation's removal, we need to delete
843+
// the entry to prevent bloating the hashmap indefinitely.
844+
if queued_atts_mut.is_empty() {
845+
queued_atts.remove_entry();
846+
}
836847
}
837848
}
838849
}
@@ -853,13 +864,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
853864
error!("Failed to send scheduled light client optimistic update");
854865
}
855866

856-
if let Some(queued_lc_updates) = self
857-
.awaiting_lc_updates_per_parent_root
858-
.get_mut(&parent_root)
859-
&& let Some(index) =
860-
queued_lc_updates.iter().position(|&id| id == queued_id)
867+
if let Entry::Occupied(mut queued_lc_updates) =
868+
self.awaiting_lc_updates_per_parent_root.entry(parent_root)
869+
&& let Some(index) = queued_lc_updates
870+
.get()
871+
.iter()
872+
.position(|&id| id == queued_id)
861873
{
862-
queued_lc_updates.swap_remove(index);
874+
let queued_lc_updates_mut = queued_lc_updates.get_mut();
875+
queued_lc_updates_mut.swap_remove(index);
876+
877+
if queued_lc_updates_mut.is_empty() {
878+
queued_lc_updates.remove_entry();
879+
}
863880
}
864881
}
865882
}
@@ -929,11 +946,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
929946
&[ATTESTATIONS],
930947
self.attestations_delay_queue.len() as i64,
931948
);
949+
metrics::set_gauge_vec(
950+
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
951+
&[ATTESTATIONS_PER_ROOT],
952+
self.awaiting_attestations_per_root.len() as i64,
953+
);
932954
metrics::set_gauge_vec(
933955
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
934956
&[LIGHT_CLIENT_UPDATES],
935957
self.lc_updates_delay_queue.len() as i64,
936958
);
959+
metrics::set_gauge_vec(
960+
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
961+
&[LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT],
962+
self.awaiting_lc_updates_per_parent_root.len() as i64,
963+
);
937964
}
938965

939966
fn recompute_next_backfill_batch_event(&mut self) {
@@ -979,6 +1006,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
9791006
#[cfg(test)]
9801007
mod tests {
9811008
use super::*;
1009+
use crate::BeaconProcessorConfig;
9821010
use logging::create_test_tracing_subscriber;
9831011
use slot_clock::{ManualSlotClock, TestingSlotClock};
9841012
use std::ops::Add;
@@ -1101,4 +1129,97 @@ mod tests {
11011129
Duration::from_secs(slot_duration),
11021130
)
11031131
}
1132+
1133+
fn test_queue() -> ReprocessQueue<ManualSlotClock> {
1134+
create_test_tracing_subscriber();
1135+
1136+
let config = BeaconProcessorConfig::default();
1137+
let (ready_work_tx, _) = mpsc::channel::<ReadyWork>(config.max_scheduled_work_queue_len);
1138+
let (_, reprocess_work_rx) =
1139+
mpsc::channel::<ReprocessQueueMessage>(config.max_scheduled_work_queue_len);
1140+
let slot_clock = Arc::new(testing_slot_clock(12));
1141+
1142+
ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock)
1143+
}
1144+
1145+
// This is a regression test for a memory leak in `awaiting_attestations_per_root`.
1146+
// See: https://github.com/sigp/lighthouse/pull/8065
1147+
#[tokio::test]
1148+
async fn prune_awaiting_attestations_per_root() {
1149+
create_test_tracing_subscriber();
1150+
1151+
let mut queue = test_queue();
1152+
1153+
// Pause time so it only advances manually
1154+
tokio::time::pause();
1155+
1156+
let beacon_block_root = Hash256::repeat_byte(0xaf);
1157+
1158+
// Insert an attestation.
1159+
let att = ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate {
1160+
beacon_block_root,
1161+
process_fn: Box::new(|| {}),
1162+
});
1163+
1164+
// Process the event to enter it into the delay queue.
1165+
queue.handle_message(InboundEvent::Msg(att));
1166+
1167+
// Check that it is queued.
1168+
assert_eq!(queue.awaiting_attestations_per_root.len(), 1);
1169+
assert!(
1170+
queue
1171+
.awaiting_attestations_per_root
1172+
.contains_key(&beacon_block_root)
1173+
);
1174+
1175+
// Advance time to expire the attestation.
1176+
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
1177+
let ready_msg = queue.next().await.unwrap();
1178+
assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_)));
1179+
queue.handle_message(ready_msg);
1180+
1181+
// The entry for the block root should be gone.
1182+
assert!(queue.awaiting_attestations_per_root.is_empty());
1183+
}
1184+
1185+
// This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`.
1186+
// See: https://github.com/sigp/lighthouse/pull/8065
1187+
#[tokio::test]
1188+
async fn prune_awaiting_lc_updates_per_parent_root() {
1189+
create_test_tracing_subscriber();
1190+
1191+
let mut queue = test_queue();
1192+
1193+
// Pause time so it only advances manually
1194+
tokio::time::pause();
1195+
1196+
let parent_root = Hash256::repeat_byte(0xaf);
1197+
1198+
// Insert an attestation.
1199+
let msg =
1200+
ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate {
1201+
parent_root,
1202+
process_fn: Box::new(|| {}),
1203+
});
1204+
1205+
// Process the event to enter it into the delay queue.
1206+
queue.handle_message(InboundEvent::Msg(msg));
1207+
1208+
// Check that it is queued.
1209+
assert_eq!(queue.awaiting_lc_updates_per_parent_root.len(), 1);
1210+
assert!(
1211+
queue
1212+
.awaiting_lc_updates_per_parent_root
1213+
.contains_key(&parent_root)
1214+
);
1215+
1216+
// Advance time to expire the update.
1217+
advance_time(&queue.slot_clock, 2 * QUEUED_LIGHT_CLIENT_UPDATE_DELAY).await;
1218+
let ready_msg = queue.next().await.unwrap();
1219+
assert!(matches!(ready_msg, InboundEvent::ReadyLightClientUpdate(_)));
1220+
queue.handle_message(ready_msg);
1221+
1222+
// The entry for the block root should be gone.
1223+
assert!(queue.awaiting_lc_updates_per_parent_root.is_empty());
1224+
}
11041225
}

0 commit comments

Comments
 (0)