Skip to content

Commit d3223e6

Browse files
Backport to branch(3) : Support one-phase commit optimization in Consensus Commit (#2813)
Co-authored-by: Toshihiro Suzuki <[email protected]>
1 parent c9d11f1 commit d3223e6

16 files changed

+1584
-77
lines changed

core/src/main/java/com/scalar/db/common/CoreError.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,8 @@ public enum CoreError implements ScalarDbError {
783783
"A transaction conflict occurred in the Insert operation",
784784
"",
785785
""),
786+
CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS(
787+
Category.CONCURRENCY_ERROR, "0026", "A conflict occurred when committing records", "", ""),
786788

787789
//
788790
// Errors for the internal error category
@@ -935,6 +937,8 @@ public enum CoreError implements ScalarDbError {
935937
""),
936938
CONSENSUS_COMMIT_RECOVERING_RECORDS_FAILED(
937939
Category.INTERNAL_ERROR, "0057", "Recovering records failed. Details: %s", "", ""),
940+
CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED(
941+
Category.INTERNAL_ERROR, "0058", "Committing records failed", "", ""),
938942

939943
//
940944
// Errors for the unknown transaction status error category

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import static com.google.common.base.Preconditions.checkNotNull;
44

5+
import com.google.common.annotations.VisibleForTesting;
56
import com.google.errorprone.annotations.concurrent.LazyInit;
7+
import com.scalar.db.api.Delete;
68
import com.scalar.db.api.DistributedStorage;
79
import com.scalar.db.api.Mutation;
810
import com.scalar.db.api.TransactionState;
@@ -24,6 +26,8 @@
2426
import java.util.List;
2527
import java.util.Optional;
2628
import java.util.concurrent.Future;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.Stream;
2731
import javax.annotation.Nullable;
2832
import javax.annotation.concurrent.ThreadSafe;
2933
import org.slf4j.Logger;
@@ -38,6 +42,7 @@ public class CommitHandler {
3842
private final ParallelExecutor parallelExecutor;
3943
private final MutationsGrouper mutationsGrouper;
4044
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
45+
private final boolean onePhaseCommitEnabled;
4146

4247
@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;
4348

@@ -48,13 +53,15 @@ public CommitHandler(
4853
TransactionTableMetadataManager tableMetadataManager,
4954
ParallelExecutor parallelExecutor,
5055
MutationsGrouper mutationsGrouper,
51-
boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
56+
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
57+
boolean onePhaseCommitEnabled) {
5258
this.storage = checkNotNull(storage);
5359
this.coordinator = checkNotNull(coordinator);
5460
this.tableMetadataManager = checkNotNull(tableMetadataManager);
5561
this.parallelExecutor = checkNotNull(parallelExecutor);
5662
this.mutationsGrouper = checkNotNull(mutationsGrouper);
5763
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
64+
this.onePhaseCommitEnabled = onePhaseCommitEnabled;
5865
}
5966

6067
/**
@@ -118,6 +125,16 @@ public void commit(Snapshot snapshot, boolean readOnly)
118125

119126
Optional<Future<Void>> snapshotHookFuture = invokeBeforePreparationSnapshotHook(snapshot);
120127

128+
if (canOnePhaseCommit(snapshot)) {
129+
try {
130+
onePhaseCommitRecords(snapshot);
131+
return;
132+
} catch (Exception e) {
133+
safelyCallOnFailureBeforeCommit(snapshot);
134+
throw e;
135+
}
136+
}
137+
121138
if (hasWritesOrDeletesInSnapshot) {
122139
try {
123140
prepareRecords(snapshot);
@@ -170,6 +187,52 @@ public void commit(Snapshot snapshot, boolean readOnly)
170187
}
171188
}
172189

190+
@VisibleForTesting
191+
boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException {
192+
if (!onePhaseCommitEnabled) {
193+
return false;
194+
}
195+
196+
// If validation is required (in SERIALIZABLE isolation), we cannot one-phase commit the
197+
// transaction
198+
if (snapshot.isValidationRequired()) {
199+
return false;
200+
}
201+
202+
// If the snapshot has no write and deletes, we do not one-phase commit the transaction
203+
if (!snapshot.hasWritesOrDeletes()) {
204+
return false;
205+
}
206+
207+
List<Delete> deletesInDeleteSet = snapshot.getDeletesInDeleteSet();
208+
209+
// If a record corresponding to a delete in the delete set does not exist in the storage, we
210+
// cannot one-phase commit the transaction. This is because the storage does not support
211+
// delete-if-not-exists semantics, so we cannot detect conflicts with other transactions.
212+
for (Delete delete : deletesInDeleteSet) {
213+
Optional<TransactionResult> result = snapshot.getFromReadSet(new Snapshot.Key(delete));
214+
215+
// For deletes, we always perform implicit pre-reads if the result does not exit in the read
216+
// set. So the result should always exist in the read set.
217+
assert result != null;
218+
219+
if (!result.isPresent()) {
220+
return false;
221+
}
222+
}
223+
224+
try {
225+
// If the mutations can be grouped altogether, the mutations can be done in a single mutate
226+
// API call, so we can one-phase commit the transaction
227+
return mutationsGrouper.canBeGroupedAltogether(
228+
Stream.concat(snapshot.getPutsInWriteSet().stream(), deletesInDeleteSet.stream())
229+
.collect(Collectors.toList()));
230+
} catch (ExecutionException e) {
231+
throw new CommitException(
232+
CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId());
233+
}
234+
}
235+
173236
protected void handleCommitConflict(Snapshot snapshot, Exception cause)
174237
throws CommitConflictException, UnknownTransactionStatusException {
175238
try {
@@ -197,6 +260,30 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause)
197260
}
198261
}
199262

263+
@VisibleForTesting
264+
void onePhaseCommitRecords(Snapshot snapshot) throws CommitException {
265+
try {
266+
OnePhaseCommitMutationComposer composer =
267+
new OnePhaseCommitMutationComposer(snapshot.getId(), tableMetadataManager);
268+
snapshot.to(composer);
269+
270+
// One-phase commit does not require grouping mutations and using the parallel executor since
271+
// it is always executed in a single mutate API call.
272+
storage.mutate(composer.get());
273+
} catch (NoMutationException e) {
274+
throw new CommitConflictException(
275+
CoreError.CONSENSUS_COMMIT_PREPARING_RECORD_EXISTS.buildMessage(), e, snapshot.getId());
276+
} catch (RetriableExecutionException e) {
277+
throw new CommitConflictException(
278+
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_RECORDS.buildMessage(),
279+
e,
280+
snapshot.getId());
281+
} catch (ExecutionException e) {
282+
throw new CommitException(
283+
CoreError.CONSENSUS_COMMIT_COMMITTING_RECORDS_FAILED.buildMessage(), e, snapshot.getId());
284+
}
285+
}
286+
200287
public void prepareRecords(Snapshot snapshot) throws PreparationException {
201288
try {
202289
PrepareMutationComposer composer =

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,16 @@ public CommitHandlerWithGroupCommit(
3131
ParallelExecutor parallelExecutor,
3232
MutationsGrouper mutationsGrouper,
3333
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
34+
boolean onePhaseCommitEnabled,
3435
CoordinatorGroupCommitter groupCommitter) {
3536
super(
3637
storage,
3738
coordinator,
3839
tableMetadataManager,
3940
parallelExecutor,
4041
mutationsGrouper,
41-
coordinatorWriteOmissionOnReadOnlyEnabled);
42+
coordinatorWriteOmissionOnReadOnlyEnabled,
43+
onePhaseCommitEnabled);
4244
checkNotNull(groupCommitter);
4345
// The methods of this emitter will be called via GroupCommitter.ready().
4446
groupCommitter.setEmitter(new Emitter(coordinator));

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitConfig.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class ConsensusCommitConfig {
3535

3636
public static final String COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED =
3737
PREFIX + "coordinator.write_omission_on_read_only.enabled";
38+
public static final String ONE_PHASE_COMMIT_ENABLED = PREFIX + "one_phase_commit.enabled";
3839
public static final String PARALLEL_IMPLICIT_PRE_READ =
3940
PREFIX + "parallel_implicit_pre_read.enabled";
4041
public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";
@@ -75,8 +76,9 @@ public class ConsensusCommitConfig {
7576
private final boolean asyncRollbackEnabled;
7677

7778
private final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
79+
private final boolean onePhaseCommitEnabled;
7880
private final boolean parallelImplicitPreReadEnabled;
79-
private final boolean isIncludeMetadataEnabled;
81+
private final boolean includeMetadataEnabled;
8082

8183
private final boolean coordinatorGroupCommitEnabled;
8284
private final int coordinatorGroupCommitSlotCapacity;
@@ -145,10 +147,12 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
145147
coordinatorWriteOmissionOnReadOnlyEnabled =
146148
getBoolean(properties, COORDINATOR_WRITE_OMISSION_ON_READ_ONLY_ENABLED, true);
147149

148-
isIncludeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);
150+
onePhaseCommitEnabled = getBoolean(properties, ONE_PHASE_COMMIT_ENABLED, false);
149151

150152
parallelImplicitPreReadEnabled = getBoolean(properties, PARALLEL_IMPLICIT_PRE_READ, true);
151153

154+
includeMetadataEnabled = getBoolean(properties, INCLUDE_METADATA_ENABLED, false);
155+
152156
coordinatorGroupCommitEnabled = getBoolean(properties, COORDINATOR_GROUP_COMMIT_ENABLED, false);
153157
coordinatorGroupCommitSlotCapacity =
154158
getInt(
@@ -219,12 +223,16 @@ public boolean isCoordinatorWriteOmissionOnReadOnlyEnabled() {
219223
return coordinatorWriteOmissionOnReadOnlyEnabled;
220224
}
221225

226+
public boolean isOnePhaseCommitEnabled() {
227+
return onePhaseCommitEnabled;
228+
}
229+
222230
public boolean isParallelImplicitPreReadEnabled() {
223231
return parallelImplicitPreReadEnabled;
224232
}
225233

226234
public boolean isIncludeMetadataEnabled() {
227-
return isIncludeMetadataEnabled;
235+
return includeMetadataEnabled;
228236
}
229237

230238
public boolean isCoordinatorGroupCommitEnabled() {

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
143143
parallelExecutor,
144144
mutationsGrouper,
145145
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
146+
config.isOnePhaseCommitEnabled(),
146147
groupCommitter);
147148
} else {
148149
return new CommitHandler(
@@ -151,7 +152,8 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
151152
tableMetadataManager,
152153
parallelExecutor,
153154
mutationsGrouper,
154-
config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
155+
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
156+
config.isOnePhaseCommitEnabled());
155157
}
156158
}
157159

core/src/main/java/com/scalar/db/transaction/consensuscommit/MutationsGrouper.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.scalar.db.io.Key;
88
import java.util.ArrayList;
99
import java.util.Collection;
10+
import java.util.Iterator;
1011
import java.util.LinkedHashMap;
1112
import java.util.List;
1213
import java.util.Map;
@@ -50,6 +51,41 @@ public List<List<Mutation>> groupMutations(Collection<Mutation> mutations)
5051
return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList());
5152
}
5253

54+
public boolean canBeGroupedAltogether(Collection<Mutation> mutations) throws ExecutionException {
55+
if (mutations.size() <= 1) {
56+
return true;
57+
}
58+
59+
Iterator<Mutation> iterator = mutations.iterator();
60+
Mutation firstMutation = iterator.next();
61+
assert firstMutation.forNamespace().isPresent();
62+
StorageInfo storageInfo =
63+
storageInfoProvider.getStorageInfo(firstMutation.forNamespace().get());
64+
MutationGroup firstGroup = new MutationGroup(firstMutation, storageInfo);
65+
66+
int maxCount = firstGroup.storageInfo.getMaxAtomicMutationsCount();
67+
int mutationCount = 1;
68+
69+
while (iterator.hasNext()) {
70+
Mutation otherMutation = iterator.next();
71+
assert otherMutation.forNamespace().isPresent();
72+
StorageInfo otherStorageInfo =
73+
storageInfoProvider.getStorageInfo(otherMutation.forNamespace().get());
74+
MutationGroup otherGroup = new MutationGroup(otherMutation, otherStorageInfo);
75+
76+
if (!firstGroup.equals(otherGroup)) {
77+
return false; // Found a mutation that does not belong to the first group
78+
}
79+
80+
mutationCount++;
81+
if (mutationCount > maxCount) {
82+
return false; // Exceeds the maximum allowed count for this group
83+
}
84+
}
85+
86+
return true; // All mutations belong to the same group and within the count limit
87+
}
88+
5389
private static class MutationGroup {
5490
public final StorageInfo storageInfo;
5591
@Nullable public final String namespace;

0 commit comments

Comments
 (0)