Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.admin.impl;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -962,9 +961,7 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
bookieAffinityGroup,
policies.namespaceAntiAffinityGroup,
policies.migrated))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
bookieAffinityGroup,
null));
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(), bookieAffinityGroup, null));
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
namespaceName, localPolicies);
return localPolicies;
Expand All @@ -973,6 +970,8 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
e);
Expand Down Expand Up @@ -1789,11 +1788,12 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
policies.bookieAffinityGroup,
antiAffinityGroup,
policies.migrated))
.orElseGet(() -> new LocalPolicies(defaultBundle(),
null, antiAffinityGroup))
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, antiAffinityGroup))
);
log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
namespaceName, antiAffinityGroup);
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
e);
Expand Down Expand Up @@ -2805,9 +2805,10 @@ protected void internalEnableMigration(boolean migrated) {
policies.bookieAffinityGroup,
policies.namespaceAntiAffinityGroup,
migrated))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
null, null, migrated)));
.orElseGet(() -> new LocalPolicies(getDefaultBundleData(), null, null, migrated)));
log.info("Successfully updated migration on namespace {}", namespaceName);
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("Failed to update migration on namespace {}", namespaceName, e);
throw new RestException(e);
Expand Down Expand Up @@ -2907,5 +2908,22 @@ protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsyn
.thenApply(policies -> policies.allowed_clusters);
}

// TODO remove this sync method after async refactor
private BundlesData getDefaultBundleData() {
try {
return getDefaultBundleDataAsync().get(config().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
} catch (Exception e) {
log.error("[{}] Failed to get namespace-policy configuration for namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

private CompletableFuture<BundlesData> getDefaultBundleDataAsync() {
return namespaceResources().getPoliciesAsync(namespaceName).thenApply(
optionalPolicies -> optionalPolicies.isPresent() ? optionalPolicies.get().bundles :
getBundles(config().getDefaultNumberOfNamespaceBundles()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;

import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand All @@ -38,9 +39,12 @@
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand Down Expand Up @@ -236,4 +240,149 @@ public void testOperationDelayedDelivery() throws Exception {
assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
}
}

@Test
public void testSetBookieAffinityGroupWithEmptyPolicies() throws Exception {
// 1. create namespace with empty policies
String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, null));

// 2.set bookie affinity group
String primaryAffinityGroup = "primary-affinity-group";
String secondaryAffinityGroup = "secondary-affinity-group";
BookieAffinityGroupData bookieAffinityGroupDataReq =
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq);

// 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles()
BundlesData bundlesData = (BundlesData) asyncRequests(
response -> namespaces.getBundlesData(response, testTenant, setBookieAffinityGroupNs));
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());

// 4.assert namespace bookie affinity group
BookieAffinityGroupData bookieAffinityGroupDataResp =
namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs);
assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
}

@Test
public void testSetBookieAffinityGroupWithExistBundlePolicies() throws Exception {
// 1. create namespace with specified num bundles
String setBookieAffinityGroupNs = "test-set-bookie-affinity-group-ns";
Policies policies = new Policies();
policies.bundles = getBundles(10);
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setBookieAffinityGroupNs, policies));

// 2.set bookie affinity group
String primaryAffinityGroup = "primary-affinity-group";
String secondaryAffinityGroup = "secondary-affinity-group";
BookieAffinityGroupData bookieAffinityGroupDataReq =
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(primaryAffinityGroup)
.bookkeeperAffinityGroupSecondary(secondaryAffinityGroup).build();
namespaces.setBookieAffinityGroup(testTenant, setBookieAffinityGroupNs, bookieAffinityGroupDataReq);

// 3.query namespace num bundles, should be policies.bundles, which we set before
BundlesData bundlesData = (BundlesData) asyncRequests(
response -> namespaces.getBundlesData(response, testTenant, setBookieAffinityGroupNs));
assertEquals(bundlesData, policies.bundles);

// 4.assert namespace bookie affinity group
BookieAffinityGroupData bookieAffinityGroupDataResp =
namespaces.getBookieAffinityGroup(testTenant, setBookieAffinityGroupNs);
assertEquals(bookieAffinityGroupDataResp, bookieAffinityGroupDataReq);
}

@Test
public void testSetNamespaceAntiAffinityGroupWithEmptyPolicies() throws Exception {
// 1. create namespace with empty policies
String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns";
asyncRequests(
response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs, null));

// 2.set namespace anti affinity group
String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs,
namespaceAntiAffinityGroupReq);

// 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles()
BundlesData bundlesData = (BundlesData) asyncRequests(
response -> namespaces.getBundlesData(response, testTenant, setNamespaceAntiAffinityGroupNs));
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());

// 4.assert namespace anti affinity group
String namespaceAntiAffinityGroupResp =
namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs);
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq);
}

@Test
public void testSetNamespaceAntiAffinityGroupWithExistBundlePolicies() throws Exception {
// 1. create namespace with specified num bundles
String setNamespaceAntiAffinityGroupNs = "test-set-namespace-anti-affinity-group-ns";
Policies policies = new Policies();
policies.bundles = getBundles(10);
asyncRequests(response -> namespaces.createNamespace(response, testTenant, setNamespaceAntiAffinityGroupNs,
policies));

// 2.set namespace anti affinity group
String namespaceAntiAffinityGroupReq = "namespace-anti-affinity-group";
namespaces.setNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs,
namespaceAntiAffinityGroupReq);

// 3.query namespace num bundles, should be policies.bundles, which we set before
BundlesData bundlesData = (BundlesData) asyncRequests(
response -> namespaces.getBundlesData(response, testTenant, setNamespaceAntiAffinityGroupNs));
assertEquals(bundlesData, policies.bundles);

// 4.assert namespace anti affinity group
String namespaceAntiAffinityGroupResp =
namespaces.getNamespaceAntiAffinityGroup(testTenant, setNamespaceAntiAffinityGroupNs);
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroupReq);
}

@Test
public void testEnableMigrationWithEmptyPolicies() throws Exception {
// 1. create namespace with empty policies
String enableMigrationGroupNs = "test-set-namespace-enable-migration-ns";
asyncRequests(response -> namespaces.createNamespace(response, testTenant, enableMigrationGroupNs, null));

// 2.set enable migration
boolean enableMigrationReq = true;
namespaces.enableMigration(testTenant, enableMigrationGroupNs, enableMigrationReq);

// 3.query namespace num bundles, should be conf.getDefaultNumberOfNamespaceBundles()
BundlesData bundlesData = (BundlesData) asyncRequests(
response -> namespaces.getBundlesData(response, testTenant, enableMigrationGroupNs));
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());

// 4.assert namespace enable migration
Policies policiesResp = (Policies) asyncRequests(
response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs));
assertEquals(policiesResp.migrated, enableMigrationReq);
}

@Test
public void testEnableMigrationWithExistBundlePolicies() throws Exception {
// 1. create namespace with specified num bundles
String enableMigrationGroupNs = "test-set-namespace-enable-migration-ns";
Policies policiesReq = new Policies();
policiesReq.bundles = getBundles(10);
asyncRequests(
response -> namespaces.createNamespace(response, testTenant, enableMigrationGroupNs, policiesReq));

// 2.set enable migration
boolean enableMigrationReq = true;
namespaces.enableMigration(testTenant, enableMigrationGroupNs, enableMigrationReq);

// 3.query namespace num bundles, should be policies.bundles, which we set before
BundlesData bundlesData = (BundlesData) asyncRequests(
response -> namespaces.getBundlesData(response, testTenant, enableMigrationGroupNs));
assertEquals(bundlesData, policiesReq.bundles);

// 4.assert namespace enable migration
Policies policiesResp = (Policies) asyncRequests(
response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs));
assertEquals(policiesResp.migrated, enableMigrationReq);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public void testStopBroker() throws Exception {
}

@Test(timeOut = 80 * 1000)
public void testAntiaffinityPolicy() throws PulsarAdminException {
public void testAntiAffinityPolicy() throws PulsarAdminException {
final String namespaceAntiAffinityGroup = "my-anti-affinity-filter";
final String antiAffinityEnabledNameSpace = DEFAULT_TENANT + "/my-ns-filter" + nsSuffix;
final int numPartition = 20;
Expand All @@ -312,14 +312,25 @@ public void testAntiaffinityPolicy() throws PulsarAdminException {

assertEquals(activeBrokers.size(), NUM_BROKERS);

Set<String> antiAffinityEnabledNameSpacesReq = new HashSet<>();
for (int i = 0; i < activeBrokers.size(); i++) {
String namespace = antiAffinityEnabledNameSpace + "-" + i;
admin.namespaces().createNamespace(namespace, 10);
antiAffinityEnabledNameSpacesReq.add(namespace);
admin.namespaces().createNamespace(namespace, 1);
admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup);
admin.clusters().createFailureDomain(clusterName, namespaceAntiAffinityGroup, FailureDomain.builder()
.brokers(Set.of(activeBrokers.get(i))).build());
String namespaceAntiAffinityGroupResp = admin.namespaces().getNamespaceAntiAffinityGroup(namespace);
assertEquals(namespaceAntiAffinityGroupResp, namespaceAntiAffinityGroup);
FailureDomain failureDomainResp =
admin.clusters().getFailureDomain(clusterName, namespaceAntiAffinityGroup);
assertEquals(failureDomainResp.getBrokers(), Set.of(activeBrokers.get(i)));
}

List<String> antiAffinityNamespacesResp =
admin.namespaces().getAntiAffinityNamespaces(DEFAULT_TENANT, clusterName, namespaceAntiAffinityGroup);
assertEquals(new HashSet<>(antiAffinityNamespacesResp), antiAffinityEnabledNameSpacesReq);

Set<String> result = new HashSet<>();
for (int i = 0; i < activeBrokers.size(); i++) {
final String topic = "persistent://" + antiAffinityEnabledNameSpace + "-" + i + "/topic";
Expand Down
Loading