Skip to content
Draft
18 changes: 12 additions & 6 deletions internal/data/statechanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (m *StateChangeModel) BatchInsert(
spenderAccountIDs := make([]*string, len(stateChanges))
sponsoredAccountIDs := make([]*string, len(stateChanges))
sponsorAccountIDs := make([]*string, len(stateChanges))
deployerAccountIDs := make([]*string, len(stateChanges))
signerWeights := make([]*types.NullableJSONB, len(stateChanges))
thresholds := make([]*types.NullableJSONB, len(stateChanges))
flags := make([]*types.NullableJSON, len(stateChanges))
Expand Down Expand Up @@ -90,6 +91,9 @@ func (m *StateChangeModel) BatchInsert(
if sc.SponsorAccountID.Valid {
sponsorAccountIDs[i] = &sc.SponsorAccountID.String
}
if sc.DeployerAccountID.Valid {
deployerAccountIDs[i] = &sc.DeployerAccountID.String
}
if sc.SignerWeights != nil {
signerWeights[i] = &sc.SignerWeights
}
Expand Down Expand Up @@ -130,10 +134,11 @@ func (m *StateChangeModel) BatchInsert(
UNNEST($15::text[]) AS spender_account_id,
UNNEST($16::text[]) AS sponsored_account_id,
UNNEST($17::text[]) AS sponsor_account_id,
UNNEST($18::jsonb[]) AS signer_weights,
UNNEST($19::jsonb[]) AS thresholds,
UNNEST($20::jsonb[]) AS flags,
UNNEST($21::jsonb[]) AS key_value
UNNEST($18::text[]) AS deployer_account_id,
UNNEST($19::jsonb[]) AS signer_weights,
UNNEST($20::jsonb[]) AS thresholds,
UNNEST($21::jsonb[]) AS flags,
UNNEST($22::jsonb[]) AS key_value
),

-- STEP 3: Get state changes that reference existing accounts
Expand All @@ -150,13 +155,13 @@ func (m *StateChangeModel) BatchInsert(
ledger_number, account_id, operation_id, tx_hash, token_id, amount,
claimable_balance_id, liquidity_pool_id, offer_id, signer_account_id,
spender_account_id, sponsored_account_id, sponsor_account_id,
signer_weights, thresholds, flags, key_value)
deployer_account_id, signer_weights, thresholds, flags, key_value)
SELECT
id, state_change_category, state_change_reason, ledger_created_at,
ledger_number, account_id, operation_id, tx_hash, token_id, amount,
claimable_balance_id, liquidity_pool_id, offer_id, signer_account_id,
spender_account_id, sponsored_account_id, sponsor_account_id,
signer_weights, thresholds, flags, key_value
deployer_account_id, signer_weights, thresholds, flags, key_value
FROM valid_state_changes
ON CONFLICT (id) DO NOTHING
RETURNING id
Expand Down Expand Up @@ -185,6 +190,7 @@ func (m *StateChangeModel) BatchInsert(
pq.Array(spenderAccountIDs),
pq.Array(sponsoredAccountIDs),
pq.Array(sponsorAccountIDs),
pq.Array(deployerAccountIDs),
pq.Array(signerWeights),
pq.Array(thresholds),
pq.Array(flags),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ CREATE TABLE state_changes (
spender_account_id TEXT,
sponsored_account_id TEXT,
sponsor_account_id TEXT,
deployer_account_id TEXT,
thresholds JSONB
);

Expand Down
49 changes: 49 additions & 0 deletions internal/indexer/bulk_operation_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package indexer

import (
"context"
"errors"
"fmt"

operation_processor "github.com/stellar/go/processors/operation"

"github.com/stellar/wallet-backend/internal/indexer/processors"
"github.com/stellar/wallet-backend/internal/indexer/types"
)

// BulkOperationProcessor combines multiple OperationProcessorInterface instances
// and processes operations through all of them, collecting their results.
type BulkOperationProcessor struct {
processors []OperationStateChangeProcessorInterface
}

// NewBulkOperationProcessor creates a new bulk processor with the given processors.
func NewBulkOperationProcessor(processors ...OperationStateChangeProcessorInterface) *BulkOperationProcessor {
return &BulkOperationProcessor{
processors: processors,
}
}

// ProcessOperation processes the operation through all child processors and combines their results.
func (b *BulkOperationProcessor) ProcessOperation(ctx context.Context, opWrapper *operation_processor.TransactionOperationWrapper) ([]types.StateChange, error) {
stateChangesMap := map[string]types.StateChange{}

for _, processor := range b.processors {
stateChanges, err := processor.ProcessOperation(ctx, opWrapper)
if err != nil && !errors.Is(err, processors.ErrInvalidOpType) {
return nil, fmt.Errorf("processor %T failed: %w", processor, err)
} else if err != nil {
continue
}

for _, stateChange := range stateChanges {
stateChangesMap[stateChange.ID] = stateChange
}
}

stateChanges := make([]types.StateChange, 0, len(stateChangesMap))
for _, stateChange := range stateChangesMap {
stateChanges = append(stateChanges, stateChange)
}
return stateChanges, nil
}
165 changes: 165 additions & 0 deletions internal/indexer/bulk_operation_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package indexer

import (
"context"
"errors"
"testing"
"time"

"github.com/stellar/go/network"
operation_processor "github.com/stellar/go/processors/operation"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/stellar/wallet-backend/internal/indexer/processors"
"github.com/stellar/wallet-backend/internal/indexer/types"
)

func Test_BulkOperationProcessor_ProcessOperation(t *testing.T) {
ctx := context.Background()
testOp := &operation_processor.TransactionOperationWrapper{
Network: network.TestNetworkPassphrase,
LedgerClosed: time.Now(),
Operation: xdr.Operation{
Body: xdr.OperationBody{Type: xdr.OperationTypePayment},
},
}

type testCase struct {
name string
numProcessors int
prepareMocks func(t *testing.T, processors []OperationStateChangeProcessorInterface)
wantErrContains string
wantResult []types.StateChange
}

testCases := []testCase{
{
name: "🟢successful_processing_with_multiple_processors",
numProcessors: 2,
prepareMocks: func(t *testing.T, procs []OperationStateChangeProcessorInterface) {
mock1 := procs[0].(*MockOperationStateChangeProcessor)
mock2 := procs[1].(*MockOperationStateChangeProcessor)

stateChanges1 := []types.StateChange{{ID: "sc1"}}
stateChanges2 := []types.StateChange{{ID: "sc2"}}

mock1.On("ProcessOperation", ctx, testOp).Return(stateChanges1, nil)
mock2.On("ProcessOperation", ctx, testOp).Return(stateChanges2, nil)
},
wantResult: []types.StateChange{{ID: "sc1"}, {ID: "sc2"}},
},
{
name: "🟢successful_processing_with_empty_results",
numProcessors: 2,
prepareMocks: func(t *testing.T, procs []OperationStateChangeProcessorInterface) {
mock1 := procs[0].(*MockOperationStateChangeProcessor)
mock2 := procs[1].(*MockOperationStateChangeProcessor)

mock1.On("ProcessOperation", ctx, testOp).Return([]types.StateChange{}, nil)
mock2.On("ProcessOperation", ctx, testOp).Return([]types.StateChange{}, nil)
},
wantResult: []types.StateChange{},
},
{
name: "🟢ignores_err_invalid_op_type_errors",
numProcessors: 2,
prepareMocks: func(t *testing.T, procs []OperationStateChangeProcessorInterface) {
mock1 := procs[0].(*MockOperationStateChangeProcessor)
mock2 := procs[1].(*MockOperationStateChangeProcessor)

stateChanges1 := []types.StateChange{{ID: "sc1"}}
stateChanges2 := []types.StateChange{{ID: "sc2"}}

mock1.On("ProcessOperation", ctx, testOp).Return(stateChanges1, processors.ErrInvalidOpType)
mock2.On("ProcessOperation", ctx, testOp).Return(stateChanges2, nil)
},
wantResult: []types.StateChange{{ID: "sc2"}},
},
{
name: "🔴returns_first_error_from_processor_with_proper_wrapping",
numProcessors: 1,
prepareMocks: func(t *testing.T, procs []OperationStateChangeProcessorInterface) {
mock := procs[0].(*MockOperationStateChangeProcessor)

stateChanges := []types.StateChange{{ID: "sc1"}}
expectedError := errors.New("processor error")

mock.On("ProcessOperation", ctx, testOp).Return(stateChanges, expectedError)
},
wantErrContains: "processor *indexer.MockOperationStateChangeProcessor failed:",
wantResult: nil,
},
{
name: "🔴returns_first_error_when_multiple_processors_would_fail",
numProcessors: 1,
prepareMocks: func(t *testing.T, procs []OperationStateChangeProcessorInterface) {
mock := procs[0].(*MockOperationStateChangeProcessor)

stateChanges := []types.StateChange{{ID: "sc1"}}
resErr := errors.New("first error")

mock.On("ProcessOperation", ctx, testOp).Return(stateChanges, resErr)
},
wantErrContains: "first error",
wantResult: nil,
},
{
name: "🟢deduplicates_state_changes_by_id",
numProcessors: 2,
prepareMocks: func(t *testing.T, procs []OperationStateChangeProcessorInterface) {
mock1 := procs[0].(*MockOperationStateChangeProcessor)
mock2 := procs[1].(*MockOperationStateChangeProcessor)

stateChanges1 := []types.StateChange{{ID: "sc1", StateChangeCategory: types.StateChangeCategoryDebit}}
stateChanges2 := []types.StateChange{{ID: "sc1", StateChangeCategory: types.StateChangeCategoryDebit}}

mock1.On("ProcessOperation", ctx, testOp).Return(stateChanges1, nil)
mock2.On("ProcessOperation", ctx, testOp).Return(stateChanges2, nil)
},
wantResult: []types.StateChange{{ID: "sc1", StateChangeCategory: types.StateChangeCategoryDebit}},
},
{
name: "🟢works_with_no_processors",
numProcessors: 0,
prepareMocks: func(t *testing.T, procs []OperationStateChangeProcessorInterface) {},
wantResult: []types.StateChange{},
},
{
name: "🟢works_with_single_processor",
numProcessors: 1,
prepareMocks: func(t *testing.T, procs []OperationStateChangeProcessorInterface) {
mock := procs[0].(*MockOperationStateChangeProcessor)
stateChanges := []types.StateChange{{ID: "sc1"}}

mock.On("ProcessOperation", ctx, testOp).Return(stateChanges, nil)
},
wantResult: []types.StateChange{{ID: "sc1"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
processors := []OperationStateChangeProcessorInterface{}
for range tc.numProcessors {
newProcessor := &MockOperationStateChangeProcessor{}
defer newProcessor.AssertExpectations(t)
processors = append(processors, newProcessor)
}
tc.prepareMocks(t, processors)

bulkProcessor := NewBulkOperationProcessor(processors...)
result, err := bulkProcessor.ProcessOperation(ctx, testOp)

if tc.wantErrContains != "" {
require.Error(t, err)
assert.ErrorContains(t, err, tc.wantErrContains)
assert.Empty(t, result)
} else {
require.NoError(t, err)
assert.ElementsMatch(t, tc.wantResult, result)
}
})
}
}
30 changes: 19 additions & 11 deletions internal/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

set "github.com/deckarep/golang-set/v2"
"github.com/stellar/go/ingest"
operation_processor "github.com/stellar/go/processors/operation"

"github.com/stellar/wallet-backend/internal/indexer/processors"
"github.com/stellar/wallet-backend/internal/indexer/types"
Expand All @@ -32,19 +33,26 @@ type ParticipantsProcessorInterface interface {
GetOperationsParticipants(transaction ingest.LedgerTransaction) (map[int64]processors.OperationParticipants, error)
}

type OperationStateChangeProcessorInterface interface {
ProcessOperation(ctx context.Context, opWrapper *operation_processor.TransactionOperationWrapper) ([]types.StateChange, error)
}

type Indexer struct {
Buffer IndexerBufferInterface
participantsProcessor ParticipantsProcessorInterface
tokenTransferProcessor TokenTransferProcessorInterface
effectsProcessor EffectsProcessorInterface
Buffer IndexerBufferInterface
participantsProcessor ParticipantsProcessorInterface
tokenTransferProcessor TokenTransferProcessorInterface
opStateChangeProcessors OperationStateChangeProcessorInterface
}

func NewIndexer(networkPassphrase string) *Indexer {
return &Indexer{
Buffer: NewIndexerBuffer(),
participantsProcessor: processors.NewParticipantsProcessor(networkPassphrase),
tokenTransferProcessor: processors.NewTokenTransferProcessor(networkPassphrase),
effectsProcessor: processors.NewEffectsProcessor(networkPassphrase),
opStateChangeProcessors: NewBulkOperationProcessor(
processors.NewEffectsProcessor(networkPassphrase),
processors.NewContractDeployProcessor(networkPassphrase),
),
}
}

Expand All @@ -71,7 +79,7 @@ func (i *Indexer) ProcessTransaction(ctx context.Context, transaction ingest.Led
return fmt.Errorf("getting operations participants: %w", err)
}
var dataOp *types.Operation
var effectsStateChanges []types.StateChange
var opStateChanges []types.StateChange
for opID, opParticipants := range opsParticipants {
dataOp, err = processors.ConvertOperation(&transaction, &opParticipants.OpWrapper.Operation, opID)
if err != nil {
Expand All @@ -82,15 +90,15 @@ func (i *Indexer) ProcessTransaction(ctx context.Context, transaction ingest.Led
i.Buffer.PushParticipantOperation(participant, *dataOp, *dataTx)
}

// 2.1. Index effects state changes
effectsStateChanges, err = i.effectsProcessor.ProcessOperation(ctx, opParticipants.OpWrapper)
// 3. Index operation state changes from all inner processors
opStateChanges, err = i.opStateChangeProcessors.ProcessOperation(ctx, opParticipants.OpWrapper)
if err != nil {
return fmt.Errorf("processing effects state changes: %w", err)
return fmt.Errorf("processing operation state changes: %w", err)
}
i.Buffer.PushStateChanges(effectsStateChanges)
i.Buffer.PushStateChanges(opStateChanges)
}

// 3. Index token transfer state changes
// 4. Index token transfer state changes
tokenTransferStateChanges, err := i.tokenTransferProcessor.ProcessTransaction(ctx, transaction)
if err != nil {
return fmt.Errorf("processing token transfer state changes: %w", err)
Expand Down
Loading