Add transaction mode support for data export in dataloader core and CLI #2740


Open · wants to merge 37 commits into base: master

Changes from all commits (37 commits)
da1cb57
Add getScanner() method to transaction interfaces (#2698)
brfrn169 May 28, 2025
29ca31b
Merge branch 'master' into add-scanner-api-to-transaction-abstraction
brfrn169 May 28, 2025
8ea08fa
Implement scanner API for single CRUD transactions (#2701)
brfrn169 May 29, 2025
5042c49
Implement scanner API for JDBC transactions (#2702)
brfrn169 May 29, 2025
e4db961
Merge branch 'master' into add-scanner-api-to-transaction-abstraction
brfrn169 May 29, 2025
7b91326
Implement scanner API for Consensus Commit (#2711)
brfrn169 Jun 2, 2025
9ad1219
Add more integration tests for scanner API (#2724)
brfrn169 Jun 3, 2025
7ab8b65
Merge branch 'master' into add-scanner-api-to-transaction-abstraction
brfrn169 Jun 3, 2025
b3414fb
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 4, 2025
e32fd44
Initial core and cli changes
inv-jishnu Jun 4, 2025
24ce20c
minor test change
inv-jishnu Jun 5, 2025
68059f8
Added unit test for Dao
inv-jishnu Jun 5, 2025
eb62e60
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 5, 2025
f4b0571
Changes
inv-jishnu Jun 5, 2025
735c47e
Updated javadoc
inv-jishnu Jun 5, 2025
43ef7bc
Spotless applied on CLI
inv-jishnu Jun 5, 2025
7c15643
Resolved conflicts and merged changes from master
inv-jishnu Jun 9, 2025
a53757b
Merge branch 'master' into feat/data-loader/export-trn
ypeckstadt Jun 11, 2025
cbba68c
Fix
inv-jishnu Jun 11, 2025
1c62a14
Changes
inv-jishnu Jun 11, 2025
72d5907
Removed unwanted files
inv-jishnu Jun 11, 2025
a3ae85e
Spotbugs issue fixed
inv-jishnu Jun 11, 2025
c20b183
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 11, 2025
303b288
Table metadata service split to storage and transaction
inv-jishnu Jun 11, 2025
37fa8af
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 11, 2025
8c189e8
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 12, 2025
36f9b43
Changed implementation to use DistributedTransactionManager
inv-jishnu Jun 12, 2025
374e833
Renamed CLI input short names for mode and include metadata
inv-jishnu Jun 13, 2025
39a4abd
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 13, 2025
761deca
Javadocs added
inv-jishnu Jun 16, 2025
9fe9c83
Merged changes and resolved conflicts with master branch
inv-jishnu Jun 18, 2025
d45f4cd
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 18, 2025
ed66ab1
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 18, 2025
2138113
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 18, 2025
b9cbd6b
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 19, 2025
c2192da
Merged changes from master after resolving conflicts
inv-jishnu Jun 23, 2025
42bc80f
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 25, 2025
@@ -5,6 +5,7 @@
import static java.nio.file.StandardOpenOption.CREATE;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
@@ -13,6 +14,7 @@
import com.scalar.db.dataloader.core.ColumnKeyValue;
import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportManager;
@@ -25,10 +27,14 @@
import com.scalar.db.dataloader.core.exception.KeyParsingException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataStorageService;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataTransactionService;
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -60,14 +66,13 @@ public Integer call() throws Exception {
spec.commandLine(), dataChunkSize, DataLoaderError.INVALID_DATA_CHUNK_SIZE);
validatePositiveValue(spec.commandLine(), maxThreads, DataLoaderError.INVALID_MAX_THREADS);

StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
new TableMetadataService(storageFactory.getStorageAdmin());
TableMetadataService tableMetadataService =
createTableMetadataService(scalarDbMode, scalarDbPropertiesFilePath);
ScalarDbDao scalarDbDao = new ScalarDbDao();

ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);

TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);
ExportManager exportManager =
createExportManager(scalarDbMode, scalarDbDao, outputFormat, scalarDbPropertiesFilePath);
TableMetadata tableMetadata = tableMetadataService.getTableMetadata(namespace, table);

Key partitionKey =
partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null;
@@ -124,11 +129,100 @@ private void validateOutputDirectory() throws DirectoryValidationException {
}
}

/**
* Creates a {@link TableMetadataService} instance based on the specified {@link ScalarDbMode} and
* ScalarDB configuration file.
*
* <p>If the mode is {@code TRANSACTION}, this method initializes a {@link TransactionFactory} and
* uses its transaction admin to create a {@link TableMetadataTransactionService}. Otherwise, it
* initializes a {@link StorageFactory} and creates a {@link TableMetadataStorageService} using
* its storage admin.
*
* @param scalarDbMode the mode ScalarDB is running in (either {@code STORAGE} or {@code
* TRANSACTION})
* @param scalarDbPropertiesFilePath the path to the ScalarDB properties file
* @return an appropriate {@link TableMetadataService} based on the mode
* @throws IOException if reading the ScalarDB properties file fails
*/
private TableMetadataService createTableMetadataService(
ScalarDbMode scalarDbMode, String scalarDbPropertiesFilePath) throws IOException {
if (scalarDbMode.equals(ScalarDbMode.TRANSACTION)) {
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
return new TableMetadataTransactionService(transactionFactory.getTransactionAdmin());
}
StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
return new TableMetadataStorageService(storageFactory.getStorageAdmin());
}

/**
* Creates an {@link ExportManager} instance based on ScalarDB mode and file format.
*
* @param scalarDbMode The ScalarDB mode (TRANSACTION or STORAGE).
* @param scalarDbDao The DAO for accessing ScalarDB.
* @param fileFormat The output file format (CSV, JSON, JSONL).
* @param scalarDbPropertiesFilePath Path to the ScalarDB properties file.
* @return A configured {@link ExportManager}.
* @throws IOException If there is an error reading the properties file.
*/
private ExportManager createExportManager(
StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
ScalarDbMode scalarDbMode,
ScalarDbDao scalarDbDao,
FileFormat fileFormat,
String scalarDbPropertiesFilePath)
throws IOException {
ProducerTaskFactory taskFactory =
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
DistributedStorage storage = storageFactory.getStorage();
if (scalarDbMode.equals(ScalarDbMode.STORAGE)) {
DistributedStorage storage = StorageFactory.create(scalarDbPropertiesFilePath).getStorage();
return createExportManagerWithStorage(storage, scalarDbDao, fileFormat, taskFactory);
} else {
DistributedTransactionManager distributedTransactionManager =
[Review comment · Contributor] Don't need to consider #2740 (comment)?
TransactionFactory.create(scalarDbPropertiesFilePath).getTransactionManager();
return createExportManagerWithTransaction(
distributedTransactionManager, scalarDbDao, fileFormat, taskFactory);
}
}

/**
* Returns an {@link ExportManager} that uses {@link DistributedTransactionManager}.
*
* @param distributedTransactionManager distributed transaction manager object
* @param scalarDbDao The DAO for accessing ScalarDB.
* @param fileFormat The output file format (CSV, JSON, JSONL).
* @param taskFactory Producer task factory object
* @return A configured {@link ExportManager}.
*/
private ExportManager createExportManagerWithTransaction(
DistributedTransactionManager distributedTransactionManager,
ScalarDbDao scalarDbDao,
FileFormat fileFormat,
ProducerTaskFactory taskFactory) {
switch (fileFormat) {
case JSON:
return new JsonExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
case JSONL:
return new JsonLineExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
case CSV:
return new CsvExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
default:
throw new AssertionError("Invalid file format: " + fileFormat);
}
}
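Both `createExportManagerWithTransaction` and `createExportManagerWithStorage` use the same exhaustive switch over the file format. A stand-in sketch of that shape (the enum and returned names are illustrative, not the real manager classes):

```java
// Stand-in sketch of the exhaustive FileFormat switch shared by the two
// createExportManagerWith* helpers. Format and the returned names are
// illustrative only.
enum Format { JSON, JSONL, CSV }

public class FormatDispatchDemo {
  static String managerFor(Format format) {
    switch (format) {
      case JSON:
        return "JsonExportManager";
      case JSONL:
        return "JsonLineExportManager";
      case CSV:
        return "CsvExportManager";
      default:
        // Unreachable while the enum stays exhaustive; the AssertionError
        // guards against a future Format constant without a branch here.
        throw new AssertionError("Invalid file format: " + format);
    }
  }

  public static void main(String[] args) {
    System.out.println(managerFor(Format.CSV));
  }
}
```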

/**
* Returns an {@link ExportManager} that uses {@link DistributedStorage}.
*
* @param storage distributed storage object
* @param scalarDbDao The DAO for accessing ScalarDB.
* @param fileFormat The output file format (CSV, JSON, JSONL).
* @param taskFactory Producer task factory object
* @return A configured {@link ExportManager}.
*/
private ExportManager createExportManagerWithStorage(
DistributedStorage storage,
ScalarDbDao scalarDbDao,
FileFormat fileFormat,
ProducerTaskFactory taskFactory) {
switch (fileFormat) {
case JSON:
return new JsonExportManager(storage, scalarDbDao, taskFactory);
@@ -152,7 +246,8 @@ private ExportOptions buildExportOptions(Key partitionKey, ScanRange scanRange)
.maxThreadCount(maxThreads)
.dataChunkSize(dataChunkSize)
.prettyPrintJson(prettyPrintJson)
.scanRange(scanRange);
.scanRange(scanRange)
.scalarDbMode(scalarDbMode);

if (projectionColumns != null) {
builder.projectionColumns(projectionColumns);
@@ -3,6 +3,7 @@
import com.scalar.db.api.Scan;
import com.scalar.db.dataloader.core.ColumnKeyValue;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import java.util.ArrayList;
import java.util.List;
import picocli.CommandLine;
@@ -61,7 +62,7 @@ public class ExportCommandOptions {
protected FileFormat outputFormat;

@CommandLine.Option(
names = {"--include-metadata", "-m"},
names = {"--include-metadata", "-im"},
description = "Include transaction metadata in the exported data (default: false)",
defaultValue = "false")
protected boolean includeTransactionMetadata;
@@ -144,4 +145,11 @@ public class ExportCommandOptions {
description = "Size of the data chunk to process in a single task (default: 200)",
defaultValue = "200")
protected int dataChunkSize;

@CommandLine.Option(
names = {"--mode", "-m"},
description = "ScalarDB mode (STORAGE, TRANSACTION) (default: STORAGE)",
paramLabel = "<MODE>",
defaultValue = "STORAGE")
protected ScalarDbMode scalarDbMode;
}
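picocli converts the `--mode` value to the `ScalarDbMode` enum and applies `defaultValue = "STORAGE"` when the flag is absent. A hedged, stdlib-only sketch of the equivalent behavior (the nested `ScalarDbMode` here is a local stand-in for `com.scalar.db.dataloader.core.ScalarDbMode`):

```java
// Hedged sketch: stdlib-only equivalent of picocli's enum conversion for the
// --mode option, including the declared STORAGE default. ScalarDbMode below
// is a local stand-in, not the real dataloader enum.
public class ModeOptionDemo {
  enum ScalarDbMode { STORAGE, TRANSACTION }

  static ScalarDbMode parseMode(String raw) {
    if (raw == null || raw.isEmpty()) {
      return ScalarDbMode.STORAGE; // mirrors defaultValue = "STORAGE"
    }
    // picocli matches enum constants case-sensitively by default; this
    // sketch is lenient and upper-cases the input before matching.
    return ScalarDbMode.valueOf(raw.toUpperCase(java.util.Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(parseMode(null));
    System.out.println(parseMode("transaction"));
  }
}
```

An unknown value (e.g. `--mode CLUSTER`) would surface as an `IllegalArgumentException` from `Enum.valueOf`, which picocli reports as a parameter error.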
@@ -24,6 +24,7 @@
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataStorageService;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
@@ -106,7 +107,7 @@ private Map<String, TableMetadata> createTableMetadataMap(
File configFile = new File(configFilePath);
StorageFactory storageFactory = StorageFactory.create(configFile);
try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
TableMetadataService tableMetadataService = new TableMetadataStorageService(storageAdmin);
Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
if (controlFile != null) {
for (ControlFileTable table : controlFile.getTables()) {
@@ -4,6 +4,7 @@

import com.scalar.db.dataloader.core.DataLoaderError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import java.io.File;
import java.nio.file.Paths;
import org.junit.jupiter.api.AfterEach;
@@ -55,4 +56,19 @@ void call_withInvalidScalarDBConfigurationFile_shouldReturnOne() throws Exception
exportCommand.outputFormat = FileFormat.JSON;
Assertions.assertEquals(1, exportCommand.call());
}

@Test
void call_withScalarDBModeTransaction_WithInvalidConfigurationFile_shouldReturnOne()
throws Exception {
ExportCommand exportCommand = new ExportCommand();
exportCommand.configFilePath = "scalardb.properties";
exportCommand.dataChunkSize = 100;
exportCommand.namespace = "scalar";
exportCommand.table = "asset";
exportCommand.outputDirectory = "";
exportCommand.outputFileName = "sample.json";
exportCommand.outputFormat = FileFormat.JSON;
exportCommand.scalarDbMode = ScalarDbMode.TRANSACTION;
Assertions.assertEquals(1, exportCommand.call());
}
}
@@ -1,6 +1,7 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
@@ -13,18 +14,40 @@

/** Export manager implementation which manages the export task that exports data in CSV format */
public class CsvExportManager extends ExportManager {
/**
* Constructs a {@code CsvExportManager} for exporting data using a {@link DistributedStorage}
* instance.
*
* <p>This constructor is used when exporting data in non-transactional (storage) mode.
*
* @param distributedStorage the {@link DistributedStorage} used to read data directly from
* storage
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for exporting data
* @param producerTaskFactory the factory used to create producer tasks for generating
* CSV-formatted output
*/
public CsvExportManager(
DistributedStorage distributedStorage,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(distributedStorage, dao, producerTaskFactory);
}

/**
* Constructs a {@code CsvExportManager} with the specified {@link DistributedStorage}, {@link
* ScalarDbDao}, and {@link ProducerTaskFactory}.
* Constructs a {@code CsvExportManager} for exporting data using a {@link
* DistributedTransactionManager}.
*
* @param storage the {@code DistributedStorage} instance used to read data from the database
* @param dao the {@code ScalarDbDao} used to execute export-related database operations
* @param producerTaskFactory the factory used to create producer tasks for exporting data
* @param distributedTransactionManager the transaction manager used to read data with
* transactional guarantees
* @param dao the {@link ScalarDbDao} used to interact with ScalarDB for exporting data
* @param producerTaskFactory the factory used to create producer tasks for generating
* CSV-formatted output
*/
public CsvExportManager(
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
DistributedTransactionManager distributedTransactionManager,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(distributedTransactionManager, dao, producerTaskFactory);
}

/**