Skip to content

Commit 558cc7a

Browse files
authored
Support Fields API in conditional ingest processors (#131581)
1 parent d16eb70 commit 558cc7a

File tree

16 files changed

+639
-364
lines changed

16 files changed

+639
-364
lines changed

docs/changelog/131581.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131581
2+
summary: Support Fields API in conditional ingest processors
3+
area: Infra/Core
4+
type: enhancement
5+
issues: []

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/210_conditional_processor.yml

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,150 @@ teardown:
7474
- match: { _source.bytes_source_field: "1kb" }
7575
- match: { _source.conditional_field: "bar" }
7676
- is_false: _source.bytes_target_field
77+
78+
---
79+
"Test conditional processor with fields API":
80+
- do:
81+
ingest.put_pipeline:
82+
id: "my_pipeline"
83+
body:
84+
description: "_description"
85+
processors:
86+
- set:
87+
if: "field('get.field').get('') == 'one'"
88+
field: "one"
89+
value: 1
90+
- set:
91+
if: "field('get.field').get('') == 'two'"
92+
field: "missing"
93+
value: "missing"
94+
- set:
95+
if: " /* avoid yaml stash */ $('get.field', 'one') == 'one'"
96+
field: "dollar"
97+
value: true
98+
- set:
99+
if: "field('missing.field').get('fallback') == 'fallback'"
100+
field: "fallback"
101+
value: "fallback"
102+
- set:
103+
if: "field('nested.array.get.with.index.field').get(1, null) == 'two'"
104+
field: "two"
105+
value: 2
106+
- set:
107+
if: "field('getName.field').getName() == 'getName.field'"
108+
field: "three"
109+
value: 3
110+
- set:
111+
if: "field('existing.field').exists()"
112+
field: "four"
113+
value: 4
114+
- set:
115+
if: "!field('empty.field').isEmpty()"
116+
field: "missing"
117+
value: "missing"
118+
- set:
119+
if: "field('size.field').size() == 2"
120+
field: "five"
121+
value: 5
122+
- set:
123+
if: >
124+
def iterator = field('iterator.field').iterator();
125+
def sum = 0;
126+
while (iterator.hasNext()) {
127+
sum += iterator.next();
128+
}
129+
return sum == 6;
130+
field: "six"
131+
value: 6
132+
- set:
133+
if: "field('hasValue.field').hasValue(v -> v == 'two')"
134+
field: "seven"
135+
value: 7
136+
- match: { acknowledged: true }
137+
138+
- do:
139+
index:
140+
index: test
141+
id: "1"
142+
pipeline: "my_pipeline"
143+
body:
144+
get.field: "one"
145+
nested:
146+
array:
147+
get.with.index.field: ["one", "two", "three"]
148+
getName.field: "my_name"
149+
existing.field: "indeed"
150+
empty.field: []
151+
size.field: ["one", "two"]
152+
iterator.field: [1, 2, 3]
153+
hasValue.field: ["one", "two", "three"]
154+
155+
- do:
156+
get:
157+
index: test
158+
id: "1"
159+
- match: { _source.get\.field: "one" }
160+
- match: { _source.one: 1 }
161+
- is_false: _source.missing
162+
- is_true: _source.dollar
163+
- match: { _source.fallback: "fallback" }
164+
- match: { _source.nested.array.get\.with\.index\.field: ["one", "two", "three"] }
165+
- match: { _source.two: 2 }
166+
- match: { _source.three: 3 }
167+
- match: { _source.four: 4 }
168+
- match: { _source.five: 5 }
169+
- match: { _source.six: 6 }
170+
- match: { _source.seven: 7 }
171+
172+
---
173+
"Test fields iterator is unmodifiable":
174+
- do:
175+
ingest.put_pipeline:
176+
id: "my_pipeline"
177+
body:
178+
description: "_description"
179+
processors:
180+
- set:
181+
if: >
182+
def iterator = field('iterator.field').iterator();
183+
def sum = 0;
184+
while (iterator.hasNext()) {
185+
sum += iterator.next();
186+
iterator.remove();
187+
}
188+
return sum == 6;
189+
field: "sum"
190+
value: 6
191+
- match: { acknowledged: true }
192+
193+
- do:
194+
index:
195+
index: test
196+
id: "1"
197+
pipeline: "my_pipeline"
198+
body:
199+
test.field: [1, 2, 3]
200+
- match: { error: null }
201+
202+
- do:
203+
index:
204+
index: test
205+
id: "2"
206+
pipeline: "my_pipeline"
207+
body:
208+
iterator.field: [1, 2, 3]
209+
catch: bad_request
210+
- length: { error.root_cause: 1 }
211+
212+
- do:
213+
get:
214+
index: test
215+
id: "1"
216+
- match: { _source.test\.field: [1, 2, 3] }
217+
- is_false: _source.sum
218+
219+
- do:
220+
get:
221+
index: test
222+
id: "2"
223+
catch: missing

modules/lang-painless/src/main/resources/org/elasticsearch/painless/org.elasticsearch.script.ingest.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,12 @@ class org.elasticsearch.script.IngestScript {
4646
}
4747

4848
class org.elasticsearch.script.field.WriteField {
49-
String getName()
5049
boolean exists()
5150
WriteField move(def)
5251
WriteField overwrite(def)
5352
void remove()
5453
WriteField set(def)
5554
WriteField append(def)
56-
boolean isEmpty()
57-
int size()
5855
Iterator iterator()
5956
def get(def)
6057
def get(int, def)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
# or more contributor license agreements. Licensed under the "Elastic License
4+
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
# Public License v 1"; you may not use this file except in compliance with, at
6+
# your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
# License v3.0 only", or the "Server Side Public License, v 1".
8+
#
9+
10+
# This file contains a whitelist for conditional ingest scripts
11+
12+
class org.elasticsearch.script.IngestConditionalScript {
13+
SourceMapField field(String)
14+
}
15+
16+
class org.elasticsearch.script.field.SourceMapField {
17+
boolean exists()
18+
Iterator iterator()
19+
def get(def)
20+
def get(int, def)
21+
boolean hasValue(Predicate)
22+
}

modules/lang-painless/src/main/resources/org/elasticsearch/painless/org.elasticsearch.script.reindex.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,12 @@ class org.elasticsearch.script.ReindexScript {
4242
}
4343

4444
class org.elasticsearch.script.field.WriteField {
45-
String getName()
4645
boolean exists()
4746
WriteField move(def)
4847
WriteField overwrite(def)
4948
void remove()
5049
WriteField set(def)
5150
WriteField append(def)
52-
boolean isEmpty()
53-
int size()
5451
Iterator iterator()
5552
def get(def)
5653
def get(int, def)

modules/lang-painless/src/main/resources/org/elasticsearch/painless/org.elasticsearch.script.update.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,12 @@ class org.elasticsearch.script.UpdateScript {
3737
}
3838

3939
class org.elasticsearch.script.field.WriteField {
40-
String getName()
4140
boolean exists()
4241
WriteField move(def)
4342
WriteField overwrite(def)
4443
void remove()
4544
WriteField set(def)
4645
WriteField append(def)
47-
boolean isEmpty()
48-
int size()
4946
Iterator iterator()
5047
def get(def)
5148
def get(int, def)

modules/lang-painless/src/main/resources/org/elasticsearch/painless/org.elasticsearch.script.update_by_query.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,12 @@ class org.elasticsearch.script.UpdateByQueryScript {
3636
}
3737

3838
class org.elasticsearch.script.field.WriteField {
39-
String getName()
4039
boolean exists()
4140
WriteField move(def)
4241
WriteField overwrite(def)
4342
void remove()
4443
WriteField set(def)
4544
WriteField append(def)
46-
boolean isEmpty()
47-
int size()
4845
Iterator iterator()
4946
def get(def)
5047
def get(int, def)

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
5656
private final Processor processor;
5757
private final IngestMetric metric;
5858
private final LongSupplier relativeTimeProvider;
59-
private final IngestConditionalScript precompiledConditionScript;
59+
private final IngestConditionalScript.Factory precompiledConditionalScriptFactory;
6060

6161
ConditionalProcessor(String tag, String description, Script script, ScriptService scriptService, Processor processor) {
6262
this(tag, description, script, scriptService, processor, System::nanoTime);
@@ -78,12 +78,11 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
7878
this.relativeTimeProvider = relativeTimeProvider;
7979

8080
try {
81-
final IngestConditionalScript.Factory factory = scriptService.compile(script, IngestConditionalScript.CONTEXT);
8281
if (ScriptType.INLINE.equals(script.getType())) {
83-
precompiledConditionScript = factory.newInstance(script.getParams());
82+
precompiledConditionalScriptFactory = scriptService.compile(script, IngestConditionalScript.CONTEXT);
8483
} else {
8584
// stored script, so will have to compile at runtime
86-
precompiledConditionScript = null;
85+
precompiledConditionalScriptFactory = null;
8786
}
8887
} catch (ScriptException e) {
8988
throw newConfigurationException(TYPE, tag, null, e);
@@ -141,12 +140,14 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
141140
}
142141

143142
boolean evaluate(IngestDocument ingestDocument) {
144-
IngestConditionalScript script = precompiledConditionScript;
145-
if (script == null) {
146-
IngestConditionalScript.Factory factory = scriptService.compile(condition, IngestConditionalScript.CONTEXT);
147-
script = factory.newInstance(condition.getParams());
148-
}
149-
return script.execute(new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS)));
143+
IngestConditionalScript.Factory factory = precompiledConditionalScriptFactory;
144+
if (factory == null) {
145+
factory = scriptService.compile(condition, IngestConditionalScript.CONTEXT);
146+
}
147+
return factory.newInstance(
148+
condition.getParams(),
149+
new UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS))
150+
).execute();
150151
}
151152

152153
public Processor getInnerProcessor() {

server/src/main/java/org/elasticsearch/script/IngestConditionalScript.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
/**
1717
* A script used by {@link org.elasticsearch.ingest.ConditionalProcessor}.
1818
*/
19-
public abstract class IngestConditionalScript {
19+
public abstract class IngestConditionalScript extends SourceMapFieldScript {
2020

21-
public static final String[] PARAMETERS = { "ctx" };
21+
public static final String[] PARAMETERS = {};
2222

23-
/** The context used to compile {@link IngestConditionalScript} factories. */
23+
/**
24+
* The context used to compile {@link IngestConditionalScript} factories.
25+
* */
2426
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>(
2527
"processor_conditional",
2628
Factory.class,
@@ -30,21 +32,35 @@ public abstract class IngestConditionalScript {
3032
true
3133
);
3234

33-
/** The generic runtime parameters for the script. */
35+
/**
36+
* The generic runtime parameters for the script.
37+
*/
3438
private final Map<String, Object> params;
3539

36-
public IngestConditionalScript(Map<String, Object> params) {
40+
public IngestConditionalScript(Map<String, Object> params, Map<String, Object> ctxMap) {
41+
super(ctxMap);
3742
this.params = params;
3843
}
3944

40-
/** Return the parameters for this script. */
45+
/**
46+
* Provides backwards compatibility access to ctx
47+
* @return the context map containing the source data
48+
*/
49+
public Map<String, Object> getCtx() {
50+
return ctxMap;
51+
}
52+
53+
/**
54+
* Return the parameters for this script.
55+
* @return a map of parameters
56+
*/
4157
public Map<String, Object> getParams() {
4258
return params;
4359
}
4460

45-
public abstract boolean execute(Map<String, Object> ctx);
61+
public abstract boolean execute();
4662

4763
public interface Factory {
48-
IngestConditionalScript newInstance(Map<String, Object> params);
64+
IngestConditionalScript newInstance(Map<String, Object> params, Map<String, Object> ctxMap);
4965
}
5066
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.script;
11+
12+
import org.elasticsearch.script.field.SourceMapField;
13+
14+
import java.util.Map;
15+
16+
/**
17+
* Abstract base class for that exposes the non-mutable field APIs to scripts.
18+
*/
19+
public abstract class SourceMapFieldScript {
20+
protected final Map<String, Object> ctxMap;
21+
22+
public SourceMapFieldScript(Map<String, Object> ctxMap) {
23+
this.ctxMap = ctxMap;
24+
}
25+
26+
/**
27+
* Expose the {@link SourceMapField field} API
28+
*
29+
* @param path the path to the field in the source map
30+
* @return a new {@link SourceMapField} instance for the specified path
31+
*/
32+
public SourceMapField field(String path) {
33+
return new SourceMapField(path, () -> ctxMap);
34+
}
35+
}

0 commit comments

Comments
 (0)