Skip to content

Commit c20e72d

Browse files
authored
[Extensions] Increase AD Extension request timeout to 20 seconds and fix exception handling for Delete/Profile (#916)
* Replacing hardcoded timeout values with AnomalyDetectorSettings.REQUEST_TIMEOUT setting, increased request timeout setting to 20, fixed affected tests
  Signed-off-by: Joshua Palis <[email protected]>
* Addressing PR comments, adding TODO to opendistro legacy settings
  Signed-off-by: Joshua Palis <[email protected]>
* Fixes Delete/ProfileDetector exception handling to check message rather than type
  Signed-off-by: Joshua Palis <[email protected]>
* Addressing PR comments, added new constructors to IndexUtils/CheckpointDao to assume an empty settings object if not passed directly
  Signed-off-by: Joshua Palis <[email protected]>
---------
Signed-off-by: Joshua Palis <[email protected]>
1 parent 4e97313 commit c20e72d

File tree

10 files changed

+160
-29
lines changed

10 files changed

+160
-29
lines changed

src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,14 @@ public Collection<Object> createComponents(ExtensionsRunner runner) {
249249

250250
Throttler throttler = new Throttler(getClock());
251251
ClientUtil clientUtil = new ClientUtil(environmentSettings, restClient(), throttler);
252-
IndexUtils indexUtils = new IndexUtils(restClient(), clientUtil, sdkClusterService, indexNameExpressionResolver, javaAsyncClient());
252+
IndexUtils indexUtils = new IndexUtils(
253+
restClient(),
254+
clientUtil,
255+
sdkClusterService,
256+
indexNameExpressionResolver,
257+
javaAsyncClient(),
258+
environmentSettings
259+
);
253260
nodeFilter = new DiscoveryNodeFilterer(sdkClusterService);
254261
AnomalyDetectionIndices anomalyDetectionIndices = new AnomalyDetectionIndices(
255262
sdkRestClient,
@@ -358,7 +365,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
358365
AnomalyDetectorSettings.MAX_CHECKPOINT_BYTES,
359366
serializeRCFBufferPool,
360367
AnomalyDetectorSettings.SERIALIZATION_BUFFER_BYTES,
361-
1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE
368+
1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
369+
environmentSettings
362370
);
363371

364372
Random random = new Random(42);

src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,9 @@ public void onFailure(Exception exception) {
652652
}
653653

654654
});
655-
Response response = acquireLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();
655+
Response response = acquireLockResponse
656+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
657+
.join();
656658

657659
log.info("Acquired lock for AD job {}", context.getJobId());
658660

@@ -685,7 +687,9 @@ public void onFailure(Exception exception) {
685687
}
686688

687689
});
688-
Response response = releaseLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();
690+
Response response = releaseLockResponse
691+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
692+
.join();
689693

690694
boolean lockIsReleased = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
691695
if (lockIsReleased) {

src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.opensearch.ad.model.AnomalyDetectorJob;
6464
import org.opensearch.ad.model.AnomalyResult;
6565
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
66+
import org.opensearch.ad.settings.AnomalyDetectorSettings;
6667
import org.opensearch.ad.util.DiscoveryNodeFilterer;
6768
import org.opensearch.client.indices.CreateIndexRequest;
6869
import org.opensearch.client.indices.CreateIndexResponse;
@@ -121,6 +122,7 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener {
121122
private final SDKRestClient adminClient;
122123
private final OpenSearchAsyncClient sdkJavaAsyncClient;
123124
private final ThreadPool threadPool;
125+
private final Settings environmentSettings;
124126

125127
private volatile TimeValue historyRolloverPeriod;
126128
private volatile Long historyMaxDocs;
@@ -188,6 +190,7 @@ public AnomalyDetectionIndices(
188190
this.sdkJavaAsyncClient = sdkJavaAsyncClient;
189191
this.sdkClusterService = sdkClusterService;
190192
this.threadPool = threadPool;
193+
this.environmentSettings = settings;
191194
// FIXME Implement this
192195
// https://github.com/opensearch-project/opensearch-sdk-java/issues/423
193196
// this.clusterService.addLocalNodeMasterListener(this);
@@ -1098,7 +1101,9 @@ private void updateJobIndexSettingIfNecessary(IndexState jobIndexState, ActionLi
10981101

10991102
GetIndicesSettingsResponse settingResponse;
11001103
try {
1101-
settingResponse = getIndicesSettingsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
1104+
settingResponse = getIndicesSettingsResponse
1105+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
1106+
.get();
11021107
// auto expand setting is a range string like "1-all"
11031108
org.opensearch.client.opensearch.indices.IndexState indexState = settingResponse.get(ADIndex.JOB.getIndexName());
11041109
String autoExpandReplica = indexState.settings().autoExpandReplicas();

src/main/java/org/opensearch/ad/ml/CheckpointDao.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,15 @@
5656
import org.opensearch.ad.indices.ADIndex;
5757
import org.opensearch.ad.indices.AnomalyDetectionIndices;
5858
import org.opensearch.ad.model.Entity;
59+
import org.opensearch.ad.settings.AnomalyDetectorSettings;
5960
import org.opensearch.ad.util.ClientUtil;
6061
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
6162
import org.opensearch.client.opensearch._types.BulkIndexByScrollFailure;
6263
import org.opensearch.client.opensearch._types.Conflicts;
6364
import org.opensearch.client.opensearch._types.ExpandWildcard;
6465
import org.opensearch.client.opensearch.core.DeleteByQueryRequest;
6566
import org.opensearch.client.opensearch.core.DeleteByQueryResponse;
67+
import org.opensearch.common.settings.Settings;
6668
import org.opensearch.index.IndexNotFoundException;
6769
import org.opensearch.sdk.SDKClient.SDKRestClient;
6870

@@ -115,6 +117,7 @@ public class CheckpointDao {
115117
// configuration
116118
private final String indexName;
117119

120+
private Settings settings;
118121
private Gson gson;
119122
private RandomCutForestMapper mapper;
120123

@@ -142,7 +145,7 @@ public class CheckpointDao {
142145
private double anomalyRate;
143146

144147
/**
145-
* Constructor with dependencies and configuration.
148+
* Constructor with dependencies and configuration; uses empty settings ({@code Settings.EMPTY}).
146149
*
147150
* @param client ES search client
148151
* @param sdkJavaAsyncClient OpenSearch Async Client
@@ -176,9 +179,68 @@ public CheckpointDao(
176179
GenericObjectPool<LinkedBuffer> serializeRCFBufferPool,
177180
int serializeRCFBufferSize,
178181
double anomalyRate
182+
) {
183+
this(
184+
client,
185+
sdkJavaAsyncClient,
186+
clientUtil,
187+
indexName,
188+
gson,
189+
mapper,
190+
converter,
191+
trcfMapper,
192+
trcfSchema,
193+
thresholdingModelClass,
194+
indexUtil,
195+
maxCheckpointBytes,
196+
serializeRCFBufferPool,
197+
serializeRCFBufferSize,
198+
anomalyRate,
199+
Settings.EMPTY
200+
);
201+
}
202+
203+
/**
204+
* Constructor with dependencies and configuration.
205+
*
206+
* @param client ES search client
207+
* @param sdkJavaAsyncClient OpenSearch Async Client
208+
* @param clientUtil utility with ES client
209+
* @param indexName name of the index for model checkpoints
210+
* @param gson accessor to Gson functionality
211+
* @param mapper RCF model serialization utility
212+
* @param converter converter from rcf v1 serde to protostuff based format
213+
* @param trcfMapper TRCF serialization mapper
214+
* @param trcfSchema TRCF serialization schema
215+
* @param thresholdingModelClass thresholding model's class
216+
* @param indexUtil Index utility methods
217+
* @param maxCheckpointBytes max checkpoint size in bytes
218+
* @param serializeRCFBufferPool object pool for serializing rcf models
219+
* @param serializeRCFBufferSize the size of the buffer for RCF serialization
220+
* @param anomalyRate anomaly rate
221+
* @param settings Environment Settings
222+
*/
223+
public CheckpointDao(
224+
SDKRestClient client,
225+
OpenSearchAsyncClient sdkJavaAsyncClient,
226+
ClientUtil clientUtil,
227+
String indexName,
228+
Gson gson,
229+
RandomCutForestMapper mapper,
230+
V1JsonToV3StateConverter converter,
231+
ThresholdedRandomCutForestMapper trcfMapper,
232+
Schema<ThresholdedRandomCutForestState> trcfSchema,
233+
Class<? extends ThresholdingModel> thresholdingModelClass,
234+
AnomalyDetectionIndices indexUtil,
235+
int maxCheckpointBytes,
236+
GenericObjectPool<LinkedBuffer> serializeRCFBufferPool,
237+
int serializeRCFBufferSize,
238+
double anomalyRate,
239+
Settings settings
179240
) {
180241
this.client = client;
181242
this.sdkJavaAsyncClient = sdkJavaAsyncClient;
243+
this.settings = settings;
182244
this.clientUtil = clientUtil;
183245
this.indexName = indexName;
184246
this.gson = gson;
@@ -192,6 +254,7 @@ public CheckpointDao(
192254
this.serializeRCFBufferPool = serializeRCFBufferPool;
193255
this.serializeRCFBufferSize = serializeRCFBufferSize;
194256
this.anomalyRate = anomalyRate;
257+
this.settings = settings;
195258
}
196259

197260
private void saveModelCheckpointSync(Map<String, Object> source, String modelId) {
@@ -452,7 +515,9 @@ public void deleteModelCheckpointByDetectorId(String detectorID) {
452515
logger.info("Delete checkpoints of detector {}", detectorID);
453516
try {
454517
CompletableFuture<DeleteByQueryResponse> deleteResponse = sdkJavaAsyncClient.deleteByQuery(deleteRequest);
455-
DeleteByQueryResponse response = deleteResponse.orTimeout(10L, TimeUnit.SECONDS).get();
518+
DeleteByQueryResponse response = deleteResponse
519+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
520+
.get();
456521
if (response.timedOut() || !response.failures().isEmpty()) {
457522
logFailure(response, detectorID);
458523
}

src/main/java/org/opensearch/ad/rest/RestAnomalyDetectorJobAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,9 @@ public void onFailure(Exception exception) {
215215

216216
});
217217

218-
Response response = registerJobDetailsResponse.orTimeout(15, TimeUnit.SECONDS).join();
218+
Response response = registerJobDetailsResponse
219+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
220+
.join();
219221
this.registeredJobDetails = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
220222
LOG.info("Job Details Registered : " + registeredJobDetails);
221223
}

src/main/java/org/opensearch/ad/settings/LegacyOpenDistroAnomalyDetectorSettings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,11 @@ private LegacyOpenDistroAnomalyDetectorSettings() {}
5454
Setting.Property.Deprecated
5555
);
5656

57+
// TODO : Revert setting value back to 10 seconds once there is support for more targeted cluster state requests
5758
public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting
5859
.positiveTimeSetting(
5960
"opendistro.anomaly_detection.request_timeout",
60-
TimeValue.timeValueSeconds(10),
61+
TimeValue.timeValueSeconds(20),
6162
Setting.Property.NodeScope,
6263
Setting.Property.Dynamic,
6364
Setting.Property.Deprecated

src/main/java/org/opensearch/ad/task/ADTaskManager.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@
121121
import org.opensearch.ad.model.Entity;
122122
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
123123
import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
124+
import org.opensearch.ad.settings.AnomalyDetectorSettings;
124125
import org.opensearch.ad.transport.ADBatchAnomalyResultAction;
125126
import org.opensearch.ad.transport.ADBatchAnomalyResultRequest;
126127
import org.opensearch.ad.transport.ADCancelTaskAction;
@@ -212,6 +213,8 @@ public class ADTaskManager {
212213
private static int DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS = 5;
213214
private final Semaphore checkingTaskSlot;
214215

216+
private final Settings settings;
217+
215218
private volatile Integer maxAdBatchTaskPerNode;
216219
private volatile Integer maxRunningEntitiesPerDetector;
217220

@@ -238,6 +241,7 @@ public ADTaskManager(
238241
this.nodeFilter = nodeFilter;
239242
this.sdkClusterService = sdkClusterService;
240243
this.adTaskCacheManager = adTaskCacheManager;
244+
this.settings = settings;
241245
/* MultiNode support https://github.com/opensearch-project/opensearch-sdk-java/issues/200 */
242246
// this.hashRing = hashRing;
243247

@@ -1377,7 +1381,9 @@ private void resetEntityTasksAsStopped(String detectorTaskId) {
13771381

13781382
try {
13791383
CompletableFuture<UpdateByQueryResponse> updateByQueryResponse = sdkJavaAsyncClient.updateByQuery(updateByQueryRequest.build());
1380-
UpdateByQueryResponse queryResponse = updateByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
1384+
UpdateByQueryResponse queryResponse = updateByQueryResponse
1385+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
1386+
.get();
13811387
List<BulkIndexByScrollFailure> bulkFailures = queryResponse.failures();
13821388
if (isNullOrEmpty(bulkFailures)) {
13831389
logger.debug("Updated {} child entity tasks state for detector task {}", queryResponse.updated(), detectorTaskId);
@@ -1573,7 +1579,9 @@ private void updateLatestFlagOfOldTasksAndCreateNewTask(
15731579

15741580
try {
15751581
CompletableFuture<UpdateByQueryResponse> updateByQueryResponse = sdkJavaAsyncClient.updateByQuery(updateByQueryRequest.build());
1576-
UpdateByQueryResponse queryResponse = updateByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
1582+
UpdateByQueryResponse queryResponse = updateByQueryResponse
1583+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
1584+
.get();
15771585

15781586
List<BulkIndexByScrollFailure> bulkFailures = queryResponse.failures();
15791587
if (isNullOrEmpty(bulkFailures)) {
@@ -1776,7 +1784,9 @@ protected <T> void deleteTaskDocs(
17761784
BulkRequest bulkRequest = new BulkRequest.Builder().operations(operations).build();
17771785
try {
17781786
CompletableFuture<BulkResponse> bulkResponse = sdkJavaAsyncClient.bulk(bulkRequest);
1779-
BulkResponse res = bulkResponse.orTimeout(10L, TimeUnit.SECONDS).get();
1787+
BulkResponse res = bulkResponse
1788+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
1789+
.get();
17801790

17811791
logger.info("Old AD tasks deleted for detector {}", detectorId);
17821792
List<BulkResponseItem> bulkItemResponses = res.items();
@@ -1833,7 +1843,9 @@ public void cleanChildTasksAndADResultsOfDeletedTask() {
18331843
.build();
18341844
try {
18351845
CompletableFuture<DeleteByQueryResponse> deleteADResultsResponse = sdkJavaAsyncClient.deleteByQuery(deleteADResultsRequest);
1836-
DeleteByQueryResponse res = deleteADResultsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
1846+
DeleteByQueryResponse res = deleteADResultsResponse
1847+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
1848+
.get();
18371849

18381850
logger.debug("Successfully deleted AD results of task " + taskId);
18391851
DeleteByQueryRequest deleteChildTasksRequest = new DeleteByQueryRequest.Builder()
@@ -1845,7 +1857,9 @@ public void cleanChildTasksAndADResultsOfDeletedTask() {
18451857
try {
18461858
CompletableFuture<DeleteByQueryResponse> deleteChildTasksResponse = sdkJavaAsyncClient
18471859
.deleteByQuery(deleteChildTasksRequest);
1848-
DeleteByQueryResponse deleteByQueryResponse = deleteChildTasksResponse.orTimeout(10L, TimeUnit.SECONDS).get();
1860+
DeleteByQueryResponse deleteByQueryResponse = deleteChildTasksResponse
1861+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
1862+
.get();
18491863

18501864
logger.debug("Successfully deleted child tasks of task " + taskId);
18511865
cleanChildTasksAndADResultsOfDeletedTask();
@@ -2014,7 +2028,9 @@ public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, A
20142028
request.query(q -> q.bool(query.build()));
20152029
try {
20162030
CompletableFuture<DeleteByQueryResponse> deleteByQueryResponse = sdkJavaAsyncClient.deleteByQuery(request.build());
2017-
DeleteByQueryResponse response = deleteByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
2031+
DeleteByQueryResponse response = deleteByQueryResponse
2032+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
2033+
.get();
20182034

20192035
if (response.failures() == null || response.failures().size() == 0) {
20202036
logger.info("AD tasks deleted for detector {}", detectorId);
@@ -2023,11 +2039,13 @@ public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, A
20232039
} else {
20242040
listener.onFailure(new OpenSearchStatusException("Failed to delete all AD tasks", RestStatus.INTERNAL_SERVER_ERROR));
20252041
}
2026-
} catch (IndexNotFoundException e) {
2027-
deleteADResultOfDetector(detectorId);
2028-
function.execute();
20292042
} catch (Exception e) {
2030-
listener.onFailure(e);
2043+
if (e.getMessage().contains("index_not_found_exception")) {
2044+
deleteADResultOfDetector(detectorId);
2045+
function.execute();
2046+
} else {
2047+
listener.onFailure(e);
2048+
}
20312049
}
20322050
}
20332051

@@ -2045,7 +2063,9 @@ private void deleteADResultOfDetector(String detectorId) {
20452063
try {
20462064
CompletableFuture<DeleteByQueryResponse> deleteADResultsResponse = sdkJavaAsyncClient
20472065
.deleteByQuery(deleteADResultsRequest.build());
2048-
DeleteByQueryResponse deleteByQueryResponse = deleteADResultsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
2066+
DeleteByQueryResponse deleteByQueryResponse = deleteADResultsResponse
2067+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
2068+
.get();
20492069
logger.debug("Successfully deleted AD results of detector " + detectorId);
20502070
} catch (Exception exception) {
20512071
logger.error("Failed to delete AD results of detector " + detectorId, exception);
@@ -3037,7 +3057,9 @@ public void resetLatestFlagAsFalse(List<ADTask> adTasks) {
30373057

30383058
try {
30393059
CompletableFuture<BulkResponse> bulkResponse = sdkJavaAsyncClient.bulk(bulkRequest);
3040-
BulkResponse res = bulkResponse.orTimeout(10L, TimeUnit.SECONDS).get();
3060+
BulkResponse res = bulkResponse
3061+
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
3062+
.get();
30413063

30423064
List<BulkResponseItem> bulkItemResponses = res.items();
30433065
if (bulkItemResponses != null && bulkItemResponses.size() > 0) {

src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.opensearch.ad.util.RestHandlerUtils;
4545
import org.opensearch.common.settings.Settings;
4646
import org.opensearch.core.xcontent.XContentParser;
47-
import org.opensearch.index.IndexNotFoundException;
4847
import org.opensearch.rest.RestStatus;
4948
import org.opensearch.sdk.ExtensionsRunner;
5049
import org.opensearch.sdk.SDKClient.SDKRestClient;
@@ -147,7 +146,7 @@ private void deleteAnomalyDetectorJobDoc(String detectorId, ActionListener<Delet
147146
}
148147
}, exception -> {
149148
LOG.error("Failed to delete AD job for " + detectorId, exception);
150-
if (exception instanceof IndexNotFoundException) {
149+
if (exception.getMessage().contains("index_not_found_exception")) {
151150
deleteDetectorStateDoc(detectorId, listener);
152151
} else {
153152
LOG.error("Failed to delete anomaly detector job", exception);
@@ -163,7 +162,7 @@ private void deleteDetectorStateDoc(String detectorId, ActionListener<DeleteResp
163162
// whether deleted state doc or not, continue as state doc may not exist
164163
deleteAnomalyDetectorDoc(detectorId, listener);
165164
}, exception -> {
166-
if (exception instanceof IndexNotFoundException) {
165+
if (exception.getMessage().contains("index_not_found_exception")) {
167166
deleteAnomalyDetectorDoc(detectorId, listener);
168167
} else {
169168
LOG.error("Failed to delete detector state", exception);

0 commit comments

Comments
 (0)