Skip to content

Commit 7b6f9fc

Browse files
[fix][broker] Allow intermittent error from topic policies service when loading topics (#24829)
1 parent 5e59d0e commit 7b6f9fc

File tree

2 files changed

+36
-9
lines changed

2 files changed

+36
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,14 +1168,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11681168
// The topic level policies are not needed now, but the meaning of calling
11691169
// "getTopicPoliciesBypassSystemTopic" will wait for system topic policies initialization.
11701170
getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY)
1171-
.exceptionally(ex -> {
1172-
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
1173-
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
1174-
+ " topic policies service. topic_name=%s error_message=%s", topicName,
1175-
rc.getMessage());
1176-
log.error(errorInfo, rc);
1177-
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
1178-
}).thenRun(() -> {
1171+
.thenRun(() -> {
11791172
final var inserted = new MutableBoolean(false);
11801173
final var cachedFuture = topics.computeIfAbsent(topicName.toString(), ___ -> {
11811174
inserted.setTrue();
@@ -1195,6 +1188,15 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11951188
}
11961189
});
11971190
}
1191+
}).exceptionally(e -> {
1192+
pulsar.getExecutor().execute(() -> topics.remove(topicName.toString(), topicFuture));
1193+
final Throwable rc = FutureUtil.unwrapCompletionException(e);
1194+
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
1195+
+ " topic policies service. topic_name=%s error_message=%s", topicName,
1196+
rc.getMessage());
1197+
log.error(errorInfo, rc);
1198+
topicFuture.completeExceptionally(rc);
1199+
return null;
11981200
});
11991201
});
12001202
return topicFuture;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@
125125
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
126126
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
127127
import org.apache.pulsar.common.policies.data.SubscriptionStats;
128+
import org.apache.pulsar.common.policies.data.TopicPolicies;
128129
import org.apache.pulsar.common.policies.data.TopicStats;
129130
import org.apache.pulsar.common.protocol.Commands;
130131
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -148,7 +149,8 @@ public class BrokerServiceTest extends BrokerTestBase {
148149
@Override
149150
protected void setup() throws Exception {
150151
conf.setSystemTopicEnabled(false);
151-
conf.setTopicLevelPoliciesEnabled(false);
152+
conf.setTopicLevelPoliciesEnabled(true);
153+
conf.setTopicPoliciesServiceClassName(MockTopicPoliciesService.class.getName());
152154
super.baseSetup();
153155
}
154156

@@ -2036,5 +2038,28 @@ public void testPulsarMetadataEventSyncProducerCreation() throws Exception {
20362038
retryStrategically((test) -> sync.getProducer() != null, 1000, 10);
20372039
assertNotNull(sync.getProducer());
20382040
}
2041+
2042+
@Test
2043+
public void testGetTopicWhenTopicPoliciesFail() throws Exception {
2044+
final var topicName = TopicName.get("prop/ns-abc/test-get-topic-when-topic-policies-fail");
2045+
MockTopicPoliciesService.FAILED_TOPICS.add(topicName);
2046+
@Cleanup final var producer = pulsarClient.newProducer().topic(topicName.toString()).create();
2047+
assertFalse(MockTopicPoliciesService.FAILED_TOPICS.contains(topicName));
2048+
}
2049+
2050+
static class MockTopicPoliciesService extends TopicPoliciesService.TopicPoliciesServiceDisabled {
2051+
2052+
static final Set<TopicName> FAILED_TOPICS = ConcurrentHashMap.newKeySet();
2053+
2054+
@Override
2055+
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type) {
2056+
if (FAILED_TOPICS.contains(topicName)) {
2057+
// Only fail once
2058+
FAILED_TOPICS.remove(topicName);
2059+
return CompletableFuture.failedFuture(new RuntimeException("injected failure for " + topicName));
2060+
}
2061+
return CompletableFuture.completedFuture(Optional.empty());
2062+
}
2063+
}
20392064
}
20402065

0 commit comments

Comments
 (0)