Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Manager interface {
NewBlockHashes() chan<- *ffcapi.BlockHashEvent
CheckInFlight(listenerID *fftypes.UUID) bool
StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error
StartBlockConfirmationsListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *apitypes.ConfirmationsForListenerEvent) error
Comment on lines 48 to +49
Copy link
Contributor

@EnriqueL8 EnriqueL8 Jul 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth a comment to explain the difference, I'm still torn and think the confirmations context should be included in the ListenerEvent?

StopConfirmedBlockListener(ctx context.Context, id *fftypes.UUID) error
}

Expand Down
185 changes: 131 additions & 54 deletions internal/confirmations/confirmed_block_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,45 +43,65 @@ import (
// This implementation is thus deliberately simple assuming that when instability is found
// in the notifications it can simply wipe out its view and start again.
type confirmedBlockListener struct {
bcm *blockConfirmationManager
ctx context.Context
cancelFunc func()
id *fftypes.UUID
stateLock sync.Mutex
fromBlock uint64
waitingForFromBlock bool
rollingCheckpoint *ffcapi.BlockListenerCheckpoint
blocksSinceCheckpoint []*apitypes.BlockInfo
newHeadToAdd []*apitypes.BlockInfo // used by the notification routine when there are new blocks that add directly onto the end of the blocksSinceCheckpoint
newBlockHashes chan *ffcapi.BlockHashEvent
dispatcherTap chan struct{}
eventStream chan<- *ffcapi.ListenerEvent
connector ffcapi.API
requiredConfirmations int
retry *retry.Retry
processorDone chan struct{}
dispatcherDone chan struct{}
bcm *blockConfirmationManager
ctx context.Context
cancelFunc func()
id *fftypes.UUID
stateLock sync.Mutex
fromBlock uint64
waitingForFromBlock bool
rollingCheckpoint *ffcapi.BlockListenerCheckpoint
blocksSinceCheckpoint []*apitypes.BlockInfo
newHeadToAdd []*apitypes.BlockInfo // used by the notification routine when there are new blocks that add directly onto the end of the blocksSinceCheckpoint
newBlockHashes chan *ffcapi.BlockHashEvent
dispatcherTap chan struct{}
blockEventOutputChannel chan<- *ffcapi.ListenerEvent
connector ffcapi.API
requiredConfirmations int
retry *retry.Retry
processorDone chan struct{}
dispatcherDone chan struct{}

// attributes that are used for confirmation streaming only
streamConfirmations bool // whether this listener is for confirmations or just block events
confirmationsOutputChannel chan<- *apitypes.ConfirmationsForListenerEvent
reOrgedSinceDispatch bool // (relies on stateLock mutex) whether the canonical chain has been re-orged since the last confirmation dispatch
}

func (bcm *blockConfirmationManager) StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error {
_, err := bcm.startConfirmedBlockListener(ctx, id, fromBlock, checkpoint, eventStream)
_, err := bcm.startConfirmedBlockListener(ctx, id, fromBlock, checkpoint, eventStream, nil)
return err
}

func (bcm *blockConfirmationManager) startConfirmedBlockListener(fgCtx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) (cbl *confirmedBlockListener, err error) {
func (bcm *blockConfirmationManager) StartBlockConfirmationsListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *apitypes.ConfirmationsForListenerEvent) error {
_, err := bcm.startConfirmedBlockListener(ctx, id, fromBlock, checkpoint, nil, eventStream)
return err
}

func (bcm *blockConfirmationManager) startConfirmedBlockListener(fgCtx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, blockEventOutputChannel chan<- *ffcapi.ListenerEvent, confirmationsOutputChannel chan<- *apitypes.ConfirmationsForListenerEvent) (cbl *confirmedBlockListener, err error) { //nolint:unparam

if blockEventOutputChannel == nil && confirmationsOutputChannel == nil {
return nil, i18n.NewError(fgCtx, tmmsgs.MsgBlockListenerNoOutputChannel)
}
if blockEventOutputChannel != nil && confirmationsOutputChannel != nil {
return nil, i18n.NewError(fgCtx, tmmsgs.MsgBlockListenerBothOutputChannels)
}

cbl = &confirmedBlockListener{
bcm: bcm,
// We need our own listener for each confirmed block stream, and the bcm has to fan out
newBlockHashes: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)),
dispatcherTap: make(chan struct{}, 1),
id: id,
eventStream: eventStream,
requiredConfirmations: bcm.requiredConfirmations,
connector: bcm.connector,
retry: bcm.retry,
rollingCheckpoint: checkpoint,
processorDone: make(chan struct{}),
dispatcherDone: make(chan struct{}),
newBlockHashes: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)),
dispatcherTap: make(chan struct{}, 1),
id: id,
blockEventOutputChannel: blockEventOutputChannel,
confirmationsOutputChannel: confirmationsOutputChannel,
streamConfirmations: confirmationsOutputChannel != nil,
requiredConfirmations: bcm.requiredConfirmations,
connector: bcm.connector,
retry: bcm.retry,
rollingCheckpoint: checkpoint,
processorDone: make(chan struct{}),
dispatcherDone: make(chan struct{}),
}
cbl.ctx, cbl.cancelFunc = context.WithCancel(bcm.ctx)
// add a log context for this specific confirmation manager (as there are many within the )
Expand Down Expand Up @@ -252,7 +272,7 @@ func (cbl *confirmedBlockListener) dispatcher() {
// spin getting blocks until we it looks like we need to wait for a notification
lastFromNotification := false
for cbl.readNextBlock(&lastFromNotification) {
cbl.dispatchAllConfirmed()
cbl.dispatchEventsToOutputChannel()
}
}

Expand Down Expand Up @@ -342,39 +362,96 @@ func (cbl *confirmedBlockListener) readNextBlock(lastFromNotification *bool) (fo

}

func (cbl *confirmedBlockListener) dispatchAllConfirmed() {
for {
var toDispatch *ffcapi.ListenerEvent
cbl.stateLock.Lock()
if len(cbl.blocksSinceCheckpoint) > cbl.requiredConfirmations {
block := cbl.blocksSinceCheckpoint[0]
// don't want memory to grow indefinitely by shifting right, so we create a new slice here
cbl.blocksSinceCheckpoint = append([]*apitypes.BlockInfo{}, cbl.blocksSinceCheckpoint[1:]...)
func (cbl *confirmedBlockListener) dispatchEventsToOutputChannel() {
cbl.stateLock.Lock()

totalBlocks := len(cbl.blocksSinceCheckpoint)
earliestUncomfirmedBlockIndex := totalBlocks - cbl.requiredConfirmations
if earliestUncomfirmedBlockIndex < 0 {
earliestUncomfirmedBlockIndex = 0
}

for i, block := range cbl.blocksSinceCheckpoint {
var toDispatch *apitypes.ConfirmationsForListenerEvent
cbEvent := &ffcapi.ListenerEvent{
BlockEvent: &ffcapi.BlockEvent{
ListenerID: cbl.id,
BlockInfo: ffcapi.BlockInfo{
//nolint:gosec
BlockNumber: fftypes.NewFFBigInt(int64(block.BlockNumber)),
BlockHash: block.BlockHash,
ParentHash: block.ParentHash,
TransactionHashes: block.TransactionHashes,
},
},
Checkpoint: cbl.rollingCheckpoint,
}
confirmationBlocks := []*apitypes.BlockInfo{}
if cbl.streamConfirmations {
if i < totalBlocks-1 {
// build up the array when the current block is not the last one
confirmationEndingIndex := i + cbl.requiredConfirmations + 1
if confirmationEndingIndex > totalBlocks {
confirmationEndingIndex = totalBlocks
}

confirmationBlocks = cbl.blocksSinceCheckpoint[i+1 : confirmationEndingIndex]
}
}
if i < earliestUncomfirmedBlockIndex || cbl.requiredConfirmations == 0 {
// this block is confirmed
toDispatch = &apitypes.ConfirmationsForListenerEvent{
Event: cbEvent,
}
cbl.rollingCheckpoint = &ffcapi.BlockListenerCheckpoint{
Block: block.BlockNumber.Uint64(),
}
toDispatch = &ffcapi.ListenerEvent{
BlockEvent: &ffcapi.BlockEvent{
ListenerID: cbl.id,
BlockInfo: ffcapi.BlockInfo{
//nolint:gosec
BlockNumber: fftypes.NewFFBigInt(int64(block.BlockNumber)),
BlockHash: block.BlockHash,
ParentHash: block.ParentHash,
TransactionHashes: block.TransactionHashes,
// for confirmed blocks we always set the checkpoint to the current rolling checkpoint
cbEvent.Checkpoint = cbl.rollingCheckpoint
if cbl.streamConfirmations {
toDispatch.TargetConfirmationCount = cbl.requiredConfirmations
toDispatch.CurrentConfirmationCount = cbl.requiredConfirmations
toDispatch.ConfirmationsNotification = apitypes.ConfirmationsNotification{
Confirmed: true,
NewFork: cbl.reOrgedSinceDispatch,
Confirmations: apitypes.BlockInfosToConfirmations(confirmationBlocks),
}
}
} else if cbl.streamConfirmations {
// NOTE: we dispatch all blocks, even if they have 0 confirmations, which serves as the receipt of the block
// this block is not confirmed, only need to dispatch if we are streaming confirmations
// We are streaming confirmations, so we dispatch the confirmation event
toDispatch = &apitypes.ConfirmationsForListenerEvent{
Event: cbEvent,
ConfirmationContext: apitypes.ConfirmationContext{
ConfirmationsNotification: apitypes.ConfirmationsNotification{
Confirmed: false,
NewFork: cbl.reOrgedSinceDispatch,
Confirmations: apitypes.BlockInfosToConfirmations(confirmationBlocks),
},
TargetConfirmationCount: cbl.requiredConfirmations,
CurrentConfirmationCount: (totalBlocks - i) - 1,
},
Checkpoint: cbl.rollingCheckpoint,
}

}
cbl.stateLock.Unlock()
if toDispatch == nil {
return
}
log.L(cbl.ctx).Infof("Dispatching block %d/%s", toDispatch.BlockEvent.BlockNumber.Uint64(), toDispatch.BlockEvent.BlockHash)
select {
case cbl.eventStream <- toDispatch:
case <-cbl.ctx.Done():
log.L(cbl.ctx).Infof("Dispatching block %d/%s", toDispatch.Event.BlockEvent.BlockNumber.Uint64(), toDispatch.Event.BlockEvent.BlockHash)
if cbl.streamConfirmations {
select {
case cbl.confirmationsOutputChannel <- toDispatch:
case <-cbl.ctx.Done():
}
} else {
select {
case cbl.blockEventOutputChannel <- toDispatch.Event:
case <-cbl.ctx.Done():
}
}
}
cbl.blocksSinceCheckpoint = append([]*apitypes.BlockInfo{}, cbl.blocksSinceCheckpoint[earliestUncomfirmedBlockIndex:]...)
cbl.reOrgedSinceDispatch = false
cbl.stateLock.Unlock()
}
28 changes: 14 additions & 14 deletions internal/confirmations/confirmed_block_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestCBLCatchUpToHeadFromZeroNoConfirmations(t *testing.T) {
mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) })

bcm.requiredConfirmations = 0
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch, nil)
assert.NoError(t, err)

for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ {
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestCBLCatchUpToHeadFromZeroWithConfirmations(t *testing.T) {
mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) })

bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch, nil)
assert.NoError(t, err)

for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ {
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestCBLListenFromCurrentBlock(t *testing.T) {
mca.On("BlockInfoByNumber", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReasonNotFound, fmt.Errorf("not found")).Maybe()

bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch, nil)
assert.NoError(t, err)

// Notify starting at block 5
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestCBLListenFromCurrentUsingCheckpointBlock(t *testing.T) {
bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", &ffcapi.BlockListenerCheckpoint{
Block: 12345,
}, esDispatch)
}, esDispatch, nil)
assert.NoError(t, err)

assert.False(t, cbl.waitingForFromBlock)
Expand Down Expand Up @@ -249,7 +249,7 @@ func testCBLHandleReorgInConfirmationWindow(t *testing.T, blockLenBeforeReorg, o
mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocksAfterReorg) })

bcm.requiredConfirmations = reqConf
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch, nil)
assert.NoError(t, err)

for i := 0; i < len(blocksAfterReorg)-bcm.requiredConfirmations; i++ {
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestCBLHandleRandomConflictingBlockNotification(t *testing.T) {
return req.BlockHash == randBlock.BlockHash
})).Return(randBlock, ffcapi.ErrorReason(""), nil)

cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch, nil)
assert.NoError(t, err)
cbl.requiredConfirmations = 5

Expand Down Expand Up @@ -342,7 +342,7 @@ func TestCBLDispatcherFallsBehindHead(t *testing.T) {
bcm.requiredConfirmations = 5

// Start a CBL, but then cancel the dispatcher
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch, nil)
assert.NoError(t, err)
cbl.cancelFunc()
<-cbl.dispatcherDone
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestCBLStartBadFromBlock(t *testing.T) {

id := fftypes.NewUUID()

_, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "wrong", nil, esDispatch)
_, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "wrong", nil, esDispatch, nil)
assert.Regexp(t, "FF21090", err)

bcm.Stop()
Expand All @@ -409,11 +409,11 @@ func TestProcessBlockHashesSwallowsFailure(t *testing.T) {
func TestDispatchAllConfirmedNonBlocking(t *testing.T) {
bcm, _ := newTestBlockConfirmationManager()
cbl := &confirmedBlockListener{
id: fftypes.NewUUID(),
ctx: bcm.ctx,
bcm: bcm,
processorDone: make(chan struct{}),
eventStream: make(chan<- *ffcapi.ListenerEvent), // blocks indefinitely
id: fftypes.NewUUID(),
ctx: bcm.ctx,
bcm: bcm,
processorDone: make(chan struct{}),
blockEventOutputChannel: make(chan<- *ffcapi.ListenerEvent), // blocks indefinitely
}

cbl.requiredConfirmations = 0
Expand All @@ -423,7 +423,7 @@ func TestDispatchAllConfirmedNonBlocking(t *testing.T) {
waitForDispatchReturn := make(chan struct{})
go func() {
defer close(waitForDispatchReturn)
cbl.dispatchAllConfirmed()
cbl.dispatchEventsToOutputChannel()
}()

bcm.cancelFunc()
Expand Down
Loading
Loading