Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,59 @@ public void testMultiplePipelines() throws IOException {
assertThat(listResponseMap.size(), is(ids.size()));
}

public void testValidPipelineIds() throws IOException {
final String pipelineJson = getPipelineJson();
final List<String> validIds = List.of(
"main",
"_internal",
"my_pipeline",
"my-pipeline",
"pipeline123",
"A1",
"_pipeline_1",
"MyPipeline-123",
"main_pipeline_v2"
);

for (String id : validIds) {
createPipeline(id, pipelineJson);
}

refreshAllIndices();

// fetch all pipeline IDs
Request listAll = new Request("GET", "/_logstash/pipeline");
Response listAllResponse = client().performRequest(listAll);
assertThat(listAllResponse.getStatusLine().getStatusCode(), is(200));
Map<String, Object> listResponseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(listAllResponse.getEntity()),
false
);
for (String id : validIds) {
assertTrue(listResponseMap.containsKey(id));
}
assertThat(listResponseMap.size(), is(validIds.size()));
}

public void testInvalidPipelineIds() throws IOException {
final String pipelineJson = getPipelineJson();
final List<String> invalidPipelineIds = List.of("123pipeline", "-pipeline", "*-pipeline");

for (String id : invalidPipelineIds) {
Request putRequest = new Request("PUT", "/_logstash/pipeline/" + id);
putRequest.setJsonEntity(pipelineJson);

ResponseException exception = expectThrows(ResponseException.class, () -> client().performRequest(putRequest));

Response response = exception.getResponse();
assertThat(response.getStatusLine().getStatusCode(), is(400));

String responseBody = EntityUtils.toString(response.getEntity());
assertThat(responseBody, containsString("Invalid pipeline [" + id + "] ID received"));
}
}

private void createPipeline(String id, String json) throws IOException {
Request putRequest = new Request("PUT", "/_logstash/pipeline/" + id);
putRequest.setJsonEntity(json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.logstash.rest;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -23,12 +24,17 @@

import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;

import static org.elasticsearch.rest.RestRequest.Method.PUT;

@ServerlessScope(Scope.PUBLIC)
public class RestPutPipelineAction extends BaseRestHandler {

// A pipeline ID pattern to validate.
// Reference: https://www.elastic.co/docs/reference/logstash/configuring-centralized-pipelines#wildcard-in-pipeline-id
private static final Pattern PIPELINE_ID_PATTERN = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_-]*");

@Override
public String getName() {
return "logstash_put_pipeline";
Expand All @@ -39,9 +45,31 @@ public List<Route> routes() {
return List.of(new Route(PUT, "/_logstash/pipeline/{id}"));
}

/**
* Validates pipeline ID for:
* - must begin with a letter or underscore
* - can contain only letters, underscores, dashes, and numbers
*/
private static void validatePipelineId(String id) {
if (Strings.isEmpty(id)) {
throw new IllegalArgumentException("Pipeline ID cannot be null or empty");
}

if (PIPELINE_ID_PATTERN.matcher(id).matches() == false) {
throw new IllegalArgumentException(
"Invalid pipeline ["
+ id
+ "] ID received. Pipeline ID must begin with a letter or underscore and can contain only letters, "
+ "underscores, dashes, hyphens, and numbers"
);
}
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
final String id = request.param("id");
validatePipelineId(id);

try (XContentParser parser = request.contentParser()) {
// parse pipeline for validation
Pipeline.PARSER.apply(parser, id);
Expand Down