Skip to content

Commit d0bade5

Browse files
shank9918 (Shashank Gowri) and dhwanilpatel (Dhwanil Patel)
authored
Translog manager & checkpoint tracker integration in CompositeEngine (#19956)
* Adding interfaces for indexing and deletion strategy planner
* Translog manager and local checkpoint tracker integration with CompositeEngine
* Interim workaround until integration of CompositeEngine with IndexShard is complete
* Fixing merges
* CompositeDataFormatWriterPool optimizations
* Update merge thread count to 1
* Fix thread issue for merge
* Changing max merge to 5
* Removed logs
* Compile fix
* Adding static FileInfos.empty() to be used when no data flushed
* Fixing forceMerge in CompositeEngine

---------

Co-authored-by: Shashank Gowri <[email protected]>
Co-authored-by: Dhwanil Patel <[email protected]>
1 parent ac4466d commit d0bade5

35 files changed

+1879
-1128
lines changed

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,23 @@
88
import org.apache.arrow.vector.types.pojo.Schema;
99
import org.opensearch.index.engine.exec.DataFormat;
1010
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
11+
import org.opensearch.index.engine.exec.Merger;
1112
import org.opensearch.index.engine.exec.RefreshInput;
1213
import org.opensearch.index.engine.exec.RefreshResult;
1314
import org.opensearch.index.engine.exec.Writer;
1415
import org.opensearch.index.engine.exec.WriterFileSet;
15-
import org.opensearch.index.engine.exec.Merger;
1616
import org.opensearch.index.shard.ShardPath;
1717

1818
import java.io.IOException;
19+
import java.nio.file.DirectoryStream;
20+
import java.nio.file.Files;
1921
import java.nio.file.Path;
2022
import java.util.ArrayList;
2123
import java.util.List;
2224
import java.util.function.Supplier;
25+
import java.util.regex.Matcher;
26+
import java.util.regex.Pattern;
27+
import java.util.stream.StreamSupport;
2328

2429
import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;
2530

@@ -54,7 +59,10 @@
5459
*/
5560
public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDataFormat> {
5661

57-
public static final String FILE_NAME_PREFIX = "_parquet_file_generation";
62+
private static final Pattern FILE_PATTERN = Pattern.compile(".*_(\\d+)\\.parquet$", Pattern.CASE_INSENSITIVE);
63+
private static final String FILE_NAME_PREFIX = "_parquet_file_generation";
64+
private static final String FILE_NAME_EXT = ".parquet";
65+
5866
private final Supplier<Schema> schema;
5967
private final List<WriterFileSet> filesWrittenAlready = new ArrayList<>();
6068
private final ShardPath shardPath;
@@ -65,14 +73,31 @@ public ParquetExecutionEngine(Supplier<Schema> schema, ShardPath shardPath) {
6573
this.shardPath = shardPath;
6674
}
6775

76+
@Override
77+
public void loadWriterFiles(ShardPath shardPath) throws IOException {
78+
try (DirectoryStream<Path> stream = Files.newDirectoryStream(shardPath.getDataPath(), "*" + FILE_NAME_EXT)) {
79+
StreamSupport.stream(stream.spliterator(), false)
80+
.map(Path::getFileName)
81+
.map(Path::toString)
82+
.map(FILE_PATTERN::matcher)
83+
.filter(Matcher::matches)
84+
.map(m -> WriterFileSet.builder()
85+
.directory(shardPath.getDataPath())
86+
.writerGeneration(Long.parseLong(m.group(1)))
87+
.addFile(m.group(0))
88+
.build())
89+
.forEach(filesWrittenAlready::add);
90+
}
91+
}
92+
6893
@Override
6994
public List<String> supportedFieldTypes() {
7095
return List.of();
7196
}
7297

7398
@Override
7499
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) throws IOException {
75-
String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + writerGeneration + ".parquet").toString();
100+
String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
76101
return new ParquetWriter(fileName, schema.get(), writerGeneration);
77102
}
78103

@@ -85,7 +110,7 @@ public Merger getMerger() {
85110
public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
86111
RefreshResult refreshResult = new RefreshResult();
87112
filesWrittenAlready.addAll(refreshInput.getWriterFiles());
88-
if(!refreshInput.getFilesToRemove().isEmpty()) {
113+
if (!refreshInput.getFilesToRemove().isEmpty()) {
89114
filesWrittenAlready.removeAll(refreshInput.getFilesToRemove());
90115
}
91116
refreshResult.add(PARQUET_DATA_FORMAT, filesWrittenAlready);

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,17 @@ public WriteResult addDoc(ParquetDocumentInput d) throws IOException {
5353
@Override
5454
public FileInfos flush(FlushIn flushIn) throws IOException {
5555
String fileName = vsrManager.flush(flushIn);
56-
FileInfos fileInfos = new FileInfos();
5756
// no data flushed
5857
if (fileName == null) {
59-
return fileInfos;
58+
return FileInfos.empty();
6059
}
61-
WriterFileSet writerFileSet = new WriterFileSet(Path.of(fileName).getParent(), writerGeneration);
62-
writerFileSet.add(fileName);
63-
fileInfos.putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet);
64-
return fileInfos;
60+
Path file = Path.of(fileName);
61+
WriterFileSet writerFileSet = WriterFileSet.builder()
62+
.directory(file.getParent())
63+
.writerGeneration(writerGeneration)
64+
.addFile(file.getFileName().toString())
65+
.build();
66+
return FileInfos.builder().putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet).build();
6567
}
6668

6769
@Override
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine;
10+
11+
import org.opensearch.common.lucene.uid.Versions;
12+
import org.opensearch.index.seqno.SequenceNumbers;
13+
14+
public class DeletionStrategy extends OperationStrategy {
15+
16+
public final boolean currentlyDeleted;
17+
18+
public DeletionStrategy(
19+
boolean deleteFromLucene,
20+
boolean addStaleOpToEngine,
21+
boolean currentlyDeleted,
22+
long version,
23+
int reservedDocs,
24+
Engine.DeleteResult earlyResultOnPreflightError
25+
) {
26+
super(deleteFromLucene, addStaleOpToEngine, version, earlyResultOnPreflightError, reservedDocs);
27+
assert (deleteFromLucene && earlyResultOnPreflightError != null) == false :
28+
"can only delete from lucene or have a preflight result but not both." + "deleteFromLucene: " + deleteFromLucene
29+
+ " earlyResultOnPreFlightError:" + earlyResultOnPreflightError;
30+
this.currentlyDeleted = currentlyDeleted;
31+
}
32+
33+
public static DeletionStrategy skipDueToVersionConflict(
34+
VersionConflictEngineException e,
35+
long currentVersion,
36+
boolean currentlyDeleted
37+
) {
38+
final Engine.DeleteResult deleteResult = new Engine.DeleteResult(
39+
e,
40+
currentVersion,
41+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
42+
SequenceNumbers.UNASSIGNED_SEQ_NO,
43+
currentlyDeleted == false
44+
);
45+
return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, 0, deleteResult);
46+
}
47+
48+
static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion, int reservedDocs) {
49+
return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, reservedDocs, null);
50+
51+
}
52+
53+
public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long versionOfDeletion) {
54+
return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null);
55+
}
56+
57+
static DeletionStrategy processAsStaleOp(long versionOfDeletion) {
58+
return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null);
59+
}
60+
61+
static DeletionStrategy failAsTooManyDocs(Exception e) {
62+
final Engine.DeleteResult deleteResult = new Engine.DeleteResult(
63+
e,
64+
Versions.NOT_FOUND,
65+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
66+
SequenceNumbers.UNASSIGNED_SEQ_NO,
67+
false
68+
);
69+
return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult);
70+
}
71+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine;
10+
11+
import org.opensearch.common.CheckedBiFunction;
12+
import org.opensearch.common.CheckedFunction;
13+
import org.opensearch.common.lucene.uid.Versions;
14+
import org.opensearch.core.index.shard.ShardId;
15+
import org.opensearch.index.engine.exec.bridge.Indexer;
16+
import org.opensearch.index.seqno.SequenceNumbers;
17+
18+
import java.io.IOException;
19+
import java.util.function.BiFunction;
20+
import java.util.function.Predicate;
21+
22+
public class DeletionStrategyPlanner implements OperationStrategyPlanner {
23+
24+
private final EngineConfig engineConfig;
25+
private final ShardId shardId;
26+
private final Predicate<Engine.Operation> hasBeenProcessedBefore;
27+
private final CheckedFunction<Engine.Operation, Indexer.OpVsEngineDocStatus, IOException> opVsEngineDocStatusFunction;
28+
private final CheckedBiFunction<Engine.Operation, Boolean, VersionValue, IOException> docVersionSupplier;
29+
private final BiFunction<Engine.Operation, Integer, Exception> tryAcquireInFlightDocs;
30+
31+
public DeletionStrategyPlanner(
32+
EngineConfig engineConfig,
33+
ShardId shardId,
34+
Predicate<Engine.Operation> hasBeenProcessedBefore,
35+
CheckedFunction<Engine.Operation, Indexer.OpVsEngineDocStatus, IOException> opVsEngineDocStatusFunction,
36+
CheckedBiFunction<Engine.Operation, Boolean, VersionValue, IOException> docVersionSupplier,
37+
BiFunction<Engine.Operation, Integer, Exception> tryAcquireInFlightDocs
38+
) {
39+
this.engineConfig = engineConfig;
40+
this.shardId = shardId;
41+
this.hasBeenProcessedBefore = hasBeenProcessedBefore;
42+
this.opVsEngineDocStatusFunction = opVsEngineDocStatusFunction;
43+
this.docVersionSupplier = docVersionSupplier;
44+
this.tryAcquireInFlightDocs = tryAcquireInFlightDocs;
45+
}
46+
47+
@Override
48+
public DeletionStrategy planOperationAsPrimary(Engine.Operation operation) throws IOException {
49+
final Engine.Delete delete = (Engine.Delete) operation;
50+
assert delete.origin() == Engine.Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
51+
// resolve operation from external to internal
52+
final VersionValue versionValue = docVersionSupplier.apply(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
53+
// TODO - assert incrementVersionLookup();
54+
final long currentVersion;
55+
final boolean currentlyDeleted;
56+
if (versionValue == null) {
57+
currentVersion = Versions.NOT_FOUND;
58+
currentlyDeleted = true;
59+
} else {
60+
currentVersion = versionValue.version;
61+
currentlyDeleted = versionValue.isDelete();
62+
}
63+
final DeletionStrategy plan;
64+
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentlyDeleted) {
65+
final VersionConflictEngineException e = new VersionConflictEngineException(
66+
shardId,
67+
delete.id(),
68+
delete.getIfSeqNo(),
69+
delete.getIfPrimaryTerm(),
70+
SequenceNumbers.UNASSIGNED_SEQ_NO,
71+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
72+
);
73+
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, true);
74+
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (versionValue.seqNo != delete.getIfSeqNo()
75+
|| versionValue.term != delete.getIfPrimaryTerm())) {
76+
final VersionConflictEngineException e = new VersionConflictEngineException(
77+
shardId,
78+
delete.id(),
79+
delete.getIfSeqNo(),
80+
delete.getIfPrimaryTerm(),
81+
versionValue.seqNo,
82+
versionValue.term
83+
);
84+
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
85+
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
86+
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
87+
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
88+
} else {
89+
final Exception reserveError = tryAcquireInFlightDocs.apply(delete, 1);
90+
if (reserveError != null) {
91+
plan = DeletionStrategy.failAsTooManyDocs(reserveError);
92+
} else {
93+
final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version());
94+
plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1);
95+
}
96+
}
97+
return plan;
98+
}
99+
100+
@Override
101+
public DeletionStrategy planOperationAsNonPrimary(Engine.Operation operation) throws IOException {
102+
final Engine.Delete delete = (Engine.Delete) operation;
103+
assert operation.origin() != Engine.Operation.Origin.PRIMARY : "planing as primary but got " + operation.origin();
104+
final DeletionStrategy plan;
105+
if (hasBeenProcessedBefore.test(delete)) {
106+
// the operation seq# was processed thus this operation was already put into lucene
107+
// this can happen during recovery where older operations are sent from the translog that are already
108+
// part of the lucene commit (either from a peer recovery or a local translog)
109+
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
110+
// question may have been deleted in an out of order op that is not replayed.
111+
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
112+
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
113+
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
114+
} else {
115+
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
116+
final Indexer.OpVsEngineDocStatus opVsLucene = opVsEngineDocStatusFunction.apply(delete);
117+
if (opVsLucene == Indexer.OpVsEngineDocStatus.OP_STALE_OR_EQUAL) {
118+
if (segRepEnabled) {
119+
// For segrep based indices, we can't completely rely on localCheckpointTracker
120+
// as the preserved checkpoint may not have all the operations present in lucene
121+
// we don't need to index it again as stale op as it would create multiple documents for same seq no
122+
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
123+
} else {
124+
plan = DeletionStrategy.processAsStaleOp(delete.version());
125+
}
126+
} else {
127+
plan = DeletionStrategy.processNormally(opVsLucene == Indexer.OpVsEngineDocStatus.DOC_NOT_FOUND, delete.version(), 0);
128+
}
129+
}
130+
return plan;
131+
}
132+
}

0 commit comments

Comments
 (0)