Skip to content

Commit bdcdeb1

Browse files
authored
When subscribing to already subscribed channel or group should not resubscribe (#384)
* When subscribing to already subscribed channel or group should not resubscribe just emit SubscriptionChanged status.
1 parent 415c246 commit bdcdeb1

File tree

4 files changed

+196
-13
lines changed

4 files changed

+196
-13
lines changed

pubnub-kotlin/pubnub-kotlin-impl/src/integrationTest/kotlin/com/pubnub/api/integration/SubscribeIntegrationTests.kt

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,6 +1365,134 @@ class SubscribeIntegrationTests : BaseIntegrationTest() {
13651365
}
13661366
}
13671367

1368+
@Test
1369+
fun whenSubscribingToAlreadySubscribedChannelShouldNotResubscribeButShouldEmitSubscriptionChangedStatus() {
1370+
val channelName = randomChannel()
1371+
val channelName02 = randomChannel()
1372+
val expectedMessage = "test_${randomValue()}"
1373+
1374+
val connectedLatch = CountDownLatch(1)
1375+
val subscriptionChangedLatch = CountDownLatch(1)
1376+
val messagesLatch = CountDownLatch(2)
1377+
1378+
val connectedEventCount = AtomicInteger(0)
1379+
val subscriptionChangedCount = AtomicInteger(0)
1380+
val subscribeHttpRequestCount = AtomicInteger(0)
1381+
1382+
// Track messages received by each listener
1383+
val listenerAMessageCount = AtomicInteger(0)
1384+
val listenerBMessageCount = AtomicInteger(0)
1385+
1386+
// Custom logger to count HTTP subscribe requests
1387+
val customLogger = object : CustomLogger {
1388+
override fun debug(logMessage: LogMessage) {
1389+
if (logMessage.type == LogMessageType.NETWORK_REQUEST) {
1390+
val networkRequestDetails = logMessage.message as LogMessageContent.NetworkRequest
1391+
if (networkRequestDetails.path.contains("/v2/subscribe/")) {
1392+
subscribeHttpRequestCount.incrementAndGet()
1393+
}
1394+
}
1395+
}
1396+
}
1397+
1398+
clientConfig = {
1399+
customLoggers = listOf(customLogger)
1400+
}
1401+
1402+
pubnub.addListener(
1403+
object : StatusListener {
1404+
override fun status(
1405+
pubnub: PubNub,
1406+
status: PNStatus,
1407+
) {
1408+
when (status.category) {
1409+
PNStatusCategory.PNConnectedCategory -> {
1410+
connectedEventCount.incrementAndGet()
1411+
connectedLatch.countDown()
1412+
}
1413+
1414+
PNStatusCategory.PNSubscriptionChanged -> {
1415+
subscriptionChangedCount.incrementAndGet()
1416+
subscriptionChangedLatch.countDown()
1417+
}
1418+
1419+
else -> {}
1420+
}
1421+
}
1422+
}
1423+
)
1424+
1425+
// First subscription
1426+
val subscriptionSet1 = pubnub.subscriptionSetOf(setOf(channelName, channelName02))
1427+
subscriptionSet1.addListener(
1428+
object : EventListener {
1429+
override fun message(
1430+
pubnub: PubNub,
1431+
result: PNMessageResult,
1432+
) {
1433+
listenerAMessageCount.incrementAndGet()
1434+
messagesLatch.countDown()
1435+
}
1436+
}
1437+
)
1438+
1439+
// Second subscription (SAME channel, different listener)
1440+
val subscriptionSet2 = pubnub.subscriptionSetOf(setOf(channelName))
1441+
subscriptionSet2.addListener(
1442+
object : EventListener {
1443+
override fun message(
1444+
pubnub: PubNub,
1445+
result: PNMessageResult,
1446+
) {
1447+
listenerBMessageCount.incrementAndGet()
1448+
messagesLatch.countDown()
1449+
}
1450+
}
1451+
)
1452+
1453+
try {
1454+
subscriptionSet1.subscribe()
1455+
assertTrue("Failed to receive PNConnectedCategory", connectedLatch.await(5, TimeUnit.SECONDS))
1456+
1457+
subscriptionSet2.subscribe()
1458+
assertTrue("Failed to receive PNSubscriptionChanged", subscriptionChangedLatch.await(5, TimeUnit.SECONDS))
1459+
1460+
// Give a moment for any potential resubscribe to occur
1461+
Thread.sleep(500)
1462+
1463+
// Publish a message - both listeners should receive it
1464+
pubnub.publish(channelName, expectedMessage).sync()
1465+
assertTrue("Failed to receive messages on both listeners", messagesLatch.await(5, TimeUnit.SECONDS))
1466+
1467+
// Verify both listeners received the message (this works regardless of the bug)
1468+
assertEquals("ListenerA should receive message", 1, listenerAMessageCount.get())
1469+
assertEquals("ListenerB should receive message", 1, listenerBMessageCount.get())
1470+
1471+
assertEquals(
1472+
"Should emit PNSubscriptionChanged status but not resubscribe when channels unchanged",
1473+
1, // Status emitted but no actual resubscribe (no cancel/new receive)
1474+
subscriptionChangedCount.get()
1475+
)
1476+
1477+
assertEquals(
1478+
"Should have exactly 1 PNConnectedCategory (initial handshake only)",
1479+
1,
1480+
connectedEventCount.get()
1481+
)
1482+
1483+
// Should only have 2 HTTP "/subscribe" requests: handshake + subscribe
1484+
assertEquals(
1485+
"Should have exactly 2 HTTP subscribe requests (handshake + subscribe, NO resubscribe)",
1486+
2,
1487+
subscribeHttpRequestCount.get()
1488+
)
1489+
} finally {
1490+
// Ensure cleanup happens even if assertions fail
1491+
subscriptionSet1.close()
1492+
subscriptionSet2.close()
1493+
}
1494+
}
1495+
13681496
@Test
13691497
fun shouldDeduplicateChannelSubscriptionsWhenSubscribingToListOfTheSameChannels() {
13701498
// given

pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/eventengine/State.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ internal interface State<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>
1010

1111
internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransition(): Pair<S, Set<Ei>> = Pair(this, emptySet())
1212

13+
internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransitionWithEffects(
14+
vararg invocations: Ei,
15+
): Pair<S, Set<Ei>> = Pair(this, invocations.toSet())
16+
1317
internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.transitionTo(
1418
state: S,
1519
vararg invocations: Ei,

pubnub-kotlin/pubnub-kotlin-impl/src/main/kotlin/com/pubnub/internal/subscribe/eventengine/state/SubscribeState.kt

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import com.pubnub.api.enums.PNStatusCategory
55
import com.pubnub.api.models.consumer.PNStatus
66
import com.pubnub.internal.eventengine.State
77
import com.pubnub.internal.eventengine.noTransition
8+
import com.pubnub.internal.eventengine.noTransitionWithEffects
89
import com.pubnub.internal.eventengine.transitionTo
910
import com.pubnub.internal.subscribe.eventengine.effect.SubscribeEffectInvocation
1011
import com.pubnub.internal.subscribe.eventengine.event.SubscribeEvent
@@ -227,17 +228,32 @@ internal sealed class SubscribeState : State<SubscribeEffectInvocation, Subscrib
227228
}
228229

229230
is SubscribeEvent.SubscriptionChanged -> {
230-
transitionTo(
231-
Receiving(event.channels, event.channelGroups, subscriptionCursor),
232-
SubscribeEffectInvocation.EmitStatus(
233-
PNStatus(
234-
PNStatusCategory.PNSubscriptionChanged,
235-
currentTimetoken = subscriptionCursor.timetoken,
236-
affectedChannels = event.channels,
237-
affectedChannelGroups = event.channelGroups,
231+
// If channels and channelGroups haven't changed, emit status without resubscribing
232+
if (event.channels == channels && event.channelGroups == channelGroups) {
233+
noTransitionWithEffects(
234+
SubscribeEffectInvocation.EmitStatus(
235+
PNStatus(
236+
PNStatusCategory.PNSubscriptionChanged,
237+
currentTimetoken = subscriptionCursor.timetoken,
238+
affectedChannels = event.channels,
239+
affectedChannelGroups = event.channelGroups,
240+
),
238241
),
239-
),
240-
)
242+
)
243+
} else {
244+
// Channels changed, need to resubscribe
245+
transitionTo(
246+
Receiving(event.channels, event.channelGroups, subscriptionCursor),
247+
SubscribeEffectInvocation.EmitStatus(
248+
PNStatus(
249+
PNStatusCategory.PNSubscriptionChanged,
250+
currentTimetoken = subscriptionCursor.timetoken,
251+
affectedChannels = event.channels,
252+
affectedChannelGroups = event.channelGroups,
253+
),
254+
),
255+
)
256+
}
241257
}
242258

243259
is SubscribeEvent.SubscriptionRestored -> {

pubnub-kotlin/pubnub-kotlin-impl/src/test/kotlin/com/pubnub/internal/subscribe/eventengine/worker/TransitionFromReceivingStateTest.kt

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,44 @@ class TransitionFromReceivingStateTest {
8484
}
8585

8686
@Test
87-
fun can_transit_from_RECEIVING_to_RECEIVING_when_there_is_SUBSCRIPTION_CHANGED_event() {
87+
fun can_transit_from_RECEIVING_to_RECEIVING_when_there_is_SUBSCRIPTION_CHANGED_event_with_different_channels_and_groups() {
8888
// given
89+
val newChannels = setOf("Channel2", "Channel3")
90+
val newChannelGroups = setOf("ChannelGroup2")
91+
92+
// when
93+
val (state, invocations) =
94+
transition(
95+
SubscribeState.Receiving(channels, channelGroups, subscriptionCursor),
96+
SubscribeEvent.SubscriptionChanged(newChannels, newChannelGroups),
97+
)
98+
99+
// then
100+
Assertions.assertTrue(state is SubscribeState.Receiving)
101+
state as SubscribeState.Receiving
102+
103+
assertEquals(newChannels, state.channels)
104+
assertEquals(newChannelGroups, state.channelGroups)
105+
assertEquals(subscriptionCursor, state.subscriptionCursor)
106+
assertEquals(
107+
setOf(
108+
SubscribeEffectInvocation.CancelReceiveMessages,
109+
SubscribeEffectInvocation.EmitStatus(
110+
createSubscriptionChangedStatus(
111+
state.subscriptionCursor,
112+
newChannels,
113+
newChannelGroups,
114+
),
115+
),
116+
SubscribeEffectInvocation.ReceiveMessages(newChannels, newChannelGroups, subscriptionCursor),
117+
),
118+
invocations,
119+
)
120+
}
121+
122+
@Test
123+
fun stays_in_RECEIVING_and_emits_status_without_resubscribing_when_SUBSCRIPTION_CHANGED_event_has_same_channels_and_groups() {
124+
// given - channels and channelGroups are the same
89125
// when
90126
val (state, invocations) =
91127
transition(
@@ -100,17 +136,16 @@ class TransitionFromReceivingStateTest {
100136
assertEquals(channels, state.channels)
101137
assertEquals(channelGroups, state.channelGroups)
102138
assertEquals(subscriptionCursor, state.subscriptionCursor)
139+
// Should only emit status - no cancel or new receive
103140
assertEquals(
104141
setOf(
105-
SubscribeEffectInvocation.CancelReceiveMessages,
106142
SubscribeEffectInvocation.EmitStatus(
107143
createSubscriptionChangedStatus(
108144
state.subscriptionCursor,
109145
channels,
110146
channelGroups,
111147
),
112148
),
113-
SubscribeEffectInvocation.ReceiveMessages(channels, channelGroups, subscriptionCursor),
114149
),
115150
invocations,
116151
)

0 commit comments

Comments
 (0)