Skip to content

Commit 446369d

Browse files
committed
Merge branch 'cluster' into integ
2 parents 0bf30f3 + e8310bf commit 446369d

36 files changed

+853
-320
lines changed

build.gradle

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,6 @@ List<String> jacocoExclusions = [
692692
'org.opensearch.ad.stats.ADStat',
693693
'org.opensearch.ad.feature.AbstractRetriever',
694694
'org.opensearch.ad.feature.SearchFeatureDao',
695-
'org.opensearch.ad.ml.SingleStreamModelIdMapper',
696695
'org.opensearch.ad.settings.AbstractSetting',
697696
'org.opensearch.ad.util.ExceptionUtil',
698697
'org.opensearch.ad.util.DiscoveryNodeFilterer.HotDataNodePredicate',
@@ -723,13 +722,11 @@ List<String> jacocoExclusions = [
723722
'org.opensearch.ad.transport.GetAnomalyDetectorRequest',
724723
'org.opensearch.ad.transport.AnomalyResultTransportAction',
725724
'org.opensearch.ad.transport.DeleteAnomalyDetectorAction',
726-
'org.opensearch.ad.transport.ForwardADTaskRequest',
727725
'org.opensearch.ad.transport.AnomalyResultTransportAction.EntityResultListener',
728726
'org.opensearch.ad.feature.CompositeRetriever',
729727
'org.opensearch.ad.ml.EntityColdStarter',
730728
'org.opensearch.ad.AbstractProfileRunner',
731729
'org.opensearch.ad.Name',
732-
'org.opensearch.ad.NodeStateManager',
733730
'org.opensearch.ad.model.EntityProfile.Builder',
734731
'org.opensearch.ad.transport.AnomalyResultTransportAction.PageListener',
735732
'org.opensearch.ad.feature.CompositeRetriever.Page',
@@ -739,7 +736,6 @@ List<String> jacocoExclusions = [
739736
'org.opensearch.ad.transport.ADJobRunnerTransportAction*',
740737
'org.opensearch.ad.AnomalyDetectorProfileRunner',
741738
'org.opensearch.ad.EntityProfileRunner',
742-
'org.opensearch.ad.util.BulkUtil',
743739
'org.opensearch.ad.common.exception.InternalFailure',
744740
'org.opensearch.ad.cluster.HourlyCron',
745741
'org.opensearch.ad.cluster.ADClusterEventListener',
@@ -750,8 +746,6 @@ List<String> jacocoExclusions = [
750746
'org.opensearch.ad.transport.AnomalyResultTransportAction.RCFActionListener',
751747
'org.opensearch.ad.transport.AnomalyResultRequest',
752748
'org.opensearch.ad.transport.AnomalyResultAction',
753-
'org.opensearch.ad.transport.CronNodeResponse',
754-
'org.opensearch.ad.transport.CronResponse',
755749
'org.opensearch.ad.transport.AnomalyResultResponse',
756750
'org.opensearch.ad.common.exception.ClientException',
757751
'org.opensearch.ad.caching.DoorKeeper',
@@ -765,27 +759,21 @@ List<String> jacocoExclusions = [
765759
'org.opensearch.ad.stats.InternalStatNames',
766760
'org.opensearch.ad.NodeState',
767761
'org.opensearch.ad.transport.DeleteModelRequest',
768-
'org.opensearch.ad.transport.DeleteModelNodeResponse',
769-
'org.opensearch.ad.transport.DeleteModelResponse',
770762
'org.opensearch.ad.transport.StopDetectorTransportAction',
771763
'org.opensearch.ad.transport.StopDetectorRequest',
772764
'org.opensearch.ad.ratelimit.RateLimitedRequestWorker',
773765
'org.opensearch.ad.ratelimit.QueuedRequest',
774766
'org.opensearch.ad.ratelimit.CheckpointReadWorker',
775767
'org.opensearch.ad.ratelimit.ConcurrentWorker',
776-
'org.opensearch.ad.ratelimit.EntityFeatureRequest',
777768
'org.opensearch.ad.ratelimit.RateLimitedRequestWorker.RequestQueue',
778-
'org.opensearch.ad.stats.StatNames',
779769
'org.opensearch.ad.MaintenanceState',
780770
'org.opensearch.ad.AnomalyDetectorExtension.*',
781771
'org.opensearch.ad.EntityProfileRunner',
782-
'org.opensearch.ad.caching.CacheProvider',
783772
'org.opensearch.ad.transport.ADResultBulkTransportAction',
784773
'org.opensearch.ad.transport.ADResultBulkRequest',
785774
'org.opensearch.ad.transport.ADResultBulkAction',
786775
'org.opensearch.ad.ratelimit.ResultWriteRequest',
787776
'org.opensearch.ad.AnomalyDetectorJobRunner.*',
788-
'org.opensearch.ad.util.RestHandlerUtils',
789777
'org.opensearch.ad.transport.SearchAnomalyDetectorInfoTransportAction.*',
790778
'org.opensearch.ad.transport.RCFPollingAction',
791779
'org.opensearch.ad.transport.RCFPollingRequest',
@@ -803,7 +791,7 @@ jacocoTestCoverageVerification {
803791
excludes = jacocoExclusions
804792
limit {
805793
counter = 'BRANCH'
806-
minimum = 0.50
794+
minimum = 0.40
807795
}
808796
}
809797
rule {
@@ -812,7 +800,7 @@ jacocoTestCoverageVerification {
812800
limit {
813801
counter = 'LINE'
814802
value = 'COVEREDRATIO'
815-
minimum = 0.60
803+
minimum = 0.50
816804
}
817805
}
818806
}

src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,14 @@ public Collection<Object> createComponents(ExtensionsRunner runner) {
249249

250250
Throttler throttler = new Throttler(getClock());
251251
ClientUtil clientUtil = new ClientUtil(environmentSettings, restClient(), throttler);
252-
IndexUtils indexUtils = new IndexUtils(restClient(), clientUtil, sdkClusterService, indexNameExpressionResolver, javaAsyncClient());
252+
IndexUtils indexUtils = new IndexUtils(
253+
restClient(),
254+
clientUtil,
255+
sdkClusterService,
256+
indexNameExpressionResolver,
257+
javaAsyncClient(),
258+
environmentSettings
259+
);
253260
nodeFilter = new DiscoveryNodeFilterer(sdkClusterService);
254261
AnomalyDetectionIndices anomalyDetectionIndices = new AnomalyDetectionIndices(
255262
sdkRestClient,
@@ -358,7 +365,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
358365
AnomalyDetectorSettings.MAX_CHECKPOINT_BYTES,
359366
serializeRCFBufferPool,
360367
AnomalyDetectorSettings.SERIALIZATION_BUFFER_BYTES,
361-
1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE
368+
1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
369+
environmentSettings
362370
);
363371

364372
Random random = new Random(42);

src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,7 +651,9 @@ public void onFailure(Exception exception) {
651651
}
652652

653653
});
654-
Response response = acquireLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();
654+
Response response = acquireLockResponse
655+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
656+
.join();
655657

656658
log.info("Acquired lock for AD job {}", context.getJobId());
657659

@@ -684,7 +686,9 @@ public void onFailure(Exception exception) {
684686
}
685687

686688
});
687-
Response response = releaseLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();
689+
Response response = releaseLockResponse
690+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
691+
.join();
688692

689693
boolean lockIsReleased = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
690694
if (lockIsReleased) {

src/main/java/org/opensearch/ad/NodeStateManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@
3939
import org.opensearch.ad.transport.BackPressureRouting;
4040
import org.opensearch.ad.util.ClientUtil;
4141
import org.opensearch.ad.util.ExceptionUtil;
42-
import org.opensearch.common.lease.Releasable;
4342
import org.opensearch.common.settings.Settings;
4443
import org.opensearch.common.unit.TimeValue;
4544
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
4645
import org.opensearch.common.xcontent.XContentType;
46+
import org.opensearch.core.common.lease.Releasable;
4747
import org.opensearch.core.xcontent.XContentParser;
4848
import org.opensearch.sdk.SDKClient.SDKRestClient;
4949
import org.opensearch.sdk.SDKClusterService;

src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java

Lines changed: 94 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,13 @@
6363
import org.opensearch.ad.model.AnomalyDetectorJob;
6464
import org.opensearch.ad.model.AnomalyResult;
6565
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
66+
import org.opensearch.ad.settings.AnomalyDetectorSettings;
6667
import org.opensearch.ad.util.DiscoveryNodeFilterer;
6768
import org.opensearch.client.indices.CreateIndexRequest;
6869
import org.opensearch.client.indices.CreateIndexResponse;
70+
import org.opensearch.client.indices.GetIndexRequest;
71+
import org.opensearch.client.indices.GetMappingsRequest;
72+
import org.opensearch.client.indices.GetMappingsResponse;
6973
import org.opensearch.client.indices.PutMappingRequest;
7074
import org.opensearch.client.indices.rollover.RolloverRequest;
7175
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
@@ -75,6 +79,7 @@
7579
import org.opensearch.cluster.LocalNodeMasterListener;
7680
import org.opensearch.cluster.metadata.AliasMetadata;
7781
import org.opensearch.cluster.metadata.IndexMetadata;
82+
import org.opensearch.cluster.metadata.MappingMetadata;
7883
import org.opensearch.common.bytes.BytesArray;
7984
import org.opensearch.common.settings.Settings;
8085
import org.opensearch.common.unit.TimeValue;
@@ -121,6 +126,7 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener {
121126
private final SDKRestClient adminClient;
122127
private final OpenSearchAsyncClient sdkJavaAsyncClient;
123128
private final ThreadPool threadPool;
129+
private final Settings environmentSettings;
124130

125131
private volatile TimeValue historyRolloverPeriod;
126132
private volatile Long historyMaxDocs;
@@ -188,6 +194,7 @@ public AnomalyDetectionIndices(
188194
this.sdkJavaAsyncClient = sdkJavaAsyncClient;
189195
this.sdkClusterService = sdkClusterService;
190196
this.threadPool = threadPool;
197+
this.environmentSettings = settings;
191198
// FIXME Implement this
192199
// https://github.com/opensearch-project/opensearch-sdk-java/issues/423
193200
// this.clusterService.addLocalNodeMasterListener(this);
@@ -301,13 +308,57 @@ public static String getCheckpointMappings() throws IOException {
301308
return Resources.toString(url, Charsets.UTF_8);
302309
}
303310

311+
/**
312+
* Determine if index exists
313+
*
314+
* @param indexName the name of the index
315+
* @return true if index exists
316+
*/
317+
public boolean indexExists(String indexName) {
318+
GetIndexRequest getindexRequest = new GetIndexRequest(indexName);
319+
320+
CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
321+
sdkRestClient.indices().exists(getindexRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
322+
existsFuture.completeExceptionally(exception);
323+
}));
324+
325+
Boolean existsResponse = existsFuture
326+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
327+
.join();
328+
329+
return existsResponse.booleanValue();
330+
}
331+
332+
/**
333+
* Determine if alias exists
334+
*
335+
* @param aliasName the name of the alias
336+
* @return true if alias exists
337+
*/
338+
public boolean aliasExists(String aliasName) {
339+
GetAliasesRequest getAliasRequest = new GetAliasesRequest(aliasName);
340+
341+
CompletableFuture<Boolean> existsFuture = new CompletableFuture<>();
342+
sdkRestClient
343+
.indices()
344+
.existsAlias(getAliasRequest, ActionListener.wrap(response -> { existsFuture.complete(response); }, exception -> {
345+
existsFuture.completeExceptionally(exception);
346+
}));
347+
348+
Boolean existsResponse = existsFuture
349+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
350+
.join();
351+
352+
return existsResponse.booleanValue();
353+
}
354+
304355
/**
305356
* Anomaly detector index exist or not.
306357
*
307358
* @return true if anomaly detector index exists
308359
*/
309360
public boolean doesAnomalyDetectorIndexExist() {
310-
return sdkClusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX);
361+
return indexExists(AnomalyDetector.ANOMALY_DETECTORS_INDEX);
311362
}
312363

313364
/**
@@ -316,7 +367,7 @@ public boolean doesAnomalyDetectorIndexExist() {
316367
* @return true if anomaly detector job index exists
317368
*/
318369
public boolean doesAnomalyDetectorJobIndexExist() {
319-
return sdkClusterService.state().getRoutingTable().hasIndex(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX);
370+
return indexExists(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX);
320371
}
321372

322373
/**
@@ -325,11 +376,11 @@ public boolean doesAnomalyDetectorJobIndexExist() {
325376
* @return true if anomaly result index exists
326377
*/
327378
public boolean doesDefaultAnomalyResultIndexExist() {
328-
return sdkClusterService.state().metadata().hasAlias(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
379+
return aliasExists(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
329380
}
330381

331382
public boolean doesIndexExist(String indexName) {
332-
return sdkClusterService.state().metadata().hasIndex(indexName);
383+
return indexExists(indexName);
333384
}
334385

335386
public <T> void initCustomResultIndexAndExecute(String resultIndex, AnomalyDetectorFunction function, ActionListener<T> listener) {
@@ -432,8 +483,24 @@ public boolean isValidResultIndexMapping(String resultIndex) {
432483
// failed to populate the field
433484
return false;
434485
}
435-
IndexMetadata indexMetadata = sdkClusterService.state().metadata().index(resultIndex);
436-
Map<String, Object> indexMapping = indexMetadata.mapping().sourceAsMap();
486+
487+
GetMappingsRequest getMappingRequest = new GetMappingsRequest().indices(resultIndex);
488+
CompletableFuture<GetMappingsResponse> getMappingsFuture = new CompletableFuture<>();
489+
sdkRestClient
490+
.indices()
491+
.getMapping(getMappingRequest, ActionListener.wrap(response -> { getMappingsFuture.complete(response); }, exception -> {
492+
getMappingsFuture.completeExceptionally(exception);
493+
}));
494+
GetMappingsResponse getMappingResponse = getMappingsFuture
495+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
496+
.join();
497+
498+
Map<String, MappingMetadata> resultIndexMappings = getMappingResponse.mappings();
499+
if (resultIndexMappings.size() == 0) {
500+
return false;
501+
}
502+
Map<String, Object> indexMapping = resultIndexMappings.get(resultIndex).sourceAsMap();
503+
437504
String propertyName = CommonName.PROPERTIES;
438505
if (!indexMapping.containsKey(propertyName) || !(indexMapping.get(propertyName) instanceof LinkedHashMap)) {
439506
return false;
@@ -468,7 +535,7 @@ public boolean isValidResultIndexMapping(String resultIndex) {
468535
* @return true if anomaly state index exists
469536
*/
470537
public boolean doesDetectorStateIndexExist() {
471-
return sdkClusterService.state().getRoutingTable().hasIndex(CommonName.DETECTION_STATE_INDEX);
538+
return indexExists(CommonName.DETECTION_STATE_INDEX);
472539
}
473540

474541
/**
@@ -477,27 +544,7 @@ public boolean doesDetectorStateIndexExist() {
477544
* @return true if checkpoint index exists
478545
*/
479546
public boolean doesCheckpointIndexExist() {
480-
return sdkClusterService.state().getRoutingTable().hasIndex(CommonName.CHECKPOINT_INDEX_NAME);
481-
}
482-
483-
/**
484-
* Index exists or not
485-
* @param sdkClusterService Cluster service
486-
* @param name Index name
487-
* @return true if the index exists
488-
*/
489-
public static boolean doesIndexExists(SDKClusterService sdkClusterService, String name) {
490-
return sdkClusterService.state().getRoutingTable().hasIndex(name);
491-
}
492-
493-
/**
494-
* Alias exists or not
495-
* @param sdkClusterService Cluster service
496-
* @param alias Alias name
497-
* @return true if the alias exists
498-
*/
499-
public static boolean doesAliasExists(SDKClusterService sdkClusterService, String alias) {
500-
return sdkClusterService.state().metadata().hasAlias(alias);
547+
return indexExists(CommonName.CHECKPOINT_INDEX_NAME);
501548
}
502549

503550
private ActionListener<CreateIndexResponse> markMappingUpToDate(ADIndex index, ActionListener<CreateIndexResponse> followingListener) {
@@ -976,9 +1023,9 @@ private void markMappingUpdated(ADIndex adIndex) {
9761023
private void shouldUpdateIndex(ADIndex index, ActionListener<Boolean> thenDo) {
9771024
boolean exists = false;
9781025
if (index.isAlias()) {
979-
exists = AnomalyDetectionIndices.doesAliasExists(sdkClusterService, index.getIndexName());
1026+
exists = aliasExists(index.getIndexName());
9801027
} else {
981-
exists = AnomalyDetectionIndices.doesIndexExists(sdkClusterService, index.getIndexName());
1028+
exists = indexExists(index.getIndexName());
9821029
}
9831030
if (false == exists) {
9841031
thenDo.onResponse(Boolean.FALSE);
@@ -1012,14 +1059,25 @@ private void shouldUpdateIndex(ADIndex index, ActionListener<Boolean> thenDo) {
10121059

10131060
@SuppressWarnings("unchecked")
10141061
private void shouldUpdateConcreteIndex(String concreteIndex, Integer newVersion, ActionListener<Boolean> thenDo) {
1015-
IndexMetadata indexMeataData = sdkClusterService.state().getMetadata().indices().get(concreteIndex);
1016-
if (indexMeataData == null) {
1062+
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(concreteIndex);
1063+
CompletableFuture<GetMappingsResponse> getMappingsFuture = new CompletableFuture<>();
1064+
sdkRestClient
1065+
.indices()
1066+
.getMapping(getMappingsRequest, ActionListener.wrap(response -> { getMappingsFuture.complete(response); }, exception -> {
1067+
getMappingsFuture.completeExceptionally(exception);
1068+
}));
1069+
GetMappingsResponse getMappingResponse = getMappingsFuture
1070+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
1071+
.join();
1072+
1073+
Map<String, MappingMetadata> concreteIndexMappings = getMappingResponse.mappings();
1074+
if (concreteIndexMappings.size() == 0) {
10171075
thenDo.onResponse(Boolean.FALSE);
10181076
return;
10191077
}
10201078
Integer oldVersion = CommonValue.NO_SCHEMA_VERSION;
10211079

1022-
Map<String, Object> indexMapping = indexMeataData.mapping().getSourceAsMap();
1080+
Map<String, Object> indexMapping = concreteIndexMappings.get(concreteIndex).sourceAsMap();
10231081
Object meta = indexMapping.get(META);
10241082
if (meta != null && meta instanceof Map) {
10251083
Map<String, Object> metaMapping = (Map<String, Object>) meta;
@@ -1098,7 +1156,9 @@ private void updateJobIndexSettingIfNecessary(IndexState jobIndexState, ActionLi
10981156

10991157
GetIndicesSettingsResponse settingResponse;
11001158
try {
1101-
settingResponse = getIndicesSettingsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
1159+
settingResponse = getIndicesSettingsResponse
1160+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
1161+
.get();
11021162
// auto expand setting is a range string like "1-all"
11031163
org.opensearch.client.opensearch.indices.IndexState indexState = settingResponse.get(ADIndex.JOB.getIndexName());
11041164
String autoExpandReplica = indexState.settings().autoExpandReplicas();

0 commit comments

Comments
 (0)