Skip to content

Commit 5f7a2ed

Browse files
Add copy_from option to the Append processor (#132003)
1 parent 2364939 commit 5f7a2ed

File tree

8 files changed

+391
-25
lines changed

8 files changed

+391
-25
lines changed

docs/changelog/132003.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132003
2+
summary: Add `copy_from` option to the Append processor
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []

docs/reference/enrich-processor/append-processor.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ $$$append-options$$$
1414
| Name | Required | Default | Description |
1515
| --- | --- | --- | --- |
1616
| `field` | yes | - | The field to be appended to. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). |
17-
| `value` | yes | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). |
17+
| `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. |
18+
| `copy_from` {applies_to}`stack: ga 9.2.0` | no | - | The origin field which will be appended to `field`, cannot set `value` simultaneously. |
1819
| `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. |
19-
| `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a[template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. |
20+
| `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a [template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. |
2021
| `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. |
2122
| `if` | no | - | Conditionally execute the processor. See [Conditionally run a processor](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#conditionally-run-processor). |
2223
| `ignore_failure` | no | `false` | Ignore failures for the processor. See [Handling pipeline failures](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#handling-pipeline-failures). |

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import java.util.Map;
2323

24+
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
25+
2426
/**
2527
* Processor that appends value or values to existing lists. If the field is not present a new list holding the
2628
* provided values will be added. If the field is a scalar it will be converted to a single item list and the provided
@@ -32,12 +34,21 @@ public final class AppendProcessor extends AbstractProcessor {
3234

3335
private final TemplateScript.Factory field;
3436
private final ValueSource value;
37+
private final String copyFrom;
3538
private final boolean allowDuplicates;
3639

37-
AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) {
40+
AppendProcessor(
41+
String tag,
42+
String description,
43+
TemplateScript.Factory field,
44+
ValueSource value,
45+
String copyFrom,
46+
boolean allowDuplicates
47+
) {
3848
super(tag, description);
3949
this.field = field;
4050
this.value = value;
51+
this.copyFrom = copyFrom;
4152
this.allowDuplicates = allowDuplicates;
4253
}
4354

@@ -49,10 +60,19 @@ public ValueSource getValue() {
4960
return value;
5061
}
5162

63+
public String getCopyFrom() {
64+
return copyFrom;
65+
}
66+
5267
@Override
5368
public IngestDocument execute(IngestDocument document) throws Exception {
5469
String path = document.renderTemplate(field);
55-
document.appendFieldValue(path, value, allowDuplicates);
70+
if (copyFrom != null) {
71+
Object fieldValue = document.getFieldValue(copyFrom, Object.class);
72+
document.appendFieldValue(path, IngestDocument.deepCopy(fieldValue), allowDuplicates);
73+
} else {
74+
document.appendFieldValue(path, value, allowDuplicates);
75+
}
5676
return document;
5777
}
5878

@@ -78,17 +98,27 @@ public AppendProcessor create(
7898
ProjectId projectId
7999
) throws Exception {
80100
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
81-
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
101+
String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");
102+
String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json");
103+
ValueSource valueSource = null;
104+
if (copyFrom == null) {
105+
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
106+
valueSource = ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType));
107+
} else {
108+
Object value = config.remove("value");
109+
if (value != null) {
110+
throw newConfigurationException(
111+
TYPE,
112+
processorTag,
113+
"copy_from",
114+
"cannot set both `copy_from` and `value` in the same processor"
115+
);
116+
}
117+
}
82118
boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true);
83119
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
84-
String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json");
85-
return new AppendProcessor(
86-
processorTag,
87-
description,
88-
compiledTemplate,
89-
ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType)),
90-
allowDuplicates
91-
);
120+
121+
return new AppendProcessor(processorTag, description, compiledTemplate, valueSource, copyFrom, allowDuplicates);
92122
}
93123
}
94124
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorFactoryTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,28 @@ public void testMediaType() throws Exception {
123123
);
124124
assertThat(e.getMessage(), containsString("property does not contain a supported media type [" + expectedMediaType + "]"));
125125
}
126+
127+
public void testCreateWithCopyFrom() throws Exception {
128+
Map<String, Object> config = new HashMap<>();
129+
config.put("field", "field1");
130+
config.put("copy_from", "field2");
131+
String processorTag = randomAlphaOfLength(10);
132+
AppendProcessor appendProcessor = factory.create(null, processorTag, null, config, null);
133+
assertThat(appendProcessor.getTag(), equalTo(processorTag));
134+
assertThat(appendProcessor.getField().newInstance(Map.of()).execute(), equalTo("field1"));
135+
assertThat(appendProcessor.getCopyFrom(), equalTo("field2"));
136+
}
137+
138+
public void testCreateWithCopyFromAndValue() throws Exception {
139+
Map<String, Object> config = new HashMap<>();
140+
config.put("field", "field1");
141+
config.put("copy_from", "field2");
142+
config.put("value", "value1");
143+
String processorTag = randomAlphaOfLength(10);
144+
ElasticsearchException exception = expectThrows(
145+
ElasticsearchException.class,
146+
() -> factory.create(null, processorTag, null, config, null)
147+
);
148+
assertThat(exception.getMessage(), equalTo("[copy_from] cannot set both `copy_from` and `value` in the same processor"));
149+
}
126150
}

0 commit comments

Comments
 (0)