From 2413f1a02fbf7c0d3ce9841281ba51ab732c55a6 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Fri, 25 Jul 2025 17:56:05 +0200 Subject: [PATCH 01/12] Add copy_from option to append processor. --- .../ingest/common/AppendProcessor.java | 43 ++++- .../ingest/common/AppendProcessorTests.java | 151 ++++++++++++++++-- .../ingest/common/ForEachProcessorTests.java | 2 +- 3 files changed, 180 insertions(+), 16 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java index a812e6eef550a..0b17f5696f862 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java @@ -21,6 +21,8 @@ import java.util.Map; +import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; + /** * Processor that appends value or values to existing lists. If the field is not present a new list holding the * provided values will be added. If the field is a scalar it will be converted to a single item list and the provided @@ -32,12 +34,21 @@ public final class AppendProcessor extends AbstractProcessor { private final TemplateScript.Factory field; private final ValueSource value; + private final String copyFrom; private final boolean allowDuplicates; - AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) { + AppendProcessor( + String tag, + String description, + TemplateScript.Factory field, + ValueSource value, + String copyFrom, + boolean allowDuplicates + ) { super(tag, description); this.field = field; this.value = value; + this.copyFrom = copyFrom; this.allowDuplicates = allowDuplicates; } @@ -52,7 +63,12 @@ public ValueSource getValue() { @Override public IngestDocument execute(IngestDocument document) throws Exception { String path = document.renderTemplate(field); - document.appendFieldValue(path, value, allowDuplicates); + if (copyFrom != null) { + Object fieldValue = document.getFieldValue(copyFrom, Object.class); + document.appendFieldValue(path, IngestDocument.deepCopy(fieldValue), allowDuplicates); + } else { + document.appendFieldValue(path, value, allowDuplicates); + } return document; } @@ -78,15 +94,32 @@ public AppendProcessor create( ProjectId projectId ) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); - Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); + String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from"); + String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json"); + ValueSource valueSource = null; + if (copyFrom == null) { + Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); + valueSource = ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType)); + } else { + Object value = config.remove("value"); + if (value != null) { + throw newConfigurationException( + TYPE, + processorTag, + "copy_from", + "cannot set both `copy_from` and `value` in the same processor" + ); + } + } boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true); TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService); - String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json"); + return new AppendProcessor( processorTag, description, compiledTemplate, - ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType)), + valueSource, + copyFrom, allowDuplicates ); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java index b7c4997e6dd6a..791f356b2f546 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -51,13 +52,13 @@ public void testAppendValuesToExistingList() throws Exception { if (randomBoolean()) { Object value = scalar.randomValue(); values.add(value); - appendProcessor = createAppendProcessor(field, value, true); + appendProcessor = createAppendProcessor(field, value, null, true); } else { int valuesSize = randomIntBetween(0, 10); for (int i = 0; i < valuesSize; i++) { values.add(scalar.randomValue()); } - appendProcessor = createAppendProcessor(field, values, true); + appendProcessor = createAppendProcessor(field, values, null, true); } appendProcessor.execute(ingestDocument); Object fieldValue = ingestDocument.getFieldValue(field, Object.class); @@ -80,13 +81,13 @@ public void testAppendValuesToNonExistingList() throws Exception { if (randomBoolean()) { Object value = scalar.randomValue(); values.add(value); - appendProcessor = createAppendProcessor(field, value, true); + appendProcessor = createAppendProcessor(field, value, null, true); } else { int valuesSize = randomIntBetween(0, 10); for (int i = 0; i < valuesSize; i++) { values.add(scalar.randomValue()); } - appendProcessor = createAppendProcessor(field, values, true); + appendProcessor = createAppendProcessor(field, values, null, true); } appendProcessor.execute(ingestDocument); List list = ingestDocument.getFieldValue(field, List.class); @@ -104,13 +105,13 @@ public void testConvertScalarToList() throws Exception { if (randomBoolean()) { Object value = scalar.randomValue(); values.add(value); - appendProcessor = createAppendProcessor(field, value, true); + appendProcessor = createAppendProcessor(field, value, null, true); } else { int valuesSize = randomIntBetween(0, 10); for (int i = 0; i < valuesSize; i++) { values.add(scalar.randomValue()); } - appendProcessor = createAppendProcessor(field, values, true); + appendProcessor = createAppendProcessor(field, values, null, true); } appendProcessor.execute(ingestDocument); List fieldValue = ingestDocument.getFieldValue(field, List.class); @@ -128,7 +129,7 @@ public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Ex List valuesToAppend = new ArrayList<>(); valuesToAppend.add(originalValue); - Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false); + Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false); appendProcessor.execute(ingestDocument); Object fieldValue = ingestDocument.getFieldValue(field, Object.class); assertThat(fieldValue, not(instanceOf(List.class))); @@ -143,7 +144,7 @@ public void testAppendingUniqueValueToScalar() throws Exception { List valuesToAppend = new ArrayList<>(); String newValue = randomValueOtherThan(originalValue, () -> randomAlphaOfLengthBetween(1, 10)); valuesToAppend.add(newValue); - Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false); + Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false); appendProcessor.execute(ingestDocument); List list = ingestDocument.getFieldValue(field, List.class); assertThat(list.size(), equalTo(2)); @@ -172,19 +173,149 @@ public void testAppendingToListWithDuplicatesDisallowed() throws Exception { Collections.sort(valuesToAppend); // attempt to append both new and existing values - Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false); + Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, null, false); appendProcessor.execute(ingestDocument); List fieldValue = ingestDocument.getFieldValue(originalField, List.class); assertThat(fieldValue, sameInstance(list)); assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray())); } - private static Processor createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) { + public void testCopyFromOtherField() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + + // generate values, add some to a target field, the rest to a source field + int size = randomIntBetween(0, 10); + List allValues = Stream.generate(() -> randomAlphaOfLengthBetween(1, 10)).limit(size).collect(Collectors.toList()); + List originalValues = randomSubsetOf(allValues); + List additionalValues = new ArrayList<>(Sets.difference(new HashSet<>(allValues), new HashSet<>(originalValues))); + List targetFieldValue = new ArrayList<>(originalValues); + String targetField = RandomDocumentPicks.addRandomField(random(), ingestDocument, targetFieldValue); + String sourceField = RandomDocumentPicks.addRandomField(random(), ingestDocument, additionalValues); + + Processor appendProcessor = createAppendProcessor(targetField, null, sourceField, false); + appendProcessor.execute(ingestDocument); + List fieldValue = ingestDocument.getFieldValue(targetField, List.class); + assertThat(fieldValue, sameInstance(targetFieldValue)); + assertThat(fieldValue, containsInAnyOrder(allValues.toArray())); + } + + public void testCopyFromCopiesNonPrimitiveMutableTypes() throws Exception { + final String sourceField = "sourceField"; + final String targetField = "targetField"; + Processor processor = createAppendProcessor(targetField, null, sourceField, false); + + // map types + Map document = new HashMap<>(); + Map sourceMap = new HashMap<>(); + sourceMap.put("foo", "bar"); + document.put(sourceField, sourceMap); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + IngestDocument output = processor.execute(ingestDocument); + sourceMap.put("foo", "not-bar"); + Map outputMap = (Map) output.getFieldValue(targetField, List.class).getFirst(); + assertThat(outputMap.get("foo"), equalTo("bar")); + + // set types + document = new HashMap<>(); + Set sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5); + Set preservedSet = new HashSet<>(sourceSet); + document.put(sourceField, sourceSet); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5))); + Set outputSet = (Set) ingestDocument.getFieldValue(targetField, List.class).getFirst(); + assertThat(outputSet, equalTo(preservedSet)); + + // list types (the outer list isn't used, but an inner list should be copied) + document = new HashMap<>(); + List sourceList = randomList(1, 5, () -> randomAlphaOfLength(5)); + List preservedList = new ArrayList<>(sourceList); + List> wrappedSourceList = List.of(sourceList); + document.put(sourceField, wrappedSourceList); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5))); + List unwrappedOutputList = (List) ingestDocument.getFieldValue(targetField, List.class).getFirst(); + assertThat(unwrappedOutputList, equalTo(preservedList)); + + // byte[] types + document = new HashMap<>(); + byte[] sourceBytes = randomByteArrayOfLength(10); + byte[] preservedBytes = new byte[sourceBytes.length]; + System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length); + document.put(sourceField, sourceBytes); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0; + byte[] outputBytes = (byte[]) ingestDocument.getFieldValue(targetField, List.class).getFirst(); + assertThat(outputBytes, equalTo(preservedBytes)); + + // Date types + document = new HashMap<>(); + Date sourceDate = new Date(); + Date preservedDate = new Date(sourceDate.getTime()); + document.put(sourceField, sourceDate); + ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + sourceDate.setTime(sourceDate.getTime() + 1); + Date outputDate = (Date) ingestDocument.getFieldValue(targetField, List.class).getFirst(); + assertThat(outputDate, equalTo(preservedDate)); + } + + public void testCopyFromDeepCopiesNonPrimitiveMutableTypes() throws Exception { + final String sourceField = "sourceField"; + final String targetField = "targetField"; + Processor processor = createAppendProcessor(targetField, null, sourceField, false); + Map document = new HashMap<>(); + + // a root map with values of map, set, list, bytes, date + Map sourceMap = new HashMap<>(); + sourceMap.put("foo", "bar"); + Set sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5); + List sourceList = randomList(1, 5, () -> randomAlphaOfLength(5)); + byte[] sourceBytes = randomByteArrayOfLength(10); + Date sourceDate = new Date(); + Map root = new HashMap<>(); + root.put("foo", "bar"); + root.put("map", sourceMap); + root.put("set", sourceSet); + root.put("list", sourceList); + root.put("bytes", sourceBytes); + root.put("date", sourceDate); + + Set preservedSet = new HashSet<>(sourceSet); + List preservedList = new ArrayList<>(sourceList); + byte[] preservedBytes = new byte[sourceBytes.length]; + System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length); + Date preservedDate = new Date(sourceDate.getTime()); + + document.put(sourceField, root); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + IngestDocument output = processor.execute(ingestDocument); + Map outputRoot = (Map) output.getFieldValue(targetField, List.class).getFirst(); + + root.put("foo", "not-bar"); + sourceMap.put("foo", "not-bar"); + sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5))); + sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5))); + sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0; + sourceDate.setTime(sourceDate.getTime() + 1); + + assertThat(outputRoot.get("foo"), equalTo("bar")); + assertThat(((Map) outputRoot.get("map")).get("foo"), equalTo("bar")); + assertThat(((Set) outputRoot.get("set")), equalTo(preservedSet)); + assertThat(((List) outputRoot.get("list")), equalTo(preservedList)); + assertThat(((byte[]) outputRoot.get("bytes")), equalTo(preservedBytes)); + assertThat(((Date) outputRoot.get("date")), equalTo(preservedDate)); + } + + private static Processor createAppendProcessor(String fieldName, Object fieldValue, String copyFrom, boolean allowDuplicates) { return new AppendProcessor( randomAlphaOfLength(10), null, new TestTemplateService.MockTemplateScript.Factory(fieldName), ValueSource.wrap(fieldValue, TestTemplateService.instance()), + copyFrom, allowDuplicates ); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index c293edc73de99..f8013a4eebe73 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -207,7 +207,7 @@ public void testModifyFieldsOutsideArray() { new CompoundProcessor( false, List.of(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")), - List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), true)) + List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), null, true)) ), false ); From 7d6e748b8375f3d0eb0cf11720576de10a8e897b Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Fri, 25 Jul 2025 17:56:19 +0200 Subject: [PATCH 02/12] Update documentation to add the copy_from option. --- docs/reference/enrich-processor/append-processor.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/reference/enrich-processor/append-processor.md b/docs/reference/enrich-processor/append-processor.md index 1059dfee92162..554fc11d151fd 100644 --- a/docs/reference/enrich-processor/append-processor.md +++ b/docs/reference/enrich-processor/append-processor.md @@ -14,7 +14,8 @@ $$$append-options$$$ | Name | Required | Default | Description | | --- | --- | --- | --- | | `field` | yes | - | The field to be appended to. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). | -| `value` | yes | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). | +| `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. | +| `copy_from` | no | - | The source field to be append. Cannot set `value` simultaneously. Supported data types are `boolean`, `number`, `array`, `object`, `string`, `date`, etc. | | `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. | | `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a[template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. | | `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. | From a0b25a8ca529ab4878643db63bc7ec5573e7e9ae Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Sat, 26 Jul 2025 08:52:48 +0200 Subject: [PATCH 03/12] Fix style. --- .../org/elasticsearch/ingest/common/AppendProcessor.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java index 0b17f5696f862..50bbbaf5b9b12 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java @@ -114,14 +114,7 @@ public AppendProcessor create( boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true); TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService); - return new AppendProcessor( - processorTag, - description, - compiledTemplate, - valueSource, - copyFrom, - allowDuplicates - ); + return new AppendProcessor(processorTag, description, compiledTemplate, valueSource, copyFrom, allowDuplicates); } } } From 7ecaab5e552adee029ba9be75bf1c4eb032da950 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Tue, 29 Jul 2025 17:35:44 +0200 Subject: [PATCH 04/12] Docs: don't mention an incomplete list of supported types. It mentioned 6 types, the full list is 15: Map, List, Set, byte[], double[][], double[], null, String, Integer, Long, Float, Double, Boolean, ZonedDateTime, Date. --- docs/reference/enrich-processor/append-processor.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/enrich-processor/append-processor.md b/docs/reference/enrich-processor/append-processor.md index 554fc11d151fd..48b26788358d6 100644 --- a/docs/reference/enrich-processor/append-processor.md +++ b/docs/reference/enrich-processor/append-processor.md @@ -15,7 +15,7 @@ $$$append-options$$$ | --- | --- | --- | --- | | `field` | yes | - | The field to be appended to. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). | | `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. | -| `copy_from` | no | - | The source field to be append. Cannot set `value` simultaneously. Supported data types are `boolean`, `number`, `array`, `object`, `string`, `date`, etc. | +| `copy_from` | no | - | The source field to be append. Cannot set `value` simultaneously. | | `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. | | `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a[template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. | | `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. | From 893692ca76ab147c558448a503a96c6d59941cc4 Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Tue, 2 Sep 2025 19:17:19 +0200 Subject: [PATCH 05/12] Update docs/changelog/132003.yaml --- docs/changelog/132003.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/132003.yaml diff --git a/docs/changelog/132003.yaml b/docs/changelog/132003.yaml new file mode 100644 index 0000000000000..7c6698585cbb5 --- /dev/null +++ b/docs/changelog/132003.yaml @@ -0,0 +1,5 @@ +pr: 132003 +summary: Add `copy_from` option to the Append processor +area: Ingest Node +type: enhancement +issues: [] From ea97b399a72c2ce5f96ad492969887471d7d059d Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 2 Sep 2025 14:31:15 -0400 Subject: [PATCH 06/12] Fix a typo (missing a space) --- docs/reference/enrich-processor/append-processor.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/enrich-processor/append-processor.md b/docs/reference/enrich-processor/append-processor.md index 48b26788358d6..9bde6a3c69f4e 100644 --- a/docs/reference/enrich-processor/append-processor.md +++ b/docs/reference/enrich-processor/append-processor.md @@ -17,7 +17,7 @@ $$$append-options$$$ | `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. | | `copy_from` | no | - | The source field to be append. Cannot set `value` simultaneously. | | `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. | -| `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a[template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. | +| `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a [template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. | | `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. | | `if` | no | - | Conditionally execute the processor. See [Conditionally run a processor](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#conditionally-run-processor). | | `ignore_failure` | no | `false` | Ignore failures for the processor. See [Handling pipeline failures](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#handling-pipeline-failures). | From 0555140ef9123a0b8aec31e7bd0723819f681877 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 2 Sep 2025 14:34:31 -0400 Subject: [PATCH 07/12] Reword this slightly To make it more similar to the equivalent docs from the `set` processor. --- docs/reference/enrich-processor/append-processor.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/enrich-processor/append-processor.md b/docs/reference/enrich-processor/append-processor.md index 9bde6a3c69f4e..f5cc42c62bcb9 100644 --- a/docs/reference/enrich-processor/append-processor.md +++ b/docs/reference/enrich-processor/append-processor.md @@ -15,7 +15,7 @@ $$$append-options$$$ | --- | --- | --- | --- | | `field` | yes | - | The field to be appended to. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). | | `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. | -| `copy_from` | no | - | The source field to be append. Cannot set `value` simultaneously. | +| `copy_from` | no | - | The origin field which will be appended to `field`, cannot set `value` simultaneously. | | `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. | | `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a [template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. | | `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. | From ac4b876e5dcb895e589c78245f54892038c427ee Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 2 Sep 2025 14:38:45 -0400 Subject: [PATCH 08/12] Add an applies_to tag for the copy_from field --- docs/reference/enrich-processor/append-processor.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/enrich-processor/append-processor.md b/docs/reference/enrich-processor/append-processor.md index f5cc42c62bcb9..e709abeda2189 100644 --- a/docs/reference/enrich-processor/append-processor.md +++ b/docs/reference/enrich-processor/append-processor.md @@ -15,7 +15,7 @@ $$$append-options$$$ | --- | --- | --- | --- | | `field` | yes | - | The field to be appended to. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). | | `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. | -| `copy_from` | no | - | The origin field which will be appended to `field`, cannot set `value` simultaneously. | +| `copy_from` {applies_to}`stack: ga 9.2.0` | no | - | The origin field which will be appended to `field`, cannot set `value` simultaneously. | | `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. | | `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a [template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. | | `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. | From 7e132acb6ad1e0ac5fa42c1fe47584c9637809d3 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 2 Sep 2025 15:06:49 -0400 Subject: [PATCH 09/12] Add factory unit tests --- .../ingest/common/AppendProcessor.java | 4 ++++ .../common/AppendProcessorFactoryTests.java | 24 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java index 50bbbaf5b9b12..d27d6a73824b4 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java @@ -60,6 +60,10 @@ public ValueSource getValue() { return value; } + public String getCopyFrom() { + return copyFrom; + } + @Override public IngestDocument execute(IngestDocument document) throws Exception { String path = document.renderTemplate(field); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorFactoryTests.java index 565af0584efe6..77a20838fd1b0 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorFactoryTests.java @@ -123,4 +123,28 @@ public void testMediaType() throws Exception { ); assertThat(e.getMessage(), containsString("property does not contain a supported media type [" + expectedMediaType + "]")); } + + public void testCreateWithCopyFrom() throws Exception { + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("copy_from", "field2"); + String processorTag = randomAlphaOfLength(10); + AppendProcessor appendProcessor = factory.create(null, processorTag, null, config, null); + assertThat(appendProcessor.getTag(), equalTo(processorTag)); + assertThat(appendProcessor.getField().newInstance(Map.of()).execute(), equalTo("field1")); + assertThat(appendProcessor.getCopyFrom(), equalTo("field2")); + } + + public void testCreateWithCopyFromAndValue() throws Exception { + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("copy_from", "field2"); + config.put("value", "value1"); + String processorTag = randomAlphaOfLength(10); + ElasticsearchException exception = expectThrows( + ElasticsearchException.class, + () -> factory.create(null, processorTag, null, config, null) + ); + assertThat(exception.getMessage(), equalTo("[copy_from] cannot set both `copy_from` and `value` in the same processor")); + } } From 9535d30bfe7940fd0a5beaf083734a5871b1c0a8 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 2 Sep 2025 15:15:37 -0400 Subject: [PATCH 10/12] Ensure the values are unique for this test --- .../org/elasticsearch/ingest/common/AppendProcessorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java index 791f356b2f546..54ef1427d29b7 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java @@ -185,7 +185,7 @@ public void testCopyFromOtherField() throws Exception { // generate values, add some to a target field, the rest to a source field int size = randomIntBetween(0, 10); - List allValues = Stream.generate(() -> randomAlphaOfLengthBetween(1, 10)).limit(size).collect(Collectors.toList()); + Set allValues = Stream.generate(() -> randomAlphaOfLengthBetween(1, 10)).limit(size).collect(Collectors.toSet()); List originalValues = randomSubsetOf(allValues); List additionalValues = new ArrayList<>(Sets.difference(new HashSet<>(allValues), new HashSet<>(originalValues))); List targetFieldValue = new ArrayList<>(originalValues); From 76aae757fb2610d68fd4c5b0ec25b26a15f528a7 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 2 Sep 2025 15:24:29 -0400 Subject: [PATCH 11/12] Add a simple test --- .../ingest/common/AppendProcessorTests.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java index 54ef1427d29b7..cee2e919e6b7c 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java @@ -180,6 +180,23 @@ public void testAppendingToListWithDuplicatesDisallowed() throws Exception { assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray())); } + public void testCopyFromOtherFieldSimple() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + ingestDocument.setFieldValue("foo", 1); + ingestDocument.setFieldValue("bar", 2); + ingestDocument.setFieldValue("baz", new ArrayList<>(List.of(3))); + + createAppendProcessor("bar", null, "foo", false).execute(ingestDocument); + createAppendProcessor("baz", null, "bar", false).execute(ingestDocument); + createAppendProcessor("quux", null, "baz", false).execute(ingestDocument); + + Map result = ingestDocument.getCtxMap().getSource(); + assertThat(result.get("foo"), equalTo(1)); + assertThat(result.get("bar"), equalTo(List.of(2, 1))); + assertThat(result.get("baz"), equalTo(List.of(3, 2, 1))); + assertThat(result.get("quux"), equalTo(List.of(3, 2, 1))); + } + public void testCopyFromOtherField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); From c62b9139a8f74310e4c873d0f133a8ae59efb8cd Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 4 Sep 2025 13:16:26 -0400 Subject: [PATCH 12/12] Add rest tests --- .../test/ingest/350_append_copy_from.yml | 157 ++++++++++++++++++ .../elasticsearch/ingest/IngestFeatures.java | 3 +- 2 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/350_append_copy_from.yml diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/350_append_copy_from.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/350_append_copy_from.yml new file mode 100644 index 0000000000000..469e2cdd40f1f --- /dev/null +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/350_append_copy_from.yml @@ -0,0 +1,157 @@ +--- +setup: + - requires: + cluster_features: [ "ingest.append.copy_from" ] + reason: "The copy_from option of the append processor is new" + - do: + ingest.put_pipeline: + id: "test-pipeline-1" + body: > + { + "processors": [ + { + "append": { + "field": "dest", + "copy_from": "src" + } + } + ] + } + - do: + indices.create: + index: "test-some-index" + +--- +teardown: + - do: + indices.delete: + index: "test-some-index" + ignore_unavailable: true + - do: + ingest.delete_pipeline: + id: "test-pipeline-1" + ignore: 404 + +--- +"It is not permitted to have both copy_from and value": + + - do: + ingest.put_pipeline: + id: "test-pipeline-fail" + body: > + { + "processors": [ + { + "append": { + "tag": "some_tag", + "field": "dest", + "copy_from": "src", + "value": "uh_oh" + } + } + ] + } + catch: bad_request + - match: { status: 400 } + - match: { error.reason: "[copy_from] cannot set both `copy_from` and `value` in the same processor" } + - match: { error.property_name: "copy_from" } + - match: { error.processor_type: "append" } + - match: { error.processor_tag: "some_tag" } + +--- +"Simple value into an absent value": + - do: + index: + index: test-some-index + id: 1 + pipeline: test-pipeline-1 + body: > + { + "src": 1 + } + + - do: + get: + index: test-some-index + id: "1" + - match: { _source.src: 1 } + - match: { _source.dest: [1] } + +--- +"Simple value into a present simple value": + - do: + index: + index: test-some-index + id: 1 + pipeline: test-pipeline-1 + body: > + { + "src": 2, + "dest": 1 + } + + - do: + get: + index: test-some-index + id: "1" + - match: { _source.src: 2 } + - match: { _source.dest: [1, 2] } + +--- +"Simple value into a present array": + - do: + index: + index: test-some-index + id: 1 + pipeline: test-pipeline-1 + body: > + { + "src": 3, + "dest": [1, 2] + } + + - do: + get: + index: test-some-index + id: "1" + - match: { _source.src: 3 } + - match: { _source.dest: [1, 2, 3] } + +--- +"Array into an absent value": + - do: + index: + index: test-some-index + id: 1 + pipeline: test-pipeline-1 + body: > + { + "src": [1, 2] + } + + - do: + get: + index: test-some-index + id: "1" + - match: { _source.src: [1, 2] } + - match: { _source.dest: [1, 2] } + +--- +"Array into a present array": + - do: + index: + index: test-some-index + id: 1 + pipeline: test-pipeline-1 + body: > + { + "src": [2], + "dest": [1] + } + + - do: + get: + index: test-some-index + id: "1" + - match: { _source.src: [2] } + - match: { _source.dest: [1, 2] } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java b/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java index 9057040b67f06..38ec1c401148c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java @@ -17,6 +17,7 @@ public class IngestFeatures implements FeatureSpecification { private static final NodeFeature SIMULATE_INGEST_400_ON_FAILURE = new NodeFeature("simulate.ingest.400_on_failure", true); + private static final NodeFeature INGEST_APPEND_COPY_FROM = new NodeFeature("ingest.append.copy_from", true); @Override public Set getFeatures() { @@ -29,6 +30,6 @@ public Set getFeatures() { @Override public Set getTestFeatures() { - return Set.of(SIMULATE_INGEST_400_ON_FAILURE); + return Set.of(SIMULATE_INGEST_400_ON_FAILURE, INGEST_APPEND_COPY_FROM); } }