Skip to content

Commit d582d6b

Browse files
committed
Only skip split the bundles that do not exist in Metadata
1 parent c08ff26 commit d582d6b

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,13 @@ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
368368
return future;
369369
}
370370

371+
public boolean checkBundleDataExistInMetadataStore(String bundle) {
372+
Optional<BundleData> optBundleData =
373+
pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join();
374+
375+
return optBundleData.isPresent();
376+
}
377+
371378
// Attempt to local the data for the given bundle in metadata store
372379
// If it cannot be found, return the default bundle data.
373380
@Override
@@ -767,6 +774,12 @@ public void checkNamespaceBundleSplit() {
767774
continue;
768775
}
769776

777+
if (!checkBundleDataExistInMetadataStore(bundleName)) {
778+
log.warn("Bundle {} has been removed on the metadata store, skip split this bundle ",
779+
bundleName);
780+
continue;
781+
}
782+
770783
// Make sure the same bundle is not selected again.
771784
loadData.getBundleData().remove(bundleName);
772785
localData.getLastStats().remove(bundleName);
@@ -794,7 +807,6 @@ && shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
794807
}
795808
}
796809

797-
writeBrokerDataOnZooKeeper(true);
798810
updateBundleSplitMetrics(splitCount);
799811
}
800812

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1163,7 +1163,7 @@ public void testRepeatSplitBundle() throws Exception {
11631163
String topicToFindBundle = topicName + 0;
11641164
NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
11651165
String bundleKey = realBundle.toString();
1166-
log.info("bundle={}", bundleKey);
1166+
log.info("Before bundle={}", bundleKey);
11671167

11681168
NamespaceBundleStats stats = new NamespaceBundleStats();
11691169
stats.msgRateIn = 100000.0;

0 commit comments

Comments
 (0)