Skip to content

Commit c4b86db

Browse files
authored
Fix committer (#276)
* Simplify poller and commit loop * Fix S3 source * Remove invalid tests * Logs * Fixes * Fix poll limit
1 parent acdb2eb commit c4b86db

25 files changed

+1021
-3587
lines changed

cmd/migrate_valid.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
202202
}
203203

204204
blockNumbers := generateBlockNumbersForRange(currentBlock, batchEndBlock)
205-
log.Info().Msgf("Worker %d: Processing blocks %s to %s", workerID, blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String())
206205

207206
// Fetch valid blocks from source
208207
fetchStartTime := time.Now()
@@ -214,7 +213,6 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
214213
time.Sleep(3 * time.Second)
215214
continue
216215
}
217-
log.Debug().Dur("duration", fetchDuration).Int("blocks_fetched", len(validBlocksForRange)).Msgf("Worker %d: Fetched valid blocks from source", workerID)
218216

219217
// Build map of fetched blocks
220218
mapBuildStartTime := time.Now()
@@ -231,10 +229,11 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
231229
}
232230
}
233231
mapBuildDuration := time.Since(mapBuildStartTime)
234-
log.Debug().Dur("duration", mapBuildDuration).Int("missing_blocks", len(missingBlocks)).Msgf("Worker %d: Identified missing blocks", workerID)
235232

236233
// Fetch missing blocks from RPC
237234
if len(missingBlocks) > 0 {
235+
log.Debug().Dur("duration", mapBuildDuration).Int("missing_blocks", len(missingBlocks)).Msgf("Worker %d: Identified missing blocks", workerID)
236+
238237
rpcFetchStartTime := time.Now()
239238
validMissingBlocks := migrator.GetValidBlocksFromRPC(missingBlocks)
240239
rpcFetchDuration := time.Since(rpcFetchStartTime)
@@ -249,13 +248,10 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
249248
}
250249

251250
// Prepare blocks for insertion
252-
prepStartTime := time.Now()
253251
blocksToInsert := make([]common.BlockData, 0, len(blocksToInsertMap))
254252
for _, blockData := range blocksToInsertMap {
255253
blocksToInsert = append(blocksToInsert, blockData)
256254
}
257-
prepDuration := time.Since(prepStartTime)
258-
log.Debug().Dur("duration", prepDuration).Int("blocks_to_insert", len(blocksToInsert)).Msgf("Worker %d: Prepared blocks for insertion", workerID)
259255

260256
// Insert blocks to destination
261257
insertStartTime := time.Now()
@@ -273,7 +269,9 @@ func processBlockRange(ctx context.Context, migrator *Migrator, workerID int, st
273269
Dur("fetch_duration", fetchDuration).
274270
Dur("insert_duration", insertDuration).
275271
Int("blocks_processed", len(blocksToInsert)).
276-
Msgf("Worker %d: Batch processed successfully", workerID)
272+
Str("start_block_number", blockNumbers[0].String()).
273+
Str("end_block_number", blockNumbers[len(blockNumbers)-1].String()).
274+
Msgf("Worker %d: Batch processed successfully for %s - %s", workerID, blockNumbers[0].String(), blockNumbers[len(blockNumbers)-1].String())
277275

278276
currentBlock = new(big.Int).Add(batchEndBlock, big.NewInt(1))
279277
}
@@ -315,7 +313,7 @@ func NewMigrator() *Migrator {
315313
log.Fatal().Msg("RPC does not support block receipts, but transactions were indexed with receipts")
316314
}
317315

318-
validator := orchestrator.NewValidator(rpcClient, sourceConnector)
316+
validator := orchestrator.NewValidator(rpcClient, sourceConnector, worker.NewWorker(rpcClient))
319317

320318
destinationConnector, err := storage.NewMainConnector(&config.Cfg.Migrator.Destination, &sourceConnector.OrchestratorStorage)
321319
if err != nil {
@@ -441,8 +439,7 @@ func (m *Migrator) FetchBlocksFromRPC(blockNumbers []*big.Int) ([]common.BlockDa
441439
blockData := m.worker.Run(context.Background(), blockNumbers)
442440
for _, block := range blockData {
443441
if block.Error != nil {
444-
log.Warn().Err(block.Error).Msgf("Failed to fetch block %s from RPC", block.BlockNumber.String())
445-
continue
442+
return nil, block.Error
446443
}
447444
allBlockData = append(allBlockData, block.Data)
448445
}

cmd/root.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ func init() {
5353
rootCmd.PersistentFlags().Bool("poller-interval", true, "Poller interval")
5454
rootCmd.PersistentFlags().Int("poller-blocks-per-poll", 10, "How many blocks to poll each interval")
5555
rootCmd.PersistentFlags().Int("poller-from-block", 0, "From which block to start polling")
56-
rootCmd.PersistentFlags().Bool("poller-force-from-block", false, "Force the poller to start from the block specified in `poller-from-block`")
5756
rootCmd.PersistentFlags().Int("poller-until-block", 0, "Until which block to poll")
5857
rootCmd.PersistentFlags().Int("poller-parallel-pollers", 5, "Maximum number of parallel pollers")
5958
rootCmd.PersistentFlags().String("poller-s3-bucket", "", "S3 bucket for poller archive source")
@@ -77,10 +76,6 @@ func init() {
7776
rootCmd.PersistentFlags().Int("reorgHandler-interval", 1000, "How often to run reorg handler in milliseconds")
7877
rootCmd.PersistentFlags().Int("reorgHandler-blocks-per-scan", 100, "How many blocks to scan for reorgs")
7978
rootCmd.PersistentFlags().Int("reorgHandler-from-block", 0, "From which block to start scanning for reorgs")
80-
rootCmd.PersistentFlags().Bool("reorgHandler-force-from-block", false, "Force the reorg handler to start from the block specified in `reorgHandler-from-block`")
81-
rootCmd.PersistentFlags().Bool("failure-recoverer-enabled", true, "Toggle failure recoverer")
82-
rootCmd.PersistentFlags().Int("failure-recoverer-blocks-per-run", 10, "How many blocks to run failure recoverer for")
83-
rootCmd.PersistentFlags().Int("failure-recoverer-interval", 1000, "How often to run failure recoverer in milliseconds")
8479
rootCmd.PersistentFlags().String("storage-staging-clickhouse-database", "", "Clickhouse database for staging storage")
8580
rootCmd.PersistentFlags().Int("storage-staging-clickhouse-port", 0, "Clickhouse port for staging storage")
8681
rootCmd.PersistentFlags().String("storage-main-clickhouse-database", "", "Clickhouse database for main storage")
@@ -259,7 +254,6 @@ func init() {
259254
viper.BindPFlag("poller.interval", rootCmd.PersistentFlags().Lookup("poller-interval"))
260255
viper.BindPFlag("poller.blocksPerPoll", rootCmd.PersistentFlags().Lookup("poller-blocks-per-poll"))
261256
viper.BindPFlag("poller.fromBlock", rootCmd.PersistentFlags().Lookup("poller-from-block"))
262-
viper.BindPFlag("poller.forceFromBlock", rootCmd.PersistentFlags().Lookup("poller-force-from-block"))
263257
viper.BindPFlag("poller.untilBlock", rootCmd.PersistentFlags().Lookup("poller-until-block"))
264258
viper.BindPFlag("poller.parallelPollers", rootCmd.PersistentFlags().Lookup("poller-parallel-pollers"))
265259
viper.BindPFlag("poller.s3.endpoint", rootCmd.PersistentFlags().Lookup("poller-s3-endpoint"))
@@ -282,10 +276,6 @@ func init() {
282276
viper.BindPFlag("reorgHandler.interval", rootCmd.PersistentFlags().Lookup("reorgHandler-interval"))
283277
viper.BindPFlag("reorgHandler.blocksPerScan", rootCmd.PersistentFlags().Lookup("reorgHandler-blocks-per-scan"))
284278
viper.BindPFlag("reorgHandler.fromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-from-block"))
285-
viper.BindPFlag("reorgHandler.forceFromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-force-from-block"))
286-
viper.BindPFlag("failureRecoverer.enabled", rootCmd.PersistentFlags().Lookup("failure-recoverer-enabled"))
287-
viper.BindPFlag("failureRecoverer.blocksPerRun", rootCmd.PersistentFlags().Lookup("failure-recoverer-blocks-per-run"))
288-
viper.BindPFlag("failureRecoverer.interval", rootCmd.PersistentFlags().Lookup("failure-recoverer-interval"))
289279
viper.BindPFlag("storage.staging.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-database"))
290280
viper.BindPFlag("storage.staging.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-host"))
291281
viper.BindPFlag("storage.staging.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-port"))

cmd/validate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/thirdweb-dev/indexer/internal/orchestrator"
1010
"github.com/thirdweb-dev/indexer/internal/rpc"
1111
"github.com/thirdweb-dev/indexer/internal/storage"
12+
"github.com/thirdweb-dev/indexer/internal/worker"
1213
)
1314

1415
var (
@@ -58,7 +59,7 @@ func RunValidate(cmd *cobra.Command, args []string) {
5859
log.Fatal().Err(err).Msg("Failed to initialize storage")
5960
}
6061

61-
validator := orchestrator.NewValidator(rpcClient, s)
62+
validator := orchestrator.NewValidator(rpcClient, s, worker.NewWorker(rpcClient))
6263

6364
_, invalidBlocks, err := validator.ValidateBlockRange(startBlock, endBlock)
6465
if err != nil {

cmd/validate_and_fix.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/thirdweb-dev/indexer/internal/rpc"
1515
"github.com/thirdweb-dev/indexer/internal/storage"
1616
"github.com/thirdweb-dev/indexer/internal/validation"
17+
"github.com/thirdweb-dev/indexer/internal/worker"
1718
)
1819

1920
var (
@@ -116,7 +117,7 @@ func RunValidateAndFix(cmd *cobra.Command, args []string) {
116117
* Validates a range of blocks (end and start are inclusive) for a given chain and fixes any problems it finds
117118
*/
118119
func validateAndFixRange(rpcClient rpc.IRPCClient, s storage.IStorage, conn clickhouse.Conn, startBlock *big.Int, endBlock *big.Int, fixBatchSize int) error {
119-
validator := orchestrator.NewValidator(rpcClient, s)
120+
validator := orchestrator.NewValidator(rpcClient, s, worker.NewWorker(rpcClient))
120121

121122
chainId := rpcClient.GetChainID()
122123
err := validation.FindAndRemoveDuplicates(conn, chainId, startBlock, endBlock)

0 commit comments

Comments
 (0)