@@ -5,6 +5,10 @@ import com.google.gson.JsonObject
55import com.pubnub.api.PubNub
66import com.pubnub.api.callbacks.SubscribeCallback
77import com.pubnub.api.enums.PNStatusCategory
8+ import com.pubnub.api.logging.CustomLogger
9+ import com.pubnub.api.logging.LogMessage
10+ import com.pubnub.api.logging.LogMessageContent
11+ import com.pubnub.api.logging.LogMessageType
812import com.pubnub.api.models.consumer.PNStatus
913import com.pubnub.api.models.consumer.channel_group.PNChannelGroupsAddChannelResult
1014import com.pubnub.api.models.consumer.pubsub.PNMessageResult
@@ -1253,6 +1257,165 @@ class SubscribeIntegrationTests : BaseIntegrationTest() {
12531257 assertEquals(2 , subscriptionSet.subscriptions.size)
12541258 }
12551259
1260+ @Test
1261+ fun shouldDeduplicateChannelSubscriptionsWhenSubscribingToSameChannelMultipleTimes () {
1262+ // given
1263+ val numberOfSubscribe = 4
1264+ // punbub.subscribe does subscribe to already subscribed channel so only two subscribe calls should occur. Handshake and actual subscribe.
1265+ val countDownLatch = CountDownLatch (2 )
1266+ var interceptedUrl: HttpUrl ? = null
1267+ val testChannel = randomChannel()
1268+
1269+ val customLogger = object : CustomLogger {
1270+ override fun debug (logMessage : LogMessage ) {
1271+ if (logMessage.type == LogMessageType .NETWORK_REQUEST ) {
1272+ val networkRequestDetails = logMessage.message as LogMessageContent .NetworkRequest
1273+ if (networkRequestDetails.path.contains(" /v2/subscribe/" )) {
1274+ interceptedUrl = (networkRequestDetails.origin + networkRequestDetails.path).toHttpUrlOrNull()
1275+ countDownLatch.countDown()
1276+ }
1277+ }
1278+ }
1279+ }
1280+
1281+ clientConfig = {
1282+ customLoggers = listOf (customLogger)
1283+ }
1284+
1285+ try {
1286+ repeat(numberOfSubscribe) { iteration ->
1287+ pubnub.subscribe(channels = listOf (testChannel))
1288+ Thread .sleep(150 )
1289+ println (" Subscribe call ${iteration + 1 } /$numberOfSubscribe completed" )
1290+ }
1291+
1292+ // Wait for the subscribe request to be made
1293+ assertTrue(countDownLatch.await(12000 , TimeUnit .MILLISECONDS ))
1294+
1295+ // then: verify channel appears only once in subscribed channels
1296+ val subscribedChannels = pubnub.getSubscribedChannels()
1297+
1298+ assertEquals(1 , subscribedChannels.size)
1299+ assertTrue(subscribedChannels.contains(testChannel))
1300+
1301+ // then: verify the actual HTTP request only includes the channel once
1302+ assertNotNull(" Expected to intercept subscribe URL" , interceptedUrl)
1303+
1304+ val channelsParam = interceptedUrl!! .encodedPath
1305+ .substringAfter(" /subscribe/" )
1306+ .substringAfter(" /" )
1307+ .substringBefore(" /" )
1308+
1309+ val channelList = channelsParam.split(" ," ).filter { it.isNotEmpty() }
1310+
1311+ assertEquals(1 , channelList.count { it == testChannel })
1312+ } finally {
1313+ pubnub.forceDestroy()
1314+ }
1315+ }
1316+
1317+ @Test
1318+ fun heartbeatShouldDeduplicateChannelNameInUrlWhenSubscribingToSameChannelMultipleTimes () {
1319+ // given
1320+ val numberOfSubscribe = 4
1321+ val countDownLatch = CountDownLatch (2 ) // we want to verify second heartbeat URL
1322+ var interceptedUrl: HttpUrl ? = null
1323+ val testChannel = randomChannel()
1324+
1325+ val customLogger = object : CustomLogger {
1326+ override fun debug (logMessage : LogMessage ) {
1327+ if (logMessage.type == LogMessageType .NETWORK_REQUEST ) {
1328+ val networkRequestDetails = logMessage.message as LogMessageContent .NetworkRequest
1329+ if (networkRequestDetails.path.contains(" /v2/presence/" ) && networkRequestDetails.path.contains(" /heartbeat" )) {
1330+ interceptedUrl = (networkRequestDetails.origin + networkRequestDetails.path).toHttpUrlOrNull()
1331+ countDownLatch.countDown()
1332+ }
1333+ }
1334+ }
1335+ }
1336+
1337+ clientConfig = {
1338+ customLoggers = listOf (customLogger)
1339+ heartbeatInterval = 5
1340+ }
1341+
1342+ try {
1343+ repeat(numberOfSubscribe) { iteration ->
1344+ pubnub.subscribe(channels = listOf (testChannel))
1345+ Thread .sleep(150 )
1346+ println (" Subscribe call ${iteration + 1 } /$numberOfSubscribe completed" )
1347+ }
1348+
1349+ // Wait for the heartbeat request to be made
1350+ assertTrue(countDownLatch.await(6000 , TimeUnit .MILLISECONDS ))
1351+
1352+ // then: verify the actual HTTP request only includes the channel once
1353+ assertNotNull(" Expected to intercept heartbeat URL" , interceptedUrl)
1354+
1355+ // Extract channel from heartbeat URL: /v2/presence/sub-key/{sub-key}/channel/{channels}/heartbeat
1356+ val channelsParam = interceptedUrl!! .encodedPath
1357+ .substringAfter(" /channel/" )
1358+ .substringBefore(" /heartbeat" )
1359+
1360+ val channelList = channelsParam.split(" ," ).filter { it.isNotEmpty() }
1361+
1362+ assertEquals(1 , channelList.count { it == testChannel })
1363+ } finally {
1364+ pubnub.forceDestroy()
1365+ }
1366+ }
1367+
1368+ @Test
1369+ fun shouldDeduplicateChannelSubscriptionsWhenSubscribingToListOfTheSameChannels () {
1370+ // given
1371+ val countDownLatch = CountDownLatch (2 ) // Only two subscribe calls should occur. Handshake and actual subscribe.
1372+ var interceptedUrl: HttpUrl ? = null
1373+ val testChannel = randomChannel()
1374+
1375+ val customLogger = object : CustomLogger {
1376+ override fun debug (logMessage : LogMessage ) {
1377+ if (logMessage.type == LogMessageType .NETWORK_REQUEST ) {
1378+ val networkRequestDetails = logMessage.message as LogMessageContent .NetworkRequest
1379+ if (networkRequestDetails.path.contains(" /v2/subscribe/" )) {
1380+ interceptedUrl = (networkRequestDetails.origin + networkRequestDetails.path).toHttpUrlOrNull()
1381+ countDownLatch.countDown()
1382+ }
1383+ }
1384+ }
1385+ }
1386+
1387+ clientConfig = {
1388+ customLoggers = listOf (customLogger)
1389+ }
1390+
1391+ try {
1392+ pubnub.subscribe(channels = listOf (testChannel, testChannel, testChannel))
1393+
1394+ // Wait for the subscribe request to be made
1395+ assertTrue(countDownLatch.await(12000 , TimeUnit .MILLISECONDS ))
1396+
1397+ // then: verify channel appears only once in subscribed channels
1398+ val subscribedChannels = pubnub.getSubscribedChannels()
1399+
1400+ assertEquals(1 , subscribedChannels.size)
1401+ assertTrue(subscribedChannels.contains(testChannel))
1402+
1403+ // then: verify the actual HTTP request only includes the channel once
1404+ assertNotNull(" Expected to intercept subscribe URL" , interceptedUrl)
1405+
1406+ val channelsParam = interceptedUrl!! .encodedPath
1407+ .substringAfter(" /subscribe/" )
1408+ .substringAfter(" /" )
1409+ .substringBefore(" /" )
1410+
1411+ val channelList = channelsParam.split(" ," ).filter { it.isNotEmpty() }
1412+
1413+ assertEquals(1 , channelList.count { it == testChannel })
1414+ } finally {
1415+ pubnub.forceDestroy()
1416+ }
1417+ }
1418+
12561419 private fun publishToChannels (channelsList : List <String >) {
12571420 channelsList.forEach { channelName ->
12581421 pubnub.publish(channelName, " -=message to $channelName " ).sync()
0 commit comments