Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions core/src/main/java/com/scalar/db/api/Mutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,6 @@ public Mutation(Mutation mutation) {
condition = mutation.condition;
}

Mutation(
@Nullable String namespace,
String tableName,
Key partitionKey,
@Nullable Key clusteringKey,
@Nullable MutationCondition condition) {
super(namespace, tableName, partitionKey, clusteringKey);
this.condition = condition;
}

/**
* Returns the {@link MutationCondition}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,4 @@ protected Put copyAndSetTargetToIfNot(Put put) {
protected Delete copyAndSetTargetToIfNot(Delete delete) {
return ScalarDbUtils.copyAndSetTargetToIfNot(delete, namespace, tableName);
}

protected Get copyAndPrepareForDynamicFiltering(Get get) {
return ScalarDbUtils.copyAndPrepareForDynamicFiltering(get);
}

protected Scan copyAndPrepareForDynamicFiltering(Scan scan) {
return ScalarDbUtils.copyAndPrepareForDynamicFiltering(scan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -100,7 +101,9 @@ public Optional<Result> get(Get get) throws ExecutionException {
if (get.getConjunctions().isEmpty()) {
scanner = getInternal(get);
} else {
scanner = new FilterableScanner(get, getInternal(copyAndPrepareForDynamicFiltering(get)));
scanner =
new FilterableScanner(
get, getInternal(ScalarDbUtils.copyAndPrepareForDynamicFiltering(get)));
}
Optional<Result> ret = scanner.one();
if (scanner.one().isPresent()) {
Expand Down Expand Up @@ -134,7 +137,8 @@ public Scanner scan(Scan scan) throws ExecutionException {
if (scan.getConjunctions().isEmpty()) {
return scanInternal(scan);
} else {
return new FilterableScanner(scan, scanInternal(copyAndPrepareForDynamicFiltering(scan)));
return new FilterableScanner(
scan, scanInternal(ScalarDbUtils.copyAndPrepareForDynamicFiltering(scan)));
}
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -105,7 +106,9 @@ public Optional<Result> get(Get get) throws ExecutionException {
} else {
scanner =
new FilterableScanner(
get, selectStatementHandler.handle(copyAndPrepareForDynamicFiltering(get)));
get,
selectStatementHandler.handle(
ScalarDbUtils.copyAndPrepareForDynamicFiltering(get)));
}
Optional<Result> ret = scanner.one();
if (scanner.one().isPresent()) {
Expand Down Expand Up @@ -133,7 +136,8 @@ public Scanner scan(Scan scan) throws ExecutionException {
return selectStatementHandler.handle(scan);
} else {
return new FilterableScanner(
scan, selectStatementHandler.handle(copyAndPrepareForDynamicFiltering(scan)));
scan,
selectStatementHandler.handle(ScalarDbUtils.copyAndPrepareForDynamicFiltering(scan)));
}
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.scalar.db.common.checker.OperationChecker;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.util.ScalarDbUtils;
import java.io.IOException;
import java.net.URI;
import java.util.List;
Expand Down Expand Up @@ -125,7 +126,9 @@ public Optional<Result> get(Get get) throws ExecutionException {
} else {
scanner =
new FilterableScanner(
get, selectStatementHandler.handle(copyAndPrepareForDynamicFiltering(get)));
get,
selectStatementHandler.handle(
ScalarDbUtils.copyAndPrepareForDynamicFiltering(get)));
}
Optional<Result> ret = scanner.one();
if (scanner.one().isPresent()) {
Expand Down Expand Up @@ -153,7 +156,8 @@ public Scanner scan(Scan scan) throws ExecutionException {
return selectStatementHandler.handle(scan);
} else {
return new FilterableScanner(
scan, selectStatementHandler.handle(copyAndPrepareForDynamicFiltering(scan)));
scan,
selectStatementHandler.handle(ScalarDbUtils.copyAndPrepareForDynamicFiltering(scan)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,17 @@ private Put composePut(Operation base, @Nullable TransactionResult result)

private Delete composeDelete(Operation base, @Nullable TransactionResult result)
throws ExecutionException {
return new Delete(getPartitionKey(base, result), getClusteringKey(base, result).orElse(null))
.forNamespace(base.forNamespace().get())
.forTable(base.forTable().get())
.withConsistency(Consistency.LINEARIZABLE)
.withCondition(
return Delete.newBuilder()
.namespace(base.forNamespace().get())
.table(base.forTable().get())
.partitionKey(getPartitionKey(base, result))
.clusteringKey(getClusteringKey(base, result).orElse(null))
.consistency(Consistency.LINEARIZABLE)
.condition(
ConditionBuilder.deleteIf(ConditionBuilder.column(ID).isEqualToText(id))
.and(ConditionBuilder.column(STATE).isEqualToInt(TransactionState.DELETED.get()))
.build());
.build())
.build();
}

private Key getPartitionKey(Operation base, @Nullable TransactionResult result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.Get;
import com.scalar.db.api.Put;
import com.scalar.db.api.PutBuilder;
import com.scalar.db.api.Result;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.api.TransactionState;
Expand Down Expand Up @@ -286,10 +287,12 @@ private void putStateForLazyRecoveryRollbackForGroupCommit(String id)

@VisibleForTesting
Get createGetWith(String id) {
return new Get(Key.ofText(Attribute.ID, id))
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(coordinatorNamespace)
.forTable(TABLE);
return Get.newBuilder()
.namespace(coordinatorNamespace)
.table(TABLE)
.partitionKey(Key.ofText(Attribute.ID, id))
.consistency(Consistency.LINEARIZABLE)
.build();
}

private Optional<Coordinator.State> get(Get get, String id) throws CoordinatorException {
Expand Down Expand Up @@ -327,17 +330,21 @@ private Optional<Coordinator.State> get(Get get, String id) throws CoordinatorEx

@VisibleForTesting
Put createPutWith(Coordinator.State state) {
Put put = new Put(Key.ofText(Attribute.ID, state.getId()));
String childIds = state.getChildIdsAsString();
PutBuilder.Buildable builder =
Put.newBuilder()
.namespace(coordinatorNamespace)
.table(TABLE)
.partitionKey(Key.ofText(Attribute.ID, state.getId()))
.intValue(Attribute.STATE, state.getState().get())
.bigIntValue(Attribute.CREATED_AT, state.getCreatedAt())
.consistency(Consistency.LINEARIZABLE)
.condition(ConditionBuilder.putIfNotExists());

if (!childIds.isEmpty()) {
put.withTextValue(Attribute.CHILD_IDS, childIds);
builder = builder.textValue(Attribute.CHILD_IDS, childIds);
}
return put.withIntValue(Attribute.STATE, state.getState().get())
.withBigIntValue(Attribute.CREATED_AT, state.getCreatedAt())
.withConsistency(Consistency.LINEARIZABLE)
.withCondition(ConditionBuilder.putIfNotExists())
.forNamespace(coordinatorNamespace)
.forTable(TABLE);
return builder.build();
}

private void put(Put put, String id) throws CoordinatorException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,9 +713,19 @@ private Set<AndConditionSet> convertConjunctions(
}

private Selection prepareStorageSelection(Selection selection) {
selection.clearProjections();
selection.withConsistency(Consistency.LINEARIZABLE);
return selection;
if (selection instanceof Get) {
return Get.newBuilder((Get) selection)
.clearProjections()
.consistency(Consistency.LINEARIZABLE)
.build();
} else {
assert selection instanceof Scan;

return Scan.newBuilder((Scan) selection)
.clearProjections()
.consistency(Consistency.LINEARIZABLE)
.build();
}
}

private TransactionTableMetadata getTransactionTableMetadata(Operation operation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,17 @@ private Delete composeDelete(Operation base, TransactionResult result) throws Ex
Key partitionKey = ScalarDbUtils.getPartitionKey(result, tableMetadata);
Optional<Key> clusteringKey = ScalarDbUtils.getClusteringKey(result, tableMetadata);

return new Delete(partitionKey, clusteringKey.orElse(null))
.forNamespace(base.forNamespace().get())
.forTable(base.forTable().get())
.withCondition(
return Delete.newBuilder()
.namespace(base.forNamespace().get())
.table(base.forTable().get())
.partitionKey(partitionKey)
.clusteringKey(clusteringKey.orElse(null))
.condition(
ConditionBuilder.deleteIf(ConditionBuilder.column(ID).isEqualToText(id))
.and(ConditionBuilder.column(STATE).isEqualToInt(result.getState().get()))
.build())
.withConsistency(Consistency.LINEARIZABLE);
.consistency(Consistency.LINEARIZABLE)
.build();
}

private Optional<TransactionResult> getLatestResult(
Expand All @@ -155,10 +158,13 @@ private Optional<TransactionResult> getLatestResult(
}

Get get =
new Get(partitionKey, clusteringKey)
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(operation.forNamespace().get())
.forTable(operation.forTable().get());
Get.newBuilder()
.namespace(operation.forNamespace().get())
.table(operation.forTable().get())
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.consistency(Consistency.LINEARIZABLE)
.build();

return storage.get(get).map(TransactionResult::new);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanAll;
import com.scalar.db.api.ScanBuilder;
import com.scalar.db.api.ScanWithIndex;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.Selection.Conjunction;
Expand Down Expand Up @@ -163,14 +164,16 @@ public void putIntoWriteSet(Key key, Put put) {
// merge the previous put in the write set and the new put
Put originalPut = writeSet.get(key);
PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut);
put.getColumns().values().forEach(putBuilder::value);
for (Column<?> value : put.getColumns().values()) {
putBuilder = putBuilder.value(value);
}

// If the implicit pre-read is enabled for the new put, it should also be enabled for the
// merged put. However, if the previous put is in insert mode, this doesn’t apply. This is
// because, in insert mode, the read set is not used during the preparation phase. Therefore,
// we only need to enable the implicit pre-read if the previous put is not in insert mode
if (isImplicitPreReadEnabled(put) && !isInsertModeEnabled(originalPut)) {
putBuilder.enableImplicitPreRead();
putBuilder = putBuilder.enableImplicitPreRead();
}

writeSet.put(key, putBuilder.build());
Expand Down Expand Up @@ -593,9 +596,16 @@ private void validateScanResults(
Scanner scanner = null;
try {
// Only get tx_id and primary key columns because we use only them to compare
scan.clearProjections();
scan.withProjection(Attribute.ID);
ScalarDbUtils.addProjectionsForKeys(scan, getTableMetadata(scan));
ScanBuilder.BuildableScanOrScanAllFromExisting builder =
Scan.newBuilder(scan).clearProjections().projection(Attribute.ID);
TableMetadata tableMetadata = getTableMetadata(scan);
for (String partitionKeyName : tableMetadata.getPartitionKeyNames()) {
builder = builder.projection(partitionKeyName);
}
for (String clusteringKeyName : tableMetadata.getClusteringKeyNames()) {
builder = builder.projection(clusteringKeyName);
}
scan = builder.build();

if (scan.getLimit() == 0) {
scanner = storage.scan(scan);
Expand Down Expand Up @@ -744,8 +754,7 @@ private void validateGetResult(
DistributedStorage storage, Get get, Optional<TransactionResult> originalResult)
throws ExecutionException, ValidationConflictException {
// Only get the tx_id column because we use only them to compare
get.clearProjections();
get.withProjection(Attribute.ID);
get = Get.newBuilder(get).clearProjections().projection(Attribute.ID).build();

// Check if a read record is not changed
Optional<TransactionResult> latestResult = storage.get(get).map(TransactionResult::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public Optional<Result> get(Get get) throws CrudException {
get = copyAndSetTargetToIfNot(get);

try {
return storage.get(get.withConsistency(Consistency.LINEARIZABLE));
return storage.get(Get.newBuilder(get).consistency(Consistency.LINEARIZABLE).build());
} catch (ExecutionException e) {
throw new CrudException(e.getMessage(), e, null);
}
Expand All @@ -171,7 +171,7 @@ public List<Result> scan(Scan scan) throws CrudException {
scan = copyAndSetTargetToIfNot(scan);

try (com.scalar.db.api.Scanner scanner =
storage.scan(scan.withConsistency(Consistency.LINEARIZABLE))) {
storage.scan(Scan.newBuilder(scan).consistency(Consistency.LINEARIZABLE).build())) {
return scanner.all();
} catch (ExecutionException | IOException e) {
throw new CrudException(e.getMessage(), e, null);
Expand Down Expand Up @@ -226,7 +226,7 @@ public void put(Put put) throws CrudException {
put = copyAndSetTargetToIfNot(put);

try {
storage.put(put.withConsistency(Consistency.LINEARIZABLE));
storage.put(Put.newBuilder(put).consistency(Consistency.LINEARIZABLE).build());
} catch (NoMutationException e) {
throwUnsatisfiedConditionException(put);
} catch (ExecutionException e) {
Expand Down Expand Up @@ -331,7 +331,7 @@ public void delete(Delete delete) throws CrudException {
delete = copyAndSetTargetToIfNot(delete);

try {
storage.delete(delete.withConsistency(Consistency.LINEARIZABLE));
storage.delete(Delete.newBuilder(delete).consistency(Consistency.LINEARIZABLE).build());
} catch (NoMutationException e) {
throwUnsatisfiedConditionException(delete);
} catch (ExecutionException e) {
Expand Down
Loading