diff --git a/changelog.md b/changelog.md index 7cf4f0c7fd..4d218ebc2e 100644 --- a/changelog.md +++ b/changelog.md @@ -27,6 +27,7 @@ by calling `updateAdditionalActionFee` admin function. ### Fixes * [4194](https://github.com/zeta-chain/node/pull/4194) - remove duplicate solana post-gas-price goroutine +* [4278](https://github.com/zeta-chain/node/pull/4278) - force rescan if inbound vote monitoring fails ## v33.0.0 diff --git a/pkg/errors/monitor_error.go b/pkg/errors/monitor_error.go new file mode 100644 index 0000000000..54dc26dc00 --- /dev/null +++ b/pkg/errors/monitor_error.go @@ -0,0 +1,19 @@ +package errors + +import "fmt" + +// ErrTxMonitor represents an error from the monitoring goroutine. +type ErrTxMonitor struct { + Err error + InboundBlockHeight uint64 + ZetaTxHash string + BallotIndex string +} + +func (m ErrTxMonitor) Error() string { + if m.Err == nil { + return "monitoring completed without error" + } + return fmt.Sprintf("monitoring error: %v, inbound block height: %d, zeta tx hash: %s, ballot index: %s", + m.Err, m.InboundBlockHeight, m.ZetaTxHash, m.BallotIndex) +} diff --git a/zetaclient/chains/base/confirmation.go b/zetaclient/chains/base/confirmation.go index fb214860fb..94c6bad757 100644 --- a/zetaclient/chains/base/confirmation.go +++ b/zetaclient/chains/base/confirmation.go @@ -95,6 +95,7 @@ func (ob *Observer) IsInboundEligibleForFastConfirmation( // // example 1: given lastBlock = 99, lastScanned = 90, confirmation = 10, then no unscanned block // example 2: given lastBlock = 100, lastScanned = 90, confirmation = 10, then 1 unscanned block (block 91) +// example 3: given lastBlock = 100, lastScanned = 50, confirmation = 10, then 41 unscanned blocks[51 - 91] (last scanned is reset by monitoring thread) func calcUnscannedBlockRange(lastBlock, lastScanned, confirmation, blockLimit uint64) (from uint64, end uint64) { // got unscanned blocks or not? // returning same values to indicate no unscanned block diff --git a/zetaclient/chains/base/confirmation_test.go b/zetaclient/chains/base/confirmation_test.go index 83f5f729f7..05ec2fd6b4 100644 --- a/zetaclient/chains/base/confirmation_test.go +++ b/zetaclient/chains/base/confirmation_test.go @@ -67,13 +67,23 @@ func Test_GetScanRangeInboundSafe(t *testing.T) { }, expectedBlockRange: [2]uint64{91, 101}, // [91, 101), 11 unscanned blocks, but capped to 10 }, + { + name: "last scanned reset by monitering thread (Low last scanned compared to last block)", + lastBlock: 100, + lastScanned: 50, + blockLimit: 10, + confParams: observertypes.ConfirmationParams{ + SafeInboundCount: 10, + }, + expectedBlockRange: [2]uint64{51, 61}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ob := newTestSuite(t, chain, withConfirmationParams(tt.confParams)) ob.Observer.WithLastBlock(tt.lastBlock) - ob.Observer.WithLastBlockScanned(tt.lastScanned) + ob.Observer.WithLastBlockScanned(tt.lastScanned, false) start, end := ob.GetScanRangeInboundSafe(tt.blockLimit) require.Equal(t, tt.expectedBlockRange, [2]uint64{start, end}) @@ -131,7 +141,7 @@ func Test_GetScanRangeInboundFast(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ob := newTestSuite(t, chain, withConfirmationParams(tt.confParams)) ob.Observer.WithLastBlock(tt.lastBlock) - ob.Observer.WithLastBlockScanned(tt.lastScanned) + ob.Observer.WithLastBlockScanned(tt.lastScanned, false) start, end := ob.GetScanRangeInboundFast(tt.blockLimit) require.Equal(t, tt.expectedBlockRange, [2]uint64{start, end}) diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index b640dcef3e..ece1b60ae3 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -7,6 +7,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" @@ -15,9 +16,11 @@ import ( "google.golang.org/grpc/status" "github.com/zeta-chain/node/pkg/chains" + zetaerrors "github.com/zeta-chain/node/pkg/errors" crosschaintypes "github.com/zeta-chain/node/x/crosschain/types" observertypes "github.com/zeta-chain/node/x/observer/types" "github.com/zeta-chain/node/zetaclient/chains/interfaces" + zctx "github.com/zeta-chain/node/zetaclient/context" "github.com/zeta-chain/node/zetaclient/db" "github.com/zeta-chain/node/zetaclient/logs" "github.com/zeta-chain/node/zetaclient/metrics" @@ -32,6 +35,9 @@ const ( // DefaultBlockCacheSize is the default number of blocks that the observer will keep in cache for performance (without RPC calls) // Cached blocks can be used to get block information and verify transactions DefaultBlockCacheSize = 1000 + + // MonitoringErrHandlerRoutineTimeout is the timeout for the handleMonitoring routine that waits for an error from the monitorVote channel + MonitoringErrHandlerRoutineTimeout = 5 * time.Minute ) // Observer is the base structure for chain observers, grouping the common logic for each chain observer client. @@ -76,6 +82,8 @@ type Observer struct { // stop is the channel to signal the observer to stop stop chan struct{} + + forceResetLastScanned bool } // NewObserver creates a new base observer. @@ -95,19 +103,20 @@ func NewObserver( } return &Observer{ - chain: chain, - chainParams: chainParams, - zetacoreClient: zetacoreClient, - tssSigner: tssSigner, - lastBlock: 0, - lastBlockScanned: 0, - lastTxScanned: "", - ts: ts, - db: database, - blockCache: blockCache, - mu: &sync.Mutex{}, - logger: newObserverLogger(chain, logger), - stop: make(chan struct{}), + chain: chain, + chainParams: chainParams, + zetacoreClient: zetacoreClient, + tssSigner: tssSigner, + lastBlock: 0, + lastBlockScanned: 0, + lastTxScanned: "", + ts: ts, + db: database, + blockCache: blockCache, + mu: &sync.Mutex{}, + logger: newObserverLogger(chain, logger), + stop: make(chan struct{}), + forceResetLastScanned: false, }, nil } @@ -215,15 +224,31 @@ func (ob *Observer) WithLastBlock(lastBlock uint64) *Observer { // LastBlockScanned get last block scanned (not necessarily caught up with the chain; could be slow/paused). func (ob *Observer) LastBlockScanned() uint64 { + ob.mu.Lock() + defer ob.mu.Unlock() height := atomic.LoadUint64(&ob.lastBlockScanned) return height } // WithLastBlockScanned set last block scanned (not necessarily caught up with the chain; could be slow/paused). -func (ob *Observer) WithLastBlockScanned(blockNumber uint64) *Observer { +// it also set the value of forceResetLastScanned and returns the previous value. +// If forceResetLastScanned was true before, it means the monitoring thread would have updated it and so it skips updating the last scanned block. +func (ob *Observer) WithLastBlockScanned(blockNumber uint64, forceResetLastScanned bool) (*Observer, bool) { + ob.mu.Lock() + defer ob.mu.Unlock() + + wasForceReset := ob.forceResetLastScanned + ob.forceResetLastScanned = forceResetLastScanned + + // forceResetLastScanned was set to true before; it means the monitoring thread would have updated it + // In this case we should not update the last scanned block and just return + if wasForceReset && !forceResetLastScanned { + return ob, wasForceReset + } + atomic.StoreUint64(&ob.lastBlockScanned, blockNumber) metrics.LastScannedBlockNumber.WithLabelValues(ob.chain.Name).Set(float64(blockNumber)) - return ob + return ob, wasForceReset } // LastTxScanned get last transaction scanned. @@ -299,7 +324,7 @@ func (ob *Observer) LoadLastBlockScanned() error { if err != nil { return errors.Wrapf(err, "unable to parse block number from ENV %s=%s", envvar, scanFromBlock) } - ob.WithLastBlockScanned(blockNumber) + ob.WithLastBlockScanned(blockNumber, false) return nil } @@ -309,14 +334,28 @@ func (ob *Observer) LoadLastBlockScanned() error { logger.Info().Msg("last scanned block not found in the database") return nil } - ob.WithLastBlockScanned(blockNumber) + ob.WithLastBlockScanned(blockNumber, false) return nil } // SaveLastBlockScanned saves the last scanned block to memory and database. func (ob *Observer) SaveLastBlockScanned(blockNumber uint64) error { - ob.WithLastBlockScanned(blockNumber) + _, forceResetLastScannedBeforeUpdate := ob.WithLastBlockScanned(blockNumber, false) + if forceResetLastScannedBeforeUpdate { + return nil + } + return ob.WriteLastBlockScannedToDB(blockNumber) +} + +// ForceSaveLastBlockScanned saves the last scanned block to memory if the new blocknumber is less than the current last scanned block. +// It also forces the update of the last scanned block in the database, to makes sure any other the block gets rescanned. +func (ob *Observer) ForceSaveLastBlockScanned(blockNumber uint64) error { + currentLastScanned := ob.LastBlockScanned() + if blockNumber > currentLastScanned { + return nil + } + ob.WithLastBlockScanned(blockNumber, true) return ob.WriteLastBlockScannedToDB(blockNumber) } @@ -370,7 +409,7 @@ func (ob *Observer) SaveLastTxScanned(txHash string, slot uint64) error { ob.WithLastTxScanned(txHash) // update last_scanned_block_number metrics - ob.WithLastBlockScanned(slot) + ob.WithLastBlockScanned(slot, false) return ob.WriteLastTxScannedToDB(txHash) } @@ -433,8 +472,15 @@ func (ob *Observer) PostVoteInbound( return "", nil } + monitorErrCh := make(chan zetaerrors.ErrTxMonitor, 1) + + // ctxWithTimeout is a context with timeout used for monitoring the vote transaction + // Note: the canceller is not used because we want to allow the goroutines to run until they time out + ctxWithTimeout, _ := zctx.CopyWithTimeout(ctx, context.Background(), MonitoringErrHandlerRoutineTimeout) + // post vote to zetacore - zetaHash, ballot, err := ob.ZetacoreClient().PostVoteInbound(ctx, gasLimit, retryGasLimit, msg) + zetaHash, ballot, err := ob.ZetacoreClient(). + PostVoteInbound(ctxWithTimeout, gasLimit, retryGasLimit, msg, monitorErrCh) logger = logger.With(). Str(logs.FieldZetaTx, zetaHash). @@ -451,9 +497,51 @@ func (ob *Observer) PostVoteInbound( logger.Info().Msg("inbound detected: vote posted") } + go func() { + ob.handleMonitoringError(ctxWithTimeout, monitorErrCh, zetaHash) + }() + return ballot, nil } +func (ob *Observer) handleMonitoringError( + ctx context.Context, + monitorErrCh <-chan zetaerrors.ErrTxMonitor, + zetaHash string, +) { + logger := ob.logger.Inbound + defer func() { + if r := recover(); r != nil { + logger.Error().Any("panic", r).Msg("recovered from panic in monitoring error handler") + } + }() + + select { + case monitorErr := <-monitorErrCh: + if monitorErr.Err != nil { + logger.Error(). + Err(monitorErr). + Str(logs.FieldZetaTx, monitorErr.ZetaTxHash). + Str(logs.FieldBallotIndex, monitorErr.BallotIndex). + Uint64(logs.FieldBlock, monitorErr.InboundBlockHeight). + Msg("error monitoring vote transaction") + + if monitorErr.InboundBlockHeight > 0 { + err := ob.ForceSaveLastBlockScanned(monitorErr.InboundBlockHeight - 1) + if err != nil { + logger.Error().Err(err). + Str(logs.FieldZetaTx, monitorErr.ZetaTxHash). + Msg("unable to save last scanned block after monitoring error") + } + } + } + case <-ctx.Done(): + logger.Debug(). + Str(logs.FieldZetaTx, zetaHash). + Msg("no error received for the monitoring, the transaction likely succeeded") + } +} + // EnvVarLatestBlockByChain returns the environment variable for the last block by chain. func EnvVarLatestBlockByChain(chain chains.Chain) string { return fmt.Sprintf("CHAIN_%d_SCAN_FROM_BLOCK", chain.ChainId) diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index fe9a8015f4..6275e33a84 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -193,7 +193,7 @@ func TestObserverGetterAndSetter(t *testing.T) { // update last block scanned newLastBlockScanned := uint64(100) - ob.Observer.WithLastBlockScanned(newLastBlockScanned) + ob.Observer.WithLastBlockScanned(newLastBlockScanned, false) require.Equal(t, newLastBlockScanned, ob.LastBlockScanned()) }) diff --git a/zetaclient/chains/bitcoin/observer/db.go b/zetaclient/chains/bitcoin/observer/db.go index 71b406afec..b4e6830d53 100644 --- a/zetaclient/chains/bitcoin/observer/db.go +++ b/zetaclient/chains/bitcoin/observer/db.go @@ -47,12 +47,12 @@ func (ob *Observer) LoadLastBlockScanned(ctx context.Context) error { return errors.Wrap(err, "unable to get block count") } // #nosec G115 always positive - ob.WithLastBlockScanned(uint64(blockNumber)) + ob.WithLastBlockScanned(uint64(blockNumber), false) } // bitcoin regtest starts from hardcoded block 100 if chains.IsBitcoinRegnet(ob.Chain().ChainId) { - ob.WithLastBlockScanned(RegnetStartBlock) + ob.WithLastBlockScanned(RegnetStartBlock, false) } ob.Logger().Chain.Info().Uint64("last_block_scanned", ob.LastBlockScanned()).Send() diff --git a/zetaclient/chains/bitcoin/observer/db_test.go b/zetaclient/chains/bitcoin/observer/db_test.go index 578980ba80..f9ded479b1 100644 --- a/zetaclient/chains/bitcoin/observer/db_test.go +++ b/zetaclient/chains/bitcoin/observer/db_test.go @@ -96,7 +96,7 @@ func Test_LoadLastBlockScanned(t *testing.T) { obOther := newTestSuite(t, chain) // reset last block scanned to 0 so that it will be loaded from RPC - obOther.WithLastBlockScanned(0) + obOther.WithLastBlockScanned(0, false) // attach a mock btc client that returns rpc error obOther.client.ExpectedCalls = nil diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index bfc7fc05b3..3e1f6d8cd4 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -261,7 +261,7 @@ func (ob *Observer) loadLastBlockScanned(ctx context.Context) error { if err != nil { return errors.Wrapf(err, "error BlockNumber for chain %d", ob.Chain().ChainId) } - ob.WithLastBlockScanned(blockNumber) + ob.WithLastBlockScanned(blockNumber, false) } ob.Logger().Chain.Info(). Uint64("last_block_scanned", ob.LastBlockScanned()). diff --git a/zetaclient/chains/evm/observer/observer_test.go b/zetaclient/chains/evm/observer/observer_test.go index a5bb27e58d..b1c20aa943 100644 --- a/zetaclient/chains/evm/observer/observer_test.go +++ b/zetaclient/chains/evm/observer/observer_test.go @@ -238,7 +238,7 @@ func Test_LoadLastBlockScanned(t *testing.T) { obOther := newTestSuite(t) // reset last block scanned to 0 so that it will be loaded from RPC - obOther.WithLastBlockScanned(0) + obOther.WithLastBlockScanned(0, false) // attach mock evm client to observer obOther.evmMock.On("BlockNumber", mock.Anything).Unset() diff --git a/zetaclient/chains/interfaces/zetacore.go b/zetaclient/chains/interfaces/zetacore.go index fb004b42af..41f700cbc9 100644 --- a/zetaclient/chains/interfaces/zetacore.go +++ b/zetaclient/chains/interfaces/zetacore.go @@ -10,6 +10,7 @@ import ( "github.com/zeta-chain/go-tss/blame" "github.com/zeta-chain/node/pkg/chains" + zetaerrors "github.com/zeta-chain/node/pkg/errors" crosschain "github.com/zeta-chain/node/x/crosschain/types" fungible "github.com/zeta-chain/node/x/fungible/types" observer "github.com/zeta-chain/node/x/observer/types" @@ -49,6 +50,7 @@ type ZetacoreWriter interface { gasLimit uint64, retryGasLimit uint64, _ *crosschain.MsgVoteInbound, + monitorErrCh chan<- zetaerrors.ErrTxMonitor, ) (string, string, error) PostOutboundTracker(_ context.Context, diff --git a/zetaclient/chains/solana/observer/inbound.go b/zetaclient/chains/solana/observer/inbound.go index b9f6961b31..9fee860d4d 100644 --- a/zetaclient/chains/solana/observer/inbound.go +++ b/zetaclient/chains/solana/observer/inbound.go @@ -54,7 +54,7 @@ func (ob *Observer) ObserveInbound(ctx context.Context) error { // update metrics if no new signatures found if len(signatures) == 0 { if errSlot == nil { - ob.WithLastBlockScanned(lastSlot) + ob.WithLastBlockScanned(lastSlot, false) } } else { ob.Logger().Inbound.Info(). diff --git a/zetaclient/chains/sui/observer/observer_test.go b/zetaclient/chains/sui/observer/observer_test.go index 95cf506cfa..1658d250b1 100644 --- a/zetaclient/chains/sui/observer/observer_test.go +++ b/zetaclient/chains/sui/observer/observer_test.go @@ -15,6 +15,7 @@ import ( "github.com/zeta-chain/node/pkg/chains" "github.com/zeta-chain/node/pkg/coin" "github.com/zeta-chain/node/pkg/contracts/sui" + zetaerrors "github.com/zeta-chain/node/pkg/errors" "github.com/zeta-chain/node/testutil/sample" cctypes "github.com/zeta-chain/node/x/crosschain/types" "github.com/zeta-chain/node/zetaclient/chains/base" @@ -602,13 +603,13 @@ func (ts *testSuite) MockGetTxOnce(tx models.SuiTransactionBlockResponse) { } func (ts *testSuite) CatchInboundVotes() { - callback := func(_ context.Context, _, _ uint64, msg *cctypes.MsgVoteInbound) (string, string, error) { + callback := func(_ context.Context, _, _ uint64, msg *cctypes.MsgVoteInbound, _ chan<- zetaerrors.ErrTxMonitor) (string, string, error) { ts.inboundVotesBag = append(ts.inboundVotesBag, msg) return "", "", nil } ts.zetaMock. - On("PostVoteInbound", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("PostVoteInbound", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(callback). Maybe() } diff --git a/zetaclient/chains/ton/observer/observer_test.go b/zetaclient/chains/ton/observer/observer_test.go index 8ffce7dbcb..16419538da 100644 --- a/zetaclient/chains/ton/observer/observer_test.go +++ b/zetaclient/chains/ton/observer/observer_test.go @@ -244,7 +244,7 @@ func setupVotesBag(ts *testSuite) { ts.votesBag = append(ts.votesBag, cctx) } ts.zetacore. - On("PostVoteInbound", ts.ctx, mock.Anything, mock.Anything, mock.Anything). + On("PostVoteInbound", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Maybe(). Run(catcher). Return("", "", nil) // zeta hash, ballot index, error diff --git a/zetaclient/context/context.go b/zetaclient/context/context.go index ee45eae58c..6e53e79473 100644 --- a/zetaclient/context/context.go +++ b/zetaclient/context/context.go @@ -2,6 +2,7 @@ package context import ( goctx "context" + "time" "github.com/pkg/errors" ) @@ -36,3 +37,14 @@ func Copy(from, to goctx.Context) goctx.Context { return WithAppContext(to, app) } + +// CopyWithTimeout copies AppContext from one context to another and adds a timeout. +// This is useful when you want to run something in another goroutine with a timeout. +func CopyWithTimeout(from, to goctx.Context, timeout time.Duration) (goctx.Context, goctx.CancelFunc) { + app, err := FromContext(from) + if err != nil { + return goctx.WithTimeout(to, timeout) + } + ctxWithTimeout, cancel := goctx.WithTimeout(to, timeout) + return WithAppContext(ctxWithTimeout, app), cancel +} diff --git a/zetaclient/context/context_test.go b/zetaclient/context/context_test.go index ad6129157d..0af44b34f0 100644 --- a/zetaclient/context/context_test.go +++ b/zetaclient/context/context_test.go @@ -3,6 +3,7 @@ package context_test import ( goctx "context" "testing" + "time" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -55,3 +56,33 @@ func TestCopy(t *testing.T) { assert.NotNil(t, app2) assert.Equal(t, app, app2) } + +func TestCopyWithTimeout(t *testing.T) { + // ARRANGE + var ( + app = context.New(config.New(false), nil, zerolog.Nop()) + ctx1 = context.WithAppContext(goctx.Background(), app) + timeout = 500 * time.Millisecond + ) + + // ACT + ctx2, cancel := context.CopyWithTimeout(ctx1, goctx.Background(), timeout) + defer cancel() + + // ASSERT + // Verify that AppContext is copied correctly + app2, err := context.FromContext(ctx2) + assert.NoError(t, err) + assert.NotNil(t, app2) + assert.Equal(t, app, app2) + + // Verify that timeout is working + start := time.Now() + <-ctx2.Done() + elapsed := time.Since(start) + + // The context should be cancelled after approximately the timeout duration + assert.True(t, elapsed >= timeout, "context should be not cancelled too early") + assert.True(t, elapsed < timeout*2, "context should not be cancelled too late") + assert.ErrorIs(t, ctx2.Err(), goctx.DeadlineExceeded) +} diff --git a/zetaclient/testutils/mocks/zetacore_client.go b/zetaclient/testutils/mocks/zetacore_client.go index 4bdb14204a..15e9f8bcc9 100644 --- a/zetaclient/testutils/mocks/zetacore_client.go +++ b/zetaclient/testutils/mocks/zetacore_client.go @@ -14,6 +14,8 @@ import ( crosschaintypes "github.com/zeta-chain/node/x/crosschain/types" + errors "github.com/zeta-chain/node/pkg/errors" + fungibletypes "github.com/zeta-chain/node/x/fungible/types" keysinterfaces "github.com/zeta-chain/node/zetaclient/keys/interfaces" @@ -921,9 +923,9 @@ func (_m *ZetacoreClient) PostVoteGasPrice(_a0 context.Context, _a1 chains.Chain return r0, r1 } -// PostVoteInbound provides a mock function with given fields: _a0, gasLimit, retryGasLimit, _a3 -func (_m *ZetacoreClient) PostVoteInbound(_a0 context.Context, gasLimit uint64, retryGasLimit uint64, _a3 *crosschaintypes.MsgVoteInbound) (string, string, error) { - ret := _m.Called(_a0, gasLimit, retryGasLimit, _a3) +// PostVoteInbound provides a mock function with given fields: _a0, gasLimit, retryGasLimit, _a3, monitorErrCh +func (_m *ZetacoreClient) PostVoteInbound(_a0 context.Context, gasLimit uint64, retryGasLimit uint64, _a3 *crosschaintypes.MsgVoteInbound, monitorErrCh chan<- errors.ErrTxMonitor) (string, string, error) { + ret := _m.Called(_a0, gasLimit, retryGasLimit, _a3, monitorErrCh) if len(ret) == 0 { panic("no return value specified for PostVoteInbound") @@ -932,23 +934,23 @@ func (_m *ZetacoreClient) PostVoteInbound(_a0 context.Context, gasLimit uint64, var r0 string var r1 string var r2 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, *crosschaintypes.MsgVoteInbound) (string, string, error)); ok { - return rf(_a0, gasLimit, retryGasLimit, _a3) + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, *crosschaintypes.MsgVoteInbound, chan<- errors.ErrTxMonitor) (string, string, error)); ok { + return rf(_a0, gasLimit, retryGasLimit, _a3, monitorErrCh) } - if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, *crosschaintypes.MsgVoteInbound) string); ok { - r0 = rf(_a0, gasLimit, retryGasLimit, _a3) + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, *crosschaintypes.MsgVoteInbound, chan<- errors.ErrTxMonitor) string); ok { + r0 = rf(_a0, gasLimit, retryGasLimit, _a3, monitorErrCh) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(context.Context, uint64, uint64, *crosschaintypes.MsgVoteInbound) string); ok { - r1 = rf(_a0, gasLimit, retryGasLimit, _a3) + if rf, ok := ret.Get(1).(func(context.Context, uint64, uint64, *crosschaintypes.MsgVoteInbound, chan<- errors.ErrTxMonitor) string); ok { + r1 = rf(_a0, gasLimit, retryGasLimit, _a3, monitorErrCh) } else { r1 = ret.Get(1).(string) } - if rf, ok := ret.Get(2).(func(context.Context, uint64, uint64, *crosschaintypes.MsgVoteInbound) error); ok { - r2 = rf(_a0, gasLimit, retryGasLimit, _a3) + if rf, ok := ret.Get(2).(func(context.Context, uint64, uint64, *crosschaintypes.MsgVoteInbound, chan<- errors.ErrTxMonitor) error); ok { + r2 = rf(_a0, gasLimit, retryGasLimit, _a3, monitorErrCh) } else { r2 = ret.Error(2) } diff --git a/zetaclient/testutils/mocks/zetacore_client_opts.go b/zetaclient/testutils/mocks/zetacore_client_opts.go index 3eb06f5c3f..96542a573d 100644 --- a/zetaclient/testutils/mocks/zetacore_client_opts.go +++ b/zetaclient/testutils/mocks/zetacore_client_opts.go @@ -47,7 +47,7 @@ func (_m *ZetacoreClient) WithPostOutboundTracker(zetaTxHash string) *ZetacoreCl } func (_m *ZetacoreClient) WithPostVoteInbound(zetaTxHash string, ballotIndex string) *ZetacoreClient { - _m.On("PostVoteInbound", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + _m.On("PostVoteInbound", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Maybe(). Return(zetaTxHash, ballotIndex, nil) diff --git a/zetaclient/zetacore/client_monitor.go b/zetaclient/zetacore/client_monitor.go index 63ce44e92b..5f833a9007 100644 --- a/zetaclient/zetacore/client_monitor.go +++ b/zetaclient/zetacore/client_monitor.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/zeta-chain/node/pkg/constant" + zetaerrors "github.com/zeta-chain/node/pkg/errors" "github.com/zeta-chain/node/pkg/retry" "github.com/zeta-chain/node/x/crosschain/types" "github.com/zeta-chain/node/zetaclient/logs" @@ -102,6 +103,7 @@ func (c *Client) MonitorVoteInboundResult( zetaTxHash string, retryGasLimit uint64, msg *types.MsgVoteInbound, + monitorErrCh chan<- zetaerrors.ErrTxMonitor, ) error { logger := c.logger.With().Str(logs.FieldZetaTx, zetaTxHash).Logger() @@ -112,15 +114,19 @@ func (c *Client) MonitorVoteInboundResult( }() call := func() error { - err := c.monitorVoteInboundResult(ctx, zetaTxHash, retryGasLimit, msg) + err := c.monitorVoteInboundResult(ctx, zetaTxHash, retryGasLimit, msg, monitorErrCh) // force retry on err return retry.Retry(err) } + // 10 attempts, 2 seconds, 4 seconds max + // This will retry for a maximum of ~40 seconds with exponential backoff, + // However, this call is recursive for up to 1 layer and so the maxim time this can take is ~80 seconds err := retryWithBackoff(call, monitorRetryCount, monitorInterval, monitorInterval*2) if err != nil { - logger.Error().Err(err).Msg("unable to query tx result") + // All errors are forced to be retryable, we only return an error if the tx result cannot be queried + logger.Error().Err(err).Str(logs.FieldZetaTx, zetaTxHash).Msg("unable to query tx result") return err } @@ -132,6 +138,7 @@ func (c *Client) monitorVoteInboundResult( zetaTxHash string, retryGasLimit uint64, msg *types.MsgVoteInbound, + monitorErrCh chan<- zetaerrors.ErrTxMonitor, ) error { // query tx result from ZetaChain txResult, err := c.QueryTxResult(zetaTxHash) @@ -141,6 +148,8 @@ func (c *Client) monitorVoteInboundResult( logger := c.logger.With().Str("inbound_raw_log", txResult.RawLog).Logger() + // There is no error returned from here which mean the MonitorVoteInboundResult would return nil and no error is posted to monitorErrCh + // However the channel is passed to the subsequent call, which can post an error to the channel if the "execute" vote fails. switch { case strings.Contains(txResult.RawLog, "failed to execute message"): // the inbound vote tx shouldn't fail to execute. this shouldn't happen @@ -151,7 +160,7 @@ func (c *Client) monitorVoteInboundResult( logger.Debug().Str(logs.FieldZetaTx, zetaTxHash).Msg("out of gas") if retryGasLimit > 0 { // new retryGasLimit set to 0 to prevent reentering this function - resentZetaTxHash, _, err := c.PostVoteInbound(ctx, retryGasLimit, 0, msg) + resentZetaTxHash, _, err := c.PostVoteInbound(ctx, retryGasLimit, 0, msg, monitorErrCh) if err != nil { logger.Error().Err(err).Str(logs.FieldZetaTx, zetaTxHash).Msg("failed to resend tx") } else { diff --git a/zetaclient/zetacore/client_vote.go b/zetaclient/zetacore/client_vote.go index c3e4306f90..6af9085c6a 100644 --- a/zetaclient/zetacore/client_vote.go +++ b/zetaclient/zetacore/client_vote.go @@ -7,6 +7,7 @@ import ( "github.com/zeta-chain/go-tss/blame" "github.com/zeta-chain/node/pkg/chains" + zetaerrors "github.com/zeta-chain/node/pkg/errors" "github.com/zeta-chain/node/pkg/retry" "github.com/zeta-chain/node/x/crosschain/types" observerclient "github.com/zeta-chain/node/x/observer/client/cli" @@ -155,6 +156,7 @@ func (c *Client) PostVoteInbound( ctx context.Context, gasLimit, retryGasLimit uint64, msg *types.MsgVoteInbound, + monitorErrCh chan<- zetaerrors.ErrTxMonitor, ) (string, string, error) { // zetaclient patch // force use SAFE mode for all inbound votes (both fast and slow votes) @@ -188,11 +190,24 @@ func (c *Client) PostVoteInbound( } go func() { - ctxForWorker := zctx.Copy(ctx, context.Background()) - - errMonitor := c.MonitorVoteInboundResult(ctxForWorker, zetaTxHash, retryGasLimit, msg) + // Use the passed context directly instead of creating a new one + // This ensures the monitoring goroutine respects the same timeout as the error handler + errMonitor := c.MonitorVoteInboundResult(ctx, zetaTxHash, retryGasLimit, msg, monitorErrCh) if errMonitor != nil { c.logger.Error().Err(errMonitor).Msg("failed to monitor vote inbound result") + + if monitorErrCh != nil { + select { + case monitorErrCh <- zetaerrors.ErrTxMonitor{ + Err: errMonitor, + InboundBlockHeight: msg.InboundBlockHeight, + ZetaTxHash: zetaTxHash, + BallotIndex: ballotIndex, + }: + case <-ctx.Done(): + c.logger.Error().Msg("context cancelled: timeout") + } + } } }() diff --git a/zetaclient/zetacore/tx_test.go b/zetaclient/zetacore/tx_test.go index 20569bf12f..110e356eab 100644 --- a/zetaclient/zetacore/tx_test.go +++ b/zetaclient/zetacore/tx_test.go @@ -239,7 +239,7 @@ func TestZetacore_PostVoteInbound(t *testing.T) { t.Run("post inbound vote already voted", func(t *testing.T) { hash, _, err := client.PostVoteInbound(ctx, 100, 200, &crosschaintypes.MsgVoteInbound{ Creator: address.String(), - }) + }, nil) require.NoError(t, err) require.Equal(t, sampleHash, hash) }) @@ -281,7 +281,7 @@ func TestZetacore_MonitorVoteInboundResult(t *testing.T) { t.Run("monitor inbound vote", func(t *testing.T) { err := client.MonitorVoteInboundResult(ctx, sampleHash, 1000, &crosschaintypes.MsgVoteInbound{ Creator: address.String(), - }) + }, nil) require.NoError(t, err) })