From 4c34a12a3027ffcb705a1b60886cbcb779d966d7 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Tue, 10 Jun 2025 17:10:04 +0530 Subject: [PATCH 01/10] Bug fixes --- .../com/scalar/db/common/error/CoreError.java | 6 ++-- .../dataimport/processor/ImportProcessor.java | 18 +++------- .../core/dataimport/task/ImportTask.java | 7 ++-- .../db/dataloader/core/util/ColumnUtils.java | 33 ++++++++++++++----- .../dataloader/core/util/ColumnUtilsTest.java | 18 +++++++--- 5 files changed, 51 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index b6e450107e..5aae66ab5c 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -689,13 +689,13 @@ public enum CoreError implements ScalarDbError { DATA_LOADER_INVALID_BASE64_ENCODING_FOR_COLUMN_VALUE( Category.USER_ERROR, "0149", - "Invalid base64 encoding for blob value for column %s in table %s in namespace %s", + "Invalid base64 encoding for blob value '%s' for column %s in table %s in namespace %s", "", ""), DATA_LOADER_INVALID_NUMBER_FORMAT_FOR_COLUMN_VALUE( Category.USER_ERROR, "0150", - "Invalid number specified for column %s in table %s in namespace %s", + "Invalid number '%s' specified for column %s in table %s in namespace %s", "", ""), DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT( @@ -899,7 +899,7 @@ public enum CoreError implements ScalarDbError { DATA_LOADER_INVALID_DATE_TIME_FOR_COLUMN_VALUE( Category.USER_ERROR, "0199", - "Invalid date time value specified for column %s in table %s in namespace %s.", + "Invalid date time value '%s' specified for column %s in table %s in namespace %s.", "", ""), DATA_LOADER_NULL_OR_EMPTY_KEY_VALUE_INPUT( diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java 
b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 3f191f7259..c11ae3f655 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -416,19 +416,11 @@ private ImportDataChunkStatus processDataChunkWithTransactions( ImportTransactionBatchResult importTransactionBatchResult = processTransactionBatch(dataChunk.getDataChunkId(), transactionBatch); - importTransactionBatchResult - .getRecords() - .forEach( - batchRecords -> { - if (batchRecords.getTargets().stream() - .allMatch( - targetResult -> - targetResult.getStatus().equals(ImportTargetResultStatus.SAVED))) { - successCount.incrementAndGet(); - } else { - failureCount.incrementAndGet(); - } - }); + if (importTransactionBatchResult.isSuccess()) { + successCount.addAndGet(importTransactionBatchResult.getRecords().size()); + } else { + failureCount.addAndGet(importTransactionBatchResult.getRecords().size()); + } } Instant endTime = Instant.now(); int totalDuration = (int) Duration.between(startTime, endTime).toMillis(); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java index 26b4993977..5e64e4d63d 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java @@ -73,7 +73,7 @@ public ImportTaskResult execute() { return ImportTaskResult.builder() .rawRecord(params.getSourceRecord()) .rowNumber(params.getRowNumber()) - .targets(Collections.singletonList(singleTargetResult)) + .targets(new ArrayList<>(Collections.singletonList(singleTargetResult))) .build(); } @@ -308,11 +308,14 @@ && 
shouldRevalidateMissingColumns(importOptions, checkForMissingColumns)) { optionalScalarDBResult.orElse(null), mutableSourceRecord, importOptions.isIgnoreNullValues(), - tableMetadata); + tableMetadata, + namespace, + table); } catch (Base64Exception | ColumnParsingException e) { return ImportTargetResult.builder() .namespace(namespace) .tableName(table) + .importedRecord(mutableSourceRecord) .status(ImportTargetResultStatus.VALIDATION_FAILED) .errors(Collections.singletonList(e.getMessage())) .build(); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java index d6a653cced..90001ed062 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/ColumnUtils.java @@ -134,17 +134,17 @@ public static Column createColumnFromValue( } catch (NumberFormatException e) { throw new ColumnParsingException( CoreError.DATA_LOADER_INVALID_NUMBER_FORMAT_FOR_COLUMN_VALUE.buildMessage( - columnName, columnInfo.getTableName(), columnInfo.getNamespace()), + value, columnName, columnInfo.getTableName(), columnInfo.getNamespace()), e); } catch (DateTimeParseException e) { throw new ColumnParsingException( CoreError.DATA_LOADER_INVALID_DATE_TIME_FOR_COLUMN_VALUE.buildMessage( - columnName, columnInfo.getTableName(), columnInfo.getNamespace()), + value, columnName, columnInfo.getTableName(), columnInfo.getNamespace()), e); } catch (IllegalArgumentException e) { throw new ColumnParsingException( CoreError.DATA_LOADER_INVALID_BASE64_ENCODING_FOR_COLUMN_VALUE.buildMessage( - columnName, columnInfo.getTableName(), columnInfo.getNamespace()), + value, columnName, columnInfo.getTableName(), columnInfo.getNamespace()), e); } } @@ -166,6 +166,8 @@ public static Column createColumnFromValue( * @param sourceRecord the source data in JSON format to compare against * 
@param ignoreNullValues if true, null values will be excluded from the result * @param tableMetadata metadata about the table structure and column types + * @param namespace namespace in which the table is present + * @param table table name to which data is to be imported * @return a List of Column objects representing the processed data * @throws Base64Exception if there's an error processing base64 encoded BLOB data * @throws ColumnParsingException if there's an error parsing column values @@ -174,7 +176,9 @@ public static List> getColumnsFromResult( Result scalarDBResult, JsonNode sourceRecord, boolean ignoreNullValues, - TableMetadata tableMetadata) + TableMetadata tableMetadata, + String namespace, + String table) throws Base64Exception, ColumnParsingException { List> columns = new ArrayList<>(); @@ -193,7 +197,9 @@ public static List> getColumnsFromResult( sourceRecord, columnName, ignoreNullValues, - tableMetadata.getColumnDataTypes()); + tableMetadata.getColumnDataTypes(), + namespace, + table); if (column != null) { columns.add(column); @@ -242,6 +248,8 @@ private static Set getColumnsToIgnore( * @param columnName the name of the column to retrieve * @param ignoreNullValues whether to ignore null values in the result * @param dataTypesByColumns mapping of column names to their data types + * @param namespace namespace in which the table is present + * @param table table name to which data is to be imported * @return the Column object containing the value, or null if ignored * @throws ColumnParsingException if there's an error parsing the column value */ @@ -250,13 +258,15 @@ private static Column getColumn( JsonNode sourceRecord, String columnName, boolean ignoreNullValues, - Map dataTypesByColumns) + Map dataTypesByColumns, + String namespace, + String table) throws ColumnParsingException { if (scalarDBResult != null && !sourceRecord.has(columnName)) { return getColumnFromResult(scalarDBResult, columnName); } else { return getColumnFromSourceRecord( - 
sourceRecord, columnName, ignoreNullValues, dataTypesByColumns); + sourceRecord, columnName, ignoreNullValues, dataTypesByColumns, namespace, table); } } @@ -279,6 +289,8 @@ private static Column getColumnFromResult(Result scalarDBResult, String colum * @param columnName column name * @param ignoreNullValues ignore null values or not * @param dataTypesByColumns data types of columns + * @param namespace namespace in which the table is present + * @param table table name to which data is to be imported * @return column data * @throws ColumnParsingException if an error occurs while parsing the column */ @@ -286,7 +298,9 @@ private static Column getColumnFromSourceRecord( JsonNode sourceRecord, String columnName, boolean ignoreNullValues, - Map dataTypesByColumns) + Map dataTypesByColumns, + String namespace, + String table) throws ColumnParsingException { DataType dataType = dataTypesByColumns.get(columnName); String columnValue = @@ -294,7 +308,8 @@ private static Column getColumnFromSourceRecord( ? 
sourceRecord.get(columnName).asText() : null; if (!ignoreNullValues || columnValue != null) { - ColumnInfo columnInfo = ColumnInfo.builder().columnName(columnName).build(); + ColumnInfo columnInfo = + ColumnInfo.builder().columnName(columnName).tableName(table).namespace(namespace).build(); return createColumnFromValue(dataType, columnInfo, columnValue); } return null; diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java index 49616cb02e..2ecd782fa6 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/util/ColumnUtilsTest.java @@ -161,7 +161,7 @@ void createColumnFromValue_invalidNumberFormat_throwsNumberFormatException() { () -> ColumnUtils.createColumnFromValue(DataType.INT, columnInfo, value)); assertEquals( CoreError.DATA_LOADER_INVALID_NUMBER_FORMAT_FOR_COLUMN_VALUE.buildMessage( - columnName, "table", "ns"), + value, columnName, "table", "ns"), exception.getMessage()); } @@ -181,7 +181,7 @@ void createColumnFromValue_invalidBase64_throwsBase64Exception() { () -> ColumnUtils.createColumnFromValue(DataType.BLOB, columnInfo, value)); assertEquals( CoreError.DATA_LOADER_INVALID_BASE64_ENCODING_FOR_COLUMN_VALUE.buildMessage( - columnName, "table", "ns"), + value, columnName, "table", "ns"), exception.getMessage()); } /** @@ -200,7 +200,7 @@ void createColumnFromValue_invalidDateTimeFormat_throwsDateTimeParseException() () -> ColumnUtils.createColumnFromValue(DataType.TIMESTAMP, columnInfo, value)); assertEquals( CoreError.DATA_LOADER_INVALID_DATE_TIME_FOR_COLUMN_VALUE.buildMessage( - columnName, "table", "ns"), + value, columnName, "table", "ns"), exception.getMessage()); } @@ -215,7 +215,17 @@ void createColumnFromValue_invalidDateTimeFormat_throwsDateTimeParseException() void 
getColumnsFromResult_withValidData_shouldReturnColumns() throws Base64Exception, ColumnParsingException { List> columns = - ColumnUtils.getColumnsFromResult(scalarDBResult, sourceRecord, false, mockMetadata); + ColumnUtils.getColumnsFromResult( + scalarDBResult, sourceRecord, false, mockMetadata, "namespace", "table"); + assertEquals(8, columns.size()); + } + + @Test + void getColumnsFromResult_withResultNull_withValidData_shouldReturnColumns() + throws Base64Exception, ColumnParsingException { + List> columns = + ColumnUtils.getColumnsFromResult( + null, sourceRecord, false, mockMetadata, "namespace", "table"); assertEquals(8, columns.size()); } } From 912dd3b50a13be374c9ef226f7f17ec4260602c9 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 11 Jun 2025 09:43:32 +0530 Subject: [PATCH 02/10] Minor change --- core/src/main/java/com/scalar/db/common/error/CoreError.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 5aae66ab5c..6730c6c992 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -689,7 +689,7 @@ public enum CoreError implements ScalarDbError { DATA_LOADER_INVALID_BASE64_ENCODING_FOR_COLUMN_VALUE( Category.USER_ERROR, "0149", - "Invalid base64 encoding for blob value '%s' for column %s in table %s in namespace %s", + "Invalid base64 encoding for blob value '%s' for column %s in table %s in namespace %s.", "", ""), DATA_LOADER_INVALID_NUMBER_FORMAT_FOR_COLUMN_VALUE( From d08f8371b7b292f8f61161371cdb384c974915eb Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 11 Jun 2025 14:21:44 +0530 Subject: [PATCH 03/10] Initial changes [skip ci] --- .../ConsoleImportProgressListener.java | 119 ++++++++++++++++++ .../cli/command/dataimport/ImportCommand.java | 2 + 2 files changed, 121 insertions(+) create mode 100644 
data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java new file mode 100644 index 0000000000..acb210a00f --- /dev/null +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java @@ -0,0 +1,119 @@ +package com.scalar.db.dataloader.cli.command.dataimport; + +import com.scalar.db.dataloader.core.dataimport.ImportEventListener; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +public class ConsoleImportProgressListener implements ImportEventListener { + + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final long startTime; + private final Map chunkLogs = new ConcurrentHashMap<>(); + private final Map chunkFailureLogs = new ConcurrentHashMap<>(); + private final AtomicLong totalRecords = new AtomicLong(); + private volatile boolean completed = false; + + public ConsoleImportProgressListener(Duration updateInterval) { + this.startTime = System.currentTimeMillis(); + scheduler.scheduleAtFixedRate( + this::render, 0, updateInterval.toMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void onDataChunkStarted(ImportDataChunkStatus status) { + chunkLogs.put( + status.getDataChunkId(), + String.format( + "šŸ”„ Chunk %d: 
Processing... %d records so far", + status.getDataChunkId(), status.getTotalRecords())); + } + + @Override + public void onDataChunkCompleted(ImportDataChunkStatus status) { + long elapsed = System.currentTimeMillis() - status.getStartTime().toEpochMilli(); + totalRecords.addAndGet(status.getTotalRecords()); + if (status.getSuccessCount() > 0) { + chunkLogs.put( + status.getDataChunkId(), + String.format( + "āœ“ Chunk %d: %,d records imported (%.1fs), %d records imported successfully, import of %d records failed", + status.getDataChunkId(), + status.getTotalRecords(), + elapsed / 1000.0, + status.getSuccessCount(), + status.getFailureCount())); + } + } + + @Override + public void onAllDataChunksCompleted() { + completed = true; + scheduler.shutdown(); + render(); // Final render + } + + @Override + public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { + // Optional: Implement if you want to show more granular batch progress + } + + @Override + public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + if (!batchResult.isSuccess()) { + chunkFailureLogs.put( + batchResult.getDataChunkId(), + String.format( + "āŒ Chunk %d: Transaction batch %d Failed - %d records failed to be imported) ", + batchResult.getDataChunkId(), + batchResult.getTransactionBatchId(), + batchResult.getRecords().size())); + } + // Optional: Implement error reporting or success/failure count + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + // Optional: Summary or stats after final chunk + } + + private void render() { + StringBuilder builder = new StringBuilder(); + long now = System.currentTimeMillis(); + long elapsed = now - startTime; + double recPerSec = (totalRecords.get() * 1000.0) / (elapsed == 0 ? 1 : elapsed); + + builder.append( + String.format( + "\rImporting... 
%,d records | %.0f rec/s | %s\n", + totalRecords.get(), recPerSec, formatElapsed(elapsed))); + + chunkLogs.values().stream() + .sorted() // Optional: stable ordering + .forEach(line -> builder.append(line).append("\n")); + chunkFailureLogs.values().stream() + .sorted() // Optional: stable ordering + .forEach(line -> builder.append(line).append("\n")); + + clearConsole(); + System.out.print(builder); + System.out.flush(); + } + + private String formatElapsed(long elapsedMillis) { + long seconds = (elapsedMillis / 1000) % 60; + long minutes = (elapsedMillis / 1000) / 60; + return String.format("%dm %ds elapsed", minutes, seconds); + } + + private void clearConsole() { + // Clear screen for updated multiline rendering + System.out.print("\033[H\033[2J"); // ANSI escape for clearing screen + System.out.flush(); + } +} diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index a505a42ade..e19493b9ad 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -34,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -80,6 +81,7 @@ public Integer call() throws Exception { Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) { ImportManager importManager = createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config); + importManager.addListener(new ConsoleImportProgressListener(Duration.ofMillis(1000))); importManager.startImport(); } return 0; From 020f1d59ae003c268ff0d8b4840301355ed1a705 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 13 Jun 2025 08:57:48 +0530 
Subject: [PATCH 04/10] Minor change [skip ci] --- .../cli/command/dataimport/ConsoleImportProgressListener.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java index acb210a00f..2d49054088 100644 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java @@ -60,7 +60,7 @@ public void onAllDataChunksCompleted() { @Override public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { - // Optional: Implement if you want to show more granular batch progress + } @Override @@ -79,7 +79,6 @@ public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult @Override public void onTaskComplete(ImportTaskResult taskResult) { - // Optional: Summary or stats after final chunk } private void render() { From 45481b31e4f1e9281d9b02f1bd94b3dbc963c267 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 13 Jun 2025 12:11:20 +0530 Subject: [PATCH 05/10] changes --- .../cli/command/dataimport/ConsoleImportProgressListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java index 2d49054088..6535ce7f4d 100644 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java @@ -36,7 +36,7 @@ public void 
onDataChunkStarted(ImportDataChunkStatus status) { @Override public void onDataChunkCompleted(ImportDataChunkStatus status) { - long elapsed = System.currentTimeMillis() - status.getStartTime().toEpochMilli(); + long elapsed = status.getEndTime().toEpochMilli() - status.getStartTime().toEpochMilli(); totalRecords.addAndGet(status.getTotalRecords()); if (status.getSuccessCount() > 0) { chunkLogs.put( From 4a8dada6e3b238afd58ec4cc80561dd994b29947 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Wed, 18 Jun 2025 15:32:25 +0530 Subject: [PATCH 06/10] Added import related changes --- .../ConsoleImportProgressListener.java | 64 ++++--- .../cli/command/dataimport/ImportCommand.java | 2 +- .../ConsoleImportProgressListenerTest.java | 165 ++++++++++++++++++ 3 files changed, 206 insertions(+), 25 deletions(-) create mode 100644 data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java index 6535ce7f4d..85cbff5738 100644 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java @@ -7,20 +7,25 @@ import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; import java.time.Duration; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class ConsoleImportProgressListener implements ImportEventListener { private final ScheduledExecutorService
scheduler = Executors.newSingleThreadScheduledExecutor(); - private final long startTime; + private final long startTime; private final Map chunkLogs = new ConcurrentHashMap<>(); private final Map chunkFailureLogs = new ConcurrentHashMap<>(); private final AtomicLong totalRecords = new AtomicLong(); + private final AtomicLong totalSuccess = new AtomicLong(); + private final AtomicLong totalFailures = new AtomicLong(); private volatile boolean completed = false; public ConsoleImportProgressListener(Duration updateInterval) { - this.startTime = System.currentTimeMillis(); + startTime = System.currentTimeMillis(); scheduler.scheduleAtFixedRate( this::render, 0, updateInterval.toMillis(), TimeUnit.MILLISECONDS); } @@ -30,14 +35,16 @@ public void onDataChunkStarted(ImportDataChunkStatus status) { chunkLogs.put( status.getDataChunkId(), String.format( - "šŸ”„ Chunk %d: Processing... %d records so far", - status.getDataChunkId(), status.getTotalRecords())); + "šŸ”„ Chunk %d: Processing... %,d records so far", + status.getDataChunkId(), totalRecords.get())); } @Override public void onDataChunkCompleted(ImportDataChunkStatus status) { long elapsed = status.getEndTime().toEpochMilli() - status.getStartTime().toEpochMilli(); totalRecords.addAndGet(status.getTotalRecords()); + totalSuccess.addAndGet(status.getSuccessCount()); + totalFailures.addAndGet(status.getFailureCount()); if (status.getSuccessCount() > 0) { chunkLogs.put( status.getDataChunkId(), @@ -51,16 +58,9 @@ public void onDataChunkCompleted(ImportDataChunkStatus status) { } } - @Override - public void onAllDataChunksCompleted() { - completed = true; - scheduler.shutdown(); - render(); // Final render - } - @Override public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { - + // Not used currently, but could be extended for detailed batch-level progress } @Override @@ -69,16 +69,31 @@ public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult chunkFailureLogs.put( 
batchResult.getDataChunkId(), String.format( - "āŒ Chunk %d: Transaction batch %d Failed - %d records failed to be imported) ", + "āŒ Chunk id: %d, Transaction batch id: %d failed: %,d records could not be imported", batchResult.getDataChunkId(), batchResult.getTransactionBatchId(), batchResult.getRecords().size())); } - // Optional: Implement error reporting or success/failure count } @Override public void onTaskComplete(ImportTaskResult taskResult) { + // No-op currently, could be extended to summarize task-level results + } + + @Override + public void onAllDataChunksCompleted() { + completed = true; + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(2, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + scheduler.shutdownNow(); + } + render(); // Final render after shutdown } private void render() { @@ -92,13 +107,15 @@ private void render() { "\rImporting... %,d records | %.0f rec/s | %s\n", totalRecords.get(), recPerSec, formatElapsed(elapsed))); - chunkLogs.values().stream() - .sorted() // Optional: stable ordering - .forEach(line -> builder.append(line).append("\n")); - chunkFailureLogs.values().stream() - .sorted() // Optional: stable ordering - .forEach(line -> builder.append(line).append("\n")); + chunkLogs.values().stream().sorted().forEach(line -> builder.append(line).append("\n")); + chunkFailureLogs.values().stream().sorted().forEach(line -> builder.append(line).append("\n")); + if (completed) { + builder.append( + String.format( + "\nāœ… Import completed: %,d records succeeded, %,d failed\n", + totalSuccess.get(), totalFailures.get())); + } clearConsole(); System.out.print(builder); System.out.flush(); @@ -107,12 +124,11 @@ private void render() { private String formatElapsed(long elapsedMillis) { long seconds = (elapsedMillis / 1000) % 60; long minutes = (elapsedMillis / 1000) / 60; - return String.format("%dm %ds elapsed", minutes, seconds); + return 
String.format("%dm %02ds elapsed", minutes, seconds); } private void clearConsole() { - // Clear screen for updated multiline rendering - System.out.print("\033[H\033[2J"); // ANSI escape for clearing screen + System.out.print("\033[H\033[2J"); // ANSI escape to clear screen System.out.flush(); } } diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index e19493b9ad..b4fc44c58f 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -81,7 +81,7 @@ public Integer call() throws Exception { Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) { ImportManager importManager = createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config); - importManager.addListener(new ConsoleImportProgressListener(Duration.ofMillis(1000))); + importManager.addListener(new ConsoleImportProgressListener(Duration.ofSeconds(1))); importManager.startImport(); } return 0; diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java new file mode 100644 index 0000000000..cd9047d1e9 --- /dev/null +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java @@ -0,0 +1,165 @@ +package com.scalar.db.dataloader.cli.command.dataimport; + +import static org.junit.jupiter.api.Assertions.*; + +import com.fasterxml.jackson.databind.node.TextNode; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import 
com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatusState; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.*; + +class ConsoleImportProgressListenerTest { + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final PrintStream originalOut = System.out; + private ConsoleImportProgressListener listener; + + @BeforeEach + void setUp() { + System.setOut(new PrintStream(outContent)); + listener = new ConsoleImportProgressListener(Duration.ofMillis(100)); + } + + @AfterEach + void tearDown() { + System.setOut(originalOut); + } + + @Test + void testOnDataChunkStartedAndCompleted_shouldTrackSuccessAndFailureCounts() + throws InterruptedException { + int chunkId = 1; + Instant start = Instant.now(); + TimeUnit.MILLISECONDS.sleep(50); + Instant end = Instant.now(); + + ImportDataChunkStatus startStatus = + ImportDataChunkStatus.builder() + .dataChunkId(chunkId) + .startTime(start) + .totalRecords(100) + .successCount(0) + .failureCount(0) + .batchCount(0) + .endTime(null) + .totalDurationInMilliSeconds(0) + .status(ImportDataChunkStatusState.IN_PROGRESS) + .build(); + + listener.onDataChunkStarted(startStatus); + + ImportDataChunkStatus completedStatus = + ImportDataChunkStatus.builder() + .dataChunkId(chunkId) + .startTime(start) + .endTime(end) + .totalRecords(100) + .successCount(90) + .failureCount(10) + .batchCount(1) + .totalDurationInMilliSeconds((int) (end.toEpochMilli() - start.toEpochMilli())) + .status(ImportDataChunkStatusState.COMPLETE) + 
.build(); + + listener.onDataChunkCompleted(completedStatus); + + TimeUnit.MILLISECONDS.sleep(200); // Allow render + listener.onAllDataChunksCompleted(); + + String output = outContent.toString(); + + assertTrue(output.contains("āœ“ Chunk 1"), "Expected chunk log line"); + assertTrue(output.contains("90 records imported successfully"), "Expected success count"); + assertTrue(output.contains("10 records failed"), "Expected failure count"); + assertTrue( + output.contains("āœ… Import completed: 90 records succeeded, 10 failed"), + "Expected final summary"); + } + + @Test + void testOnTransactionBatchFailed_shouldAccumulateFailuresInChunkFailureLogs() + throws InterruptedException { + // Arrange: Build dummy failed batch + ImportTaskResult task = + ImportTaskResult.builder() + .rowNumber(1) + .targets(Collections.emptyList()) + .rawRecord(TextNode.valueOf("{\"id\":1}")) + .dataChunkId(7) + .build(); + + ImportTransactionBatchResult failedBatch = + ImportTransactionBatchResult.builder() + .transactionBatchId(2) + .dataChunkId(7) + .transactionId("txn-123") + .records(Collections.nCopies(5, task)) + .errors(Arrays.asList("error1", "error2")) + .success(false) + .build(); + + // Act + listener.onTransactionBatchCompleted(failedBatch); + + // Trigger final flush + listener.onAllDataChunksCompleted(); + + // Give time for render() to complete after scheduler shutdown + TimeUnit.MILLISECONDS.sleep(500); + + // Assert + String output = outContent.toString(); + System.out.println("OUTPUT:\n" + output); // Useful for debug + assertTrue( + output.contains("āŒ Chunk id: 7, Transaction batch id: 2 failed"), + "Expected failure message"); + // assertTrue(output.contains("5 records failed to be imported"), "Expected failed record + // count in batch"); + // assertTrue(output.contains("āœ… Import completed: 0 records succeeded, 5 failed"), + // "Expected final summary with failure count"); + } + + @Test + void testOnAllDataChunksCompleted_shouldShutdownAndPrintFinalSummary() { + 
listener.onAllDataChunksCompleted(); + String output = outContent.toString(); + assertTrue(output.contains("āœ… Import completed"), "Should print completion summary"); + } + + @Test + void testOnTransactionBatchStarted_shouldRunWithoutException() { + ImportTransactionBatchStatus status = + ImportTransactionBatchStatus.builder() + .dataChunkId(3) + .transactionBatchId(1) + .transactionId("txn-1") + .records(Collections.emptyList()) + .errors(Collections.emptyList()) + .success(true) + .build(); + + listener.onTransactionBatchStarted(status); // Should not throw + } + + @Test + void testOnTaskComplete_shouldRunWithoutException() { + ImportTaskResult task = + ImportTaskResult.builder() + .rowNumber(42) + .targets(Collections.emptyList()) + .rawRecord(TextNode.valueOf("{}")) + .dataChunkId(99) + .build(); + + listener.onTaskComplete(task); // Should not throw + } +} From 2894260656c62f02859978685e0f943827a10ec6 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Thu, 19 Jun 2025 15:44:37 +0530 Subject: [PATCH 07/10] import and export progress reporting --- .../ConsoleExportProgressReporter.java | 76 +++++++++++++ .../cli/command/dataexport/ExportCommand.java | 19 ++-- .../ConsoleImportProgressListener.java | 5 +- .../ConsoleExportProgressReporterTest.java | 104 ++++++++++++++++++ 4 files changed, 191 insertions(+), 13 deletions(-) create mode 100644 data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporter.java create mode 100644 data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporter.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporter.java new file mode 100644 index 0000000000..6a9dd1f783 --- /dev/null +++ 
b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporter.java @@ -0,0 +1,76 @@ +package com.scalar.db.dataloader.cli.command.dataexport; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A simple utility class to report export progress to the console. + * + *

This class is intended to be used in scenarios where there is no event-driven listener for + * export progress, but feedback to the user is still valuable. + * + *

It displays: + * + *

    + *
  • A starting message when export begins + *
  • A completion message with total records exported and time taken + *
  • Error messages in case of failures (via {@link #reportError(String, Throwable)}) + *
+ */ +public class ConsoleExportProgressReporter { + + private final long startTime; + private final AtomicBoolean completed = new AtomicBoolean(false); + private final String outputFile; + + /** + * Constructs a reporter and logs the export start. + * + * @param outputFile the file to which data will be exported + */ + public ConsoleExportProgressReporter(String outputFile) { + this.outputFile = outputFile; + this.startTime = System.currentTimeMillis(); + System.out.println("šŸ“¤ Starting export..."); + System.out.println("šŸ“ Exporting data to file: " + outputFile); + } + + /** + * Reports the completion of the export process, including total records exported and time taken. + * + * @param totalExported the total number of records exported + */ + public void reportCompletion(long totalExported) { + if (completed.getAndSet(true)) { + return; + } + long elapsed = System.currentTimeMillis() - startTime; + System.out.printf( + "%nāœ… Export completed: %,d records exported to %s in %s%n", + totalExported, outputFile, formatElapsed(elapsed)); + } + + /** + * Prints a formatted error message to the console. + * + * @param message the error description + * @param throwable the associated exception (can be null) + */ + public static void reportError(String message, Throwable throwable) { + System.err.println("%nāŒ Export failed: " + message); + if (throwable != null) { + System.err.println("Cause: " + throwable.getMessage()); + } + } + + /** + * Formats elapsed time in "Xm Ys" format. 
+ * + * @param elapsedMillis the elapsed time in milliseconds + * @return a human-readable string of the elapsed time + */ + private String formatElapsed(long elapsedMillis) { + long seconds = (elapsedMillis / 1000) % 60; + long minutes = (elapsedMillis / 1000) / 60; + return String.format("%dm %ds", minutes, seconds); + } +} diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index 015366258e..ed1fd79629 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -17,6 +17,7 @@ import com.scalar.db.dataloader.core.dataexport.CsvExportManager; import com.scalar.db.dataloader.core.dataexport.ExportManager; import com.scalar.db.dataloader.core.dataexport.ExportOptions; +import com.scalar.db.dataloader.core.dataexport.ExportReport; import com.scalar.db.dataloader.core.dataexport.JsonExportManager; import com.scalar.db.dataloader.core.dataexport.JsonLineExportManager; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -36,8 +37,6 @@ import java.util.Objects; import java.util.concurrent.Callable; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import picocli.CommandLine; import picocli.CommandLine.Model.CommandSpec; import picocli.CommandLine.Spec; @@ -45,8 +44,6 @@ @CommandLine.Command(name = "export", description = "export data from a ScalarDB table") public class ExportCommand extends ExportCommandOptions implements Callable { - private static final Logger logger = LoggerFactory.getLogger(ExportCommand.class); - @Spec CommandSpec spec; @Override @@ -86,23 +83,23 @@ public Integer call() throws Exception { String filePath = getOutputAbsoluteFilePath( outputDirectory, 
outputFileName, exportOptions.getOutputFileFormat()); - logger.info("Exporting data to file: {}", filePath); try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(filePath), Charset.defaultCharset(), CREATE, APPEND)) { - exportManager.startExport(exportOptions, tableMetadata, writer); + ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter(filePath); + ExportReport report = exportManager.startExport(exportOptions, tableMetadata, writer); + reporter.reportCompletion(report.getExportedRowCount()); } } catch (DirectoryValidationException e) { - logger.error("Invalid output directory path: {}", outputDirectory); + ConsoleExportProgressReporter.reportError("Invalid output directory: " + outputDirectory, e); return 1; } catch (InvalidFilePathException e) { - logger.error( - "The ScalarDB connection settings file path is invalid or the file is missing: {}", - scalarDbPropertiesFilePath); + ConsoleExportProgressReporter.reportError( + "Invalid ScalarDB connection file path: " + scalarDbPropertiesFilePath, e); return 1; } catch (TableMetadataException e) { - logger.error("Failed to retrieve table metadata: {}", e.getMessage()); + ConsoleExportProgressReporter.reportError("Failed to retrieve table metadata", e); return 1; } return 0; diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java index 85cbff5738..431afa5f9a 100644 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java @@ -24,6 +24,7 @@ public class ConsoleImportProgressListener implements ImportEventListener { private final AtomicLong totalFailures = new AtomicLong(); private volatile boolean completed = false; + 
@SuppressWarnings("FutureReturnValueIgnored") public ConsoleImportProgressListener(Duration updateInterval) { startTime = System.currentTimeMillis(); scheduler.scheduleAtFixedRate( @@ -104,7 +105,7 @@ private void render() { builder.append( String.format( - "\rImporting... %,d records | %.0f rec/s | %s\n", + "\rImporting... %,d records | %.0f rec/s | %s%n", totalRecords.get(), recPerSec, formatElapsed(elapsed))); chunkLogs.values().stream().sorted().forEach(line -> builder.append(line).append("\n")); @@ -113,7 +114,7 @@ private void render() { if (completed) { builder.append( String.format( - "\nāœ… Import completed: %,d records succeeded, %,d failed\n", + "%nāœ… Import completed: %,d records succeeded, %,d failed%n", totalSuccess.get(), totalFailures.get())); } clearConsole(); diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java new file mode 100644 index 0000000000..bf9ad764fa --- /dev/null +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java @@ -0,0 +1,104 @@ +package com.scalar.db.dataloader.cli.command.dataexport; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ConsoleExportProgressReporterTest { + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final PrintStream originalOut = System.out; + + @BeforeEach + void setUpStreams() { + System.setOut(new PrintStream(outContent)); + } + + @AfterEach + void restoreStreams() { + 
System.setOut(originalOut); + } + + @Test + void testStartMessageIncludesExportFilePath() { + new ConsoleExportProgressReporter("output/test.csv"); + String output = outContent.toString(); + assertTrue(output.contains("šŸ“¤ Starting export"), "Expected start message"); + assertTrue( + output.contains("šŸ“ Exporting data to file: output/test.csv"), "Expected file path info"); + } + + @Test + void testCompletionMessageIncludesFilePathAndDuration() throws InterruptedException { + ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter("target/output.csv"); + + Thread.sleep(100); // Simulate work + + reporter.reportCompletion(12345); + String output = outContent.toString(); + + assertTrue( + output.contains("āœ… Export completed: 12,345 records exported to target/output.csv"), + "Expected completion message"); + assertTrue(output.matches("(?s).*in \\d+m \\d+s.*"), "Expected duration format"); + } + + @Test + void testCompletionOnlyPrintedOnce() { + ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter("target/output.csv"); + + reporter.reportCompletion(100); + reporter.reportCompletion(999999); // Should be ignored + + String output = outContent.toString(); + int count = output.split("Export completed").length - 1; + assertEquals(1, count, "Expected completion to be printed only once"); + } + + @Test + void testReportError_shouldPrintErrorMessageWithExceptionMessage() { + ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + PrintStream originalErr = System.err; + System.setErr(new PrintStream(errContent)); + + try { + String errorMessage = "Something went wrong"; + Throwable cause = new RuntimeException("Test exception"); + + ConsoleExportProgressReporter.reportError(errorMessage, cause); + + String output = errContent.toString(); + assertTrue( + output.contains("āŒ Export failed: " + errorMessage), "Expected main error message"); + assertTrue(output.contains("Cause: " + cause.getMessage()), "Expected exception 
message"); + } finally { + System.setErr(originalErr); + } + } + + @Test + void testReportError_shouldPrintMessageWithoutExceptionWhenNull() { + ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + PrintStream originalErr = System.err; + System.setErr(new PrintStream(errContent)); + + try { + String errorMessage = "Directory not found"; + + ConsoleExportProgressReporter.reportError(errorMessage, null); + + String output = errContent.toString(); + assertTrue(output.contains("āŒ Export failed: " + errorMessage), "Expected error message"); + assertFalse( + output.contains("Cause:"), "Should not print exception cause when throwable is null"); + } finally { + System.setErr(originalErr); + } + } +} From 04e57f0ddecc9e354763e5cfc7fb7720342690f8 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Fri, 20 Jun 2025 16:49:06 +0530 Subject: [PATCH 08/10] Changes --- .../dataimport/ConsoleImportProgressListener.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java index 431afa5f9a..db42df52c3 100644 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java @@ -36,7 +36,7 @@ public void onDataChunkStarted(ImportDataChunkStatus status) { chunkLogs.put( status.getDataChunkId(), String.format( - "šŸ”„ Chunk %d: Processing... %,d records so far", + "šŸ”„ Chunk %d: Processing... 
%d records so far", status.getDataChunkId(), totalRecords.get())); } @@ -50,7 +50,7 @@ public void onDataChunkCompleted(ImportDataChunkStatus status) { chunkLogs.put( status.getDataChunkId(), String.format( - "āœ“ Chunk %d: %,d records imported (%.1fs), %d records imported successfully, import of %d records failed", + "āœ“ Chunk %d: %d records imported (%.1fs), %d records imported successfully, import of %d records failed", status.getDataChunkId(), status.getTotalRecords(), elapsed / 1000.0, @@ -70,7 +70,7 @@ public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult chunkFailureLogs.put( batchResult.getDataChunkId(), String.format( - "āŒ Chunk id: %d, Transaction batch id: %d failed: %,d records could not be imported", + "āŒ Chunk id: %d, Transaction batch id: %d failed: %d records could not be imported", batchResult.getDataChunkId(), batchResult.getTransactionBatchId(), batchResult.getRecords().size())); @@ -105,7 +105,7 @@ private void render() { builder.append( String.format( - "\rImporting... %,d records | %.0f rec/s | %s%n", + "\rImporting... 
%d records | %.0f rec/s | %s%n", totalRecords.get(), recPerSec, formatElapsed(elapsed))); chunkLogs.values().stream().sorted().forEach(line -> builder.append(line).append("\n")); @@ -114,7 +114,7 @@ private void render() { if (completed) { builder.append( String.format( - "%nāœ… Import completed: %,d records succeeded, %,d failed%n", + "%nāœ… Import completed: %d records succeeded, %d failed%n", totalSuccess.get(), totalFailures.get())); } clearConsole(); From 42f3e228ecfec7287c87675d2137f3da9024e7c1 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 23 Jun 2025 16:38:49 +0530 Subject: [PATCH 09/10] Minor changes --- .../ConsoleImportProgressListener.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java index db42df52c3..80dda160b1 100644 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java @@ -42,21 +42,28 @@ public void onDataChunkStarted(ImportDataChunkStatus status) { @Override public void onDataChunkCompleted(ImportDataChunkStatus status) { - long elapsed = status.getEndTime().toEpochMilli() - status.getStartTime().toEpochMilli(); - totalRecords.addAndGet(status.getTotalRecords()); - totalSuccess.addAndGet(status.getSuccessCount()); - totalFailures.addAndGet(status.getFailureCount()); - if (status.getSuccessCount() > 0) { - chunkLogs.put( - status.getDataChunkId(), - String.format( - "āœ“ Chunk %d: %d records imported (%.1fs), %d records imported successfully, import of %d records failed", - status.getDataChunkId(), - status.getTotalRecords(), - elapsed / 1000.0, - status.getSuccessCount(), - 
status.getFailureCount())); - } + long elapsedMillis = status.getEndTime().toEpochMilli() - status.getStartTime().toEpochMilli(); + double elapsedSeconds = elapsedMillis / 1000.0; + + int chunkId = status.getDataChunkId(); + int total = status.getTotalRecords(); + int success = status.getSuccessCount(); + int failure = status.getFailureCount(); + + totalRecords.addAndGet(total); + totalSuccess.addAndGet(success); + totalFailures.addAndGet(failure); + + String message = + (failure == 0) + ? String.format( + "āœ“ Chunk %d: %d records imported (%.1fs), %d records imported successfully", + chunkId, total, elapsedSeconds, success) + : String.format( + "āœ“ Chunk %d: %d records imported (%.1fs), %d records imported successfully, import of %d records failed", + chunkId, total, elapsedSeconds, success, failure); + + chunkLogs.put(chunkId, message); } @Override From d1d5b0b7feb53eb5ca3a955e98702f03dda0d6f6 Mon Sep 17 00:00:00 2001 From: Jishnu J Date: Mon, 23 Jun 2025 17:54:03 +0530 Subject: [PATCH 10/10] Spotbugs issues fixed --- .../ConsoleExportProgressReporterTest.java | 32 +++++++++++-------- .../ConsoleImportProgressListenerTest.java | 18 ++++++----- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java index bf9ad764fa..fee14c2069 100644 --- a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java @@ -6,6 +6,7 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.io.UnsupportedEncodingException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ 
-16,8 +17,8 @@ class ConsoleExportProgressReporterTest { private final PrintStream originalOut = System.out; @BeforeEach - void setUpStreams() { - System.setOut(new PrintStream(outContent)); + void setUpStreams() throws UnsupportedEncodingException { + System.setOut(new PrintStream(outContent, true, "UTF-8")); } @AfterEach @@ -26,22 +27,23 @@ void restoreStreams() { } @Test - void testStartMessageIncludesExportFilePath() { + void testStartMessageIncludesExportFilePath() throws UnsupportedEncodingException { new ConsoleExportProgressReporter("output/test.csv"); - String output = outContent.toString(); + String output = outContent.toString("UTF-8"); assertTrue(output.contains("šŸ“¤ Starting export"), "Expected start message"); assertTrue( output.contains("šŸ“ Exporting data to file: output/test.csv"), "Expected file path info"); } @Test - void testCompletionMessageIncludesFilePathAndDuration() throws InterruptedException { + void testCompletionMessageIncludesFilePathAndDuration() + throws InterruptedException, UnsupportedEncodingException { ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter("target/output.csv"); Thread.sleep(100); // Simulate work reporter.reportCompletion(12345); - String output = outContent.toString(); + String output = outContent.toString("UTF-8"); assertTrue( output.contains("āœ… Export completed: 12,345 records exported to target/output.csv"), @@ -50,22 +52,23 @@ void testCompletionMessageIncludesFilePathAndDuration() throws InterruptedExcept } @Test - void testCompletionOnlyPrintedOnce() { + void testCompletionOnlyPrintedOnce() throws UnsupportedEncodingException { ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter("target/output.csv"); reporter.reportCompletion(100); reporter.reportCompletion(999999); // Should be ignored - String output = outContent.toString(); + String output = outContent.toString("UTF-8"); int count = output.split("Export completed").length - 1; assertEquals(1, count, 
"Expected completion to be printed only once"); } @Test - void testReportError_shouldPrintErrorMessageWithExceptionMessage() { + void testReportError_shouldPrintErrorMessageWithExceptionMessage() + throws UnsupportedEncodingException { ByteArrayOutputStream errContent = new ByteArrayOutputStream(); PrintStream originalErr = System.err; - System.setErr(new PrintStream(errContent)); + System.setErr(new PrintStream(errContent, true, "UTF-8")); try { String errorMessage = "Something went wrong"; @@ -73,7 +76,7 @@ void testReportError_shouldPrintErrorMessageWithExceptionMessage() { ConsoleExportProgressReporter.reportError(errorMessage, cause); - String output = errContent.toString(); + String output = errContent.toString("UTF-8"); assertTrue( output.contains("āŒ Export failed: " + errorMessage), "Expected main error message"); assertTrue(output.contains("Cause: " + cause.getMessage()), "Expected exception message"); @@ -83,17 +86,18 @@ void testReportError_shouldPrintErrorMessageWithExceptionMessage() { } @Test - void testReportError_shouldPrintMessageWithoutExceptionWhenNull() { + void testReportError_shouldPrintMessageWithoutExceptionWhenNull() + throws UnsupportedEncodingException { ByteArrayOutputStream errContent = new ByteArrayOutputStream(); PrintStream originalErr = System.err; - System.setErr(new PrintStream(errContent)); + System.setErr(new PrintStream(errContent, true, "UTF-8")); try { String errorMessage = "Directory not found"; ConsoleExportProgressReporter.reportError(errorMessage, null); - String output = errContent.toString(); + String output = errContent.toString("UTF-8"); assertTrue(output.contains("āŒ Export failed: " + errorMessage), "Expected error message"); assertFalse( output.contains("Cause:"), "Should not print exception cause when throwable is null"); diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java 
b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java index cd9047d1e9..f0c381f341 100644 --- a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java @@ -10,6 +10,7 @@ import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.io.UnsupportedEncodingException; import java.time.Duration; import java.time.Instant; import java.util.Arrays; @@ -24,8 +25,8 @@ class ConsoleImportProgressListenerTest { private ConsoleImportProgressListener listener; @BeforeEach - void setUp() { - System.setOut(new PrintStream(outContent)); + void setUp() throws UnsupportedEncodingException { + System.setOut(new PrintStream(outContent, true, "UTF-8")); listener = new ConsoleImportProgressListener(Duration.ofMillis(100)); } @@ -36,7 +37,7 @@ void tearDown() { @Test void testOnDataChunkStartedAndCompleted_shouldTrackSuccessAndFailureCounts() - throws InterruptedException { + throws InterruptedException, UnsupportedEncodingException { int chunkId = 1; Instant start = Instant.now(); TimeUnit.MILLISECONDS.sleep(50); @@ -75,7 +76,7 @@ void testOnDataChunkStartedAndCompleted_shouldTrackSuccessAndFailureCounts() TimeUnit.MILLISECONDS.sleep(200); // Allow render listener.onAllDataChunksCompleted(); - String output = outContent.toString(); + String output = outContent.toString("UTF-8"); assertTrue(output.contains("āœ“ Chunk 1"), "Expected chunk log line"); assertTrue(output.contains("90 records imported successfully"), "Expected success count"); @@ -87,7 +88,7 @@ void testOnDataChunkStartedAndCompleted_shouldTrackSuccessAndFailureCounts() @Test void testOnTransactionBatchFailed_shouldAccumulateFailuresInChunkFailureLogs() - throws 
InterruptedException { + throws InterruptedException, UnsupportedEncodingException { // Arrange: Build dummy failed batch ImportTaskResult task = ImportTaskResult.builder() @@ -117,7 +118,7 @@ void testOnTransactionBatchFailed_shouldAccumulateFailuresInChunkFailureLogs() TimeUnit.MILLISECONDS.sleep(500); // Assert - String output = outContent.toString(); + String output = outContent.toString("UTF-8"); System.out.println("OUTPUT:\n" + output); // Useful for debug assertTrue( output.contains("āŒ Chunk id: 7, Transaction batch id: 2 failed"), @@ -129,9 +130,10 @@ void testOnTransactionBatchFailed_shouldAccumulateFailuresInChunkFailureLogs() } @Test - void testOnAllDataChunksCompleted_shouldShutdownAndPrintFinalSummary() { + void testOnAllDataChunksCompleted_shouldShutdownAndPrintFinalSummary() + throws UnsupportedEncodingException { listener.onAllDataChunksCompleted(); - String output = outContent.toString(); + String output = outContent.toString("UTF-8"); assertTrue(output.contains("āœ… Import completed"), "Should print completion summary"); }