Skip to content

Commit b010a11

Browse files
committed
Add peer rate limiting and reputation tracking to blob reactor
1 parent 4536507 commit b010a11

File tree

7 files changed

+656
-33
lines changed

7 files changed

+656
-33
lines changed

config/template/template.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,16 @@ logging = "{{ .BeaconKit.NodeAPI.Logging }}"
113113
[beacon-kit.blob-reactor]
114114
# Request timeout for blob requests from peers
115115
request-timeout = "{{ .BeaconKit.BlobReactor.RequestTimeout }}"
116+
117+
# Rate limiting for incoming blob requests per peer (messages per second)
118+
max-messages-per-peer-per-second = {{ .BeaconKit.BlobReactor.MaxMessagesPerPeerPerSecond }}
119+
120+
# Maximum burst size allowed for per-peer rate limiting
121+
max-message-burst = {{ .BeaconKit.BlobReactor.MaxMessageBurst }}
122+
123+
# Global rate limiting for incoming blob requests from all peers (requests per second)
124+
max-global-requests-per-second = {{ .BeaconKit.BlobReactor.MaxGlobalRequestsPerSecond }}
125+
126+
# Maximum burst size allowed for global rate limiting across all peers
127+
max-global-request-burst = {{ .BeaconKit.BlobReactor.MaxGlobalRequestBurst }}
116128
`

da/blobreactor/config.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,60 @@ package blobreactor
2222

2323
import "time"
2424

25-
const defaultRequestTimeout = 5 * time.Second
25+
const (
26+
// defaultRequestTimeout is the maximum time to wait for a blob response from a peer
27+
defaultRequestTimeout = 5 * time.Second
28+
// defaultMaxMessagesPerPeerPerSecond limits the rate of incoming blob requests per peer
29+
defaultMaxMessagesPerPeerPerSecond = 10.0 / 60.0
30+
// defaultMaxMessageBurst is the maximum burst size allowed for per-peer rate limiting
31+
defaultMaxMessageBurst = 10
32+
// defaultMaxGlobalRequestsPerSecond limits the total rate of incoming blob requests from all peers
33+
defaultMaxGlobalRequestsPerSecond = 30.0 / 60.0
34+
// defaultMaxGlobalRequestBurst is the maximum burst size allowed for global rate limiting across all peers
35+
defaultMaxGlobalRequestBurst = 100
36+
)
2637

27-
// Config is the configuration for the blob reactor
2838
type Config struct {
29-
// RequestTimeout is the timeout for blob requests
30-
RequestTimeout time.Duration `mapstructure:"request-timeout"`
39+
RequestTimeout time.Duration `mapstructure:"request-timeout"`
40+
MaxMessagesPerPeerPerSecond float64 `mapstructure:"max-messages-per-peer-per-second"`
41+
MaxMessageBurst int `mapstructure:"max-message-burst"`
42+
MaxGlobalRequestsPerSecond float64 `mapstructure:"max-global-requests-per-second"`
43+
MaxGlobalRequestBurst int `mapstructure:"max-global-request-burst"`
44+
Reputation ReputationConfig `mapstructure:"reputation"`
3145
}
3246

33-
// DefaultConfig returns the default configuration
3447
func DefaultConfig() Config {
3548
return Config{
36-
RequestTimeout: defaultRequestTimeout,
49+
RequestTimeout: defaultRequestTimeout,
50+
MaxMessagesPerPeerPerSecond: defaultMaxMessagesPerPeerPerSecond,
51+
MaxMessageBurst: defaultMaxMessageBurst,
52+
MaxGlobalRequestsPerSecond: defaultMaxGlobalRequestsPerSecond,
53+
MaxGlobalRequestBurst: defaultMaxGlobalRequestBurst,
54+
Reputation: DefaultReputationConfig(),
3755
}
3856
}
57+
58+
// WithDefaults returns a new Config with zero values replaced by defaults
59+
func (c Config) WithDefaults() Config {
60+
defaults := DefaultConfig()
61+
62+
if c.RequestTimeout == 0 {
63+
c.RequestTimeout = defaults.RequestTimeout
64+
}
65+
if c.MaxMessagesPerPeerPerSecond == 0 {
66+
c.MaxMessagesPerPeerPerSecond = defaults.MaxMessagesPerPeerPerSecond
67+
}
68+
if c.MaxMessageBurst == 0 {
69+
c.MaxMessageBurst = defaults.MaxMessageBurst
70+
}
71+
if c.MaxGlobalRequestsPerSecond == 0 {
72+
c.MaxGlobalRequestsPerSecond = defaults.MaxGlobalRequestsPerSecond
73+
}
74+
if c.MaxGlobalRequestBurst == 0 {
75+
c.MaxGlobalRequestBurst = defaults.MaxGlobalRequestBurst
76+
}
77+
// Apply defaults to reputation config
78+
c.Reputation = c.Reputation.WithDefaults()
79+
80+
return c
81+
}

da/blobreactor/reactor.go

Lines changed: 132 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,30 @@ import (
3737
"github.com/berachain/beacon-kit/primitives/math"
3838
"github.com/cometbft/cometbft/libs/service"
3939
"github.com/cometbft/cometbft/p2p"
40+
"golang.org/x/time/rate"
4041
)
4142

4243
const (
4344
// BlobChannel is our custom channel ID for blob requests/responses
4445
BlobChannel = byte(0x70)
45-
4646
// ReactorName is the registered name for the blob reactor in CometBFT's switch
4747
ReactorName = "BLOBREACTOR"
48-
49-
defaultSleepDuration = 100 * time.Millisecond
50-
defaultPriority = 5
51-
defaultSendQueueCapacity = 100
52-
defaultRecvBufferCapacity = 1024 * 1024
48+
// defaultPriority sets the channel priority for blob messages in CometBFT's message scheduler
49+
defaultPriority = 5
50+
// defaultSendQueueCapacity is the maximum number of outgoing messages queued per peer
51+
defaultSendQueueCapacity = 10
52+
// defaultRecvBufferCapacity is the size of the receive buffer for incoming blob messages
53+
defaultRecvBufferCapacity = 10 * 1024 * 1024
54+
// defaultRecvMessageCapacity is the maximum size of a single blob message we'll accept
5355
defaultRecvMessageCapacity = 1024 * 1024
54-
56+
// defaultMaxRequestWorkers limits concurrent goroutines handling blob requests/responses
5557
defaultMaxRequestWorkers = 10
56-
58+
// maxBlobsPerBlock is the protocol limit for blob sidecars per block
5759
maxBlobsPerBlock = 6
60+
// rateLimiterCleanupInterval determines how often to check for stale peer rate limiters
61+
rateLimiterCleanupInterval = 30 * time.Minute
62+
// staleLimiterTimeout determines how long to keep inactive peer rate limiters before deleting them
63+
staleLimiterTimeout = 24 * time.Hour
5864
)
5965

6066
// blobRequestError wraps an error with a status for metrics tracking.
@@ -75,8 +81,23 @@ func newBlobRequestError(err error, status string) error {
7581
return &blobRequestError{err: err, status: status}
7682
}
7783

78-
// BlobReactor handles P2P blob distribution for BeaconKit.
79-
// It implements the CometBFT Reactor interface.
84+
type peerRateLimiter struct {
85+
limiter *rate.Limiter
86+
lastSeen time.Time
87+
}
88+
89+
// BlobReactor manages p2p blob distribution across the network. When a node is
90+
// missing blobs for a slot, it sends requests to connected peers and waits for
91+
// responses containing the serialized blob sidecars.
92+
//
93+
// The reactor implements defense mechanisms including per-peer and global rate
94+
// limiting of incoming requests, and reputation scoring that temporarily bans
95+
// peers exhibiting bad behavior (invalid messages, unsolicited responses, etc).
96+
//
97+
// All request/response handling is concurrent with worker pool limits to prevent
98+
// resource exhaustion.
99+
//
100+
// Integrates with CometBFT's P2P layer via the Reactor interface.
80101
type BlobReactor struct {
81102
service.BaseService
82103
sw *p2p.Switch
@@ -105,6 +126,16 @@ type BlobReactor struct {
105126

106127
// Shutdown flag to prevent new workers during stop
107128
stopped atomic.Bool // set to true when OnStop begins
129+
130+
// We maintain per-peer rate limiters
131+
peerLimiters map[p2p.ID]*peerRateLimiter
132+
peerLimitersMu sync.RWMutex
133+
134+
// A global rate limiter for all incoming requests
135+
globalLimiter *rate.Limiter
136+
137+
// Reputation manager for tracking peer behavior
138+
reputationMgr *ReputationManager
108139
}
109140

110141
// NewBlobReactor creates a new blob reactor with storage backend
@@ -117,6 +148,9 @@ func NewBlobReactor(blobStore BlobStore, logger log.Logger, cfg Config, sink Tel
117148
metrics: newBlobReactorMetrics(sink),
118149
responseChans: make(map[uint64]chan *BlobResponse),
119150
requestWorkers: make(chan struct{}, defaultMaxRequestWorkers),
151+
peerLimiters: make(map[p2p.ID]*peerRateLimiter),
152+
globalLimiter: rate.NewLimiter(rateOrInf(cfg.MaxGlobalRequestsPerSecond), cfg.MaxGlobalRequestBurst),
153+
reputationMgr: NewReputationManager(logger, cfg.Reputation.WithDefaults()),
120154
}
121155
br.BaseService = *service.NewBaseService(nil, ReactorName, br)
122156
return br
@@ -145,12 +179,16 @@ func (br *BlobReactor) GetChannels() []*p2p.ChannelDescriptor {
145179
// InitPeer is called by the switch before the peer is started. Use it to
146180
// initialize data for the peer (e.g. peer state).
147181
func (br *BlobReactor) InitPeer(peer p2p.Peer) p2p.Peer {
148-
br.AddPeer(peer)
149182
return peer
150183
}
151184

152185
// AddPeer is called by the switch after the peer is added and successfully started.
153186
func (br *BlobReactor) AddPeer(peer p2p.Peer) {
187+
if !br.reputationMgr.ShouldAcceptPeer(peer.ID()) {
188+
br.logger.Warn("Rejecting peer due to low reputation", "peer", peer.ID())
189+
return
190+
}
191+
154192
br.stateMu.Lock()
155193
br.peers[peer.ID()] = struct{}{}
156194
br.stateMu.Unlock()
@@ -167,6 +205,49 @@ func (br *BlobReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
167205
br.logger.Info("Removed peer", "peer", peer.ID(), "reason", reason)
168206
}
169207

208+
func (br *BlobReactor) checkPeerRateLimit(peerID p2p.ID) bool {
209+
br.peerLimitersMu.Lock()
210+
defer br.peerLimitersMu.Unlock()
211+
212+
limiter, exists := br.peerLimiters[peerID]
213+
if exists {
214+
limiter.lastSeen = time.Now()
215+
return limiter.limiter.Allow()
216+
}
217+
218+
limiter = &peerRateLimiter{
219+
limiter: rate.NewLimiter(rateOrInf(br.config.MaxMessagesPerPeerPerSecond), br.config.MaxMessageBurst),
220+
lastSeen: time.Now(),
221+
}
222+
br.peerLimiters[peerID] = limiter
223+
224+
return limiter.limiter.Allow()
225+
}
226+
227+
func (br *BlobReactor) cleanupStalePeerData() {
228+
ticker := time.NewTicker(rateLimiterCleanupInterval)
229+
defer ticker.Stop()
230+
231+
for {
232+
select {
233+
case <-ticker.C:
234+
// Cleanup stale rate limiters
235+
br.peerLimitersMu.Lock()
236+
for peerID, limiter := range br.peerLimiters {
237+
if time.Since(limiter.lastSeen) > staleLimiterTimeout {
238+
delete(br.peerLimiters, peerID)
239+
}
240+
}
241+
br.peerLimitersMu.Unlock()
242+
243+
// Cleanup stale reputations
244+
br.reputationMgr.CleanupStaleReputations()
245+
case <-br.Quit():
246+
return
247+
}
248+
}
249+
}
250+
170251
// spawnWorker attempts to spawn a worker goroutine to handle the given task.
171252
// Returns true if worker was spawned, false if pool is full or reactor is stopped.
172253
func (br *BlobReactor) spawnWorker(task func(), peerID p2p.ID, taskType string) {
@@ -200,6 +281,12 @@ func (br *BlobReactor) Receive(envelope p2p.Envelope) {
200281
return
201282
}
202283

284+
// Ignore messages from peers with low reputation
285+
if !br.reputationMgr.ShouldAcceptPeer(envelope.Src.ID()) {
286+
br.logger.Debug("Ignoring message from banned peer", "peer", envelope.Src.ID())
287+
return
288+
}
289+
203290
br.logger.Info("Received message on BlobChannel",
204291
"peer", envelope.Src.ID(),
205292
"channel", envelope.ChannelID,
@@ -225,9 +312,19 @@ func (br *BlobReactor) Receive(envelope p2p.Envelope) {
225312

226313
switch msgType {
227314
case MessageTypeRequest:
315+
if !br.checkPeerRateLimit(envelope.Src.ID()) {
316+
br.logger.Warn("Peer exceeded rate limit, dropping request", "peer", envelope.Src.ID())
317+
return
318+
}
319+
if !br.globalLimiter.Allow() {
320+
br.logger.Warn("Global rate limit exceeded, dropping request", "peer", envelope.Src.ID())
321+
return
322+
}
323+
228324
var req BlobRequest
229325
if err := req.UnmarshalSSZ(msgData); err != nil {
230326
br.logger.Error("Failed to unmarshal BlobRequest", "error", err, "peer", envelope.Src.ID())
327+
br.reputationMgr.RecordBadBehavior(envelope.Src.ID(), fmt.Errorf("invalid_ssz: %w", err))
231328
return
232329
}
233330
br.logger.Info("Received blob request", "slot", req.Slot.Unwrap(), "request_id", req.RequestID, "peer", envelope.Src.ID())
@@ -241,6 +338,7 @@ func (br *BlobReactor) Receive(envelope p2p.Envelope) {
241338
var resp BlobResponse
242339
if err := resp.UnmarshalSSZ(msgData); err != nil {
243340
br.logger.Error("Failed to unmarshal BlobResponse", "error", err, "peer", envelope.Src.ID(), "data_size", len(msgData))
341+
br.reputationMgr.RecordBadBehavior(envelope.Src.ID(), fmt.Errorf("invalid_ssz: %w", err))
244342
return
245343
}
246344
br.logger.Info("Received blob response",
@@ -256,6 +354,7 @@ func (br *BlobReactor) Receive(envelope p2p.Envelope) {
256354

257355
default:
258356
br.logger.Warn("Received unknown message type", "type", msgType, "peer", envelope.Src.ID())
357+
br.reputationMgr.RecordBadBehavior(envelope.Src.ID(), fmt.Errorf("unknown_message_type: %d", msgType))
259358
}
260359
}
261360

@@ -340,6 +439,7 @@ func (br *BlobReactor) handleBlobResponse(peer p2p.Peer, resp *BlobResponse) {
340439
br.logger.Info("No waiting channel for response (request may have timed out)",
341440
"request_id", resp.RequestID,
342441
"slot", resp.Slot.Unwrap())
442+
br.reputationMgr.RecordBadBehavior(peer.ID(), fmt.Errorf("unsolicited_response: request_id=%d", resp.RequestID))
343443
return
344444
}
345445

@@ -434,6 +534,10 @@ func (br *BlobReactor) RequestBlobs(
434534
br.metrics.recordPeerAttempt(statusSuccess)
435535
br.metrics.recordOverallRequestComplete(statusSuccess, start)
436536
br.logger.Info("Successfully retrieved and verified blobs", "slot", slot.Unwrap(), "peer", peerID, "count", len(sidecars))
537+
538+
// Reward peer for successful blob exchange
539+
br.reputationMgr.RecordGoodBehavior(peerID)
540+
437541
return sidecars, nil
438542
}
439543

@@ -524,34 +628,34 @@ func (br *BlobReactor) requestBlobsFromPeer(ctx context.Context, peerID p2p.ID,
524628

525629
if resp.Slot != slot {
526630
err = fmt.Errorf("peer %s returned wrong slot: expected %d, got %d", peerID, slot.Unwrap(), resp.Slot.Unwrap())
631+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
527632
return nil, newBlobRequestError(err, statusInvalidResponse)
528633
}
529634

530635
if resp.HeadSlot < resp.Slot {
531636
err = fmt.Errorf("peer %s head (%d) not at requested slot (%d)", peerID, resp.HeadSlot.Unwrap(), resp.Slot.Unwrap())
637+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
532638
return nil, newBlobRequestError(err, statusInvalidResponse)
533639
}
534640

535641
if len(resp.SidecarData) > defaultRecvMessageCapacity {
536-
err = fmt.Errorf(
537-
"peer %s sent oversized response: %d bytes (max %d)",
538-
peerID,
539-
len(resp.SidecarData),
540-
defaultRecvMessageCapacity,
541-
)
642+
err = fmt.Errorf("peer %s sent oversized response: %d > %d", peerID, len(resp.SidecarData), defaultRecvMessageCapacity)
643+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
542644
return nil, newBlobRequestError(err, statusInvalidResponse)
543645
}
544646

545647
var sidecars datypes.BlobSidecars
546648
if len(resp.SidecarData) > 0 {
547649
if err = ssz.Unmarshal(resp.SidecarData, &sidecars); err != nil {
548-
err = fmt.Errorf("failed to unmarshal sidecars from peer %s: %w", peerID, err)
650+
err = fmt.Errorf("peer %s failed to unmarshal sidecars: %w", peerID, err)
651+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
549652
return nil, newBlobRequestError(err, statusInvalidResponse)
550653
}
551654
}
552655

553656
if len(sidecars) > maxBlobsPerBlock {
554-
err = fmt.Errorf("peer %s sent too many blobs: %d (max %d)", peerID, len(sidecars), maxBlobsPerBlock)
657+
err = fmt.Errorf("peer %s sent too many blobs: %d > %d", peerID, len(sidecars), maxBlobsPerBlock)
658+
br.reputationMgr.RecordBadBehavior(peer.ID(), err)
555659
return nil, newBlobRequestError(err, statusInvalidResponse)
556660
}
557661

@@ -568,6 +672,7 @@ func (br *BlobReactor) requestBlobsFromPeer(ctx context.Context, peerID p2p.ID,
568672

569673
func (br *BlobReactor) OnStart() error {
570674
br.logger.Info("Starting BlobReactor", "node_key", br.nodeKey)
675+
go br.cleanupStalePeerData()
571676
return nil
572677
}
573678

@@ -584,6 +689,14 @@ func (br *BlobReactor) OnStop() {
584689
br.logger.Info("BlobReactor stopped, all workers completed")
585690
}
586691

692+
// rateOrInf returns rate.Inf if r <= 0, otherwise returns rate.Limit(r)
693+
func rateOrInf(r float64) rate.Limit {
694+
if r <= 0 {
695+
return rate.Inf
696+
}
697+
return rate.Limit(r)
698+
}
699+
587700
// encodeBlobSidecarsSSZ takes multiple SSZ-encoded BlobSidecar bytes and combines them
588701
// into a single SSZ-encoded BlobSidecars (slice) format.
589702
// The encoding is: 4-byte offset (always 4) + concatenated sidecars.

0 commit comments

Comments
 (0)