Skip to content

Commit 66a18e4

Browse files
Backport to branch(3) : Optimize mutations based on the storage's mutation atomicity unit in Consensus Commit (#2810)
Co-authored-by: Toshihiro Suzuki <[email protected]>
1 parent 8c31fe5 commit 66a18e4

File tree

13 files changed

+645
-353
lines changed

13 files changed

+645
-353
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import static com.google.common.base.Preconditions.checkNotNull;
44

5-
import com.google.common.collect.ImmutableList;
65
import com.google.errorprone.annotations.concurrent.LazyInit;
76
import com.scalar.db.api.DistributedStorage;
7+
import com.scalar.db.api.Mutation;
88
import com.scalar.db.api.TransactionState;
99
import com.scalar.db.common.CoreError;
1010
import com.scalar.db.exception.storage.ExecutionException;
@@ -36,6 +36,7 @@ public class CommitHandler {
3636
protected final Coordinator coordinator;
3737
private final TransactionTableMetadataManager tableMetadataManager;
3838
private final ParallelExecutor parallelExecutor;
39+
private final MutationsGrouper mutationsGrouper;
3940
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
4041

4142
@LazyInit @Nullable private BeforePreparationSnapshotHook beforePreparationSnapshotHook;
@@ -46,11 +47,13 @@ public CommitHandler(
4647
Coordinator coordinator,
4748
TransactionTableMetadataManager tableMetadataManager,
4849
ParallelExecutor parallelExecutor,
50+
MutationsGrouper mutationsGrouper,
4951
boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
5052
this.storage = checkNotNull(storage);
5153
this.coordinator = checkNotNull(coordinator);
5254
this.tableMetadataManager = checkNotNull(tableMetadataManager);
5355
this.parallelExecutor = checkNotNull(parallelExecutor);
56+
this.mutationsGrouper = checkNotNull(mutationsGrouper);
5457
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
5558
}
5659

@@ -199,12 +202,11 @@ public void prepareRecords(Snapshot snapshot) throws PreparationException {
199202
PrepareMutationComposer composer =
200203
new PrepareMutationComposer(snapshot.getId(), tableMetadataManager);
201204
snapshot.to(composer);
202-
PartitionedMutations mutations = new PartitionedMutations(composer.get());
205+
List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());
203206

204-
ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
205-
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
206-
for (PartitionedMutations.Key key : orderedKeys) {
207-
tasks.add(() -> storage.mutate(mutations.get(key)));
207+
List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
208+
for (List<Mutation> mutations : groupedMutations) {
209+
tasks.add(() -> storage.mutate(mutations));
208210
}
209211
parallelExecutor.prepareRecords(tasks, snapshot.getId());
210212
} catch (NoMutationException e) {
@@ -252,12 +254,11 @@ public void commitRecords(Snapshot snapshot) {
252254
CommitMutationComposer composer =
253255
new CommitMutationComposer(snapshot.getId(), tableMetadataManager);
254256
snapshot.to(composer);
255-
PartitionedMutations mutations = new PartitionedMutations(composer.get());
257+
List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());
256258

257-
ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
258-
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
259-
for (PartitionedMutations.Key key : orderedKeys) {
260-
tasks.add(() -> storage.mutate(mutations.get(key)));
259+
List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
260+
for (List<Mutation> mutations : groupedMutations) {
261+
tasks.add(() -> storage.mutate(mutations));
261262
}
262263
parallelExecutor.commitRecords(tasks, snapshot.getId());
263264
} catch (Exception e) {
@@ -300,12 +301,11 @@ public void rollbackRecords(Snapshot snapshot) {
300301
RollbackMutationComposer composer =
301302
new RollbackMutationComposer(snapshot.getId(), storage, tableMetadataManager);
302303
snapshot.to(composer);
303-
PartitionedMutations mutations = new PartitionedMutations(composer.get());
304+
List<List<Mutation>> groupedMutations = mutationsGrouper.groupMutations(composer.get());
304305

305-
ImmutableList<PartitionedMutations.Key> orderedKeys = mutations.getOrderedKeys();
306-
List<ParallelExecutorTask> tasks = new ArrayList<>(orderedKeys.size());
307-
for (PartitionedMutations.Key key : orderedKeys) {
308-
tasks.add(() -> storage.mutate(mutations.get(key)));
306+
List<ParallelExecutorTask> tasks = new ArrayList<>(groupedMutations.size());
307+
for (List<Mutation> mutations : groupedMutations) {
308+
tasks.add(() -> storage.mutate(mutations));
309309
}
310310
parallelExecutor.rollbackRecords(tasks, snapshot.getId());
311311
} catch (Exception e) {

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,16 @@ public CommitHandlerWithGroupCommit(
2929
Coordinator coordinator,
3030
TransactionTableMetadataManager tableMetadataManager,
3131
ParallelExecutor parallelExecutor,
32+
MutationsGrouper mutationsGrouper,
3233
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
3334
CoordinatorGroupCommitter groupCommitter) {
3435
super(
3536
storage,
3637
coordinator,
3738
tableMetadataManager,
3839
parallelExecutor,
40+
mutationsGrouper,
3941
coordinatorWriteOmissionOnReadOnlyEnabled);
40-
4142
checkNotNull(groupCommitter);
4243
// The methods of this emitter will be called via GroupCommitter.ready().
4344
groupCommitter.setEmitter(new Emitter(coordinator));

core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.scalar.db.common.AbstractDistributedTransactionManager;
2424
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
2525
import com.scalar.db.common.ReadOnlyDistributedTransaction;
26+
import com.scalar.db.common.StorageInfoProvider;
2627
import com.scalar.db.config.DatabaseConfig;
2728
import com.scalar.db.exception.transaction.CommitConflictException;
2829
import com.scalar.db.exception.transaction.CrudConflictException;
@@ -133,12 +134,14 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
133134

134135
// `groupCommitter` must be set before calling this method.
135136
private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
137+
MutationsGrouper mutationsGrouper = new MutationsGrouper(new StorageInfoProvider(admin));
136138
if (isGroupCommitEnabled()) {
137139
return new CommitHandlerWithGroupCommit(
138140
storage,
139141
coordinator,
140142
tableMetadataManager,
141143
parallelExecutor,
144+
mutationsGrouper,
142145
config.isCoordinatorWriteOmissionOnReadOnlyEnabled(),
143146
groupCommitter);
144147
} else {
@@ -147,6 +150,7 @@ private CommitHandler createCommitHandler(ConsensusCommitConfig config) {
147150
coordinator,
148151
tableMetadataManager,
149152
parallelExecutor,
153+
mutationsGrouper,
150154
config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
151155
}
152156
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package com.scalar.db.transaction.consensuscommit;
2+
3+
import com.scalar.db.api.Mutation;
4+
import com.scalar.db.api.StorageInfo;
5+
import com.scalar.db.common.StorageInfoProvider;
6+
import com.scalar.db.exception.storage.ExecutionException;
7+
import com.scalar.db.io.Key;
8+
import java.util.ArrayList;
9+
import java.util.Collection;
10+
import java.util.LinkedHashMap;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Objects;
14+
import java.util.Optional;
15+
import java.util.stream.Collectors;
16+
import javax.annotation.Nullable;
17+
import javax.annotation.concurrent.ThreadSafe;
18+
19+
@ThreadSafe
20+
public class MutationsGrouper {
21+
22+
private final StorageInfoProvider storageInfoProvider;
23+
24+
public MutationsGrouper(StorageInfoProvider storageInfoProvider) {
25+
this.storageInfoProvider = storageInfoProvider;
26+
}
27+
28+
public List<List<Mutation>> groupMutations(Collection<Mutation> mutations)
29+
throws ExecutionException {
30+
// MutationGroup mutations by their storage info and atomicity unit
31+
Map<MutationGroup, List<List<Mutation>>> groupToBatches = new LinkedHashMap<>();
32+
33+
for (Mutation mutation : mutations) {
34+
assert mutation.forNamespace().isPresent();
35+
StorageInfo storageInfo = storageInfoProvider.getStorageInfo(mutation.forNamespace().get());
36+
37+
MutationGroup group = new MutationGroup(mutation, storageInfo);
38+
List<List<Mutation>> batches = groupToBatches.computeIfAbsent(group, g -> new ArrayList<>());
39+
int maxCount = group.storageInfo.getMaxAtomicMutationsCount();
40+
41+
if (batches.isEmpty() || batches.get(batches.size() - 1).size() >= maxCount) {
42+
// If the last batch is full or there are no batches yet, create a new batch
43+
batches.add(new ArrayList<>());
44+
}
45+
46+
batches.get(batches.size() - 1).add(mutation);
47+
}
48+
49+
// Flatten the grouped mutations into a single list of batches
50+
return groupToBatches.values().stream().flatMap(List::stream).collect(Collectors.toList());
51+
}
52+
53+
private static class MutationGroup {
54+
public final StorageInfo storageInfo;
55+
@Nullable public final String namespace;
56+
@Nullable public final String table;
57+
@Nullable public final Key partitionKey;
58+
@Nullable public final Optional<Key> clusteringKey;
59+
60+
private MutationGroup(Mutation mutation, StorageInfo storageInfo) {
61+
assert mutation.forNamespace().isPresent() && mutation.forTable().isPresent();
62+
63+
switch (storageInfo.getMutationAtomicityUnit()) {
64+
case RECORD:
65+
this.clusteringKey = mutation.getClusteringKey();
66+
this.partitionKey = mutation.getPartitionKey();
67+
this.table = mutation.forTable().get();
68+
this.namespace = mutation.forNamespace().get();
69+
this.storageInfo = storageInfo;
70+
break;
71+
case PARTITION:
72+
this.clusteringKey = null;
73+
this.partitionKey = mutation.getPartitionKey();
74+
this.table = mutation.forTable().get();
75+
this.namespace = mutation.forNamespace().get();
76+
this.storageInfo = storageInfo;
77+
break;
78+
case TABLE:
79+
this.clusteringKey = null;
80+
this.partitionKey = null;
81+
this.table = mutation.forTable().get();
82+
this.namespace = mutation.forNamespace().get();
83+
this.storageInfo = storageInfo;
84+
break;
85+
case NAMESPACE:
86+
this.clusteringKey = null;
87+
this.partitionKey = null;
88+
this.table = null;
89+
this.namespace = mutation.forNamespace().get();
90+
this.storageInfo = storageInfo;
91+
break;
92+
case STORAGE:
93+
this.clusteringKey = null;
94+
this.partitionKey = null;
95+
this.table = null;
96+
this.namespace = null;
97+
this.storageInfo = storageInfo;
98+
break;
99+
default:
100+
throw new AssertionError(
101+
"Unknown mutation atomicity unit: " + storageInfo.getMutationAtomicityUnit());
102+
}
103+
}
104+
105+
@Override
106+
public boolean equals(Object o) {
107+
if (this == o) {
108+
return true;
109+
}
110+
if (!(o instanceof MutationGroup)) {
111+
return false;
112+
}
113+
MutationGroup that = (MutationGroup) o;
114+
return Objects.equals(storageInfo.getStorageName(), that.storageInfo.getStorageName())
115+
&& Objects.equals(namespace, that.namespace)
116+
&& Objects.equals(table, that.table)
117+
&& Objects.equals(partitionKey, that.partitionKey)
118+
&& Objects.equals(clusteringKey, that.clusteringKey);
119+
}
120+
121+
@Override
122+
public int hashCode() {
123+
return Objects.hash(
124+
storageInfo.getStorageName(), namespace, table, partitionKey, clusteringKey);
125+
}
126+
}
127+
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/PartitionedMutations.java

Lines changed: 0 additions & 97 deletions
This file was deleted.

core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner;
2222
import com.scalar.db.common.AbstractTwoPhaseCommitTransactionManager;
2323
import com.scalar.db.common.CoreError;
24+
import com.scalar.db.common.StorageInfoProvider;
2425
import com.scalar.db.config.DatabaseConfig;
2526
import com.scalar.db.exception.transaction.CommitConflictException;
2627
import com.scalar.db.exception.transaction.CrudConflictException;
@@ -81,6 +82,7 @@ public TwoPhaseConsensusCommitManager(
8182
coordinator,
8283
tableMetadataManager,
8384
parallelExecutor,
85+
new MutationsGrouper(new StorageInfoProvider(admin)),
8486
config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
8587
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
8688
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
@@ -105,6 +107,7 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) {
105107
coordinator,
106108
tableMetadataManager,
107109
parallelExecutor,
110+
new MutationsGrouper(new StorageInfoProvider(admin)),
108111
config.isCoordinatorWriteOmissionOnReadOnlyEnabled());
109112
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
110113
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);

0 commit comments

Comments
 (0)