diff --git a/test/integration/components/listeners.test.ts b/test/integration/components/listeners.test.ts index 8a48bd062..ac801291a 100644 --- a/test/integration/components/listeners.test.ts +++ b/test/integration/components/listeners.test.ts @@ -614,4 +614,122 @@ describe('#listeners', () => { expect(actual.uuid).to.be.equal('bob'); expect(actual.timetoken).to.not.be.equal(undefined); }); + it('should route messages correctly between subscription sets and individual subscriptions', (done) => { + utils.createPresenceMockScopes({ + subKey: 'mySubKey', + presenceType: 'heartbeat', + requests: [{ channels: ['a', 'b', 'c', 'd'] }], + }); + + utils.createSubscribeMockScopes({ + subKey: 'mySubKey', + pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`, + userId: 'myUUID', + eventEngine: true, + requests: [ + { + channels: ['a', 'b', 'c', 'd'], + messages: [ + { channel: 'a', message: { id: 'msg-channel-a', content: 'Message for channel A' } }, + { channel: 'b', message: { id: 'msg-channel-b', content: 'Message for channel B' } }, + { channel: 'c', message: { id: 'msg-channel-c', content: 'Message for channel C' } }, + // Intentionally NO message for channel 'd' - this tests message isolation + ], + }, + // Final empty response to end subscription loop + { channels: ['a', 'b', 'c', 'd'], messages: [], replyDelay: 500 }, + ], + }); + + // Message tracking arrays for individual subscriptions (as requested) + const channelASubscriptionMessagesReceived: Subscription.Message[] = []; + const channelBSubscriptionMessagesReceived: Subscription.Message[] = []; + const channelCSubscriptionMessagesReceived: Subscription.Message[] = []; + const subDReceivedMessages: Subscription.Message[] = []; // Array for subD as requested + const subscriptionSet1MessagesReceived: Subscription.Message[] = []; + + // Create individual subscriptions (subA, subB, subD) + const subA = pubnub.channel('a').subscription(); + const subB = pubnub.channel('b').subscription(); + const subD = pubnub.channel('d').subscription(); // Added subD for channel 'd' + const subscriptionC = pubnub.channel('c').subscription(); // Individual subscription for channel 'c' + + // Add message listeners to individual subscriptions to maintain received message arrays + subA.onMessage = (message) => channelASubscriptionMessagesReceived.push(message); + subB.onMessage = (message) => channelBSubscriptionMessagesReceived.push(message); + subD.onMessage = (message) => subDReceivedMessages.push(message); // subD listener as requested + subscriptionC.onMessage = (message) => channelCSubscriptionMessagesReceived.push(message); + + // Create subscriptionSet1 by adding individual subscriptions (subA + subB + subD + subscriptionC) + // This creates a subscription set covering channels 'a', 'b', 'c', 'd' + const subscriptionSet1 = subA.addSubscription(subB); + subscriptionSet1.addSubscription(subD); // Add subD to subscriptionSet as requested + subscriptionSet1.addSubscription(subscriptionC); + subscriptionSet1.onMessage = (message) => subscriptionSet1MessagesReceived.push(message); + + // Track when we've received enough messages to verify the test + let messageCount = 0; + let testCompleted = false; + + const checkCompletion = () => { + messageCount++; + // We expect messages for channels a, b, c - both to individual subscriptions and subscription set + // subD should receive NO messages since no message is sent to channel 'd' + if (messageCount >= 6 && !testCompleted) { + // 3 for set + 3 for individual subscriptions (a,b,c only) + testCompleted = true; + + try { + // Verify that each individual subscription receives messages for its respective channel + expect(channelASubscriptionMessagesReceived.length).to.equal(1); + expect((channelASubscriptionMessagesReceived[0].message as any).id).to.equal('msg-channel-a'); + expect(channelASubscriptionMessagesReceived[0].channel).to.equal('a'); + + expect(channelBSubscriptionMessagesReceived.length).to.equal(1); + expect((channelBSubscriptionMessagesReceived[0].message as any).id).to.equal('msg-channel-b'); + expect(channelBSubscriptionMessagesReceived[0].channel).to.equal('b'); + + expect(channelCSubscriptionMessagesReceived.length).to.equal(1); + expect((channelCSubscriptionMessagesReceived[0].message as any).id).to.equal('msg-channel-c'); + expect(channelCSubscriptionMessagesReceived[0].channel).to.equal('c'); + + // subD should NOT receive any messages since no message was sent to channel 'd' + // This confirms that messages are not routed to wrong subscription listeners + expect(subDReceivedMessages.length).to.equal(0, 'subD should not receive messages for channels a,b,c'); + + // Verify that subscriptionSet1 receives messages for channels a, b, c (but not d since no message sent) + expect(subscriptionSet1MessagesReceived.length).to.equal(3); + const set1MessageIds = subscriptionSet1MessagesReceived.map((m) => (m.message as any).id).sort(); + expect(set1MessageIds).to.deep.equal(['msg-channel-a', 'msg-channel-b', 'msg-channel-c']); + + // Verify correct channel routing for each message in the subscription set + const channelAMsg = subscriptionSet1MessagesReceived.find((m) => (m.message as any).id === 'msg-channel-a'); + const channelBMsg = subscriptionSet1MessagesReceived.find((m) => (m.message as any).id === 'msg-channel-b'); + const channelCMsg = subscriptionSet1MessagesReceived.find((m) => (m.message as any).id === 'msg-channel-c'); + + expect(channelAMsg?.channel).to.equal('a'); + expect(channelBMsg?.channel).to.equal('b'); + expect(channelCMsg?.channel).to.equal('c'); + + // Verify that no message for channel 'd' exists in the subscription set + const channelDMsg = subscriptionSet1MessagesReceived.find((m) => m.channel === 'd'); + expect(channelDMsg).to.be.undefined; + + done(); + } catch (error) { + done(error); + } + } + }; + + // Add listeners to test completion check + subscriptionSet1.addListener({ message: checkCompletion }); + subA.addListener({ message: checkCompletion }); + subB.addListener({ message: checkCompletion }); + subD.addListener({ message: checkCompletion }); + subscriptionC.addListener({ message: checkCompletion }); + + // Subscribe the subscription set (which includes a, b, c, d through individual subscriptions) + subscriptionSet1.subscribe(); + }); }); diff --git a/test/integration/shared-worker/shared-worker.test.ts b/test/integration/shared-worker/shared-worker.test.ts index 96f108c95..9bb42f2e8 100644 --- a/test/integration/shared-worker/shared-worker.test.ts +++ b/test/integration/shared-worker/shared-worker.test.ts @@ -8,6 +8,8 @@ describe('PubNub Shared Worker Integration Tests', () => { let pubnubWithWorker: PubNub; let pubnubWithoutWorker: PubNub; let testChannels: string[]; + const subscribeKey = process.env.SUBSCRIBE_KEY || 'demo'; + const publishKey = process.env.PUBLISH_KEY || 'demo'; // Determine the correct worker URL based on the environment const getWorkerUrl = () => { @@ -27,8 +29,8 @@ describe('PubNub Shared Worker Integration Tests', () => { // Create PubNub instance with shared worker pubnubWithWorker = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `shared-worker-user-${testId}`, enableEventEngine: true, subscriptionWorkerUrl: getWorkerUrl(), @@ -38,8 +40,8 @@ describe('PubNub Shared Worker Integration Tests', () => { // Create PubNub instance without shared worker for comparison pubnubWithoutWorker = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `regular-user-${testId}`, heartbeatInterval: 10, // Increased for more stability enableEventEngine: true, @@ -484,8 +486,8 @@ describe('PubNub Shared Worker Integration Tests', () => { // Wait for subscription to be established, then trigger presence setTimeout(() => { const tempClient = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `temp-user-${Date.now()}`, }); @@ -615,8 +617,8 @@ describe('PubNub Shared Worker Integration Tests', () => { // Create two PubNub instances with shared worker const pubnub1 = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `user1-${testId}`, enableEventEngine: true, subscriptionWorkerUrl: getWorkerUrl(), @@ -624,8 +626,8 @@ describe('PubNub Shared Worker Integration Tests', () => { }); const pubnub2 = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `user2-${testId}`, enableEventEngine: true, subscriptionWorkerUrl: getWorkerUrl(), @@ -688,8 +690,8 @@ describe('PubNub Shared Worker Integration Tests', () => { setTimeout(() => { // Use a third instance to publish to avoid self-message issues const publisher = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `publisher-${testId}`, }); @@ -751,8 +753,8 @@ describe('PubNub Shared Worker Integration Tests', () => { // Create two PubNub instances with shared worker const instance1 = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `instance1-${testId}`, enableEventEngine: true, subscriptionWorkerUrl: getWorkerUrl(), @@ -760,8 +762,8 @@ describe('PubNub Shared Worker Integration Tests', () => { }); const instance2 = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `instance2-${testId}`, enableEventEngine: true, subscriptionWorkerUrl: getWorkerUrl(), @@ -837,8 +839,8 @@ describe('PubNub Shared Worker Integration Tests', () => { setTimeout(() => { // Create a publisher instance to send messages const publisher = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `publisher-${testId}`, }); @@ -995,8 +997,8 @@ describe('PubNub Shared Worker Integration Tests', () => { // Create a temporary PubNub instance without shared worker to verify the request structure const tempPubNub = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, userId: `temp-user-${Date.now()}`, enableEventEngine: true, autoNetworkDetection: false, @@ -1489,6 +1491,360 @@ describe('PubNub Shared Worker Integration Tests', () => { const subscription1 = pubnubWithWorker.channel(channel1).subscription(); subscription1.subscribe(); }).timeout(20000); + + it('should handle token refresh across multiple PubNub instances with shared worker aggregation', (done) => { + const testId = `token-refresh-test-${Date.now()}-${Math.floor(Math.random() * 1000)}`; + const sharedUserId = `shared-user-${testId}`; + const initialToken = `initial-token-${testId}`; + const updatedToken = `updated-token-${testId}`; + + const channels = Array.from({ length: 25 }, (_, i) => `channel-${i + 1}-${testId}`); + + const testChannel = channels[0]; + const testMessage = 'hello for channel1'; + + const pubnubInstances: PubNub[] = []; + + for (let i = 0; i < 25; i++) { + const instance = new PubNub({ + publishKey, + subscribeKey, + userId: sharedUserId, + authKey: initialToken, + enableEventEngine: true, + subscriptionWorkerUrl: getWorkerUrl(), + heartbeatInterval: 4, + presenceTimeout: 15, + autoNetworkDetection: false, + }); + + pubnubInstances.push(instance); + } + + let interceptedRequests: any[] = []; + let testCompleted = false; + let errorOccurred = false; + let tokenUpdateCompleted = false; + let messagePublished = false; + let messageReceived = false; + let tokenRefreshVerified = false; + let initialHeartbeatVerified = false; + let updatedHeartbeatVerified = false; + let subscriptionChangeTriggered = false; + let initialSubscribeVerified = false; + + const tokenVerificationInstance = pubnubInstances[0]; + + const transport = (tokenVerificationInstance as any).transport; + const underlyingTransport = transport.configuration.transport; + const originalMakeSendable = underlyingTransport.makeSendable.bind(underlyingTransport); + + underlyingTransport.makeSendable = function (req: any) { + const isSubscribeRequest = req.path.includes('/v2/subscribe/') || req.path.includes('/subscribe'); + const isHeartbeatRequest = req.path.includes('/v2/presence/') || req.path.includes('/heartbeat'); + + if (isSubscribeRequest || isHeartbeatRequest) { + const interceptedRequest = { + path: req.path, + queryParameters: req.queryParameters, + method: req.method, + origin: req.origin, + timestamp: Date.now(), + requestType: isSubscribeRequest ? 'subscribe' : 'heartbeat', + }; + + interceptedRequests.push(interceptedRequest); + + if (isSubscribeRequest) { + if (!tokenUpdateCompleted && !initialSubscribeVerified) { + try { + if (interceptedRequest.queryParameters && interceptedRequest.queryParameters.auth) { + expect(interceptedRequest.queryParameters.auth).to.equal(initialToken); + initialSubscribeVerified = true; + + // Set a timeout to update token after allowing time for heartbeat verification + setTimeout(() => { + if (!testCompleted && !errorOccurred && !tokenUpdateCompleted) { + if (initialHeartbeatVerified) { + tokenVerificationInstance.setToken(updatedToken); + tokenUpdateCompleted = true; + + // Trigger subscription change to generate new requests with updated token + setTimeout(() => { + if (!subscriptionChangeTriggered && !testCompleted) { + const newChannelSub = tokenVerificationInstance + .channel(`new-channel-${testId}`) + .subscription(); + newChannelSub.subscribe(); + subscriptionChangeTriggered = true; + } + }, 1000); + } else { + tokenVerificationInstance.setToken(updatedToken); + tokenUpdateCompleted = true; + + setTimeout(() => { + if (!subscriptionChangeTriggered && !testCompleted) { + const newChannelSub = tokenVerificationInstance + .channel(`new-channel-${testId}`) + .subscription(); + newChannelSub.subscribe(); + subscriptionChangeTriggered = true; + } + }, 1000); + } + } + }, 4000); // 4 second delay to allow heartbeat requests + } + } catch (error) { + if (!testCompleted) { + errorOccurred = true; + testCompleted = true; + cleanup(); + done(error); + return; + } + } + } + // Handle subsequent subscription requests (after token update) + else if (tokenUpdateCompleted && !tokenRefreshVerified) { + try { + if (interceptedRequest.queryParameters && interceptedRequest.queryParameters.auth) { + expect(interceptedRequest.queryParameters.auth).to.equal(updatedToken); + tokenRefreshVerified = true; + + // Wait for updated heartbeat verification before proceeding to message test + if (updatedHeartbeatVerified) { + setTimeout(() => { + testMessageDelivery(); + }, 2000); + } else { + // Trigger fallback if heartbeat verification is taking too long + triggerMessageTestFallback(); + } + } + } catch (error) { + if (!testCompleted) { + errorOccurred = true; + testCompleted = true; + cleanup(); + done(error); + return; + } + } + } + } + + // Handle heartbeat requests + if (isHeartbeatRequest) { + try { + if (interceptedRequest.queryParameters && interceptedRequest.queryParameters.auth) { + const authToken = interceptedRequest.queryParameters.auth; + + // Check for heartbeat with initial token (before token update) + if (!tokenUpdateCompleted && authToken === initialToken && !initialHeartbeatVerified) { + initialHeartbeatVerified = true; + } + + // Check for heartbeat with updated token (after token update) + if (tokenUpdateCompleted && authToken === updatedToken && !updatedHeartbeatVerified) { + updatedHeartbeatVerified = true; + + // If both subscription and heartbeat token refresh are verified, proceed to message test + if (tokenRefreshVerified) { + setTimeout(() => { + testMessageDelivery(); + }, 2000); + } + } + + // Log unexpected tokens for debugging + if (authToken !== initialToken && authToken !== updatedToken) { + console.log('Warning: Unexpected auth token in heartbeat:', authToken); + } + } + } catch (error) { + console.log('Error processing heartbeat request:', error); + } + } + } + + // Call the original method to continue normal flow + return originalMakeSendable(req); + }; + + // Create a separate publisher instance without any interception + const publisherInstance = new PubNub({ + publishKey, + subscribeKey, + userId: `publisher-${testId}`, + enableEventEngine: true, + autoNetworkDetection: false, + }); + + // Create a separate listener instance without any interception + const listenerInstance = new PubNub({ + publishKey, + subscribeKey, + userId: `listener-${testId}`, + enableEventEngine: true, + autoNetworkDetection: false, + }); + + // Set up message listener on the separate listener instance + listenerInstance.addListener({ + message: (messageEvent: any) => { + if (errorOccurred || testCompleted) return; + // Check if this is our test message from the correct channel + if (messageEvent.message === testMessage && messageEvent.channel === testChannel) { + messageReceived = true; + + try { + expect(messageEvent.message).to.equal(testMessage); + expect(messageEvent.channel).to.equal(testChannel); + completeTest(); + } catch (error) { + if (!testCompleted) { + errorOccurred = true; + testCompleted = true; + cleanup(); + done(error); + } + } + } else { + console.log('Message received on listener but not matching expected criteria'); + } + }, + status: (statusEvent: any) => {}, + }); + + // Subscribe the listener instance to the test channel immediately + setTimeout(() => { + const listenerSubscription = listenerInstance.channel(testChannel).subscription(); + listenerSubscription.subscribe(); + }, 1000); // Small delay to ensure it's set up after the main instances + + // Function to test message delivery + async function testMessageDelivery() { + if (messagePublished || testCompleted) return; + messagePublished = true; + try { + // Use the separate publisher instance to avoid interception issues + await publisherInstance.publish({ + channel: testChannel, + message: testMessage, + }); + + // Set timeout to complete test if no message is received + setTimeout(() => { + if (!testCompleted && !messageReceived) { + console.log('Message delivery test completed (timeout reached)'); + completeTest(); + } + }, 3000); + } catch (error) { + console.log('Failed to publish message:', error); + // Complete test even if publish fails + completeTest(); + } + } + + // Fallback function to trigger message test if heartbeat verification is taking too long + function triggerMessageTestFallback() { + setTimeout(() => { + if (!testCompleted && !messagePublished && tokenRefreshVerified) { + testMessageDelivery(); + } + }, 8000); // Additional 8 seconds fallback + } + + // Function to complete the test successfully + function completeTest() { + if (!testCompleted) { + testCompleted = true; + cleanup(); + done(); + } + } + // Other instances don't need status listeners since we only monitor the first instance + function cleanup() { + // Restore original transport + underlyingTransport.makeSendable = originalMakeSendable; + + // Clean up publisher instance + publisherInstance.destroy(true); + + // Clean up listener instance + listenerInstance.removeAllListeners(); + listenerInstance.unsubscribeAll(); + listenerInstance.destroy(true); + + // Clean up all instances + pubnubInstances.forEach((instance) => { + instance.removeAllListeners(); + instance.unsubscribeAll(); + instance.destroy(true); + }); + } + + // Add timeout fallback + setTimeout(() => { + if (!testCompleted && !errorOccurred) { + testCompleted = true; + cleanup(); + + const debugInfo = { + interceptedRequestsCount: interceptedRequests.length, + tokenUpdateCompleted, + tokenRefreshVerified, + initialSubscribeVerified, + initialHeartbeatVerified, + updatedHeartbeatVerified, + subscriptionChangeTriggered, + messagePublished, + messageReceived, + testChannel, + testMessage, + interceptedRequests: interceptedRequests + .map((req) => ({ + auth: req.queryParameters?.auth, + requestType: req.requestType, + timestamp: req.timestamp, + path: req.path.substring(0, 50) + '...', + })) + .slice(-10), // Show last 10 requests + }; + + if (!initialSubscribeVerified) { + done(new Error(`Initial subscription token verification failed. Debug info: ${JSON.stringify(debugInfo)}`)); + } else if (!initialHeartbeatVerified) { + done(new Error(`Initial heartbeat token verification failed. Debug info: ${JSON.stringify(debugInfo)}`)); + } else if (!tokenUpdateCompleted) { + done(new Error(`Token update was not completed. Debug info: ${JSON.stringify(debugInfo)}`)); + } else if (!tokenRefreshVerified) { + done( + new Error(`Token refresh verification in subscription failed. Debug info: ${JSON.stringify(debugInfo)}`), + ); + } else if (!updatedHeartbeatVerified) { + done(new Error(`Updated heartbeat token verification failed. Debug info: ${JSON.stringify(debugInfo)}`)); + } else if (messagePublished && !messageReceived) { + done(new Error(`Message was published but not received. Debug info: ${JSON.stringify(debugInfo)}`)); + } else { + done(); + } + } + }, 30000); + + // Subscribe all 25 instances to their respective channels with 200ms delay + pubnubInstances.forEach((instance, index) => { + setTimeout(() => { + const subscription = instance.channel(channels[index]).subscription(); + subscription.subscribe(); + }, index * 200); // 200ms delay between each subscription + }); + + // All instances will subscribe and requests will be intercepted automatically + }).timeout(40000); }); describe('Subscription Behavior with Event Engine Disabled', () => { @@ -1507,8 +1863,8 @@ describe('PubNub Shared Worker Integration Tests', () => { ]; const config = { - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, enableEventEngine: false, heartbeatInterval: 1.5, presenceTimeout: 5, @@ -1728,8 +2084,8 @@ describe('PubNub Shared Worker Integration Tests', () => { // Create three PubNub instances simulating different browser tabs const tab1 = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, enableEventEngine: false, heartbeatInterval: 1.5, presenceTimeout: 5, @@ -1739,8 +2095,8 @@ describe('PubNub Shared Worker Integration Tests', () => { }); const tab2 = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, enableEventEngine: false, heartbeatInterval: 1.5, presenceTimeout: 5, @@ -1750,8 +2106,8 @@ describe('PubNub Shared Worker Integration Tests', () => { }); const tab3 = new PubNub({ - publishKey: 'demo', - subscribeKey: 'demo', + publishKey, + subscribeKey, enableEventEngine: false, heartbeatInterval: 1.5, presenceTimeout: 5,