Skip to content

Commit b75cce3

Browse files
committed
Add copy_from option to append processor.
1 parent 6bf55e4 commit b75cce3

File tree

3 files changed

+180
-16
lines changed

3 files changed

+180
-16
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import java.util.Map;
2323

24+
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
25+
2426
/**
2527
* Processor that appends value or values to existing lists. If the field is not present a new list holding the
2628
* provided values will be added. If the field is a scalar it will be converted to a single item list and the provided
@@ -32,12 +34,21 @@ public final class AppendProcessor extends AbstractProcessor {
3234

3335
private final TemplateScript.Factory field;
3436
private final ValueSource value;
37+
private final String copyFrom;
3538
private final boolean allowDuplicates;
3639

37-
AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) {
40+
AppendProcessor(
41+
String tag,
42+
String description,
43+
TemplateScript.Factory field,
44+
ValueSource value,
45+
String copyFrom,
46+
boolean allowDuplicates
47+
) {
3848
super(tag, description);
3949
this.field = field;
4050
this.value = value;
51+
this.copyFrom = copyFrom;
4152
this.allowDuplicates = allowDuplicates;
4253
}
4354

@@ -52,7 +63,12 @@ public ValueSource getValue() {
5263
@Override
5364
public IngestDocument execute(IngestDocument document) throws Exception {
5465
String path = document.renderTemplate(field);
55-
document.appendFieldValue(path, value, allowDuplicates);
66+
if (copyFrom != null) {
67+
Object fieldValue = document.getFieldValue(copyFrom, Object.class);
68+
document.appendFieldValue(path, IngestDocument.deepCopy(fieldValue), allowDuplicates);
69+
} else {
70+
document.appendFieldValue(path, value, allowDuplicates);
71+
}
5672
return document;
5773
}
5874

@@ -78,15 +94,32 @@ public AppendProcessor create(
7894
ProjectId projectId
7995
) throws Exception {
8096
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
81-
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
97+
String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");
98+
String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json");
99+
ValueSource valueSource = null;
100+
if (copyFrom == null) {
101+
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
102+
valueSource = ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType));
103+
} else {
104+
Object value = config.remove("value");
105+
if (value != null) {
106+
throw newConfigurationException(
107+
TYPE,
108+
processorTag,
109+
"copy_from",
110+
"cannot set both `copy_from` and `value` in the same processor"
111+
);
112+
}
113+
}
82114
boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true);
83115
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
84-
String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json");
116+
85117
return new AppendProcessor(
86118
processorTag,
87119
description,
88120
compiledTemplate,
89-
ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType)),
121+
valueSource,
122+
copyFrom,
90123
allowDuplicates
91124
);
92125
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java

Lines changed: 141 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.Collections;
23+
import java.util.Date;
2324
import java.util.HashMap;
2425
import java.util.HashSet;
2526
import java.util.List;
@@ -51,13 +52,13 @@ public void testAppendValuesToExistingList() throws Exception {
5152
if (randomBoolean()) {
5253
Object value = scalar.randomValue();
5354
values.add(value);
54-
appendProcessor = createAppendProcessor(field, value, true);
55+
appendProcessor = createAppendProcessor(field, value, null, true);
5556
} else {
5657
int valuesSize = randomIntBetween(0, 10);
5758
for (int i = 0; i < valuesSize; i++) {
5859
values.add(scalar.randomValue());
5960
}
60-
appendProcessor = createAppendProcessor(field, values, true);
61+
appendProcessor = createAppendProcessor(field, values, null, true);
6162
}
6263
appendProcessor.execute(ingestDocument);
6364
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
@@ -80,13 +81,13 @@ public void testAppendValuesToNonExistingList() throws Exception {
8081
if (randomBoolean()) {
8182
Object value = scalar.randomValue();
8283
values.add(value);
83-
appendProcessor = createAppendProcessor(field, value, true);
84+
appendProcessor = createAppendProcessor(field, value, null, true);
8485
} else {
8586
int valuesSize = randomIntBetween(0, 10);
8687
for (int i = 0; i < valuesSize; i++) {
8788
values.add(scalar.randomValue());
8889
}
89-
appendProcessor = createAppendProcessor(field, values, true);
90+
appendProcessor = createAppendProcessor(field, values, null, true);
9091
}
9192
appendProcessor.execute(ingestDocument);
9293
List<?> list = ingestDocument.getFieldValue(field, List.class);
@@ -104,13 +105,13 @@ public void testConvertScalarToList() throws Exception {
104105
if (randomBoolean()) {
105106
Object value = scalar.randomValue();
106107
values.add(value);
107-
appendProcessor = createAppendProcessor(field, value, true);
108+
appendProcessor = createAppendProcessor(field, value, null, true);
108109
} else {
109110
int valuesSize = randomIntBetween(0, 10);
110111
for (int i = 0; i < valuesSize; i++) {
111112
values.add(scalar.randomValue());
112113
}
113-
appendProcessor = createAppendProcessor(field, values, true);
114+
appendProcessor = createAppendProcessor(field, values, null, true);
114115
}
115116
appendProcessor.execute(ingestDocument);
116117
List<?> fieldValue = ingestDocument.getFieldValue(field, List.class);
@@ -128,7 +129,7 @@ public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Ex
128129

129130
List<Object> valuesToAppend = new ArrayList<>();
130131
valuesToAppend.add(originalValue);
131-
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
132+
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false);
132133
appendProcessor.execute(ingestDocument);
133134
Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
134135
assertThat(fieldValue, not(instanceOf(List.class)));
@@ -143,7 +144,7 @@ public void testAppendingUniqueValueToScalar() throws Exception {
143144
List<Object> valuesToAppend = new ArrayList<>();
144145
String newValue = randomValueOtherThan(originalValue, () -> randomAlphaOfLengthBetween(1, 10));
145146
valuesToAppend.add(newValue);
146-
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
147+
Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false);
147148
appendProcessor.execute(ingestDocument);
148149
List<?> list = ingestDocument.getFieldValue(field, List.class);
149150
assertThat(list.size(), equalTo(2));
@@ -172,19 +173,149 @@ public void testAppendingToListWithDuplicatesDisallowed() throws Exception {
172173
Collections.sort(valuesToAppend);
173174

174175
// attempt to append both new and existing values
175-
Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false);
176+
Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, null, false);
176177
appendProcessor.execute(ingestDocument);
177178
List<?> fieldValue = ingestDocument.getFieldValue(originalField, List.class);
178179
assertThat(fieldValue, sameInstance(list));
179180
assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray()));
180181
}
181182

182-
private static Processor createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) {
183+
public void testCopyFromOtherField() throws Exception {
184+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
185+
186+
// generate values, add some to a target field, the rest to a source field
187+
int size = randomIntBetween(0, 10);
188+
List<String> allValues = Stream.generate(() -> randomAlphaOfLengthBetween(1, 10)).limit(size).collect(Collectors.toList());
189+
List<String> originalValues = randomSubsetOf(allValues);
190+
List<String> additionalValues = new ArrayList<>(Sets.difference(new HashSet<>(allValues), new HashSet<>(originalValues)));
191+
List<String> targetFieldValue = new ArrayList<>(originalValues);
192+
String targetField = RandomDocumentPicks.addRandomField(random(), ingestDocument, targetFieldValue);
193+
String sourceField = RandomDocumentPicks.addRandomField(random(), ingestDocument, additionalValues);
194+
195+
Processor appendProcessor = createAppendProcessor(targetField, null, sourceField, false);
196+
appendProcessor.execute(ingestDocument);
197+
List<?> fieldValue = ingestDocument.getFieldValue(targetField, List.class);
198+
assertThat(fieldValue, sameInstance(targetFieldValue));
199+
assertThat(fieldValue, containsInAnyOrder(allValues.toArray()));
200+
}
201+
202+
public void testCopyFromCopiesNonPrimitiveMutableTypes() throws Exception {
203+
final String sourceField = "sourceField";
204+
final String targetField = "targetField";
205+
Processor processor = createAppendProcessor(targetField, null, sourceField, false);
206+
207+
// map types
208+
Map<String, Object> document = new HashMap<>();
209+
Map<String, Object> sourceMap = new HashMap<>();
210+
sourceMap.put("foo", "bar");
211+
document.put(sourceField, sourceMap);
212+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
213+
IngestDocument output = processor.execute(ingestDocument);
214+
sourceMap.put("foo", "not-bar");
215+
Map<?, ?> outputMap = (Map<?, ?>) output.getFieldValue(targetField, List.class).getFirst();
216+
assertThat(outputMap.get("foo"), equalTo("bar"));
217+
218+
// set types
219+
document = new HashMap<>();
220+
Set<String> sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5);
221+
Set<String> preservedSet = new HashSet<>(sourceSet);
222+
document.put(sourceField, sourceSet);
223+
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
224+
processor.execute(ingestDocument);
225+
sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5)));
226+
Set<?> outputSet = (Set<?>) ingestDocument.getFieldValue(targetField, List.class).getFirst();
227+
assertThat(outputSet, equalTo(preservedSet));
228+
229+
// list types (the outer list isn't used, but an inner list should be copied)
230+
document = new HashMap<>();
231+
List<String> sourceList = randomList(1, 5, () -> randomAlphaOfLength(5));
232+
List<String> preservedList = new ArrayList<>(sourceList);
233+
List<List<String>> wrappedSourceList = List.of(sourceList);
234+
document.put(sourceField, wrappedSourceList);
235+
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
236+
processor.execute(ingestDocument);
237+
sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5)));
238+
List<?> unwrappedOutputList = (List<?>) ingestDocument.getFieldValue(targetField, List.class).getFirst();
239+
assertThat(unwrappedOutputList, equalTo(preservedList));
240+
241+
// byte[] types
242+
document = new HashMap<>();
243+
byte[] sourceBytes = randomByteArrayOfLength(10);
244+
byte[] preservedBytes = new byte[sourceBytes.length];
245+
System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length);
246+
document.put(sourceField, sourceBytes);
247+
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
248+
processor.execute(ingestDocument);
249+
sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0;
250+
byte[] outputBytes = (byte[]) ingestDocument.getFieldValue(targetField, List.class).getFirst();
251+
assertThat(outputBytes, equalTo(preservedBytes));
252+
253+
// Date types
254+
document = new HashMap<>();
255+
Date sourceDate = new Date();
256+
Date preservedDate = new Date(sourceDate.getTime());
257+
document.put(sourceField, sourceDate);
258+
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
259+
processor.execute(ingestDocument);
260+
sourceDate.setTime(sourceDate.getTime() + 1);
261+
Date outputDate = (Date) ingestDocument.getFieldValue(targetField, List.class).getFirst();
262+
assertThat(outputDate, equalTo(preservedDate));
263+
}
264+
265+
public void testCopyFromDeepCopiesNonPrimitiveMutableTypes() throws Exception {
266+
final String sourceField = "sourceField";
267+
final String targetField = "targetField";
268+
Processor processor = createAppendProcessor(targetField, null, sourceField, false);
269+
Map<String, Object> document = new HashMap<>();
270+
271+
// a root map with values of map, set, list, bytes, date
272+
Map<String, Object> sourceMap = new HashMap<>();
273+
sourceMap.put("foo", "bar");
274+
Set<String> sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5);
275+
List<String> sourceList = randomList(1, 5, () -> randomAlphaOfLength(5));
276+
byte[] sourceBytes = randomByteArrayOfLength(10);
277+
Date sourceDate = new Date();
278+
Map<String, Object> root = new HashMap<>();
279+
root.put("foo", "bar");
280+
root.put("map", sourceMap);
281+
root.put("set", sourceSet);
282+
root.put("list", sourceList);
283+
root.put("bytes", sourceBytes);
284+
root.put("date", sourceDate);
285+
286+
Set<String> preservedSet = new HashSet<>(sourceSet);
287+
List<String> preservedList = new ArrayList<>(sourceList);
288+
byte[] preservedBytes = new byte[sourceBytes.length];
289+
System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length);
290+
Date preservedDate = new Date(sourceDate.getTime());
291+
292+
document.put(sourceField, root);
293+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
294+
IngestDocument output = processor.execute(ingestDocument);
295+
Map<?, ?> outputRoot = (Map<?, ?>) output.getFieldValue(targetField, List.class).getFirst();
296+
297+
root.put("foo", "not-bar");
298+
sourceMap.put("foo", "not-bar");
299+
sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5)));
300+
sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5)));
301+
sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0;
302+
sourceDate.setTime(sourceDate.getTime() + 1);
303+
304+
assertThat(outputRoot.get("foo"), equalTo("bar"));
305+
assertThat(((Map<?, ?>) outputRoot.get("map")).get("foo"), equalTo("bar"));
306+
assertThat(((Set<?>) outputRoot.get("set")), equalTo(preservedSet));
307+
assertThat(((List<?>) outputRoot.get("list")), equalTo(preservedList));
308+
assertThat(((byte[]) outputRoot.get("bytes")), equalTo(preservedBytes));
309+
assertThat(((Date) outputRoot.get("date")), equalTo(preservedDate));
310+
}
311+
312+
private static Processor createAppendProcessor(String fieldName, Object fieldValue, String copyFrom, boolean allowDuplicates) {
183313
return new AppendProcessor(
184314
randomAlphaOfLength(10),
185315
null,
186316
new TestTemplateService.MockTemplateScript.Factory(fieldName),
187317
ValueSource.wrap(fieldValue, TestTemplateService.instance()),
318+
copyFrom,
188319
allowDuplicates
189320
);
190321
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public void testModifyFieldsOutsideArray() {
207207
new CompoundProcessor(
208208
false,
209209
List.of(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
210-
List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), true))
210+
List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), null, true))
211211
),
212212
false
213213
);

0 commit comments

Comments
 (0)