Skip to content

Commit 3717b79

Browse files
oneby-wangoneby-wang
authored andcommitted
[fix][admin] Set local policies overwrites "number of bundles" passed during namespace creation (#24762)
Co-authored-by: oneby-wang <[email protected]> (cherry picked from commit 3983ff0)
1 parent c30cb6a commit 3717b79

File tree

3 files changed

+188
-10
lines changed

3 files changed

+188
-10
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.pulsar.broker.admin.impl;
2020

2121
import static org.apache.commons.lang3.StringUtils.isBlank;
22-
import static org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
2322
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
2423
import com.google.common.collect.Sets;
2524
import java.lang.reflect.Field;
@@ -969,9 +968,7 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
969968
bookieAffinityGroup,
970969
policies.namespaceAntiAffinityGroup,
971970
policies.migrated))
972-
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
973-
bookieAffinityGroup,
974-
null));
971+
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(), bookieAffinityGroup, null));
975972
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
976973
namespaceName, localPolicies);
977974
return localPolicies;
@@ -980,6 +977,8 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
980977
log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
981978
namespaceName);
982979
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
980+
} catch (RestException re) {
981+
throw re;
983982
} catch (Exception e) {
984983
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
985984
e);
@@ -1796,11 +1795,12 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
17961795
policies.bookieAffinityGroup,
17971796
antiAffinityGroup,
17981797
policies.migrated))
1799-
.orElseGet(() -> new LocalPolicies(defaultBundle(),
1800-
null, antiAffinityGroup))
1798+
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup))
18011799
);
18021800
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
18031801
namespaceName, antiAffinityGroup);
1802+
} catch (RestException re) {
1803+
throw re;
18041804
} catch (Exception e) {
18051805
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
18061806
e);
@@ -2816,9 +2816,10 @@ protected void internalEnableMigration(boolean migrated) {
28162816
policies.bookieAffinityGroup,
28172817
policies.namespaceAntiAffinityGroup,
28182818
migrated))
2819-
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
2820-
null, null, migrated)));
2819+
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, null, migrated)));
28212820
log.info("Successfully updated migration on namespace {}", namespaceName);
2821+
} catch (RestException re) {
2822+
throw re;
28222823
} catch (Exception e) {
28232824
log.error("Failed to update migration on namespace {}", namespaceName, e);
28242825
throw new RestException(e);
@@ -2920,5 +2921,22 @@ protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsyn
29202921
.thenApply(policies -> policies.allowed_clusters);
29212922
}
29222923

2924+
// TODO remove this sync method after async refactor
2925+
private BundlesData getDefaultBundleData() {
2926+
try {
2927+
return getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),
2928+
TimeUnit.SECONDS);
2929+
} catch (Exception e) {
2930+
log.error("[{}] Failed to get namespace-policy configuration for namespace {}", clientAppId(),
2931+
namespaceName, e);
2932+
throw new RestException(e);
2933+
}
2934+
}
2935+
2936+
private CompletableFuture<BundlesData> getDefaultBundleDataAsync() {
2937+
return namespaceResources().getPoliciesAsync(namespaceName).thenApply(
2938+
optionalPolicies -> optionalPolicies.isPresent() ? optionalPolicies.get().bundles :
2939+
getBundles(config().getDefaultNumberOfNamespaceBundles()));
2940+
}
29232941

29242942
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
2122
import static org.mockito.Mockito.doReturn;
2223
import static org.mockito.Mockito.doThrow;
2324
import static org.mockito.Mockito.mock;
@@ -38,9 +39,12 @@
3839
import org.apache.pulsar.broker.web.PulsarWebResource;
3940
import org.apache.pulsar.broker.web.RestException;
4041
import org.apache.pulsar.common.naming.NamespaceName;
42+
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
43+
import org.apache.pulsar.common.policies.data.BundlesData;
4144
import org.apache.pulsar.common.policies.data.ClusterData;
4245
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
4346
import org.apache.pulsar.common.policies.data.DispatchRate;
47+
import org.apache.pulsar.common.policies.data.Policies;
4448
import org.apache.pulsar.common.policies.data.PolicyName;
4549
import org.apache.pulsar.common.policies.data.PolicyOperation;
4650
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -236,4 +240,149 @@ public void testOperationDelayedDelivery() throws Exception {
236240
assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
237241
}
238242
}
243+
244+
@Test
245+
public void testSetBookieAffinityGroupWithEmptyPolicies() throws Exception {
246+
// 1. create namespace with empty policies
247+
String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
248+
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, null));
249+
250+
// 2.set bookie affinity group
251+
String primaryAffinityGroup = "primary-affinity-group";
252+
String secondaryAffinityGroup = "secondary-affinity-group";
253+
BookieAffinityGroupData bookieAffinityGroupDataReq =
254+
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
255+
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
256+
namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
257+
258+
// 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles()
259+
BundlesData bundlesData = (BundlesData) asyncRequests(
260+
response -> namespaces.getBundlesData(response, testTenant, setBookieAffinityGroupNs));
261+
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());
262+
263+
// 4.assert namespace bookie affinity group
264+
BookieAffinityGroupData bookieAffinityGroupDataResp =
265+
namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs);
266+
assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
267+
}
268+
269+
@Test
270+
public void testSetBookieAffinityGroupWithExistBundlePolicies() throws Exception {
271+
// 1. create namespace with specified num bundles
272+
String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
273+
Policies policies = new Policies();
274+
policies.bundles = getBundles(10);
275+
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, policies));
276+
277+
// 2.set bookie affinity group
278+
String primaryAffinityGroup = "primary-affinity-group";
279+
String secondaryAffinityGroup = "secondary-affinity-group";
280+
BookieAffinityGroupData bookieAffinityGroupDataReq =
281+
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
282+
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
283+
namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq);
284+
285+
// 3.query namespace num bundles, should be policies.bundles, which we set before
286+
BundlesData bundlesData = (BundlesData) asyncRequests(
287+
response -> namespaces.getBundlesData(response, testTenant, setBookieAffinityGroupNs));
288+
assertEquals(bundlesData, policies.bundles);
289+
290+
// 4.assert namespace bookie affinity group
291+
BookieAffinityGroupData bookieAffinityGroupDataResp =
292+
namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs);
293+
assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
294+
}
295+
296+
@Test
297+
public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws Exception {
298+
// 1. create namespace with empty policies
299+
String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns";
300+
asyncRequests(
301+
response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs, null));
302+
303+
// 2.set namespace anti affinity group
304+
String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
305+
namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs,
306+
namespaceAntiAffinityGroupReq);
307+
308+
// 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles()
309+
BundlesData bundlesData = (BundlesData) asyncRequests(
310+
response -> namespaces.getBundlesData(response, testTenant, setNamespaceAntiAffinityGroupNs));
311+
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());
312+
313+
// 4.assert namespace anti affinity group
314+
String namespaceAntiAffinityGroupResp =
315+
namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs);
316+
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq);
317+
}
318+
319+
@Test
320+
public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies() throws Exception {
321+
// 1. create namespace with specified num bundles
322+
String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns";
323+
Policies policies = new Policies();
324+
policies.bundles = getBundles(10);
325+
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs,
326+
policies));
327+
328+
// 2.set namespace anti affinity group
329+
String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
330+
namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs,
331+
namespaceAntiAffinityGroupReq);
332+
333+
// 3.query namespace num bundles, should be policies.bundles, which we set before
334+
BundlesData bundlesData = (BundlesData) asyncRequests(
335+
response -> namespaces.getBundlesData(response, testTenant, setNamespaceAntiAffinityGroupNs));
336+
assertEquals(bundlesData, policies.bundles);
337+
338+
// 4.assert namespace anti affinity group
339+
String namespaceAntiAffinityGroupResp =
340+
namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs);
341+
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq);
342+
}
343+
344+
@Test
345+
public void testEnableMigrationWithEmptyPolicies() throws Exception {
346+
// 1. create namespace with empty policies
347+
String enableMigrationGroupNs = "test-set-namespace-enable-migration-ns";
348+
asyncRequests(response -> namespaces.createNamespace(response, testTenant, enableMigrationGroupNs, null));
349+
350+
// 2.set enable migration
351+
boolean enableMigrationReq = true;
352+
namespaces.enableMigration(testTenant, enableMigrationGroupNs, enableMigrationReq);
353+
354+
// 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles()
355+
BundlesData bundlesData = (BundlesData) asyncRequests(
356+
response -> namespaces.getBundlesData(response, testTenant, enableMigrationGroupNs));
357+
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());
358+
359+
// 4.assert namespace enable migration
360+
Policies policiesResp = (Policies) asyncRequests(
361+
response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs));
362+
assertEquals(policiesResp.migrated, enableMigrationReq);
363+
}
364+
365+
@Test
366+
public void testEnableMigrationWithExistBundlePolicies() throws Exception {
367+
// 1. create namespace with specified num bundles
368+
String enableMigrationGroupNs = "test-set-namespace-enable-migration-ns";
369+
Policies policiesReq = new Policies();
370+
policiesReq.bundles = getBundles(10);
371+
asyncRequests(
372+
response -> namespaces.createNamespace(response, testTenant, enableMigrationGroupNs, policiesReq));
373+
374+
// 2.set enable migration
375+
boolean enableMigrationReq = true;
376+
namespaces.enableMigration(testTenant, enableMigrationGroupNs, enableMigrationReq);
377+
378+
// 3.query namespace num bundles, should be policies.bundles, which we set before
379+
BundlesData bundlesData = (BundlesData) asyncRequests(
380+
response -> namespaces.getBundlesData(response, testTenant, enableMigrationGroupNs));
381+
assertEquals(bundlesData, policiesReq.bundles);
382+
383+
// 4.assert namespace enable migration
384+
Policies policiesResp = (Policies) asyncRequests(
385+
response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs));
386+
assertEquals(policiesResp.migrated, enableMigrationReq);
387+
}
239388
}

tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public void testStopBroker() throws Exception {
304304
}
305305

306306
@Test(timeOut = 80 * 1000)
307-
public void testAntiaffinityPolicy() throws PulsarAdminException {
307+
public void testAntiAffinityPolicy() throws PulsarAdminException {
308308
final String namespaceAntiAffinityGroup = "my-anti-affinity-filter";
309309
final String antiAffinityEnabledNameSpace = DEFAULT_TENANT + "/my-ns-filter" + nsSuffix;
310310
final int numPartition = 20;
@@ -313,14 +313,25 @@ public void testAntiaffinityPolicy() throws PulsarAdminException {
313313

314314
assertEquals(activeBrokers.size(), NUM_BROKERS);
315315

316+
Set<String> antiAffinityEnabledNameSpacesReq = new HashSet<>();
316317
for (int i = 0; i < activeBrokers.size(); i++) {
317318
String namespace = antiAffinityEnabledNameSpace + "-" + i;
318-
admin.namespaces().createNamespace(namespace, 10);
319+
antiAffinityEnabledNameSpacesReq.add(namespace);
320+
admin.namespaces().createNamespace(namespace, 1);
319321
admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup);
320322
admin.clusters().createFailureDomain(clusterName, namespaceAntiAffinityGroup, FailureDomain.builder()
321323
.brokers(Set.of(activeBrokers.get(i))).build());
324+
String namespaceAntiAffinityGroupResp = admin.namespaces().getNamespaceAntiAffinityGroup(namespace);
325+
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroup);
326+
FailureDomain failureDomainResp =
327+
admin.clusters().getFailureDomain(clusterName, namespaceAntiAffinityGroup);
328+
assertEquals(failureDomainResp.getBrokers(), Set.of(activeBrokers.get(i)));
322329
}
323330

331+
List<String> antiAffinityNamespacesResp =
332+
admin.namespaces().getAntiAffinityNamespaces(DEFAULT_TENANT, clusterName, namespaceAntiAffinityGroup);
333+
assertEquals(new HashSet<>(antiAffinityNamespacesResp), antiAffinityEnabledNameSpacesReq);
334+
324335
Set<String> result = new HashSet<>();
325336
for (int i = 0; i < activeBrokers.size(); i++) {
326337
final String topic = "persistent://" + antiAffinityEnabledNameSpace + "-" + i + "/topic";

0 commit comments

Comments
 (0)