Skip to content

Commit c08ff26

Browse files
committed
[fix][load] Avoid get old loadData during split bundle
1 parent a8b41b9 commit c08ff26

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,7 @@ && shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
794794
}
795795
}
796796

797+
writeBrokerDataOnZooKeeper(true);
797798
updateBundleSplitMetrics(splitCount);
798799
}
799800

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,4 +1130,50 @@ public void testRemoveNonExistBundleData()
11301130
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
11311131
}
11321132

1133+
@Test
1134+
public void testRepeatSplitBundle() throws Exception {
1135+
final String cluster = "use";
1136+
final String tenant = "my-tenant";
1137+
final String namespace = "repeat-split-bundle";
1138+
final String topicName = tenant + "/" + namespace + "/" + "topic";
1139+
int bundleNumbers = 8;
1140+
1141+
admin1.clusters().createCluster(cluster, ClusterData.builder()
1142+
.serviceUrl(pulsar1.getWebServiceAddress()).build());
1143+
admin1.tenants().createTenant(tenant,
1144+
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
1145+
admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);
1146+
1147+
LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
1148+
LocalBrokerData localData = (LocalBrokerData) getField(primaryLoadManager, "localData");
1149+
1150+
@Cleanup
1151+
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
1152+
1153+
// create a lot of topic to fully distributed among bundles.
1154+
for (int i = 0; i < 10; i++) {
1155+
String topicNameI = topicName + i;
1156+
admin1.topics().createPartitionedTopic(topicNameI, 20);
1157+
// trigger bundle assignment
1158+
1159+
pulsarClient.newConsumer().topic(topicNameI)
1160+
.subscriptionName("my-subscriber-name2").subscribe();
1161+
}
1162+
1163+
String topicToFindBundle = topicName + 0;
1164+
NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
1165+
String bundleKey = realBundle.toString();
1166+
log.info("bundle={}", bundleKey);
1167+
1168+
NamespaceBundleStats stats = new NamespaceBundleStats();
1169+
stats.msgRateIn = 100000.0;
1170+
localData.getLastStats().put(bundleKey, stats);
1171+
pulsar1.getBrokerService().updateRates();
1172+
1173+
primaryLoadManager.updateAll();
1174+
1175+
primaryLoadManager.updateAll();
1176+
Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey));
1177+
}
1178+
11331179
}

0 commit comments

Comments
 (0)