Skip to content

Commit faefc6c

Browse files
vinkal-chudgarlhotari
authored andcommitted
[improve][client] Deduplicate getTopicsUnderNamespace in BinaryProtoLookupService (#24962)
Signed-off-by: Vinkal Chudgar <[email protected]> (cherry picked from commit 1902735)
1 parent 4aabd82 commit faefc6c

File tree

2 files changed

+206
-11
lines changed

2 files changed

+206
-11
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ public class BinaryProtoLookupService implements LookupService {
7373
private final ConcurrentHashMap<PartitionedTopicMetadataKey, CompletableFuture<PartitionedTopicMetadata>>
7474
partitionedMetadataInProgress = new ConcurrentHashMap<>();
7575

76+
private final ConcurrentHashMap<TopicsUnderNamespaceKey, CompletableFuture<GetTopicsResult>>
77+
topicsUnderNamespaceInProgress = new ConcurrentHashMap<>();
78+
7679
private final LatencyHistogram histoGetBroker;
7780
private final LatencyHistogram histoGetTopicMetadata;
7881
private final LatencyHistogram histoGetSchema;
@@ -400,17 +403,31 @@ public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName
400403
Mode mode,
401404
String topicsPattern,
402405
String topicsHash) {
403-
CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>();
404-
405-
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
406-
Backoff backoff = new BackoffBuilder()
407-
.setInitialTime(100, TimeUnit.MILLISECONDS)
408-
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
409-
.setMax(1, TimeUnit.MINUTES)
410-
.create();
411-
getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode,
412-
topicsPattern, topicsHash);
413-
return topicsFuture;
406+
final MutableObject<CompletableFuture<GetTopicsResult>> newFutureCreated = new MutableObject<>();
407+
final TopicsUnderNamespaceKey key = new TopicsUnderNamespaceKey(namespace, mode, topicsPattern, topicsHash);
408+
409+
try {
410+
return topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> {
411+
CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>();
412+
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
413+
Backoff backoff = new BackoffBuilder()
414+
.setInitialTime(100, TimeUnit.MILLISECONDS)
415+
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
416+
.setMax(1, TimeUnit.MINUTES)
417+
.create();
418+
419+
newFutureCreated.setValue(topicsFuture);
420+
getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode,
421+
topicsPattern, topicsHash);
422+
return topicsFuture;
423+
});
424+
} finally {
425+
if (newFutureCreated.getValue() != null) {
426+
newFutureCreated.getValue().whenComplete((v, ex) -> {
427+
topicsUnderNamespaceInProgress.remove(key, newFutureCreated.getValue());
428+
});
429+
}
430+
}
414431
}
415432

416433
private void getTopicsUnderNamespace(
@@ -500,6 +517,51 @@ public LookupDataResult(int partitions) {
500517

501518
}
502519

520+
private static final class TopicsUnderNamespaceKey {
521+
private final NamespaceName namespace;
522+
private final Mode mode;
523+
private final String topicsPattern;
524+
private final String topicsHash;
525+
526+
TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode,
527+
String topicsPattern, String topicsHash) {
528+
this.namespace = namespace;
529+
this.mode = mode;
530+
this.topicsPattern = topicsPattern;
531+
this.topicsHash = topicsHash;
532+
}
533+
534+
@Override
535+
public boolean equals(Object o) {
536+
if (this == o) {
537+
return true;
538+
}
539+
if (o == null || getClass() != o.getClass()) {
540+
return false;
541+
}
542+
TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o;
543+
return Objects.equals(namespace, that.namespace)
544+
&& mode == that.mode
545+
&& Objects.equals(topicsPattern, that.topicsPattern)
546+
&& Objects.equals(topicsHash, that.topicsHash);
547+
}
548+
549+
@Override
550+
public int hashCode() {
551+
return Objects.hash(namespace, mode, topicsPattern, topicsHash);
552+
}
553+
554+
@Override
555+
public String toString() {
556+
return "TopicsUnderNamespaceKey{"
557+
+ "namespace=" + namespace
558+
+ ", mode=" + mode
559+
+ ", topicsPattern='" + topicsPattern + '\''
560+
+ ", topicsHash='" + topicsHash + '\''
561+
+ '}';
562+
}
563+
}
564+
503565
private static final class PartitionedTopicMetadataKey {
504566
private final TopicName topicName;
505567
private final boolean metadataAutoCreationEnabled;

pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@
5050
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
5151
import org.apache.pulsar.common.api.proto.BaseCommand;
5252
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
53+
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
54+
import org.apache.pulsar.common.lookup.GetTopicsResult;
55+
import org.apache.pulsar.common.naming.NamespaceName;
5356
import org.apache.pulsar.common.naming.TopicName;
5457
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
5558
import org.apache.pulsar.common.protocol.Commands;
@@ -201,6 +204,136 @@ private static LookupDataResult createLookupDataResult(String brokerUrl, boolean
201204
return lookupResult;
202205
}
203206

207+
/**
208+
* Verifies that getTopicsUnderNamespace() deduplicates concurrent requests and cleans up after completion.
209+
*
210+
* First, two concurrent calls with identical parameters should return the same CompletableFuture
211+
* and trigger only one connection pool request (deduplication).
212+
*
213+
* Second, after the future completes, the map entry should be removed so a subsequent call
214+
* with the same parameters creates a new future (cleanup).
215+
*
216+
* This test uses a never-completing connection future to isolate the deduplication logic
217+
* without executing the network request path.
218+
*/
219+
220+
@Test(timeOut = 60000)
221+
public void testGetTopicsUnderNamespaceDeduplication() throws Exception {
222+
PulsarClientImpl client = mock(PulsarClientImpl.class);
223+
ConnectionPool cnxPool = mock(ConnectionPool.class);
224+
225+
ClientConfigurationData conf = new ClientConfigurationData();
226+
conf.setOperationTimeoutMs(30000);
227+
when(client.getConfiguration()).thenReturn(conf);
228+
when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
229+
when(client.getCnxPool()).thenReturn(cnxPool);
230+
231+
// Never-completing connection prevents the thenAcceptAsync callback in getTopicsUnderNamespace from executing,
232+
// isolating only the deduplication logic without network calls.
233+
CompletableFuture<ClientCnx> neverCompletes = new CompletableFuture<>();
234+
when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(neverCompletes);
235+
236+
ScheduledExecutorService scheduler =
237+
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched"));
238+
239+
try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
240+
client, "pulsar://broker:6650", null, false, scheduler, /*lookupPinnedExecutor*/ null)) {
241+
242+
NamespaceName ns = NamespaceName.get("public", "default");
243+
Mode mode = Mode.PERSISTENT;
244+
String pattern = ".*";
245+
String topicsHash = null;
246+
247+
CompletableFuture<GetTopicsResult> f1 = lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
248+
CompletableFuture<GetTopicsResult> f1b = lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
249+
250+
assertSame(f1b, f1, "Concurrent requests with identical parameters should return the same future");
251+
252+
verify(cnxPool, times(1)).getConnection(any(ServiceNameResolver.class));
253+
254+
GetTopicsResult payload = new GetTopicsResult(java.util.Collections.emptyList(), null, false, true);
255+
256+
// Complete the future. This triggers the whenComplete callback that removes the map entry.
257+
f1.complete(payload);
258+
assertTrue(f1.isDone());
259+
260+
// Verify cleanup: subsequent call with same parameters creates a new future.
261+
CompletableFuture<GetTopicsResult> f2 = lookup.getTopicsUnderNamespace(ns, mode, pattern, topicsHash);
262+
assertNotSame(f2, f1,
263+
"After completion, the deduplication map entry should be removed and a new future created");
264+
verify(cnxPool, times(2)).getConnection(any(ServiceNameResolver.class));
265+
} finally {
266+
scheduler.shutdownNow();
267+
}
268+
}
269+
270+
/**
271+
* Verifies that getTopicsUnderNamespace() treats different topicsHash values as distinct keys for deduplication.
272+
*
273+
* Requests with different topicsHash values should create separate futures and trigger separate connection
274+
* pool requests. Cleanup is per key. Completing one future does not affect another in-flight entry.
275+
*
276+
* This test uses a never-completing connection future to isolate the deduplication logic without executing
277+
* the network request path.
278+
*/
279+
@Test(timeOut = 60000)
280+
public void testGetTopicsUnderNamespaceDeduplicationDifferentHash() throws Exception {
281+
PulsarClientImpl client = mock(PulsarClientImpl.class);
282+
ConnectionPool cnxPool = mock(ConnectionPool.class);
283+
284+
ClientConfigurationData conf = new ClientConfigurationData();
285+
conf.setOperationTimeoutMs(30000);
286+
when(client.getConfiguration()).thenReturn(conf);
287+
when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
288+
when(client.getCnxPool()).thenReturn(cnxPool);
289+
290+
// Never-completing connection prevents the thenAcceptAsync callback in getTopicsUnderNamespace from executing,
291+
// isolating only the deduplication logic without network calls.
292+
CompletableFuture<ClientCnx> neverCompletes = new CompletableFuture<>();
293+
when(cnxPool.getConnection(any(ServiceNameResolver.class))).thenReturn(neverCompletes);
294+
295+
ScheduledExecutorService scheduler =
296+
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched"));
297+
298+
try (BinaryProtoLookupService lookup = new BinaryProtoLookupService(
299+
client, "pulsar://broker:6650", null, false, scheduler, null)) {
300+
301+
NamespaceName ns = NamespaceName.get("public", "default");
302+
Mode mode = Mode.PERSISTENT;
303+
String pattern = ".*";
304+
305+
CompletableFuture<GetTopicsResult> futureHashA = lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashA");
306+
CompletableFuture<GetTopicsResult> futureHashB = lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashB");
307+
308+
// Verify different hash values create separate futures.
309+
assertNotSame(futureHashA, futureHashB,
310+
"Requests with different topicsHash must not share the same future");
311+
312+
// Verify connection pool called twice, once for each distinct topicsHash.
313+
verify(cnxPool, times(2)).getConnection(any(ServiceNameResolver.class));
314+
315+
GetTopicsResult payload = new GetTopicsResult(java.util.Collections.emptyList(), null, false, true);
316+
317+
futureHashA.complete(payload);
318+
319+
// Verify cleanup for HashA: subsequent call creates a new future.
320+
CompletableFuture<GetTopicsResult> futureHashA2 =
321+
lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashA");
322+
assertNotSame(futureHashA2, futureHashA,
323+
"After completion, a call with the same topicsHash must create a new future");
324+
verify(cnxPool, times(3)).getConnection(any(ServiceNameResolver.class));
325+
326+
// Verify HashB still in-flight: subsequent call returns the original future.
327+
CompletableFuture<GetTopicsResult> futureHashB2 =
328+
lookup.getTopicsUnderNamespace(ns, mode, pattern, "HashB");
329+
assertSame(futureHashB2, futureHashB,
330+
"An in-flight request for the same topicsHash must return the same future");
331+
verify(cnxPool, times(3)).getConnection(any(ServiceNameResolver.class));
332+
} finally {
333+
scheduler.shutdownNow();
334+
}
335+
}
336+
204337
/**
205338
* Verifies that getPartitionedTopicMetadata() deduplicates concurrent requests and cleans up after completion.
206339
*

0 commit comments

Comments
 (0)