Skip to content

Commit 1ca3962

Browse files
committed
Use prometheus client directly for metrics + remove go-metrics dependency
Remove use of cosmossdk.io/telemetry package which relies on go-metrics and replace it with direct use of prometheus/client_golang. This aligns with how CometBFT handles metrics and brings several benefits: eliminates race conditions from go-metrics, avoids questionable maintenance of upstream repo, removes problematic metric retention, and significantly simplifies the metrics codebase. All beacon-kit services now use prometheus client directly through a unified metrics.Factory interface. Additionally, replace cosmossdk.io/store/metrics imports with a custom storage.NoOpStoreMetrics implementation to avoid pulling in the telemetry wrapper. While go-metrics remains as a transitive dependency through cosmossdk.io/store/types, it is no longer used at runtime.
1 parent e0deabc commit 1ca3962

File tree

90 files changed

+2362
-1133
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+2362
-1133
lines changed

.golangci.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ linters:
288288
- nosprintfhostport # checks for misuse of Sprintf to construct a host with port in a URL
289289
- perfsprint # checks that fmt.Sprintf can be replaced with a faster alternative
290290
- predeclared # finds code that shadows one of Go's predeclared identifiers
291-
- promlinter # checks Prometheus metrics naming via promlint
291+
#- promlinter # checks Prometheus metrics naming via promlint
292292
- protogetter # reports direct reads from proto message fields when getters should be used
293293
- reassign # checks that package variables are not reassigned
294294
- revive # fast, configurable, extensible, flexible, and beautiful linter for Go, drop-in replacement of golint
@@ -373,4 +373,4 @@ issues:
373373
- thelper
374374
exclude-files:
375375
- "pkg/cometbft/cli/.*\\.go"
376-
- "pkg/cometbft/service/server/.*\\.go"
376+
- "pkg/cometbft/service/server/.*\\.go"

beacon/blockchain/blob_fetcher.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type blobFetcher struct {
6060
queue *blobQueue // Queue for persistent requests
6161
executor *blobFetchExecutor // Executor for fetch logic
6262
config BlobFetcherConfig // Configuration
63-
metrics *blobFetcherMetrics
63+
metrics *BlobFetcherMetrics
6464

6565
// We need to track current head slot so we know when blob download requests need to be pruned as they are outside the WithinDAPeriod
6666
headSlotMu sync.RWMutex
@@ -80,10 +80,8 @@ func NewBlobFetcher(
8080
storageBackend StorageBackend,
8181
chainSpec BlobFetcherChainSpec,
8282
config BlobFetcherConfig,
83-
telemetrySink TelemetrySink,
83+
metrics *BlobFetcherMetrics,
8484
) (BlobFetcher, error) {
85-
metrics := newBlobFetcherMetrics(telemetrySink)
86-
8785
queue, err := newBlobQueue(filepath.Join(dataDir, "blobs", "download_queue"), logger, metrics)
8886
if err != nil {
8987
return nil, err

beacon/blockchain/blob_fetcher_metrics.go

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,94 @@
2020

2121
package blockchain
2222

23+
import (
24+
"github.com/berachain/beacon-kit/observability/metrics"
25+
)
26+
2327
// Metric reason constants for blob fetcher.
2428
const (
2529
expiredReasonOutsideDA = "outside_da_period"
2630
expiredReasonMaxRetries = "max_retries"
2731
)
2832

29-
// blobFetcherMetrics contains metrics for the blob fetcher queue and retry operations.
30-
type blobFetcherMetrics struct {
31-
sink TelemetrySink
33+
// BlobFetcherMetrics contains metrics for the blob fetcher queue and retry operations.
34+
type BlobFetcherMetrics struct {
35+
RetriesTotal metrics.Counter
36+
RequestsExpiredTotal metrics.Counter
37+
RequestsCompletedTotal metrics.Counter
38+
RequestsQueuedTotal metrics.Counter
39+
QueueDepth metrics.Gauge
3240
}
3341

34-
// newBlobFetcherMetrics creates a new blobFetcherMetrics instance.
35-
func newBlobFetcherMetrics(sink TelemetrySink) *blobFetcherMetrics {
36-
return &blobFetcherMetrics{sink: sink}
42+
// NewBlobFetcherMetrics returns a new BlobFetcherMetrics instance with metrics from the provided factory.
43+
// Metric names are kept identical to cosmos-sdk/telemetry output for Grafana compatibility.
44+
func NewBlobFetcherMetrics(factory metrics.Factory) *BlobFetcherMetrics {
45+
return &BlobFetcherMetrics{
46+
RetriesTotal: factory.NewCounter(
47+
metrics.CounterOpts{
48+
Subsystem: "blob_fetcher",
49+
Name: "retries_total",
50+
Help: "Number of times a blob request was retried after failure",
51+
},
52+
nil,
53+
),
54+
RequestsExpiredTotal: factory.NewCounter(
55+
metrics.CounterOpts{
56+
Subsystem: "blob_fetcher",
57+
Name: "requests_expired_total",
58+
Help: "Number of blob fetch requests that expired before completion",
59+
},
60+
[]string{"reason"},
61+
),
62+
RequestsCompletedTotal: factory.NewCounter(
63+
metrics.CounterOpts{
64+
Subsystem: "blob_fetcher",
65+
Name: "requests_completed_total",
66+
Help: "Number of blob fetch requests that completed successfully",
67+
},
68+
nil,
69+
),
70+
RequestsQueuedTotal: factory.NewCounter(
71+
metrics.CounterOpts{
72+
Subsystem: "blob_fetcher",
73+
Name: "requests_queued_total",
74+
Help: "Number of new blob fetch requests added to the queue",
75+
},
76+
nil,
77+
),
78+
QueueDepth: factory.NewGauge(
79+
metrics.GaugeOpts{
80+
Subsystem: "blob_fetcher",
81+
Name: "queue_depth",
82+
Help: "Current depth of the blob fetcher queue",
83+
},
84+
nil,
85+
),
86+
}
3787
}
3888

3989
// recordRetry increments counter when a blob request is retried after failure.
40-
func (m *blobFetcherMetrics) recordRetry() {
41-
m.sink.IncrementCounter("beacon_kit.blob_fetcher.retries_total")
90+
func (m *BlobFetcherMetrics) recordRetry() {
91+
m.RetriesTotal.Add(1)
4292
}
4393

4494
// recordRequestExpired increments counter when request expires before completion.
4595
// Reason: "outside_da_period", "max_retries"
46-
func (m *blobFetcherMetrics) recordRequestExpired(reason string) {
47-
m.sink.IncrementCounter("beacon_kit.blob_fetcher.requests_expired_total", "reason", reason)
96+
func (m *BlobFetcherMetrics) recordRequestExpired(reason string) {
97+
m.RequestsExpiredTotal.With("reason", reason).Add(1)
4898
}
4999

50100
// recordRequestComplete increments counter when request completes successfully.
51-
func (m *blobFetcherMetrics) recordRequestComplete() {
52-
m.sink.IncrementCounter("beacon_kit.blob_fetcher.requests_completed_total")
101+
func (m *BlobFetcherMetrics) recordRequestComplete() {
102+
m.RequestsCompletedTotal.Add(1)
53103
}
54104

55105
// recordRequestQueued increments counter when a new request is added to queue.
56-
func (m *blobFetcherMetrics) recordRequestQueued() {
57-
m.sink.IncrementCounter("beacon_kit.blob_fetcher.requests_queued_total")
106+
func (m *BlobFetcherMetrics) recordRequestQueued() {
107+
m.RequestsQueuedTotal.Add(1)
58108
}
59109

60110
// setQueueDepth sets the current depth of the blob fetcher queue.
61-
func (m *blobFetcherMetrics) setQueueDepth(depth int) {
62-
m.sink.SetGauge("beacon_kit.blob_fetcher.queue_depth", int64(depth))
111+
func (m *BlobFetcherMetrics) setQueueDepth(depth int) {
112+
m.QueueDepth.Set(float64(depth))
63113
}

beacon/blockchain/blob_queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ type BlobFetchRequest struct {
5252
type blobQueue struct {
5353
queueDir string
5454
logger log.Logger
55-
metrics *blobFetcherMetrics
55+
metrics *BlobFetcherMetrics
5656
}
5757

5858
// newBlobQueue creates a new blob queue with the given directory.
5959
// It creates the directory if it doesn't exist and cleans up orphaned temp files.
60-
func newBlobQueue(queueDir string, logger log.Logger, metrics *blobFetcherMetrics) (*blobQueue, error) {
60+
func newBlobQueue(queueDir string, logger log.Logger, metrics *BlobFetcherMetrics) (*blobQueue, error) {
6161
// Create queue directory
6262
if err := os.MkdirAll(queueDir, 0750); err != nil {
6363
return nil, fmt.Errorf("failed to create blob download queue directory: %w", err)

beacon/blockchain/blob_queue_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
"cosmossdk.io/log"
3232
ctypes "github.com/berachain/beacon-kit/consensus-types/types"
33-
"github.com/berachain/beacon-kit/node-core/components/metrics"
33+
"github.com/berachain/beacon-kit/observability/metrics/discard"
3434
"github.com/berachain/beacon-kit/primitives/common"
3535
"github.com/berachain/beacon-kit/primitives/eip4844"
3636
"github.com/berachain/beacon-kit/primitives/math"
@@ -47,7 +47,7 @@ func createTestBlobRequest(slot math.Slot, blobCount int) BlobFetchRequest {
4747
func TestBlobQueue_SuccessfulWrite(t *testing.T) {
4848
t.Parallel()
4949
tmpDir := t.TempDir()
50-
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
50+
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
5151
require.NoError(t, err)
5252

5353
slot := math.Slot(100)
@@ -71,7 +71,7 @@ func TestBlobQueue_SuccessfulWrite(t *testing.T) {
7171
func TestBlobQueue_RetryLogic(t *testing.T) {
7272
t.Parallel()
7373
tmpDir := t.TempDir()
74-
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
74+
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
7575
require.NoError(t, err)
7676

7777
withinDA := func(_, _ math.Slot) bool { return true }
@@ -103,7 +103,7 @@ func TestBlobQueue_RetryLogic(t *testing.T) {
103103
func TestBlobQueue_AvailabilityWindow(t *testing.T) {
104104
t.Parallel()
105105
tmpDir := t.TempDir()
106-
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
106+
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
107107
require.NoError(t, err)
108108

109109
// Add old request
@@ -131,7 +131,7 @@ func TestBlobQueue_AvailabilityWindow(t *testing.T) {
131131
func TestBlobQueue_UpdateRetry(t *testing.T) {
132132
t.Parallel()
133133
tmpDir := t.TempDir()
134-
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
134+
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
135135
require.NoError(t, err)
136136

137137
request := createTestBlobRequest(math.Slot(100), 2)
@@ -157,7 +157,7 @@ func TestBlobQueue_UpdateRetry(t *testing.T) {
157157
func TestBlobQueue_ProcessingOrder(t *testing.T) {
158158
t.Parallel()
159159
tmpDir := t.TempDir()
160-
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
160+
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
161161
require.NoError(t, err)
162162

163163
withinDA := func(_, _ math.Slot) bool { return true }
@@ -188,7 +188,7 @@ func TestBlobQueue_ProcessingOrder(t *testing.T) {
188188
func TestBlobQueue_MaxRetryLimit(t *testing.T) {
189189
t.Parallel()
190190
tmpDir := t.TempDir()
191-
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
191+
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
192192
require.NoError(t, err)
193193

194194
withinDA := func(_, _ math.Slot) bool { return true }
@@ -217,7 +217,7 @@ func TestBlobQueue_MaxRetryLimit(t *testing.T) {
217217
func TestBlobQueue_UnderRetryLimit(t *testing.T) {
218218
t.Parallel()
219219
tmpDir := t.TempDir()
220-
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
220+
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
221221
require.NoError(t, err)
222222

223223
withinDA := func(_, _ math.Slot) bool { return true }
@@ -245,7 +245,7 @@ func TestBlobQueue_UnderRetryLimit(t *testing.T) {
245245
func TestBlobQueue_CorruptedFileHandling(t *testing.T) {
246246
t.Parallel()
247247
tmpDir := t.TempDir()
248-
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), newBlobFetcherMetrics(metrics.NewNoOpTelemetrySink()))
248+
queue, err := newBlobQueue(tmpDir, log.NewNopLogger(), NewBlobFetcherMetrics(discard.NewFactory()))
249249
require.NoError(t, err)
250250

251251
withinDA := func(_, _ math.Slot) bool { return true }

beacon/blockchain/deposit.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,7 @@ func (s *Service) fetchAndStoreDeposits(
5959
deposits, err := s.depositContract.ReadDeposits(ctx, blockNum, blockNum)
6060
if err != nil {
6161
s.logger.Error("Failed to read deposits", "error", err)
62-
s.metrics.sink.IncrementCounter(
63-
"beacon_kit.execution.deposit.failed_to_get_block_logs",
64-
"block_num",
65-
blockNumStr,
66-
)
62+
s.metrics.FailedToGetBlockLogs.With("block_num", blockNumStr).Add(1)
6763
s.failedBlocksMu.Lock()
6864
s.failedBlocks[blockNum] = struct{}{}
6965
s.failedBlocksMu.Unlock()
@@ -79,11 +75,7 @@ func (s *Service) fetchAndStoreDeposits(
7975

8076
if err = s.storageBackend.DepositStore().EnqueueDeposits(ctx, deposits); err != nil {
8177
s.logger.Error("Failed to store deposits", "error", err)
82-
s.metrics.sink.IncrementCounter(
83-
"beacon_kit.execution.deposit.failed_to_enqueue_deposits",
84-
"block_num",
85-
blockNumStr,
86-
)
78+
s.metrics.FailedToEnqueueDeposits.With("block_num", blockNumStr).Add(1)
8779
s.failedBlocksMu.Lock()
8880
s.failedBlocks[blockNum] = struct{}{}
8981
s.failedBlocksMu.Unlock()

beacon/blockchain/interfaces.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ package blockchain
2222

2323
import (
2424
"context"
25-
"time"
2625

2726
"github.com/berachain/beacon-kit/chain"
2827
ctypes "github.com/berachain/beacon-kit/consensus-types/types"
@@ -129,20 +128,6 @@ type StorageBackend interface {
129128
BlockStore() *block.KVStore[*ctypes.BeaconBlock]
130129
}
131130

132-
// TelemetrySink is an interface for sending metrics to a telemetry backend.
133-
type TelemetrySink interface {
134-
// IncrementCounter increments the counter identified by
135-
// the provided key.
136-
IncrementCounter(key string, args ...string)
137-
138-
// SetGauge sets a gauge metric to the specified value.
139-
SetGauge(key string, value int64, args ...string)
140-
141-
// MeasureSince measures the time since the provided start time,
142-
// identified by the provided keys.
143-
MeasureSince(key string, start time.Time, args ...string)
144-
}
145-
146131
//nolint:revive // its ok
147132
type BlockchainI interface {
148133
ProcessGenesisData(

0 commit comments

Comments
 (0)