Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -368,6 +369,11 @@ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return future;
}

public boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles namespaceBundles, String bundleRange) {
List<NamespaceBundle> bundles = namespaceBundles.getBundles();
return bundles.stream().anyMatch(b -> b.getBundleRange().equals(bundleRange));
}

// Attempt to local the data for the given bundle in metadata store
// If it cannot be found, return the default bundle data.
@Override
Expand Down Expand Up @@ -767,6 +773,13 @@ public void checkNamespaceBundleSplit() {
continue;
}

NamespaceBundles bundles = namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName));
if (!checkBundleDataExistInNamespaceBundles(bundles, bundleRange)) {
log.warn("Bundle {} has been removed, skip split this bundle ",
bundleName);
continue;
}

// Make sure the same bundle is not selected again.
loadData.getBundleData().remove(bundleName);
localData.getLastStats().remove(bundleName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,4 +1130,50 @@ public void testRemoveNonExistBundleData()
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
}

@Test
public void testRepeatSplitBundle() throws Exception {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "repeat-split-bundle";
final String topicName = tenant + "/" + namespace + "/" + "topic";
int bundleNumbers = 8;

admin1.clusters().createCluster(cluster, ClusterData.builder()
.serviceUrl(pulsar1.getWebServiceAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);

LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
LocalBrokerData localData = (LocalBrokerData) getField(primaryLoadManager, "localData");

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();

// create a lot of topic to fully distributed among bundles.
for (int i = 0; i < 10; i++) {
String topicNameI = topicName + i;
admin1.topics().createPartitionedTopic(topicNameI, 20);
// trigger bundle assignment

pulsarClient.newConsumer().topic(topicNameI)
.subscriptionName("my-subscriber-name2").subscribe();
}

String topicToFindBundle = topicName + 0;
NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
String bundleKey = realBundle.toString();
log.info("Before bundle={}", bundleKey);

NamespaceBundleStats stats = new NamespaceBundleStats();
stats.msgRateIn = 100000.0;
localData.getLastStats().put(bundleKey, stats);
pulsar1.getBrokerService().updateRates();

primaryLoadManager.updateAll();

primaryLoadManager.updateAll();
Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey));
}

}
Loading