Skip to content

Run GeoIp YAML tests in multi-project cluster #131521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions modules/ingest-geoip/qa/multi-project/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,25 @@
*/

apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.yaml-rest-compat-test'

dependencies {
// dependencies for the Java REST test suite (GeoIpDownloaderMultiProjectIT)
javaRestTestImplementation project(':modules:ingest-geoip')
javaRestTestImplementation project(':test:external-modules:test-multi-project')
javaRestTestImplementation project(':test:fixtures:geoip-fixture')

// dependencies for the YAML REST test suite; the yaml-test-framework supplies the
// multi-project client test base class (MultipleProjectsClientYamlSuiteTestCase)
yamlRestTestImplementation project(':modules:ingest-geoip')
yamlRestTestImplementation project(':test:external-modules:test-multi-project')
yamlRestTestImplementation project(':test:fixtures:geoip-fixture')
yamlRestTestImplementation project(':x-pack:qa:multi-project:yaml-test-framework')

// modules installed into the test cluster itself
clusterModules project(':modules:ingest-geoip')
clusterModules project(':modules:reindex') // needed for database cleanup
clusterModules project(':test:external-modules:test-multi-project')

// includes yaml rest test artifacts from ingest-geoip module
restTestConfig project(path: ':modules:ingest-geoip', configuration: "restTests")
}

tasks.withType(Test).configureEach {
Expand All @@ -27,3 +37,10 @@ tasks.withType(Test).configureEach {
// both REST test suites are gated to snapshot builds only; they are skipped entirely in release builds
tasks.named { it == "javaRestTest" || it == "yamlRestTest" }.configureEach {
it.onlyIf("snapshot build") { buildParams.snapshotBuild }
}

restResources {
restTests {
// pull in the core YAML REST tests from the ingest_geoip folder so they also run against this multi-project cluster
includeCore 'ingest_geoip'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package geoip;
package org.elasticsearch.ingest.geoip;

import fixture.geoip.GeoIpHttpFixture;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.ingest.geoip.GeoIpDownloader;
import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.ESRestTestCase;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest.geoip;

import fixture.geoip.GeoIpHttpFixture;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.multiproject.test.MultipleProjectsClientYamlSuiteTestCase;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

/**
 * Runs the core {@code ingest_geoip} YAML REST tests against a multi-project test cluster.
 * <p>
 * A {@link GeoIpHttpFixture} stands in for the real GeoIP download service (unless
 * {@code -Dgeoip_use_service=true} is set), and {@link #waitForDatabases()} blocks each test
 * until the downloader has fetched the databases and loaded them on the node, so the YAML
 * tests can rely on the databases being present.
 */
public class IngestGeoIpClientMultiProjectYamlTestSuiteIT extends MultipleProjectsClientYamlSuiteTestCase {

    // true (the default) means "use the local fixture"; -Dgeoip_use_service=true hits the real service instead
    private static final boolean useFixture = Booleans.parseBoolean(System.getProperty("geoip_use_service", "false")) == false;

    private static final GeoIpHttpFixture fixture = new GeoIpHttpFixture(useFixture);

    private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
        .module("reindex")
        .module("ingest-geoip")
        .systemProperty("ingest.geoip.downloader.enabled.default", "true")
        // sets the plain (geoip.elastic.co) downloader endpoint, which is used in these tests
        .setting("ingest.geoip.downloader.endpoint", () -> fixture.getAddress(), s -> useFixture)
        // also sets the enterprise downloader maxmind endpoint, to make sure we do not accidentally hit the real endpoint from tests
        // note: it's not important that the downloading actually work at this point -- the rest tests (so far) don't exercise
        // the downloading code because of license reasons -- but if they did, then it would be important that we're hitting a fixture
        .systemProperty("ingest.geoip.downloader.maxmind.endpoint.default", () -> fixture.getAddress(), s -> useFixture)
        .setting("test.multi_project.enabled", "true")
        .setting("xpack.license.self_generated.type", "trial")
        .user(USER, PASS)
        .build();

    // the fixture must start before the cluster (the cluster resolves its endpoints from it) and stop after it
    @ClassRule
    public static final TestRule ruleChain = RuleChain.outerRule(fixture).around(cluster);

    @Override
    protected String getTestRestCluster() {
        return cluster.getHttpAddresses();
    }

    public IngestGeoIpClientMultiProjectYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
        super(testCandidate);
    }

    @ParametersFactory
    public static Iterable<Object[]> parameters() throws Exception {
        return ESClientYamlSuiteTestCase.createParameters();
    }

    /**
     * Installs a geoip pipeline (so the downloader has a reason to run) and then waits until the
     * geoip stats report that all four databases have been downloaded and loaded on the single node.
     *
     * @throws Exception if the pipeline cannot be created or the databases do not appear within the timeout
     */
    @Before
    public void waitForDatabases() throws Exception {
        putGeoipPipeline();
        assertBusy(() -> {
            Request request = new Request("GET", "/_ingest/geoip/stats");
            Map<String, Object> response = entityAsMap(client().performRequest(request));
            // assert databases are downloaded
            Map<?, ?> downloadStats = (Map<?, ?>) response.get("stats");
            assertThat(downloadStats.get("databases_count"), equalTo(4));
            // assert databases are loaded to node
            Map<?, ?> nodes = (Map<?, ?>) response.get("nodes");
            assertThat(nodes.size(), equalTo(1));
            Map<?, ?> node = (Map<?, ?>) nodes.values().iterator().next();
            List<?> databases = ((List<?>) node.get("databases"));
            assertThat(databases, notNullValue());
            List<String> databaseNames = databases.stream().map(o -> (String) ((Map<?, ?>) o).get("name")).toList();
            assertThat(
                databaseNames,
                containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb", "MyCustomGeoLite2-City.mmdb")
            );
        }, 10, TimeUnit.SECONDS);
    }

    /**
     * This creates a pipeline with a geoip processor so that the GeoipDownloader will download its databases.
     *
     * @throws IOException if building the pipeline body or sending the PUT pipeline request fails
     */
    private void putGeoipPipeline() throws IOException {
        final BytesReference bytes;
        try (XContentBuilder builder = JsonXContent.contentBuilder()) {
            builder.startObject();
            {
                builder.startArray("processors");
                {
                    builder.startObject();
                    {
                        builder.startObject("geoip");
                        {
                            builder.field("field", "ip");
                            builder.field("target_field", "ip-city");
                            builder.field("database_file", "GeoLite2-City.mmdb");
                        }
                        builder.endObject();
                    }
                    builder.endObject();
                }
                builder.endArray();
            }
            builder.endObject();
            bytes = BytesReference.bytes(builder);
        }
        Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/pipeline-with-geoip");
        putPipelineRequest.setEntity(new ByteArrayEntity(bytes.array(), ContentType.APPLICATION_JSON));
        client().performRequest(putPipelineRequest);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -289,103 +289,102 @@ void checkDatabases(ClusterState state) {

// Optimization: only load the .geoip_databases for projects that are allocated to this node
for (ProjectMetadata projectMetadata : state.metadata().projects().values()) {
ProjectId projectId = projectMetadata.id();
checkDatabases(state, projectMetadata);
}
}

PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE);
if (persistentTasks == null) {
logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId);
continue;
}
void checkDatabases(ClusterState state, ProjectMetadata projectMetadata) {
ProjectId projectId = projectMetadata.id();
PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE);
if (persistentTasks == null) {
logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId);
return;
}

IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
if (databasesAbstraction == null) {
logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId);
IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
if (databasesAbstraction == null) {
logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId);
return;
} else {
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
Index databasesIndex = databasesAbstraction.getWriteIndex();
IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex);
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
logger.trace(
"Not checking databases because geoip databases index does not have all active primary shards for" + " project [{}]",
projectId
);
return;
} else {
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
Index databasesIndex = databasesAbstraction.getWriteIndex();
IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex);
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
logger.trace(
"Not checking databases because geoip databases index does not have all active primary shards for"
+ " project [{}]",
projectId
);
return;
}
}
}

// we'll consult each of the geoip downloaders to build up a list of database metadatas to work with
List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>();
// we'll consult each of the geoip downloaders to build up a list of database metadatas to work with
List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>();

// process the geoip task state for the (ordinary) geoip downloader
{
GeoIpTaskState taskState = getGeoIpTaskState(
projectMetadata,
getTaskId(projectId, projectResolver.supportsMultipleProjects())
);
if (taskState == null) {
// Note: an empty state will purge stale entries in databases map
taskState = GeoIpTaskState.EMPTY;
}
validMetadatas.addAll(
taskState.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList()
);
// process the geoip task state for the (ordinary) geoip downloader
{
GeoIpTaskState taskState = getGeoIpTaskState(projectMetadata, getTaskId(projectId, projectResolver.supportsMultipleProjects()));
if (taskState == null) {
// Note: an empty state will purge stale entries in databases map
taskState = GeoIpTaskState.EMPTY;
}
validMetadatas.addAll(
taskState.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList()
);
}

// process the geoip task state for the enterprise geoip downloader
{
EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state);
if (taskState == null) {
// Note: an empty state will purge stale entries in databases map
taskState = EnterpriseGeoIpTaskState.EMPTY;
}
validMetadatas.addAll(
taskState.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList()
);
// process the geoip task state for the enterprise geoip downloader
{
EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state);
if (taskState == null) {
// Note: an empty state will purge stale entries in databases map
taskState = EnterpriseGeoIpTaskState.EMPTY;
}
validMetadatas.addAll(
taskState.getDatabases()
.entrySet()
.stream()
.filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
.map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
.toList()
);
}

// run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task
// has downloaded a new version of the databases
validMetadatas.forEach(e -> {
String name = e.v1();
GeoIpTaskState.Metadata metadata = e.v2();
DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name);
String remoteMd5 = metadata.md5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5);
return;
}
// run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task
// has downloaded a new version of the databases
validMetadatas.forEach(e -> {
String name = e.v1();
GeoIpTaskState.Metadata metadata = e.v2();
DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name);
String remoteMd5 = metadata.md5();
String localMd5 = reference != null ? reference.getMd5() : null;
if (Objects.equals(localMd5, remoteMd5)) {
logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5);
return;
}

try {
retrieveAndUpdateDatabase(projectId, name, metadata);
} catch (Exception ex) {
logger.error(() -> "failed to retrieve database [" + name + "]", ex);
}
});

// TODO perhaps we need to handle the license flap persistent task state better than we do
// i think the ideal end state is that we *do not* drop the files that the enterprise downloader
// handled if they fall out -- which means we need to track that in the databases map itself

// start with the list of all databases we currently know about in this service,
// then drop the ones that didn't check out as valid from the task states
if (databases.containsKey(projectId)) {
Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet());
staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet()));
removeStaleEntries(projectId, staleDatabases);
try {
retrieveAndUpdateDatabase(projectId, name, metadata);
} catch (Exception ex) {
logger.error(() -> "failed to retrieve database [" + name + "]", ex);
}
});

// TODO perhaps we need to handle the license flap persistent task state better than we do
// i think the ideal end state is that we *do not* drop the files that the enterprise downloader
// handled if they fall out -- which means we need to track that in the databases map itself

// start with the list of all databases we currently know about in this service,
// then drop the ones that didn't check out as valid from the task states
if (databases.containsKey(projectId)) {
Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet());
staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet()));
removeStaleEntries(projectId, staleDatabases);
}
}

Expand Down