Skip to content

Commit 2da2096

Browse files
committed
Only skip split the bundles that do not exist in Metadata
1 parent c08ff26 commit 2da2096

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,13 @@ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
368368
return future;
369369
}
370370

371+
public boolean checkBundleDataExistInMetadataStore(String bundle) {
372+
Optional<BundleData> optBundleData =
373+
pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join();
374+
375+
return optBundleData.isPresent();
376+
}
377+
371378
// Attempt to local the data for the given bundle in metadata store
372379
// If it cannot be found, return the default bundle data.
373380
@Override
@@ -767,6 +774,11 @@ public void checkNamespaceBundleSplit() {
767774
continue;
768775
}
769776

777+
if (!checkBundleDataExistInMetadataStore(bundleName)) {
778+
log.warn("Bundle {} has been removed on the metadata store, skip split this bundle ", bundleName);
779+
continue;
780+
}
781+
770782
// Make sure the same bundle is not selected again.
771783
loadData.getBundleData().remove(bundleName);
772784
localData.getLastStats().remove(bundleName);
@@ -794,7 +806,6 @@ && shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
794806
}
795807
}
796808

797-
writeBrokerDataOnZooKeeper(true);
798809
updateBundleSplitMetrics(splitCount);
799810
}
800811

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1163,7 +1163,7 @@ public void testRepeatSplitBundle() throws Exception {
11631163
String topicToFindBundle = topicName + 0;
11641164
NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
11651165
String bundleKey = realBundle.toString();
1166-
log.info("bundle={}", bundleKey);
1166+
log.info("Before bundle={}", bundleKey);
11671167

11681168
NamespaceBundleStats stats = new NamespaceBundleStats();
11691169
stats.msgRateIn = 100000.0;

0 commit comments

Comments
 (0)