Skip to content

Add copy_from option to the Append processor #132003

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/reference/enrich-processor/append-processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ $$$append-options$$$
| Name | Required | Default | Description |
| --- | --- | --- | --- |
| `field` | yes | - | The field to be appended to. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). |
| `value` | yes | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). |
| `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. |
| `copy_from` | no | - | The source field to be append. Cannot set `value` simultaneously. |
| `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. |
| `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a[template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. |
| `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.util.Map;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that appends value or values to existing lists. If the field is not present a new list holding the
* provided values will be added. If the field is a scalar it will be converted to a single item list and the provided
Expand All @@ -32,12 +34,21 @@ public final class AppendProcessor extends AbstractProcessor {

private final TemplateScript.Factory field;
private final ValueSource value;
private final String copyFrom;
private final boolean allowDuplicates;

AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) {
AppendProcessor(
String tag,
String description,
TemplateScript.Factory field,
ValueSource value,
String copyFrom,
boolean allowDuplicates
) {
super(tag, description);
this.field = field;
this.value = value;
this.copyFrom = copyFrom;
this.allowDuplicates = allowDuplicates;
}

Expand All @@ -52,7 +63,12 @@ public ValueSource getValue() {
@Override
public IngestDocument execute(IngestDocument document) throws Exception {
String path = document.renderTemplate(field);
document.appendFieldValue(path, value, allowDuplicates);
if (copyFrom != null) {
Object fieldValue = document.getFieldValue(copyFrom, Object.class);
document.appendFieldValue(path, IngestDocument.deepCopy(fieldValue), allowDuplicates);
} else {
document.appendFieldValue(path, value, allowDuplicates);
}
return document;
}

Expand All @@ -78,17 +94,27 @@ public AppendProcessor create(
ProjectId projectId
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");
String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json");
ValueSource valueSource = null;
if (copyFrom == null) {
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
valueSource = ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType));
} else {
Object value = config.remove("value");
if (value != null) {
throw newConfigurationException(
TYPE,
processorTag,
"copy_from",
"cannot set both `copy_from` and `value` in the same processor"
);
}
}
boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true);
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json");
return new AppendProcessor(
processorTag,
description,
compiledTemplate,
ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType)),
allowDuplicates
);

return new AppendProcessor(processorTag, description, compiledTemplate, valueSource, copyFrom, allowDuplicates);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -51,13 +52,13 @@ public void testAppendValuesToExistingList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value, true);
appendProcessor = createAppendProcessor(field, value, null, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values, true);
appendProcessor = createAppendProcessor(field, values, null, true);
}
appendProcessor.execute(ingestDocument);
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
Expand All @@ -80,13 +81,13 @@ public void testAppendValuesToNonExistingList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value, true);
appendProcessor = createAppendProcessor(field, value, null, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values, true);
appendProcessor = createAppendProcessor(field, values, null, true);
}
appendProcessor.execute(ingestDocument);
List<?> list = ingestDocument.getFieldValue(field, List.class);
Expand All @@ -104,13 +105,13 @@ public void testConvertScalarToList() throws Exception {
if (randomBoolean()) {
Object value = scalar.randomValue();
values.add(value);
appendProcessor = createAppendProcessor(field, value, true);
appendProcessor = createAppendProcessor(field, value, null, true);
} else {
int valuesSize = randomIntBetween(0, 10);
for (int i = 0; i < valuesSize; i++) {
values.add(scalar.randomValue());
}
appendProcessor = createAppendProcessor(field, values, true);
appendProcessor = createAppendProcessor(field, values, null, true);
}
appendProcessor.execute(ingestDocument);
List<?> fieldValue = ingestDocument.getFieldValue(field, List.class);
Expand All @@ -128,7 +129,7 @@ public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Ex

List<Object> valuesToAppend = new ArrayList<>();
valuesToAppend.add(originalValue);
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false);
appendProcessor.execute(ingestDocument);
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
assertThat(fieldValue, not(instanceOf(List.class)));
Expand All @@ -143,7 +144,7 @@ public void testAppendingUniqueValueToScalar() throws Exception {
List<Object> valuesToAppend = new ArrayList<>();
String newValue = randomValueOtherThan(originalValue, () -> randomAlphaOfLengthBetween(1, 10));
valuesToAppend.add(newValue);
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false);
appendProcessor.execute(ingestDocument);
List<?> list = ingestDocument.getFieldValue(field, List.class);
assertThat(list.size(), equalTo(2));
Expand Down Expand Up @@ -172,19 +173,149 @@ public void testAppendingToListWithDuplicatesDisallowed() throws Exception {
Collections.sort(valuesToAppend);

// attempt to append both new and existing values
Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false);
Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, null, false);
appendProcessor.execute(ingestDocument);
List<?> fieldValue = ingestDocument.getFieldValue(originalField, List.class);
assertThat(fieldValue, sameInstance(list));
assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray()));
}

private static Processor createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) {
public void testCopyFromOtherField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());

// generate values, add some to a target field, the rest to a source field
int size = randomIntBetween(0, 10);
List<String> allValues = Stream.generate(() -> randomAlphaOfLengthBetween(1, 10)).limit(size).collect(Collectors.toList());
List<String> originalValues = randomSubsetOf(allValues);
List<String> additionalValues = new ArrayList<>(Sets.difference(new HashSet<>(allValues), new HashSet<>(originalValues)));
List<String> targetFieldValue = new ArrayList<>(originalValues);
String targetField = RandomDocumentPicks.addRandomField(random(), ingestDocument, targetFieldValue);
String sourceField = RandomDocumentPicks.addRandomField(random(), ingestDocument, additionalValues);

Processor appendProcessor = createAppendProcessor(targetField, null, sourceField, false);
appendProcessor.execute(ingestDocument);
List<?> fieldValue = ingestDocument.getFieldValue(targetField, List.class);
assertThat(fieldValue, sameInstance(targetFieldValue));
assertThat(fieldValue, containsInAnyOrder(allValues.toArray()));
}

public void testCopyFromCopiesNonPrimitiveMutableTypes() throws Exception {
final String sourceField = "sourceField";
final String targetField = "targetField";
Processor processor = createAppendProcessor(targetField, null, sourceField, false);

// map types
Map<String, Object> document = new HashMap<>();
Map<String, Object> sourceMap = new HashMap<>();
sourceMap.put("foo", "bar");
document.put(sourceField, sourceMap);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
IngestDocument output = processor.execute(ingestDocument);
sourceMap.put("foo", "not-bar");
Map<?, ?> outputMap = (Map<?, ?>) output.getFieldValue(targetField, List.class).getFirst();
assertThat(outputMap.get("foo"), equalTo("bar"));

// set types
document = new HashMap<>();
Set<String> sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5);
Set<String> preservedSet = new HashSet<>(sourceSet);
document.put(sourceField, sourceSet);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5)));
Set<?> outputSet = (Set<?>) ingestDocument.getFieldValue(targetField, List.class).getFirst();
assertThat(outputSet, equalTo(preservedSet));

// list types (the outer list isn't used, but an inner list should be copied)
document = new HashMap<>();
List<String> sourceList = randomList(1, 5, () -> randomAlphaOfLength(5));
List<String> preservedList = new ArrayList<>(sourceList);
List<List<String>> wrappedSourceList = List.of(sourceList);
document.put(sourceField, wrappedSourceList);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5)));
List<?> unwrappedOutputList = (List<?>) ingestDocument.getFieldValue(targetField, List.class).getFirst();
assertThat(unwrappedOutputList, equalTo(preservedList));

// byte[] types
document = new HashMap<>();
byte[] sourceBytes = randomByteArrayOfLength(10);
byte[] preservedBytes = new byte[sourceBytes.length];
System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length);
document.put(sourceField, sourceBytes);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0;
byte[] outputBytes = (byte[]) ingestDocument.getFieldValue(targetField, List.class).getFirst();
assertThat(outputBytes, equalTo(preservedBytes));

// Date types
document = new HashMap<>();
Date sourceDate = new Date();
Date preservedDate = new Date(sourceDate.getTime());
document.put(sourceField, sourceDate);
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
processor.execute(ingestDocument);
sourceDate.setTime(sourceDate.getTime() + 1);
Date outputDate = (Date) ingestDocument.getFieldValue(targetField, List.class).getFirst();
assertThat(outputDate, equalTo(preservedDate));
}

public void testCopyFromDeepCopiesNonPrimitiveMutableTypes() throws Exception {
final String sourceField = "sourceField";
final String targetField = "targetField";
Processor processor = createAppendProcessor(targetField, null, sourceField, false);
Map<String, Object> document = new HashMap<>();

// a root map with values of map, set, list, bytes, date
Map<String, Object> sourceMap = new HashMap<>();
sourceMap.put("foo", "bar");
Set<String> sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5);
List<String> sourceList = randomList(1, 5, () -> randomAlphaOfLength(5));
byte[] sourceBytes = randomByteArrayOfLength(10);
Date sourceDate = new Date();
Map<String, Object> root = new HashMap<>();
root.put("foo", "bar");
root.put("map", sourceMap);
root.put("set", sourceSet);
root.put("list", sourceList);
root.put("bytes", sourceBytes);
root.put("date", sourceDate);

Set<String> preservedSet = new HashSet<>(sourceSet);
List<String> preservedList = new ArrayList<>(sourceList);
byte[] preservedBytes = new byte[sourceBytes.length];
System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length);
Date preservedDate = new Date(sourceDate.getTime());

document.put(sourceField, root);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
IngestDocument output = processor.execute(ingestDocument);
Map<?, ?> outputRoot = (Map<?, ?>) output.getFieldValue(targetField, List.class).getFirst();

root.put("foo", "not-bar");
sourceMap.put("foo", "not-bar");
sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5)));
sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5)));
sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0;
sourceDate.setTime(sourceDate.getTime() + 1);

assertThat(outputRoot.get("foo"), equalTo("bar"));
assertThat(((Map<?, ?>) outputRoot.get("map")).get("foo"), equalTo("bar"));
assertThat(((Set<?>) outputRoot.get("set")), equalTo(preservedSet));
assertThat(((List<?>) outputRoot.get("list")), equalTo(preservedList));
assertThat(((byte[]) outputRoot.get("bytes")), equalTo(preservedBytes));
assertThat(((Date) outputRoot.get("date")), equalTo(preservedDate));
}

private static Processor createAppendProcessor(String fieldName, Object fieldValue, String copyFrom, boolean allowDuplicates) {
return new AppendProcessor(
randomAlphaOfLength(10),
null,
new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()),
copyFrom,
allowDuplicates
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void testModifyFieldsOutsideArray() {
new CompoundProcessor(
false,
List.of(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), true))
List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), null, true))
),
false
);
Expand Down
Loading