Skip to content

Commit 5d2e8a2

Browse files
authored
[WLM] Add WLM mode validation for workload group CRUD requests (#18652)
Signed-off-by: Ruirui Zhang <[email protected]>
1 parent 5ac8177 commit 5d2e8a2

15 files changed

+339
-45
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
- Add NodeResourceUsageStats to ClusterInfo ([#18480](https://github.com/opensearch-project/OpenSearch/issues/18472))
1515
- Introduce SecureHttpTransportParameters experimental API (to complement SecureTransportParameters counterpart) ([#18572](https://github.com/opensearch-project/OpenSearch/issues/18572))
1616
- Create equivalents of JSM's AccessController in the java agent ([#18346](https://github.com/opensearch-project/OpenSearch/issues/18346))
17+
- [WLM] Add WLM mode validation for workload group CRUD requests ([#18652](https://github.com/opensearch-project/OpenSearch/issues/18652))
1718
- Introduced a new cluster-level API to fetch remote store metadata (segments and translogs) for each shard of an index. ([#18257](https://github.com/opensearch-project/OpenSearch/pull/18257))
1819
- Add last index request timestamp columns to the `_cat/indices` API. ([10766](https://github.com/opensearch-project/OpenSearch/issues/10766))
1920
- Introduce a new pull-based ingestion plugin for file-based indexing (for local testing) ([#18591](https://github.com/opensearch-project/OpenSearch/pull/18591))

plugins/workload-management/src/javaRestTest/java/org/opensearch/rest/WorkloadManagementRestIT.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,18 @@
1313
import org.opensearch.client.Response;
1414
import org.opensearch.client.ResponseException;
1515
import org.opensearch.test.rest.OpenSearchRestTestCase;
16+
import org.junit.Before;
1617

1718
import java.io.IOException;
19+
import java.util.Locale;
1820

1921
public class WorkloadManagementRestIT extends OpenSearchRestTestCase {
2022

23+
@Before
24+
public void enableWlmMode() throws Exception {
25+
setWlmMode("enabled");
26+
}
27+
2128
public void testCreate() throws Exception {
2229
Response response = performOperation("PUT", "_wlm/workload_group", getCreateJson("analytics", "enforced", 0.4, 0.2));
2330
assertEquals(response.getStatusLine().getStatusCode(), 200);
@@ -129,6 +136,16 @@ public void testCRUD() throws Exception {
129136
performOperation("DELETE", "_wlm/workload_group/users3", null);
130137
}
131138

139+
public void testOperationWhenWlmDisabled() throws Exception {
140+
setWlmMode("disabled");
141+
assertThrows(
142+
ResponseException.class,
143+
() -> performOperation("PUT", "_wlm/workload_group", getCreateJson("analytics", "enforced", 0.4, 0.2))
144+
);
145+
assertThrows(ResponseException.class, () -> performOperation("DELETE", "_wlm/workload_group/analytics4", null));
146+
assertOK(performOperation("GET", "_wlm/workload_group/", null));
147+
}
148+
132149
static String getCreateJson(String name, String resiliencyMode, double cpu, double memory) {
133150
return "{\n"
134151
+ " \"name\": \""
@@ -171,4 +188,19 @@ Response performOperation(String method, String uriPath, String json) throws IOE
171188
}
172189
return client().performRequest(request);
173190
}
191+
192+
private void setWlmMode(String mode) throws Exception {
193+
String settingJson = String.format(Locale.ROOT, """
194+
{
195+
"persistent": {
196+
"wlm.workload_group.mode": "%s"
197+
}
198+
}
199+
""", mode);
200+
201+
Request request = new Request("PUT", "/_cluster/settings");
202+
request.setJsonEntity(settingJson);
203+
Response response = client().performRequest(request);
204+
assertEquals(200, response.getStatusLine().getStatusCode());
205+
}
174206
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.wlm;
10+
11+
import org.opensearch.common.settings.ClusterSettings;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.wlm.WlmMode;
14+
import org.opensearch.wlm.WorkloadManagementSettings;
15+
16+
/**
17+
* Central provider for maintaining and supplying the current values of wlm cluster settings.
18+
* This class listens for updates to relevant settings and provides the latest setting values.
19+
*/
20+
public class WlmClusterSettingValuesProvider {
21+
22+
private volatile WlmMode wlmMode;
23+
24+
/**
25+
* Constructor for WlmClusterSettingValuesProvider
26+
* @param settings OpenSearch settings
27+
* @param clusterSettings Cluster settings to register update listener
28+
*/
29+
public WlmClusterSettingValuesProvider(Settings settings, ClusterSettings clusterSettings) {
30+
this.wlmMode = WorkloadManagementSettings.WLM_MODE_SETTING.get(settings);
31+
clusterSettings.addSettingsUpdateConsumer(WorkloadManagementSettings.WLM_MODE_SETTING, this::setWlmMode);
32+
}
33+
34+
/**
35+
* Check if WLM mode is ENABLED
36+
* Throws an IllegalStateException if WLM mode is DISABLED or MONITOR ONLY.
37+
* @param operationDescription A short text describing the operation, e.g. "create workload group".
38+
*/
39+
public void ensureWlmEnabled(String operationDescription) {
40+
if (wlmMode != WlmMode.ENABLED) {
41+
throw new IllegalStateException(
42+
"Cannot "
43+
+ operationDescription
44+
+ " because workload management mode is disabled or monitor_only."
45+
+ "To enable this feature, set [wlm.workload_group.mode] to 'enabled' in cluster settings."
46+
);
47+
}
48+
}
49+
50+
/**
51+
* Set the latest WLM mode.
52+
* @param mode The wlm mode to set
53+
*/
54+
private void setWlmMode(WlmMode mode) {
55+
this.wlmMode = mode;
56+
}
57+
58+
/**
59+
* Get the latest WLM mode.
60+
*/
61+
public WlmMode getWlmMode() {
62+
return wlmMode;
63+
}
64+
}

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public class WorkloadManagementPlugin extends Plugin implements ActionPlugin, Sy
9393
private static FeatureType featureType;
9494
private static RulePersistenceService rulePersistenceService;
9595
private static RuleRoutingService ruleRoutingService;
96+
private WlmClusterSettingValuesProvider wlmClusterSettingValuesProvider;
9697
private AutoTaggingActionFilter autoTaggingActionFilter;
9798

9899
/**
@@ -114,6 +115,10 @@ public Collection<Object> createComponents(
114115
IndexNameExpressionResolver indexNameExpressionResolver,
115116
Supplier<RepositoriesService> repositoriesServiceSupplier
116117
) {
118+
wlmClusterSettingValuesProvider = new WlmClusterSettingValuesProvider(
119+
clusterService.getSettings(),
120+
clusterService.getClusterSettings()
121+
);
117122
featureType = new WorkloadGroupFeatureType(new WorkloadGroupFeatureValueValidator(clusterService));
118123
RuleEntityParser parser = new XContentRuleParser(featureType);
119124
AttributeValueStoreFactory attributeValueStoreFactory = new AttributeValueStoreFactory(
@@ -134,12 +139,10 @@ public Collection<Object> createComponents(
134139
RefreshBasedSyncMechanism refreshMechanism = new RefreshBasedSyncMechanism(
135140
threadPool,
136141
clusterService.getSettings(),
137-
clusterService.getClusterSettings(),
138-
parser,
139-
ruleProcessingService,
140142
featureType,
141143
rulePersistenceService,
142-
new RuleEventClassifier(Collections.emptySet(), ruleProcessingService)
144+
new RuleEventClassifier(Collections.emptySet(), ruleProcessingService),
145+
wlmClusterSettingValuesProvider
143146
);
144147

145148
autoTaggingActionFilter = new AutoTaggingActionFilter(ruleProcessingService, threadPool);
@@ -183,10 +186,10 @@ public List<RestHandler> getRestHandlers(
183186
Supplier<DiscoveryNodes> nodesInCluster
184187
) {
185188
return List.of(
186-
new RestCreateWorkloadGroupAction(),
189+
new RestCreateWorkloadGroupAction(wlmClusterSettingValuesProvider),
187190
new RestGetWorkloadGroupAction(),
188-
new RestDeleteWorkloadGroupAction(),
189-
new RestUpdateWorkloadGroupAction()
191+
new RestDeleteWorkloadGroupAction(wlmClusterSettingValuesProvider),
192+
new RestUpdateWorkloadGroupAction(wlmClusterSettingValuesProvider)
190193
);
191194
}
192195

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateWorkloadGroupAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.core.rest.RestStatus;
1212
import org.opensearch.core.xcontent.ToXContent;
1313
import org.opensearch.core.xcontent.XContentParser;
14+
import org.opensearch.plugin.wlm.WlmClusterSettingValuesProvider;
1415
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupAction;
1516
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupRequest;
1617
import org.opensearch.plugin.wlm.action.CreateWorkloadGroupResponse;
@@ -35,10 +36,15 @@
3536
*/
3637
public class RestCreateWorkloadGroupAction extends BaseRestHandler {
3738

39+
private final WlmClusterSettingValuesProvider nonPluginSettingValuesProvider;
40+
3841
/**
3942
* Constructor for RestCreateWorkloadGroupAction
43+
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
4044
*/
41-
public RestCreateWorkloadGroupAction() {}
45+
public RestCreateWorkloadGroupAction(WlmClusterSettingValuesProvider nonPluginSettingValuesProvider) {
46+
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
47+
}
4248

4349
@Override
4450
public String getName() {
@@ -55,6 +61,7 @@ public List<Route> routes() {
5561

5662
@Override
5763
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
64+
nonPluginSettingValuesProvider.ensureWlmEnabled(getName());
5865
try (XContentParser parser = request.contentParser()) {
5966
CreateWorkloadGroupRequest createWorkloadGroupRequest = CreateWorkloadGroupRequest.fromXContent(parser);
6067
return channel -> client.execute(

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestDeleteWorkloadGroupAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.plugin.wlm.rest;
1010

11+
import org.opensearch.plugin.wlm.WlmClusterSettingValuesProvider;
1112
import org.opensearch.plugin.wlm.action.DeleteWorkloadGroupAction;
1213
import org.opensearch.plugin.wlm.action.DeleteWorkloadGroupRequest;
1314
import org.opensearch.rest.BaseRestHandler;
@@ -27,10 +28,15 @@
2728
*/
2829
public class RestDeleteWorkloadGroupAction extends BaseRestHandler {
2930

31+
private final WlmClusterSettingValuesProvider nonPluginSettingValuesProvider;
32+
3033
/**
3134
* Constructor for RestDeleteWorkloadGroupAction
35+
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
3236
*/
33-
public RestDeleteWorkloadGroupAction() {}
37+
public RestDeleteWorkloadGroupAction(WlmClusterSettingValuesProvider nonPluginSettingValuesProvider) {
38+
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
39+
}
3440

3541
@Override
3642
public String getName() {
@@ -47,6 +53,7 @@ public List<Route> routes() {
4753

4854
@Override
4955
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
56+
nonPluginSettingValuesProvider.ensureWlmEnabled(getName());
5057
DeleteWorkloadGroupRequest deleteWorkloadGroupRequest = new DeleteWorkloadGroupRequest(request.param("name"));
5158
deleteWorkloadGroupRequest.clusterManagerNodeTimeout(
5259
request.paramAsTime("cluster_manager_timeout", deleteWorkloadGroupRequest.clusterManagerNodeTimeout())

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestUpdateWorkloadGroupAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.core.rest.RestStatus;
1212
import org.opensearch.core.xcontent.ToXContent;
1313
import org.opensearch.core.xcontent.XContentParser;
14+
import org.opensearch.plugin.wlm.WlmClusterSettingValuesProvider;
1415
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupAction;
1516
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupRequest;
1617
import org.opensearch.plugin.wlm.action.UpdateWorkloadGroupResponse;
@@ -35,10 +36,15 @@
3536
*/
3637
public class RestUpdateWorkloadGroupAction extends BaseRestHandler {
3738

39+
private final WlmClusterSettingValuesProvider nonPluginSettingValuesProvider;
40+
3841
/**
3942
* Constructor for RestUpdateWorkloadGroupAction
43+
* @param nonPluginSettingValuesProvider the settings provider to access the current WLM mode
4044
*/
41-
public RestUpdateWorkloadGroupAction() {}
45+
public RestUpdateWorkloadGroupAction(WlmClusterSettingValuesProvider nonPluginSettingValuesProvider) {
46+
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
47+
}
4248

4349
@Override
4450
public String getName() {
@@ -55,6 +61,7 @@ public List<Route> routes() {
5561

5662
@Override
5763
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
64+
nonPluginSettingValuesProvider.ensureWlmEnabled(getName());
5865
try (XContentParser parser = request.contentParser()) {
5966
UpdateWorkloadGroupRequest updateWorkloadGroupRequest = UpdateWorkloadGroupRequest.fromXContent(parser, request.param("name"));
6067
return channel -> client.execute(

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rule/sync/RefreshBasedSyncMechanism.java

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
14-
import org.opensearch.common.settings.ClusterSettings;
1514
import org.opensearch.common.settings.Setting;
1615
import org.opensearch.common.settings.Settings;
1716
import org.opensearch.common.unit.TimeValue;
1817
import org.opensearch.core.action.ActionListener;
18+
import org.opensearch.plugin.wlm.WlmClusterSettingValuesProvider;
1919
import org.opensearch.plugin.wlm.rule.sync.detect.RuleEvent;
2020
import org.opensearch.plugin.wlm.rule.sync.detect.RuleEventClassifier;
21-
import org.opensearch.rule.InMemoryRuleProcessingService;
22-
import org.opensearch.rule.RuleEntityParser;
2321
import org.opensearch.rule.RulePersistenceService;
2422
import org.opensearch.rule.action.GetRuleRequest;
2523
import org.opensearch.rule.action.GetRuleResponse;
@@ -28,7 +26,6 @@
2826
import org.opensearch.threadpool.Scheduler;
2927
import org.opensearch.threadpool.ThreadPool;
3028
import org.opensearch.wlm.WlmMode;
31-
import org.opensearch.wlm.WorkloadManagementSettings;
3229

3330
import java.io.IOException;
3431
import java.util.Collections;
@@ -65,12 +62,10 @@ public class RefreshBasedSyncMechanism extends AbstractLifecycleComponent {
6562
private final ThreadPool threadPool;
6663
private long refreshInterval;
6764
private volatile Scheduler.Cancellable scheduledFuture;
68-
private final RuleEntityParser parser;
69-
private final InMemoryRuleProcessingService ruleProcessingService;
7065
private final RulePersistenceService rulePersistenceService;
7166
private final RuleEventClassifier ruleEventClassifier;
7267
private final FeatureType featureType;
73-
private WlmMode wlmMode;
68+
private final WlmClusterSettingValuesProvider nonPluginSettingValuesProvider;
7469
// This var keeps the Rules which were present during last run of this service
7570
private Set<Rule> lastRunIndexedRules;
7671
private static final Logger logger = LogManager.getLogger(RefreshBasedSyncMechanism.class);
@@ -80,41 +75,34 @@ public class RefreshBasedSyncMechanism extends AbstractLifecycleComponent {
8075
*
8176
* @param threadPool
8277
* @param settings
83-
* @param clusterSettings
84-
* @param parser
85-
* @param ruleProcessingService
8678
* @param featureType
8779
* @param rulePersistenceService
8880
* @param ruleEventClassifier
81+
* @param nonPluginSettingValuesProvider
8982
*/
9083
public RefreshBasedSyncMechanism(
9184
ThreadPool threadPool,
9285
Settings settings,
93-
ClusterSettings clusterSettings,
94-
RuleEntityParser parser,
95-
InMemoryRuleProcessingService ruleProcessingService,
9686
FeatureType featureType,
9787
RulePersistenceService rulePersistenceService,
98-
RuleEventClassifier ruleEventClassifier
88+
RuleEventClassifier ruleEventClassifier,
89+
WlmClusterSettingValuesProvider nonPluginSettingValuesProvider
9990
) {
10091
this.threadPool = threadPool;
10192
refreshInterval = RULE_SYNC_REFRESH_INTERVAL_SETTING.get(settings);
102-
this.parser = parser;
103-
this.ruleProcessingService = ruleProcessingService;
10493
this.featureType = featureType;
10594
this.rulePersistenceService = rulePersistenceService;
10695
this.lastRunIndexedRules = new HashSet<>();
10796
this.ruleEventClassifier = ruleEventClassifier;
108-
wlmMode = WorkloadManagementSettings.WLM_MODE_SETTING.get(settings);
109-
clusterSettings.addSettingsUpdateConsumer(WorkloadManagementSettings.WLM_MODE_SETTING, this::setWlmMode);
97+
this.nonPluginSettingValuesProvider = nonPluginSettingValuesProvider;
11098
}
11199

112100
/**
113101
* synchronized check is needed in case two scheduled runs happen concurrently though highly improbable
114102
* but theoretically possible
115103
*/
116104
synchronized void doRun() {
117-
if (wlmMode != WlmMode.ENABLED) {
105+
if (nonPluginSettingValuesProvider.getWlmMode() != WlmMode.ENABLED) {
118106
return;
119107
}
120108

@@ -161,8 +149,4 @@ protected void doClose() throws IOException {
161149
scheduledFuture.cancel();
162150
}
163151
}
164-
165-
void setWlmMode(WlmMode mode) {
166-
this.wlmMode = mode;
167-
}
168152
}

0 commit comments

Comments
 (0)