Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ by calling `updateAdditionalActionFee` admin function.
### Fixes

* [4194](https://github.com/zeta-chain/node/pull/4194) - remove duplicate solana post-gas-price goroutine
* [4278](https://github.com/zeta-chain/node/pull/4278) - force rescan if inbound vote monitoring fails

## v33.0.0

Expand Down
19 changes: 19 additions & 0 deletions pkg/errors/monitor_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package errors

import "fmt"

// ErrTxMonitor represents an error from the monitoring goroutine.
//
// Besides the underlying error it carries the identifiers reported with it:
// the inbound block height, the zeta transaction hash and the ballot index.
type ErrTxMonitor struct {
	// Err is the underlying monitoring error; nil means monitoring completed without error.
	Err error
	// InboundBlockHeight is the inbound block height reported alongside the error.
	InboundBlockHeight uint64
	// ZetaTxHash is the zeta transaction hash reported alongside the error.
	ZetaTxHash string
	// BallotIndex is the ballot index reported alongside the error.
	BallotIndex string
}

// Error implements the error interface.
// It returns a fixed message when Err is nil, otherwise the underlying error
// followed by the inbound block height, zeta tx hash and ballot index.
func (m ErrTxMonitor) Error() string {
	if m.Err == nil {
		return "monitoring completed without error"
	}
	return fmt.Sprintf("monitoring error: %v, inbound block height: %d, zeta tx hash: %s, ballot index: %s",
		m.Err, m.InboundBlockHeight, m.ZetaTxHash, m.BallotIndex)
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect the cause wrapped by ErrTxMonitor. It returns nil when Err is nil.
func (m ErrTxMonitor) Unwrap() error {
	return m.Err
}
1 change: 1 addition & 0 deletions zetaclient/chains/base/confirmation.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (ob *Observer) IsInboundEligibleForFastConfirmation(
//
// example 1: given lastBlock = 99, lastScanned = 90, confirmation = 10, then no unscanned block
// example 2: given lastBlock = 100, lastScanned = 90, confirmation = 10, then 1 unscanned block (block 91)
// example 3: given lastBlock = 100, lastScanned = 50, confirmation = 10, then 41 unscanned blocks [51, 91] (lastScanned was reset by the monitoring thread)
func calcUnscannedBlockRange(lastBlock, lastScanned, confirmation, blockLimit uint64) (from uint64, end uint64) {
// got unscanned blocks or not?
// returning same values to indicate no unscanned block
Expand Down
14 changes: 12 additions & 2 deletions zetaclient/chains/base/confirmation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,23 @@ func Test_GetScanRangeInboundSafe(t *testing.T) {
},
expectedBlockRange: [2]uint64{91, 101}, // [91, 101), 11 unscanned blocks, but capped to 10
},
{
name: "last scanned reset by monitering thread (Low last scanned compared to last block)",
lastBlock: 100,
lastScanned: 50,
blockLimit: 10,
confParams: observertypes.ConfirmationParams{
SafeInboundCount: 10,
},
expectedBlockRange: [2]uint64{51, 61},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ob := newTestSuite(t, chain, withConfirmationParams(tt.confParams))
ob.Observer.WithLastBlock(tt.lastBlock)
ob.Observer.WithLastBlockScanned(tt.lastScanned)
ob.Observer.WithLastBlockScanned(tt.lastScanned, false)

start, end := ob.GetScanRangeInboundSafe(tt.blockLimit)
require.Equal(t, tt.expectedBlockRange, [2]uint64{start, end})
Expand Down Expand Up @@ -131,7 +141,7 @@ func Test_GetScanRangeInboundFast(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ob := newTestSuite(t, chain, withConfirmationParams(tt.confParams))
ob.Observer.WithLastBlock(tt.lastBlock)
ob.Observer.WithLastBlockScanned(tt.lastScanned)
ob.Observer.WithLastBlockScanned(tt.lastScanned, false)

start, end := ob.GetScanRangeInboundFast(tt.blockLimit)
require.Equal(t, tt.expectedBlockRange, [2]uint64{start, end})
Expand Down
128 changes: 108 additions & 20 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
Expand All @@ -15,9 +16,11 @@ import (
"google.golang.org/grpc/status"

"github.com/zeta-chain/node/pkg/chains"
zetaerrors "github.com/zeta-chain/node/pkg/errors"
crosschaintypes "github.com/zeta-chain/node/x/crosschain/types"
observertypes "github.com/zeta-chain/node/x/observer/types"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/db"
"github.com/zeta-chain/node/zetaclient/logs"
"github.com/zeta-chain/node/zetaclient/metrics"
Expand All @@ -32,6 +35,9 @@ const (
// DefaultBlockCacheSize is the default number of blocks that the observer will keep in cache for performance (without RPC calls)
// Cached blocks can be used to get block information and verify transactions
DefaultBlockCacheSize = 1000

// MonitoringErrHandlerRoutineTimeout is the timeout for the handleMonitoringError routine, which waits for an error on the vote-monitoring error channel
MonitoringErrHandlerRoutineTimeout = 5 * time.Minute
)

// Observer is the base structure for chain observers, grouping the common logic for each chain observer client.
Expand Down Expand Up @@ -76,6 +82,8 @@ type Observer struct {

// stop is the channel to signal the observer to stop
stop chan struct{}

forceResetLastScanned bool
}

// NewObserver creates a new base observer.
Expand All @@ -95,19 +103,20 @@ func NewObserver(
}

return &Observer{
chain: chain,
chainParams: chainParams,
zetacoreClient: zetacoreClient,
tssSigner: tssSigner,
lastBlock: 0,
lastBlockScanned: 0,
lastTxScanned: "",
ts: ts,
db: database,
blockCache: blockCache,
mu: &sync.Mutex{},
logger: newObserverLogger(chain, logger),
stop: make(chan struct{}),
chain: chain,
chainParams: chainParams,
zetacoreClient: zetacoreClient,
tssSigner: tssSigner,
lastBlock: 0,
lastBlockScanned: 0,
lastTxScanned: "",
ts: ts,
db: database,
blockCache: blockCache,
mu: &sync.Mutex{},
logger: newObserverLogger(chain, logger),
stop: make(chan struct{}),
forceResetLastScanned: false,
}, nil
}

Expand Down Expand Up @@ -215,15 +224,31 @@ func (ob *Observer) WithLastBlock(lastBlock uint64) *Observer {

// LastBlockScanned returns the last scanned block height.
// The value is not necessarily caught up with the chain: scanning may be slow or paused.
func (ob *Observer) LastBlockScanned() uint64 {
	ob.mu.Lock()
	defer ob.mu.Unlock()

	return atomic.LoadUint64(&ob.lastBlockScanned)
}

// WithLastBlockScanned sets the last block scanned (not necessarily caught up with the chain; could be slow/paused).
func (ob *Observer) WithLastBlockScanned(blockNumber uint64) *Observer {
// It also sets forceResetLastScanned to the given value and returns the value the flag had before the call.
// If the flag was already true and the caller passes false, the monitoring thread is assumed to have reset
// the last scanned block in the meantime, so the update is skipped and lastBlockScanned is left untouched.
func (ob *Observer) WithLastBlockScanned(blockNumber uint64, forceResetLastScanned bool) (*Observer, bool) {
ob.mu.Lock()
defer ob.mu.Unlock()

// remember the previous flag and overwrite it with the caller's value;
// this happens even when the block update below is skipped
wasForceReset := ob.forceResetLastScanned
ob.forceResetLastScanned = forceResetLastScanned

// forceResetLastScanned was set to true before; it means the monitoring thread would have updated it.
// In this case we should not update the last scanned block and just return.
if wasForceReset && !forceResetLastScanned {
return ob, wasForceReset
}

// store the new height and mirror it into the per-chain last-scanned-block metric
atomic.StoreUint64(&ob.lastBlockScanned, blockNumber)
metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.Name).Set(float64(blockNumber))
return ob
return ob, wasForceReset
}

// LastTxScanned get last transaction scanned.
Expand Down Expand Up @@ -299,7 +324,7 @@ func (ob *Observer) LoadLastBlockScanned() error {
if err != nil {
return errors.Wrapf(err, "unable to parse block number from ENV %s=%s", envvar, scanFromBlock)
}
ob.WithLastBlockScanned(blockNumber)
ob.WithLastBlockScanned(blockNumber, false)
return nil
}

Expand All @@ -309,14 +334,28 @@ func (ob *Observer) LoadLastBlockScanned() error {
logger.Info().Msg("last scanned block not found in the database")
return nil
}
ob.WithLastBlockScanned(blockNumber)
ob.WithLastBlockScanned(blockNumber, false)

return nil
}

// SaveLastBlockScanned saves the last scanned block to memory and database.
// When WithLastBlockScanned reports that forceResetLastScanned was true before this call,
// nothing is written to the database and nil is returned.
func (ob *Observer) SaveLastBlockScanned(blockNumber uint64) error {
ob.WithLastBlockScanned(blockNumber)
_, forceResetLastScannedBeforeUpdate := ob.WithLastBlockScanned(blockNumber, false)
if forceResetLastScannedBeforeUpdate {
// a forced reset was pending: skip the DB write so blockNumber does not overwrite the stored value
return nil
}
return ob.WriteLastBlockScannedToDB(blockNumber)
}

// ForceSaveLastBlockScanned rewinds the last scanned block to blockNumber, both in memory and
// in the database, so that the blocks after it get scanned again.
//
// It is a no-op (returning nil) when blockNumber is greater than the current last scanned block.
// Otherwise it calls WithLastBlockScanned with forceResetLastScanned set to true and returns
// the result of writing blockNumber to the database.
func (ob *Observer) ForceSaveLastBlockScanned(blockNumber uint64) error {
	// never move the last scanned block forward from here
	if ob.LastBlockScanned() < blockNumber {
		return nil
	}

	ob.WithLastBlockScanned(blockNumber, true)

	return ob.WriteLastBlockScannedToDB(blockNumber)
}

Expand Down Expand Up @@ -370,7 +409,7 @@ func (ob *Observer) SaveLastTxScanned(txHash string, slot uint64) error {
ob.WithLastTxScanned(txHash)

// update last_scanned_block_number metrics
ob.WithLastBlockScanned(slot)
ob.WithLastBlockScanned(slot, false)

return ob.WriteLastTxScannedToDB(txHash)
}
Expand Down Expand Up @@ -433,8 +472,15 @@ func (ob *Observer) PostVoteInbound(
return "", nil
}

monitorErrCh := make(chan zetaerrors.ErrTxMonitor, 1)

// ctxWithTimeout is a context with timeout used for monitoring the vote transaction
// Note: the canceller is not used because we want to allow the goroutines to run until they time out
ctxWithTimeout, _ := zctx.CopyWithTimeout(ctx, context.Background(), MonitoringErrHandlerRoutineTimeout)

// post vote to zetacore
zetaHash, ballot, err := ob.ZetacoreClient().PostVoteInbound(ctx, gasLimit, retryGasLimit, msg)
zetaHash, ballot, err := ob.ZetacoreClient().
PostVoteInbound(ctxWithTimeout, gasLimit, retryGasLimit, msg, monitorErrCh)

logger = logger.With().
Str(logs.FieldZetaTx, zetaHash).
Expand All @@ -451,9 +497,51 @@ func (ob *Observer) PostVoteInbound(
logger.Info().Msg("inbound detected: vote posted")
}

go func() {
ob.handleMonitoringError(ctxWithTimeout, monitorErrCh, zetaHash)
}()

return ballot, nil
}

// handleMonitoringError waits for the result of the vote monitoring and, when an error is
// reported, forces the last scanned block back so that the inbound block is scanned again.
//
// It blocks until a value is received from monitorErrCh or ctx is done, so callers run it in
// its own goroutine. zetaHash is only used for logging when no error was received.
// Panics raised while handling are recovered and logged.
func (ob *Observer) handleMonitoringError(
	ctx context.Context,
	monitorErrCh <-chan zetaerrors.ErrTxMonitor,
	zetaHash string,
) {
	logger := ob.logger.Inbound
	defer func() {
		if r := recover(); r != nil {
			logger.Error().Any("panic", r).Msg("recovered from panic in monitoring error handler")
		}
	}()

	// handleResult logs a reported monitoring failure and forces a rescan;
	// results that carry no error are ignored.
	handleResult := func(monitorErr zetaerrors.ErrTxMonitor) {
		if monitorErr.Err == nil {
			return
		}

		logger.Error().
			Err(monitorErr).
			Str(logs.FieldZetaTx, monitorErr.ZetaTxHash).
			Str(logs.FieldBallotIndex, monitorErr.BallotIndex).
			Uint64(logs.FieldBlock, monitorErr.InboundBlockHeight).
			Msg("error monitoring vote transaction")

		// a zero height gives nothing to rewind to; the guard also keeps the
		// uint64 subtraction below from wrapping around
		if monitorErr.InboundBlockHeight == 0 {
			return
		}

		// rewind to the block before the inbound block
		if err := ob.ForceSaveLastBlockScanned(monitorErr.InboundBlockHeight - 1); err != nil {
			logger.Error().Err(err).
				Str(logs.FieldZetaTx, monitorErr.ZetaTxHash).
				Msg("unable to save last scanned block after monitoring error")
		}
	}

	select {
	case monitorErr := <-monitorErrCh:
		handleResult(monitorErr)
	case <-ctx.Done():
		// select chooses randomly when several cases are ready, so an error delivered
		// around the same time as the context expiry may still be waiting in the channel.
		// Drain it without blocking instead of dropping it.
		select {
		case monitorErr := <-monitorErrCh:
			handleResult(monitorErr)
		default:
			logger.Debug().
				Str(logs.FieldZetaTx, zetaHash).
				Msg("no error received for the monitoring, the transaction likely succeeded")
		}
	}
}

// EnvVarLatestBlockByChain returns the environment variable for the last block by chain.
func EnvVarLatestBlockByChain(chain chains.Chain) string {
return fmt.Sprintf("CHAIN_%d_SCAN_FROM_BLOCK", chain.ChainId)
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/base/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestObserverGetterAndSetter(t *testing.T) {

// update last block scanned
newLastBlockScanned := uint64(100)
ob.Observer.WithLastBlockScanned(newLastBlockScanned)
ob.Observer.WithLastBlockScanned(newLastBlockScanned, false)
require.Equal(t, newLastBlockScanned, ob.LastBlockScanned())
})

Expand Down
4 changes: 2 additions & 2 deletions zetaclient/chains/bitcoin/observer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func (ob *Observer) LoadLastBlockScanned(ctx context.Context) error {
return errors.Wrap(err, "unable to get block count")
}
// #nosec G115 always positive
ob.WithLastBlockScanned(uint64(blockNumber))
ob.WithLastBlockScanned(uint64(blockNumber), false)
}

// bitcoin regtest starts from hardcoded block 100
if chains.IsBitcoinRegnet(ob.Chain().ChainId) {
ob.WithLastBlockScanned(RegnetStartBlock)
ob.WithLastBlockScanned(RegnetStartBlock, false)
}
ob.Logger().Chain.Info().Uint64("last_block_scanned", ob.LastBlockScanned()).Send()

Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/bitcoin/observer/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Test_LoadLastBlockScanned(t *testing.T) {
obOther := newTestSuite(t, chain)

// reset last block scanned to 0 so that it will be loaded from RPC
obOther.WithLastBlockScanned(0)
obOther.WithLastBlockScanned(0, false)

// attach a mock btc client that returns rpc error
obOther.client.ExpectedCalls = nil
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/evm/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (ob *Observer) loadLastBlockScanned(ctx context.Context) error {
if err != nil {
return errors.Wrapf(err, "error BlockNumber for chain %d", ob.Chain().ChainId)
}
ob.WithLastBlockScanned(blockNumber)
ob.WithLastBlockScanned(blockNumber, false)
}
ob.Logger().Chain.Info().
Uint64("last_block_scanned", ob.LastBlockScanned()).
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/evm/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func Test_LoadLastBlockScanned(t *testing.T) {
obOther := newTestSuite(t)

// reset last block scanned to 0 so that it will be loaded from RPC
obOther.WithLastBlockScanned(0)
obOther.WithLastBlockScanned(0, false)

// attach mock evm client to observer
obOther.evmMock.On("BlockNumber", mock.Anything).Unset()
Expand Down
2 changes: 2 additions & 0 deletions zetaclient/chains/interfaces/zetacore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/zeta-chain/go-tss/blame"

"github.com/zeta-chain/node/pkg/chains"
zetaerrors "github.com/zeta-chain/node/pkg/errors"
crosschain "github.com/zeta-chain/node/x/crosschain/types"
fungible "github.com/zeta-chain/node/x/fungible/types"
observer "github.com/zeta-chain/node/x/observer/types"
Expand Down Expand Up @@ -49,6 +50,7 @@ type ZetacoreWriter interface {
gasLimit uint64,
retryGasLimit uint64,
_ *crosschain.MsgVoteInbound,
monitorErrCh chan<- zetaerrors.ErrTxMonitor,
) (string, string, error)

PostOutboundTracker(_ context.Context,
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/chains/solana/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (ob *Observer) ObserveInbound(ctx context.Context) error {
// update metrics if no new signatures found
if len(signatures) == 0 {
if errSlot == nil {
ob.WithLastBlockScanned(lastSlot)
ob.WithLastBlockScanned(lastSlot, false)
}
} else {
ob.Logger().Inbound.Info().
Expand Down
5 changes: 3 additions & 2 deletions zetaclient/chains/sui/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/coin"
"github.com/zeta-chain/node/pkg/contracts/sui"
zetaerrors "github.com/zeta-chain/node/pkg/errors"
"github.com/zeta-chain/node/testutil/sample"
cctypes "github.com/zeta-chain/node/x/crosschain/types"
"github.com/zeta-chain/node/zetaclient/chains/base"
Expand Down Expand Up @@ -602,13 +603,13 @@ func (ts *testSuite) MockGetTxOnce(tx models.SuiTransactionBlockResponse) {
}

// CatchInboundVotes registers a "PostVoteInbound" expectation on ts.zetaMock whose callback
// appends every received inbound vote message to ts.inboundVotesBag and returns two empty
// strings and a nil error.
func (ts *testSuite) CatchInboundVotes() {
callback := func(_ context.Context, _, _ uint64, msg *cctypes.MsgVoteInbound) (string, string, error) {
callback := func(_ context.Context, _, _ uint64, msg *cctypes.MsgVoteInbound, _ chan<- zetaerrors.ErrTxMonitor) (string, string, error) {
// record the vote for later inspection by the test
ts.inboundVotesBag = append(ts.inboundVotesBag, msg)
return "", "", nil
}

ts.zetaMock.
On("PostVoteInbound", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
On("PostVoteInbound", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(callback).
Maybe()
}
Expand Down
Loading