diff --git a/docs/reference/enrich-processor/append-processor.md b/docs/reference/enrich-processor/append-processor.md index 1059dfee92162..48b26788358d6 100644 --- a/docs/reference/enrich-processor/append-processor.md +++ b/docs/reference/enrich-processor/append-processor.md @@ -14,7 +14,8 @@ $$$append-options$$$ | Name | Required | Default | Description | | --- | --- | --- | --- | | `field` | yes | - | The field to be appended to. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). | -| `value` | yes | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). | +| `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. | +| `copy_from` | no | - | The source field whose value is appended to `field`. Cannot be set together with `value`. | | `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. | | `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a[template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. | | `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. 
| diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java index a812e6eef550a..50bbbaf5b9b12 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java @@ -21,6 +21,8 @@ import java.util.Map; +import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; + /** * Processor that appends value or values to existing lists. If the field is not present a new list holding the * provided values will be added. If the field is a scalar it will be converted to a single item list and the provided @@ -32,12 +34,21 @@ public final class AppendProcessor extends AbstractProcessor { private final TemplateScript.Factory field; private final ValueSource value; + private final String copyFrom; private final boolean allowDuplicates; - AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) { + AppendProcessor( + String tag, + String description, + TemplateScript.Factory field, + ValueSource value, + String copyFrom, + boolean allowDuplicates + ) { super(tag, description); this.field = field; this.value = value; + this.copyFrom = copyFrom; this.allowDuplicates = allowDuplicates; } @@ -52,7 +63,12 @@ public ValueSource getValue() { @Override public IngestDocument execute(IngestDocument document) throws Exception { String path = document.renderTemplate(field); - document.appendFieldValue(path, value, allowDuplicates); + if (copyFrom != null) { + Object fieldValue = document.getFieldValue(copyFrom, Object.class); + document.appendFieldValue(path, IngestDocument.deepCopy(fieldValue), allowDuplicates); + } else { + document.appendFieldValue(path, value, allowDuplicates); + } return document; } @@ -78,17 +94,27 @@ public 
AppendProcessor create( ProjectId projectId ) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); - Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); + String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from"); + String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json"); + ValueSource valueSource = null; + if (copyFrom == null) { + Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); + valueSource = ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType)); + } else { + Object value = config.remove("value"); + if (value != null) { + throw newConfigurationException( + TYPE, + processorTag, + "copy_from", + "cannot set both `copy_from` and `value` in the same processor" + ); + } + } boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true); TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService); - String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json"); - return new AppendProcessor( - processorTag, - description, - compiledTemplate, - ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType)), - allowDuplicates - ); + + return new AppendProcessor(processorTag, description, compiledTemplate, valueSource, copyFrom, allowDuplicates); } } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java index b7c4997e6dd6a..791f356b2f546 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java +++ 
b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -51,13 +52,13 @@ public void testAppendValuesToExistingList() throws Exception { if (randomBoolean()) { Object value = scalar.randomValue(); values.add(value); - appendProcessor = createAppendProcessor(field, value, true); + appendProcessor = createAppendProcessor(field, value, null, true); } else { int valuesSize = randomIntBetween(0, 10); for (int i = 0; i < valuesSize; i++) { values.add(scalar.randomValue()); } - appendProcessor = createAppendProcessor(field, values, true); + appendProcessor = createAppendProcessor(field, values, null, true); } appendProcessor.execute(ingestDocument); Object fieldValue = ingestDocument.getFieldValue(field, Object.class); @@ -80,13 +81,13 @@ public void testAppendValuesToNonExistingList() throws Exception { if (randomBoolean()) { Object value = scalar.randomValue(); values.add(value); - appendProcessor = createAppendProcessor(field, value, true); + appendProcessor = createAppendProcessor(field, value, null, true); } else { int valuesSize = randomIntBetween(0, 10); for (int i = 0; i < valuesSize; i++) { values.add(scalar.randomValue()); } - appendProcessor = createAppendProcessor(field, values, true); + appendProcessor = createAppendProcessor(field, values, null, true); } appendProcessor.execute(ingestDocument); List list = ingestDocument.getFieldValue(field, List.class); @@ -104,13 +105,13 @@ public void testConvertScalarToList() throws Exception { if (randomBoolean()) { Object value = scalar.randomValue(); values.add(value); - appendProcessor = createAppendProcessor(field, value, true); + appendProcessor = createAppendProcessor(field, value, null, true); } else { int valuesSize = randomIntBetween(0, 10); for (int i = 0; i < valuesSize; i++) { 
values.add(scalar.randomValue()); } - appendProcessor = createAppendProcessor(field, values, true); + appendProcessor = createAppendProcessor(field, values, null, true); } appendProcessor.execute(ingestDocument); List fieldValue = ingestDocument.getFieldValue(field, List.class); @@ -128,7 +129,7 @@ public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Ex List valuesToAppend = new ArrayList<>(); valuesToAppend.add(originalValue); - Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false); + Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false); appendProcessor.execute(ingestDocument); Object fieldValue = ingestDocument.getFieldValue(field, Object.class); assertThat(fieldValue, not(instanceOf(List.class))); @@ -143,7 +144,7 @@ public void testAppendingUniqueValueToScalar() throws Exception { List valuesToAppend = new ArrayList<>(); String newValue = randomValueOtherThan(originalValue, () -> randomAlphaOfLengthBetween(1, 10)); valuesToAppend.add(newValue); - Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false); + Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false); appendProcessor.execute(ingestDocument); List list = ingestDocument.getFieldValue(field, List.class); assertThat(list.size(), equalTo(2)); @@ -172,19 +173,149 @@ public void testAppendingToListWithDuplicatesDisallowed() throws Exception { Collections.sort(valuesToAppend); // attempt to append both new and existing values - Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false); + Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, null, false); appendProcessor.execute(ingestDocument); List fieldValue = ingestDocument.getFieldValue(originalField, List.class); assertThat(fieldValue, sameInstance(list)); assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray())); } - private static Processor 
createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) { + public void testCopyFromOtherField() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + + // generate values, add some to a target field, the rest to a source field + int size = randomIntBetween(0, 10); + List allValues = Stream.generate(() -> randomAlphaOfLengthBetween(1, 10)).limit(size).collect(Collectors.toList()); + List originalValues = randomSubsetOf(allValues); + List additionalValues = new ArrayList<>(Sets.difference(new HashSet<>(allValues), new HashSet<>(originalValues))); + List targetFieldValue = new ArrayList<>(originalValues); + String targetField = RandomDocumentPicks.addRandomField(random(), ingestDocument, targetFieldValue); + String sourceField = RandomDocumentPicks.addRandomField(random(), ingestDocument, additionalValues); + + Processor appendProcessor = createAppendProcessor(targetField, null, sourceField, false); + appendProcessor.execute(ingestDocument); + List fieldValue = ingestDocument.getFieldValue(targetField, List.class); + assertThat(fieldValue, sameInstance(targetFieldValue)); + assertThat(fieldValue, containsInAnyOrder(allValues.toArray())); + } + + public void testCopyFromCopiesNonPrimitiveMutableTypes() throws Exception { + final String sourceField = "sourceField"; + final String targetField = "targetField"; + Processor processor = createAppendProcessor(targetField, null, sourceField, false); + + // map types + Map document = new HashMap<>(); + Map sourceMap = new HashMap<>(); + sourceMap.put("foo", "bar"); + document.put(sourceField, sourceMap); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + IngestDocument output = processor.execute(ingestDocument); + sourceMap.put("foo", "not-bar"); + Map outputMap = (Map) output.getFieldValue(targetField, List.class).getFirst(); + assertThat(outputMap.get("foo"), equalTo("bar")); + + // set types + 
document = new HashMap<>(); + Set sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5); + Set preservedSet = new HashSet<>(sourceSet); + document.put(sourceField, sourceSet); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5))); + Set outputSet = (Set) ingestDocument.getFieldValue(targetField, List.class).getFirst(); + assertThat(outputSet, equalTo(preservedSet)); + + // list types (the outer list isn't used, but an inner list should be copied) + document = new HashMap<>(); + List sourceList = randomList(1, 5, () -> randomAlphaOfLength(5)); + List preservedList = new ArrayList<>(sourceList); + List> wrappedSourceList = List.of(sourceList); + document.put(sourceField, wrappedSourceList); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5))); + List unwrappedOutputList = (List) ingestDocument.getFieldValue(targetField, List.class).getFirst(); + assertThat(unwrappedOutputList, equalTo(preservedList)); + + // byte[] types + document = new HashMap<>(); + byte[] sourceBytes = randomByteArrayOfLength(10); + byte[] preservedBytes = new byte[sourceBytes.length]; + System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length); + document.put(sourceField, sourceBytes); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + sourceBytes[0] = sourceBytes[0] == 0 ? 
(byte) 1 : (byte) 0; + byte[] outputBytes = (byte[]) ingestDocument.getFieldValue(targetField, List.class).getFirst(); + assertThat(outputBytes, equalTo(preservedBytes)); + + // Date types + document = new HashMap<>(); + Date sourceDate = new Date(); + Date preservedDate = new Date(sourceDate.getTime()); + document.put(sourceField, sourceDate); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + sourceDate.setTime(sourceDate.getTime() + 1); + Date outputDate = (Date) ingestDocument.getFieldValue(targetField, List.class).getFirst(); + assertThat(outputDate, equalTo(preservedDate)); + } + + public void testCopyFromDeepCopiesNonPrimitiveMutableTypes() throws Exception { + final String sourceField = "sourceField"; + final String targetField = "targetField"; + Processor processor = createAppendProcessor(targetField, null, sourceField, false); + Map document = new HashMap<>(); + + // a root map with values of map, set, list, bytes, date + Map sourceMap = new HashMap<>(); + sourceMap.put("foo", "bar"); + Set sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5); + List sourceList = randomList(1, 5, () -> randomAlphaOfLength(5)); + byte[] sourceBytes = randomByteArrayOfLength(10); + Date sourceDate = new Date(); + Map root = new HashMap<>(); + root.put("foo", "bar"); + root.put("map", sourceMap); + root.put("set", sourceSet); + root.put("list", sourceList); + root.put("bytes", sourceBytes); + root.put("date", sourceDate); + + Set preservedSet = new HashSet<>(sourceSet); + List preservedList = new ArrayList<>(sourceList); + byte[] preservedBytes = new byte[sourceBytes.length]; + System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length); + Date preservedDate = new Date(sourceDate.getTime()); + + document.put(sourceField, root); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + IngestDocument output = processor.execute(ingestDocument); + Map 
outputRoot = (Map) output.getFieldValue(targetField, List.class).getFirst(); + + root.put("foo", "not-bar"); + sourceMap.put("foo", "not-bar"); + sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5))); + sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5))); + sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0; + sourceDate.setTime(sourceDate.getTime() + 1); + + assertThat(outputRoot.get("foo"), equalTo("bar")); + assertThat(((Map) outputRoot.get("map")).get("foo"), equalTo("bar")); + assertThat(((Set) outputRoot.get("set")), equalTo(preservedSet)); + assertThat(((List) outputRoot.get("list")), equalTo(preservedList)); + assertThat(((byte[]) outputRoot.get("bytes")), equalTo(preservedBytes)); + assertThat(((Date) outputRoot.get("date")), equalTo(preservedDate)); + } + + private static Processor createAppendProcessor(String fieldName, Object fieldValue, String copyFrom, boolean allowDuplicates) { return new AppendProcessor( randomAlphaOfLength(10), null, new TestTemplateService.MockTemplateScript.Factory(fieldName), ValueSource.wrap(fieldValue, TestTemplateService.instance()), + copyFrom, allowDuplicates ); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index c293edc73de99..f8013a4eebe73 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -207,7 +207,7 @@ public void testModifyFieldsOutsideArray() { new CompoundProcessor( false, List.of(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")), - List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), true)) + List.of(new AppendProcessor("_tag", null, template, 
(model) -> (List.of("added")), null, true)) ), false );