Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Manager interface {
NewBlockHashes() chan<- *ffcapi.BlockHashEvent
CheckInFlight(listenerID *fftypes.UUID) bool
StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error
StartBlockConfirmationsListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *apitypes.ConfirmationsForListenerEvent) error
Comment on lines 48 to +49
Copy link
Contributor

@EnriqueL8 EnriqueL8 Jul 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth a comment to explain the difference, I'm still torn and think the confirmations context should be included in the ListenerEvent?

StopConfirmedBlockListener(ctx context.Context, id *fftypes.UUID) error
}

Expand Down
187 changes: 132 additions & 55 deletions internal/confirmations/confirmed_block_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,45 +43,65 @@ import (
// This implementation is thus deliberately simple assuming that when instability is found
// in the notifications it can simply wipe out its view and start again.
type confirmedBlockListener struct {
bcm *blockConfirmationManager
ctx context.Context
cancelFunc func()
id *fftypes.UUID
stateLock sync.Mutex
fromBlock uint64
waitingForFromBlock bool
rollingCheckpoint *ffcapi.BlockListenerCheckpoint
blocksSinceCheckpoint []*apitypes.BlockInfo
newHeadToAdd []*apitypes.BlockInfo // used by the notification routine when there are new blocks that add directly onto the end of the blocksSinceCheckpoint
newBlockHashes chan *ffcapi.BlockHashEvent
dispatcherTap chan struct{}
eventStream chan<- *ffcapi.ListenerEvent
connector ffcapi.API
requiredConfirmations int
retry *retry.Retry
processorDone chan struct{}
dispatcherDone chan struct{}
bcm *blockConfirmationManager
ctx context.Context
cancelFunc func()
id *fftypes.UUID
stateLock sync.Mutex
fromBlock uint64
waitingForFromBlock bool
rollingCheckpoint *ffcapi.BlockListenerCheckpoint
blocksSinceCheckpoint []*apitypes.BlockInfo
newHeadToAdd []*apitypes.BlockInfo // used by the notification routine when there are new blocks that add directly onto the end of the blocksSinceCheckpoint
newBlockHashes chan *ffcapi.BlockHashEvent
dispatcherTap chan struct{}
blockEventOutputChannel chan<- *ffcapi.ListenerEvent
connector ffcapi.API
requiredConfirmations int
retry *retry.Retry
processorDone chan struct{}
dispatcherDone chan struct{}

// attributes that are used for confirmation streaming only
streamConfirmations bool // whether this listener is for confirmations or just block events
confirmationsOutputChannel chan<- *apitypes.ConfirmationsForListenerEvent
reOrgedSinceDispatch bool // (relies on stateLock mutex) whether the canonical chain has been re-orged since the last confirmation dispatch
}

func (bcm *blockConfirmationManager) StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error {
_, err := bcm.startConfirmedBlockListener(ctx, id, fromBlock, checkpoint, eventStream)
_, err := bcm.startConfirmedBlockListener(ctx, id, fromBlock, checkpoint, eventStream, nil)
return err
}

func (bcm *blockConfirmationManager) startConfirmedBlockListener(fgCtx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) (cbl *confirmedBlockListener, err error) {
func (bcm *blockConfirmationManager) StartBlockConfirmationsListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *apitypes.ConfirmationsForListenerEvent) error {
_, err := bcm.startConfirmedBlockListener(ctx, id, fromBlock, checkpoint, nil, eventStream)
return err
}

func (bcm *blockConfirmationManager) startConfirmedBlockListener(fgCtx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, blockEventOutputChannel chan<- *ffcapi.ListenerEvent, confirmationsOutputChannel chan<- *apitypes.ConfirmationsForListenerEvent) (cbl *confirmedBlockListener, err error) { //nolint:unparam

if blockEventOutputChannel == nil && confirmationsOutputChannel == nil {
return nil, i18n.NewError(fgCtx, tmmsgs.MsgBlockListenerNoOutputChannel)
}
if blockEventOutputChannel != nil && confirmationsOutputChannel != nil {
return nil, i18n.NewError(fgCtx, tmmsgs.MsgBlockListenerBothOutputChannels)
}

cbl = &confirmedBlockListener{
bcm: bcm,
// We need our own listener for each confirmed block stream, and the bcm has to fan out
newBlockHashes: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)),
dispatcherTap: make(chan struct{}, 1),
id: id,
eventStream: eventStream,
requiredConfirmations: bcm.requiredConfirmations,
connector: bcm.connector,
retry: bcm.retry,
rollingCheckpoint: checkpoint,
processorDone: make(chan struct{}),
dispatcherDone: make(chan struct{}),
newBlockHashes: make(chan *ffcapi.BlockHashEvent, config.GetInt(tmconfig.ConfirmationsBlockQueueLength)),
dispatcherTap: make(chan struct{}, 1),
id: id,
blockEventOutputChannel: blockEventOutputChannel,
confirmationsOutputChannel: confirmationsOutputChannel,
streamConfirmations: confirmationsOutputChannel != nil,
requiredConfirmations: bcm.requiredConfirmations,
connector: bcm.connector,
retry: bcm.retry,
rollingCheckpoint: checkpoint,
processorDone: make(chan struct{}),
dispatcherDone: make(chan struct{}),
}
cbl.ctx, cbl.cancelFunc = context.WithCancel(bcm.ctx)
// add a log context for this specific confirmation manager (as there are many within the )
Expand Down Expand Up @@ -252,7 +272,7 @@ func (cbl *confirmedBlockListener) dispatcher() {
// spin getting blocks until we it looks like we need to wait for a notification
lastFromNotification := false
for cbl.readNextBlock(&lastFromNotification) {
cbl.dispatchAllConfirmed()
cbl.dispatchEventsToOutputChannel()
}
}

Expand Down Expand Up @@ -342,39 +362,96 @@ func (cbl *confirmedBlockListener) readNextBlock(lastFromNotification *bool) (fo

}

func (cbl *confirmedBlockListener) dispatchAllConfirmed() {
for {
var toDispatch *ffcapi.ListenerEvent
cbl.stateLock.Lock()
if len(cbl.blocksSinceCheckpoint) > cbl.requiredConfirmations {
block := cbl.blocksSinceCheckpoint[0]
// don't want memory to grow indefinitely by shifting right, so we create a new slice here
cbl.blocksSinceCheckpoint = append([]*apitypes.BlockInfo{}, cbl.blocksSinceCheckpoint[1:]...)
func (cbl *confirmedBlockListener) dispatchEventsToOutputChannel() {
cbl.stateLock.Lock()

totalBlocks := len(cbl.blocksSinceCheckpoint)
earliestUncomfirmedBlockIndex := totalBlocks - cbl.requiredConfirmations
if earliestUncomfirmedBlockIndex < 0 {
earliestUncomfirmedBlockIndex = 0
}

for i, block := range cbl.blocksSinceCheckpoint {
var toDispatch *apitypes.ConfirmationsForListenerEvent
cbEvent := &ffcapi.ListenerEvent{
BlockEvent: &ffcapi.BlockEvent{
ListenerID: cbl.id,
BlockInfo: ffcapi.BlockInfo{
//nolint:gosec
BlockNumber: fftypes.NewFFBigInt(int64(block.BlockNumber)),
BlockHash: block.BlockHash,
ParentHash: block.ParentHash,
TransactionHashes: block.TransactionHashes,
},
},
Checkpoint: cbl.rollingCheckpoint,
}
confirmationBlocks := []*apitypes.BlockInfo{}
if cbl.streamConfirmations {
if i < totalBlocks-1 {
// build up the array when the current block is not the last one
confirmationEndingIndex := i + cbl.requiredConfirmations + 1
if confirmationEndingIndex > totalBlocks {
confirmationEndingIndex = totalBlocks
}

confirmationBlocks = cbl.blocksSinceCheckpoint[i+1 : confirmationEndingIndex]
}
}
if i < earliestUncomfirmedBlockIndex || cbl.requiredConfirmations == 0 {
// this block is confirmed
toDispatch = &apitypes.ConfirmationsForListenerEvent{
Event: cbEvent,
}
cbl.rollingCheckpoint = &ffcapi.BlockListenerCheckpoint{
Block: block.BlockNumber.Uint64(),
}
toDispatch = &ffcapi.ListenerEvent{
BlockEvent: &ffcapi.BlockEvent{
ListenerID: cbl.id,
BlockInfo: ffcapi.BlockInfo{
//nolint:gosec
BlockNumber: fftypes.NewFFBigInt(int64(block.BlockNumber)),
BlockHash: block.BlockHash,
ParentHash: block.ParentHash,
TransactionHashes: block.TransactionHashes,
// for confirmed blocks we always set the checkpoint to the current rolling checkpoint
cbEvent.Checkpoint = cbl.rollingCheckpoint
if cbl.streamConfirmations {
toDispatch.TargetConfirmationCount = cbl.requiredConfirmations
toDispatch.CurrentConfirmationCount = cbl.requiredConfirmations
toDispatch.ConfirmationsNotification = apitypes.ConfirmationsNotification{
Confirmed: true,
NewFork: cbl.reOrgedSinceDispatch,
Confirmations: apitypes.BlockInfosToConfirmations(confirmationBlocks),
}
}
} else if cbl.streamConfirmations {
// NOTE: we dispatch all blocks, even if they have 0 confirmations, which serves as the receipt of the block
// this block is not confirmed, only need to dispatch if we are streaming confirmations
// We are streaming confirmations, so we dispatch the confirmation event
toDispatch = &apitypes.ConfirmationsForListenerEvent{
Event: cbEvent,
ConfirmationContext: apitypes.ConfirmationContext{
ConfirmationsNotification: apitypes.ConfirmationsNotification{
Confirmed: false,
NewFork: cbl.reOrgedSinceDispatch,
Confirmations: apitypes.BlockInfosToConfirmations(confirmationBlocks),
},
TargetConfirmationCount: cbl.requiredConfirmations,
CurrentConfirmationCount: (totalBlocks - i) - 1,
},
Checkpoint: cbl.rollingCheckpoint,
}

}
cbl.stateLock.Unlock()
if toDispatch == nil {
return
break
}
log.L(cbl.ctx).Infof("Dispatching block %d/%s", toDispatch.BlockEvent.BlockNumber.Uint64(), toDispatch.BlockEvent.BlockHash)
select {
case cbl.eventStream <- toDispatch:
case <-cbl.ctx.Done():
log.L(cbl.ctx).Infof("Dispatching block %d/%s", toDispatch.Event.BlockEvent.BlockNumber.Uint64(), toDispatch.Event.BlockEvent.BlockHash)
if cbl.streamConfirmations {
select {
case cbl.confirmationsOutputChannel <- toDispatch:
case <-cbl.ctx.Done():
}
} else {
select {
case cbl.blockEventOutputChannel <- toDispatch.Event:
case <-cbl.ctx.Done():
}
}
}
cbl.blocksSinceCheckpoint = append([]*apitypes.BlockInfo{}, cbl.blocksSinceCheckpoint[earliestUncomfirmedBlockIndex:]...)
cbl.reOrgedSinceDispatch = false
cbl.stateLock.Unlock()
}
28 changes: 14 additions & 14 deletions internal/confirmations/confirmed_block_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestCBLCatchUpToHeadFromZeroNoConfirmations(t *testing.T) {
mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) })

bcm.requiredConfirmations = 0
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch, nil)
assert.NoError(t, err)

for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ {
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestCBLCatchUpToHeadFromZeroWithConfirmations(t *testing.T) {
mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) })

bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch, nil)
assert.NoError(t, err)

for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ {
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestCBLListenFromCurrentBlock(t *testing.T) {
mca.On("BlockInfoByNumber", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReasonNotFound, fmt.Errorf("not found")).Maybe()

bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch, nil)
assert.NoError(t, err)

// Notify starting at block 5
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestCBLListenFromCurrentUsingCheckpointBlock(t *testing.T) {
bcm.requiredConfirmations = 5
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", &ffcapi.BlockListenerCheckpoint{
Block: 12345,
}, esDispatch)
}, esDispatch, nil)
assert.NoError(t, err)

assert.False(t, cbl.waitingForFromBlock)
Expand Down Expand Up @@ -249,7 +249,7 @@ func testCBLHandleReorgInConfirmationWindow(t *testing.T, blockLenBeforeReorg, o
mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocksAfterReorg) })

bcm.requiredConfirmations = reqConf
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch, nil)
assert.NoError(t, err)

for i := 0; i < len(blocksAfterReorg)-bcm.requiredConfirmations; i++ {
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestCBLHandleRandomConflictingBlockNotification(t *testing.T) {
return req.BlockHash == randBlock.BlockHash
})).Return(randBlock, ffcapi.ErrorReason(""), nil)

cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, ffcapi.FromBlockEarliest, nil, esDispatch, nil)
assert.NoError(t, err)
cbl.requiredConfirmations = 5

Expand Down Expand Up @@ -342,7 +342,7 @@ func TestCBLDispatcherFallsBehindHead(t *testing.T) {
bcm.requiredConfirmations = 5

// Start a CBL, but then cancel the dispatcher
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch)
cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch, nil)
assert.NoError(t, err)
cbl.cancelFunc()
<-cbl.dispatcherDone
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestCBLStartBadFromBlock(t *testing.T) {

id := fftypes.NewUUID()

_, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "wrong", nil, esDispatch)
_, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "wrong", nil, esDispatch, nil)
assert.Regexp(t, "FF21090", err)

bcm.Stop()
Expand All @@ -409,11 +409,11 @@ func TestProcessBlockHashesSwallowsFailure(t *testing.T) {
func TestDispatchAllConfirmedNonBlocking(t *testing.T) {
bcm, _ := newTestBlockConfirmationManager()
cbl := &confirmedBlockListener{
id: fftypes.NewUUID(),
ctx: bcm.ctx,
bcm: bcm,
processorDone: make(chan struct{}),
eventStream: make(chan<- *ffcapi.ListenerEvent), // blocks indefinitely
id: fftypes.NewUUID(),
ctx: bcm.ctx,
bcm: bcm,
processorDone: make(chan struct{}),
blockEventOutputChannel: make(chan<- *ffcapi.ListenerEvent), // blocks indefinitely
}

cbl.requiredConfirmations = 0
Expand All @@ -423,7 +423,7 @@ func TestDispatchAllConfirmedNonBlocking(t *testing.T) {
waitForDispatchReturn := make(chan struct{})
go func() {
defer close(waitForDispatchReturn)
cbl.dispatchAllConfirmed()
cbl.dispatchEventsToOutputChannel()
}()

bcm.cancelFunc()
Expand Down
Loading