Skip to content

Commit 0d7f50c

Browse files
authored
AggregatingAttestationPool new CLI params (#9452)
* poolV2-CLI * fix arity
1 parent fdc1b8f commit 0d7f50c

File tree

4 files changed

+252
-8
lines changed

4 files changed

+252
-8
lines changed

ethereum/networks/src/main/java/tech/pegasys/teku/networks/Eth2NetworkConfiguration.java

Lines changed: 127 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ public class Eth2NetworkConfiguration {
5757

5858
public static final boolean DEFAULT_FORK_CHOICE_LATE_BLOCK_REORG_ENABLED = false;
5959

60+
public static final boolean DEFAULT_AGGREGATING_ATTESTATION_POOL_PROFILING_ENABLED = false;
61+
public static final boolean DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_ENABLED = false;
62+
public static final int
63+
DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_BLOCK_AGGREGATION_TIME_LIMIT_MILLIS = 150;
64+
public static final int
65+
DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_TOTAL_BLOCK_AGGREGATION_TIME_LIMIT_MILLIS = 500;
66+
public static final boolean
67+
DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_EARLY_DROP_SINGLE_ATTESTATIONS_ENABLED = true;
68+
public static final boolean DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_PARALLEL_ENABLED = true;
69+
6070
// should fit attestations for a slot given validator set size
6171
// so DEFAULT_MAX_QUEUE_PENDING_ATTESTATIONS * slots_per_epoch should be >= validator set size
6272
// ideally
@@ -117,6 +127,12 @@ public class Eth2NetworkConfiguration {
117127
private final boolean forkChoiceUpdatedAlwaysSendPayloadAttributes;
118128
private final int pendingAttestationsMaxQueue;
119129
private final boolean rustKzgEnabled;
130+
private final boolean aggregatingAttestationPoolV2Enabled;
131+
private final boolean aggregatingAttestationPoolProfilingEnabled;
132+
private final int aggregatingAttestationPoolV2BlockAggregationTimeLimit;
133+
private final int aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit;
134+
private final boolean aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled;
135+
private final boolean aggregatingAttestationPoolV2ParallelEnabled;
120136

121137
private Eth2NetworkConfiguration(
122138
final Spec spec,
@@ -145,7 +161,13 @@ private Eth2NetworkConfiguration(
145161
final boolean forkChoiceLateBlockReorgEnabled,
146162
final boolean forkChoiceUpdatedAlwaysSendPayloadAttributes,
147163
final int pendingAttestationsMaxQueue,
148-
final boolean rustKzgEnabled) {
164+
final boolean rustKzgEnabled,
165+
final boolean aggregatingAttestationPoolV2Enabled,
166+
final boolean aggregatingAttestationPoolProfilingEnabled,
167+
final int aggregatingAttestationPoolV2BlockAggregationTimeLimit,
168+
final int aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit,
169+
final boolean aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled,
170+
final boolean aggregatingAttestationPoolV2ParallelEnabled) {
149171
this.spec = spec;
150172
this.constants = constants;
151173
this.stateBoostrapConfig = stateBoostrapConfig;
@@ -177,6 +199,15 @@ private Eth2NetworkConfiguration(
177199
forkChoiceUpdatedAlwaysSendPayloadAttributes;
178200
this.pendingAttestationsMaxQueue = pendingAttestationsMaxQueue;
179201
this.rustKzgEnabled = rustKzgEnabled;
202+
this.aggregatingAttestationPoolV2Enabled = aggregatingAttestationPoolV2Enabled;
203+
this.aggregatingAttestationPoolProfilingEnabled = aggregatingAttestationPoolProfilingEnabled;
204+
this.aggregatingAttestationPoolV2BlockAggregationTimeLimit =
205+
aggregatingAttestationPoolV2BlockAggregationTimeLimit;
206+
this.aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit =
207+
aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit;
208+
this.aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled =
209+
aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled;
210+
this.aggregatingAttestationPoolV2ParallelEnabled = aggregatingAttestationPoolV2ParallelEnabled;
180211

181212
LOG.debug(
182213
"P2P async queue - {} threads, max queue size {} ", asyncP2pMaxThreads, asyncP2pMaxQueue);
@@ -290,6 +321,30 @@ public boolean isForkChoiceLateBlockReorgEnabled() {
290321
return forkChoiceLateBlockReorgEnabled;
291322
}
292323

324+
public boolean isAggregatingAttestationPoolV2Enabled() {
325+
return aggregatingAttestationPoolV2Enabled;
326+
}
327+
328+
public boolean isAggregatingAttestationPoolProfilingEnabled() {
329+
return aggregatingAttestationPoolProfilingEnabled;
330+
}
331+
332+
public int getAggregatingAttestationPoolV2BlockAggregationTimeLimit() {
333+
return aggregatingAttestationPoolV2BlockAggregationTimeLimit;
334+
}
335+
336+
public int getAggregatingAttestationPoolV2TotalBlockAggregationTimeLimit() {
337+
return aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit;
338+
}
339+
340+
public boolean isAggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled() {
341+
return aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled;
342+
}
343+
344+
public boolean isAggregatingAttestationPoolV2ParallelEnabled() {
345+
return aggregatingAttestationPoolV2ParallelEnabled;
346+
}
347+
293348
public int getPendingAttestationsMaxQueue() {
294349
return pendingAttestationsMaxQueue;
295350
}
@@ -323,6 +378,17 @@ public boolean equals(final Object o) {
323378
&& asyncBeaconChainMaxQueue == that.asyncBeaconChainMaxQueue
324379
&& asyncP2pMaxQueue == that.asyncP2pMaxQueue
325380
&& forkChoiceLateBlockReorgEnabled == that.forkChoiceLateBlockReorgEnabled
381+
&& aggregatingAttestationPoolV2Enabled == that.aggregatingAttestationPoolV2Enabled
382+
&& aggregatingAttestationPoolProfilingEnabled
383+
== that.aggregatingAttestationPoolProfilingEnabled
384+
&& aggregatingAttestationPoolV2BlockAggregationTimeLimit
385+
== that.aggregatingAttestationPoolV2BlockAggregationTimeLimit
386+
&& aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit
387+
== that.aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit
388+
&& aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled
389+
== that.aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled
390+
&& aggregatingAttestationPoolV2ParallelEnabled
391+
== that.aggregatingAttestationPoolV2ParallelEnabled
326392
&& forkChoiceUpdatedAlwaysSendPayloadAttributes
327393
== that.forkChoiceUpdatedAlwaysSendPayloadAttributes
328394
&& rustKzgEnabled == that.rustKzgEnabled
@@ -413,6 +479,19 @@ public static class Builder {
413479
private OptionalInt pendingAttestationsMaxQueue = OptionalInt.empty();
414480
private boolean rustKzgEnabled = DEFAULT_RUST_KZG_ENABLED;
415481
private boolean strictConfigLoadingEnabled;
482+
private boolean aggregatingAttestationPoolV2Enabled =
483+
DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_ENABLED;
484+
private boolean aggregatingAttestationPoolProfilingEnabled =
485+
DEFAULT_AGGREGATING_ATTESTATION_POOL_PROFILING_ENABLED;
486+
private int aggregatingAttestationPoolV2BlockAggregationTimeLimit =
487+
DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_BLOCK_AGGREGATION_TIME_LIMIT_MILLIS;
488+
private int aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit =
489+
DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_TOTAL_BLOCK_AGGREGATION_TIME_LIMIT_MILLIS;
490+
491+
private boolean aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled =
492+
DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_EARLY_DROP_SINGLE_ATTESTATIONS_ENABLED;
493+
private boolean aggregatingAttestationPoolV2ParallelEnabled =
494+
DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_PARALLEL_ENABLED;
416495

417496
public void spec(final Spec spec) {
418497
this.spec = spec;
@@ -513,7 +592,13 @@ public Eth2NetworkConfiguration build() {
513592
forkChoiceLateBlockReorgEnabled,
514593
forkChoiceUpdatedAlwaysSendPayloadAttributes,
515594
pendingAttestationsMaxQueue.orElse(DEFAULT_MAX_QUEUE_PENDING_ATTESTATIONS),
516-
rustKzgEnabled);
595+
rustKzgEnabled,
596+
aggregatingAttestationPoolV2Enabled,
597+
aggregatingAttestationPoolProfilingEnabled,
598+
aggregatingAttestationPoolV2BlockAggregationTimeLimit,
599+
aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit,
600+
aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled,
601+
aggregatingAttestationPoolV2ParallelEnabled);
517602
}
518603

519604
private void validateCommandLineParameters() {
@@ -1049,6 +1134,46 @@ public Builder forkChoiceLateBlockReorgEnabled(final boolean forkChoiceLateBlock
10491134
return this;
10501135
}
10511136

1137+
public Builder aggregatingAttestationPoolV2Enabled(
1138+
final boolean aggregatingAttestationPoolV2Enabled) {
1139+
this.aggregatingAttestationPoolV2Enabled = aggregatingAttestationPoolV2Enabled;
1140+
return this;
1141+
}
1142+
1143+
public Builder aggregatingAttestationPoolProfilingEnabled(
1144+
final boolean aggregatingAttestationPoolProfilingEnabled) {
1145+
this.aggregatingAttestationPoolProfilingEnabled = aggregatingAttestationPoolProfilingEnabled;
1146+
return this;
1147+
}
1148+
1149+
public Builder aggregatingAttestationPoolV2BlockAggregationTimeLimit(
1150+
final int aggregatingAttestationPoolV2BlockAggregationTimeLimit) {
1151+
this.aggregatingAttestationPoolV2BlockAggregationTimeLimit =
1152+
aggregatingAttestationPoolV2BlockAggregationTimeLimit;
1153+
return this;
1154+
}
1155+
1156+
public Builder aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit(
1157+
final int aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit) {
1158+
this.aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit =
1159+
aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit;
1160+
return this;
1161+
}
1162+
1163+
public Builder aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled(
1164+
final boolean aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled) {
1165+
this.aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled =
1166+
aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled;
1167+
return this;
1168+
}
1169+
1170+
public Builder aggregatingAttestationPoolV2ParallelEnabled(
1171+
final boolean aggregatingAttestationPoolV2ParallelEnabled) {
1172+
this.aggregatingAttestationPoolV2ParallelEnabled =
1173+
aggregatingAttestationPoolV2ParallelEnabled;
1174+
return this;
1175+
}
1176+
10521177
public Builder forkChoiceUpdatedAlwaysSendPayloadAttributes(
10531178
final boolean forkChoiceUpdatedAlwaysSendPayloadAttributes) {
10541179
this.forkChoiceUpdatedAlwaysSendPayloadAttributes =

services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,10 @@
125125
import tech.pegasys.teku.statetransition.SimpleOperationPool;
126126
import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool;
127127
import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPoolV1;
128+
import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPoolV2;
128129
import tech.pegasys.teku.statetransition.attestation.AttestationManager;
129130
import tech.pegasys.teku.statetransition.attestation.utils.AggregatingAttestationPoolProfiler;
131+
import tech.pegasys.teku.statetransition.attestation.utils.AggregatingAttestationPoolProfilerCSV;
130132
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
131133
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin;
132134
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManagerImpl;
@@ -305,6 +307,7 @@ public class BeaconChainController extends Service implements BeaconChainControl
305307
protected SettableLabelledGauge futureItemsMetric;
306308
protected IntSupplier rejectedExecutionCountSupplier;
307309
protected DebugDataDumper debugDataDumper;
310+
protected Path debugDataDirectory;
308311

309312
public BeaconChainController(
310313
final ServiceConfig serviceConfig, final BeaconChainConfiguration beaconConfig) {
@@ -349,6 +352,7 @@ public BeaconChainController(
349352
"Current number of items held for future slots, labelled by type",
350353
"type");
351354
this.ephemerySlotValidationService = new EphemerySlotValidationService();
355+
this.debugDataDirectory = serviceConfig.getDataDirLayout().getDebugDataDirectory();
352356
}
353357

354358
@Override
@@ -1210,13 +1214,29 @@ protected void initSlotProcessor() {
12101214

12111215
public void initAttestationPool() {
12121216
LOG.debug("BeaconChainController.initAttestationPool()");
1217+
final Eth2NetworkConfiguration eth2NetworkConfiguration = beaconConfig.eth2NetworkConfig();
1218+
1219+
final AggregatingAttestationPoolProfiler profiler =
1220+
eth2NetworkConfiguration.isAggregatingAttestationPoolProfilingEnabled()
1221+
? new AggregatingAttestationPoolProfilerCSV(debugDataDirectory)
1222+
: AggregatingAttestationPoolProfiler.NOOP;
1223+
12131224
attestationPool =
1214-
new AggregatingAttestationPoolV1(
1215-
spec,
1216-
recentChainData,
1217-
metricsSystem,
1218-
AggregatingAttestationPoolProfiler.NOOP,
1219-
DEFAULT_MAXIMUM_ATTESTATION_COUNT);
1225+
eth2NetworkConfiguration.isAggregatingAttestationPoolV2Enabled()
1226+
? new AggregatingAttestationPoolV2(
1227+
spec,
1228+
recentChainData,
1229+
metricsSystem,
1230+
DEFAULT_MAXIMUM_ATTESTATION_COUNT,
1231+
profiler,
1232+
eth2NetworkConfiguration.getAggregatingAttestationPoolV2BlockAggregationTimeLimit(),
1233+
eth2NetworkConfiguration
1234+
.getAggregatingAttestationPoolV2TotalBlockAggregationTimeLimit(),
1235+
eth2NetworkConfiguration
1236+
.isAggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled(),
1237+
eth2NetworkConfiguration.isAggregatingAttestationPoolV2ParallelEnabled())
1238+
: new AggregatingAttestationPoolV1(
1239+
spec, recentChainData, metricsSystem, profiler, DEFAULT_MAXIMUM_ATTESTATION_COUNT);
12201240
eventChannels.subscribe(SlotEventsChannel.class, attestationPool);
12211241
blockImporter.subscribeToVerifiedBlockAttestations(
12221242
attestationPool::onAttestationsIncludedInBlock);

teku/src/main/java/tech/pegasys/teku/cli/options/Eth2NetworkOptions.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,73 @@ public class Eth2NetworkOptions {
309309
arity = "0..1")
310310
private String epochsStoreBlobs;
311311

312+
@Option(
313+
names = {"--Xaggregating-attestation-pool-v2-enabled"},
314+
paramLabel = "<BOOLEAN>",
315+
description = "Enable the new aggregating attestation pool.",
316+
arity = "0..1",
317+
fallbackValue = "true",
318+
showDefaultValue = Visibility.ALWAYS,
319+
hidden = true)
320+
private boolean aggregatingAttestationPoolV2Enabled =
321+
Eth2NetworkConfiguration.DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_ENABLED;
322+
323+
@Option(
324+
names = {"--Xaggregating-attestation-pool-profiling-enabled"},
325+
paramLabel = "<BOOLEAN>",
326+
description = "Enable the profiler for the aggregating attestation pool",
327+
arity = "0..1",
328+
fallbackValue = "true",
329+
showDefaultValue = Visibility.ALWAYS,
330+
hidden = true)
331+
private boolean aggregatingAttestationPoolProfilingEnabled =
332+
Eth2NetworkConfiguration.DEFAULT_AGGREGATING_ATTESTATION_POOL_PROFILING_ENABLED;
333+
334+
@Option(
335+
names = {"--Xaggregating-attestation-pool-v2-block-aggregation-time-limit"},
336+
paramLabel = "<NUMBER>",
337+
description = "Maximum time to spend packing attestations when producing a block.",
338+
arity = "1",
339+
hidden = true)
340+
private int aggregatingAttestationPoolV2BlockAggregationTimeLimit =
341+
Eth2NetworkConfiguration
342+
.DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_BLOCK_AGGREGATION_TIME_LIMIT_MILLIS;
343+
344+
@Option(
345+
names = {"--Xaggregating-attestation-pool-v2-total-block-aggregation-time-limit"},
346+
paramLabel = "<NUMBER>",
347+
description =
348+
"Maximum time to spend packing and improving attestations when producing a block.",
349+
arity = "1",
350+
hidden = true)
351+
private int aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit =
352+
Eth2NetworkConfiguration
353+
.DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_TOTAL_BLOCK_AGGREGATION_TIME_LIMIT_MILLIS;
354+
355+
@Option(
356+
names = {"--Xaggregating-attestation-pool-v2-early-drop-single-attestations-enabled"},
357+
paramLabel = "<BOOLEAN>",
358+
description =
359+
"Discard single attestations upon receiving an attestation that contains that single attestation.",
360+
arity = "0..1",
361+
fallbackValue = "true",
362+
showDefaultValue = Visibility.ALWAYS,
363+
hidden = true)
364+
private boolean aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled =
365+
Eth2NetworkConfiguration
366+
.DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_EARLY_DROP_SINGLE_ATTESTATIONS_ENABLED;
367+
368+
@Option(
369+
names = {"--Xaggregating-attestation-pool-v2-parallel-enabled"},
370+
paramLabel = "<BOOLEAN>",
371+
description = "Enable parallel processing of aggregating attestations.",
372+
arity = "0..1",
373+
fallbackValue = "true",
374+
showDefaultValue = Visibility.ALWAYS,
375+
hidden = true)
376+
private boolean aggregatingAttestationPoolV2ParallelEnabled =
377+
Eth2NetworkConfiguration.DEFAULT_AGGREGATING_ATTESTATION_POOL_V2_PARALLEL_ENABLED;
378+
312379
public Eth2NetworkConfiguration getNetworkConfiguration() {
313380
return createEth2NetworkConfig(builder -> {});
314381
}
@@ -401,6 +468,15 @@ private void configureEth2Network(final Eth2NetworkConfiguration.Builder builder
401468
.asyncP2pMaxThreads(asyncP2pMaxThreads)
402469
.asyncBeaconChainMaxThreads(asyncBeaconChainMaxThreads)
403470
.forkChoiceLateBlockReorgEnabled(forkChoiceLateBlockReorgEnabled)
471+
.aggregatingAttestationPoolV2Enabled(aggregatingAttestationPoolV2Enabled)
472+
.aggregatingAttestationPoolProfilingEnabled(aggregatingAttestationPoolProfilingEnabled)
473+
.aggregatingAttestationPoolV2BlockAggregationTimeLimit(
474+
aggregatingAttestationPoolV2BlockAggregationTimeLimit)
475+
.aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit(
476+
aggregatingAttestationPoolV2TotalBlockAggregationTimeLimit)
477+
.aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled(
478+
aggregatingAttestationPoolV2EarlyDropSingleAttestationsEnabled)
479+
.aggregatingAttestationPoolV2ParallelEnabled(aggregatingAttestationPoolV2ParallelEnabled)
404480
.epochsStoreBlobs(epochsStoreBlobs)
405481
.forkChoiceUpdatedAlwaysSendPayloadAttributes(forkChoiceUpdatedAlwaysSendPayloadAttributes)
406482
.rustKzgEnabled(rustKzgEnabled);

teku/src/test/java/tech/pegasys/teku/cli/options/Eth2NetworkOptionsTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,29 @@ void shouldSetLateBlockImportEnabled(final String value) {
110110
.isEqualTo(Boolean.valueOf(value));
111111
}
112112

113+
@ParameterizedTest
114+
@ValueSource(strings = {"true", "false"})
115+
void shouldSetAggregatingAttestationPoolV2Enabled(final String value) {
116+
final TekuConfiguration config =
117+
getTekuConfigurationFromArguments("--Xaggregating-attestation-pool-v2-enabled", value);
118+
assertThat(config.eth2NetworkConfiguration().isAggregatingAttestationPoolV2Enabled())
119+
.isEqualTo(Boolean.valueOf(value));
120+
}
121+
122+
@Test
123+
void shouldAggregatingAttestationPoolV2EnabledDisabledByDefault() {
124+
final TekuConfiguration config = getTekuConfigurationFromArguments();
125+
assertThat(config.eth2NetworkConfiguration().isAggregatingAttestationPoolV2Enabled())
126+
.isEqualTo(false);
127+
}
128+
129+
@Test
130+
void shouldAggregatingAttestationPoolProfilerDisabledByDefault() {
131+
final TekuConfiguration config = getTekuConfigurationFromArguments();
132+
assertThat(config.eth2NetworkConfiguration().isAggregatingAttestationPoolProfilingEnabled())
133+
.isEqualTo(false);
134+
}
135+
113136
@Test
114137
void shouldUseDefaultAlwaysSendPayloadAttributesIfUnspecified() {
115138
final TekuConfiguration config = getTekuConfigurationFromArguments();

0 commit comments

Comments
 (0)