Commit dc3ea3a

[FLINK-38415][checkpoint] Disable auto compaction to prevent Index OutOfBounds
1 parent ba6c5d8 commit dc3ea3a

File tree

4 files changed: +388 -3 lines changed

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java

Lines changed: 5 additions & 0 deletions
@@ -528,6 +528,11 @@ public void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex) {
         sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
     }
 
+    @VisibleForTesting
+    LinkedHashMap<String, RocksDbKvStateInfo> getKvStateInformation() {
+        return kvStateInformation;
+    }
+
     /** Should only be called by one thread, and only after all accesses to the DB happened. */
     @Override
     public void dispose() {

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java

Lines changed: 6 additions & 3 deletions
@@ -100,10 +100,14 @@ public static RestoredDBInstance restoreTempDBInstanceFromLocalState(
             Long writeBufferManagerCapacity)
             throws Exception {
 
+        Function<String, ColumnFamilyOptions> tempDBCfFactory =
+                stateName ->
+                        columnFamilyOptionsFactory.apply(stateName).setDisableAutoCompactions(true);
+
         List<ColumnFamilyDescriptor> columnFamilyDescriptors =
                 createColumnFamilyDescriptors(
                         stateMetaInfoSnapshots,
-                        columnFamilyOptionsFactory,
+                        tempDBCfFactory,
                         ttlCompactFiltersManager,
                         writeBufferManagerCapacity,
                         false);
@@ -118,8 +122,7 @@ public static RestoredDBInstance restoreTempDBInstanceFromLocalState(
                         restoreSourcePath.toString(),
                         columnFamilyDescriptors,
                         columnFamilyHandles,
-                        RocksDBOperationUtils.createColumnFamilyOptions(
-                                columnFamilyOptionsFactory, "default"),
+                        RocksDBOperationUtils.createColumnFamilyOptions(tempDBCfFactory, "default"),
                         dbOptions);
 
         return new RestoredDBInstance(
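
The core of the fix is a decorator over the column-family options factory: every column family of the short-lived temp DB is opened with auto-compaction disabled, while the production DB keeps using the undecorated factory. Below is a minimal standalone sketch of that pattern; the class and factory names here are hypothetical stand-ins, and only ColumnFamilyOptions.setDisableAutoCompactions(boolean) and disableAutoCompactions() are relied on from the real RocksDB Java API.

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.RocksDB;

import java.util.function.Function;

public class TempDbOptionsFactorySketch {
    public static void main(String[] args) {
        RocksDB.loadLibrary();

        // Base factory, standing in for the columnFamilyOptionsFactory that
        // the restore path receives (hypothetical stand-in).
        Function<String, ColumnFamilyOptions> baseFactory =
                stateName -> new ColumnFamilyOptions();

        // Decorated factory: same options per state name, but background
        // compaction is disabled, so the temp DB opened during restore never
        // schedules compactions behind the back of the export/ingest step.
        Function<String, ColumnFamilyOptions> tempDBCfFactory =
                stateName -> baseFactory.apply(stateName).setDisableAutoCompactions(true);

        try (ColumnFamilyOptions options = tempDBCfFactory.apply("some-state")) {
            System.out.println(options.disableAutoCompactions()); // prints: true
        }
    }
}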
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBAutoCompactionIngestRestoreTest.java

Lines changed: 177 additions & 0 deletions
@@ -0,0 +1,177 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.state.rocksdb;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;

import java.util.LinkedHashMap;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Test to verify that auto-compaction is correctly configured during RocksDB incremental restore
 * with ingest DB mode. This test ensures that production DBs maintain auto-compaction enabled while
 * temporary DBs used during restore have auto-compaction disabled for performance.
 */
public class RocksDBAutoCompactionIngestRestoreTest {

    @TempDir private java.nio.file.Path tempFolder;

    private static final int MAX_PARALLELISM = 10;

    @Test
    public void testAutoCompactionEnabledWithIngestDBRestore() throws Exception {
        // Create two subtask snapshots and merge them to trigger the multi-state-handle scenario
        // required for reproducing the ingest DB restore path
        OperatorSubtaskState operatorSubtaskState =
                AbstractStreamOperatorTestHarness.repackageState(
                        createSubtaskSnapshot(0), createSubtaskSnapshot(1));

        OperatorSubtaskState initState =
                AbstractStreamOperatorTestHarness.repartitionOperatorState(
                        operatorSubtaskState, MAX_PARALLELISM, 2, 1, 0);

        // Restore with ingest DB mode and verify auto-compaction
        try (KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
                harness = createTestHarness(new TestKeyedFunction(), MAX_PARALLELISM, 1, 0)) {

            EmbeddedRocksDBStateBackend stateBackend = createStateBackend(true);
            harness.setStateBackend(stateBackend);
            harness.setCheckpointStorage(
                    new FileSystemCheckpointStorage(
                            "file://" + tempFolder.resolve("checkpoint-restore").toAbsolutePath()));

            harness.initializeState(initState);
            harness.open();

            verifyAutoCompactionEnabled(harness);
        }
    }

    private OperatorSubtaskState createSubtaskSnapshot(int subtaskIndex) throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
                harness =
                        createTestHarness(
                                new TestKeyedFunction(), MAX_PARALLELISM, 2, subtaskIndex)) {

            harness.setStateBackend(createStateBackend(false));
            harness.setCheckpointStorage(
                    new FileSystemCheckpointStorage(
                            "file://"
                                    + tempFolder
                                            .resolve("checkpoint-subtask" + subtaskIndex)
                                            .toAbsolutePath()));
            harness.open();

            // Create an empty snapshot - data content doesn't matter for this test
            return harness.snapshot(0, 0);
        }
    }

    private void verifyAutoCompactionEnabled(
            KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String> harness)
            throws Exception {
        KeyedStateBackend<String> backend = harness.getOperator().getKeyedStateBackend();
        assertThat(backend).isNotNull();

        LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation =
                ((RocksDBKeyedStateBackend<String>) backend).getKvStateInformation();

        assertThat(kvStateInformation).as("kvStateInformation should not be empty").isNotEmpty();

        for (RocksDbKvStateInfo stateInfo : kvStateInformation.values()) {
            ColumnFamilyHandle handle = stateInfo.columnFamilyHandle;
            assertThat(handle).isNotNull();

            ColumnFamilyDescriptor descriptor = handle.getDescriptor();
            ColumnFamilyOptions options = descriptor.getOptions();

            assertThat(options.disableAutoCompactions())
                    .as(
                            "Production DB should have auto-compaction enabled for column family: "
                                    + stateInfo.metaInfo.getName())
                    .isFalse();
        }
    }

    private KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, String>, String>
            createTestHarness(
                    TestKeyedFunction keyedFunction,
                    int maxParallelism,
                    int parallelism,
                    int subtaskIndex)
                    throws Exception {

        return new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(keyedFunction),
                tuple2 -> tuple2.f0,
                BasicTypeInfo.STRING_TYPE_INFO,
                maxParallelism,
                parallelism,
                subtaskIndex);
    }

    private EmbeddedRocksDBStateBackend createStateBackend(boolean useIngestDbRestoreMode) {
        Configuration config = new Configuration();
        config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode);

        EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
        return stateBackend.configure(config, getClass().getClassLoader());
    }

    private static class TestKeyedFunction
            extends KeyedProcessFunction<String, Tuple2<String, String>, String> {
        private ValueState<String> state;

        @Override
        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            state =
                    getRuntimeContext()
                            .getState(new ValueStateDescriptor<>("test-state", String.class));
        }

        @Override
        public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out)
                throws Exception {
            state.update(value.f1);
            out.collect(value.f1);
        }
    }
}
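
The test above checks the auto-compaction flag through the Flink harness; the same verification technique can be seen in isolation in the sketch below, which opens a plain RocksDB instance with one column family configured the way the temp DB configures its families and reads the options back via ColumnFamilyHandle.getDescriptor(). The class name, column-family name, and temp directory are made up for illustration; rocksdbjni on the classpath is assumed.

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AutoCompactionFlagSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();

        // One column family with default options, one opened the way the temp
        // DB opens its families: auto-compaction disabled.
        List<ColumnFamilyDescriptor> descriptors =
                Arrays.asList(
                        new ColumnFamilyDescriptor(
                                RocksDB.DEFAULT_COLUMN_FAMILY, new ColumnFamilyOptions()),
                        new ColumnFamilyDescriptor(
                                "temp-state".getBytes(),
                                new ColumnFamilyOptions().setDisableAutoCompactions(true)));

        List<ColumnFamilyHandle> handles = new ArrayList<>();
        String path = Files.createTempDirectory("auto-compaction-sketch").toString();
        try (DBOptions dbOptions =
                        new DBOptions()
                                .setCreateIfMissing(true)
                                .setCreateMissingColumnFamilies(true);
                RocksDB db = RocksDB.open(dbOptions, path, descriptors, handles)) {
            for (ColumnFamilyHandle handle : handles) {
                // getDescriptor() reports the options the column family was
                // opened with, which is what verifyAutoCompactionEnabled()
                // inspects in the test above.
                System.out.println(
                        new String(handle.getName())
                                + " disableAutoCompactions="
                                + handle.getDescriptor().getOptions().disableAutoCompactions());
            }
            for (ColumnFamilyHandle handle : handles) {
                handle.close();
            }
        }
    }
}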
