diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporter.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporter.java new file mode 100644 index 0000000000..6a9dd1f783 --- /dev/null +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporter.java @@ -0,0 +1,76 @@ +package com.scalar.db.dataloader.cli.command.dataexport; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A simple utility class to report export progress to the console. + * + *

This class is intended to be used in scenarios where there is no event-driven listener for + * export progress, but feedback to the user is still valuable. + * + *

It displays: + * + *

+ */ +public class ConsoleExportProgressReporter { + + private final long startTime; + private final AtomicBoolean completed = new AtomicBoolean(false); + private final String outputFile; + + /** + * Constructs a reporter and logs the export start. + * + * @param outputFile the file to which data will be exported + */ + public ConsoleExportProgressReporter(String outputFile) { + this.outputFile = outputFile; + this.startTime = System.currentTimeMillis(); + System.out.println("šŸ“¤ Starting export..."); + System.out.println("šŸ“ Exporting data to file: " + outputFile); + } + + /** + * Reports the completion of the export process, including total records exported and time taken. + * + * @param totalExported the total number of records exported + */ + public void reportCompletion(long totalExported) { + if (completed.getAndSet(true)) { + return; + } + long elapsed = System.currentTimeMillis() - startTime; + System.out.printf( + "%nāœ… Export completed: %,d records exported to %s in %s%n", + totalExported, outputFile, formatElapsed(elapsed)); + } + + /** + * Prints a formatted error message to the console. + * + * @param message the error description + * @param throwable the associated exception (can be null) + */ + public static void reportError(String message, Throwable throwable) { + System.err.println("%nāŒ Export failed: " + message); + if (throwable != null) { + System.err.println("Cause: " + throwable.getMessage()); + } + } + + /** + * Formats elapsed time in "Xm Ys" format. + * + * @param elapsedMillis the elapsed time in milliseconds + * @return a human-readable string of the elapsed time + */ + private String formatElapsed(long elapsedMillis) { + long seconds = (elapsedMillis / 1000) % 60; + long minutes = (elapsedMillis / 1000) / 60; + return String.format("%dm %ds", minutes, seconds); + } +} diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index c3aaeee3af..7ba063fb96 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -17,6 +17,7 @@ import com.scalar.db.dataloader.core.dataexport.CsvExportManager; import com.scalar.db.dataloader.core.dataexport.ExportManager; import com.scalar.db.dataloader.core.dataexport.ExportOptions; +import com.scalar.db.dataloader.core.dataexport.ExportReport; import com.scalar.db.dataloader.core.dataexport.JsonExportManager; import com.scalar.db.dataloader.core.dataexport.JsonLineExportManager; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -36,8 +37,6 @@ import java.util.Objects; import java.util.concurrent.Callable; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import picocli.CommandLine; import picocli.CommandLine.Model.CommandSpec; import picocli.CommandLine.Spec; @@ -45,8 +44,6 @@ @CommandLine.Command(name = "export", description = "export data from a ScalarDB table") public class ExportCommand extends ExportCommandOptions implements Callable { - private static final Logger logger = LoggerFactory.getLogger(ExportCommand.class); - @Spec CommandSpec spec; @Override @@ -85,23 +82,23 @@ public Integer call() throws Exception { String filePath = getOutputAbsoluteFilePath( outputDirectory, outputFileName, exportOptions.getOutputFileFormat()); - logger.info("Exporting data to file: {}", filePath); try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(filePath), Charset.defaultCharset(), CREATE, APPEND)) { - exportManager.startExport(exportOptions, tableMetadata, writer); + ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter(filePath); + ExportReport report = exportManager.startExport(exportOptions, tableMetadata, writer); + reporter.reportCompletion(report.getExportedRowCount()); } } catch (DirectoryValidationException e) { - logger.error("Invalid output directory path: {}", outputDirectory); + ConsoleExportProgressReporter.reportError("Invalid output directory: " + outputDirectory, e); return 1; } catch (InvalidFilePathException e) { - logger.error( - "The ScalarDB connection settings file path is invalid or the file is missing: {}", - scalarDbPropertiesFilePath); + ConsoleExportProgressReporter.reportError( + "Invalid ScalarDB connection file path: " + scalarDbPropertiesFilePath, e); return 1; } catch (TableMetadataException e) { - logger.error("Failed to retrieve table metadata: {}", e.getMessage()); + ConsoleExportProgressReporter.reportError("Failed to retrieve table metadata", e); return 1; } return 0; diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java new file mode 100644 index 0000000000..80dda160b1 --- /dev/null +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListener.java @@ -0,0 +1,142 @@ +package com.scalar.db.dataloader.cli.command.dataimport; + +import com.scalar.db.dataloader.core.dataimport.ImportEventListener; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class ConsoleImportProgressListener implements ImportEventListener { + + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final long startTime; + private final Map chunkLogs = new ConcurrentHashMap<>(); + private final Map chunkFailureLogs = new ConcurrentHashMap<>(); + private final AtomicLong totalRecords = new AtomicLong(); + private final AtomicLong totalSuccess = new AtomicLong(); + private final AtomicLong totalFailures = new AtomicLong(); + private volatile boolean completed = false; + + @SuppressWarnings("FutureReturnValueIgnored") + public ConsoleImportProgressListener(Duration updateInterval) { + startTime = System.currentTimeMillis(); + scheduler.scheduleAtFixedRate( + this::render, 0, updateInterval.toMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void onDataChunkStarted(ImportDataChunkStatus status) { + chunkLogs.put( + status.getDataChunkId(), + String.format( + "šŸ”„ Chunk %d: Processing... %d records so far", + status.getDataChunkId(), totalRecords.get())); + } + + @Override + public void onDataChunkCompleted(ImportDataChunkStatus status) { + long elapsedMillis = status.getEndTime().toEpochMilli() - status.getStartTime().toEpochMilli(); + double elapsedSeconds = elapsedMillis / 1000.0; + + int chunkId = status.getDataChunkId(); + int total = status.getTotalRecords(); + int success = status.getSuccessCount(); + int failure = status.getFailureCount(); + + totalRecords.addAndGet(total); + totalSuccess.addAndGet(success); + totalFailures.addAndGet(failure); + + String message = + (failure == 0) + ? String.format( + "āœ“ Chunk %d: %d records imported (%.1fs), %d records imported successfully", + chunkId, total, elapsedSeconds, success) + : String.format( + "āœ“ Chunk %d: %d records imported (%.1fs), %d records imported successfully, import of %d records failed", + chunkId, total, elapsedSeconds, success, failure); + + chunkLogs.put(chunkId, message); + } + + @Override + public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { + // Not used currently, but could be extended for detailed batch-level progress + } + + @Override + public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + if (!batchResult.isSuccess()) { + chunkFailureLogs.put( + batchResult.getDataChunkId(), + String.format( + "āŒ Chunk id: %d, Transaction batch id: %d failed: %d records could not be imported", + batchResult.getDataChunkId(), + batchResult.getTransactionBatchId(), + batchResult.getRecords().size())); + } + } + + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + // No-op currently, could be extended to summarize task-level results + } + + @Override + public void onAllDataChunksCompleted() { + completed = true; + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(2, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + scheduler.shutdownNow(); + } + render(); // Final render after shutdown + } + + private void render() { + StringBuilder builder = new StringBuilder(); + long now = System.currentTimeMillis(); + long elapsed = now - startTime; + double recPerSec = (totalRecords.get() * 1000.0) / (elapsed == 0 ? 1 : elapsed); + + builder.append( + String.format( + "\rImporting... %d records | %.0f rec/s | %s%n", + totalRecords.get(), recPerSec, formatElapsed(elapsed))); + + chunkLogs.values().stream().sorted().forEach(line -> builder.append(line).append("\n")); + chunkFailureLogs.values().stream().sorted().forEach(line -> builder.append(line).append("\n")); + + if (completed) { + builder.append( + String.format( + "%nāœ… Import completed: %d records succeeded, %d failed%n", + totalSuccess.get(), totalFailures.get())); + } + clearConsole(); + System.out.print(builder); + System.out.flush(); + } + + private String formatElapsed(long elapsedMillis) { + long seconds = (elapsedMillis / 1000) % 60; + long minutes = (elapsedMillis / 1000) / 60; + return String.format("%dm %02ds elapsed", minutes, seconds); + } + + private void clearConsole() { + System.out.print("\033[H\033[2J"); // ANSI escape to clear screen + System.out.flush(); + } +} diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index bc78514b53..478e5bc4c3 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -34,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -77,6 +78,7 @@ public Integer call() throws Exception { Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) { ImportManager importManager = createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config); + importManager.addListener(new ConsoleImportProgressListener(Duration.ofSeconds(1))); importManager.startImport(); } return 0; diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java new file mode 100644 index 0000000000..fee14c2069 --- /dev/null +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ConsoleExportProgressReporterTest.java @@ -0,0 +1,108 @@ +package com.scalar.db.dataloader.cli.command.dataexport; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class ConsoleExportProgressReporterTest { + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final PrintStream originalOut = System.out; + + @BeforeEach + void setUpStreams() throws UnsupportedEncodingException { + System.setOut(new PrintStream(outContent, true, "UTF-8")); + } + + @AfterEach + void restoreStreams() { + System.setOut(originalOut); + } + + @Test + void testStartMessageIncludesExportFilePath() throws UnsupportedEncodingException { + new ConsoleExportProgressReporter("output/test.csv"); + String output = outContent.toString("UTF-8"); + assertTrue(output.contains("šŸ“¤ Starting export"), "Expected start message"); + assertTrue( + output.contains("šŸ“ Exporting data to file: output/test.csv"), "Expected file path info"); + } + + @Test + void testCompletionMessageIncludesFilePathAndDuration() + throws InterruptedException, UnsupportedEncodingException { + ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter("target/output.csv"); + + Thread.sleep(100); // Simulate work + + reporter.reportCompletion(12345); + String output = outContent.toString("UTF-8"); + + assertTrue( + output.contains("āœ… Export completed: 12,345 records exported to target/output.csv"), + "Expected completion message"); + assertTrue(output.matches("(?s).*in \\d+m \\d+s.*"), "Expected duration format"); + } + + @Test + void testCompletionOnlyPrintedOnce() throws UnsupportedEncodingException { + ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter("target/output.csv"); + + reporter.reportCompletion(100); + reporter.reportCompletion(999999); // Should be ignored + + String output = outContent.toString("UTF-8"); + int count = output.split("Export completed").length - 1; + assertEquals(1, count, "Expected completion to be printed only once"); + } + + @Test + void testReportError_shouldPrintErrorMessageWithExceptionMessage() + throws UnsupportedEncodingException { + ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + PrintStream originalErr = System.err; + System.setErr(new PrintStream(errContent, true, "UTF-8")); + + try { + String errorMessage = "Something went wrong"; + Throwable cause = new RuntimeException("Test exception"); + + ConsoleExportProgressReporter.reportError(errorMessage, cause); + + String output = errContent.toString("UTF-8"); + assertTrue( + output.contains("āŒ Export failed: " + errorMessage), "Expected main error message"); + assertTrue(output.contains("Cause: " + cause.getMessage()), "Expected exception message"); + } finally { + System.setErr(originalErr); + } + } + + @Test + void testReportError_shouldPrintMessageWithoutExceptionWhenNull() + throws UnsupportedEncodingException { + ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + PrintStream originalErr = System.err; + System.setErr(new PrintStream(errContent, true, "UTF-8")); + + try { + String errorMessage = "Directory not found"; + + ConsoleExportProgressReporter.reportError(errorMessage, null); + + String output = errContent.toString("UTF-8"); + assertTrue(output.contains("āŒ Export failed: " + errorMessage), "Expected error message"); + assertFalse( + output.contains("Cause:"), "Should not print exception cause when throwable is null"); + } finally { + System.setErr(originalErr); + } + } +} diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java new file mode 100644 index 0000000000..f0c381f341 --- /dev/null +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataimport/ConsoleImportProgressListenerTest.java @@ -0,0 +1,167 @@ +package com.scalar.db.dataloader.cli.command.dataimport; + +import static org.junit.jupiter.api.Assertions.*; + +import com.fasterxml.jackson.databind.node.TextNode; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatusState; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.*; + +class ConsoleImportProgressListenerTest { + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final PrintStream originalOut = System.out; + private ConsoleImportProgressListener listener; + + @BeforeEach + void setUp() throws UnsupportedEncodingException { + System.setOut(new PrintStream(outContent, true, "UTF-8")); + listener = new ConsoleImportProgressListener(Duration.ofMillis(100)); + } + + @AfterEach + void tearDown() { + System.setOut(originalOut); + } + + @Test + void testOnDataChunkStartedAndCompleted_shouldTrackSuccessAndFailureCounts() + throws InterruptedException, UnsupportedEncodingException { + int chunkId = 1; + Instant start = Instant.now(); + TimeUnit.MILLISECONDS.sleep(50); + Instant end = Instant.now(); + + ImportDataChunkStatus startStatus = + ImportDataChunkStatus.builder() + .dataChunkId(chunkId) + .startTime(start) + .totalRecords(100) + .successCount(0) + .failureCount(0) + .batchCount(0) + .endTime(null) + .totalDurationInMilliSeconds(0) + .status(ImportDataChunkStatusState.IN_PROGRESS) + .build(); + + listener.onDataChunkStarted(startStatus); + + ImportDataChunkStatus completedStatus = + ImportDataChunkStatus.builder() + .dataChunkId(chunkId) + .startTime(start) + .endTime(end) + .totalRecords(100) + .successCount(90) + .failureCount(10) + .batchCount(1) + .totalDurationInMilliSeconds((int) (end.toEpochMilli() - start.toEpochMilli())) + .status(ImportDataChunkStatusState.COMPLETE) + .build(); + + listener.onDataChunkCompleted(completedStatus); + + TimeUnit.MILLISECONDS.sleep(200); // Allow render + listener.onAllDataChunksCompleted(); + + String output = outContent.toString("UTF-8"); + + assertTrue(output.contains("āœ“ Chunk 1"), "Expected chunk log line"); + assertTrue(output.contains("90 records imported successfully"), "Expected success count"); + assertTrue(output.contains("10 records failed"), "Expected failure count"); + assertTrue( + output.contains("āœ… Import completed: 90 records succeeded, 10 failed"), + "Expected final summary"); + } + + @Test + void testOnTransactionBatchFailed_shouldAccumulateFailuresInChunkFailureLogs() + throws InterruptedException, UnsupportedEncodingException { + // Arrange: Build dummy failed batch + ImportTaskResult task = + ImportTaskResult.builder() + .rowNumber(1) + .targets(Collections.emptyList()) + .rawRecord(TextNode.valueOf("{\"id\":1}")) + .dataChunkId(7) + .build(); + + ImportTransactionBatchResult failedBatch = + ImportTransactionBatchResult.builder() + .transactionBatchId(2) + .dataChunkId(7) + .transactionId("txn-123") + .records(Collections.nCopies(5, task)) + .errors(Arrays.asList("error1", "error2")) + .success(false) + .build(); + + // Act + listener.onTransactionBatchCompleted(failedBatch); + + // Trigger final flush + listener.onAllDataChunksCompleted(); + + // Give time for render() to complete after scheduler shutdown + TimeUnit.MILLISECONDS.sleep(500); + + // Assert + String output = outContent.toString("UTF-8"); + System.out.println("OUTPUT:\n" + output); // Useful for debug + assertTrue( + output.contains("āŒ Chunk id: 7, Transaction batch id: 2 failed"), + "Expected failure message"); + // assertTrue(output.contains("5 records failed to be imported"), "Expected failed record + // count in batch"); + // assertTrue(output.contains("āœ… Import completed: 0 records succeeded, 5 failed"), + // "Expected final summary with failure count"); + } + + @Test + void testOnAllDataChunksCompleted_shouldShutdownAndPrintFinalSummary() + throws UnsupportedEncodingException { + listener.onAllDataChunksCompleted(); + String output = outContent.toString("UTF-8"); + assertTrue(output.contains("āœ… Import completed"), "Should print completion summary"); + } + + @Test + void testOnTransactionBatchStarted_shouldRunWithoutException() { + ImportTransactionBatchStatus status = + ImportTransactionBatchStatus.builder() + .dataChunkId(3) + .transactionBatchId(1) + .transactionId("txn-1") + .records(Collections.emptyList()) + .errors(Collections.emptyList()) + .success(true) + .build(); + + listener.onTransactionBatchStarted(status); // Should not throw + } + + @Test + void testOnTaskComplete_shouldRunWithoutException() { + ImportTaskResult task = + ImportTaskResult.builder() + .rowNumber(42) + .targets(Collections.emptyList()) + .rawRecord(TextNode.valueOf("{}")) + .dataChunkId(99) + .build(); + + listener.onTaskComplete(task); // Should not throw + } +}