diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java index c3aaeee3a..824e41308 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java @@ -5,6 +5,7 @@ import static java.nio.file.StandardOpenOption.CREATE; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.cli.exception.DirectoryValidationException; import com.scalar.db.dataloader.cli.util.DirectoryUtils; @@ -13,6 +14,7 @@ import com.scalar.db.dataloader.core.ColumnKeyValue; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.dataloader.core.dataexport.CsvExportManager; import com.scalar.db.dataloader.core.dataexport.ExportManager; @@ -25,10 +27,14 @@ import com.scalar.db.dataloader.core.exception.KeyParsingException; import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException; import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService; +import com.scalar.db.dataloader.core.tablemetadata.TableMetadataStorageService; +import com.scalar.db.dataloader.core.tablemetadata.TableMetadataTransactionService; import com.scalar.db.dataloader.core.util.KeyUtils; import com.scalar.db.io.Key; import com.scalar.db.service.StorageFactory; +import com.scalar.db.service.TransactionFactory; import java.io.BufferedWriter; +import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; @@ -60,14 +66,13 @@ public Integer call() throws Exception { spec.commandLine(), dataChunkSize, DataLoaderError.INVALID_DATA_CHUNK_SIZE); validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS); - StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath); - TableMetadataService metaDataService = - new TableMetadataService(storageFactory.getStorageAdmin()); + TableMetadataService tableMetadataService = + createTableMetadataService(scalarDbMode, scalarDbPropertiesFilePath); ScalarDbDao scalarDbDao = new ScalarDbDao(); - ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat); - - TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table); + ExportManager exportManager = + createExportManager(scalarDbMode, scalarDbDao, outputFormat, scalarDbPropertiesFilePath); + TableMetadata tableMetadata = tableMetadataService.getTableMetadata(namespace, table); Key partitionKey = partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null; @@ -124,11 +129,100 @@ private void validateOutputDirectory() throws DirectoryValidationException { } } + /** + * Creates a {@link TableMetadataService} instance based on the specified {@link ScalarDbMode} and + * ScalarDB configuration file. + * + *

If the mode is {@code TRANSACTION}, this method initializes a {@link TransactionFactory} and + * uses its transaction admin to create a {@link TableMetadataTransactionService}. Otherwise, it + * initializes a {@link StorageFactory} and creates a {@link TableMetadataStorageService} using + * its storage admin. + * + * @param scalarDbMode the mode ScalarDB is running in (either {@code STORAGE} or {@code + * TRANSACTION}) + * @param scalarDbPropertiesFilePath the path to the ScalarDB properties file + * @return an appropriate {@link TableMetadataService} based on the mode + * @throws IOException if reading the ScalarDB properties file fails + */ + private TableMetadataService createTableMetadataService( + ScalarDbMode scalarDbMode, String scalarDbPropertiesFilePath) throws IOException { + if (scalarDbMode.equals(ScalarDbMode.TRANSACTION)) { + TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath); + return new TableMetadataTransactionService(transactionFactory.getTransactionAdmin()); + } + StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath); + return new TableMetadataStorageService(storageFactory.getStorageAdmin()); + } + + /** + * Creates an {@link ExportManager} instance based on ScalarDB mode and file format. + * + * @param scalarDbMode The ScalarDB mode (TRANSACTION or STORAGE). + * @param scalarDbDao The DAO for accessing ScalarDB. + * @param fileFormat The output file format (CSV, JSON, JSONL). + * @param scalarDbPropertiesFilePath Path to the ScalarDB properties file. + * @return A configured {@link ExportManager}. + * @throws IOException If there is an error reading the properties file. + */ private ExportManager createExportManager( - StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) { + ScalarDbMode scalarDbMode, + ScalarDbDao scalarDbDao, + FileFormat fileFormat, + String scalarDbPropertiesFilePath) + throws IOException { ProducerTaskFactory taskFactory = new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson); - DistributedStorage storage = storageFactory.getStorage(); + if (scalarDbMode.equals(ScalarDbMode.STORAGE)) { + DistributedStorage storage = StorageFactory.create(scalarDbPropertiesFilePath).getStorage(); + return createExportManagerWithStorage(storage, scalarDbDao, fileFormat, taskFactory); + } else { + DistributedTransactionManager distributedTransactionManager = + TransactionFactory.create(scalarDbPropertiesFilePath).getTransactionManager(); + return createExportManagerWithTransaction( + distributedTransactionManager, scalarDbDao, fileFormat, taskFactory); + } + } + + /** + * Returns an {@link ExportManager} that uses {@link DistributedTransactionManager}. + * + * @param distributedTransactionManager distributed transaction manager object + * @param scalarDbDao The DAO for accessing ScalarDB. + * @param fileFormat The output file format (CSV, JSON, JSONL). + * @param taskFactory Producer task factory object + * @return A configured {@link ExportManager}. + */ + private ExportManager createExportManagerWithTransaction( + DistributedTransactionManager distributedTransactionManager, + ScalarDbDao scalarDbDao, + FileFormat fileFormat, + ProducerTaskFactory taskFactory) { + switch (fileFormat) { + case JSON: + return new JsonExportManager(distributedTransactionManager, scalarDbDao, taskFactory); + case JSONL: + return new JsonLineExportManager(distributedTransactionManager, scalarDbDao, taskFactory); + case CSV: + return new CsvExportManager(distributedTransactionManager, scalarDbDao, taskFactory); + default: + throw new AssertionError("Invalid file format" + fileFormat); + } + } + + /** + * Returns an {@link ExportManager} that uses {@link DistributedStorage}. + * + * @param storage distributed storage object + * @param scalarDbDao The DAO for accessing ScalarDB. + * @param fileFormat The output file format (CSV, JSON, JSONL). + * @param taskFactory Producer task factory object + * @return A configured {@link ExportManager}. + */ + private ExportManager createExportManagerWithStorage( + DistributedStorage storage, + ScalarDbDao scalarDbDao, + FileFormat fileFormat, + ProducerTaskFactory taskFactory) { switch (fileFormat) { case JSON: return new JsonExportManager(storage, scalarDbDao, taskFactory); @@ -152,7 +246,8 @@ private ExportOptions buildExportOptions(Key partitionKey, ScanRange scanRange) .maxThreadCount(maxThreads) .dataChunkSize(dataChunkSize) .prettyPrintJson(prettyPrintJson) - .scanRange(scanRange); + .scanRange(scanRange) + .scalarDbMode(scalarDbMode); if (projectionColumns != null) { builder.projectionColumns(projectionColumns); diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java index 5cbe8f6c7..1f7fad7a4 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandOptions.java @@ -3,6 +3,7 @@ import com.scalar.db.api.Scan; import com.scalar.db.dataloader.core.ColumnKeyValue; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import java.util.ArrayList; import java.util.List; import picocli.CommandLine; @@ -61,7 +62,7 @@ public class ExportCommandOptions { protected FileFormat outputFormat; @CommandLine.Option( - names = {"--include-metadata", "-m"}, + names = {"--include-metadata", "-im"}, description = "Include transaction metadata in the exported data (default: false)", defaultValue = "false") protected boolean includeTransactionMetadata; @@ -144,4 +145,11 @@ public class ExportCommandOptions { description = "Size of the data chunk to process in a single task (default: 200)", defaultValue = "200") protected int dataChunkSize; + + @CommandLine.Option( + names = {"--mode", "-m"}, + description = "ScalarDB mode (STORAGE, TRANSACTION) (default: STORAGE)", + paramLabel = "", + defaultValue = "STORAGE") + protected ScalarDbMode scalarDbMode; } diff --git a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java index bc78514b5..b62ff6a65 100755 --- a/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java +++ b/data-loader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataimport/ImportCommand.java @@ -24,6 +24,7 @@ import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory; import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException; import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService; +import com.scalar.db.dataloader.core.tablemetadata.TableMetadataStorageService; import com.scalar.db.dataloader.core.util.TableMetadataUtil; import com.scalar.db.service.StorageFactory; import com.scalar.db.service.TransactionFactory; @@ -106,7 +107,7 @@ private Map createTableMetadataMap( File configFile = new File(configFilePath); StorageFactory storageFactory = StorageFactory.create(configFile); try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) { - TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin); + TableMetadataService tableMetadataService = new TableMetadataStorageService(storageAdmin); Map tableMetadataMap = new HashMap<>(); if (controlFile != null) { for (ControlFileTable table : controlFile.getTables()) { diff --git a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandTest.java b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandTest.java index 56518f8e7..78d7b17a8 100755 --- a/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandTest.java +++ b/data-loader/cli/src/test/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommandTest.java @@ -4,6 +4,7 @@ import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import java.io.File; import java.nio.file.Paths; import org.junit.jupiter.api.AfterEach; @@ -55,4 +56,19 @@ void call_withInvalidScalarDBConfigurationFile_shouldReturnOne() throws Exceptio exportCommand.outputFormat = FileFormat.JSON; Assertions.assertEquals(1, exportCommand.call()); } + + @Test + void call_withScalarDBModeTransaction_WithInvalidConfigurationFile_shouldReturnOne() + throws Exception { + ExportCommand exportCommand = new ExportCommand(); + exportCommand.configFilePath = "scalardb.properties"; + exportCommand.dataChunkSize = 100; + exportCommand.namespace = "scalar"; + exportCommand.table = "asset"; + exportCommand.outputDirectory = ""; + exportCommand.outputFileName = "sample.json"; + exportCommand.outputFormat = FileFormat.JSON; + exportCommand.scalarDbMode = ScalarDbMode.TRANSACTION; + Assertions.assertEquals(1, exportCommand.call()); + } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java index 64f059852..b52b8b510 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/CsvExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -13,18 +14,40 @@ /** Export manager implementation which manages the export task that exports data in CSV format */ public class CsvExportManager extends ExportManager { + /** + * Constructs a {@code CsvExportManager} for exporting data using a {@link DistributedStorage} + * instance. + * + *

This constructor is used when exporting data in non-transactional (storage) mode. + * + * @param distributedStorage the {@link DistributedStorage} used to read data directly from + * storage + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for exporting data + * @param producerTaskFactory the factory used to create producer tasks for generating + * CSV-formatted output + */ + public CsvExportManager( + DistributedStorage distributedStorage, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { + super(distributedStorage, dao, producerTaskFactory); + } /** - * Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link - * ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code CsvExportManager} for exporting data using a {@link + * DistributedTransactionManager}. * - * @param storage the {@code DistributedStorage} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + * @param distributedTransactionManager the transaction manager used to read data with + * transactional guarantees + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for exporting data + * @param producerTaskFactory the factory used to create producer tasks for generating + * CSV-formatted output */ public CsvExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager distributedTransactionManager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { + super(distributedTransactionManager, dao, producerTaskFactory); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index fdc27d664..aab2e3839 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -1,10 +1,13 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidationException; @@ -12,6 +15,7 @@ import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; import com.scalar.db.dataloader.core.util.TableMetadataUtil; +import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.io.DataType; import java.io.BufferedWriter; import java.io.IOException; @@ -34,11 +38,49 @@ public abstract class ExportManager { private static final Logger logger = LoggerFactory.getLogger(ExportManager.class); - private final DistributedStorage storage; + private final DistributedStorage distributedStorage; + private final DistributedTransactionManager distributedTransactionManager; private final ScalarDbDao dao; private final ProducerTaskFactory producerTaskFactory; private final Object lock = new Object(); + /** + * Constructs an {@code ExportManager} that uses a {@link DistributedStorage} instance for + * non-transactional data export operations. + * + * @param distributedStorage the {@link DistributedStorage} used to read data directly from + * storage + * @param dao the {@link ScalarDbDao} used to perform data operations + * @param producerTaskFactory the factory for creating producer tasks to format the exported data + */ + public ExportManager( + DistributedStorage distributedStorage, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { + this.distributedStorage = distributedStorage; + this.distributedTransactionManager = null; + this.dao = dao; + this.producerTaskFactory = producerTaskFactory; + } + + /** + * Constructs an {@code ExportManager} that uses a {@link DistributedTransactionManager} instance + * for transactional data export operations. + * + * @param distributedTransactionManager the {@link DistributedTransactionManager} used to read + * data with transactional guarantees + * @param dao the {@link ScalarDbDao} used to perform data operations + * @param producerTaskFactory the factory for creating producer tasks to format the exported data + */ + public ExportManager( + DistributedTransactionManager distributedTransactionManager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { + this.distributedStorage = null; + this.distributedTransactionManager = distributedTransactionManager; + this.dao = dao; + this.producerTaskFactory = producerTaskFactory; + } /** * Create and add header part for the export file * @@ -71,59 +113,143 @@ abstract void processFooter( public ExportReport startExport( ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) { ExportReport exportReport = new ExportReport(); + ExecutorService executorService = null; + try { validateExportOptions(exportOptions, tableMetadata); - Map dataTypeByColumnName = tableMetadata.getColumnDataTypes(); handleTransactionMetadata(exportOptions, tableMetadata); - processHeader(exportOptions, tableMetadata, writer); - int maxThreadCount = - exportOptions.getMaxThreadCount() == 0 - ? Runtime.getRuntime().availableProcessors() - : exportOptions.getMaxThreadCount(); - ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount); + try (BufferedWriter bufferedWriter = new BufferedWriter(writer)) { + processHeader(exportOptions, tableMetadata, bufferedWriter); - BufferedWriter bufferedWriter = new BufferedWriter(writer); - boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON; + int threadCount = + exportOptions.getMaxThreadCount() > 0 + ? exportOptions.getMaxThreadCount() + : Runtime.getRuntime().availableProcessors(); + executorService = Executors.newFixedThreadPool(threadCount); - try (Scanner scanner = createScanner(exportOptions, dao, storage)) { - - Iterator iterator = scanner.iterator(); AtomicBoolean isFirstBatch = new AtomicBoolean(true); + Map dataTypeByColumnName = tableMetadata.getColumnDataTypes(); - while (iterator.hasNext()) { - List dataChunk = fetchDataChunk(iterator, exportOptions.getDataChunkSize()); - executorService.submit( - () -> - processDataChunk( - exportOptions, - tableMetadata, - dataTypeByColumnName, - dataChunk, - bufferedWriter, - isJson, - isFirstBatch, - exportReport)); - } - executorService.shutdown(); - if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { - logger.info("All tasks completed"); - } else { - logger.error("Timeout occurred while waiting for tasks to complete"); - // TODO: handle this + if (exportOptions.getScalarDbMode() == ScalarDbMode.STORAGE) { + try (Scanner scanner = createScannerWithStorage(exportOptions, dao, distributedStorage)) { + submitTasks( + scanner.iterator(), + executorService, + exportOptions, + tableMetadata, + dataTypeByColumnName, + bufferedWriter, + isFirstBatch, + exportReport); + } + } else if (exportOptions.getScalarDbMode() == ScalarDbMode.TRANSACTION + && distributedTransactionManager != null) { + + try (TransactionManagerCrudOperable.Scanner scanner = + createScannerWithTransaction(exportOptions, dao, distributedTransactionManager)) { + submitTasks( + scanner.iterator(), + executorService, + exportOptions, + tableMetadata, + dataTypeByColumnName, + bufferedWriter, + isFirstBatch, + exportReport); + } } + + shutdownExecutor(executorService); processFooter(exportOptions, tableMetadata, bufferedWriter); - } catch (InterruptedException | IOException e) { - logger.error("Error during export: {}", e.getMessage()); - } finally { - bufferedWriter.flush(); + } catch (TransactionException e) { + throw new RuntimeException(e); } - } catch (ExportOptionsValidationException | IOException | ScalarDbDaoException e) { + + } catch (ExportOptionsValidationException + | IOException + | ScalarDbDaoException + | InterruptedException e) { logger.error("Error during export: {}", e.getMessage()); + } finally { + if (executorService != null && !executorService.isShutdown()) { + executorService.shutdownNow(); + } + closeResources(); } + return exportReport; } + /** + * Submits asynchronous tasks for processing chunks of data to the given executor service. + * + *

This method reads data from the provided {@code iterator} in chunks (based on the configured + * chunk size) and submits each chunk as a separate task for processing. Each task invokes {@code + * processDataChunk()} to write the data to the output format. + * + *

Any exceptions thrown during chunk processing are logged but do not halt the submission of + * other tasks. + * + * @param iterator the iterator over database results + * @param executorService the executor service to run the processing tasks + * @param exportOptions configuration for export operation + * @param tableMetadata metadata for the table being exported + * @param dataTypeByColumnName mapping of column names to their data types + * @param writer the writer to which export output is written + * @param isFirstBatch an atomic flag used to track if the current chunk is the first one (used + * for formatting) + * @param exportReport the report object that accumulates export statistics + */ + private void submitTasks( + Iterator iterator, + ExecutorService executorService, + ExportOptions exportOptions, + TableMetadata tableMetadata, + Map dataTypeByColumnName, + BufferedWriter writer, + AtomicBoolean isFirstBatch, + ExportReport exportReport) { + while (iterator.hasNext()) { + List chunk = fetchDataChunk(iterator, exportOptions.getDataChunkSize()); + executorService.submit( + () -> { + try { + processDataChunk( + exportOptions, + tableMetadata, + dataTypeByColumnName, + chunk, + writer, + exportOptions.getOutputFileFormat() == FileFormat.JSON, + isFirstBatch, + exportReport); + } catch (Exception e) { + logger.error("Error processing data chunk", e); + } + }); + } + } + + /** + * Shuts down the given executor service gracefully, waiting for tasks to complete. + * + *

This method initiates an orderly shutdown where previously submitted tasks are executed, but + * no new tasks will be accepted. It then waits for all tasks to finish within a specified + * timeout. If the tasks do not complete in time, a warning is logged. + * + * @param executorService the ExecutorService to shut down + * @throws InterruptedException if the current thread is interrupted while waiting + */ + private void shutdownExecutor(ExecutorService executorService) throws InterruptedException { + executorService.shutdown(); + if (!executorService.awaitTermination(60, TimeUnit.MINUTES)) { + logger.warn("Timeout while waiting for export tasks to finish."); + } else { + logger.info("All export tasks completed."); + } + } + /** * To process result data chunk * @@ -212,15 +338,22 @@ private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadat } /** - * To create a scanner object + * Creates a ScalarDB {@link Scanner} using the {@link DistributedStorage} interface based on the + * scan configuration provided in {@link ExportOptions}. * - * @param exportOptions export options - * @param dao ScalarDB dao object - * @param storage distributed storage object - * @return created scanner - * @throws ScalarDbDaoException throws if any issue occurs in creating scanner object + *

If no partition key is specified in the {@code exportOptions}, a full table scan is + * performed. Otherwise, a partition-specific scan is performed using the provided partition key, + * optional scan range, and sort orders. + * + * @param exportOptions Options containing configuration for the export operation, including + * namespace, table name, projection columns, limit, and scan parameters + * @param dao The {@link ScalarDbDao} used to construct the scan operation + * @param storage The {@link DistributedStorage} instance used to execute the scan + * @return A {@link Scanner} instance for reading data from ScalarDB using storage-level + * operations + * @throws ScalarDbDaoException If an error occurs while creating the scanner */ - private Scanner createScanner( + private Scanner createScannerWithStorage( ExportOptions exportOptions, ScalarDbDao dao, DistributedStorage storage) throws ScalarDbDaoException { boolean isScanAll = exportOptions.getScanPartitionKey() == null; @@ -243,4 +376,67 @@ private Scanner createScanner( storage); } } + + /** + * Creates a {@link TransactionManagerCrudOperable.Scanner} instance using the given {@link + * ExportOptions}, {@link ScalarDbDao}, and {@link DistributedTransactionManager}. + * + *

If {@code scanPartitionKey} is not specified in {@code exportOptions}, a full table scan is + * performed using the specified projection columns and limit. Otherwise, the scan is executed + * with the specified partition key, range, sort orders, projection columns, and limit. + * + * @param exportOptions the export options containing scan configuration such as namespace, table + * name, partition key, projection columns, limit, range, and sort order + * @param dao the ScalarDB DAO used to create the scanner + * @param distributedTransactionManager the transaction manager to use for the scan operation + * @return a {@link TransactionManagerCrudOperable.Scanner} for retrieving rows in transaction + * mode + * @throws ScalarDbDaoException if an error occurs while creating the scanner + * @throws TransactionException if a transaction-related error occurs during scanner creation + */ + private TransactionManagerCrudOperable.Scanner createScannerWithTransaction( + ExportOptions exportOptions, + ScalarDbDao dao, + DistributedTransactionManager distributedTransactionManager) + throws ScalarDbDaoException, TransactionException { + + boolean isScanAll = exportOptions.getScanPartitionKey() == null; + + TransactionManagerCrudOperable.Scanner scanner; + if (isScanAll) { + scanner = + dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + distributedTransactionManager); + } else { + scanner = + dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getScanPartitionKey(), + exportOptions.getScanRange(), + exportOptions.getSortOrders(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + distributedTransactionManager); + } + + return scanner; + } + + /** Close resources properly once the process is completed */ + public void closeResources() { + try { + if (distributedStorage != null) { + distributedStorage.close(); + } else if (distributedTransactionManager != null) { + distributedTransactionManager.close(); + } + } catch (Throwable e) { + throw new RuntimeException("Failed to close the resource", e); + } + } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java index 3c7ed9ef5..ebc98b881 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java @@ -2,6 +2,7 @@ import com.scalar.db.api.Scan; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.io.Key; import java.util.Collections; @@ -30,6 +31,7 @@ public class ExportOptions { @Builder.Default private final boolean includeTransactionMetadata = false; @Builder.Default private List projectionColumns = Collections.emptyList(); private List sortOrders; + @Builder.Default private final ScalarDbMode scalarDbMode = ScalarDbMode.STORAGE; /** * Generates and returns an export options builder. diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java index fadac644a..af8a2546c 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -9,18 +10,40 @@ /** Export manager implementation which manages the export task that exports data in JSON format */ public class JsonExportManager extends ExportManager { + /** + * Constructs a {@code JsonExportManager} for exporting data using a {@link DistributedStorage} + * instance. + * + *

This constructor is used when exporting data in non-transactional (storage) mode. + * + * @param distributedStorage the {@link DistributedStorage} used to read data directly from + * storage + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for exporting data + * @param producerTaskFactory the factory used to create producer tasks for generating + * CSV-formatted output + */ + public JsonExportManager( + DistributedStorage distributedStorage, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { + super(distributedStorage, dao, producerTaskFactory); + } /** - * Constructs a {@code JsonExportManager} with the specified {@link DistributedStorage}, {@link - * ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code JsonExportManager} for exporting data using a {@link + * DistributedTransactionManager}. * - * @param storage the {@code DistributedStorage} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + * @param distributedTransactionManager the transaction manager used to read data with + * transactional guarantees + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for exporting data + * @param producerTaskFactory the factory used to create producer tasks for generating + * CSV-formatted output */ public JsonExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager distributedTransactionManager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { + super(distributedTransactionManager, dao, producerTaskFactory); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java index 2ce21deb7..db111cb68 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManager.java @@ -1,6 +1,7 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; @@ -11,18 +12,40 @@ * Export manager implementation which manages the export task that exports data in JSONLines format */ public class JsonLineExportManager extends ExportManager { + /** + * Constructs a {@code JsonLineExportManager} for exporting data using a {@link + * DistributedStorage} instance. + * + *

This constructor is used when exporting data in non-transactional (storage) mode. + * + * @param distributedStorage the {@link DistributedStorage} used to read data directly from + * storage + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for exporting data + * @param producerTaskFactory the factory used to create producer tasks for generating + * CSV-formatted output + */ + public JsonLineExportManager( + DistributedStorage distributedStorage, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { + super(distributedStorage, dao, producerTaskFactory); + } /** - * Constructs a {@code JsonLineExportManager} with the specified {@link DistributedStorage}, - * {@link ScalarDbDao}, and {@link ProducerTaskFactory}. + * Constructs a {@code JsonLineExportManager} for exporting data using a {@link + * DistributedTransactionManager}. * - * @param storage the {@code DistributedStorage} instance used to read data from the database - * @param dao the {@code ScalarDbDao} used to execute export-related database operations - * @param producerTaskFactory the factory used to create producer tasks for exporting data + * @param distributedTransactionManager the transaction manager used to read data with + * transactional guarantees + * @param dao the {@link ScalarDbDao} used to interact with ScalarDB for exporting data + * @param producerTaskFactory the factory used to create producer tasks for generating + * CSV-formatted output */ public JsonLineExportManager( - DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) { - super(storage, dao, producerTaskFactory); + DistributedTransactionManager distributedTransactionManager, + ScalarDbDao dao, + ProducerTaskFactory producerTaskFactory) { + super(distributedTransactionManager, dao, producerTaskFactory); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java index afd7b124a..4d3610d89 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDao.java @@ -2,6 +2,7 @@ import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Get; import com.scalar.db.api.GetBuilder; import com.scalar.db.api.Put; @@ -10,6 +11,7 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.ScanBuilder; import com.scalar.db.api.Scanner; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.exception.storage.ExecutionException; @@ -245,6 +247,33 @@ public Scanner createScanner( } } + /** + * Create a ScalarDB scanner instance + * + * @param namespace ScalarDB namespace + * @param table ScalarDB table name + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @param transaction Distributed transaction object + * @return ScalarDB Scanner object + * @throws ScalarDbDaoException if scan fails + */ + public TransactionManagerCrudOperable.Scanner createScanner( + String namespace, + String table, + List projectionColumns, + int limit, + DistributedTransactionManager transaction) + throws ScalarDbDaoException { + Scan scan = + createScan(namespace, table, null, null, new ArrayList<>(), projectionColumns, limit); + try { + return transaction.getScanner(scan); + } catch (CrudException e) { + throw new ScalarDbDaoException(DataLoaderError.ERROR_SCAN.buildMessage(e.getMessage()), e); + } + } + /** * Create a ScalarDB scanner instance * @@ -278,6 +307,37 @@ public Scanner createScanner( } } + /** + * Create a ScalarDB scanner instance + * + * @param namespace ScalarDB namespace + * @param table ScalarDB table name + * @param partitionKey Partition key used in ScalarDB scan + * @param scanRange Optional range to set ScalarDB scan start and end values + * @param sortOrders Optional scan clustering key sorting values + * @param projectionColumns List of column projection to use during scan + * @param limit Scan limit value + * @param transaction Distributed transaction object + * @return ScalarDB Scanner object + */ + public TransactionManagerCrudOperable.Scanner createScanner( + String namespace, + String table, + @Nullable Key partitionKey, + @Nullable ScanRange scanRange, + @Nullable List sortOrders, + @Nullable List projectionColumns, + int limit, + DistributedTransactionManager transaction) { + Scan scan = + createScan(namespace, table, partitionKey, scanRange, sortOrders, projectionColumns, limit); + try { + return transaction.getScanner(scan); + } catch (CrudException e) { + throw new RuntimeException(e); + } + } + /** * Create ScalarDB scan instance * diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java index 70d49a51e..8c50880b7 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java @@ -1,24 +1,18 @@ package com.scalar.db.dataloader.core.tablemetadata; -import com.scalar.db.api.DistributedStorageAdmin; import com.scalar.db.api.TableMetadata; import com.scalar.db.dataloader.core.DataLoaderError; import com.scalar.db.dataloader.core.util.TableMetadataUtil; -import com.scalar.db.exception.storage.ExecutionException; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import lombok.RequiredArgsConstructor; /** - * Service for retrieving {@link TableMetadata} from ScalarDB. Provides methods to fetch metadata - * for individual tables or a collection of tables. + * Abstract base class for retrieving {@link TableMetadata} from ScalarDB. Provides shared logic for + * fetching metadata for a collection of tables. Subclasses must implement the specific logic for + * fetching individual table metadata. */ -@SuppressWarnings("SameNameButDifferent") -@RequiredArgsConstructor -public class TableMetadataService { - - private final DistributedStorageAdmin storageAdmin; +public abstract class TableMetadataService { /** * Retrieves the {@link TableMetadata} for a specific namespace and table name. @@ -31,17 +25,12 @@ public class TableMetadataService { */ public TableMetadata getTableMetadata(String namespace, String tableName) throws TableMetadataException { - try { - TableMetadata tableMetadata = storageAdmin.getTableMetadata(namespace, tableName); - if (tableMetadata == null) { - throw new TableMetadataException( - DataLoaderError.MISSING_NAMESPACE_OR_TABLE.buildMessage(namespace, tableName)); - } - return tableMetadata; - } catch (ExecutionException e) { + TableMetadata metadata = getTableMetadataInternal(namespace, tableName); + if (metadata == null) { throw new TableMetadataException( - DataLoaderError.TABLE_METADATA_RETRIEVAL_FAILED.buildMessage(e.getMessage()), e); + DataLoaderError.MISSING_NAMESPACE_OR_TABLE.buildMessage(namespace, tableName)); } + return metadata; } /** @@ -60,15 +49,25 @@ public TableMetadata getTableMetadata(String namespace, String tableName) public Map getTableMetadata(Collection requests) throws TableMetadataException { Map metadataMap = new HashMap<>(); - for (TableMetadataRequest request : requests) { String namespace = request.getNamespace(); String tableName = request.getTable(); - TableMetadata tableMetadata = getTableMetadata(namespace, tableName); + TableMetadata metadata = getTableMetadata(namespace, tableName); String key = TableMetadataUtil.getTableLookupKey(namespace, tableName); - metadataMap.put(key, tableMetadata); + metadataMap.put(key, metadata); } - return metadataMap; } + + /** + * Abstract method for retrieving table metadata for a specific namespace and table. Subclasses + * must implement this to define how the metadata is fetched. + * + * @param namespace The namespace of the table. + * @param tableName The table name. + * @return The {@link TableMetadata} object, or null if not found. + * @throws TableMetadataException if an error occurs during metadata retrieval. + */ + protected abstract TableMetadata getTableMetadataInternal(String namespace, String tableName) + throws TableMetadataException; } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataStorageService.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataStorageService.java new file mode 100644 index 000000000..b74947cdf --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataStorageService.java @@ -0,0 +1,38 @@ +package com.scalar.db.dataloader.core.tablemetadata; + +import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.DataLoaderError; +import com.scalar.db.exception.storage.ExecutionException; +import lombok.RequiredArgsConstructor; + +/** + * Implementation of {@link TableMetadataService} that retrieves table metadata using {@link + * DistributedStorageAdmin}. + */ +@SuppressWarnings("SameNameButDifferent") +@RequiredArgsConstructor +public class TableMetadataStorageService extends TableMetadataService { + + private final DistributedStorageAdmin storageAdmin; + + /** + * Retrieves the {@link TableMetadata} for a given namespace and table using the {@link + * DistributedStorageAdmin}. + * + * @param namespace The namespace of the table. + * @param tableName The name of the table. + * @return The {@link TableMetadata} for the specified table, or null if not found. + * @throws TableMetadataException If an error occurs while fetching metadata. + */ + @Override + protected TableMetadata getTableMetadataInternal(String namespace, String tableName) + throws TableMetadataException { + try { + return storageAdmin.getTableMetadata(namespace, tableName); + } catch (ExecutionException e) { + throw new TableMetadataException( + DataLoaderError.TABLE_METADATA_RETRIEVAL_FAILED.buildMessage(e.getMessage()), e); + } + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataTransactionService.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataTransactionService.java new file mode 100644 index 000000000..a9053a52d --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataTransactionService.java @@ -0,0 +1,38 @@ +package com.scalar.db.dataloader.core.tablemetadata; + +import com.scalar.db.api.DistributedTransactionAdmin; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.DataLoaderError; +import com.scalar.db.exception.storage.ExecutionException; +import lombok.RequiredArgsConstructor; + +/** + * Implementation of {@link TableMetadataService} that retrieves table metadata using {@link + * DistributedTransactionAdmin}. + */ +@SuppressWarnings("SameNameButDifferent") +@RequiredArgsConstructor +public class TableMetadataTransactionService extends TableMetadataService { + + private final DistributedTransactionAdmin transactionAdmin; + + /** + * Retrieves the {@link TableMetadata} for a given namespace and table using the {@link + * DistributedTransactionAdmin}. + * + * @param namespace The namespace of the table. + * @param tableName The name of the table. + * @return The {@link TableMetadata} for the specified table, or null if not found. + * @throws TableMetadataException If an error occurs while fetching metadata. + */ + @Override + protected TableMetadata getTableMetadataInternal(String namespace, String tableName) + throws TableMetadataException { + try { + return transactionAdmin.getTableMetadata(namespace, tableName); + } catch (ExecutionException e) { + throw new TableMetadataException( + DataLoaderError.TABLE_METADATA_RETRIEVAL_FAILED.buildMessage(e.getMessage()), e); + } + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java index ca65c1001..22b0830f2 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/CsvExportManagerTest.java @@ -1,16 +1,23 @@ package com.scalar.db.dataloader.core.dataexport; +import static org.mockito.Mockito.when; + import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.dataloader.core.UnitTestUtils; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; +import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.io.Column; import com.scalar.db.io.IntColumn; import com.scalar.db.io.Key; @@ -33,20 +40,25 @@ public class CsvExportManagerTest { TableMetadata mockData; DistributedStorage storage; + DistributedTransactionManager manager; + DistributedTransaction transaction; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @BeforeEach - void setup() { + void setup() throws TransactionException { storage = Mockito.mock(DistributedStorage.class); + manager = Mockito.mock(DistributedTransactionManager.class); + transaction = Mockito.mock(DistributedTransaction.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); + when(manager.start()).thenReturn(transaction); producerTaskFactory = new ProducerTaskFactory(null, false, true); } @Test - void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() + void startExport_givenValidDataWithoutPartitionKey_withStorage_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); Scanner scanner = Mockito.mock(Scanner.class); @@ -60,15 +72,14 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() .scanRange(new ScanRange(null, null, false, false)) .build(); - Mockito.when( - dao.createScanner( - exportOptions.getNamespace(), - exportOptions.getTableName(), - exportOptions.getProjectionColumns(), - exportOptions.getLimit(), - storage)) + when(dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + storage)) .thenReturn(scanner); - Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = new BufferedWriter( Files.newBufferedWriter( @@ -84,7 +95,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() } @Test - void startExport_givenPartitionKey_shouldGenerateOutputFile() + void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { producerTaskFactory = new ProducerTaskFactory(",", false, false); exportManager = new CsvExportManager(storage, dao, producerTaskFactory); @@ -104,18 +115,103 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() .scanRange(new ScanRange(null, null, false, false)) .build(); - Mockito.when( - dao.createScanner( - exportOptions.getNamespace(), - exportOptions.getTableName(), - exportOptions.getScanPartitionKey(), - exportOptions.getScanRange(), - exportOptions.getSortOrders(), - exportOptions.getProjectionColumns(), - exportOptions.getLimit(), - storage)) + when(dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getScanPartitionKey(), + exportOptions.getScanRange(), + exportOptions.getSortOrders(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + storage)) + .thenReturn(scanner); + when(scanner.iterator()).thenReturn(results.iterator()); + try (BufferedWriter writer = + new BufferedWriter( + Files.newBufferedWriter( + Paths.get(filePath), + Charset.defaultCharset(), // Explicitly use the default charset + StandardOpenOption.CREATE, + StandardOpenOption.APPEND))) { + exportManager.startExport(exportOptions, mockData, writer); + } + File file = new File(filePath); + Assertions.assertTrue(file.exists()); + Assertions.assertTrue(file.delete()); + } + + @Test + void startExport_givenValidDataWithoutPartitionKey_withTransaction_shouldGenerateOutputFile() + throws IOException, ScalarDbDaoException { + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; + Map> values = UnitTestUtils.createTestValues(); + Result result = new ResultImpl(values, mockData); + List results = Collections.singletonList(result); + ExportOptions exportOptions = + ExportOptions.builder("namespace", "table", null, FileFormat.CSV) + .sortOrders(Collections.emptyList()) + .scanRange(new ScanRange(null, null, false, false)) + .scalarDbMode(ScalarDbMode.TRANSACTION) + .build(); + + when(dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + manager)) + .thenReturn(scanner); + when(scanner.iterator()).thenReturn(results.iterator()); + try (BufferedWriter writer = + new BufferedWriter( + Files.newBufferedWriter( + Paths.get(filePath), + Charset.defaultCharset(), // Explicitly use the default charset + StandardOpenOption.CREATE, + StandardOpenOption.APPEND))) { + exportManager.startExport(exportOptions, mockData, writer); + } + File file = new File(filePath); + Assertions.assertTrue(file.exists()); + Assertions.assertTrue(file.delete()); + } + + @Test + void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() throws IOException { + producerTaskFactory = new ProducerTaskFactory(",", false, false); + exportManager = new CsvExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + String filePath = Paths.get("").toAbsolutePath() + "/output.csv"; + Map> values = UnitTestUtils.createTestValues(); + Result result = new ResultImpl(values, mockData); + List results = Collections.singletonList(result); + + ExportOptions exportOptions = + ExportOptions.builder( + "namespace", + "table", + Key.newBuilder().add(IntColumn.of("col1", 1)).build(), + FileFormat.CSV) + .sortOrders(Collections.emptyList()) + .scanRange(new ScanRange(null, null, false, false)) + .scalarDbMode(ScalarDbMode.TRANSACTION) + .build(); + + when(dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getScanPartitionKey(), + exportOptions.getScanRange(), + exportOptions.getSortOrders(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + manager)) .thenReturn(scanner); - Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + when(scanner.iterator()).thenReturn(results.iterator()); try (BufferedWriter writer = new BufferedWriter( Files.newBufferedWriter( diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java index c1ef7ead1..bc1f64296 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonExportManagerTest.java @@ -1,16 +1,23 @@ package com.scalar.db.dataloader.core.dataexport; +import static org.mockito.Mockito.when; + import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.dataloader.core.UnitTestUtils; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDaoException; +import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.io.Column; import com.scalar.db.io.IntColumn; import com.scalar.db.io.Key; @@ -34,20 +41,25 @@ public class JsonExportManagerTest { TableMetadata mockData; DistributedStorage storage; + DistributedTransactionManager manager; + DistributedTransaction transaction; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @BeforeEach - void setup() { + void setup() throws TransactionException { storage = Mockito.mock(DistributedStorage.class); + transaction = Mockito.mock(DistributedTransaction.class); + manager = Mockito.mock(DistributedTransactionManager.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); producerTaskFactory = new ProducerTaskFactory(null, false, true); + when(manager.start()).thenReturn(transaction); } @Test - void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() + void startExport_givenValidDataWithoutPartitionKey_withStorage_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { exportManager = new JsonExportManager(storage, dao, producerTaskFactory); Scanner scanner = Mockito.mock(Scanner.class); @@ -86,7 +98,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() } @Test - void startExport_givenPartitionKey_shouldGenerateOutputFile() + void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { exportManager = new JsonExportManager(storage, dao, producerTaskFactory); Scanner scanner = Mockito.mock(Scanner.class); @@ -130,4 +142,93 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() Assertions.assertTrue(file.exists()); Assertions.assertTrue(file.delete()); } + + @Test + void + startExport_givenValidDataWithoutPartitionKey_withTransaction_withStorage_shouldGenerateOutputFile() + throws IOException, ScalarDbDaoException { + exportManager = new JsonExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + String filePath = Paths.get("").toAbsolutePath() + "/output.json"; + Map> values = UnitTestUtils.createTestValues(); + Result result = new ResultImpl(values, mockData); + List results = Collections.singletonList(result); + + ExportOptions exportOptions = + ExportOptions.builder("namespace", "table", null, FileFormat.JSON) + .sortOrders(Collections.emptyList()) + .scanRange(new ScanRange(null, null, false, false)) + .scalarDbMode(ScalarDbMode.TRANSACTION) + .build(); + + Mockito.when( + dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + manager)) + .thenReturn(scanner); + Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + try (BufferedWriter writer = + new BufferedWriter( + Files.newBufferedWriter( + Paths.get(filePath), + Charset.defaultCharset(), // Explicitly use the default charset + StandardOpenOption.CREATE, + StandardOpenOption.APPEND))) { + exportManager.startExport(exportOptions, mockData, writer); + } + File file = new File(filePath); + Assertions.assertTrue(file.exists()); + Assertions.assertTrue(file.delete()); + } + + @Test + void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() throws IOException { + exportManager = new JsonExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + String filePath = Paths.get("").toAbsolutePath() + "/output.json"; + Map> values = UnitTestUtils.createTestValues(); + Result result = new ResultImpl(values, mockData); + List results = Collections.singletonList(result); + + ExportOptions exportOptions = + ExportOptions.builder( + "namespace", + "table", + Key.newBuilder().add(IntColumn.of("col1", 1)).build(), + FileFormat.JSON) + .sortOrders(Collections.emptyList()) + .scanRange(new ScanRange(null, null, false, false)) + .scalarDbMode(ScalarDbMode.TRANSACTION) + .build(); + + Mockito.when( + dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getScanPartitionKey(), + exportOptions.getScanRange(), + exportOptions.getSortOrders(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + manager)) + .thenReturn(scanner); + Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + try (BufferedWriter writer = + new BufferedWriter( + Files.newBufferedWriter( + Paths.get(filePath), + Charset.defaultCharset(), // Explicitly use the default charset + StandardOpenOption.CREATE, + StandardOpenOption.APPEND))) { + exportManager.startExport(exportOptions, mockData, writer); + } + File file = new File(filePath); + Assertions.assertTrue(file.exists()); + Assertions.assertTrue(file.delete()); + } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java index 31e4326a3..e06aa09f6 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/JsonLineExportManagerTest.java @@ -1,11 +1,14 @@ package com.scalar.db.dataloader.core.dataexport; import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Result; import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.common.ResultImpl; import com.scalar.db.dataloader.core.FileFormat; +import com.scalar.db.dataloader.core.ScalarDbMode; import com.scalar.db.dataloader.core.ScanRange; import com.scalar.db.dataloader.core.UnitTestUtils; import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory; @@ -33,6 +36,7 @@ public class JsonLineExportManagerTest { TableMetadata mockData; DistributedStorage storage; + DistributedTransactionManager manager; @Spy ScalarDbDao dao; ProducerTaskFactory producerTaskFactory; ExportManager exportManager; @@ -40,13 +44,14 @@ public class JsonLineExportManagerTest { @BeforeEach void setup() { storage = Mockito.mock(DistributedStorage.class); + manager = Mockito.mock(DistributedTransactionManager.class); mockData = UnitTestUtils.createTestTableMetadata(); dao = Mockito.mock(ScalarDbDao.class); producerTaskFactory = new ProducerTaskFactory(null, false, true); } @Test - void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() + void startExport_givenValidDataWithoutPartitionKey_withStorage_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); Scanner scanner = Mockito.mock(Scanner.class); @@ -85,7 +90,7 @@ void startExport_givenValidDataWithoutPartitionKey_shouldGenerateOutputFile() } @Test - void startExport_givenPartitionKey_shouldGenerateOutputFile() + void startExport_givenPartitionKey_withStorage_shouldGenerateOutputFile() throws IOException, ScalarDbDaoException { exportManager = new JsonLineExportManager(storage, dao, producerTaskFactory); Scanner scanner = Mockito.mock(Scanner.class); @@ -129,4 +134,93 @@ void startExport_givenPartitionKey_shouldGenerateOutputFile() Assertions.assertTrue(file.exists()); Assertions.assertTrue(file.delete()); } + + @Test + void + startExport_givenValidDataWithoutPartitionKey_withTransaction_withStorage_shouldGenerateOutputFile() + throws IOException, ScalarDbDaoException { + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl"; + Map> values = UnitTestUtils.createTestValues(); + Result result = new ResultImpl(values, mockData); + List results = Collections.singletonList(result); + + ExportOptions exportOptions = + ExportOptions.builder("namespace", "table", null, FileFormat.JSONL) + .sortOrders(Collections.emptyList()) + .scanRange(new ScanRange(null, null, false, false)) + .scalarDbMode(ScalarDbMode.TRANSACTION) + .build(); + + Mockito.when( + dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + manager)) + .thenReturn(scanner); + Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + try (BufferedWriter writer = + new BufferedWriter( + Files.newBufferedWriter( + Paths.get(filePath), + Charset.defaultCharset(), // Explicitly use the default charset + StandardOpenOption.CREATE, + StandardOpenOption.APPEND))) { + exportManager.startExport(exportOptions, mockData, writer); + } + File file = new File(filePath); + Assertions.assertTrue(file.exists()); + Assertions.assertTrue(file.delete()); + } + + @Test + void startExport_givenPartitionKey_withTransaction_shouldGenerateOutputFile() throws IOException { + exportManager = new JsonLineExportManager(manager, dao, producerTaskFactory); + TransactionManagerCrudOperable.Scanner scanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + String filePath = Paths.get("").toAbsolutePath() + "/output.jsonl"; + Map> values = UnitTestUtils.createTestValues(); + Result result = new ResultImpl(values, mockData); + List results = Collections.singletonList(result); + + ExportOptions exportOptions = + ExportOptions.builder( + "namespace", + "table", + Key.newBuilder().add(IntColumn.of("col1", 1)).build(), + FileFormat.JSONL) + .sortOrders(Collections.emptyList()) + .scanRange(new ScanRange(null, null, false, false)) + .scalarDbMode(ScalarDbMode.TRANSACTION) + .build(); + + Mockito.when( + dao.createScanner( + exportOptions.getNamespace(), + exportOptions.getTableName(), + exportOptions.getScanPartitionKey(), + exportOptions.getScanRange(), + exportOptions.getSortOrders(), + exportOptions.getProjectionColumns(), + exportOptions.getLimit(), + manager)) + .thenReturn(scanner); + Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + try (BufferedWriter writer = + new BufferedWriter( + Files.newBufferedWriter( + Paths.get(filePath), + Charset.defaultCharset(), // Explicitly use the default charset + StandardOpenOption.CREATE, + StandardOpenOption.APPEND))) { + exportManager.startExport(exportOptions, mockData, writer); + } + File file = new File(filePath); + Assertions.assertTrue(file.exists()); + Assertions.assertTrue(file.delete()); + } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDaoTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDaoTest.java index 58a62203e..be1fb788c 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDaoTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/dao/ScalarDbDaoTest.java @@ -10,25 +10,41 @@ import static com.scalar.db.dataloader.core.UnitTestUtils.TEST_VALUE_INT; import static com.scalar.db.dataloader.core.UnitTestUtils.TEST_VALUE_LONG; import static org.assertj.core.api.Assertions.assertThat; - +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.scalar.db.api.DistributedStorage; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Scan; import com.scalar.db.api.ScanBuilder; +import com.scalar.db.api.Scanner; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.dataloader.core.ScanRange; +import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.io.Key; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; class ScalarDbDaoTest { private static final int TEST_VALUE_INT_MIN = 1; private ScalarDbDao dao; + private DistributedTransactionManager manager; + private DistributedStorage distributedStorage; @BeforeEach void setUp() { this.dao = new ScalarDbDao(); + this.distributedStorage = mock(DistributedStorage.class); + this.manager = mock(DistributedTransactionManager.class); } @Test @@ -161,6 +177,58 @@ void createScan_scanAllWithLimitAndProjection_shouldCreateScanAllObjectWithLimit assertThat(scan.toString()).isEqualTo(expectedResult.toString()); } + @Test + void createScanner_withTransactionManager_ShouldCreateScannerObject() + throws CrudException, ScalarDbDaoException { + // Create Scan Object + TransactionManagerCrudOperable.Scanner mockScanner = + Mockito.mock(TransactionManagerCrudOperable.Scanner.class); + when(manager.getScanner(any())).thenReturn(mockScanner); + TransactionManagerCrudOperable.Scanner result = + this.dao.createScanner( + TEST_NAMESPACE, + TEST_TABLE_NAME, + null, + new ScanRange(null, null, false, false), + new ArrayList<>(), + new ArrayList<>(), + 0, + manager); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + result = this.dao.createScanner(TEST_NAMESPACE, TEST_TABLE_NAME, null, 0, manager); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + } + + @Test + void createScanner_withStorage_ShouldCreateScannerObject() + throws CrudException, ExecutionException, ScalarDbDaoException { + // Create Scan Object + Scanner mockScanner = mock(Scanner.class); + when(distributedStorage.scan(any())).thenReturn(mockScanner); + Scanner result = + this.dao.createScanner( + TEST_NAMESPACE, + TEST_TABLE_NAME, + null, + new ScanRange(null, null, false, false), + new ArrayList<>(), + new ArrayList<>(), + 0, + distributedStorage); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + + result = this.dao.createScanner(TEST_NAMESPACE, TEST_TABLE_NAME, null, 0, distributedStorage); + // Assert + assertNotNull(result); + assertEquals(mockScanner, result); + } + /** * Create Scan Object * diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataServiceTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataStorageServiceTest.java similarity index 88% rename from data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataServiceTest.java rename to data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataStorageServiceTest.java index ff9d5d8b0..5fe097faf 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataServiceTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataStorageServiceTest.java @@ -15,7 +15,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -class TableMetadataServiceTest { +class TableMetadataStorageServiceTest { DistributedStorageAdmin storageAdmin; TableMetadataService tableMetadataService; @@ -25,23 +25,28 @@ void setup() throws ExecutionException { storageAdmin = Mockito.mock(DistributedStorageAdmin.class); Mockito.when(storageAdmin.getTableMetadata("namespace", "table")) .thenReturn(UnitTestUtils.createTestTableMetadata()); - tableMetadataService = new TableMetadataService(storageAdmin); + + tableMetadataService = new TableMetadataStorageService(storageAdmin); } @Test void getTableMetadata_withValidNamespaceAndTable_shouldReturnTableMetadataMap() throws TableMetadataException { - Map expected = new HashMap<>(); expected.put("namespace.table", UnitTestUtils.createTestTableMetadata()); + TableMetadataRequest tableMetadataRequest = new TableMetadataRequest("namespace", "table"); Map output = tableMetadataService.getTableMetadata(Collections.singleton(tableMetadataRequest)); + Assertions.assertEquals(expected.get("namespace.table"), output.get("namespace.table")); } @Test - void getTableMetadata_withInvalidNamespaceAndTable_shouldThrowException() { + void getTableMetadata_withInvalidNamespaceAndTable_shouldThrowException() + throws ExecutionException { + Mockito.when(storageAdmin.getTableMetadata("namespace2", "table2")).thenReturn(null); + TableMetadataRequest tableMetadataRequest = new TableMetadataRequest("namespace2", "table2"); assertThatThrownBy( () -> diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataTransactionServiceTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataTransactionServiceTest.java new file mode 100644 index 000000000..1536a9e65 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataTransactionServiceTest.java @@ -0,0 +1,58 @@ +package com.scalar.db.dataloader.core.tablemetadata; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.scalar.db.api.DistributedTransactionAdmin; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.dataloader.core.DataLoaderError; +import com.scalar.db.dataloader.core.UnitTestUtils; +import com.scalar.db.exception.storage.ExecutionException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +class TableMetadataTransactionServiceTest { + + DistributedTransactionAdmin transactionAdmin; + TableMetadataService tableMetadataService; + + @BeforeEach + void setup() throws ExecutionException { + transactionAdmin = Mockito.mock(DistributedTransactionAdmin.class); + Mockito.when(transactionAdmin.getTableMetadata("namespace", "table")) + .thenReturn(UnitTestUtils.createTestTableMetadata()); + + tableMetadataService = new TableMetadataTransactionService(transactionAdmin); + } + + @Test + void getTableMetadata_withValidNamespaceAndTable_shouldReturnTableMetadataMap() + throws TableMetadataException { + Map expected = new HashMap<>(); + expected.put("namespace.table", UnitTestUtils.createTestTableMetadata()); + + TableMetadataRequest tableMetadataRequest = new TableMetadataRequest("namespace", "table"); + Map output = + tableMetadataService.getTableMetadata(Collections.singleton(tableMetadataRequest)); + + Assertions.assertEquals(expected.get("namespace.table"), output.get("namespace.table")); + } + + @Test + void getTableMetadata_withInvalidNamespaceAndTable_shouldThrowException() + throws ExecutionException { + Mockito.when(transactionAdmin.getTableMetadata("namespace2", "table2")).thenReturn(null); + + TableMetadataRequest tableMetadataRequest = new TableMetadataRequest("namespace2", "table2"); + assertThatThrownBy( + () -> + tableMetadataService.getTableMetadata(Collections.singleton(tableMetadataRequest))) + .isInstanceOf(TableMetadataException.class) + .hasMessage( + DataLoaderError.MISSING_NAMESPACE_OR_TABLE.buildMessage("namespace2", "table2")); + } +}