Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions op-node/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/flags"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/params"
"github.com/ethereum-optimism/optimism/op-node/version"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
Expand Down Expand Up @@ -94,6 +95,11 @@ func RollupNodeMain(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.
cfg.Rollup.LogDescription(log, chaincfg.L2ChainIDToNetworkDisplayName)
}

// CHANGE(thedevbirb): assess whether we're in chain replication mode at startup.
if _, ok := os.LookupEnv("BOP_REPLAY"); ok {
params.BopReplay = true
}

n, err := node.New(ctx.Context, cfg, log, VersionWithMeta, m)
if err != nil {
return nil, fmt.Errorf("unable to create the rollup node: %w", err)
Expand Down
25 changes: 19 additions & 6 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"io"
gosync "sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"

"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"

Expand All @@ -22,6 +23,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/node/safedb"
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/params"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
Expand Down Expand Up @@ -248,7 +250,7 @@ func (n *OpNode) initRegistry(ctx context.Context, cfg *Config) error {
}

// Initially fetch the current gateway + n gateways into the future
err = n.registrySource.FetchNextNGateways(ctx, 2, 3)
err = n.registrySource.FetchNextNGateways(ctx, 6, 3)
if err != nil {
return fmt.Errorf("failed to fetch initial gateways: %w", err)
}
Expand All @@ -259,7 +261,7 @@ func (n *OpNode) initRegistry(ctx context.Context, cfg *Config) error {
fetchCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

if err := n.registrySource.FetchNextNGateways(fetchCtx, 2, 3); err != nil {
if err := n.registrySource.FetchNextNGateways(fetchCtx, 6, 3); err != nil {
n.log.Warn("registry fetch error", "err", err)
}
time.Sleep(time.Second)
Expand Down Expand Up @@ -620,6 +622,10 @@ func (n *OpNode) onEvent(ev event.Event) bool {
}

func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
if params.BopReplay {
return
}
n.tracer.OnNewL1Head(ctx, sig)

if n.l2Driver == nil {
Expand All @@ -634,6 +640,10 @@ func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
}

func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
if params.BopReplay {
return
}
if n.l2Driver == nil {
return
}
Expand All @@ -646,6 +656,10 @@ func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
}

func (n *OpNode) OnNewL1Finalized(ctx context.Context, sig eth.L1BlockRef) {
// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
if params.BopReplay {
return
}
if n.l2Driver == nil {
return
}
Expand Down Expand Up @@ -688,7 +702,6 @@ func (n *OpNode) PublishNewFrag(ctx context.Context, from peer.ID, frag *eth.Sig
}

func (n *OpNode) PublishSealFrag(ctx context.Context, from peer.ID, seal *eth.SignedSeal) error {

n.tracer.OnPublishSealFrag(ctx, from, seal)

// publish to p2p, if we are running p2p at all
Expand All @@ -704,7 +717,6 @@ func (n *OpNode) PublishSealFrag(ctx context.Context, from peer.ID, seal *eth.Si
}

func (n *OpNode) PublishEnv(ctx context.Context, from peer.ID, env *eth.SignedEnv) error {

n.tracer.OnPublishEnv(ctx, from, env)

// publish to p2p, if we are running p2p at all
Expand Down Expand Up @@ -782,7 +794,8 @@ func (n *OpNode) OnEnv(ctx context.Context, from peer.ID, env *eth.SignedEnv) er
}

func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
if p2pNode := n.getP2PNodeIfEnabled(); p2pNode != nil && p2pNode.AltSyncEnabled() {
// CHANGE(thedevbirb): for chain replication, ignoring sending p2p syncing requests which may block the event loop.
if p2pNode := n.getP2PNodeIfEnabled(); p2pNode != nil && p2pNode.AltSyncEnabled() && !params.BopReplay {
if unixTimeStale(start.Time, 12*time.Hour) {
n.log.Debug(
"ignoring request to sync L2 range, timestamp is too old for p2p",
Expand Down
34 changes: 19 additions & 15 deletions op-node/p2p/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/crypto"
"sync"
"time"

"github.com/ethereum/go-ethereum/crypto"

"github.com/golang/snappy"
lru "github.com/hashicorp/golang-lru/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/params"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
opsigner "github.com/ethereum-optimism/optimism/op-service/signer"
Expand Down Expand Up @@ -49,8 +51,10 @@ const (
// Message domains, the msg id function uncompresses to keep data monomorphic,
// but invalid compressed data will need a unique different id.

var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
var (
MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
)

type GossipSetupConfigurables interface {
PeerScoringParams() *ScoringParams
Expand Down Expand Up @@ -404,7 +408,6 @@ func verifyGatewaySignature(log log.Logger, signatureBytes []byte, messageBytes
}

func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, blockVersion eth.BlockVersion) pubsub.ValidatorEx {

// Seen block hashes per block height
// uint64 -> *seenBlocks
blockHeightLRU, err := lru.New[uint64, *seenBlocks](1000)
Expand Down Expand Up @@ -472,10 +475,13 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRunti
// rounding down to seconds is fine here.
now := uint64(time.Now().Unix())

// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
if uint64(payload.Timestamp) < now-60 {
log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
return pubsub.ValidationReject
// CHANGE(thedevbirb): for chain replication, allow old blocks.
if !params.BopReplay {
// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
if uint64(payload.Timestamp) < now-60 {
log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
return pubsub.ValidationReject
}
}

// [REJECT] if the `payload.timestamp` is more than 5 seconds into the future
Expand Down Expand Up @@ -703,7 +709,7 @@ type publisher struct {
var _ GossipOut = (*publisher)(nil)

func combinePeers(allPeers ...[]peer.ID) []peer.ID {
var seen = make(map[peer.ID]bool)
seen := make(map[peer.ID]bool)
var res []peer.ID
for _, peers := range allPeers {
for _, p := range peers {
Expand Down Expand Up @@ -933,7 +939,6 @@ func newNewFragTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log
validator,
pubsub.WithValidatorTimeout(3*time.Second),
pubsub.WithValidatorConcurrency(4))

if err != nil {
return nil, fmt.Errorf("failed to register gossip topic: %w", err)
}
Expand Down Expand Up @@ -971,7 +976,6 @@ func sealFragFragTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, l
validator,
pubsub.WithValidatorTimeout(3*time.Second),
pubsub.WithValidatorConcurrency(4))

if err != nil {
return nil, fmt.Errorf("failed to register gossip topic: %w", err)
}
Expand Down Expand Up @@ -1009,7 +1013,6 @@ func newEnvTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log log
validator,
pubsub.WithValidatorTimeout(3*time.Second),
pubsub.WithValidatorConcurrency(4))

if err != nil {
return nil, fmt.Errorf("failed to register gossip topic: %w", err)
}
Expand Down Expand Up @@ -1047,7 +1050,6 @@ func newBlockTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log l
validator,
pubsub.WithValidatorTimeout(3*time.Second),
pubsub.WithValidatorConcurrency(4))

if err != nil {
return nil, fmt.Errorf("failed to register gossip topic: %w", err)
}
Expand Down Expand Up @@ -1080,8 +1082,10 @@ func newBlockTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log l
}, nil
}

type TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
type MessageHandler func(ctx context.Context, from peer.ID, msg any) error
type (
TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
MessageHandler func(ctx context.Context, from peer.ID, msg any) error
)

func NewFragHandler(onNewFrag func(ctx context.Context, from peer.ID, msg *eth.SignedNewFrag) error) MessageHandler {
return func(ctx context.Context, from peer.ID, msg any) error {
Expand Down
6 changes: 6 additions & 0 deletions op-node/params/globals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package params

// CHANGE(thedevbirb): A global variable set only at node startup that assess
// whether the node is running in chain replication mode, leading to some
// syncing and safety functionality to be disabled.
var BopReplay = false
6 changes: 6 additions & 0 deletions op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -174,6 +175,11 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
}
}()

// CHANGE(thedevbirb): for chain replication we must ignore deriving the chain from L1 data.
if _, ok := os.LookupEnv("BOP_REPLAY"); ok {
return nil, io.EOF
}

// if any stages need to be reset, do that first.
if dp.resetting < len(dp.stages) {
if !dp.engineIsReset {
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ func (s *SyncDeriver) SyncStep() {
return false
} else {
s.Emitter.Emit(rollup.CriticalErrorEvent{
Err: fmt.Errorf("unexpected error on SyncStep event Drain: %w", err)})
Err: fmt.Errorf("unexpected error on SyncStep event Drain: %w", err),
})
return false
}
}
Expand Down
6 changes: 4 additions & 2 deletions op-node/rollup/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ const (
ELSyncString string = "execution-layer"
)

var Modes = []Mode{CLSync, ELSync}
var ModeStrings = []string{CLSyncString, ELSyncString}
var (
Modes = []Mode{CLSync, ELSync}
ModeStrings = []string{CLSyncString, ELSyncString}
)

func StringToMode(s string) (Mode, error) {
switch strings.ToLower(s) {
Expand Down
2 changes: 1 addition & 1 deletion op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
conductorRPCEndpoint := ctx.String(flags.ConductorRpcFlag.Name)
cfg := &node.Config{
L1: l1Endpoint,
Registry: registryEndpoint,
Registry: registryEndpoint,
L2: l2Endpoint,
Rollup: *rollupConfig,
Driver: *driverConfig,
Expand Down