diff --git a/.changeset/floppy-schools-send.md b/.changeset/floppy-schools-send.md new file mode 100644 index 000000000..bbbf54cbc --- /dev/null +++ b/.changeset/floppy-schools-send.md @@ -0,0 +1,8 @@ +--- +'@signalwire/webrtc': minor +'@signalwire/core': minor +'@signalwire/js': minor +'@sw-internal/e2e-js': patch +--- + +CHANGED improved the handling of WebSockets reconnections. diff --git a/packages/core/src/BaseComponent.ts b/packages/core/src/BaseComponent.ts index b108a2255..bd07b08a1 100644 --- a/packages/core/src/BaseComponent.ts +++ b/packages/core/src/BaseComponent.ts @@ -314,6 +314,16 @@ export class BaseComponent< return this._attachWorker(name, def) } + public cancelWorker(workerTask: Task) { + const foundTaskIndex = this._runningWorkers.findIndex( + (task) => task === workerTask + ) + if (foundTaskIndex > -1) { + this._runningWorkers.splice(foundTaskIndex, 1) + workerTask.cancel() + } + } + private _setWorker( name: string, def: SDKWorkerDefinition diff --git a/packages/core/src/BaseSession.ts b/packages/core/src/BaseSession.ts index adb20fe37..a77aa5d81 100644 --- a/packages/core/src/BaseSession.ts +++ b/packages/core/src/BaseSession.ts @@ -7,8 +7,15 @@ import { safeParseJson, isJSONRPCResponse, SWCloseEvent, + isConnectRequest, } from './utils' -import { DEFAULT_HOST, WebSocketState } from './utils/constants' +import { + DEFAULT_HOST, + SYMBOL_CONNECT_ERROR, + SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, + WebSocketState, +} from './utils/constants' import { RPCConnect, RPCConnectParams, @@ -69,15 +76,16 @@ export class BaseSession { private _host: string = DEFAULT_HOST private _executeTimeoutMs = 10 * 1000 - private _executeTimeoutError = Symbol.for('sw-execute-timeout') + private _executeTimeoutError = SYMBOL_EXECUTE_TIMEOUT private _executeQueue: Set = new Set() - private _swConnectError = Symbol.for('sw-connect-error') - private _executeConnectionClosed = Symbol.for('sw-execute-connection-closed') + private _swConnectError = SYMBOL_CONNECT_ERROR + private _executeConnectionClosed = SYMBOL_EXECUTE_CONNECTION_CLOSED private _checkPingDelay = 15 * 1000 private _checkPingTimer: any = null private _reconnectTimer: ReturnType private _status: SessionStatus = 'unknown' + private _resolveWaitConnected: null | (() => void) = null private _sessionChannel: SessionChannel private wsOpenHandler: (event: Event) => void private wsCloseHandler: (event: SWCloseEvent) => void @@ -181,6 +189,16 @@ export class BaseSession { return !Boolean(this.idle || !this.connected) } + protected async _waitConnected() { + return new Promise((resolve) => { + if (this.connected) { + resolve() + } else { + this._resolveWaitConnected = resolve + } + }) + } + set token(token: string) { this.options.token = token } @@ -329,7 +347,7 @@ export class BaseSession { if (error === this._executeConnectionClosed) { throw this._executeConnectionClosed } else if (error === this._executeTimeoutError) { - if ('method' in msg && msg.method === 'signalwire.connect') { + if (isConnectRequest(msg)) { throw this._swConnectError } this._checkCurrentStatus() @@ -400,6 +418,7 @@ export class BaseSession { this._clearTimers() await this.authenticate() this._status = 'connected' + this._resolveWaitConnected?.() this._flushExecuteQueue() this.dispatch(authSuccessAction()) } catch (error) { @@ -445,6 +464,7 @@ export class BaseSession { this._requests.forEach(({ reject }) => { reject(this._executeConnectionClosed) }) + this._requests.clear() } protected _onSocketMessage(event: MessageEvent) { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index b4585d2f2..2eeab72b4 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -24,7 +24,14 @@ import { increasingDelay, decreasingDelay, constDelay, + isConnectRequest, + isVertoInvite, } from './utils' +import { + SYMBOL_CONNECT_ERROR, + SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, +} from './utils/constants' import { WEBRTC_EVENT_TYPES, isWebrtcEventType } from './utils/common' import { BaseSession } from './BaseSession' import { BaseJWTSession } from './BaseJWTSession' @@ -74,12 +81,17 @@ export { isSATAuth, isJSONRPCRequest, isJSONRPCResponse, + isConnectRequest, + isVertoInvite, LOCAL_EVENT_PREFIX, stripNamespacePrefix, asyncRetry, increasingDelay, decreasingDelay, constDelay, + SYMBOL_CONNECT_ERROR, + SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, } export * from './redux/features/component/componentSlice' diff --git a/packages/core/src/utils/constants.ts b/packages/core/src/utils/constants.ts index 1ad502064..a95e618c4 100644 --- a/packages/core/src/utils/constants.ts +++ b/packages/core/src/utils/constants.ts @@ -56,3 +56,9 @@ export const PRODUCT_PREFIXES = [ export const INTERNAL_GLOBAL_VIDEO_EVENTS = GLOBAL_VIDEO_EVENTS.map( (event) => `${PRODUCT_PREFIX_VIDEO}.${event}` as const ) + +export const SYMBOL_EXECUTE_CONNECTION_CLOSED = Symbol.for( + 'sw-execute-connection-closed' +) +export const SYMBOL_EXECUTE_TIMEOUT = Symbol.for('sw-execute-timeout') +export const SYMBOL_CONNECT_ERROR = Symbol.for('sw-connect-error') diff --git a/packages/core/src/utils/index.ts b/packages/core/src/utils/index.ts index fbe194e95..ab390acf0 100644 --- a/packages/core/src/utils/index.ts +++ b/packages/core/src/utils/index.ts @@ -227,3 +227,11 @@ export const isJSONRPCResponse = ( export const isSATAuth = (e?: Authorization): e is SATAuthorization => { return typeof e !== 'undefined' && 'jti' in e } + +export const isConnectRequest = (e: JSONRPCRequest | JSONRPCResponse) => + isJSONRPCRequest(e) && e.method == 'signalwire.connect' + +export const isVertoInvite = (e: JSONRPCRequest | JSONRPCResponse) => + isJSONRPCRequest(e) && + e.method == 'webrtc.verto' && + e.params?.message.method === 'verto.invite' diff --git a/packages/js/src/fabric/SATSession.ts b/packages/js/src/fabric/SATSession.ts index bb527c7e7..640b50e34 100644 --- a/packages/js/src/fabric/SATSession.ts +++ b/packages/js/src/fabric/SATSession.ts @@ -7,6 +7,11 @@ import { RPCReauthenticateParams, SATAuthorization, UNIFIED_CONNECT_VERSION, + isConnectRequest, + getLogger, + isVertoInvite, + SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, } from '@signalwire/core' import { JWTSession } from '../JWTSession' import { SATSessionOptions } from './interfaces' @@ -71,15 +76,24 @@ export class SATSession extends JWTSession { override async execute(msg: JSONRPCRequest | JSONRPCResponse): Promise { return asyncRetry({ - asyncCallable: () => super.execute(msg), + asyncCallable: async () => { + await this._waitConnected() // avoid queuing a retry + return super.execute(msg) + }, maxRetries: this.options.maxApiRequestRetries, delayFn: increasingDelay({ initialDelay: this.options.apiRequestRetriesDelay, variation: this.options.apiRequestRetriesDelayIncrement, }), expectedErrorHandler: (error) => { - if (error?.message?.startsWith('Authentication failed')) { - // is expected to be handle by the app developer, skipping retries + getLogger().warn(error) + if (isConnectRequest(msg)) { + // `signalwire.connect` retries are handle by the connection + return true + } + if (isVertoInvite(msg) && ![SYMBOL_EXECUTE_CONNECTION_CLOSED, SYMBOL_EXECUTE_TIMEOUT].includes(error)) { + // we can't retry verto.invites after errors on the transport layer + getLogger().debug('skip verto.invite retry on error:', error) return true } return false diff --git a/packages/js/src/fabric/WSClient.ts b/packages/js/src/fabric/WSClient.ts index a6cdcaa82..39ec26e45 100644 --- a/packages/js/src/fabric/WSClient.ts +++ b/packages/js/src/fabric/WSClient.ts @@ -165,12 +165,12 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { let video = false let negotiateVideo = false - const queryParams = new URLSearchParams(query) - const channel = queryParams.get('channel') - if (channel === 'video') { - video = true - negotiateVideo = true - } + const queryParams = new URLSearchParams(query) + const channel = queryParams.get('channel') + if (channel === 'video') { + video = true + negotiateVideo = true + } const call = this.makeFabricObject({ audio: params.audio ?? true, diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index f3386ead6..b74e227a4 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -20,6 +20,10 @@ import { VertoAnswer, UpdateMediaParams, UpdateMediaDirection, + asyncRetry, + constDelay, + SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, } from '@signalwire/core' import type { ReduxComponent, VertoModifyResponse } from '@signalwire/core' import RTCPeer from './RTCPeer' @@ -96,6 +100,10 @@ export class BaseConnection< return this.state === 'active' } + get requesting() { + return this.state === 'requesting' + } + get trying() { return this.state === 'trying' } @@ -277,6 +285,8 @@ export class BaseConnection< } } + private _myWorkers: Task[] = [] + getRTCPeerById(rtcPeerId: string) { return this.rtcPeerMap.get(rtcPeerId) } @@ -285,6 +295,10 @@ export class BaseConnection< return this.rtcPeerMap.set(rtcPeer.uuid, rtcPeer) } + removeRTCPeer(rtcPeerId: string) { + return this.rtcPeerMap.delete(rtcPeerId) + } + setActiveRTCPeer(rtcPeerId: string) { this.peer = this.getRTCPeerById(rtcPeerId) } @@ -741,38 +755,74 @@ export class BaseConnection< } runRTCPeerWorkers(rtcPeerId: string) { - this.runWorker('vertoEventWorker', { + const vertoWorker = this.runWorker('vertoEventWorker', { worker: workers.vertoEventWorker, initialState: { rtcPeerId }, }) + this._myWorkers.push(vertoWorker) const main = !(this.options.additionalDevice || this.options.screenShare) if (main) { - this.runWorker('roomSubscribedWorker', { + const subscribedWorker = this.runWorker('roomSubscribedWorker', { worker: workers.roomSubscribedWorker, initialState: { rtcPeerId }, }) + this._myWorkers.push(subscribedWorker) - this.runWorker('promoteDemoteWorker', { + const promoteDemoteWorker = this.runWorker('promoteDemoteWorker', { worker: workers.promoteDemoteWorker, initialState: { rtcPeerId }, }) + this._myWorkers.push(promoteDemoteWorker) + } + } + + removeRTCWorkers() { + for (const task of this._myWorkers) { + this.cancelWorker(task) + } + this._myWorkers = [] + } + + private _destroyPeer() { + if (this.peer) { + //clean up previous attempt + this.peer.detachAndStop() + this.removeRTCWorkers() + this.removeRTCPeer(this.peer.uuid) + this.peer = undefined } } /** @internal */ invite(): Promise { - return new Promise(async (resolve, reject) => { - this.direction = 'outbound' - this.peer = this._buildPeer('offer') - try { - await this.peer.start() - resolve(this as any as T) - } catch (error) { - this.logger.error('Invite error', error) - reject(error) - } + return asyncRetry({ + asyncCallable: async () => { + return new Promise(async (resolve, reject) => { + await this._waitUntilSessionAuthorized() + this.direction = 'outbound' + this.peer = this._buildPeer('offer') + try { + await this.peer.start() + resolve(this as any as T) + } catch (error) { + this.logger.error('Invite error', error) + this._destroyPeer() + reject(error) + } + }) + }, + maxRetries: 5, + delayFn: constDelay({ initialDelay: 0 }), + expectedErrorHandler: (error) => { + if (this.requesting && [SYMBOL_EXECUTE_CONNECTION_CLOSED, SYMBOL_EXECUTE_TIMEOUT].includes(error)) { // eslint-disable-line max-len, no-nested-ternaryerror === SYMBOL_EXECUTE_CONNECTION_CLOSED) { + this.logger.debug('Retrying verto.invite with new RTCPeer') + return false // we should retry + } + // other case are expected to be handle upstream + return true + }, }) } @@ -794,7 +844,7 @@ export class BaseConnection< } /** @internal */ - onLocalSDPReady(rtcPeer: RTCPeer) { + async onLocalSDPReady(rtcPeer: RTCPeer) { if (!rtcPeer.instance.localDescription) { this.logger.error('Missing localDescription', rtcPeer) throw new Error('Invalid RTCPeerConnection localDescription') @@ -899,11 +949,15 @@ export class BaseConnection< node_id: nodeId ?? this.options.nodeId, subscribe, }) + if (this.state === 'requesting') { + // The Server Created the call, and now is trying to connect us to the destination + this.setState('trying') + } this.logger.debug('Invite response', response) this.resuming = false } catch (error) { - this.setState('hangup') + this.setState('hangup') throw error } } diff --git a/packages/webrtc/src/workers/promoteDemoteWorker.ts b/packages/webrtc/src/workers/promoteDemoteWorker.ts index 2dbb80d9e..00d81bfdd 100644 --- a/packages/webrtc/src/workers/promoteDemoteWorker.ts +++ b/packages/webrtc/src/workers/promoteDemoteWorker.ts @@ -35,59 +35,63 @@ export const promoteDemoteWorker: SDKWorker< throw new Error('Missing rtcPeerId for promoteDemoteWorker') } - const action: MapToPubSubShape< - VideoMemberPromotedEvent | VideoMemberDemotedEvent - > = yield sagaEffects.take(swEventChannel, (action: SDKActions) => { - if ( - action.type === 'video.member.promoted' || - action.type === 'video.member.demoted' - ) { - return action.payload.member_id === instance.memberId - } - return false - }) + try { + const action: MapToPubSubShape< + VideoMemberPromotedEvent | VideoMemberDemotedEvent + > = yield sagaEffects.take(swEventChannel, (action: SDKActions) => { + if ( + action.type === 'video.member.promoted' || + action.type === 'video.member.demoted' + ) { + return action.payload.member_id === instance.memberId + } + return false + }) - getLogger().debug('promoteDemoteWorker:', action.type, action.payload) + getLogger().debug('promoteDemoteWorker:', action.type, action.payload) - yield sagaEffects.put( - sessionActions.updateAuthorization(action.payload.authorization) - ) - const authorization: VideoAuthorization = yield sagaEffects.select( - selectors.getAuthorization - ) - if (!authorization) { - throw new Error(`Invalid authorization for '${action.type}'`) - } + yield sagaEffects.put( + sessionActions.updateAuthorization(action.payload.authorization) + ) + const authorization: VideoAuthorization = yield sagaEffects.select( + selectors.getAuthorization + ) + if (!authorization) { + throw new Error(`Invalid authorization for '${action.type}'`) + } - // TODO: use the new getJoinMediaParams in here - const { audio_allowed, video_allowed } = authorization - switch (action.type) { - case 'video.member.promoted': - /** - * Promote means enable the media allowed and keep the - * same recv settings. (do not force recv media) - */ - instance.updateMediaOptions({ - audio: audio_allowed === 'both', - video: video_allowed === 'both', - negotiateAudio: audio_allowed !== 'none', - negotiateVideo: video_allowed !== 'none', - }) - break - case 'video.member.demoted': - /** - * Demote means force recvonly and receive only the media allowed. - */ - instance.updateMediaOptions({ - audio: false, - video: false, - negotiateAudio: audio_allowed !== 'none', - negotiateVideo: video_allowed !== 'none', - }) - break - } + // TODO: use the new getJoinMediaParams in here + const { audio_allowed, video_allowed } = authorization + switch (action.type) { + case 'video.member.promoted': + /** + * Promote means enable the media allowed and keep the + * same recv settings. (do not force recv media) + */ + instance.updateMediaOptions({ + audio: audio_allowed === 'both', + video: video_allowed === 'both', + negotiateAudio: audio_allowed !== 'none', + negotiateVideo: video_allowed !== 'none', + }) + break + case 'video.member.demoted': + /** + * Demote means force recvonly and receive only the media allowed. + */ + instance.updateMediaOptions({ + audio: false, + video: false, + negotiateAudio: audio_allowed !== 'none', + negotiateVideo: video_allowed !== 'none', + }) + break + } - instance._triggerNewRTCPeer() + instance._triggerNewRTCPeer() - getLogger().debug('promoteDemoteWorker ended', rtcPeerId) + getLogger().debug('promoteDemoteWorker ended', rtcPeerId) + } finally { + getLogger().debug(`promoteDemoteWorker for ${rtcPeerId} [cancelled]`) + } } diff --git a/packages/webrtc/src/workers/roomSubscribedWorker.ts b/packages/webrtc/src/workers/roomSubscribedWorker.ts index 42f563ff7..bde19c6e2 100644 --- a/packages/webrtc/src/workers/roomSubscribedWorker.ts +++ b/packages/webrtc/src/workers/roomSubscribedWorker.ts @@ -37,47 +37,50 @@ export const roomSubscribedWorker: SDKWorker< if (!rtcPeerId) { throw new Error('Missing rtcPeerId for roomSubscribedWorker') } + try { + const action: MapToPubSubShape = + yield sagaEffects.take(swEventChannel, (action: SDKActions) => { + if ( + action.type === 'video.room.subscribed' || + action.type === 'call.joined' + ) { + return action.payload.call_id === rtcPeerId + } + return false + }) - const action: MapToPubSubShape = - yield sagaEffects.take(swEventChannel, (action: SDKActions) => { - if ( - action.type === 'video.room.subscribed' || - action.type === 'call.joined' - ) { - return action.payload.call_id === rtcPeerId - } - return false - }) - - // New emitter should not change the payload by reference - const clonedPayload = JSON.parse(JSON.stringify(action.payload)) + // New emitter should not change the payload by reference + const clonedPayload = JSON.parse(JSON.stringify(action.payload)) - /** - * In here we joined a room_session so we can swap between RTCPeers - */ - instance.setActiveRTCPeer(rtcPeerId) + /** + * In here we joined a room_session so we can swap between RTCPeers + */ + instance.setActiveRTCPeer(rtcPeerId) - const roomSessionId = - action.payload.room_session.id || - action.payload.room_session.room_session_id + const roomSessionId = + action.payload.room_session.id || + action.payload.room_session.room_session_id - /** - * TODO: Replace the redux action/component with properties on RTCPeer instance? - */ - yield sagaEffects.put( - componentActions.upsert({ - id: action.payload.call_id, - roomId: action.payload.room_session.room_id, - roomSessionId: roomSessionId, - memberId: action.payload.member_id, - previewUrl: action.payload.room_session.preview_url, - }) - ) + /** + * TODO: Replace the redux action/component with properties on RTCPeer instance? + */ + yield sagaEffects.put( + componentActions.upsert({ + id: action.payload.call_id, + roomId: action.payload.room_session.room_id, + roomSessionId: roomSessionId, + memberId: action.payload.member_id, + previewUrl: action.payload.room_session.preview_url, + }) + ) - instance.emit('room.subscribed', action.payload) - instance.emit('room.joined', transformPayload.call(instance, clonedPayload)) + instance.emit('room.subscribed', action.payload) + instance.emit('room.joined', transformPayload.call(instance, clonedPayload)) - getLogger().debug('roomSubscribedWorker ended', rtcPeerId) + getLogger().debug('roomSubscribedWorker ended', rtcPeerId) + } finally { + getLogger().debug(`roomSubscribedWorker for ${rtcPeerId} [cancelled]`) + } } function transformPayload( diff --git a/packages/webrtc/src/workers/vertoEventWorker.ts b/packages/webrtc/src/workers/vertoEventWorker.ts index 05eef564f..5ad5edd9e 100644 --- a/packages/webrtc/src/workers/vertoEventWorker.ts +++ b/packages/webrtc/src/workers/vertoEventWorker.ts @@ -154,16 +154,21 @@ export const vertoEventWorker: SDKWorker< getLogger().error('Verto Error', error) }) - while (true) { - const action: MapToPubSubShape = - yield sagaEffects.take(swEventChannel, (action: SDKActions) => { - if (isWebrtcAction(action)) { - return action.payload.params?.callID === rtcPeerId - } - return false - }) - yield sagaEffects.fork(catchableWorker, action) + try { + while (true) { + const action: MapToPubSubShape = + yield sagaEffects.take(swEventChannel, (action: SDKActions) => { + getLogger().debug( + `vertoEventWorker for: ${rtcPeerId} checking action...` + ) + if (isWebrtcAction(action)) { + return action.payload.params?.callID === rtcPeerId + } + return false + }) + yield sagaEffects.fork(catchableWorker, action) + } + } finally { + getLogger().debug(`vertoEventWorker for ${rtcPeerId} [cancelled]`) } - - getLogger().trace('vertoEventWorker ended') }