Skip to content

Commit 354d20d

Browse files
committed
feat(preconf): add optimistic payload build fallback, retry logic, and kurtosis devnet improvements
1 parent 92b8c7f commit 354d20d

File tree

26 files changed

+468
-227
lines changed

26 files changed

+468
-227
lines changed

beacon/blockchain/finalize_block.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,29 @@ import (
3636
sdk "github.com/cosmos/cosmos-sdk/types"
3737
)
3838

39+
// sequencerFinalizeOptimisticBuild triggers an optimistic payload build from FinalizeBlock
40+
// when ProcessProposal was skipped (e.g., proposal arrived after the CometBFT propose timeout).
41+
func (s *Service) sequencerFinalizeOptimisticBuild(
42+
ctx context.Context,
43+
blk *ctypes.BeaconBlock,
44+
consensusTime math.U64,
45+
) {
46+
s.logger.Warn("ProcessProposal was skipped, triggering optimistic build from FinalizeBlock",
47+
"current_slot", blk.GetSlot().Base10(),
48+
)
49+
50+
st := s.storageBackend.StateFromContext(ctx)
51+
ephemeralState := st.Protect(ctx)
52+
53+
buildData, err := s.preFetchBuildData(ephemeralState, consensusTime)
54+
if err != nil {
55+
s.logger.Error("Failed to prepare optimistic build data from FinalizeBlock", "error", err)
56+
return
57+
}
58+
59+
s.handleOptimisticPayloadBuild(ctx, buildData)
60+
}
61+
3962
func (s *Service) FinalizeBlock(
4063
ctx sdk.Context,
4164
req *cmtabci.FinalizeBlockRequest,
@@ -80,7 +103,20 @@ func (s *Service) FinalizeBlock(
80103
}
81104

82105
// STEP 4: Post Finalizations cleanups.
83-
return valUpdates, s.PostFinalizeBlockOps(ctx, blk)
106+
if err = s.PostFinalizeBlockOps(ctx, blk); err != nil {
107+
return valUpdates, err
108+
}
109+
110+
// STEP 5: Sequencer fallback — trigger optimistic build if ProcessProposal was skipped.
111+
if s.preconfCfg.IsSequencer() && s.localBuilder.Enabled() {
112+
skipped := !s.optimisticBuildTriggered.Load()
113+
s.optimisticBuildTriggered.Store(false)
114+
if skipped {
115+
s.sequencerFinalizeOptimisticBuild(ctx, blk, math.U64(req.GetTime().Unix())) //#nosec: G115
116+
}
117+
}
118+
119+
return valUpdates, nil
84120
}
85121

86122
func (s *Service) FinalizeSidecars(

beacon/blockchain/payload.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (s *Service) forceSyncUponFinalize(
133133
// to extract the data necessary to build the next block, whether current block is
134134
// being rejected or accepted. This is way there can be (and so should be)
135135
// a single function doing these ops. preFetchBuildData is that function.
136-
func (s *Service) preFetchBuildData(st *statedb.StateDB, currentTime math.U64, expectedProposer crypto.BLSPubkey) (
136+
func (s *Service) preFetchBuildData(st *statedb.StateDB, currentTime math.U64) (
137137
*builder.RequestPayloadData,
138138
error,
139139
) {
@@ -201,7 +201,6 @@ func (s *Service) preFetchBuildData(st *statedb.StateDB, currentTime math.U64, e
201201
FinalizedBlockHash: lph.GetParentHash(),
202202
},
203203
ParentProposerPubkey: parentProposerPubkey,
204-
ExpectedProposer: expectedProposer,
205204
}, nil
206205
}
207206

beacon/blockchain/process_proposal.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -303,35 +303,47 @@ func (s *Service) VerifyIncomingBlock(
303303

304304
// Determine if we should optimistically build a payload for the next slot.
305305
var shouldBuildNextPayload bool
306-
var expectedProposerPubkey crypto.BLSPubkey
307306
if s.localBuilder.Enabled() {
308-
if isNextBlockProposer {
307+
switch {
308+
case isNextBlockProposer && !s.preconfCfg.ShouldFetchFromSequencer():
309+
// We're the next proposer and not fetching from sequencer,
310+
// build optimistically on local EL for ourselves.
309311
shouldBuildNextPayload = true
310-
} else if s.preconfCfg != nil && s.preconfCfg.IsSequencer() {
311-
expectedProposerPubkey, err = s.getNextProposerPubkey(state, nextProposerAddress)
312+
s.logger.Info("Next proposer is this node, building optimistically",
313+
"current_block_slot", blkSlot.Base10(),
314+
"target_build_slot", (blkSlot + 1).Base10(),
315+
)
316+
case s.preconfCfg.IsSequencer():
317+
// We're the sequencer, build for whitelisted validators.
318+
// Only check whitelist if we can resolve the next proposer pubkey.
319+
expectedProposerPubkey, err := s.getNextProposerPubkey(state, nextProposerAddress)
312320
if err != nil {
313321
s.logger.Error("Failed to get next proposer pubkey", "error", err)
314322
} else {
315323
shouldBuildNextPayload = s.preconfWhitelist.IsWhitelisted(expectedProposerPubkey)
316-
if shouldBuildNextPayload {
317-
s.logger.Info("Sequencer mode: next proposer is whitelisted, triggering optimistic build")
318-
}
319324
}
325+
s.logger.Info("Sequencer mode: determined next proposer",
326+
"current_block_slot", blkSlot.Base10(),
327+
"target_build_slot", (blkSlot + 1).Base10(),
328+
"next_proposer_address", fmt.Sprintf("%x", nextProposerAddress),
329+
"expected_proposer_pubkey", expectedProposerPubkey.String(),
330+
"is_whitelisted", shouldBuildNextPayload,
331+
)
320332
}
321333
}
322334

323335
var nextBlockData *builder.RequestPayloadData
324336
if shouldBuildNextPayload {
325337
// makes sure that preFetchBuildData does not affect state
326338
ephemeralState := state.Protect(ctx)
327-
nextBlockData, err = s.preFetchBuildData(ephemeralState, blk.GetConsensusTime(), expectedProposerPubkey)
339+
nextBlockData, err = s.preFetchBuildData(ephemeralState, blk.GetConsensusTime())
328340
if err != nil {
329341
// We don't return with err if pre-fetch fails. Instead we log the issue
330342
// and still move to process the current block. Next block can always be
331343
// built right after current height is finalized.
332344
s.logger.Warn(
333345
"Failed pre fetching data for optimistic block building",
334-
"case", "block rejectiong",
346+
"case", "block rejection",
335347
"err", err,
336348
)
337349
}
@@ -365,7 +377,7 @@ func (s *Service) VerifyIncomingBlock(
365377
if shouldBuildNextPayload {
366378
// makes sure that preFetchBuildDataForSuccess does not affect state
367379
ephemeralState := state.Protect(ctx)
368-
nextBlockData, err = s.preFetchBuildData(ephemeralState, blk.GetConsensusTime(), expectedProposerPubkey)
380+
nextBlockData, err = s.preFetchBuildData(ephemeralState, blk.GetConsensusTime())
369381
if err != nil {
370382
// We don't mark the block as rejected if it is valid but pre-fetch fails.
371383
// Instead we log the issue and move to process the current block.
@@ -377,6 +389,7 @@ func (s *Service) VerifyIncomingBlock(
377389
)
378390
return valUpdates, nil
379391
}
392+
s.optimisticBuildTriggered.Store(true)
380393
go s.handleOptimisticPayloadBuild(ctx, nextBlockData)
381394
}
382395

beacon/blockchain/service.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ type Service struct {
8080
// - Validator: checks if self is whitelisted to fetch payload from sequencer
8181
// Can be nil if preconf is disabled.
8282
preconfWhitelist preconf.Whitelist
83+
84+
// optimisticBuildTriggered tracks whether ProcessProposal triggered an
85+
// optimistic payload build for the next slot. FinalizeBlock checks this
86+
// flag: if ProcessProposal was skipped (e.g., late proposal), the flag
87+
// remains false and FinalizeBlock triggers the build as a fallback.
88+
optimisticBuildTriggered atomic.Bool
8389
}
8490

8591
// NewService creates a new validator service.

beacon/preconf/client.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
ctypes "github.com/berachain/beacon-kit/consensus-types/types"
3434
"github.com/berachain/beacon-kit/errors"
3535
"github.com/berachain/beacon-kit/log"
36+
"github.com/berachain/beacon-kit/primitives/common"
3637
"github.com/berachain/beacon-kit/primitives/math"
3738
"github.com/berachain/beacon-kit/primitives/net/jwt"
3839
)
@@ -82,13 +83,14 @@ func NewClient(
8283
}
8384
}
8485

85-
// GetPayloadBySlot fetches a payload from the sequencer for the given slot.
86+
// GetPayloadBySlot fetches a payload from the sequencer for the given slot and parent block root.
8687
func (c *Client) GetPayloadBySlot(
8788
ctx context.Context,
8889
slot math.Slot,
90+
parentBlockRoot common.Root,
8991
) (ctypes.BuiltExecutionPayloadEnv, error) {
9092
// Build request
91-
reqBody := GetPayloadRequest{Slot: slot}
93+
reqBody := GetPayloadRequest{Slot: slot, ParentBlockRoot: parentBlockRoot}
9294
reqJSON, err := json.Marshal(reqBody)
9395
if err != nil {
9496
return nil, fmt.Errorf("failed to marshal request: %w", err)
@@ -116,8 +118,7 @@ func (c *Client) GetPayloadBySlot(
116118

117119
resp, err := c.httpClient.Do(req)
118120
if err != nil {
119-
c.logger.Warn("Failed to contact sequencer", "error", err)
120-
return nil, errors.Wrapf(ErrSequencerUnavailable, "request failed: %w", err)
121+
return nil, errors.Wrapf(ErrSequencerUnavailable, "request failed: %v", err)
121122
}
122123
if resp == nil || resp.Body == nil {
123124
return nil, errors.New("received nil response from sequencer")
@@ -148,9 +149,16 @@ func (c *Client) GetPayloadBySlot(
148149
return nil, fmt.Errorf("failed to parse response: %w", err)
149150
}
150151

152+
var blockHash interface{}
153+
if payloadResp.ExecutionPayload != nil {
154+
blockHash = payloadResp.ExecutionPayload.GetBlockHash()
155+
} else {
156+
blockHash = "nil"
157+
}
158+
151159
c.logger.Info("Successfully fetched payload from sequencer",
152160
"slot", slot,
153-
"block_hash", payloadResp.ExecutionPayload.GetBlockHash(),
161+
"block_hash", blockHash,
154162
)
155163

156164
return payloadResp.ToExecutionPayloadEnvelope(), nil

beacon/preconf/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const (
2626
// DefaultAPIPort is the default port for the preconf API server.
2727
DefaultAPIPort = 9090
2828

29-
// DefaultFetchTimeout is the default timeout for fetching payloads from sequencer.
29+
// DefaultFetchTimeout is the default timeout for the HTTP client when fetching payloads from sequencer.
3030
DefaultFetchTimeout = 500 * time.Millisecond
3131
)
3232

@@ -80,10 +80,10 @@ func DefaultConfig() Config {
8080

8181
// IsSequencer returns true if this node is configured as the sequencer.
8282
func (c *Config) IsSequencer() bool {
83-
return c.Enabled && c.SequencerMode
83+
return c != nil && c.Enabled && c.SequencerMode
8484
}
8585

8686
// ShouldFetchFromSequencer returns true if this node should fetch payloads from sequencer.
8787
func (c *Config) ShouldFetchFromSequencer() bool {
88-
return c.Enabled && c.SequencerURL != ""
88+
return c != nil && c.Enabled && c.SequencerURL != ""
8989
}

beacon/preconf/server.go

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
ctypes "github.com/berachain/beacon-kit/consensus-types/types"
3333
"github.com/berachain/beacon-kit/errors"
3434
"github.com/berachain/beacon-kit/log"
35+
"github.com/berachain/beacon-kit/primitives/common"
3536
"github.com/berachain/beacon-kit/primitives/crypto"
3637
"github.com/berachain/beacon-kit/primitives/math"
3738
"github.com/berachain/beacon-kit/primitives/net/jwt"
@@ -40,7 +41,7 @@ import (
4041

4142
const (
4243
// jwtValidityWindow is the time window for JWT validity (iat claim).
43-
jwtValidityWindow = 60 * time.Second
44+
jwtValidityWindow = 5 * time.Minute
4445

4546
// serverShutdownTimeout is the timeout for graceful server shutdown.
4647
serverShutdownTimeout = 5 * time.Second
@@ -52,12 +53,10 @@ const (
5253
authHeaderParts = 2
5354
)
5455

55-
// PayloadProvider is an interface for retrieving payloads by slot.
56+
// PayloadProvider is an interface for retrieving payloads by slot and parent block root.
5657
type PayloadProvider interface {
57-
// GetPayloadBySlot returns the payload for the given slot if available.
58-
GetPayloadBySlot(ctx context.Context, slot math.Slot) (ctypes.BuiltExecutionPayloadEnv, error)
59-
// GetExpectedProposer returns the expected proposer for the given slot.
60-
GetExpectedProposer(slot math.Slot) (crypto.BLSPubkey, bool)
58+
// GetPayloadBySlot returns the payload for the given slot and parent block root if available.
59+
GetPayloadBySlot(ctx context.Context, slot math.Slot, parentBlockRoot common.Root) (ctypes.BuiltExecutionPayloadEnv, error)
6160
}
6261

6362
// Server is the preconf API server that serves GetPayload requests from validators.
@@ -110,7 +109,12 @@ func (s *Server) Start(_ context.Context) error {
110109
s.httpServer = server
111110
s.mu.Unlock()
112111

113-
s.logger.Info("Starting preconf API server", "address", addr)
112+
s.logger.Info("Starting preconf API server", "address", addr, "num_validator_jwts", len(s.validatorJWTs))
113+
114+
// Log the registered validator pubkeys for debugging
115+
for pubkey := range s.validatorJWTs {
116+
s.logger.Info("Registered validator JWT", "pubkey", pubkey.String())
117+
}
114118

115119
go func() {
116120
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
@@ -169,45 +173,33 @@ func (s *Server) handleGetPayload(w http.ResponseWriter, r *http.Request) {
169173
return
170174
}
171175

172-
// Validate that the requesting validator is the expected proposer for this slot
173-
expectedProposer, found := s.payloadProvider.GetExpectedProposer(req.Slot)
174-
if !found {
175-
s.logger.Warn("No expected proposer found for slot", "slot", req.Slot)
176-
s.writeError(w, http.StatusNotFound, "no payload building in progress for slot")
177-
return
178-
}
179-
if expectedProposer != pubkey {
180-
s.logger.Warn("Validator is not the expected proposer",
181-
"slot", req.Slot,
182-
"expected", expectedProposer.String()[:16]+"...",
183-
"actual", pubkey.String()[:16]+"...",
184-
)
185-
s.writeError(w, http.StatusForbidden, "validator is not the expected proposer for this slot")
186-
return
187-
}
176+
s.logger.Info("Preconf server received payload request",
177+
"slot", req.Slot,
178+
"validator_pubkey", pubkey.String(),
179+
)
188180

189181
// Get the payload from provider
190182
ctx := r.Context()
191-
envelope, err := s.payloadProvider.GetPayloadBySlot(ctx, req.Slot)
183+
startTime := time.Now()
184+
envelope, err := s.payloadProvider.GetPayloadBySlot(ctx, req.Slot, req.ParentBlockRoot)
185+
elapsed := time.Since(startTime)
192186
if err != nil {
193-
s.logger.Warn("Failed to get payload", "slot", req.Slot, "error", err)
187+
s.logger.Warn("Failed to get payload",
188+
"slot", req.Slot,
189+
"error", err,
190+
"elapsed", elapsed,
191+
)
194192
s.writeError(w, http.StatusNotFound, "payload not available: "+err.Error())
195193
return
196194
}
197195

198-
// Convert to response
199-
resp := NewGetPayloadResponseFromEnvelope(envelope)
200-
201-
s.logger.Info("Serving payload to validator",
202-
"slot", req.Slot,
203-
"validator", pubkey.String()[:16]+"...",
204-
)
205-
206196
// Write response
207197
w.Header().Set("Content-Type", "application/json")
208-
if err = json.NewEncoder(w).Encode(resp); err != nil {
198+
if err = json.NewEncoder(w).Encode(NewGetPayloadResponseFromEnvelope(envelope)); err != nil {
209199
s.logger.Error("Failed to encode response", "error", err)
210200
}
201+
202+
s.logger.Info("GetPayloadBySlot completed", "slot", req.Slot, "elapsed", elapsed)
211203
}
212204

213205
// validateJWT validates the JWT token from the Authorization header and returns

0 commit comments

Comments
 (0)