1 change: 0 additions & 1 deletion api/middlewares/metrics/metrics.go
@@ -138,4 +138,3 @@ func validPath(url string) *string {

return nil
}

21 changes: 2 additions & 19 deletions api/storage/cache.go
@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

@@ -196,25 +195,9 @@ func loadTopnRewardsOverall(topnMiners map[time.Duration][]store.TopnMiner) erro
func cacheSyncHeights() error {
cache.mu.Lock()
defer cache.mu.Unlock()

value, ok, err := db.ConfigStore.Get(store.SyncHeightNode)
if err != nil {
return errors.WithMessage(err, "Failed to get node sync height")
}
if !ok {
return errors.New("No matching record found(node sync height)")
}
nodeSyncHeight, err := strconv.ParseUint(value, 10, 64)
nodeSyncHeight, scanSyncHeight, err := db.GetSyncHeights()
if err != nil {
return errors.WithMessage(err, "Failed to parse node sync height")
}

scanSyncHeight, ok, err := db.BlockStore.MaxBlock()
if err != nil {
return errors.WithMessage(err, "Failed to get scan sync height")
}
if !ok {
return errors.New("No matching record found(scan sync height)")
return errors.WithMessage(err, "Failed to load sync heights")
}

cache.syncHeights = LogSyncInfo{
2 changes: 1 addition & 1 deletion cmd/sync.go
@@ -32,7 +32,7 @@ func startSyncService(*cobra.Command, []string) {
viperUtil.MustUnmarshalKey("sync", &conf)

cs := nhSync.MustNewCatchupSyncer(dataCtx.Eth, dataCtx.DB, conf, dataCtx.EthCfg.AlertChannel, dataCtx.EthCfg.HealthReport)
ss := nhSync.MustNewStorageSyncer(dataCtx.DB, dataCtx.StorageConfig, dataCtx.StorageConfig.AlertChannel, dataCtx.StorageConfig.HealthReport)
ss := nhSync.MustNewStorageSyncer(dataCtx.DB, dataCtx.StorageConfig, dataCtx.StorageConfig.AlertChannel, dataCtx.StorageConfig.HealthReport, dataCtx.Eth)
ps := nhSync.MustNewPatchSyncer(dataCtx.Eth, dataCtx.DB, conf)
syncer := nhSync.MustNewSyncer(dataCtx.Eth, dataCtx.DB, conf, cs, ss, ps)

1 change: 0 additions & 1 deletion rpc/blockchain.go
@@ -330,7 +330,6 @@ func BatchGetBlocks(ctx context.Context, w3c *web3go.Client, blkNums []types.Blo
}
}


return blockNum2Block, nil
}

31 changes: 16 additions & 15 deletions rpc/storage.go
@@ -29,6 +29,22 @@ var (
BatchGetSubmitsByGoroutines = 10
)

type FileInfo struct {
FileInfoParam
UploadedSegNum uint64
}

type StorageConfig struct {
Indexer string
Retry int
RetryInterval time.Duration `default:"1s"`
RequestTimeout time.Duration `default:"3s"`
MaxConnsPerHost int `default:"1024"`
AlertChannel string
HealthReport health.TimedCounterConfig
SyncGapAlertThreshold uint64 `default:"1000"`
}

type FileInfoParam struct {
SubmissionIndex uint64
Status uint8
@@ -63,21 +79,6 @@ func (executor *FileInfoExecutor) ParallelCollect(ctx context.Context, result *p
return nil
}

type FileInfo struct {
FileInfoParam
UploadedSegNum uint64
}

type StorageConfig struct {
Indexer string
Retry int
RetryInterval time.Duration `default:"1s"`
RequestTimeout time.Duration `default:"3s"`
MaxConnsPerHost int `default:"1024"`
AlertChannel string
HealthReport health.TimedCounterConfig
}

// getFileInfo implements the rpcFunc interface
func (executor *FileInfoExecutor) getFileInfo(ctx context.Context, storageConfig StorageConfig,
rpcParam FileInfoParam, task int) (*FileInfo, error) {
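The relocated StorageConfig above now carries a SyncGapAlertThreshold (default 1000) that drives the new gap check in sync/storage.go. Below is a minimal sketch of loading it the same way cmd/sync.go loads its sync config via viperUtil.MustUnmarshalKey; the "storage" config key and the go-conflux-util/viper import path are assumptions, not confirmed by this diff, and viper is assumed to have been initialized by the application's config bootstrap.

```go
package main

import (
	"fmt"

	viperUtil "github.com/Conflux-Chain/go-conflux-util/viper"

	"github.com/0glabs/0g-storage-scan/rpc"
)

func main() {
	// Hypothetical: unmarshal the storage section of the config file into the
	// relocated rpc.StorageConfig; the "storage" key name is an assumption.
	var storageConfig rpc.StorageConfig
	viperUtil.MustUnmarshalKey("storage", &storageConfig)

	// With no explicit override, the `default:"1000"` tag should leave the
	// alert threshold at 1000 blocks.
	fmt.Println("sync gap alert threshold:", storageConfig.SyncGapAlertThreshold)
}
```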
27 changes: 27 additions & 0 deletions store/store.go
@@ -3,6 +3,7 @@ package store
import (
"context"
"fmt"
"strconv"
"strings"
"time"

@@ -287,6 +288,32 @@ func (ms *MysqlStore) Close() error {
return ms.Store.Close()
}

// GetSyncHeights returns (nodeSyncHeight, scanSyncHeight).
// It queries ConfigStore and BlockStore for the stored node sync height and max scanned block.
func (ms *MysqlStore) GetSyncHeights() (uint64, uint64, error) {
value, ok, err := ms.ConfigStore.Get(SyncHeightNode)
if err != nil {
return 0, 0, errors.WithMessage(err, "Failed to get node sync height")
}
if !ok {
return 0, 0, errors.New("No matching record found(node sync height)")
}
nodeSyncHeight, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return 0, 0, errors.WithMessage(err, "Failed to parse node sync height")
}

scanSyncHeight, ok, err := ms.BlockStore.MaxBlock()
if err != nil {
return 0, 0, errors.WithMessage(err, "Failed to get scan sync height")
}
if !ok {
return 0, 0, errors.New("No matching record found(scan sync height)")
}

return nodeSyncHeight, scanSyncHeight, nil
}

func (ms *MysqlStore) UpdateSubmitByPrimaryKey(s *Submit, as *AddressSubmit) error {
return ms.Store.DB.Transaction(func(dbTx *gorm.DB) error {
if err := ms.SubmitStore.UpdateByPrimaryKey(dbTx, s); err != nil {
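The new GetSyncHeights consolidates the node-height config lookup and the max scanned block query that were previously inlined in cacheSyncHeights. A minimal caller sketch follows, mirroring the simplified cache.go hunk above; the loadSyncHeights wrapper and its logging are illustrative only, and the errors helper is assumed to be the github.com/pkg/errors package that provides WithMessage.

```go
package example

import (
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"

	"github.com/0glabs/0g-storage-scan/store"
)

// loadSyncHeights is a hypothetical caller that mirrors the simplified
// cacheSyncHeights above; db is assumed to be an already-initialized MysqlStore.
func loadSyncHeights(db *store.MysqlStore) error {
	nodeSyncHeight, scanSyncHeight, err := db.GetSyncHeights()
	if err != nil {
		return errors.WithMessage(err, "Failed to load sync heights")
	}

	logrus.WithFields(logrus.Fields{
		"nodeSyncHeight": nodeSyncHeight,
		"scanSyncHeight": scanSyncHeight,
	}).Debug("Loaded sync heights")

	return nil
}
```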
4 changes: 2 additions & 2 deletions store/store_reward.go
@@ -399,7 +399,7 @@ type TopnMiner struct {
func (t *RewardTopnStatStore) Topn(duration time.Duration, limit int) ([]TopnMiner, error) {
miners := new([]TopnMiner)

sqlTopn := fmt.Sprintf(`
sqlTopn := `
SELECT
a.id, a.address, s.amount, s.win_count
FROM
@@ -415,7 +415,7 @@ func (t *RewardTopnStatStore) Topn(duration time.Duration, limit int) ([]TopnMin
LIMIT ?
) s
LEFT JOIN addresses a ON s.address_id = a.id
`)
`

if err := t.DB.Raw(sqlTopn, time.Now().Add(-duration), limit).Scan(miners).Error; err != nil {
return nil, err
58 changes: 56 additions & 2 deletions sync/storage.go
@@ -2,12 +2,16 @@ package sync

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/0glabs/0g-storage-scan/rpc"
"github.com/0glabs/0g-storage-scan/store"
"github.com/Conflux-Chain/go-conflux-util/health"
"github.com/openweb3/web3go"
"github.com/sirupsen/logrus"
)

@@ -23,16 +27,18 @@ type StorageSyncer struct {
alertChannel string
healthReport health.TimedCounterConfig
storageRpcHealth health.TimedCounter
blockchainClient *web3go.Client
}

func MustNewStorageSyncer(db *store.MysqlStore, storageConfig rpc.StorageConfig, alertChannel string,
healthReport health.TimedCounterConfig) *StorageSyncer {
healthReport health.TimedCounterConfig, blockchainClient *web3go.Client) *StorageSyncer {
return &StorageSyncer{
db: db,
storageConfig: storageConfig,
alertChannel: alertChannel,
healthReport: healthReport,
storageRpcHealth: health.TimedCounter{},
blockchainClient: blockchainClient,
}
}

@@ -83,12 +89,17 @@ func (ss *StorageSyncer) LatestFiles(ctx context.Context, ticker *time.Ticker) {
}

func (ss *StorageSyncer) NodeSyncHeight(ctx context.Context, ticker *time.Ticker) {
var err error
nodeStatus, err := rpc.GetNodeStatus(ss.storageConfig)

if err == nil {
height := nodeStatus.LogSyncHeight
err := ss.db.ConfigStore.Upsert(nil, store.SyncHeightNode, strconv.FormatUint(height, 10))
err = ss.db.ConfigStore.Upsert(nil, store.SyncHeightNode, strconv.FormatUint(height, 10))
if err != nil {
logrus.WithError(err).Error("Failed to upsert storage node sync height")
} else {
// Check sync height gaps; any gap error is surfaced to the alerting logic below
err = ss.checkSyncHeightGaps(height)
}
}

@@ -104,3 +115,46 @@ func (ss *StorageSyncer) NodeSyncHeight(ctx context.Context, ticker *time.Ticker
}
}
}

// checkSyncHeightGaps monitors sync height differences and returns an error if any gap exceeds the configured SyncGapAlertThreshold (1000 blocks by default)
func (ss *StorageSyncer) checkSyncHeightGaps(nodeSyncHeight uint64) error {
if ss.blockchainClient == nil {
return nil
}

// Get current blockchain height
currentBlock, err := ss.blockchainClient.Eth.BlockNumber()
if err != nil {
logrus.WithError(err).Error("Failed to get current block height for sync monitoring")
return err
}

currentHeight := currentBlock.Uint64()

// Get scanner and node sync heights from database
_, scannerSyncHeight, err := ss.db.GetSyncHeights()
if err != nil {
logrus.WithError(err).Error("Failed to get scanner sync height for monitoring")
return err
}

// Accumulate gap errors so we can report all of them at once
var gapMsgs []string
// Check layer1-logsyncheight gap (node sync height vs blockchain height)
if currentHeight > nodeSyncHeight && currentHeight-nodeSyncHeight > ss.storageConfig.SyncGapAlertThreshold {
gap := currentHeight - nodeSyncHeight
gapMsgs = append(gapMsgs, fmt.Sprintf("Layer1LogSyncHeight sync gap: %d blocks behind (sync: %d, current: %d)", gap, nodeSyncHeight, currentHeight))
}

// Check logsyncheight gap (scanner sync height vs blockchain height)
if currentHeight > scannerSyncHeight && currentHeight-scannerSyncHeight > ss.storageConfig.SyncGapAlertThreshold {
gap := currentHeight - scannerSyncHeight
gapMsgs = append(gapMsgs, fmt.Sprintf("LogSyncHeight sync gap: %d blocks behind (sync: %d, current: %d)", gap, scannerSyncHeight, currentHeight))
}

if len(gapMsgs) > 0 {
return errors.New(strings.Join(gapMsgs, "; "))
}

return nil // No sync gap issues
}
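To make the threshold arithmetic above concrete, here is a self-contained sketch with made-up heights: with the default threshold of 1000, a node 1200 blocks behind the chain head produces a gap message, while a scanner 400 blocks behind does not. The numbers are illustrative only and are not taken from any real deployment.

```go
package main

import "fmt"

func main() {
	const threshold uint64 = 1000 // default SyncGapAlertThreshold

	currentHeight := uint64(105_000)  // hypothetical chain head
	nodeSyncHeight := uint64(103_800) // gap = 1_200 -> exceeds threshold
	scanSyncHeight := uint64(104_600) // gap = 400   -> within threshold

	heights := map[string]uint64{
		"Layer1LogSyncHeight": nodeSyncHeight,
		"LogSyncHeight":       scanSyncHeight,
	}

	// Same comparison as checkSyncHeightGaps: only report heights that lag
	// the chain head by more than the threshold.
	for name, h := range heights {
		if currentHeight > h && currentHeight-h > threshold {
			fmt.Printf("%s sync gap: %d blocks behind (sync: %d, current: %d)\n",
				name, currentHeight-h, h, currentHeight)
		}
	}
}
```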
2 changes: 1 addition & 1 deletion sync/sync.go
@@ -178,7 +178,7 @@ func (s *Syncer) syncOnce(ctx context.Context) (bool, error) {
batchSize := s.calculateBatchSize(blockGap)

// calculate the actual range to sync
endBlock := min(curBlock + batchSize - 1, maxSyncBlock)
endBlock := min(curBlock+batchSize-1, maxSyncBlock)

// check parity api available
if err := s.tryParityAPI(ctx, curBlock); err != nil {