Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/floppy-schools-send.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@signalwire/webrtc': minor
'@signalwire/core': minor
'@signalwire/js': minor
'@sw-internal/e2e-js': patch
---

CHANGED improved the handling of WebSockets reconnections.
10 changes: 10 additions & 0 deletions packages/core/src/BaseComponent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,16 @@ export class BaseComponent<
return this._attachWorker(name, def)
}

public cancelWorker(workerTask: Task) {
const foundTaskIndex = this._runningWorkers.findIndex(
(task) => task === workerTask
)
if (foundTaskIndex > -1) {
this._runningWorkers.splice(foundTaskIndex, 1)
workerTask.cancel()
}
}

private _setWorker<Hooks extends SDKWorkerHooks = SDKWorkerHooks>(
name: string,
def: SDKWorkerDefinition<Hooks>
Expand Down
30 changes: 25 additions & 5 deletions packages/core/src/BaseSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ import {
safeParseJson,
isJSONRPCResponse,
SWCloseEvent,
isConnectRequest,
} from './utils'
import { DEFAULT_HOST, WebSocketState } from './utils/constants'
import {
DEFAULT_HOST,
SYMBOL_CONNECT_ERROR,
SYMBOL_EXECUTE_CONNECTION_CLOSED,
SYMBOL_EXECUTE_TIMEOUT,
WebSocketState,
} from './utils/constants'
import {
RPCConnect,
RPCConnectParams,
Expand Down Expand Up @@ -69,15 +76,16 @@ export class BaseSession {
private _host: string = DEFAULT_HOST

private _executeTimeoutMs = 10 * 1000
private _executeTimeoutError = Symbol.for('sw-execute-timeout')
private _executeTimeoutError = SYMBOL_EXECUTE_TIMEOUT
private _executeQueue: Set<JSONRPCRequest | JSONRPCResponse> = new Set()
private _swConnectError = Symbol.for('sw-connect-error')
private _executeConnectionClosed = Symbol.for('sw-execute-connection-closed')
private _swConnectError = SYMBOL_CONNECT_ERROR
private _executeConnectionClosed = SYMBOL_EXECUTE_CONNECTION_CLOSED

private _checkPingDelay = 15 * 1000
private _checkPingTimer: any = null
private _reconnectTimer: ReturnType<typeof setTimeout>
private _status: SessionStatus = 'unknown'
private _resolveWaitConnected: null | (() => void) = null
private _sessionChannel: SessionChannel
private wsOpenHandler: (event: Event) => void
private wsCloseHandler: (event: SWCloseEvent) => void
Expand Down Expand Up @@ -181,6 +189,16 @@ export class BaseSession {
return !Boolean(this.idle || !this.connected)
}

protected async _waitConnected() {
return new Promise<void>((resolve) => {
if (this.connected) {
resolve()
} else {
this._resolveWaitConnected = resolve
}
})
}

set token(token: string) {
this.options.token = token
}
Expand Down Expand Up @@ -329,7 +347,7 @@ export class BaseSession {
if (error === this._executeConnectionClosed) {
throw this._executeConnectionClosed
} else if (error === this._executeTimeoutError) {
if ('method' in msg && msg.method === 'signalwire.connect') {
if (isConnectRequest(msg)) {
throw this._swConnectError
}
this._checkCurrentStatus()
Expand Down Expand Up @@ -400,6 +418,7 @@ export class BaseSession {
this._clearTimers()
await this.authenticate()
this._status = 'connected'
this._resolveWaitConnected?.()
this._flushExecuteQueue()
this.dispatch(authSuccessAction())
} catch (error) {
Expand Down Expand Up @@ -445,6 +464,7 @@ export class BaseSession {
this._requests.forEach(({ reject }) => {
reject(this._executeConnectionClosed)
})
this._requests.clear()
}

protected _onSocketMessage(event: MessageEvent) {
Expand Down
12 changes: 12 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ import {
increasingDelay,
decreasingDelay,
constDelay,
isConnectRequest,
isVertoInvite,
} from './utils'
import {
SYMBOL_CONNECT_ERROR,
SYMBOL_EXECUTE_CONNECTION_CLOSED,
SYMBOL_EXECUTE_TIMEOUT,
} from './utils/constants'
import { WEBRTC_EVENT_TYPES, isWebrtcEventType } from './utils/common'
import { BaseSession } from './BaseSession'
import { BaseJWTSession } from './BaseJWTSession'
Expand Down Expand Up @@ -74,12 +81,17 @@ export {
isSATAuth,
isJSONRPCRequest,
isJSONRPCResponse,
isConnectRequest,
isVertoInvite,
LOCAL_EVENT_PREFIX,
stripNamespacePrefix,
asyncRetry,
increasingDelay,
decreasingDelay,
constDelay,
SYMBOL_CONNECT_ERROR,
SYMBOL_EXECUTE_CONNECTION_CLOSED,
SYMBOL_EXECUTE_TIMEOUT,
}

export * from './redux/features/component/componentSlice'
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ export const PRODUCT_PREFIXES = [
export const INTERNAL_GLOBAL_VIDEO_EVENTS = GLOBAL_VIDEO_EVENTS.map(
(event) => `${PRODUCT_PREFIX_VIDEO}.${event}` as const
)

export const SYMBOL_EXECUTE_CONNECTION_CLOSED = Symbol.for(
'sw-execute-connection-closed'
)
export const SYMBOL_EXECUTE_TIMEOUT = Symbol.for('sw-execute-timeout')
export const SYMBOL_CONNECT_ERROR = Symbol.for('sw-connect-error')
8 changes: 8 additions & 0 deletions packages/core/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,11 @@ export const isJSONRPCResponse = (
export const isSATAuth = (e?: Authorization): e is SATAuthorization => {
return typeof e !== 'undefined' && 'jti' in e
}

export const isConnectRequest = (e: JSONRPCRequest | JSONRPCResponse) =>
isJSONRPCRequest(e) && e.method == 'signalwire.connect'

export const isVertoInvite = (e: JSONRPCRequest | JSONRPCResponse) =>
isJSONRPCRequest(e) &&
e.method == 'webrtc.verto' &&
e.params?.message.method === 'verto.invite'
20 changes: 17 additions & 3 deletions packages/js/src/fabric/SATSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import {
RPCReauthenticateParams,
SATAuthorization,
UNIFIED_CONNECT_VERSION,
isConnectRequest,
getLogger,
isVertoInvite,
SYMBOL_EXECUTE_CONNECTION_CLOSED,
SYMBOL_EXECUTE_TIMEOUT,
} from '@signalwire/core'
import { JWTSession } from '../JWTSession'
import { SATSessionOptions } from './interfaces'
Expand Down Expand Up @@ -71,15 +76,24 @@ export class SATSession extends JWTSession {

override async execute(msg: JSONRPCRequest | JSONRPCResponse): Promise<any> {
return asyncRetry({
asyncCallable: () => super.execute(msg),
asyncCallable: async () => {
await this._waitConnected() // avoid queuing a retry
return super.execute(msg)
},
maxRetries: this.options.maxApiRequestRetries,
delayFn: increasingDelay({
initialDelay: this.options.apiRequestRetriesDelay,
variation: this.options.apiRequestRetriesDelayIncrement,
}),
expectedErrorHandler: (error) => {
if (error?.message?.startsWith('Authentication failed')) {
// is expected to be handle by the app developer, skipping retries
getLogger().warn(error)
if (isConnectRequest(msg)) {
// `signalwire.connect` retries are handle by the connection
return true
}
if (isVertoInvite(msg) && ![SYMBOL_EXECUTE_CONNECTION_CLOSED, SYMBOL_EXECUTE_TIMEOUT].includes(error)) {
// we can't retry verto.invites after errors on the transport layer
getLogger().debug('skip verto.invite retry on error:', error)
return true
}
return false
Expand Down
12 changes: 6 additions & 6 deletions packages/js/src/fabric/WSClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ export class WSClient extends BaseClient<{}> implements WSClientContract {
let video = false
let negotiateVideo = false

const queryParams = new URLSearchParams(query)
const channel = queryParams.get('channel')
if (channel === 'video') {
video = true
negotiateVideo = true
}
const queryParams = new URLSearchParams(query)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please run the prettier

const channel = queryParams.get('channel')
if (channel === 'video') {
video = true
negotiateVideo = true
}

const call = this.makeFabricObject({
audio: params.audio ?? true,
Expand Down
84 changes: 69 additions & 15 deletions packages/webrtc/src/BaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import {
VertoAnswer,
UpdateMediaParams,
UpdateMediaDirection,
asyncRetry,
constDelay,
SYMBOL_EXECUTE_CONNECTION_CLOSED,
SYMBOL_EXECUTE_TIMEOUT,
} from '@signalwire/core'
import type { ReduxComponent, VertoModifyResponse } from '@signalwire/core'
import RTCPeer from './RTCPeer'
Expand Down Expand Up @@ -96,6 +100,10 @@ export class BaseConnection<
return this.state === 'active'
}

get requesting() {
return this.state === 'requesting'
}

get trying() {
return this.state === 'trying'
}
Expand Down Expand Up @@ -277,6 +285,8 @@ export class BaseConnection<
}
}

private _myWorkers: Task[] = []

getRTCPeerById(rtcPeerId: string) {
return this.rtcPeerMap.get(rtcPeerId)
}
Expand All @@ -285,6 +295,10 @@ export class BaseConnection<
return this.rtcPeerMap.set(rtcPeer.uuid, rtcPeer)
}

removeRTCPeer(rtcPeerId: string) {
return this.rtcPeerMap.delete(rtcPeerId)
}

setActiveRTCPeer(rtcPeerId: string) {
this.peer = this.getRTCPeerById(rtcPeerId)
}
Expand Down Expand Up @@ -741,38 +755,74 @@ export class BaseConnection<
}

runRTCPeerWorkers(rtcPeerId: string) {
this.runWorker('vertoEventWorker', {
const vertoWorker = this.runWorker('vertoEventWorker', {
worker: workers.vertoEventWorker,
initialState: { rtcPeerId },
})
this._myWorkers.push(vertoWorker)

const main = !(this.options.additionalDevice || this.options.screenShare)

if (main) {
this.runWorker('roomSubscribedWorker', {
const subscribedWorker = this.runWorker('roomSubscribedWorker', {
worker: workers.roomSubscribedWorker,
initialState: { rtcPeerId },
})
this._myWorkers.push(subscribedWorker)

this.runWorker('promoteDemoteWorker', {
const promoteDemoteWorker = this.runWorker('promoteDemoteWorker', {
worker: workers.promoteDemoteWorker,
initialState: { rtcPeerId },
})
this._myWorkers.push(promoteDemoteWorker)
}
}

removeRTCWorkers() {
for (const task of this._myWorkers) {
this.cancelWorker(task)
}
this._myWorkers = []
}

private _destroyPeer() {
if (this.peer) {
//clean up previous attempt
this.peer.detachAndStop()
this.removeRTCWorkers()
this.removeRTCPeer(this.peer.uuid)
this.peer = undefined
}
}

/** @internal */
invite<T>(): Promise<T> {
return new Promise(async (resolve, reject) => {
this.direction = 'outbound'
this.peer = this._buildPeer('offer')
try {
await this.peer.start()
resolve(this as any as T)
} catch (error) {
this.logger.error('Invite error', error)
reject(error)
}
return asyncRetry({
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the retry logic should be touching the internal methods/or API calls. This will make the SDK inconsistent, and it also does not care about developer-passed params, which means the developer can not control the retry specifically for the verto.invite.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I am confused. The global retry will also retry this API? Because this will in the end call the verto.invite, which will trigger the execute, and execute has a global retry logic. So, is this API nested under two retry methods?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global retry doesn't retry verto.invites anymore, let it be handled here...
The verto.invite is a special case, and this is the main reason users are being disconnected from calls ATM.

In a nutshell, the SDK can't replay a verto.invite, it's to "execute" a new one using a new RTCPeer, or just after connecting, the call will disconnect for various reasons.

We may find other RPC with the same characteristics, but this PR only focuses on the verto.invite request.

asyncCallable: async () => {
return new Promise(async (resolve, reject) => {
await this._waitUntilSessionAuthorized()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you think the user can call this API if the session is not authorized? This is already a part of the connect logic. If that does not happen, the user can't call this API. If that happens, then do we need to wait here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not the user calling it, but he next scheduled retry, which is normally faster than the authentication.
So this Retry usage/implementation needs to handle that, or it's going to fail again.

this.direction = 'outbound'
this.peer = this._buildPeer('offer')
try {
await this.peer.start()
resolve(this as any as T)
} catch (error) {
this.logger.error('Invite error', error)
this._destroyPeer()
reject(error)
}
})
},
maxRetries: 5,
delayFn: constDelay({ initialDelay: 0 }),
expectedErrorHandler: (error) => {
if (this.requesting && [SYMBOL_EXECUTE_CONNECTION_CLOSED, SYMBOL_EXECUTE_TIMEOUT].includes(error)) { // eslint-disable-line max-len, no-nested-ternaryerror === SYMBOL_EXECUTE_CONNECTION_CLOSED) {
this.logger.debug('Retrying verto.invite with new RTCPeer')
return false // we should retry
}
// other case are expected to be handle upstream
return true
},
})
}

Expand All @@ -794,7 +844,7 @@ export class BaseConnection<
}

/** @internal */
onLocalSDPReady(rtcPeer: RTCPeer<EventTypes>) {
async onLocalSDPReady(rtcPeer: RTCPeer<EventTypes>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not an async method

Suggested change
async onLocalSDPReady(rtcPeer: RTCPeer<EventTypes>) {
onLocalSDPReady(rtcPeer: RTCPeer<EventTypes>) {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async is not required here, but the method returns Promises, and since the return is being inferred,
Having the async act as a reminder for us, reading the code the the method is an async execution.

if (!rtcPeer.instance.localDescription) {
this.logger.error('Missing localDescription', rtcPeer)
throw new Error('Invalid RTCPeerConnection localDescription')
Expand Down Expand Up @@ -899,11 +949,15 @@ export class BaseConnection<
node_id: nodeId ?? this.options.nodeId,
subscribe,
})
if (this.state === 'requesting') {
// The Server Created the call, and now is trying to connect us to the destination
this.setState('trying')
}
Comment on lines +952 to +955
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point? Where are you using this trying state? The SDK is already setting the state to requesting when the invite begins, and it only sets the state to active when verto.answer is received. Please check the vertoEventWorker code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trying is a well-known state in a call setup.
https://github.com/signalwire/signalwire-js/blob/main/packages/core/src/utils/interfaces.ts#L276

The SDK should be in the requesting state only until the request is confirmed by the server.
After the successful request, the state should now be trying and later after the verto.answer active.

This makes more sense in 1:1 calls, supported by the CF SDK. I believe that's why it was missing in the Video SDK.

this.logger.debug('Invite response', response)

this.resuming = false
} catch (error) {
this.setState('hangup')
this.setState('hangup')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run prettier please

throw error
}
}
Expand Down
Loading
Loading