From d2853353de5d7ee7f8f90631109a9a760dc899da Mon Sep 17 00:00:00 2001 From: h3110w0r1d <2322013233@qq.com> Date: Mon, 7 Jul 2025 14:25:11 +0800 Subject: [PATCH 1/8] feat(toolkit): implement backfill for SectionBloom data in historical blocks --- plugins/README.md | 25 + plugins/build.gradle | 1 + .../src/main/java/org/tron/plugins/Db.java | 1 + .../org/tron/plugins/DbBackfillBloom.java | 471 ++++++++++++++++++ .../org/tron/plugins/DbBackfillBloomTest.java | 320 ++++++++++++ 5 files changed, 818 insertions(+) create mode 100644 plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java create mode 100644 plugins/src/test/java/org/tron/plugins/DbBackfillBloomTest.java diff --git a/plugins/README.md b/plugins/README.md index 0db6f2e6143..16e48760fc7 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -145,3 +145,28 @@ NOTE: large db may GC overhead limit exceeded. - ``: Source path for database. Default: output-directory/database - `--db`: db name. - `-h | --help`: provide the help info + +## DB Backfill-Bloom + +DB backfill bloom provides the ability to backfill SectionBloom data for historical blocks to enable eth_getLogs address/topics filtering. This is useful when `isJsonRpcFilterEnabled` was disabled during block processing and later enabled, causing historical blocks to lack SectionBloom data. + +### Available parameters: + +- `-d | --database-directory`: Specify the database directory path, default: output-directory/database. +- `-s | --start-block`: Specify the start block number for backfill (required). +- `-e | --end-block`: Specify the end block number for backfill (optional, default: latest block). +- `-c | --max-concurrency`: Specify the maximum concurrency for processing, default: 8. +- `-f | --force-flush`: Force database flush after each batch, default: true. +- `-h | --help`: Provide the help info. + +### Examples: + +```shell script +# full command + java -jar Toolkit.jar db backfill-bloom [-h] -s= [-e=] [-d=] [-c=] [-f=] +# examples + java -jar Toolkit.jar db backfill-bloom -s 1000000 -e 2000000 #1. backfill blocks 1000000 to 2000000 + java -jar Toolkit.jar db backfill-bloom -s 1000000 -d /path/to/database #2. specify custom database directory + java -jar Toolkit.jar db backfill-bloom -s 1000000 -c 8 #3. use higher concurrency (8 threads) + java -jar Toolkit.jar db backfill-bloom -s 1000000 --force-flush=false #4. disable force flush for better performance +``` diff --git a/plugins/build.gradle b/plugins/build.gradle index 01afaa01708..7c845cbf41f 100644 --- a/plugins/build.gradle +++ b/plugins/build.gradle @@ -33,6 +33,7 @@ dependencies { implementation 'io.github.tronprotocol:leveldbjni-all:1.18.2' implementation 'io.github.tronprotocol:leveldb:1.18.2' implementation project(":protocol") + implementation project(":chainbase") } check.dependsOn 'lint' diff --git a/plugins/src/main/java/org/tron/plugins/Db.java b/plugins/src/main/java/org/tron/plugins/Db.java index 84654dca934..0918d939dc5 100644 --- a/plugins/src/main/java/org/tron/plugins/Db.java +++ b/plugins/src/main/java/org/tron/plugins/Db.java @@ -12,6 +12,7 @@ DbConvert.class, DbLite.class, DbCopy.class, + DbBackfillBloom.class, DbRoot.class }, commandListHeading = "%nCommands:%n%nThe most commonly used db commands are:%n" diff --git a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java new file mode 100644 index 00000000000..bcf1cdb1dd6 --- /dev/null +++ b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java @@ -0,0 +1,471 @@ +package org.tron.plugins; + +import java.io.File; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; +import me.tongfei.progressbar.ProgressBar; +import org.apache.commons.collections4.CollectionUtils; +import org.tron.common.bloom.Bloom; +import org.tron.common.utils.ByteArray; +import org.tron.common.utils.ByteUtil; +import org.tron.core.capsule.TransactionRetCapsule; +import org.tron.core.exception.BadItemException; +import org.tron.core.exception.EventBloomException; +import org.tron.plugins.utils.db.DBInterface; +import org.tron.plugins.utils.db.DbTool; +import picocli.CommandLine; + +@Slf4j(topic = "backfill-bloom") +@CommandLine.Command(name = "backfill-bloom", + description = "Backfill SectionBloom for historical blocks to enable eth_getLogs filtering.", + exitCodeListHeading = "Exit Codes:%n", + exitCodeList = { + "0:Successful", + "1:Internal error: exception occurred, please check toolkit.log"}) +public class DbBackfillBloom implements Callable { + + @CommandLine.Spec + CommandLine.Model.CommandSpec spec; + + @CommandLine.Option(names = { "--database-directory", "-d" }, + defaultValue = "output-directory/database", + description = "Database directory path. Default: ${DEFAULT-VALUE}", order = 1) + private String databaseDirectory; + + @CommandLine.Option(names = { "--start-block", "-s" }, required = true, + description = "Start block number for backfill", order = 2) + private long startBlock; + + @CommandLine.Option(names = { "--end-block", "-e" }, + description = "End block number for backfill (default: latest block)", order = 3) + private Long endBlock; + + private static final int BLOCKS_PER_SECTION = 2048; + + @CommandLine.Option(names = { "--max-concurrency", "-c" }, defaultValue = "8", + description = "Maximum concurrency for processing. Default: ${DEFAULT-VALUE}", + order = 5) + private int maxConcurrency; + + @CommandLine.Option(names = { "--force-flush", "-f" }, defaultValue = "true", + description = "Force database flush after each batch. Default: ${DEFAULT-VALUE}", + order = 6) + private boolean forceFlush; + + @CommandLine.Option(names = { "--help", "-h" }, help = true, + description = "Display help message", order = 7) + private boolean help; + + // Statistics + // Number of blocks traversed (including failed ones) + private final AtomicLong processedBlocks = new AtomicLong(0); + // Number of successfully processed blocks + private final AtomicLong successfulBlocks = new AtomicLong(0); + // Number of blocks containing logs + private final AtomicLong blocksWithLogs = new AtomicLong(0); + // Number of failed blocks processed + private final AtomicLong errorCount = new AtomicLong(0); + // Total number of bloom writes + private final AtomicLong totalBloomWrites = new AtomicLong(0); + + private static class SectionRange { + final long start; + final long end; + final long sectionId; + + SectionRange(long start, long end, long sectionId) { + this.start = start; + this.end = end; + this.sectionId = sectionId; + } + + @Override + public String toString() { + return String.format("Section %d: [%d-%d]", sectionId, start, end); + } + } + + @Override + public Integer call() { + if (help) { + spec.commandLine().usage(System.out); + return 0; + } + + try { + // Validate parameters + if (!validateParameters()) { + return 1; + } + + // Initialize database connections + if (!initializeDatabase()) { + return 1; + } + + // Determine end block if not specified + if (endBlock == null) { + endBlock = getLatestBlockNumber(); + if (endBlock == null) { + spec.commandLine().getErr().println("Failed to determine latest block number"); + return 1; + } + } + + // Validate block range + if (endBlock < startBlock) { + spec.commandLine().getErr().println("End block must be >= start block"); + return 1; + } + + long totalBlocks = endBlock - startBlock + 1; + spec.commandLine().getOut().printf( + "Starting SectionBloom backfill for blocks %d to %d (%d blocks)%n", + startBlock, endBlock, totalBlocks); + + // Process blocks with progress bar + long startTime = System.currentTimeMillis(); + int result = processBlocks(); + long duration = (System.currentTimeMillis() - startTime) / 1000; + + // Print summary + printSummary(duration); + + return result; + + } catch (Exception e) { + logger.error("Backfill failed", e); + spec.commandLine().getErr().println("Backfill failed: " + e.getMessage()); + return 1; + } finally { + DbTool.close(); + } + } + + private boolean validateParameters() { + if (startBlock < 0) { + spec.commandLine().getErr().println("Start block must be >= 0"); + return false; + } + + if (maxConcurrency <= 0 || maxConcurrency > 128) { + spec.commandLine().getErr().println("Max concurrency must be between 1 and 128"); + return false; + } + + File dbDir = new File(databaseDirectory); + if (!dbDir.exists() || !dbDir.isDirectory()) { + spec.commandLine().getErr().println("Database directory does not exist: " + + databaseDirectory); + return false; + } + + return true; + } + + private boolean initializeDatabase() { + try { + // Initialize database connections + DbTool.getDB(databaseDirectory, "transactionRetStore"); + DbTool.getDB(databaseDirectory, "section-bloom"); + + spec.commandLine().getOut().println("Database connections initialized successfully"); + return true; + } catch (Exception e) { + logger.error("Failed to initialize database connections: {}", e.getMessage()); + spec.commandLine().getErr().println("Failed to initialize database connections: " + + e.getMessage()); + return false; + } + } + + private Long getLatestBlockNumber() { + try { + DBInterface blockIndexDb = DbTool.getDB(databaseDirectory, "block_index"); + byte[] latestBlockKey = "latest_block_header_number".getBytes(); + byte[] latestBlockBytes = blockIndexDb.get(latestBlockKey); + + if (latestBlockBytes != null) { + return ByteArray.toLong(latestBlockBytes); + } + return null; + } catch (Exception e) { + logger.error("Failed to get latest block number: {}", e.getMessage()); + return null; + } + } + + private int processBlocks() { + long totalBlocks = endBlock - startBlock + 1; + ExecutorService executor = Executors.newFixedThreadPool(maxConcurrency); + List> futures = new ArrayList<>(); + + try (ProgressBar pb = new ProgressBar("Scanning blocks for SectionBloom backfill", + totalBlocks)) { + + // Calculate the section range to be processed + List sectionRanges = calculateSectionRanges(startBlock, endBlock); + + spec.commandLine().getOut().printf("Processing %d sections with %d threads\n", + sectionRanges.size(), maxConcurrency); + // Submit all section tasks to the thread pool + for (SectionRange range : sectionRanges) { + final long finalSectionStart = range.start; + final long finalSectionEnd = range.end; + + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + processSection(finalSectionStart, finalSectionEnd, pb); + } catch (Exception e) { + spec.commandLine().getOut().printf("Error processing section %d to %d, %s\n", + finalSectionStart, finalSectionEnd, e); + } + }, executor); + + futures.add(future); + } + + // Wait for all tasks to complete + CompletableFuture allTasks = CompletableFuture.allOf(futures.toArray( + new CompletableFuture[0])); + + try { + allTasks.get(); + spec.commandLine().getOut().printf("All %d batch tasks completed\n", futures.size()); + } catch (Exception e) { + spec.commandLine().getOut().printf("Error waiting for tasks to complete: %s\n", + e.getMessage()); + return 1; + } + + } catch (Exception e) { + spec.commandLine().getOut().printf("Error in progress tracking %s\n", e); + return 1; + } finally { + // 关闭线程池 + executor.shutdown(); + try { + if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { + spec.commandLine().getOut().println("Forcing shutdown of executor..."); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + return errorCount.get() > 0 ? 1 : 0; + } + + /** + * Calculate the section range to be processed to ensure each thread processes a complete section. + * For example, startBlock=1000 and endBlock=4000 will generate: + * - SectionRange 0: [1000-2047] + * - SectionRange 1: [2048-4000] + */ + private List calculateSectionRanges(long startBlock, long endBlock) { + List ranges = new ArrayList<>(); + + long currentBlock = startBlock; + while (currentBlock <= endBlock) { + // 计算当前区块所属的 section + long sectionId = currentBlock / BLOCKS_PER_SECTION; + + // 计算这个 section 的边界 + long sectionStart = sectionId * BLOCKS_PER_SECTION; + long sectionEnd = sectionStart + BLOCKS_PER_SECTION - 1; + + // 调整为实际需要处理的范围 + long rangeStart = Math.max(currentBlock, sectionStart); + long rangeEnd = Math.min(endBlock, sectionEnd); + + ranges.add(new SectionRange(rangeStart, rangeEnd, sectionId)); + + // 移动到下一个 section + currentBlock = sectionEnd + 1; + } + + return ranges; + } + + private void processSection(long sectionStart, long sectionEnd, ProgressBar pb) { + long sectionId = sectionStart / BLOCKS_PER_SECTION; + try { + DBInterface transactionRetDb = DbTool.getDB(databaseDirectory, "transactionRetStore"); + DBInterface sectionBloomDb = DbTool.getDB(databaseDirectory, "section-bloom"); + + for (long blockNum = sectionStart; blockNum <= sectionEnd; blockNum++) { + try { + processBlock(blockNum, transactionRetDb, sectionBloomDb); + successfulBlocks.incrementAndGet(); + } catch (Exception e) { + spec.commandLine().getOut().printf("Error processing block %d, %s\n", blockNum, e); + errorCount.incrementAndGet(); + } finally { + pb.step(); + } + } + } catch (Exception e) { + spec.commandLine().getOut().printf("Error in section %d processing: %s\n", sectionId, e); + throw new RuntimeException(e); + } + } + + private void processBlock(long blockNum, DBInterface transactionRetDb, DBInterface sectionBloomDb) + throws BadItemException, EventBloomException { + + // Get transaction info for this block + byte[] blockKey = ByteArray.fromLong(blockNum); + byte[] transactionRetData = transactionRetDb.get(blockKey); + + if (transactionRetData == null) { + return; + } + + try { + TransactionRetCapsule transactionRetCapsule = new TransactionRetCapsule(transactionRetData); + + // Create bloom filter for this block using the same logic as SectionBloomStore + Bloom blockBloom = Bloom.createBloom(transactionRetCapsule); + + if (blockBloom != null) { + // Extract bit positions from bloom filter + List bitList = extractBitPositions(blockBloom); + + if (!CollectionUtils.isEmpty(bitList)) { + // Write to section bloom store using the same logic as SectionBloomStore.write + writeSectionBloom(blockNum, bitList, sectionBloomDb); + blocksWithLogs.incrementAndGet(); + } + } + } catch (Exception e) { + spec.commandLine().getOut().printf("Error processing block %d: %s\n", blockNum, + e.getMessage()); + throw e; + } + } + + private List extractBitPositions(Bloom blockBloom) { + List bitList = new ArrayList<>(); + BitSet bs = BitSet.valueOf(blockBloom.getData()); + for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i + 1)) { + // operate on index i here + if (i == Integer.MAX_VALUE) { + break; // or (i+1) would overflow + } + bitList.add(i); + } + return bitList; + } + + private void writeSectionBloom(long blockNum, List bitList, DBInterface sectionBloomDb) + throws EventBloomException { + + int section = (int) (blockNum / BLOCKS_PER_SECTION); + int blockNumOffset = (int) (blockNum % BLOCKS_PER_SECTION); + + for (int bitIndex : bitList) { + // Get existing BitSet from database + BitSet bitSet = getSectionBloomBitSet(section, bitIndex, sectionBloomDb); + if (Objects.isNull(bitSet)) { + bitSet = new BitSet(BLOCKS_PER_SECTION); + } + + // Update the bit for this block + bitSet.set(blockNumOffset); + + // Put back into database + putSectionBloomBitSet(section, bitIndex, bitSet, sectionBloomDb); + totalBloomWrites.incrementAndGet(); + } + } + + private long combineKey(int section, int bitIndex) { + return section * 1_000_000L + bitIndex; + } + + private BitSet getSectionBloomBitSet(int section, int bitIndex, DBInterface sectionBloomDb) + throws EventBloomException { + long keyLong = combineKey(section, bitIndex); + byte[] key = Long.toHexString(keyLong).getBytes(); + byte[] data = sectionBloomDb.get(key); + + if (data == null) { + return null; + } + + try { + byte[] decompressedData = ByteUtil.decompress(data); + return BitSet.valueOf(decompressedData); + } catch (Exception e) { + throw new EventBloomException("decompress byte failed"); + } + } + + private void putSectionBloomBitSet(int section, int bitIndex, BitSet bitSet, + DBInterface sectionBloomDb) + throws EventBloomException { + long keyLong = combineKey(section, bitIndex); + byte[] key = Long.toHexString(keyLong).getBytes(); + + try { + byte[] compressedData = ByteUtil.compress(bitSet.toByteArray()); + sectionBloomDb.put(key, compressedData); + } catch (Exception e) { + throw new EventBloomException("compress byte failed"); + } + } + + private void printSummary(long duration) { + spec.commandLine().getOut().println("\n=== Backfill Summary ==="); + + spec.commandLine().getOut().printf("Total blocks scanned: %d%n", processedBlocks.get()); + spec.commandLine().getOut().printf("Successfully processed: %d%n", successfulBlocks.get()); + spec.commandLine().getOut().printf("Blocks with logs: %d%n", blocksWithLogs.get()); + spec.commandLine().getOut().printf("Errors encountered: %d%n", errorCount.get()); + spec.commandLine().getOut().printf("Duration: %d seconds%n", duration); + + // Success rate statistics + if (processedBlocks.get() > 0) { + double successRate = (double) successfulBlocks.get() / processedBlocks.get() * 100; + double logRate = (double) blocksWithLogs.get() / processedBlocks.get() * 100; + spec.commandLine().getOut().printf("Success rate: %.2f%% (%d/%d)%n", + successRate, successfulBlocks.get(), processedBlocks.get()); + spec.commandLine().getOut().printf("Blocks with logs rate: %.2f%% (%d/%d)%n", + logRate, blocksWithLogs.get(), processedBlocks.get()); + } + + // Performance statistics + spec.commandLine().getOut().printf("Total bloom writes: %d%n", totalBloomWrites.get()); + spec.commandLine().getOut().printf("Max concurrency used: %d threads%n", maxConcurrency); + spec.commandLine().getOut().printf("Section-based processing: No locks needed%n"); + + if (duration > 0) { + spec.commandLine().getOut().printf("Scanning rate: %.2f blocks/second%n", + (double) processedBlocks.get() / duration); + spec.commandLine().getOut().printf("Processing rate: %.2f blocks/second%n", + (double) successfulBlocks.get() / duration); + if (totalBloomWrites.get() > 0) { + spec.commandLine().getOut().printf("Bloom write rate: %.2f writes/second%n", + (double) totalBloomWrites.get() / duration); + } + } + + // Result judgment + if (errorCount.get() == 0) { + spec.commandLine().getOut().println("✓ Backfill completed successfully!"); + } else { + spec.commandLine().getOut().println("⚠ Backfill completed with %d errors."); + } + } +} \ No newline at end of file diff --git a/plugins/src/test/java/org/tron/plugins/DbBackfillBloomTest.java b/plugins/src/test/java/org/tron/plugins/DbBackfillBloomTest.java new file mode 100644 index 00000000000..3d62969a843 --- /dev/null +++ b/plugins/src/test/java/org/tron/plugins/DbBackfillBloomTest.java @@ -0,0 +1,320 @@ +package org.tron.plugins; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.MockedStatic; +import org.tron.common.utils.ByteArray; +import org.tron.plugins.utils.db.DBInterface; +import org.tron.plugins.utils.db.DbTool; +import picocli.CommandLine; + +public class DbBackfillBloomTest { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private String databaseDirectory; + private CommandLine cli; + private ByteArrayOutputStream outputStream; + private ByteArrayOutputStream errorStream; + private PrintStream originalOut; + private PrintStream originalErr; + private MockedStatic dbToolMock; + + @Before + public void setUp() throws IOException { + // Create temporary database directory + databaseDirectory = temporaryFolder.newFolder().toString(); + + // Create the command line interface using Toolkit as the root command + cli = new CommandLine(new Toolkit()); + + // Capture output streams + outputStream = new ByteArrayOutputStream(); + errorStream = new ByteArrayOutputStream(); + originalOut = System.out; + originalErr = System.err; + System.setOut(new PrintStream(outputStream)); + System.setErr(new PrintStream(errorStream)); + + // Mock DbTool static methods + dbToolMock = mockStatic(DbTool.class); + } + + @After + public void tearDown() { + // Restore original streams + System.setOut(originalOut); + System.setErr(originalErr); + + // Close static mock + if (dbToolMock != null) { + dbToolMock.close(); + } + } + + @Test + public void testHelp() { + String[] args = new String[] { "db", "backfill-bloom", "-h" }; + assertEquals(0, cli.execute(args)); + } + + @Test + public void testValidParametersWithMockedDatabase() throws Exception { + // Mock database interfaces + DBInterface transactionRetDb = mock(DBInterface.class); + DBInterface sectionBloomDb = mock(DBInterface.class); + DBInterface blockIndexDb = mock(DBInterface.class); + + // Mock DbTool.getDB calls + dbToolMock.when(() -> DbTool.getDB(anyString(), anyString())) + .thenAnswer(invocation -> { + String dbName = invocation.getArgument(1); + switch (dbName) { + case "transactionRetStore": + return transactionRetDb; + case "section-bloom": + return sectionBloomDb; + case "block_index": + return blockIndexDb; + default: + return mock(DBInterface.class); + } + }); + + // Mock latest block number + when(blockIndexDb.get(any(byte[].class))) + .thenReturn(ByteArray.fromLong(1000L)); + + // Mock empty transaction data (no transactions to process) + when(transactionRetDb.get(any(byte[].class))) + .thenReturn(null); + + String[] args = new String[] { + "db", "backfill-bloom", + "-d", databaseDirectory, + "-s", "100", + "-e", "200", + "-c", "2" + }; + + assertEquals(0, cli.execute(args)); + } + + @Test + public void testInvalidStartBlock() { + String[] args = new String[] { + "db", "backfill-bloom", + "-d", databaseDirectory, + "-s", "-1" + }; + assertEquals(1, cli.execute(args)); + } + + @Test + public void testInvalidConcurrency() { + String[] args = new String[] { + "db", "backfill-bloom", + "-d", databaseDirectory, + "-s", "100", + "-c", "0" + }; + assertEquals(1, cli.execute(args)); + } + + @Test + public void testInvalidConcurrencyTooHigh() { + String[] args = new String[] { + "db", "backfill-bloom", + "-d", databaseDirectory, + "-s", "100", + "-c", "200" + }; + assertEquals(1, cli.execute(args)); + } + + @Test + public void testNonExistentDatabaseDirectory() { + String nonExistentDir = databaseDirectory + File.separator + UUID.randomUUID(); + String[] args = new String[] { + "db", "backfill-bloom", + "-d", nonExistentDir, + "-s", "100" + }; + assertEquals(1, cli.execute(args)); + } + + @Test + public void testEndBlockLessThanStartBlock() throws Exception { + // Mock database interfaces + DBInterface transactionRetDb = mock(DBInterface.class); + DBInterface sectionBloomDb = mock(DBInterface.class); + + dbToolMock.when(() -> DbTool.getDB(anyString(), anyString())) + .thenAnswer(invocation -> { + String dbName = invocation.getArgument(1); + switch (dbName) { + case "transactionRetStore": + return transactionRetDb; + case "section-bloom": + return sectionBloomDb; + default: + return mock(DBInterface.class); + } + }); + + String[] args = new String[] { + "db", "backfill-bloom", + "-d", databaseDirectory, + "-s", "200", + "-e", "100" + }; + assertEquals(1, cli.execute(args)); + } + + @Test + public void testDatabaseInitializationFailure() { + // Mock DbTool to throw exception + dbToolMock.when(() -> DbTool.getDB(anyString(), anyString())) + .thenThrow(new RuntimeException("Database connection failed")); + + String[] args = new String[] { + "db", "backfill-bloom", + "-d", databaseDirectory, + "-s", "100" + }; + assertEquals(1, cli.execute(args)); + } + + @Test + public void testAutoDetectEndBlock() throws Exception { + // Mock database interfaces + DBInterface transactionRetDb = mock(DBInterface.class); + DBInterface sectionBloomDb = mock(DBInterface.class); + DBInterface blockIndexDb = mock(DBInterface.class); + + dbToolMock.when(() -> DbTool.getDB(anyString(), anyString())) + .thenAnswer(invocation -> { + String dbName = invocation.getArgument(1); + switch (dbName) { + case "transactionRetStore": + return transactionRetDb; + case "section-bloom": + return sectionBloomDb; + case "block_index": + return blockIndexDb; + default: + return mock(DBInterface.class); + } + }); + + // Mock latest block number + when(blockIndexDb.get(any(byte[].class))) + .thenReturn(ByteArray.fromLong(5000L)); + + // Mock empty transaction data + when(transactionRetDb.get(any(byte[].class))) + .thenReturn(null); + + String[] args = new String[] { + "db", "backfill-bloom", + "-d", databaseDirectory, + "-s", "100" + // No end block specified - should auto-detect + }; + + assertEquals(0, cli.execute(args)); + } + + @Test + public void testFailedToGetLatestBlockNumber() throws Exception { + // Mock database interfaces + DBInterface transactionRetDb = mock(DBInterface.class); + DBInterface sectionBloomDb = mock(DBInterface.class); + DBInterface blockIndexDb = mock(DBInterface.class); + + dbToolMock.when(() -> DbTool.getDB(anyString(), anyString())) + .thenAnswer(invocation -> { + String dbName = invocation.getArgument(1); + switch (dbName) { + case "transactionRetStore": + return transactionRetDb; + case "section-bloom": + return sectionBloomDb; + case "block_index": + return blockIndexDb; + default: + return mock(DBInterface.class); + } + }); + + // Mock null latest block number (failed to get) + when(blockIndexDb.get(any(byte[].class))) + .thenReturn(null); + + String[] args = new String[] { + "db", "backfill-bloom", + "-d", databaseDirectory, + "-s", "100" + // No end block specified - should fail to auto-detect + }; + + assertEquals(1, cli.execute(args)); + } + + @Test + public void testDefaultParameters() throws Exception { + // Mock database interfaces + DBInterface transactionRetDb = mock(DBInterface.class); + DBInterface sectionBloomDb = mock(DBInterface.class); + DBInterface blockIndexDb = mock(DBInterface.class); + + dbToolMock.when(() -> DbTool.getDB(anyString(), anyString())) + .thenAnswer(invocation -> { + String dbName = invocation.getArgument(1); + switch (dbName) { + case "transactionRetStore": + return transactionRetDb; + case "section-bloom": + return sectionBloomDb; + case "block_index": + return blockIndexDb; + default: + return mock(DBInterface.class); + } + }); + + // Mock latest block number + when(blockIndexDb.get(any(byte[].class))) + .thenReturn(ByteArray.fromLong(1000L)); + + // Mock empty transaction data + when(transactionRetDb.get(any(byte[].class))) + .thenReturn(null); + + // Test with default database directory + String[] args = new String[] { + "db", "backfill-bloom", + "-s", "100", + "-e", "200" + }; + + // This should fail because default directory doesn't exist + assertEquals(1, cli.execute(args)); + } +} From 1bb0e01478c88a7570e5d3b740489661c1a2113b Mon Sep 17 00:00:00 2001 From: h3110w0r1d <2322013233@qq.com> Date: Mon, 7 Jul 2025 16:08:11 +0800 Subject: [PATCH 2/8] feat(toolkit): remove force flush option from backfill command --- plugins/README.md | 2 -- plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java | 5 ----- 2 files changed, 7 deletions(-) diff --git a/plugins/README.md b/plugins/README.md index 16e48760fc7..82729d82996 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -156,7 +156,6 @@ DB backfill bloom provides the ability to backfill SectionBloom data for histori - `-s | --start-block`: Specify the start block number for backfill (required). - `-e | --end-block`: Specify the end block number for backfill (optional, default: latest block). - `-c | --max-concurrency`: Specify the maximum concurrency for processing, default: 8. -- `-f | --force-flush`: Force database flush after each batch, default: true. - `-h | --help`: Provide the help info. ### Examples: @@ -168,5 +167,4 @@ DB backfill bloom provides the ability to backfill SectionBloom data for histori java -jar Toolkit.jar db backfill-bloom -s 1000000 -e 2000000 #1. backfill blocks 1000000 to 2000000 java -jar Toolkit.jar db backfill-bloom -s 1000000 -d /path/to/database #2. specify custom database directory java -jar Toolkit.jar db backfill-bloom -s 1000000 -c 8 #3. use higher concurrency (8 threads) - java -jar Toolkit.jar db backfill-bloom -s 1000000 --force-flush=false #4. disable force flush for better performance ``` diff --git a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java index bcf1cdb1dd6..157205e88ef 100644 --- a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java +++ b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java @@ -56,11 +56,6 @@ public class DbBackfillBloom implements Callable { order = 5) private int maxConcurrency; - @CommandLine.Option(names = { "--force-flush", "-f" }, defaultValue = "true", - description = "Force database flush after each batch. Default: ${DEFAULT-VALUE}", - order = 6) - private boolean forceFlush; - @CommandLine.Option(names = { "--help", "-h" }, help = true, description = "Display help message", order = 7) private boolean help; From 3ca81ff015e9d1e4f77ddbe67df56ed589a76867 Mon Sep 17 00:00:00 2001 From: h3110w0r1d <2322013233@qq.com> Date: Tue, 8 Jul 2025 15:04:07 +0800 Subject: [PATCH 3/8] feat(toolkit): modify eth_getLogs to `eth_getLogs` --- plugins/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/README.md b/plugins/README.md index 82729d82996..422a09b4841 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -148,7 +148,7 @@ NOTE: large db may GC overhead limit exceeded. ## DB Backfill-Bloom -DB backfill bloom provides the ability to backfill SectionBloom data for historical blocks to enable eth_getLogs address/topics filtering. This is useful when `isJsonRpcFilterEnabled` was disabled during block processing and later enabled, causing historical blocks to lack SectionBloom data. +DB backfill bloom provides the ability to backfill SectionBloom data for historical blocks to enable `eth_getLogs` address/topics filtering. This is useful when `isJsonRpcFilterEnabled` was disabled during block processing and later enabled, causing historical blocks to lack SectionBloom data. ### Available parameters: From 5c46e3459b8095173b5c976be17811a710371ea5 Mon Sep 17 00:00:00 2001 From: h3110w0r1d <2322013233@qq.com> Date: Tue, 8 Jul 2025 15:18:46 +0800 Subject: [PATCH 4/8] feat(toolkit): change all comments to English --- .../src/main/java/org/tron/plugins/DbBackfillBloom.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java index 157205e88ef..810561c0e78 100644 --- a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java +++ b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java @@ -273,20 +273,18 @@ private List calculateSectionRanges(long startBlock, long endBlock long currentBlock = startBlock; while (currentBlock <= endBlock) { - // 计算当前区块所属的 section + // Calculate the section to which the current block belongs long sectionId = currentBlock / BLOCKS_PER_SECTION; - // 计算这个 section 的边界 + // Calculate the boundaries of this section long sectionStart = sectionId * BLOCKS_PER_SECTION; long sectionEnd = sectionStart + BLOCKS_PER_SECTION - 1; - // 调整为实际需要处理的范围 + // Adjust to the actual range that needs to be processed long rangeStart = Math.max(currentBlock, sectionStart); long rangeEnd = Math.min(endBlock, sectionEnd); ranges.add(new SectionRange(rangeStart, rangeEnd, sectionId)); - - // 移动到下一个 section currentBlock = sectionEnd + 1; } From b0de886d5c700d88856cd77369fbfeccf38334c1 Mon Sep 17 00:00:00 2001 From: h3110w0r1d <2322013233@qq.com> Date: Tue, 8 Jul 2025 16:05:43 +0800 Subject: [PATCH 5/8] feat(toolkit): update README with database directory usage and backfill speed recommendations --- plugins/README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/plugins/README.md b/plugins/README.md index 422a09b4841..966e3459f5c 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -152,7 +152,7 @@ DB backfill bloom provides the ability to backfill SectionBloom data for histori ### Available parameters: -- `-d | --database-directory`: Specify the database directory path, default: output-directory/database. +- `-d | --database-directory`: Specify the database directory path, it is used to open the database to get the transaction log and write the SectionBloom data back, default: output-directory/database. - `-s | --start-block`: Specify the start block number for backfill (required). - `-e | --end-block`: Specify the end block number for backfill (optional, default: latest block). - `-c | --max-concurrency`: Specify the maximum concurrency for processing, default: 8. @@ -168,3 +168,10 @@ DB backfill bloom provides the ability to backfill SectionBloom data for histori java -jar Toolkit.jar db backfill-bloom -s 1000000 -d /path/to/database #2. specify custom database directory java -jar Toolkit.jar db backfill-bloom -s 1000000 -c 8 #3. use higher concurrency (8 threads) ``` + +### Backfill speed + +The time required to process different block ranges varies. It is recommended to increase `--max-concurrency` appropriately to speed up the backfill process. + +- 0-10000000: It's done almost instantly because there are no logs inside. +- 10000000-70000000: Takes about 3-4 hours/10,000,000 blocks with `--max-concurrency` set to 32. From 6c452eaf52c0bf32bdb696640f00cf8d9be7c0cc Mon Sep 17 00:00:00 2001 From: h3110w0r1d <2322013233@qq.com> Date: Tue, 8 Jul 2025 17:10:49 +0800 Subject: [PATCH 6/8] fix(toolkit): fix wrong database name --- .../src/main/java/org/tron/plugins/DbBackfillBloom.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java index 810561c0e78..8c33b30373b 100644 --- a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java +++ b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java @@ -110,7 +110,7 @@ public Integer call() { // Determine end block if not specified if (endBlock == null) { endBlock = getLatestBlockNumber(); - if (endBlock == null) { + if (endBlock == null || endBlock == 0) { spec.commandLine().getErr().println("Failed to determine latest block number"); return 1; } @@ -185,9 +185,9 @@ private boolean initializeDatabase() { private Long getLatestBlockNumber() { try { - DBInterface blockIndexDb = DbTool.getDB(databaseDirectory, "block_index"); + DBInterface propertiesDb = DbTool.getDB(databaseDirectory, "properties"); byte[] latestBlockKey = "latest_block_header_number".getBytes(); - byte[] latestBlockBytes = blockIndexDb.get(latestBlockKey); + byte[] latestBlockBytes = propertiesDb.get(latestBlockKey); if (latestBlockBytes != null) { return ByteArray.toLong(latestBlockBytes); @@ -211,7 +211,7 @@ private int processBlocks() { List sectionRanges = calculateSectionRanges(startBlock, endBlock); spec.commandLine().getOut().printf("Processing %d sections with %d threads\n", - sectionRanges.size(), maxConcurrency); + sectionRanges.size(), Math.min(maxConcurrency, sectionRanges.size())); // Submit all section tasks to the thread pool for (SectionRange range : sectionRanges) { final long finalSectionStart = range.start; From 167b3662cc2b7ed288a7a74efd7821965b412542 Mon Sep 17 00:00:00 2001 From: h3110w0r1d <2322013233@qq.com> Date: Tue, 8 Jul 2025 17:43:11 +0800 Subject: [PATCH 7/8] feat(toolkit): change comments to English --- plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java index 8c33b30373b..1d924eb0bed 100644 --- a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java +++ b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java @@ -246,7 +246,7 @@ private int processBlocks() { spec.commandLine().getOut().printf("Error in progress tracking %s\n", e); return 1; } finally { - // 关闭线程池 + // Close thread pool executor.shutdown(); try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { From 63fd45c12a150dfe87f6da0a75c0ba8945ab5b93 Mon Sep 17 00:00:00 2001 From: h3110w0r1d <2322013233@qq.com> Date: Thu, 10 Jul 2025 10:45:13 +0800 Subject: [PATCH 8/8] feat(toolkit): optimize thread pool concurrency calculation --- .../main/java/org/tron/plugins/DbBackfillBloom.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java index 1d924eb0bed..790b6499cc5 100644 --- a/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java +++ b/plugins/src/main/java/org/tron/plugins/DbBackfillBloom.java @@ -201,17 +201,17 @@ private Long getLatestBlockNumber() { private int processBlocks() { long totalBlocks = endBlock - startBlock + 1; + // Calculate the section range to be processed + List sectionRanges = calculateSectionRanges(startBlock, endBlock); + + maxConcurrency = Math.min(maxConcurrency, sectionRanges.size()); ExecutorService executor = Executors.newFixedThreadPool(maxConcurrency); List> futures = new ArrayList<>(); try (ProgressBar pb = new ProgressBar("Scanning blocks for SectionBloom backfill", totalBlocks)) { - - // Calculate the section range to be processed - List sectionRanges = calculateSectionRanges(startBlock, endBlock); - spec.commandLine().getOut().printf("Processing %d sections with %d threads\n", - sectionRanges.size(), Math.min(maxConcurrency, sectionRanges.size())); + sectionRanges.size(), maxConcurrency); // Submit all section tasks to the thread pool for (SectionRange range : sectionRanges) { final long finalSectionStart = range.start;