Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,23 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import org.opensearch.index.engine.exec.Merger;
import org.opensearch.index.engine.exec.RefreshInput;
import org.opensearch.index.engine.exec.RefreshResult;
import org.opensearch.index.engine.exec.Writer;
import org.opensearch.index.engine.exec.WriterFileSet;
import org.opensearch.index.engine.exec.Merger;
import org.opensearch.index.shard.ShardPath;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;

import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;

Expand Down Expand Up @@ -54,7 +59,10 @@
*/
public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDataFormat> {

// Matches files like "_parquet_file_generation_<generation>.parquet"; group(1) captures the writer generation.
private static final Pattern FILE_PATTERN = Pattern.compile(".*_(\\d+)\\.parquet$", Pattern.CASE_INSENSITIVE);
// Prefix used for every parquet file written by this engine.
private static final String FILE_NAME_PREFIX = "_parquet_file_generation";
// Extension shared by all parquet data files.
private static final String FILE_NAME_EXT = ".parquet";

private final Supplier<Schema> schema;
private final List<WriterFileSet> filesWrittenAlready = new ArrayList<>();
private final ShardPath shardPath;
Expand All @@ -65,14 +73,31 @@ public ParquetExecutionEngine(Supplier<Schema> schema, ShardPath shardPath) {
this.shardPath = shardPath;
}

@Override
public void loadWriterFiles(ShardPath shardPath) throws IOException {
    // Recover the parquet files already present in the shard data directory so the
    // engine's in-memory view of on-disk state survives a restart. Only names that
    // match FILE_PATTERN are considered; the captured group is the writer generation.
    try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(shardPath.getDataPath(), "*" + FILE_NAME_EXT)) {
        for (Path candidate : dirStream) {
            Matcher matcher = FILE_PATTERN.matcher(candidate.getFileName().toString());
            if (matcher.matches() == false) {
                continue;
            }
            filesWrittenAlready.add(
                WriterFileSet.builder()
                    .directory(shardPath.getDataPath())
                    .writerGeneration(Long.parseLong(matcher.group(1)))
                    .addFile(matcher.group(0))
                    .build()
            );
        }
    }
}

@Override
public List<String> supportedFieldTypes() {
    // No parquet-specific field types are advertised yet; callers receive an immutable empty list.
    final List<String> supported = List.of();
    return supported;
}

@Override
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) throws IOException {
    // Target file is "<dataPath>/_parquet_file_generation_<generation>.parquet" so that
    // loadWriterFiles(...) can later recover the generation from the name via FILE_PATTERN.
    // Path.resolve is used instead of re-joining strings through Path.of(dir.toString(), ...).
    String fileName = shardPath.getDataPath().resolve(FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
    return new ParquetWriter(fileName, schema.get(), writerGeneration);
}

Expand All @@ -85,7 +110,7 @@ public Merger getMerger() {
public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
RefreshResult refreshResult = new RefreshResult();
filesWrittenAlready.addAll(refreshInput.getWriterFiles());
if(!refreshInput.getFilesToRemove().isEmpty()) {
if (!refreshInput.getFilesToRemove().isEmpty()) {
filesWrittenAlready.removeAll(refreshInput.getFilesToRemove());
}
refreshResult.add(PARQUET_DATA_FORMAT, filesWrittenAlready);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,17 @@ public WriteResult addDoc(ParquetDocumentInput d) throws IOException {
@Override
public FileInfos flush(FlushIn flushIn) throws IOException {
    // Delegate to the vector-schema-root manager; it returns the path of the file it
    // wrote, or null when there was no buffered data to flush.
    String fileName = vsrManager.flush(flushIn);
    if (fileName == null) {
        // no data flushed
        return FileInfos.empty();
    }
    Path file = Path.of(fileName);
    // Record the flushed file (directory + bare file name) under this writer's generation.
    WriterFileSet writerFileSet = WriterFileSet.builder()
        .directory(file.getParent())
        .writerGeneration(writerGeneration)
        .addFile(file.getFileName().toString())
        .build();
    return FileInfos.builder().putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine;

import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.index.seqno.SequenceNumbers;

/**
 * Plan describing how a single delete operation is applied by the engine: whether the
 * delete reaches Lucene, whether it is indexed as a stale op, the resolved version,
 * and an optional pre-flight failure result that short-circuits execution.
 */
public class DeletionStrategy extends OperationStrategy {

    /** True when the document was already absent (deleted or never indexed) at planning time. */
    public final boolean currentlyDeleted;

    public DeletionStrategy(
        boolean deleteFromLucene,
        boolean addStaleOpToEngine,
        boolean currentlyDeleted,
        long version,
        int reservedDocs,
        Engine.DeleteResult earlyResultOnPreflightError
    ) {
        super(deleteFromLucene, addStaleOpToEngine, version, earlyResultOnPreflightError, reservedDocs);
        // A plan either performs the delete or fails pre-flight — never both.
        // (Message previously read "...not both.deleteFromLucene" — missing separator fixed.)
        assert (deleteFromLucene && earlyResultOnPreflightError != null) == false :
            "can only delete from lucene or have a preflight result but not both. " + "deleteFromLucene: " + deleteFromLucene
                + " earlyResultOnPreFlightError:" + earlyResultOnPreflightError;
        this.currentlyDeleted = currentlyDeleted;
    }

    /** Skip the delete because of a version conflict; the conflict is surfaced as the result. */
    public static DeletionStrategy skipDueToVersionConflict(
        VersionConflictEngineException e,
        long currentVersion,
        boolean currentlyDeleted
    ) {
        final Engine.DeleteResult deleteResult = new Engine.DeleteResult(
            e,
            currentVersion,
            SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
            SequenceNumbers.UNASSIGNED_SEQ_NO,
            currentlyDeleted == false
        );
        return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, 0, deleteResult);
    }

    /** Apply the delete to Lucene normally at the given version, reserving {@code reservedDocs} slots. */
    static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion, int reservedDocs) {
        return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, reservedDocs, null);
    }

    /** Record the delete (version/seq-no bookkeeping) without touching Lucene. */
    public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long versionOfDeletion) {
        return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null) == null
            ? null
            : new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null);
    }

    /** Index the delete as a stale op (out-of-order non-primary path). */
    static DeletionStrategy processAsStaleOp(long versionOfDeletion) {
        return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null);
    }

    /** Fail the delete with the given reservation error (e.g. too many in-flight docs). */
    static DeletionStrategy failAsTooManyDocs(Exception e) {
        final Engine.DeleteResult deleteResult = new Engine.DeleteResult(
            e,
            Versions.NOT_FOUND,
            SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
            SequenceNumbers.UNASSIGNED_SEQ_NO,
            false
        );
        return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine;

import org.opensearch.common.CheckedBiFunction;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.exec.bridge.Indexer;
import org.opensearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
 * Plans how a delete operation is applied to the engine. Primary planning performs
 * version / sequence-number conflict resolution against the currently visible doc
 * version; non-primary planning reconciles the replicated op with what the engine
 * has already processed.
 */
public class DeletionStrategyPlanner implements OperationStrategyPlanner {

private final EngineConfig engineConfig;
private final ShardId shardId;
// True when this operation's seq# was already processed (e.g. translog replay / recovery).
private final Predicate<Engine.Operation> hasBeenProcessedBefore;
// Compares an incoming op against the doc currently in the engine (non-primary path).
private final CheckedFunction<Engine.Operation, Indexer.OpVsEngineDocStatus, IOException> opVsEngineDocStatusFunction;
// Resolves the live VersionValue for the op's id; the boolean flag is passed as
// "an if_seq_no condition is present" — presumably forces a seq-no-aware lookup; TODO confirm at caller.
private final CheckedBiFunction<Engine.Operation, Boolean, VersionValue, IOException> docVersionSupplier;
// Attempts to reserve N in-flight docs; returns an error instead of throwing on failure.
private final BiFunction<Engine.Operation, Integer, Exception> tryAcquireInFlightDocs;

public DeletionStrategyPlanner(
EngineConfig engineConfig,
ShardId shardId,
Predicate<Engine.Operation> hasBeenProcessedBefore,
CheckedFunction<Engine.Operation, Indexer.OpVsEngineDocStatus, IOException> opVsEngineDocStatusFunction,
CheckedBiFunction<Engine.Operation, Boolean, VersionValue, IOException> docVersionSupplier,
BiFunction<Engine.Operation, Integer, Exception> tryAcquireInFlightDocs
) {
this.engineConfig = engineConfig;
this.shardId = shardId;
this.hasBeenProcessedBefore = hasBeenProcessedBefore;
this.opVsEngineDocStatusFunction = opVsEngineDocStatusFunction;
this.docVersionSupplier = docVersionSupplier;
this.tryAcquireInFlightDocs = tryAcquireInFlightDocs;
}

/**
 * Plans a delete executing on the primary shard: resolves the current doc version,
 * applies if_seq_no/if_primary_term and version-type conflict checks, and reserves
 * an in-flight doc slot before allowing the delete to proceed.
 */
@Override
public DeletionStrategy planOperationAsPrimary(Engine.Operation operation) throws IOException {
final Engine.Delete delete = (Engine.Delete) operation;
assert delete.origin() == Engine.Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// resolve operation from external to internal
final VersionValue versionValue = docVersionSupplier.apply(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
// TODO - assert incrementVersionLookup();
final long currentVersion;
final boolean currentlyDeleted;
if (versionValue == null) {
// no version entry: the doc is not present, treat as already deleted
currentVersion = Versions.NOT_FOUND;
currentlyDeleted = true;
} else {
currentVersion = versionValue.version;
currentlyDeleted = versionValue.isDelete();
}
final DeletionStrategy plan;
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentlyDeleted) {
// if_seq_no was specified but the doc no longer exists — the condition cannot match
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
delete.id(),
delete.getIfSeqNo(),
delete.getIfPrimaryTerm(),
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, true);
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (versionValue.seqNo != delete.getIfSeqNo()
|| versionValue.term != delete.getIfPrimaryTerm())) {
// if_seq_no/if_primary_term specified and does not match the live doc
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
delete.id(),
delete.getIfSeqNo(),
delete.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
// classic version-type (internal/external) conflict check
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
// no conflicts: reserve one in-flight doc, then plan a normal delete at the updated version
final Exception reserveError = tryAcquireInFlightDocs.apply(delete, 1);
if (reserveError != null) {
plan = DeletionStrategy.failAsTooManyDocs(reserveError);
} else {
final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version());
plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1);
}
}
return plan;
}

/**
 * Plans a delete on a non-primary (replica/recovery) shard: skips ops already
 * processed, and otherwise decides between normal processing, stale-op indexing,
 * or skipping Lucene based on how the op compares to the doc in the engine.
 */
@Override
public DeletionStrategy planOperationAsNonPrimary(Engine.Operation operation) throws IOException {
final Engine.Delete delete = (Engine.Delete) operation;
assert operation.origin() != Engine.Operation.Origin.PRIMARY : "planing as primary but got " + operation.origin();
final DeletionStrategy plan;
if (hasBeenProcessedBefore.test(delete)) {
// the operation seq# was processed thus this operation was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
// part of the lucene commit (either from a peer recovery or a local translog)
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
// question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
} else {
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
final Indexer.OpVsEngineDocStatus opVsLucene = opVsEngineDocStatusFunction.apply(delete);
if (opVsLucene == Indexer.OpVsEngineDocStatus.OP_STALE_OR_EQUAL) {
if (segRepEnabled) {
// For segrep based indices, we can't completely rely on localCheckpointTracker
// as the preserved checkpoint may not have all the operations present in lucene
// we don't need to index it again as stale op as it would create multiple documents for same seq no
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
} else {
plan = DeletionStrategy.processAsStaleOp(delete.version());
}
} else {
plan = DeletionStrategy.processNormally(opVsLucene == Indexer.OpVsEngineDocStatus.DOC_NOT_FOUND, delete.version(), 0);
}
}
return plan;
}
}
Loading
Loading