diff --git a/modules/ingest-geoip/qa/multi-project/build.gradle b/modules/ingest-geoip/qa/multi-project/build.gradle index bcfe4f22599ac..25b5ba72f4dd4 100644 --- a/modules/ingest-geoip/qa/multi-project/build.gradle +++ b/modules/ingest-geoip/qa/multi-project/build.gradle @@ -8,15 +8,25 @@ */ apply plugin: 'elasticsearch.internal-java-rest-test' +apply plugin: 'elasticsearch.internal-yaml-rest-test' +apply plugin: 'elasticsearch.yaml-rest-compat-test' dependencies { javaRestTestImplementation project(':modules:ingest-geoip') javaRestTestImplementation project(':test:external-modules:test-multi-project') javaRestTestImplementation project(':test:fixtures:geoip-fixture') + yamlRestTestImplementation project(':modules:ingest-geoip') + yamlRestTestImplementation project(':test:external-modules:test-multi-project') + yamlRestTestImplementation project(':test:fixtures:geoip-fixture') + yamlRestTestImplementation project(':x-pack:qa:multi-project:yaml-test-framework') + clusterModules project(':modules:ingest-geoip') clusterModules project(':modules:reindex') // needed for database cleanup clusterModules project(':test:external-modules:test-multi-project') + + // includes yaml rest test artifacts from ingest-geoip module + restTestConfig project(path: ':modules:ingest-geoip', configuration: "restTests") } tasks.withType(Test).configureEach { @@ -27,3 +37,10 @@ tasks.withType(Test).configureEach { tasks.named { it == "javaRestTest" || it == "yamlRestTest" }.configureEach { it.onlyIf("snapshot build") { buildParams.snapshotBuild } } + +restResources { + restTests { + // includes yaml rest test from ingest_geoip folder + includeCore 'ingest_geoip' + } +} diff --git a/modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/geoip/GeoIpMultiProjectIT.java b/modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/org/elasticsearch/ingest/geoip/GeoIpMultiProjectIT.java similarity index 97% rename from modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/geoip/GeoIpMultiProjectIT.java rename to modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/org/elasticsearch/ingest/geoip/GeoIpMultiProjectIT.java index 1ed4c9e2172de..2472a0e671d6b 100644 --- a/modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/geoip/GeoIpMultiProjectIT.java +++ b/modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/org/elasticsearch/ingest/geoip/GeoIpMultiProjectIT.java @@ -7,15 +7,13 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -package geoip; +package org.elasticsearch.ingest.geoip; import fixture.geoip.GeoIpHttpFixture; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.core.Booleans; -import org.elasticsearch.ingest.geoip.GeoIpDownloader; -import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.rest.ESRestTestCase; diff --git a/modules/ingest-geoip/qa/multi-project/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientMultiProjectYamlTestSuiteIT.java b/modules/ingest-geoip/qa/multi-project/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientMultiProjectYamlTestSuiteIT.java new file mode 100644 index 0000000000000..703a0e6968976 --- /dev/null +++ b/modules/ingest-geoip/qa/multi-project/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientMultiProjectYamlTestSuiteIT.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest.geoip; + +import fixture.geoip.GeoIpHttpFixture; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.elasticsearch.client.Request; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.core.Booleans; +import org.elasticsearch.multiproject.test.MultipleProjectsClientYamlSuiteTestCase; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class IngestGeoIpClientMultiProjectYamlTestSuiteIT extends MultipleProjectsClientYamlSuiteTestCase { + + private static final boolean useFixture = Booleans.parseBoolean(System.getProperty("geoip_use_service", "false")) == false; + + private static GeoIpHttpFixture fixture = new GeoIpHttpFixture(useFixture); + + private static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .module("reindex") + .module("ingest-geoip") + .systemProperty("ingest.geoip.downloader.enabled.default", "true") + // sets the plain (geoip.elastic.co) downloader endpoint, which is used in these tests + .setting("ingest.geoip.downloader.endpoint", () -> fixture.getAddress(), s -> useFixture) + // also sets the enterprise downloader maxmind endpoint, to make sure we do not accidentally hit the real endpoint from tests + // note: it's not important that the downloading actually work at this point -- the rest tests (so far) don't exercise + // the downloading code because of license reasons -- but if they did, then it would be important that we're hitting a fixture + .systemProperty("ingest.geoip.downloader.maxmind.endpoint.default", () -> fixture.getAddress(), s -> useFixture) + .setting("test.multi_project.enabled", "true") + .setting("xpack.license.self_generated.type", "trial") + .user(USER, PASS) + .build(); + + @ClassRule + public static TestRule ruleChain = RuleChain.outerRule(fixture).around(cluster); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + public IngestGeoIpClientMultiProjectYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return ESClientYamlSuiteTestCase.createParameters(); + } + + @Before + public void waitForDatabases() throws Exception { + putGeoipPipeline(); + assertBusy(() -> { + Request request = new Request("GET", "/_ingest/geoip/stats"); + Map response = entityAsMap(client().performRequest(request)); + // assert databases are downloaded + Map downloadStats = (Map) response.get("stats"); + assertThat(downloadStats.get("databases_count"), equalTo(4)); + // assert databases are loaded to node + Map nodes = (Map) response.get("nodes"); + assertThat(nodes.size(), equalTo(1)); + Map node = (Map) nodes.values().iterator().next(); + List databases = ((List) node.get("databases")); + assertThat(databases, notNullValue()); + List databaseNames = databases.stream().map(o -> (String) ((Map) o).get("name")).toList(); + assertThat( + databaseNames, + containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb", "MyCustomGeoLite2-City.mmdb") + ); + }, 10, TimeUnit.SECONDS); + } + + /** + * This creates a pipeline with a geoip processor so that the GeoipDownloader will download its databases. + * @throws IOException + */ + private void putGeoipPipeline() throws IOException { + final BytesReference bytes; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.startArray("processors"); + { + builder.startObject(); + { + builder.startObject("geoip"); + { + builder.field("field", "ip"); + builder.field("target_field", "ip-city"); + builder.field("database_file", "GeoLite2-City.mmdb"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + bytes = BytesReference.bytes(builder); + } + Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/pipeline-with-geoip"); + putPipelineRequest.setEntity(new ByteArrayEntity(bytes.array(), ContentType.APPLICATION_JSON)); + client().performRequest(putPipelineRequest); + } + +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java index 6de92f373e648..038438becce21 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; @@ -288,104 +289,105 @@ void checkDatabases(ClusterState state) { } // Optimization: only load the .geoip_databases for projects that are allocated to this node - for (ProjectMetadata projectMetadata : state.metadata().projects().values()) { - ProjectId projectId = projectMetadata.id(); + for (ProjectId projectId : state.metadata().projects().keySet()) { + checkDatabases(state.projectState(projectId)); + } + } - PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE); - if (persistentTasks == null) { - logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId); - continue; - } + void checkDatabases(ProjectState projectState) { + ProjectId projectId = projectState.projectId(); + ProjectMetadata projectMetadata = projectState.metadata(); + ClusterState clusterState = projectState.cluster(); + PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE); + if (persistentTasks == null) { + logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId); + return; + } - IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); - if (databasesAbstraction == null) { - logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId); + IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX); + if (databasesAbstraction == null) { + logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId); + return; + } else { + // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index + Index databasesIndex = databasesAbstraction.getWriteIndex(); + IndexRoutingTable databasesIndexRT = clusterState.routingTable(projectId).index(databasesIndex); + if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { + logger.trace( + "Not checking databases because geoip databases index does not have all active primary shards for" + " project [{}]", + projectId + ); return; - } else { - // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index - Index databasesIndex = databasesAbstraction.getWriteIndex(); - IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex); - if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) { - logger.trace( - "Not checking databases because geoip databases index does not have all active primary shards for" - + " project [{}]", - projectId - ); - return; - } } + } - // we'll consult each of the geoip downloaders to build up a list of database metadatas to work with - List> validMetadatas = new ArrayList<>(); + // we'll consult each of the geoip downloaders to build up a list of database metadatas to work with + List> validMetadatas = new ArrayList<>(); - // process the geoip task state for the (ordinary) geoip downloader - { - GeoIpTaskState taskState = getGeoIpTaskState( - projectMetadata, - getTaskId(projectId, projectResolver.supportsMultipleProjects()) - ); - if (taskState == null) { - // Note: an empty state will purge stale entries in databases map - taskState = GeoIpTaskState.EMPTY; - } - validMetadatas.addAll( - taskState.getDatabases() - .entrySet() - .stream() - .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) - .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) - .toList() - ); + // process the geoip task state for the (ordinary) geoip downloader + { + GeoIpTaskState taskState = getGeoIpTaskState(projectMetadata, getTaskId(projectId, projectResolver.supportsMultipleProjects())); + if (taskState == null) { + // Note: an empty state will purge stale entries in databases map + taskState = GeoIpTaskState.EMPTY; } + validMetadatas.addAll( + taskState.getDatabases() + .entrySet() + .stream() + .filter(e -> e.getValue().isNewEnough(clusterState.getMetadata().settings())) + .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) + .toList() + ); + } - // process the geoip task state for the enterprise geoip downloader - { - EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state); - if (taskState == null) { - // Note: an empty state will purge stale entries in databases map - taskState = EnterpriseGeoIpTaskState.EMPTY; - } - validMetadatas.addAll( - taskState.getDatabases() - .entrySet() - .stream() - .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings())) - .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) - .toList() - ); + // process the geoip task state for the enterprise geoip downloader + { + EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(clusterState); + if (taskState == null) { + // Note: an empty state will purge stale entries in databases map + taskState = EnterpriseGeoIpTaskState.EMPTY; } + validMetadatas.addAll( + taskState.getDatabases() + .entrySet() + .stream() + .filter(e -> e.getValue().isNewEnough(clusterState.getMetadata().settings())) + .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue())) + .toList() + ); + } - // run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task - // has downloaded a new version of the databases - validMetadatas.forEach(e -> { - String name = e.v1(); - GeoIpTaskState.Metadata metadata = e.v2(); - DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name); - String remoteMd5 = metadata.md5(); - String localMd5 = reference != null ? reference.getMd5() : null; - if (Objects.equals(localMd5, remoteMd5)) { - logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5); - return; - } + // run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task + // has downloaded a new version of the databases + validMetadatas.forEach(e -> { + String name = e.v1(); + GeoIpTaskState.Metadata metadata = e.v2(); + DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name); + String remoteMd5 = metadata.md5(); + String localMd5 = reference != null ? reference.getMd5() : null; + if (Objects.equals(localMd5, remoteMd5)) { + logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5); + return; + } - try { - retrieveAndUpdateDatabase(projectId, name, metadata); - } catch (Exception ex) { - logger.error(() -> "failed to retrieve database [" + name + "]", ex); - } - }); - - // TODO perhaps we need to handle the license flap persistent task state better than we do - // i think the ideal end state is that we *do not* drop the files that the enterprise downloader - // handled if they fall out -- which means we need to track that in the databases map itself - - // start with the list of all databases we currently know about in this service, - // then drop the ones that didn't check out as valid from the task states - if (databases.containsKey(projectId)) { - Set staleDatabases = new HashSet<>(databases.get(projectId).keySet()); - staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet())); - removeStaleEntries(projectId, staleDatabases); + try { + retrieveAndUpdateDatabase(projectId, name, metadata); + } catch (Exception ex) { + logger.error(() -> "failed to retrieve database [" + name + "]", ex); } + }); + + // TODO perhaps we need to handle the license flap persistent task state better than we do + // i think the ideal end state is that we *do not* drop the files that the enterprise downloader + // handled if they fall out -- which means we need to track that in the databases map itself + + // start with the list of all databases we currently know about in this service, + // then drop the ones that didn't check out as valid from the task states + if (databases.containsKey(projectId)) { + Set staleDatabases = new HashSet<>(databases.get(projectId).keySet()); + staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet())); + removeStaleEntries(projectId, staleDatabases); } }