Add transaction mode support for data export in dataloader core and CLI #2740

Open

inv-jishnu wants to merge 37 commits into master from feat/data-loader/export-trn

Changes from 25 commits (of 37)

Commits
da1cb57
Add getScanner() method to transaction interfaces (#2698)
brfrn169 May 28, 2025
29ca31b
Merge branch 'master' into add-scanner-api-to-transaction-abstraction
brfrn169 May 28, 2025
8ea08fa
Implement scanner API for single CRUD transactions (#2701)
brfrn169 May 29, 2025
5042c49
Implement scanner API for JDBC transactions (#2702)
brfrn169 May 29, 2025
e4db961
Merge branch 'master' into add-scanner-api-to-transaction-abstraction
brfrn169 May 29, 2025
7b91326
Implement scanner API for Consensus Commit (#2711)
brfrn169 Jun 2, 2025
9ad1219
Add more integration tests for scanner API (#2724)
brfrn169 Jun 3, 2025
7ab8b65
Merge branch 'master' into add-scanner-api-to-transaction-abstraction
brfrn169 Jun 3, 2025
b3414fb
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 4, 2025
e32fd44
Initial core and cli changes
inv-jishnu Jun 4, 2025
24ce20c
minor test change
inv-jishnu Jun 5, 2025
68059f8
Added unit test for Dao
inv-jishnu Jun 5, 2025
eb62e60
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 5, 2025
f4b0571
Changes
inv-jishnu Jun 5, 2025
735c47e
Updated javadoc
inv-jishnu Jun 5, 2025
43ef7bc
Spotless applied on CLI
inv-jishnu Jun 5, 2025
7c15643
Resolved conflicts and merged changes from master
inv-jishnu Jun 9, 2025
a53757b
Merge branch 'master' into feat/data-loader/export-trn
ypeckstadt Jun 11, 2025
cbba68c
Fix
inv-jishnu Jun 11, 2025
1c62a14
Changes
inv-jishnu Jun 11, 2025
72d5907
Removed unwanted files
inv-jishnu Jun 11, 2025
a3ae85e
Spotbugs issue fixed
inv-jishnu Jun 11, 2025
c20b183
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 11, 2025
303b288
Table metadata service split to storage and transaction
inv-jishnu Jun 11, 2025
37fa8af
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 11, 2025
8c189e8
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 12, 2025
36f9b43
Changed implementation to use DistributedTransactionManager
inv-jishnu Jun 12, 2025
374e833
Renamed CLI input short names for mode and include metadata
inv-jishnu Jun 13, 2025
39a4abd
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 13, 2025
761deca
Javadocs added
inv-jishnu Jun 16, 2025
9fe9c83
Merged changes and resolved conflicts with master branch
inv-jishnu Jun 18, 2025
d45f4cd
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 18, 2025
ed66ab1
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 18, 2025
2138113
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 18, 2025
b9cbd6b
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 19, 2025
c2192da
Merged changes from master after resolving conflicts
inv-jishnu Jun 23, 2025
42bc80f
Merge branch 'master' into feat/data-loader/export-trn
inv-jishnu Jun 25, 2025

File: ExportCommand.java
@@ -5,6 +5,7 @@
import static java.nio.file.StandardOpenOption.CREATE;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
@@ -13,6 +14,7 @@
import com.scalar.db.dataloader.cli.util.InvalidFilePathException;
import com.scalar.db.dataloader.core.ColumnKeyValue;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportManager;
@@ -25,10 +27,14 @@
import com.scalar.db.dataloader.core.exception.KeyParsingException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataStorageService;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataTransactionService;
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -62,14 +68,13 @@ public Integer call() throws Exception {
validatePositiveValue(
spec.commandLine(), maxThreads, CoreError.DATA_LOADER_INVALID_MAX_THREADS);

StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
new TableMetadataService(storageFactory.getStorageAdmin());
TableMetadataService tableMetadataService =
createTableMetadataService(scalarDbMode, scalarDbPropertiesFilePath);
ScalarDbDao scalarDbDao = new ScalarDbDao();

ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);

TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);
ExportManager exportManager =
createExportManager(scalarDbMode, scalarDbDao, outputFormat, scalarDbPropertiesFilePath);
TableMetadata tableMetadata = tableMetadataService.getTableMetadata(namespace, table);

Key partitionKey =
partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null;
@@ -127,11 +132,85 @@ private void validateOutputDirectory() throws DirectoryValidationException {
}
}

private TableMetadataService createTableMetadataService(
ScalarDbMode scalarDbMode, String scalarDbPropertiesFilePath) throws IOException {
if (scalarDbMode.equals(ScalarDbMode.TRANSACTION)) {
TransactionFactory transactionFactory = TransactionFactory.create(scalarDbPropertiesFilePath);
return new TableMetadataTransactionService(transactionFactory.getTransactionAdmin());
}
StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
return new TableMetadataStorageService(storageFactory.getStorageAdmin());
}

/**
* Creates an {@link ExportManager} instance based on ScalarDB mode and file format.
*
* @param scalarDbMode The ScalarDB mode (TRANSACTION or STORAGE).
* @param scalarDbDao The DAO for accessing ScalarDB.
* @param fileFormat The output file format (CSV, JSON, JSONL).
* @param scalarDbPropertiesFilePath Path to the ScalarDB properties file.
* @return A configured {@link ExportManager}.
* @throws IOException If there is an error reading the properties file.
*/
private ExportManager createExportManager(
StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
ScalarDbMode scalarDbMode,
ScalarDbDao scalarDbDao,
FileFormat fileFormat,
String scalarDbPropertiesFilePath)
throws IOException {
ProducerTaskFactory taskFactory =
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
DistributedStorage storage = storageFactory.getStorage();
if (scalarDbMode.equals(ScalarDbMode.STORAGE)) {
DistributedStorage storage = StorageFactory.create(scalarDbPropertiesFilePath).getStorage();
return createExportManagerWithStorage(storage, scalarDbDao, fileFormat, taskFactory);
} else {
DistributedTransactionManager distributedTransactionManager =
TransactionFactory.create(scalarDbPropertiesFilePath).getTransactionManager();

[Review comment from Contributor]: Don't need to consider #2740 (comment)?

return createExportManagerWithTransaction(
distributedTransactionManager, scalarDbDao, fileFormat, taskFactory);
}
}

/**
* Returns an {@link ExportManager} that uses {@link DistributedTransactionManager}.
*
* @param distributedTransactionManager distributed transaction manager object
* @param scalarDbDao The DAO for accessing ScalarDB.
* @param fileFormat The output file format (CSV, JSON, JSONL).
* @param taskFactory Producer task factory object
* @return A configured {@link ExportManager}.
*/
private ExportManager createExportManagerWithTransaction(
DistributedTransactionManager distributedTransactionManager,
ScalarDbDao scalarDbDao,
FileFormat fileFormat,
ProducerTaskFactory taskFactory) {
switch (fileFormat) {
case JSON:
return new JsonExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
case JSONL:
return new JsonLineExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
case CSV:
return new CsvExportManager(distributedTransactionManager, scalarDbDao, taskFactory);
default:
throw new AssertionError("Invalid file format: " + fileFormat);
}
}

/**
* Returns an {@link ExportManager} that uses {@link DistributedStorage}.
*
* @param storage distributed storage object
* @param scalarDbDao The DAO for accessing ScalarDB.
* @param fileFormat The output file format (CSV, JSON, JSONL).
* @param taskFactory Producer task factory object
* @return A configured {@link ExportManager}.
*/
private ExportManager createExportManagerWithStorage(
DistributedStorage storage,
ScalarDbDao scalarDbDao,
FileFormat fileFormat,
ProducerTaskFactory taskFactory) {
switch (fileFormat) {
case JSON:
return new JsonExportManager(storage, scalarDbDao, taskFactory);
@@ -155,7 +234,8 @@ private ExportOptions buildExportOptions(Key partitionKey, ScanRange scanRange)
.maxThreadCount(maxThreads)
.dataChunkSize(dataChunkSize)
.prettyPrintJson(prettyPrintJson)
.scanRange(scanRange);
.scanRange(scanRange)
.scalarDbMode(scalarDbMode);

if (projectionColumns != null) {
builder.projectionColumns(projectionColumns);
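Taken together, the ExportCommand changes make the new ScalarDbMode the single switch that selects the backend for both table-metadata lookup and the export manager. Below is a condensed, self-contained sketch of the metadata-service selection; the class and factory names are taken from the diff above, but the wrapper class and method name are illustrative stand-ins, not the PR's code.

```java
// Illustrative condensation of createTableMetadataService() from the diff above.
// Not the PR's code: the wrapper class and method name are stand-ins.
import com.scalar.db.dataloader.core.ScalarDbMode;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataStorageService;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataTransactionService;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
import java.io.IOException;

final class ModeWiringSketch {
  static TableMetadataService metadataServiceFor(ScalarDbMode mode, String propertiesPath)
      throws IOException {
    if (mode == ScalarDbMode.TRANSACTION) {
      // TRANSACTION mode reads metadata through the transaction admin.
      return new TableMetadataTransactionService(
          TransactionFactory.create(propertiesPath).getTransactionAdmin());
    }
    // STORAGE mode (the default) reads metadata through the storage admin.
    return new TableMetadataStorageService(
        StorageFactory.create(propertiesPath).getStorageAdmin());
  }
}
```

The export-manager side mirrors this split: STORAGE hands a DistributedStorage to the manager, while TRANSACTION hands it a DistributedTransactionManager, with the file format choosing among the CSV, JSON, and JSONL implementations.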

File: ExportCommandOptions.java
@@ -3,6 +3,7 @@
import com.scalar.db.api.Scan;
import com.scalar.db.dataloader.core.ColumnKeyValue;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import java.util.ArrayList;
import java.util.List;
import picocli.CommandLine;
@@ -144,4 +145,11 @@ public class ExportCommandOptions {
description = "Size of the data chunk to process in a single task (default: 200)",
defaultValue = "200")
protected int dataChunkSize;

@CommandLine.Option(
names = {"--mode", "-sm"},
[Review comment from Collaborator]: Why -sm?

[Reply from Contributor Author]: @brfrn169 san, -m is already used by the --include-metadata option, which was already present:

```java
@CommandLine.Option(
    names = {"--include-metadata", "-m"},
    description = "Include transaction metadata in the exported data (default: false)",
    defaultValue = "false")
protected boolean includeTransactionMetadata;
```

I didn't want to change the existing value, since it has been there from the beginning and there may be users who rely on it. Also, --mode in import is -m (which was also added in a recent change). Should I change mode to -m and include-metadata to something else like -im?

[Reply from Contributor Author]: @brfrn169 san, after discussing with Yves san, I have set -m for mode and -im for include-metadata to make it consistent with import. Thank you.


description = "ScalarDB mode (STORAGE, TRANSACTION) (default: STORAGE)",
paramLabel = "<MODE>",
defaultValue = "STORAGE")
protected ScalarDbMode scalarDbMode;
}
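
Because the new option binds to an enum field, picocli converts the --mode argument automatically and rejects values other than STORAGE and TRANSACTION. Here is a minimal standalone sketch of that behavior; the demo class and its local Mode enum are hypothetical stand-ins for ScalarDbMode, and the -m short name reflects the final naming agreed in the review thread above, not the -sm in this revision of the diff.

```java
// Hypothetical standalone demo of picocli's enum conversion for --mode.
// The real option binds to com.scalar.db.dataloader.core.ScalarDbMode.
import picocli.CommandLine;

public class ModeOptionDemo {
  enum Mode {
    STORAGE,
    TRANSACTION
  }

  @CommandLine.Option(
      names = {"--mode", "-m"},
      description = "ScalarDB mode (STORAGE, TRANSACTION) (default: STORAGE)",
      defaultValue = "STORAGE")
  Mode mode;

  public static void main(String[] args) {
    ModeOptionDemo demo = new ModeOptionDemo();
    new CommandLine(demo).parseArgs("--mode", "TRANSACTION");
    System.out.println(demo.mode); // TRANSACTION; omitting --mode yields STORAGE
  }
}
```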

File: ImportCommand.java
@@ -24,6 +24,7 @@
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataStorageService;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import com.scalar.db.service.StorageFactory;
import com.scalar.db.service.TransactionFactory;
@@ -109,7 +110,7 @@ private Map<String, TableMetadata> createTableMetadataMap(
File configFile = new File(configFilePath);
StorageFactory storageFactory = StorageFactory.create(configFile);
try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) {
TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
TableMetadataService tableMetadataService = new TableMetadataStorageService(storageAdmin);
Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
if (controlFile != null) {
for (ControlFileTable table : controlFile.getTables()) {

File: ExportCommandTest.java
@@ -4,6 +4,7 @@

import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScalarDbMode;
import java.io.File;
import java.nio.file.Paths;
import org.junit.jupiter.api.AfterEach;
@@ -55,4 +56,19 @@ void call_withInvalidScalarDBConfigurationFile_shouldReturnOne() throws Exception {
exportCommand.outputFormat = FileFormat.JSON;
Assertions.assertEquals(1, exportCommand.call());
}

@Test
void call_withScalarDBModeTransaction_WithInvalidConfigurationFile_shouldReturnOne()
throws Exception {
ExportCommand exportCommand = new ExportCommand();
exportCommand.configFilePath = "scalardb.properties";
exportCommand.dataChunkSize = 100;
exportCommand.namespace = "scalar";
exportCommand.table = "asset";
exportCommand.outputDirectory = "";
exportCommand.outputFileName = "sample.json";
exportCommand.outputFormat = FileFormat.JSON;
exportCommand.scalarDbMode = ScalarDbMode.TRANSACTION;
Assertions.assertEquals(1, exportCommand.call());
}
}

File: CsvExportManager.java
@@ -1,6 +1,7 @@
package com.scalar.db.dataloader.core.dataexport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
Expand All @@ -13,8 +14,17 @@

public class CsvExportManager extends ExportManager {
public CsvExportManager(
DistributedStorage storage, ScalarDbDao dao, ProducerTaskFactory producerTaskFactory) {
super(storage, dao, producerTaskFactory);
DistributedStorage distributedStorage,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(distributedStorage, dao, producerTaskFactory);
}

public CsvExportManager(
DistributedTransactionManager distributedTransactionManager,
ScalarDbDao dao,
ProducerTaskFactory producerTaskFactory) {
super(distributedTransactionManager, dao, producerTaskFactory);
}

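For orientation, here is a minimal sketch of wiring transaction-mode CSV export through the constructor added above. This is hypothetical usage, not code from the PR: the properties path is a placeholder, and the ProducerTaskFactory argument types (delimiter, include-metadata flag, pretty-print flag) are assumed from its call site in ExportCommand.

```java
// Hypothetical wiring for transaction-mode CSV export (assumptions noted above).
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportManager;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.service.TransactionFactory;

public class TransactionExportSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder path; the CLI takes this from its --config option.
    DistributedTransactionManager manager =
        TransactionFactory.create("scalardb.properties").getTransactionManager();

    // (delimiter, includeTransactionMetadata, prettyPrintJson), per ExportCommand;
    // the String delimiter type is an assumption.
    ProducerTaskFactory taskFactory = new ProducerTaskFactory(",", false, false);

    // The constructor added in this PR: CSV export backed by transactions.
    ExportManager exportManager = new CsvExportManager(manager, new ScalarDbDao(), taskFactory);
    System.out.println("Created " + exportManager.getClass().getSimpleName());
  }
}
```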