2828import static org .testng .Assert .fail ;
2929import com .google .common .collect .Sets ;
3030import java .net .URL ;
31+ import java .time .Duration ;
3132import java .util .HashSet ;
3233import java .util .Optional ;
3334import java .util .Set ;
3435import java .util .concurrent .CountDownLatch ;
36+ import java .util .concurrent .CyclicBarrier ;
3537import java .util .concurrent .ExecutorService ;
3638import java .util .concurrent .Executors ;
3739import java .util .concurrent .LinkedBlockingQueue ;
3840import java .util .concurrent .ThreadPoolExecutor ;
3941import java .util .concurrent .TimeUnit ;
4042import java .util .concurrent .atomic .AtomicBoolean ;
43+ import java .util .concurrent .atomic .AtomicReference ;
4144import lombok .Cleanup ;
45+ import org .apache .pulsar .broker .BrokerTestUtil ;
4246import org .apache .pulsar .broker .PulsarService ;
4347import org .apache .pulsar .broker .ServiceConfiguration ;
4448import org .apache .pulsar .broker .loadbalance .LoadManager ;
5054import org .apache .pulsar .broker .service .nonpersistent .NonPersistentTopic ;
5155import org .apache .pulsar .client .admin .PulsarAdmin ;
5256import org .apache .pulsar .client .impl .ConsumerImpl ;
57+ import org .apache .pulsar .client .impl .MessageIdImpl ;
5358import org .apache .pulsar .client .impl .MultiTopicsConsumerImpl ;
5459import org .apache .pulsar .client .impl .PartitionedProducerImpl ;
5560import org .apache .pulsar .client .impl .ProducerImpl ;
6671import org .apache .pulsar .zookeeper .ZookeeperServerTest ;
6772import org .slf4j .Logger ;
6873import org .slf4j .LoggerFactory ;
74+ import org .testcontainers .shaded .org .awaitility .Awaitility ;
6975import org .testng .Assert ;
7076import org .testng .annotations .AfterMethod ;
7177import org .testng .annotations .BeforeMethod ;
@@ -813,14 +819,18 @@ public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerN
813819 *
814820 * @throws Exception
815821 */
816- @ Test
822+ @ Test ( timeOut = 60000 )
817823 public void testMsgDropStat () throws Exception {
818824
819825 int defaultNonPersistentMessageRate = conf .getMaxConcurrentNonPersistentMessagePerConnection ();
820826 try {
821- final String topicName = "non-persistent://my-property/my-ns/stats-topic" ;
822- // restart broker with lower publish rate limit
823- conf .setMaxConcurrentNonPersistentMessagePerConnection (1 );
827+ final String topicName = BrokerTestUtil .newUniqueName ("non-persistent://my-property/my-ns/stats-topic" );
828+
829+ // For non-persistent topics, set the per-connection in-flight limit to 0.
830+ // Since ServerCnx drops when inFlight > max; with max=0, any second overlapping send on the
831+ // same connection is dropped (entryId == -1) and recorded. This makes observing a publisher drop
832+ // reliable in this test.
833+ conf .setMaxConcurrentNonPersistentMessagePerConnection (0 );
824834 stopBroker ();
825835 startBroker ();
826836 Consumer <byte []> consumer = pulsarClient .newConsumer ().topic (topicName ).subscriptionName ("subscriber-1" )
@@ -833,30 +843,69 @@ public void testMsgDropStat() throws Exception {
833843 .enableBatching (false )
834844 .messageRoutingMode (MessageRoutingMode .SinglePartition )
835845 .create ();
846+
847+ final int threads = 10 ;
836848 @ Cleanup ("shutdownNow" )
837- ExecutorService executor = Executors .newFixedThreadPool (5 );
849+ ExecutorService executor = Executors .newFixedThreadPool (threads );
838850 byte [] msgData = "testData" .getBytes ();
839- final int totalProduceMessages = 200 ;
840- CountDownLatch latch = new CountDownLatch (totalProduceMessages );
841- for (int i = 0 ; i < totalProduceMessages ; i ++) {
842- executor .submit (() -> {
843- producer .sendAsync (msgData ).handle ((msg , e ) -> {
844- latch .countDown ();
845- return null ;
851+
852+ /*
853+ * Trigger at least one publisher drop through concurrent send() calls.
854+ *
855+ * Uses CyclicBarrier to ensure all threads send simultaneously, creating overlap.
856+ * With maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx#handleSend
857+ * drops any send while another is in-flight, returning MessageId with entryId = -1.
858+ * Awaitility repeats whole bursts (bounded to 20s) until a drop is observed.
859+ */
860+ AtomicBoolean publisherDropSeen = new AtomicBoolean (false );
861+ Awaitility .await ().atMost (Duration .ofSeconds (20 )).until (() -> {
862+ CyclicBarrier barrier = new CyclicBarrier (threads );
863+ CountDownLatch completionLatch = new CountDownLatch (threads );
864+ AtomicReference <Throwable > error = new AtomicReference <>();
865+ publisherDropSeen .set (false );
866+
867+ for (int i = 0 ; i < threads ; i ++) {
868+ executor .submit (() -> {
869+ try {
870+ barrier .await ();
871+ MessageId msgId = producer .send (msgData );
872+ // Publisher drop is signaled by MessageIdImpl.entryId == -1
873+ if (msgId instanceof MessageIdImpl && ((MessageIdImpl ) msgId ).getEntryId () == -1 ) {
874+ publisherDropSeen .set (true );
875+ }
876+ } catch (Throwable t ) {
877+ if (t instanceof InterruptedException ) {
878+ Thread .currentThread ().interrupt ();
879+ }
880+ error .compareAndSet (null , t );
881+ } finally {
882+ completionLatch .countDown ();
883+ }
846884 });
847- });
848- }
849- latch .await ();
885+ }
886+
887+ // Wait for all sends to complete.
888+ assertTrue (completionLatch .await (20 , TimeUnit .SECONDS ));
889+
890+ assertNull (error .get (), "Concurrent send encountered an exception" );
891+ return publisherDropSeen .get ();
892+ });
893+
894+ assertTrue (publisherDropSeen .get (), "Expected at least one publisher drop (entryId == -1)" );
895+
896+ NonPersistentTopic topic =
897+ (NonPersistentTopic ) pulsar .getBrokerService ().getOrCreateTopic (topicName ).get ();
850898
851- NonPersistentTopic topic = (NonPersistentTopic ) pulsar .getBrokerService ().getOrCreateTopic (topicName ).get ();
852- pulsar .getBrokerService ().updateRates ();
853- NonPersistentTopicStats stats = topic .getStats (false , false , false );
854- NonPersistentPublisherStats npStats = stats .getPublishers ().get (0 );
855- NonPersistentSubscriptionStats sub1Stats = stats .getSubscriptions ().get ("subscriber-1" );
856- NonPersistentSubscriptionStats sub2Stats = stats .getSubscriptions ().get ("subscriber-2" );
857- assertTrue (npStats .getMsgDropRate () > 0 );
858- assertTrue (sub1Stats .getMsgDropRate () > 0 );
859- assertTrue (sub2Stats .getMsgDropRate () > 0 );
899+ Awaitility .await ().ignoreExceptions ().untilAsserted (() -> {
900+ pulsar .getBrokerService ().updateRates ();
901+ NonPersistentTopicStats stats = topic .getStats (false , false , false );
902+ NonPersistentPublisherStats npStats = stats .getPublishers ().get (0 );
903+ NonPersistentSubscriptionStats sub1Stats = stats .getSubscriptions ().get ("subscriber-1" );
904+ NonPersistentSubscriptionStats sub2Stats = stats .getSubscriptions ().get ("subscriber-2" );
905+ assertTrue (npStats .getMsgDropRate () > 0 );
906+ assertTrue (sub1Stats .getMsgDropRate () > 0 );
907+ assertTrue (sub2Stats .getMsgDropRate () > 0 );
908+ });
860909
861910 producer .close ();
862911 consumer .close ();
0 commit comments