Skip to content

Commit c9d11f1

Browse files
Backport to branch(3) : Handle Scan with Index correctly during validation logic in Consensus Commit (#2812)
Co-authored-by: Toshihiro Suzuki <[email protected]>
1 parent 66a18e4 commit c9d11f1

File tree

3 files changed

+655
-38
lines changed

3 files changed

+655
-38
lines changed

core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,15 @@ private void validateScanResults(
640640

641641
// Compare the records of the original scan results and the latest scan results
642642
if (!originalResultEntry.getKey().equals(key)) {
643+
if (writeSet.containsKey(originalResultEntry.getKey())
644+
|| deleteSet.containsKey(originalResultEntry.getKey())) {
645+
// The record is inserted/deleted/updated by this transaction
646+
647+
// Skip the record of the original scan results
648+
originalResultEntry = Iterators.getNext(originalResultIterator, null);
649+
continue;
650+
}
651+
643652
// The record is inserted/deleted by another transaction
644653
throwExceptionDueToAntiDependency();
645654
}
@@ -653,9 +662,21 @@ private void validateScanResults(
653662
originalResultEntry = Iterators.getNext(originalResultIterator, null);
654663
}
655664

656-
if (originalResultEntry != null) {
657-
// Some of the records of the scan results are deleted by another transaction
658-
throwExceptionDueToAntiDependency();
665+
while (originalResultEntry != null) {
666+
if (writeSet.containsKey(originalResultEntry.getKey())
667+
|| deleteSet.containsKey(originalResultEntry.getKey())) {
668+
// The record is inserted/deleted/updated by this transaction
669+
670+
// Skip the record of the original scan results
671+
originalResultEntry = Iterators.getNext(originalResultIterator, null);
672+
} else {
673+
// The record is inserted/deleted by another transaction
674+
throwExceptionDueToAntiDependency();
675+
}
676+
}
677+
678+
if (!latestResult.isPresent()) {
679+
return;
659680
}
660681

661682
if (scan.getLimit() != 0 && results.size() == scan.getLimit()) {

core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java

Lines changed: 131 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -244,14 +244,19 @@ private Scan prepareCrossPartitionScan(String namespace, String table) {
244244
}
245245

246246
private Put preparePut() {
247-
Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1);
248-
Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2);
249-
return new Put(partitionKey, clusteringKey)
250-
.withConsistency(Consistency.LINEARIZABLE)
251-
.forNamespace(ANY_NAMESPACE_NAME)
252-
.forTable(ANY_TABLE_NAME)
253-
.withValue(ANY_NAME_3, ANY_TEXT_3)
254-
.withValue(ANY_NAME_4, ANY_TEXT_4);
247+
return preparePut(ANY_TEXT_1, ANY_TEXT_2);
248+
}
249+
250+
private Put preparePut(String partitionKeyColumnValue, String clusteringKeyColumnValue) {
251+
return Put.newBuilder()
252+
.namespace(ANY_NAMESPACE_NAME)
253+
.table(ANY_TABLE_NAME)
254+
.partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue))
255+
.clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue))
256+
.textValue(ANY_NAME_3, ANY_TEXT_3)
257+
.textValue(ANY_NAME_4, ANY_TEXT_4)
258+
.consistency(Consistency.LINEARIZABLE)
259+
.build();
255260
}
256261

257262
private Put prepareAnotherPut() {
@@ -301,12 +306,17 @@ private Put preparePutForMergeTest() {
301306
}
302307

303308
private Delete prepareDelete() {
304-
Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1);
305-
Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2);
306-
return new Delete(partitionKey, clusteringKey)
307-
.withConsistency(Consistency.LINEARIZABLE)
308-
.forNamespace(ANY_NAMESPACE_NAME)
309-
.forTable(ANY_TABLE_NAME);
309+
return prepareDelete(ANY_TEXT_1, ANY_TEXT_2);
310+
}
311+
312+
private Delete prepareDelete(String partitionKeyColumnValue, String clusteringKeyColumnValue) {
313+
return Delete.newBuilder()
314+
.namespace(ANY_NAMESPACE_NAME)
315+
.table(ANY_TABLE_NAME)
316+
.partitionKey(Key.ofText(ANY_NAME_1, partitionKeyColumnValue))
317+
.clusteringKey(Key.ofText(ANY_NAME_2, clusteringKeyColumnValue))
318+
.consistency(Consistency.LINEARIZABLE)
319+
.build();
310320
}
311321

312322
private Delete prepareAnotherDelete() {
@@ -1086,7 +1096,7 @@ public void toSerializable_GetSetWithGetWithIndex_ShouldProcessWithoutExceptions
10861096

10871097
@Test
10881098
public void
1089-
toSerializable_GetSetWithGetWithIndex_RecordInsertedByMySelf_ShouldProcessWithoutExceptions()
1099+
toSerializable_GetSetWithGetWithIndex_RecordInsertedByMyself_ShouldProcessWithoutExceptions()
10901100
throws ExecutionException {
10911101
// Arrange
10921102
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1160,7 +1170,7 @@ public void toSerializable_ScanSetUpdated_ShouldThrowValidationConflictException
11601170
}
11611171

11621172
@Test
1163-
public void toSerializable_ScanSetUpdatedByMySelf_ShouldProcessWithoutExceptions()
1173+
public void toSerializable_ScanSetUpdatedByMyself_ShouldProcessWithoutExceptions()
11641174
throws ExecutionException {
11651175
// Arrange
11661176
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1237,7 +1247,7 @@ public void toSerializable_ScanSetExtended_ShouldThrowValidationConflictExceptio
12371247
}
12381248

12391249
@Test
1240-
public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutExceptions()
1250+
public void toSerializable_ScanSetExtendedByMyself_ShouldProcessWithoutExceptions()
12411251
throws ExecutionException {
12421252
// Arrange
12431253
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1261,7 +1271,7 @@ public void toSerializable_ScanSetExtendedByMySelf_ShouldProcessWithoutException
12611271

12621272
@Test
12631273
public void
1264-
toSerializable_ScanSetWithMultipleRecordsExtendedByMySelf_ShouldProcessWithoutExceptions()
1274+
toSerializable_ScanSetWithMultipleRecordsExtendedByMyself_ShouldProcessWithoutExceptions()
12651275
throws ExecutionException {
12661276
// Arrange
12671277
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1526,7 +1536,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions
15261536

15271537
@Test
15281538
public void
1529-
toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions()
1539+
toSerializable_ScanWithLimitInScanSet_WhenInsertingFirstRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions()
15301540
throws ExecutionException {
15311541
// Arrange
15321542
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1589,7 +1599,7 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions
15891599

15901600
@Test
15911601
public void
1592-
toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMySelf_ShouldProcessWithoutExceptions()
1602+
toSerializable_ScanWithLimitInScanSet_WhenInsertingLastRecordIntoScanRangeByMyself_ShouldProcessWithoutExceptions()
15931603
throws ExecutionException {
15941604
// Arrange
15951605
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
@@ -1619,6 +1629,107 @@ public void toSerializable_ScanWithLimitInScanSet_ShouldProcessWithoutExceptions
16191629
verify(storage).scan(scanWithProjectionsWithoutLimit);
16201630
}
16211631

1632+
@Test
1633+
public void
1634+
toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecords_ShouldThrowValidationConflictException()
1635+
throws ExecutionException {
1636+
// Arrange
1637+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1638+
Scan scan = prepareScanWithIndex();
1639+
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
1640+
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
1641+
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
1642+
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
1643+
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
1644+
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
1645+
snapshot.putIntoScanSet(
1646+
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));
1647+
1648+
// Simulate that the first and third records were updated by another transaction
1649+
Scanner scanner = mock(Scanner.class);
1650+
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());
1651+
1652+
DistributedStorage storage = mock(DistributedStorage.class);
1653+
Scan scanWithProjections =
1654+
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
1655+
when(storage.scan(scanWithProjections)).thenReturn(scanner);
1656+
1657+
// Act Assert
1658+
assertThatThrownBy(() -> snapshot.toSerializable(storage))
1659+
.isInstanceOf(ValidationConflictException.class);
1660+
1661+
// Assert
1662+
verify(storage).scan(scanWithProjections);
1663+
}
1664+
1665+
@Test
1666+
public void
1667+
toSerializable_ScanWithIndexInScanSet_WhenUpdatingRecordsByMyself_ShouldProcessWithoutExceptions()
1668+
throws ExecutionException {
1669+
// Arrange
1670+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1671+
Scan scan = prepareScanWithIndex();
1672+
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
1673+
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
1674+
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
1675+
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
1676+
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
1677+
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
1678+
snapshot.putIntoScanSet(
1679+
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));
1680+
1681+
// Simulate that the first and third records were updated by myself
1682+
snapshot.putIntoWriteSet(key1, preparePut(ANY_TEXT_1, ANY_TEXT_1));
1683+
snapshot.putIntoWriteSet(key3, preparePut(ANY_TEXT_3, ANY_TEXT_1));
1684+
Scanner scanner = mock(Scanner.class);
1685+
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());
1686+
1687+
DistributedStorage storage = mock(DistributedStorage.class);
1688+
Scan scanWithProjections =
1689+
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
1690+
when(storage.scan(scanWithProjections)).thenReturn(scanner);
1691+
1692+
// Act Assert
1693+
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();
1694+
1695+
// Assert
1696+
verify(storage).scan(scanWithProjections);
1697+
}
1698+
1699+
@Test
1700+
public void
1701+
toSerializable_ScanWithIndexInScanSet_WhenDeletingRecordsByMyself_ShouldProcessWithoutExceptions()
1702+
throws ExecutionException {
1703+
// Arrange
1704+
snapshot = prepareSnapshot(Isolation.SERIALIZABLE);
1705+
Scan scan = prepareScanWithIndex();
1706+
TransactionResult result1 = prepareResult(ANY_ID + "x", ANY_TEXT_1, ANY_TEXT_1);
1707+
TransactionResult result2 = prepareResult(ANY_ID + "x", ANY_TEXT_2, ANY_TEXT_1);
1708+
TransactionResult result3 = prepareResult(ANY_ID + "x", ANY_TEXT_3, ANY_TEXT_1);
1709+
Snapshot.Key key1 = new Snapshot.Key(scan, result1);
1710+
Snapshot.Key key2 = new Snapshot.Key(scan, result2);
1711+
Snapshot.Key key3 = new Snapshot.Key(scan, result3);
1712+
snapshot.putIntoScanSet(
1713+
scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, result1, key2, result2, key3, result3)));
1714+
1715+
// Simulate that the first and third records were deleted by myself
1716+
snapshot.putIntoDeleteSet(key1, prepareDelete(ANY_TEXT_1, ANY_TEXT_1));
1717+
snapshot.putIntoDeleteSet(key3, prepareDelete(ANY_TEXT_3, ANY_TEXT_1));
1718+
Scanner scanner = mock(Scanner.class);
1719+
when(scanner.one()).thenReturn(Optional.of(result2)).thenReturn(Optional.empty());
1720+
1721+
DistributedStorage storage = mock(DistributedStorage.class);
1722+
Scan scanWithProjections =
1723+
Scan.newBuilder(scan).projections(Attribute.ID, ANY_NAME_1, ANY_NAME_2).limit(0).build();
1724+
when(storage.scan(scanWithProjections)).thenReturn(scanner);
1725+
1726+
// Act Assert
1727+
assertThatCode(() -> snapshot.toSerializable(storage)).doesNotThrowAnyException();
1728+
1729+
// Assert
1730+
verify(storage).scan(scanWithProjections);
1731+
}
1732+
16221733
@Test
16231734
public void toSerializable_ScannerSetNotChanged_ShouldProcessWithoutExceptions()
16241735
throws ExecutionException {

0 commit comments

Comments
 (0)