Skip to content

Commit d1292ed

Browse files
oneby-wangoneby-wang
authored andcommitted
[fix][admin] Set local policies overwrites "number of bundles" passed during namespace creation (apache#24762)
Co-authored-by: oneby-wang <[email protected]> (cherry picked from commit 3983ff0)
1 parent 4476a2a commit d1292ed

File tree

3 files changed

+140
-5
lines changed

3 files changed

+140
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.pulsar.broker.admin.impl;
2020

2121
import static org.apache.commons.lang3.StringUtils.isBlank;
22-
import static org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
2322
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
2423
import com.google.common.collect.Sets;
2524
import java.lang.reflect.Field;
@@ -909,7 +908,7 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
909908
policies -> new LocalPolicies(policies.bundles,
910909
bookieAffinityGroup,
911910
policies.namespaceAntiAffinityGroup))
912-
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
911+
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(),
913912
bookieAffinityGroup,
914913
null));
915914
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
@@ -920,6 +919,8 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
920919
log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
921920
namespaceName);
922921
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
922+
} catch (RestException re) {
923+
throw re;
923924
} catch (Exception e) {
924925
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
925926
e);
@@ -1735,11 +1736,13 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
17351736
lp.map(policies -> new LocalPolicies(policies.bundles,
17361737
policies.bookieAffinityGroup,
17371738
antiAffinityGroup))
1738-
.orElseGet(() -> new LocalPolicies(defaultBundle(),
1739+
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(),
17391740
null, antiAffinityGroup))
17401741
);
17411742
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
17421743
namespaceName, antiAffinityGroup);
1744+
} catch (RestException re) {
1745+
throw re;
17431746
} catch (Exception e) {
17441747
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
17451748
e);
@@ -2793,5 +2796,22 @@ protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsyn
27932796
.thenApply(policies -> policies.allowed_clusters);
27942797
}
27952798

2799+
// TODO remove this sync method after async refactor
2800+
private BundlesData getDefaultBundleData() {
2801+
try {
2802+
return getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),
2803+
TimeUnit.SECONDS);
2804+
} catch (Exception e) {
2805+
log.error("[{}] Failed to get namespace-policy configuration for namespace {}", clientAppId(),
2806+
namespaceName, e);
2807+
throw new RestException(e);
2808+
}
2809+
}
2810+
2811+
private CompletableFuture<BundlesData> getDefaultBundleDataAsync() {
2812+
return namespaceResources().getPoliciesAsync(namespaceName).thenApply(
2813+
optionalPolicies -> optionalPolicies.isPresent() ? optionalPolicies.get().bundles :
2814+
getBundles(config().getDefaultNumberOfNamespaceBundles()));
2815+
}
27962816

27972817
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
2122
import static org.mockito.Mockito.doReturn;
2223
import static org.mockito.Mockito.doThrow;
2324
import static org.mockito.Mockito.mock;
@@ -38,8 +39,11 @@
3839
import org.apache.pulsar.broker.web.PulsarWebResource;
3940
import org.apache.pulsar.broker.web.RestException;
4041
import org.apache.pulsar.common.naming.NamespaceName;
42+
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
43+
import org.apache.pulsar.common.policies.data.BundlesData;
4144
import org.apache.pulsar.common.policies.data.ClusterData;
4245
import org.apache.pulsar.common.policies.data.DispatchRate;
46+
import org.apache.pulsar.common.policies.data.Policies;
4347
import org.apache.pulsar.common.policies.data.PolicyName;
4448
import org.apache.pulsar.common.policies.data.PolicyOperation;
4549
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -196,4 +200,104 @@ public void testOperationDispatchRate() throws Exception {
196200
this.testTenant, this.testNamespace));
197201
assertTrue(Objects.isNull(dispatchRate));
198202
}
203+
204+
@Test
205+
public void testSetBookieAffinityGroupWithEmptyPolicies() throws Exception {
206+
// 1. create namespace with empty policies
207+
String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
208+
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, null));
209+
210+
// 2.set bookie affinity group
211+
String primaryAffinityGroup = "primary-affinity-group";
212+
String secondaryAffinityGroup = "secondary-affinity-group";
213+
BookieAffinityGroupData bookieAffinityGroupDataReq =
214+
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
215+
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
216+
namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
217+
218+
// 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles()
219+
BundlesData bundlesData = (BundlesData) asyncRequests(
220+
response -> namespaces.getBundlesData(response, testTenant, setBookieAffinityGroupNs));
221+
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());
222+
223+
// 4.assert namespace bookie affinity group
224+
BookieAffinityGroupData bookieAffinityGroupDataResp =
225+
namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs);
226+
assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
227+
}
228+
229+
@Test
230+
public void testSetBookieAffinityGroupWithExistBundlePolicies() throws Exception {
231+
// 1. create namespace with specified num bundles
232+
String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
233+
Policies policies = new Policies();
234+
policies.bundles = getBundles(10);
235+
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, policies));
236+
237+
// 2.set bookie affinity group
238+
String primaryAffinityGroup = "primary-affinity-group";
239+
String secondaryAffinityGroup = "secondary-affinity-group";
240+
BookieAffinityGroupData bookieAffinityGroupDataReq =
241+
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
242+
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
243+
namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
244+
245+
// 3.query namespace num bundles, should be policies.bundles, which we set before
246+
BundlesData bundlesData = (BundlesData) asyncRequests(
247+
response -> namespaces.getBundlesData(response, testTenant, setBookieAffinityGroupNs));
248+
assertEquals(bundlesData, policies.bundles);
249+
250+
// 4.assert namespace bookie affinity group
251+
BookieAffinityGroupData bookieAffinityGroupDataResp =
252+
namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs);
253+
assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
254+
}
255+
256+
@Test
257+
public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws Exception {
258+
// 1. create namespace with empty policies
259+
String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns";
260+
asyncRequests(
261+
response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs, null));
262+
263+
// 2.set namespace anti affinity group
264+
String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
265+
namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs,
266+
namespaceAntiAffinityGroupReq);
267+
268+
// 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles()
269+
BundlesData bundlesData = (BundlesData) asyncRequests(
270+
response -> namespaces.getBundlesData(response, testTenant, setNamespaceAntiAffinityGroupNs));
271+
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());
272+
273+
// 4.assert namespace anti affinity group
274+
String namespaceAntiAffinityGroupResp =
275+
namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs);
276+
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq);
277+
}
278+
279+
@Test
280+
public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies() throws Exception {
281+
// 1. create namespace with specified num bundles
282+
String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns";
283+
Policies policies = new Policies();
284+
policies.bundles = getBundles(10);
285+
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs,
286+
policies));
287+
288+
// 2.set namespace anti affinity group
289+
String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
290+
namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs,
291+
namespaceAntiAffinityGroupReq);
292+
293+
// 3.query namespace num bundles, should be policies.bundles, which we set before
294+
BundlesData bundlesData = (BundlesData) asyncRequests(
295+
response -> namespaces.getBundlesData(response, testTenant, setNamespaceAntiAffinityGroupNs));
296+
assertEquals(bundlesData, policies.bundles);
297+
298+
// 4.assert namespace anti affinity group
299+
String namespaceAntiAffinityGroupResp =
300+
namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs);
301+
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq);
302+
}
199303
}

tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ public void testStopBroker() throws Exception {
288288
}
289289

290290
@Test(timeOut = 40 * 1000)
291-
public void testAntiaffinityPolicy() throws PulsarAdminException {
291+
public void testAntiAffinityPolicy() throws PulsarAdminException {
292292
final String namespaceAntiAffinityGroup = "my-anti-affinity-filter";
293293
final String antiAffinityEnabledNameSpace = DEFAULT_TENANT + "/my-ns-filter" + nsSuffix;
294294
final int numPartition = 20;
@@ -297,14 +297,25 @@ public void testAntiaffinityPolicy() throws PulsarAdminException {
297297

298298
assertEquals(activeBrokers.size(), NUM_BROKERS);
299299

300+
Set<String> antiAffinityEnabledNameSpacesReq = new HashSet<>();
300301
for (int i = 0; i < activeBrokers.size(); i++) {
301302
String namespace = antiAffinityEnabledNameSpace + "-" + i;
302-
admin.namespaces().createNamespace(namespace, 10);
303+
antiAffinityEnabledNameSpacesReq.add(namespace);
304+
admin.namespaces().createNamespace(namespace, 1);
303305
admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup);
304306
admin.clusters().createFailureDomain(clusterName, namespaceAntiAffinityGroup, FailureDomain.builder()
305307
.brokers(Set.of(activeBrokers.get(i))).build());
308+
String namespaceAntiAffinityGroupResp = admin.namespaces().getNamespaceAntiAffinityGroup(namespace);
309+
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroup);
310+
FailureDomain failureDomainResp =
311+
admin.clusters().getFailureDomain(clusterName, namespaceAntiAffinityGroup);
312+
assertEquals(failureDomainResp.getBrokers(), Set.of(activeBrokers.get(i)));
306313
}
307314

315+
List<String> antiAffinityNamespacesResp =
316+
admin.namespaces().getAntiAffinityNamespaces(DEFAULT_TENANT, clusterName, namespaceAntiAffinityGroup);
317+
assertEquals(new HashSet<>(antiAffinityNamespacesResp), antiAffinityEnabledNameSpacesReq);
318+
308319
Set<String> result = new HashSet<>();
309320
for (int i = 0; i < activeBrokers.size(); i++) {
310321
final String topic = "persistent://" + antiAffinityEnabledNameSpace + "-" + i +"/topic";

0 commit comments

Comments
 (0)