From 82e9b58b1f4c7475ae8c772a102d622b26e00fe1 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Wed, 16 Apr 2025 16:53:16 -0300 Subject: [PATCH 01/18] shouldAttach change --- packages/webrtc/src/BaseConnection.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index e7f9bfb85..76d8bf052 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -249,12 +249,16 @@ export class BaseConnection< pingSupported = true, } = this.options + const hasRemoteDescription = Boolean(this.peer?.instance?.remoteDescription) + // we should attach when resending an invite + const shouldAttach = attach || hasRemoteDescription + return { dialogParams: { id: rtcPeerId, destinationNumber, - attach, - reattaching: attach, + attach: shouldAttach, + reattaching: shouldAttach, callerName, callerNumber, remoteCallerName, From be09de28ebad7e438537df0cab652078ec2cf653 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Thu, 17 Apr 2025 08:44:58 -0300 Subject: [PATCH 02/18] reenable tests --- internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts index 60fcb09ff..fcec11427 100644 --- a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts +++ b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts @@ -2,7 +2,7 @@ import { test } from '../../fixtures' import { SERVER_URL, createCFClient, expectMCUVisible } from '../../utils' test.describe('CallFabric Reconnections', () => { - test.skip('Should reconnect the WebSocket as soon it gets onClose event, without media renegotiation', async ({ + test('Should reconnect the WebSocket as soon it gets onClose event, without media renegotiation', async ({ createCustomPage, }) => { const page = await createCustomPage({ name: '[page]' }) @@ -168,7 +168,7 @@ test.describe('CallFabric Reconnections', () => { }) }) - test.skip('Should reconnect the WebSocket when network is up (before FS timeout), without media renegotiation', async ({ + test('Should reconnect the WebSocket when network is up (before FS timeout), without media renegotiation', async ({ createCustomPage, }) => { const page = await createCustomPage({ name: '[page]' }) From d90b023dd18312432484a0bf8fc9c528b4ffb2b0 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Tue, 22 Apr 2025 14:22:21 -0300 Subject: [PATCH 03/18] no `verto.invite` for simple re connections --- .../callfabric/websocket_reconnect.spec.ts | 140 +++--------------- packages/js/src/fabric/FabricRoomSession.ts | 3 +- packages/webrtc/src/BaseConnection.ts | 8 +- 3 files changed, 23 insertions(+), 128 deletions(-) diff --git a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts index fcec11427..388ae08f6 100644 --- a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts +++ b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts @@ -39,55 +39,14 @@ test.describe('CallFabric Reconnections', () => { await page.expectWsTraffic({ assertations: [ - { - type: 'send', - name: 'connect', - expect: { - method: 'signalwire.connect', - 'params.version.major': 4, - }, - }, - { - type: 'recv', - name: 'connect-response', - expect: { - 'result.authorization.jti': /.+/, - 'result.authorization.project_id': - 'cb1e91b6-ae04-4be0-89ae-0dffc5ea6aed', - 'result.authorization.fabric_subscriber.subscriber_id': - '48fe0d0c-ac31-4222-93c9-39590ce92d78', - }, - }, - { - type: 'recv', - name: 'authorization-state', - expect: { - method: 'signalwire.event', - 'params.event_type': 'signalwire.authorization.state', - 'params.params.authorization_state': /.+/, - }, - }, { type: 'send', name: 'invite', expect: { method: 'webrtc.verto', 'params.message.method': 'verto.invite', - 'params.message.params.dialogParams.callID': /.+/, 'params.message.params.dialogParams.destination_number': '/public/cf-e2e-test-room', - 'params.message.params.sdp': - /^(?=.*a=setup:actpass.*)(?=.*^m=audio.*)(?=.*^m=video.*)/ms, - }, - }, - { - type: 'recv', - name: 'conversation-call_started', - expect: { - method: 'signalwire.event', - 'params.event_type': 'conversation.message', - 'params.params.type': 'message', - 'params.params.kind': 'call_started', }, }, { @@ -105,22 +64,11 @@ test.describe('CallFabric Reconnections', () => { method: 'signalwire.event', 'params.event_type': 'webrtc.message', 'params.params.method': 'verto.answer', - 'params.params.params.sdp': - /^(?=.*a=setup:(?:active|passive))(?=.*^m=audio.*)(?=.*^m=video.*)/ms, }, }, { type: 'recv', - name: 'mediaParams', - expect: { - method: 'signalwire.event', - 'params.event_type': 'webrtc.message', - 'params.params.method': 'verto.mediaParams', - }, - }, - { - type: 'recv', - name: 'mediaParams', + name: 'callJoined', expect: { method: 'signalwire.event', 'params.event_type': 'call.joined', @@ -129,16 +77,12 @@ test.describe('CallFabric Reconnections', () => { ], }) - await expectMCUVisible(page) - // simulate ws page.resetWsTraffic() await page.evaluate(async () => { //@ts-ignore window._roomObj._closeWSConnection() - return new Promise((res) => { - setTimeout(() => res(null), 15000) - }) + return new Promise((ressolve) => setTimeout(ressolve, 15000)) }) await page.expectWsTraffic({ @@ -158,10 +102,14 @@ test.describe('CallFabric Reconnections', () => { expectNot: { method: 'webrtc.verto', 'params.message.method': 'verto.invite', - 'params.message.params.dialogParams.callID': /.+/, - 'params.message.params.dialogParams.destination_number': /^\/.+/, - 'params.message.params.sdp': - /^(?=.*a=setup:actpass.*)(?=.*^m=audio.*)(?=.*^m=video.*)/ms, + }, + }, + { + type: 'recv', + name: 'callJoined', + expectNot: { + method: 'signalwire.event', + 'params.event_type': 'call.joined', }, }, ], @@ -205,55 +153,14 @@ test.describe('CallFabric Reconnections', () => { await page.expectWsTraffic({ assertations: [ - { - type: 'send', - name: 'connect', - expect: { - method: 'signalwire.connect', - 'params.version.major': 4, - }, - }, - { - type: 'recv', - name: 'connect-response', - expect: { - 'result.authorization.jti': /.+/, - 'result.authorization.project_id': - 'cb1e91b6-ae04-4be0-89ae-0dffc5ea6aed', - 'result.authorization.fabric_subscriber.subscriber_id': - '48fe0d0c-ac31-4222-93c9-39590ce92d78', - }, - }, - { - type: 'recv', - name: 'authorization-state', - expect: { - method: 'signalwire.event', - 'params.event_type': 'signalwire.authorization.state', - 'params.params.authorization_state': /.+/, - }, - }, { type: 'send', name: 'invite', expect: { method: 'webrtc.verto', 'params.message.method': 'verto.invite', - 'params.message.params.dialogParams.callID': /.+/, 'params.message.params.dialogParams.destination_number': '/public/cf-e2e-test-room', - 'params.message.params.sdp': - /^(?=.*a=setup:actpass.*)(?=.*^m=audio.*)(?=.*^m=video.*)/ms, - }, - }, - { - type: 'recv', - name: 'conversation-call_started', - expect: { - method: 'signalwire.event', - 'params.event_type': 'conversation.message', - 'params.params.type': 'message', - 'params.params.kind': 'call_started', }, }, { @@ -271,22 +178,11 @@ test.describe('CallFabric Reconnections', () => { method: 'signalwire.event', 'params.event_type': 'webrtc.message', 'params.params.method': 'verto.answer', - 'params.params.params.sdp': - /^(?=.*a=setup:(?:active|passive))(?=.*^m=audio.*)(?=.*^m=video.*)/ms, }, }, { type: 'recv', - name: 'mediaParams', - expect: { - method: 'signalwire.event', - 'params.event_type': 'webrtc.message', - 'params.params.method': 'verto.mediaParams', - }, - }, - { - type: 'recv', - name: 'mediaParams', + name: 'callJoined', expect: { method: 'signalwire.event', 'params.event_type': 'call.joined', @@ -295,8 +191,6 @@ test.describe('CallFabric Reconnections', () => { ], }) - await expectMCUVisible(page) - page.resetWsTraffic() await page.swNetworkDown() await page.waitForTimeout(14_500) @@ -322,10 +216,14 @@ test.describe('CallFabric Reconnections', () => { expectNot: { method: 'webrtc.verto', 'params.message.method': 'verto.invite', - 'params.message.params.dialogParams.callID': /.+/, - 'params.message.params.dialogParams.destination_number': /^\/.+/, - 'params.message.params.sdp': - /^(?=.*a=setup:actpass.*)(?=.*^m=audio.*)(?=.*^m=video.*)/ms, + }, + }, + { + type: 'recv', + name: 'callJoined', + expectNot: { + method: 'signalwire.event', + 'params.event_type': 'call.joined', }, }, ], diff --git a/packages/js/src/fabric/FabricRoomSession.ts b/packages/js/src/fabric/FabricRoomSession.ts index 77648388a..f7ea78cac 100644 --- a/packages/js/src/fabric/FabricRoomSession.ts +++ b/packages/js/src/fabric/FabricRoomSession.ts @@ -181,7 +181,8 @@ export class FabricRoomSessionConnection `[resume] connectionState for ${this.id} is '${connectionState}'` ) if (['closed', 'failed', 'disconnected'].includes(connectionState)) { - this.resuming = true + // no need to resume only if SDK session lost the state + this.resuming = !this._self this.peer.restartIce() } } diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index 06f810cf7..18ee358a7 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -256,16 +256,12 @@ export class BaseConnection< pingSupported = true, } = this.options - const hasRemoteDescription = Boolean(this.peer?.instance?.remoteDescription) - // we should attach when resending an invite - const shouldAttach = attach || hasRemoteDescription - return { dialogParams: { id: rtcPeerId, destinationNumber, - attach: shouldAttach, - reattaching: shouldAttach, + attach: attach, + reattaching: attach, callerName, callerNumber, remoteCallerName, From 2fd2f123a25f23d411aa88250c1ce70376b98661 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Tue, 22 Apr 2025 14:29:46 -0300 Subject: [PATCH 04/18] changeset --- .changeset/twelve-worlds-mate.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/twelve-worlds-mate.md diff --git a/.changeset/twelve-worlds-mate.md b/.changeset/twelve-worlds-mate.md new file mode 100644 index 000000000..9c34d911b --- /dev/null +++ b/.changeset/twelve-worlds-mate.md @@ -0,0 +1,7 @@ +--- +'@sw-internal/e2e-js': patch +'@signalwire/webrtc': patch +'@signalwire/js': patch +--- + +Fix CF network re-connections From 8e7faa84d7ce5cc989dc6e3922bbbaf191de5d68 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Wed, 23 Apr 2025 09:29:08 -0300 Subject: [PATCH 05/18] review changes --- packages/js/src/fabric/FabricRoomSession.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/js/src/fabric/FabricRoomSession.ts b/packages/js/src/fabric/FabricRoomSession.ts index f7ea78cac..a009529b3 100644 --- a/packages/js/src/fabric/FabricRoomSession.ts +++ b/packages/js/src/fabric/FabricRoomSession.ts @@ -181,8 +181,8 @@ export class FabricRoomSessionConnection `[resume] connectionState for ${this.id} is '${connectionState}'` ) if (['closed', 'failed', 'disconnected'].includes(connectionState)) { - // no need to resume only if SDK session lost the state - this.resuming = !this._self + // should not resume when selfMember is defined (the SDK didn't lost its state since the `call.joined` was received) + this.resuming = !this.selfMember this.peer.restartIce() } } From 26a9a10d3346ccba7b9a8ac3b1340b2d87580503 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Thu, 24 Apr 2025 10:00:35 -0300 Subject: [PATCH 06/18] use dialAddress --- .../callfabric/websocket_reconnect.spec.ts | 53 ++++--------------- 1 file changed, 9 insertions(+), 44 deletions(-) diff --git a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts index 388ae08f6..d64f6efe4 100644 --- a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts +++ b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts @@ -1,8 +1,8 @@ import { test } from '../../fixtures' -import { SERVER_URL, createCFClient, expectMCUVisible } from '../../utils' +import { SERVER_URL, createCFClient, dialAddress, expectMCUVisible } from '../../utils' test.describe('CallFabric Reconnections', () => { - test('Should reconnect the WebSocket as soon it gets onClose event, without media renegotiation', async ({ + test.skip('Should reconnect the WebSocket as soon it gets onClose event, without media renegotiation', async ({ createCustomPage, }) => { const page = await createCustomPage({ name: '[page]' }) @@ -15,27 +15,10 @@ test.describe('CallFabric Reconnections', () => { page.resetWsTraffic() // Dial an address and join a video room - await page.evaluate( - async ({ roomName }) => { - return new Promise(async (resolve, _reject) => { - const client = window._client! - - const call = await client.dial({ - to: `/public/${roomName}`, - rootElement: document.getElementById('rootElement'), - }) - - call.on('room.joined', resolve) - call.on('room.updated', () => {}) - - // @ts-expect-error - window._roomObj = call - - await call.start() - }) - }, - { roomName } - ) + await dialAddress(page, { + address: `/public/${roomName}?channel=video`, + }) + await page.expectWsTraffic({ assertations: [ @@ -129,27 +112,9 @@ test.describe('CallFabric Reconnections', () => { page.resetWsTraffic() // Dial an address and join a video room - await page.evaluate( - async ({ roomName }) => { - return new Promise(async (resolve, _reject) => { - const client = window._client! - - const call = await client.dial({ - to: `/public/${roomName}`, - rootElement: document.getElementById('rootElement'), - }) - - call.on('room.joined', resolve) - call.on('room.updated', () => {}) - - // @ts-expect-error - window._roomObj = call - - await call.start() - }) - }, - { roomName } - ) + await dialAddress(page, { + address: `/public/${roomName}?channel=video`, + }) await page.expectWsTraffic({ assertations: [ From d6a9bedabe659a44e16b6e2a65d3456a1ac57cef Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Fri, 25 Apr 2025 07:43:33 -0300 Subject: [PATCH 07/18] manually tested --- .../callfabric/websocket_reconnect.spec.ts | 418 ++++++++++-------- packages/webrtc/src/BaseConnection.ts | 4 +- 2 files changed, 227 insertions(+), 195 deletions(-) diff --git a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts index d64f6efe4..42f4006ff 100644 --- a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts +++ b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts @@ -1,197 +1,229 @@ import { test } from '../../fixtures' -import { SERVER_URL, createCFClient, dialAddress, expectMCUVisible } from '../../utils' + test.describe('CallFabric Reconnections', () => { - test.skip('Should reconnect the WebSocket as soon it gets onClose event, without media renegotiation', async ({ - createCustomPage, - }) => { - const page = await createCustomPage({ name: '[page]' }) - - await page.goto(SERVER_URL) - - const roomName = 'cf-e2e-test-room' - - await createCFClient(page) - - page.resetWsTraffic() - // Dial an address and join a video room - await dialAddress(page, { - address: `/public/${roomName}?channel=video`, - }) - - - await page.expectWsTraffic({ - assertations: [ - { - type: 'send', - name: 'invite', - expect: { - method: 'webrtc.verto', - 'params.message.method': 'verto.invite', - 'params.message.params.dialogParams.destination_number': - '/public/cf-e2e-test-room', - }, - }, - { - type: 'recv', - name: 'call-created', - expect: { - 'result.code': '200', - 'result.result.result.message': 'CALL CREATED', - }, - }, - { - type: 'recv', - name: 'verto-answer', - expect: { - method: 'signalwire.event', - 'params.event_type': 'webrtc.message', - 'params.params.method': 'verto.answer', - }, - }, - { - type: 'recv', - name: 'callJoined', - expect: { - method: 'signalwire.event', - 'params.event_type': 'call.joined', - }, - }, - ], - }) - - // simulate ws - page.resetWsTraffic() - await page.evaluate(async () => { - //@ts-ignore - window._roomObj._closeWSConnection() - return new Promise((ressolve) => setTimeout(ressolve, 15000)) - }) - - await page.expectWsTraffic({ - assertations: [ - { - type: 'send', - name: 'reconnect', - expect: { - method: 'signalwire.connect', - 'params.version.major': 4, - 'params.authorization_state': /.+/, - }, - }, - { - type: 'send', - name: 'invite', - expectNot: { - method: 'webrtc.verto', - 'params.message.method': 'verto.invite', - }, - }, - { - type: 'recv', - name: 'callJoined', - expectNot: { - method: 'signalwire.event', - 'params.event_type': 'call.joined', - }, - }, - ], - }) - }) - - test('Should reconnect the WebSocket when network is up (before FS timeout), without media renegotiation', async ({ - createCustomPage, - }) => { - const page = await createCustomPage({ name: '[page]' }) - - await page.goto(SERVER_URL) - - const roomName = 'cf-e2e-test-room' - - await createCFClient(page) - - page.resetWsTraffic() - // Dial an address and join a video room - await dialAddress(page, { - address: `/public/${roomName}?channel=video`, - }) - - await page.expectWsTraffic({ - assertations: [ - { - type: 'send', - name: 'invite', - expect: { - method: 'webrtc.verto', - 'params.message.method': 'verto.invite', - 'params.message.params.dialogParams.destination_number': - '/public/cf-e2e-test-room', - }, - }, - { - type: 'recv', - name: 'call-created', - expect: { - 'result.code': '200', - 'result.result.result.message': 'CALL CREATED', - }, - }, - { - type: 'recv', - name: 'verto-answer', - expect: { - method: 'signalwire.event', - 'params.event_type': 'webrtc.message', - 'params.params.method': 'verto.answer', - }, - }, - { - type: 'recv', - name: 'callJoined', - expect: { - method: 'signalwire.event', - 'params.event_type': 'call.joined', - }, - }, - ], - }) - - page.resetWsTraffic() - await page.swNetworkDown() - await page.waitForTimeout(14_500) - await page.swNetworkUp() - - //wait network traffic - await page.waitForTimeout(5000) - - await page.expectWsTraffic({ - assertations: [ - { - type: 'send', - name: 'reconnect', - expect: { - method: 'signalwire.connect', - 'params.version.major': 4, - 'params.authorization_state': /.+/, - }, - }, - { - type: 'send', - name: 'invite', - expectNot: { - method: 'webrtc.verto', - 'params.message.method': 'verto.invite', - }, - }, - { - type: 'recv', - name: 'callJoined', - expectNot: { - method: 'signalwire.event', - 'params.event_type': 'call.joined', - }, - }, - ], - }) - }) + // FIXME: page.swNetworkDown() isn't enough to simulate real test scenario + + // test('Should reconnect the WebSocket as soon it gets onClose event, without media renegotiation', async ({ + // createCustomPage, + // }) => { + // const page = await createCustomPage({ name: '[page]' }) + + // await page.goto(SERVER_URL) + + // const roomName = 'cf-e2e-test-room' + + // await createCFClient(page) + + // page.resetWsTraffic() + // // Dial an address and join a video room + // await dialAddress(page, { + // address: `/public/${roomName}?channel=video`, + // }) + + // await page.expectWsTraffic({ + // assertations: [ + // { + // type: 'send', + // name: 'invite', + // expect: { + // method: 'webrtc.verto', + // 'params.message.method': 'verto.invite', + // 'params.message.params.dialogParams.destination_number': + // '/public/cf-e2e-test-room', + // }, + // }, + // { + // type: 'recv', + // name: 'call-created', + // expect: { + // 'result.code': '200', + // 'result.result.result.message': 'CALL CREATED', + // }, + // }, + // { + // type: 'recv', + // name: 'verto-answer', + // expect: { + // method: 'signalwire.event', + // 'params.event_type': 'webrtc.message', + // 'params.params.method': 'verto.answer', + // }, + // }, + // { + // type: 'recv', + // name: 'callJoined', + // expect: { + // method: 'signalwire.event', + // 'params.event_type': 'call.joined', + // }, + // }, + // ], + // }) + + // // simulate ws + // page.resetWsTraffic() + // await page.evaluate(async () => { + // //@ts-ignore + // window._roomObj._closeWSConnection() + // return new Promise((ressolve) => setTimeout(ressolve, 15000)) + // }) + + // await page.expectWsTraffic({ + // assertations: [ + // { + // type: 'send', + // name: 'reconnect', + // expect: { + // method: 'signalwire.connect', + // 'params.version.major': 4, + // 'params.authorization_state': /.+/, + // }, + // }, + // { + // type: 'send', + // name: 'invite', + // expectNot: { + // method: 'webrtc.verto', + // 'params.message.method': 'verto.invite', + // }, + // }, + // { + // type: 'recv', + // name: 'callJoined', + // expectNot: { + // method: 'signalwire.event', + // 'params.event_type': 'call.joined', + // }, + // }, + // ], + // }) + // }) + + // test('Should reconnect the WebSocket when network is up (before FS timeout), without media renegotiation', async ({ + // createCustomPage, + // }) => { + // const page = await createCustomPage({ name: '[page]' }) + + // await page.goto(SERVER_URL) + + // const roomName = 'cf-e2e-test-room' + + // await createCFClient(page) + + // page.resetWsTraffic() + // // Dial an address and join a video room + // // TODO: make dialAddress work with expectWsTraffic + // // await dialAddress(page, { + // // address: `/public/${roomName}?channel=video`, + // // }) + + // await page.evaluate( + // async ({ roomName }) => { + // return new Promise(async (resolve, _reject) => { + // const client = window._client! + + // const call = await client.dial({ + // to: `/public/${roomName}`, + // rootElement: document.getElementById('rootElement'), + // }) + + // call.on('room.joined', resolve) + // call.on('room.updated', () => {}) + + // // @ts-expect-error + // window._roomObj = call + + // await call.start() + // }) + // }, + // { roomName } + // ) + + // await page.expectWsTraffic({ + // assertations: [ + // { + // type: 'send', + // name: 'invite', + // expect: { + // method: 'webrtc.verto', + // 'params.message.method': 'verto.invite', + // 'params.message.params.dialogParams.destination_number': + // '/public/cf-e2e-test-room', + // }, + // }, + // { + // type: 'recv', + // name: 'call-created', + // expect: { + // 'result.code': '200', + // 'result.result.result.message': 'CALL CREATED', + // }, + // }, + // { + // type: 'recv', + // name: 'verto-answer', + // expect: { + // method: 'signalwire.event', + // 'params.event_type': 'webrtc.message', + // 'params.params.method': 'verto.answer', + // }, + // }, + // { + // type: 'recv', + // name: 'callJoined', + // expect: { + // method: 'signalwire.event', + // 'params.event_type': 'call.joined', + // }, + // }, + // ], + // }) + + // page.resetWsTraffic() + // await page.swNetworkDown() + // await page.waitForTimeout(14500) + // await page.swNetworkUp() + + // //wait network traffic + // await page.waitForTimeout(1000) + + // await page.expectWsTraffic({ + // assertations: [ + // { + // type: 'send', + // name: 'reconnect', + // expect: { + // method: 'signalwire.connect', + // 'params.version.major': 4, + // 'params.authorization_state': /.+/, + // }, + // }, + // { + // type: 'send', + // name: 'invite', + // expectNot: { + // method: 'webrtc.verto', + // 'params.message.method': 'verto.invite', + // }, + // }, + // { + // type: 'send', + // name: 'invite', + // expectNot: { + // method: 'webrtc.verto', + // 'params.message.method': 'verto.modify', + // }, + // }, + // { + // type: 'recv', + // name: 'callJoined', + // expectNot: { + // method: 'signalwire.event', + // 'params.event_type': 'call.joined', + // }, + // }, + // ], + // }) + // }) }) diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index 18ee358a7..0dc4d4c8d 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -260,8 +260,8 @@ export class BaseConnection< dialogParams: { id: rtcPeerId, destinationNumber, - attach: attach, - reattaching: attach, + attach: attach || this.resuming, + reattaching: attach || this.resuming, callerName, callerNumber, remoteCallerName, From 2cea4c109d19c2bb3c2e63697df5905d44a23b84 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Fri, 9 May 2025 12:19:22 -0300 Subject: [PATCH 08/18] fix ws re-connections --- .changeset/floppy-schools-send.md | 8 + .../callfabric/websocket_reconnect.spec.ts | 230 +----------------- packages/core/src/BaseComponent.ts | 2 + packages/core/src/BaseSession.ts | 32 ++- packages/core/src/index.ts | 4 + packages/core/src/types/utils.ts | 4 + packages/core/src/utils/index.ts | 8 + packages/core/src/utils/interfaces.ts | 2 + .../core/src/workers/executeActionWorker.ts | 9 +- packages/js/src/fabric/SATSession.ts | 7 +- packages/js/src/fabric/WSClient.ts | 4 + packages/webrtc/src/BaseConnection.ts | 13 +- 12 files changed, 84 insertions(+), 239 deletions(-) create mode 100644 .changeset/floppy-schools-send.md diff --git a/.changeset/floppy-schools-send.md b/.changeset/floppy-schools-send.md new file mode 100644 index 000000000..bbbf54cbc --- /dev/null +++ b/.changeset/floppy-schools-send.md @@ -0,0 +1,8 @@ +--- +'@signalwire/webrtc': minor +'@signalwire/core': minor +'@signalwire/js': minor +'@sw-internal/e2e-js': patch +--- + +CHANGED improved the handling of WebSockets reconnections. diff --git a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts index 42f4006ff..c0d94a2cd 100644 --- a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts +++ b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts @@ -1,229 +1 @@ -import { test } from '../../fixtures' - - -test.describe('CallFabric Reconnections', () => { - // FIXME: page.swNetworkDown() isn't enough to simulate real test scenario - - // test('Should reconnect the WebSocket as soon it gets onClose event, without media renegotiation', async ({ - // createCustomPage, - // }) => { - // const page = await createCustomPage({ name: '[page]' }) - - // await page.goto(SERVER_URL) - - // const roomName = 'cf-e2e-test-room' - - // await createCFClient(page) - - // page.resetWsTraffic() - // // Dial an address and join a video room - // await dialAddress(page, { - // address: `/public/${roomName}?channel=video`, - // }) - - // await page.expectWsTraffic({ - // assertations: [ - // { - // type: 'send', - // name: 'invite', - // expect: { - // method: 'webrtc.verto', - // 'params.message.method': 'verto.invite', - // 'params.message.params.dialogParams.destination_number': - // '/public/cf-e2e-test-room', - // }, - // }, - // { - // type: 'recv', - // name: 'call-created', - // expect: { - // 'result.code': '200', - // 'result.result.result.message': 'CALL CREATED', - // }, - // }, - // { - // type: 'recv', - // name: 'verto-answer', - // expect: { - // method: 'signalwire.event', - // 'params.event_type': 'webrtc.message', - // 'params.params.method': 'verto.answer', - // }, - // }, - // { - // type: 'recv', - // name: 'callJoined', - // expect: { - // method: 'signalwire.event', - // 'params.event_type': 'call.joined', - // }, - // }, - // ], - // }) - - // // simulate ws - // page.resetWsTraffic() - // await page.evaluate(async () => { - // //@ts-ignore - // window._roomObj._closeWSConnection() - // return new Promise((ressolve) => setTimeout(ressolve, 15000)) - // }) - - // await page.expectWsTraffic({ - // assertations: [ - // { - // type: 'send', - // name: 'reconnect', - // expect: { - // method: 'signalwire.connect', - // 'params.version.major': 4, - // 'params.authorization_state': /.+/, - // }, - // }, - // { - // type: 'send', - // name: 'invite', - // expectNot: { - // method: 'webrtc.verto', - // 'params.message.method': 'verto.invite', - // }, - // }, - // { - // type: 'recv', - // name: 'callJoined', - // expectNot: { - // method: 'signalwire.event', - // 'params.event_type': 'call.joined', - // }, - // }, - // ], - // }) - // }) - - // test('Should reconnect the WebSocket when network is up (before FS timeout), without media renegotiation', async ({ - // createCustomPage, - // }) => { - // const page = await createCustomPage({ name: '[page]' }) - - // await page.goto(SERVER_URL) - - // const roomName = 'cf-e2e-test-room' - - // await createCFClient(page) - - // page.resetWsTraffic() - // // Dial an address and join a video room - // // TODO: make dialAddress work with expectWsTraffic - // // await dialAddress(page, { - // // address: `/public/${roomName}?channel=video`, - // // }) - - // await page.evaluate( - // async ({ roomName }) => { - // return new Promise(async (resolve, _reject) => { - // const client = window._client! - - // const call = await client.dial({ - // to: `/public/${roomName}`, - // rootElement: document.getElementById('rootElement'), - // }) - - // call.on('room.joined', resolve) - // call.on('room.updated', () => {}) - - // // @ts-expect-error - // window._roomObj = call - - // await call.start() - // }) - // }, - // { roomName } - // ) - - // await page.expectWsTraffic({ - // assertations: [ - // { - // type: 'send', - // name: 'invite', - // expect: { - // method: 'webrtc.verto', - // 'params.message.method': 'verto.invite', - // 'params.message.params.dialogParams.destination_number': - // '/public/cf-e2e-test-room', - // }, - // }, - // { - // type: 'recv', - // name: 'call-created', - // expect: { - // 'result.code': '200', - // 'result.result.result.message': 'CALL CREATED', - // }, - // }, - // { - // type: 'recv', - // name: 'verto-answer', - // expect: { - // method: 'signalwire.event', - // 'params.event_type': 'webrtc.message', - // 'params.params.method': 'verto.answer', - // }, - // }, - // { - // type: 'recv', - // name: 'callJoined', - // expect: { - // method: 'signalwire.event', - // 'params.event_type': 'call.joined', - // }, - // }, - // ], - // }) - - // page.resetWsTraffic() - // await page.swNetworkDown() - // await page.waitForTimeout(14500) - // await page.swNetworkUp() - - // //wait network traffic - // await page.waitForTimeout(1000) - - // await page.expectWsTraffic({ - // assertations: [ - // { - // type: 'send', - // name: 'reconnect', - // expect: { - // method: 'signalwire.connect', - // 'params.version.major': 4, - // 'params.authorization_state': /.+/, - // }, - // }, - // { - // type: 'send', - // name: 'invite', - // expectNot: { - // method: 'webrtc.verto', - // 'params.message.method': 'verto.invite', - // }, - // }, - // { - // type: 'send', - // name: 'invite', - // expectNot: { - // method: 'webrtc.verto', - // 'params.message.method': 'verto.modify', - // }, - // }, - // { - // type: 'recv', - // name: 'callJoined', - // expectNot: { - // method: 'signalwire.event', - // 'params.event_type': 'call.joined', - // }, - // }, - // ], - // }) - // }) -}) +// TODO needs cloud-product/issues/14634 diff --git a/packages/core/src/BaseComponent.ts b/packages/core/src/BaseComponent.ts index b108a2255..18982b2a0 100644 --- a/packages/core/src/BaseComponent.ts +++ b/packages/core/src/BaseComponent.ts @@ -181,6 +181,7 @@ export class BaseComponent< transformParams = identity, transformResolve = identity, transformReject = identity, + expectAuthStateUpdate }: ExecuteExtendedOptions = { transformParams: identity, transformResolve: identity, @@ -199,6 +200,7 @@ export class BaseComponent< componentId: this.__uuid, method, params: transformParams(params as ParamsType), + options: { expectAuthStateUpdate } }, }) }) diff --git a/packages/core/src/BaseSession.ts b/packages/core/src/BaseSession.ts index adb20fe37..8e2384ecd 100644 --- a/packages/core/src/BaseSession.ts +++ b/packages/core/src/BaseSession.ts @@ -7,6 +7,8 @@ import { safeParseJson, isJSONRPCResponse, SWCloseEvent, + isAuthStateEvent, + isConnectRequest, } from './utils' import { DEFAULT_HOST, WebSocketState } from './utils/constants' import { @@ -39,7 +41,7 @@ import { sessionReconnectingAction, } from './redux/actions' import { sessionActions } from './redux/features/session/sessionSlice' -import { SwAuthorizationState } from '.' +import { ExecuteOptions, SwAuthorizationState } from '.' import { SessionChannel, SessionChannelAction } from './redux/interfaces' export const SW_SYMBOL = Symbol('BaseSession') @@ -65,6 +67,7 @@ export class BaseSession { protected _rpcConnectResult: RPCConnectResult private _requests = new Map() + private _waitingAuthStateUpdate: string | null = null private _socket: WebSocketClient | null = null private _host: string = DEFAULT_HOST @@ -73,6 +76,7 @@ export class BaseSession { private _executeQueue: Set = new Set() private _swConnectError = Symbol.for('sw-connect-error') private _executeConnectionClosed = Symbol.for('sw-execute-connection-closed') + private _unsyncedAuthState = Symbol.for('sw-authorization_sate-unsynced') private _checkPingDelay = 15 * 1000 private _checkPingTimer: any = null @@ -286,7 +290,10 @@ export class BaseSession { * Send a JSON object to the server. * @return Promise that will resolve/reject depending on the server response */ - execute(msg: JSONRPCRequest | JSONRPCResponse): Promise { + execute( + msg: JSONRPCRequest | JSONRPCResponse, + options?: ExecuteOptions + ): Promise { if (this._status === 'disconnecting') { this.logger.warn( 'Reject request because the session is disconnecting', @@ -308,6 +315,9 @@ export class BaseSession { if ('params' in msg) { // This is a request so save the "id" to resolve the Promise later promise = new Promise((resolve, reject) => { + this._waitingAuthStateUpdate = options?.expectAuthStateChange + ? msg.id + : null this._requests.set(msg.id, { rpcRequest: msg, resolve, reject }) }) } @@ -329,7 +339,7 @@ export class BaseSession { if (error === this._executeConnectionClosed) { throw this._executeConnectionClosed } else if (error === this._executeTimeoutError) { - if ('method' in msg && msg.method === 'signalwire.connect') { + if (isConnectRequest(msg)) { throw this._swConnectError } this._checkCurrentStatus() @@ -451,6 +461,12 @@ export class BaseSession { const payload = this.decode(event.data) this.logger.wsTraffic({ type: 'recv', payload }) + if (this._waitingAuthStateUpdate && !isAuthStateEvent(payload)) { + // we lost a `signalwire.authorization.state` probably in a WS reconnection + // the server should always send before any other msg + return this._rejectRequestExpectingAuthStateUpdate() + } + if (isJSONRPCResponse(payload)) { const request = this._requests.get(payload.id) if (request) { @@ -495,6 +511,16 @@ export class BaseSession { } } + // stops the RTC negotiation to continue if the client authorization_state is not synced + private _rejectRequestExpectingAuthStateUpdate() { + if (this._waitingAuthStateUpdate) { + const request = this._requests.get(this._waitingAuthStateUpdate) + this._requests.delete(this._waitingAuthStateUpdate) + this._waitingAuthStateUpdate = null + return request?.reject(this._unsyncedAuthState) + } + } + public dispatch(_payload: SessionChannelAction) { if (!this._sessionChannel) { throw new Error('Session channel does not exist') diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index b4585d2f2..97ccaa9d9 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -24,6 +24,8 @@ import { increasingDelay, decreasingDelay, constDelay, + isAuthStateEvent, + isConnectRequest, } from './utils' import { WEBRTC_EVENT_TYPES, isWebrtcEventType } from './utils/common' import { BaseSession } from './BaseSession' @@ -74,6 +76,8 @@ export { isSATAuth, isJSONRPCRequest, isJSONRPCResponse, + isAuthStateEvent, + isConnectRequest, LOCAL_EVENT_PREFIX, stripNamespacePrefix, asyncRetry, diff --git a/packages/core/src/types/utils.ts b/packages/core/src/types/utils.ts index 925312e64..818f0cadd 100644 --- a/packages/core/src/types/utils.ts +++ b/packages/core/src/types/utils.ts @@ -88,6 +88,10 @@ export interface MemberCommandWithValueParams extends MemberCommandParams { value: number } +export interface ExecuteOptions { + expectAuthStateChange: boolean +} + type IsAny = 0 extends 1 & T ? true : false type IsUnknown = IsAny extends true ? false diff --git a/packages/core/src/utils/index.ts b/packages/core/src/utils/index.ts index fbe194e95..69bcc3664 100644 --- a/packages/core/src/utils/index.ts +++ b/packages/core/src/utils/index.ts @@ -227,3 +227,11 @@ export const isJSONRPCResponse = ( export const isSATAuth = (e?: Authorization): e is SATAuthorization => { return typeof e !== 'undefined' && 'jti' in e } + +export const isAuthStateEvent = (e: JSONRPCRequest | JSONRPCResponse) => + isJSONRPCRequest(e) && + e.method === 'signalwire.event' && + e.params?.event_type === 'signalwire.authorization.state' + +export const isConnectRequest = (e: JSONRPCRequest | JSONRPCResponse) => + isJSONRPCRequest(e) && e.method == 'signalwire.connect' diff --git a/packages/core/src/utils/interfaces.ts b/packages/core/src/utils/interfaces.ts index e2c3be60f..5393ed0c0 100644 --- a/packages/core/src/utils/interfaces.ts +++ b/packages/core/src/utils/interfaces.ts @@ -406,6 +406,8 @@ export interface ExecuteExtendedOptions { transformReject?: ExecuteTransform /** To transform the RPC execute params */ transformParams?: ExecuteTransform + + expectAuthStateUpdate?: boolean } export type ExecuteTransform = ( diff --git a/packages/core/src/workers/executeActionWorker.ts b/packages/core/src/workers/executeActionWorker.ts index c0c49c800..e59716026 100644 --- a/packages/core/src/workers/executeActionWorker.ts +++ b/packages/core/src/workers/executeActionWorker.ts @@ -13,7 +13,12 @@ export const executeActionWorker: SDKWorker = function* ( ): SagaIterator { const { initialState, onDone, onFail, getSession } = options - const { requestId: id, method, params } = initialState + const { + requestId: id, + method, + params, + options: executeOptions, + } = initialState const session = getSession() @@ -27,7 +32,7 @@ export const executeActionWorker: SDKWorker = function* ( try { let message = RPCExecute({ id, method, params }) - const response = yield call(session.execute, message) + const response = yield call(session.execute, message, executeOptions) onDone?.(response) } catch (error) { getLogger().warn('Execute error: ', error) diff --git a/packages/js/src/fabric/SATSession.ts b/packages/js/src/fabric/SATSession.ts index bb527c7e7..84763df79 100644 --- a/packages/js/src/fabric/SATSession.ts +++ b/packages/js/src/fabric/SATSession.ts @@ -7,6 +7,8 @@ import { RPCReauthenticateParams, SATAuthorization, UNIFIED_CONNECT_VERSION, + isConnectRequest, + getLogger, } from '@signalwire/core' import { JWTSession } from '../JWTSession' import { SATSessionOptions } from './interfaces' @@ -78,8 +80,9 @@ export class SATSession extends JWTSession { variation: this.options.apiRequestRetriesDelayIncrement, }), expectedErrorHandler: (error) => { - if (error?.message?.startsWith('Authentication failed')) { - // is expected to be handle by the app developer, skipping retries + getLogger().warn(error) + if (isConnectRequest(msg)) { + // `signalwire.connect` retries are handle by the connection return true } return false diff --git a/packages/js/src/fabric/WSClient.ts b/packages/js/src/fabric/WSClient.ts index 9e9bd4788..a00cd01c0 100644 --- a/packages/js/src/fabric/WSClient.ts +++ b/packages/js/src/fabric/WSClient.ts @@ -25,6 +25,8 @@ import { IncomingCallManager } from './IncomingCallManager' import { wsClientWorker } from './workers' import { createWSClient } from './createWSClient' import { WSClientContract } from './interfaces/wsClient' +import { getStorage } from '../utils/storage' +import { PREVIOUS_CALLID_STORAGE_KEY } from './utils/constants' export class WSClient extends BaseClient<{}> implements WSClientContract { private _incomingCallManager: IncomingCallManager @@ -304,6 +306,8 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { public async dial(params: DialParams) { return new Promise(async (resolve, reject) => { try { + // in case the user left the previous call with hangup, and is not reattaching + getStorage()?.removeItem(PREVIOUS_CALLID_STORAGE_KEY) const call = this.buildOutboundCall(params) resolve(call) } catch (error) { diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index 0dc4d4c8d..c1653f121 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -21,7 +21,11 @@ import { UpdateMediaParams, UpdateMediaDirection, } from '@signalwire/core' -import type { ReduxComponent, VertoModifyResponse } from '@signalwire/core' +import type { + ExecuteOptions, + ReduxComponent, + VertoModifyResponse, +} from '@signalwire/core' import RTCPeer from './RTCPeer' import { ConnectionOptions, @@ -260,8 +264,8 @@ export class BaseConnection< dialogParams: { id: rtcPeerId, destinationNumber, - attach: attach || this.resuming, - reattaching: attach || this.resuming, + attach: attach, + reattaching: attach, callerName, callerNumber, remoteCallerName, @@ -344,6 +348,7 @@ export class BaseConnection< message: JSONRPCRequest callID?: string node_id?: string + options?: ExecuteOptions subscribe?: EventEmitter.EventNames[] }) { return this.execute({ @@ -896,6 +901,7 @@ export class BaseConnection< callID: rtcPeerId, node_id: nodeId ?? this.options.nodeId, subscribe, + options: { expectAuthStateChange: true }, }) this.logger.debug('Invite response', response) @@ -925,6 +931,7 @@ export class BaseConnection< callID: rtcPeerId, node_id: nodeId ?? this.options.nodeId, subscribe: this.getSubscriptions(), + options: { expectAuthStateChange: true }, }) this.logger.debug('Answer response', response) From 2982f35207a36eaced937598fda882a78c910e7d Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Mon, 12 May 2025 11:26:01 -0300 Subject: [PATCH 09/18] revert expect auth)state --- packages/core/src/BaseComponent.ts | 2 -- packages/core/src/BaseSession.ts | 29 ++------------------------- packages/core/src/index.ts | 2 -- packages/core/src/types/utils.ts | 4 ---- packages/core/src/utils/index.ts | 5 ----- packages/core/src/utils/interfaces.ts | 2 -- packages/webrtc/src/BaseConnection.ts | 9 +-------- 7 files changed, 3 insertions(+), 50 deletions(-) diff --git a/packages/core/src/BaseComponent.ts b/packages/core/src/BaseComponent.ts index 18982b2a0..b108a2255 100644 --- a/packages/core/src/BaseComponent.ts +++ b/packages/core/src/BaseComponent.ts @@ -181,7 +181,6 @@ export class BaseComponent< transformParams = identity, transformResolve = identity, transformReject = identity, - expectAuthStateUpdate }: ExecuteExtendedOptions = { transformParams: identity, transformResolve: identity, @@ -200,7 +199,6 @@ export class BaseComponent< componentId: this.__uuid, method, params: transformParams(params as ParamsType), - options: { expectAuthStateUpdate } }, }) }) diff --git a/packages/core/src/BaseSession.ts b/packages/core/src/BaseSession.ts index 8e2384ecd..d5831c2ba 100644 --- a/packages/core/src/BaseSession.ts +++ b/packages/core/src/BaseSession.ts @@ -7,7 +7,6 @@ import { safeParseJson, isJSONRPCResponse, SWCloseEvent, - isAuthStateEvent, isConnectRequest, } from './utils' import { DEFAULT_HOST, WebSocketState } from './utils/constants' @@ -41,7 +40,7 @@ import { sessionReconnectingAction, } from './redux/actions' import { sessionActions } from './redux/features/session/sessionSlice' -import { ExecuteOptions, SwAuthorizationState } from '.' +import { SwAuthorizationState } from '.' import { SessionChannel, SessionChannelAction } from './redux/interfaces' export const SW_SYMBOL = Symbol('BaseSession') @@ -67,7 +66,6 @@ export class BaseSession { protected _rpcConnectResult: RPCConnectResult private _requests = new Map() - private _waitingAuthStateUpdate: string | null = null private _socket: WebSocketClient | null = null private _host: string = DEFAULT_HOST @@ -76,7 +74,6 @@ export class BaseSession { private _executeQueue: Set = new Set() private _swConnectError = Symbol.for('sw-connect-error') private _executeConnectionClosed = Symbol.for('sw-execute-connection-closed') - private _unsyncedAuthState = Symbol.for('sw-authorization_sate-unsynced') private _checkPingDelay = 15 * 1000 private _checkPingTimer: any = null @@ -290,10 +287,7 @@ export class BaseSession { * Send a JSON object to the server. * @return Promise that will resolve/reject depending on the server response */ - execute( - msg: JSONRPCRequest | JSONRPCResponse, - options?: ExecuteOptions - ): Promise { + execute(msg: JSONRPCRequest | JSONRPCResponse): Promise { if (this._status === 'disconnecting') { this.logger.warn( 'Reject request because the session is disconnecting', @@ -315,9 +309,6 @@ export class BaseSession { if ('params' in msg) { // This is a request so save the "id" to resolve the Promise later promise = new Promise((resolve, reject) => { - this._waitingAuthStateUpdate = options?.expectAuthStateChange - ? msg.id - : null this._requests.set(msg.id, { rpcRequest: msg, resolve, reject }) }) } @@ -461,12 +452,6 @@ export class BaseSession { const payload = this.decode(event.data) this.logger.wsTraffic({ type: 'recv', payload }) - if (this._waitingAuthStateUpdate && !isAuthStateEvent(payload)) { - // we lost a `signalwire.authorization.state` probably in a WS reconnection - // the server should always send before any other msg - return this._rejectRequestExpectingAuthStateUpdate() - } - if (isJSONRPCResponse(payload)) { const request = this._requests.get(payload.id) if (request) { @@ -511,16 +496,6 @@ export class BaseSession { } } - // stops the RTC negotiation to continue if the client authorization_state is not synced - private _rejectRequestExpectingAuthStateUpdate() { - if (this._waitingAuthStateUpdate) { - const request = this._requests.get(this._waitingAuthStateUpdate) - this._requests.delete(this._waitingAuthStateUpdate) - this._waitingAuthStateUpdate = null - return request?.reject(this._unsyncedAuthState) - } - } - public dispatch(_payload: SessionChannelAction) { if (!this._sessionChannel) { throw new Error('Session channel does not exist') diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 97ccaa9d9..b74f3b027 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -24,7 +24,6 @@ import { increasingDelay, decreasingDelay, constDelay, - isAuthStateEvent, isConnectRequest, } from './utils' import { WEBRTC_EVENT_TYPES, isWebrtcEventType } from './utils/common' @@ -76,7 +75,6 @@ export { isSATAuth, isJSONRPCRequest, isJSONRPCResponse, - isAuthStateEvent, isConnectRequest, LOCAL_EVENT_PREFIX, stripNamespacePrefix, diff --git a/packages/core/src/types/utils.ts b/packages/core/src/types/utils.ts index 818f0cadd..925312e64 100644 --- a/packages/core/src/types/utils.ts +++ b/packages/core/src/types/utils.ts @@ -88,10 +88,6 @@ export interface MemberCommandWithValueParams extends MemberCommandParams { value: number } -export interface ExecuteOptions { - expectAuthStateChange: boolean -} - type IsAny = 0 extends 1 & T ? true : false type IsUnknown = IsAny extends true ? false diff --git a/packages/core/src/utils/index.ts b/packages/core/src/utils/index.ts index 69bcc3664..f0773bca7 100644 --- a/packages/core/src/utils/index.ts +++ b/packages/core/src/utils/index.ts @@ -228,10 +228,5 @@ export const isSATAuth = (e?: Authorization): e is SATAuthorization => { return typeof e !== 'undefined' && 'jti' in e } -export const isAuthStateEvent = (e: JSONRPCRequest | JSONRPCResponse) => - isJSONRPCRequest(e) && - e.method === 'signalwire.event' && - e.params?.event_type === 'signalwire.authorization.state' - export const isConnectRequest = (e: JSONRPCRequest | JSONRPCResponse) => isJSONRPCRequest(e) && e.method == 'signalwire.connect' diff --git a/packages/core/src/utils/interfaces.ts b/packages/core/src/utils/interfaces.ts index 5393ed0c0..e2c3be60f 100644 --- a/packages/core/src/utils/interfaces.ts +++ b/packages/core/src/utils/interfaces.ts @@ -406,8 +406,6 @@ export interface ExecuteExtendedOptions { transformReject?: ExecuteTransform /** To transform the RPC execute params */ transformParams?: ExecuteTransform - - expectAuthStateUpdate?: boolean } export type ExecuteTransform = ( diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index c1653f121..18ee358a7 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -21,11 +21,7 @@ import { UpdateMediaParams, UpdateMediaDirection, } from '@signalwire/core' -import type { - ExecuteOptions, - ReduxComponent, - VertoModifyResponse, -} from '@signalwire/core' +import type { ReduxComponent, VertoModifyResponse } from '@signalwire/core' import RTCPeer from './RTCPeer' import { ConnectionOptions, @@ -348,7 +344,6 @@ export class BaseConnection< message: JSONRPCRequest callID?: string node_id?: string - options?: ExecuteOptions subscribe?: EventEmitter.EventNames[] }) { return this.execute({ @@ -901,7 +896,6 @@ export class BaseConnection< callID: rtcPeerId, node_id: nodeId ?? this.options.nodeId, subscribe, - options: { expectAuthStateChange: true }, }) this.logger.debug('Invite response', response) @@ -931,7 +925,6 @@ export class BaseConnection< callID: rtcPeerId, node_id: nodeId ?? this.options.nodeId, subscribe: this.getSubscriptions(), - options: { expectAuthStateChange: true }, }) this.logger.debug('Answer response', response) From f54967060608f42df1c606c976f2594adc44122a Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Fri, 16 May 2025 13:25:59 -0300 Subject: [PATCH 10/18] outbound calls reconnects --- packages/core/src/BaseComponent.ts | 10 ++ packages/core/src/BaseSession.ts | 27 ++++- packages/core/src/index.ts | 10 ++ packages/core/src/utils/constants.ts | 6 + packages/core/src/utils/index.ts | 5 + .../core/src/workers/executeActionWorker.ts | 9 +- packages/js/src/JWTSession.ts | 2 +- packages/js/src/fabric/SATSession.ts | 12 +- packages/webrtc/src/BaseConnection.ts | 103 ++++++++++++++--- .../webrtc/src/workers/promoteDemoteWorker.ts | 104 +++++++++--------- .../src/workers/roomSubscribedWorker.ts | 73 ++++++------ .../webrtc/src/workers/vertoEventWorker.ts | 27 +++-- 12 files changed, 262 insertions(+), 126 deletions(-) diff --git a/packages/core/src/BaseComponent.ts b/packages/core/src/BaseComponent.ts index b108a2255..bd07b08a1 100644 --- a/packages/core/src/BaseComponent.ts +++ b/packages/core/src/BaseComponent.ts @@ -314,6 +314,16 @@ export class BaseComponent< return this._attachWorker(name, def) } + public cancelWorker(workerTask: Task) { + const foundTaskIndex = this._runningWorkers.findIndex( + (task) => task === workerTask + ) + if (foundTaskIndex > -1) { + this._runningWorkers.splice(foundTaskIndex, 1) + workerTask.cancel() + } + } + private _setWorker( name: string, def: SDKWorkerDefinition diff --git a/packages/core/src/BaseSession.ts b/packages/core/src/BaseSession.ts index d5831c2ba..a77aa5d81 100644 --- a/packages/core/src/BaseSession.ts +++ b/packages/core/src/BaseSession.ts @@ -9,7 +9,13 @@ import { SWCloseEvent, isConnectRequest, } from './utils' -import { DEFAULT_HOST, WebSocketState } from './utils/constants' +import { + DEFAULT_HOST, + SYMBOL_CONNECT_ERROR, + SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, + WebSocketState, +} from './utils/constants' import { RPCConnect, RPCConnectParams, @@ -70,15 +76,16 @@ export class BaseSession { private _host: string = DEFAULT_HOST private _executeTimeoutMs = 10 * 1000 - private _executeTimeoutError = Symbol.for('sw-execute-timeout') + private _executeTimeoutError = SYMBOL_EXECUTE_TIMEOUT private _executeQueue: Set = new Set() - private _swConnectError = Symbol.for('sw-connect-error') - private _executeConnectionClosed = Symbol.for('sw-execute-connection-closed') + private _swConnectError = SYMBOL_CONNECT_ERROR + private _executeConnectionClosed = SYMBOL_EXECUTE_CONNECTION_CLOSED private _checkPingDelay = 15 * 1000 private _checkPingTimer: any = null private _reconnectTimer: ReturnType private _status: SessionStatus = 'unknown' + private _resolveWaitConnected: null | (() => void) = null private _sessionChannel: SessionChannel private wsOpenHandler: (event: Event) => void private wsCloseHandler: (event: SWCloseEvent) => void @@ -182,6 +189,16 @@ export class BaseSession { return !Boolean(this.idle || !this.connected) } + protected async _waitConnected() { + return new Promise((resolve) => { + if (this.connected) { + resolve() + } else { + this._resolveWaitConnected = resolve + } + }) + } + set token(token: string) { this.options.token = token } @@ -401,6 +418,7 @@ export class BaseSession { this._clearTimers() await this.authenticate() this._status = 'connected' + this._resolveWaitConnected?.() this._flushExecuteQueue() this.dispatch(authSuccessAction()) } catch (error) { @@ -446,6 +464,7 @@ export class BaseSession { this._requests.forEach(({ reject }) => { reject(this._executeConnectionClosed) }) + this._requests.clear() } protected _onSocketMessage(event: MessageEvent) { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index b74f3b027..2eeab72b4 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -25,7 +25,13 @@ import { decreasingDelay, constDelay, isConnectRequest, + isVertoInvite, } from './utils' +import { + SYMBOL_CONNECT_ERROR, + SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, +} from './utils/constants' import { WEBRTC_EVENT_TYPES, isWebrtcEventType } from './utils/common' import { BaseSession } from './BaseSession' import { BaseJWTSession } from './BaseJWTSession' @@ -76,12 +82,16 @@ export { isJSONRPCRequest, isJSONRPCResponse, isConnectRequest, + isVertoInvite, LOCAL_EVENT_PREFIX, stripNamespacePrefix, asyncRetry, increasingDelay, decreasingDelay, constDelay, + SYMBOL_CONNECT_ERROR, + SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, } export * from './redux/features/component/componentSlice' diff --git a/packages/core/src/utils/constants.ts b/packages/core/src/utils/constants.ts index 1ad502064..a95e618c4 100644 --- a/packages/core/src/utils/constants.ts +++ b/packages/core/src/utils/constants.ts @@ -56,3 +56,9 @@ export const PRODUCT_PREFIXES = [ export const INTERNAL_GLOBAL_VIDEO_EVENTS = GLOBAL_VIDEO_EVENTS.map( (event) => `${PRODUCT_PREFIX_VIDEO}.${event}` as const ) + +export const SYMBOL_EXECUTE_CONNECTION_CLOSED = Symbol.for( + 'sw-execute-connection-closed' +) +export const SYMBOL_EXECUTE_TIMEOUT = Symbol.for('sw-execute-timeout') +export const SYMBOL_CONNECT_ERROR = Symbol.for('sw-connect-error') diff --git a/packages/core/src/utils/index.ts b/packages/core/src/utils/index.ts index f0773bca7..ab390acf0 100644 --- a/packages/core/src/utils/index.ts +++ b/packages/core/src/utils/index.ts @@ -230,3 +230,8 @@ export const isSATAuth = (e?: Authorization): e is SATAuthorization => { export const isConnectRequest = (e: JSONRPCRequest | JSONRPCResponse) => isJSONRPCRequest(e) && e.method == 'signalwire.connect' + +export const isVertoInvite = (e: JSONRPCRequest | JSONRPCResponse) => + isJSONRPCRequest(e) && + e.method == 'webrtc.verto' && + e.params?.message.method === 'verto.invite' diff --git a/packages/core/src/workers/executeActionWorker.ts b/packages/core/src/workers/executeActionWorker.ts index e59716026..c0c49c800 100644 --- a/packages/core/src/workers/executeActionWorker.ts +++ b/packages/core/src/workers/executeActionWorker.ts @@ -13,12 +13,7 @@ export const executeActionWorker: SDKWorker = function* ( ): SagaIterator { const { initialState, onDone, onFail, getSession } = options - const { - requestId: id, - method, - params, - options: executeOptions, - } = initialState + const { requestId: id, method, params } = initialState const session = getSession() @@ -32,7 +27,7 @@ export const executeActionWorker: SDKWorker = function* ( try { let message = RPCExecute({ id, method, params }) - const response = yield call(session.execute, message, executeOptions) + const response = yield call(session.execute, message) onDone?.(response) } catch (error) { getLogger().warn('Execute error: ', error) diff --git a/packages/js/src/JWTSession.ts b/packages/js/src/JWTSession.ts index 72e8d8e90..13b130486 100644 --- a/packages/js/src/JWTSession.ts +++ b/packages/js/src/JWTSession.ts @@ -90,7 +90,7 @@ export class JWTSession extends BaseJWTSession { } protected override _onSocketClose(event: SWCloseEvent) { - if (this.status === 'unknown' || this.status === 'disconnected') { + if (this.status === 'disconnected') { const { protocolKey, authStateKey, callIdKey } = sessionStorageManager( this.options.token ) diff --git a/packages/js/src/fabric/SATSession.ts b/packages/js/src/fabric/SATSession.ts index 84763df79..fd0b65297 100644 --- a/packages/js/src/fabric/SATSession.ts +++ b/packages/js/src/fabric/SATSession.ts @@ -9,9 +9,11 @@ import { UNIFIED_CONNECT_VERSION, isConnectRequest, getLogger, + isVertoInvite, } from '@signalwire/core' import { JWTSession } from '../JWTSession' import { SATSessionOptions } from './interfaces' +import { SYMBOL_EXECUTE_CONNECTION_CLOSED } from 'packages/core/src/utils/constants' /** * SAT Session is for the Call Fabric SDK @@ -73,7 +75,10 @@ export class SATSession extends JWTSession { override async execute(msg: JSONRPCRequest | JSONRPCResponse): Promise { return asyncRetry({ - asyncCallable: () => super.execute(msg), + asyncCallable: async () => { + await this._waitConnected() // avoid queuing a retry + return super.execute(msg) + }, maxRetries: this.options.maxApiRequestRetries, delayFn: increasingDelay({ initialDelay: this.options.apiRequestRetriesDelay, @@ -85,6 +90,11 @@ export class SATSession extends JWTSession { // `signalwire.connect` retries are handle by the connection return true } + if (isVertoInvite(msg) && error === SYMBOL_EXECUTE_CONNECTION_CLOSED) { + // we can't retry verto.invites in the transport layer + getLogger().debug('skip verto.invite retry on error:', error) + return true + } return false }, }) diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index 18ee358a7..fe572bb0b 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -20,6 +20,9 @@ import { VertoAnswer, UpdateMediaParams, UpdateMediaDirection, + asyncRetry, + constDelay, + SYMBOL_EXECUTE_CONNECTION_CLOSED, } from '@signalwire/core' import type { ReduxComponent, VertoModifyResponse } from '@signalwire/core' import RTCPeer from './RTCPeer' @@ -96,6 +99,17 @@ export class BaseConnection< return this.state === 'active' } + get newish() { + return this.state === 'new' + } + get requesting() { + return this.state === 'requesting' + } + + get destroysh() { + return this.state === 'destroy' + } + get trying() { return this.state === 'trying' } @@ -275,6 +289,8 @@ export class BaseConnection< } } + private _myWorkers: Task[] = [] + getRTCPeerById(rtcPeerId: string) { return this.rtcPeerMap.get(rtcPeerId) } @@ -283,6 +299,10 @@ export class BaseConnection< return this.rtcPeerMap.set(rtcPeer.uuid, rtcPeer) } + removeRTCPeer(rtcPeerId: string) { + return this.rtcPeerMap.delete(rtcPeerId) + } + setActiveRTCPeer(rtcPeerId: string) { this.peer = this.getRTCPeerById(rtcPeerId) } @@ -739,38 +759,74 @@ export class BaseConnection< } runRTCPeerWorkers(rtcPeerId: string) { - this.runWorker('vertoEventWorker', { + const vertoWorker = this.runWorker('vertoEventWorker', { worker: workers.vertoEventWorker, initialState: { rtcPeerId }, }) + this._myWorkers.push(vertoWorker) const main = !(this.options.additionalDevice || this.options.screenShare) if (main) { - this.runWorker('roomSubscribedWorker', { + const subscribedWorker = this.runWorker('roomSubscribedWorker', { worker: workers.roomSubscribedWorker, initialState: { rtcPeerId }, }) + this._myWorkers.push(subscribedWorker) - this.runWorker('promoteDemoteWorker', { + const promoteDemoteWorker = this.runWorker('promoteDemoteWorker', { worker: workers.promoteDemoteWorker, initialState: { rtcPeerId }, }) + this._myWorkers.push(promoteDemoteWorker) + } + } + + removeRTCWorkers() { + for (const task of this._myWorkers) { + this.cancelWorker(task) + } + this._myWorkers = [] + } + + private _destroyPeer() { + if (this.peer) { + //clean up previous attempt + this.peer.detachAndStop() + this.removeRTCWorkers() + this.removeRTCPeer(this.peer.uuid) + this.peer = undefined } } /** @internal */ invite(): Promise { - return new Promise(async (resolve, reject) => { - this.direction = 'outbound' - this.peer = this._buildPeer('offer') - try { - await this.peer.start() - resolve(this as any as T) - } catch (error) { - this.logger.error('Invite error', error) - reject(error) - } + return asyncRetry({ + asyncCallable: async () => { + return new Promise(async (resolve, reject) => { + await this._waitUntilSessionAuthorized() + this.direction = 'outbound' + this.peer = this._buildPeer('offer') + try { + await this.peer.start() + resolve(this as any as T) + } catch (error) { + this.logger.error('Invite error', error) + this._destroyPeer() + reject(error) + } + }) + }, + maxRetries: 5, + delayFn: constDelay({ initialDelay: 0 }), + expectedErrorHandler: (error) => { + if (this.requesting && error === SYMBOL_EXECUTE_CONNECTION_CLOSED) { + this.logger.debug('Retrying verto.invite with new RTCPeer') + return false // we should retry + } + // other case are expected to be handle upstream + return true + }, }) } @@ -792,12 +848,18 @@ export class BaseConnection< } /** @internal */ - onLocalSDPReady(rtcPeer: RTCPeer) { - if (!rtcPeer.instance.localDescription) { + async onLocalSDPReady( + rtcPeer: RTCPeer, + defferedLocalDescription?: RTCSessionDescriptionInit + ) { + const { type, sdp } = + rtcPeer.instance.localDescription || defferedLocalDescription || {} + + if (!type || !sdp) { this.logger.error('Missing localDescription', rtcPeer) throw new Error('Invalid RTCPeerConnection localDescription') } - const { type, sdp } = rtcPeer.instance.localDescription + const mungedSDP = this._mungeSDP(sdp) this.logger.debug('LOCAL SDP \n', `Type: ${type}`, '\n\n', mungedSDP) switch (type) { @@ -805,6 +867,7 @@ export class BaseConnection< this._watchSessionAuth() // If we have a remoteDescription already, send reinvite if (!this.resuming && rtcPeer.instance.remoteDescription) { + // TODO test auth_state with media updates return this.executeUpdateMedia(mungedSDP, rtcPeer.uuid) } else { return this.executeInvite(mungedSDP, rtcPeer.uuid) @@ -897,11 +960,17 @@ export class BaseConnection< node_id: nodeId ?? this.options.nodeId, subscribe, }) + if (this.state === 'requesting') { + // The Server Created the call, and now is trying to connect us to the destination + this.setState('trying') + } this.logger.debug('Invite response', response) this.resuming = false } catch (error) { - this.setState('hangup') + if (typeof error !== 'symbol') { + this.setState('hangup') + } throw error } } diff --git a/packages/webrtc/src/workers/promoteDemoteWorker.ts b/packages/webrtc/src/workers/promoteDemoteWorker.ts index 2dbb80d9e..00d81bfdd 100644 --- a/packages/webrtc/src/workers/promoteDemoteWorker.ts +++ b/packages/webrtc/src/workers/promoteDemoteWorker.ts @@ -35,59 +35,63 @@ export const promoteDemoteWorker: SDKWorker< throw new Error('Missing rtcPeerId for promoteDemoteWorker') } - const action: MapToPubSubShape< - VideoMemberPromotedEvent | VideoMemberDemotedEvent - > = yield sagaEffects.take(swEventChannel, (action: SDKActions) => { - if ( - action.type === 'video.member.promoted' || - action.type === 'video.member.demoted' - ) { - return action.payload.member_id === instance.memberId - } - return false - }) + try { + const action: MapToPubSubShape< + VideoMemberPromotedEvent | VideoMemberDemotedEvent + > = yield sagaEffects.take(swEventChannel, (action: SDKActions) => { + if ( + action.type === 'video.member.promoted' || + action.type === 'video.member.demoted' + ) { + return action.payload.member_id === instance.memberId + } + return false + }) - getLogger().debug('promoteDemoteWorker:', action.type, action.payload) + getLogger().debug('promoteDemoteWorker:', action.type, action.payload) - yield sagaEffects.put( - sessionActions.updateAuthorization(action.payload.authorization) - ) - const authorization: VideoAuthorization = yield sagaEffects.select( - selectors.getAuthorization - ) - if (!authorization) { - throw new Error(`Invalid authorization for '${action.type}'`) - } + yield sagaEffects.put( + sessionActions.updateAuthorization(action.payload.authorization) + ) + const authorization: VideoAuthorization = yield sagaEffects.select( + selectors.getAuthorization + ) + if (!authorization) { + throw new Error(`Invalid authorization for '${action.type}'`) + } - // TODO: use the new getJoinMediaParams in here - const { audio_allowed, video_allowed } = authorization - switch (action.type) { - case 'video.member.promoted': - /** - * Promote means enable the media allowed and keep the - * same recv settings. (do not force recv media) - */ - instance.updateMediaOptions({ - audio: audio_allowed === 'both', - video: video_allowed === 'both', - negotiateAudio: audio_allowed !== 'none', - negotiateVideo: video_allowed !== 'none', - }) - break - case 'video.member.demoted': - /** - * Demote means force recvonly and receive only the media allowed. - */ - instance.updateMediaOptions({ - audio: false, - video: false, - negotiateAudio: audio_allowed !== 'none', - negotiateVideo: video_allowed !== 'none', - }) - break - } + // TODO: use the new getJoinMediaParams in here + const { audio_allowed, video_allowed } = authorization + switch (action.type) { + case 'video.member.promoted': + /** + * Promote means enable the media allowed and keep the + * same recv settings. (do not force recv media) + */ + instance.updateMediaOptions({ + audio: audio_allowed === 'both', + video: video_allowed === 'both', + negotiateAudio: audio_allowed !== 'none', + negotiateVideo: video_allowed !== 'none', + }) + break + case 'video.member.demoted': + /** + * Demote means force recvonly and receive only the media allowed. + */ + instance.updateMediaOptions({ + audio: false, + video: false, + negotiateAudio: audio_allowed !== 'none', + negotiateVideo: video_allowed !== 'none', + }) + break + } - instance._triggerNewRTCPeer() + instance._triggerNewRTCPeer() - getLogger().debug('promoteDemoteWorker ended', rtcPeerId) + getLogger().debug('promoteDemoteWorker ended', rtcPeerId) + } finally { + getLogger().debug(`promoteDemoteWorker for ${rtcPeerId} [cancelled]`) + } } diff --git a/packages/webrtc/src/workers/roomSubscribedWorker.ts b/packages/webrtc/src/workers/roomSubscribedWorker.ts index 42f563ff7..bde19c6e2 100644 --- a/packages/webrtc/src/workers/roomSubscribedWorker.ts +++ b/packages/webrtc/src/workers/roomSubscribedWorker.ts @@ -37,47 +37,50 @@ export const roomSubscribedWorker: SDKWorker< if (!rtcPeerId) { throw new Error('Missing rtcPeerId for roomSubscribedWorker') } + try { + const action: MapToPubSubShape = + yield sagaEffects.take(swEventChannel, (action: SDKActions) => { + if ( + action.type === 'video.room.subscribed' || + action.type === 'call.joined' + ) { + return action.payload.call_id === rtcPeerId + } + return false + }) - const action: MapToPubSubShape = - yield sagaEffects.take(swEventChannel, (action: SDKActions) => { - if ( - action.type === 'video.room.subscribed' || - action.type === 'call.joined' - ) { - return action.payload.call_id === rtcPeerId - } - return false - }) - - // New emitter should not change the payload by reference - const clonedPayload = JSON.parse(JSON.stringify(action.payload)) + // New emitter should not change the payload by reference + const clonedPayload = JSON.parse(JSON.stringify(action.payload)) - /** - * In here we joined a room_session so we can swap between RTCPeers - */ - instance.setActiveRTCPeer(rtcPeerId) + /** + * In here we joined a room_session so we can swap between RTCPeers + */ + instance.setActiveRTCPeer(rtcPeerId) - const roomSessionId = - action.payload.room_session.id || - action.payload.room_session.room_session_id + const roomSessionId = + action.payload.room_session.id || + action.payload.room_session.room_session_id - /** - * TODO: Replace the redux action/component with properties on RTCPeer instance? - */ - yield sagaEffects.put( - componentActions.upsert({ - id: action.payload.call_id, - roomId: action.payload.room_session.room_id, - roomSessionId: roomSessionId, - memberId: action.payload.member_id, - previewUrl: action.payload.room_session.preview_url, - }) - ) + /** + * TODO: Replace the redux action/component with properties on RTCPeer instance? + */ + yield sagaEffects.put( + componentActions.upsert({ + id: action.payload.call_id, + roomId: action.payload.room_session.room_id, + roomSessionId: roomSessionId, + memberId: action.payload.member_id, + previewUrl: action.payload.room_session.preview_url, + }) + ) - instance.emit('room.subscribed', action.payload) - instance.emit('room.joined', transformPayload.call(instance, clonedPayload)) + instance.emit('room.subscribed', action.payload) + instance.emit('room.joined', transformPayload.call(instance, clonedPayload)) - getLogger().debug('roomSubscribedWorker ended', rtcPeerId) + getLogger().debug('roomSubscribedWorker ended', rtcPeerId) + } finally { + getLogger().debug(`roomSubscribedWorker for ${rtcPeerId} [cancelled]`) + } } function transformPayload( diff --git a/packages/webrtc/src/workers/vertoEventWorker.ts b/packages/webrtc/src/workers/vertoEventWorker.ts index 05eef564f..5ad5edd9e 100644 --- a/packages/webrtc/src/workers/vertoEventWorker.ts +++ b/packages/webrtc/src/workers/vertoEventWorker.ts @@ -154,16 +154,21 @@ export const vertoEventWorker: SDKWorker< getLogger().error('Verto Error', error) }) - while (true) { - const action: MapToPubSubShape = - yield sagaEffects.take(swEventChannel, (action: SDKActions) => { - if (isWebrtcAction(action)) { - return action.payload.params?.callID === rtcPeerId - } - return false - }) - yield sagaEffects.fork(catchableWorker, action) + try { + while (true) { + const action: MapToPubSubShape = + yield sagaEffects.take(swEventChannel, (action: SDKActions) => { + getLogger().debug( + `vertoEventWorker for: ${rtcPeerId} checking action...` + ) + if (isWebrtcAction(action)) { + return action.payload.params?.callID === rtcPeerId + } + return false + }) + yield sagaEffects.fork(catchableWorker, action) + } + } finally { + getLogger().debug(`vertoEventWorker for ${rtcPeerId} [cancelled]`) } - - getLogger().trace('vertoEventWorker ended') } From 02cb4703857f2168c7578396181a9b5f408dfedc Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Fri, 16 May 2025 13:31:12 -0300 Subject: [PATCH 11/18] fix import --- packages/js/src/fabric/SATSession.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/js/src/fabric/SATSession.ts b/packages/js/src/fabric/SATSession.ts index fd0b65297..5d8c0b694 100644 --- a/packages/js/src/fabric/SATSession.ts +++ b/packages/js/src/fabric/SATSession.ts @@ -10,11 +10,10 @@ import { isConnectRequest, getLogger, isVertoInvite, + SYMBOL_EXECUTE_CONNECTION_CLOSED, } from '@signalwire/core' import { JWTSession } from '../JWTSession' import { SATSessionOptions } from './interfaces' -import { SYMBOL_EXECUTE_CONNECTION_CLOSED } from 'packages/core/src/utils/constants' - /** * SAT Session is for the Call Fabric SDK */ From 595fd47628fe3ad26a745b7f18235bab73cb59f6 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Fri, 16 May 2025 15:41:07 -0300 Subject: [PATCH 12/18] skip test --- internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts b/internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts index 3022bb514..d11b840fb 100644 --- a/internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts +++ b/internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts @@ -9,7 +9,8 @@ import { } from '../utils' test.describe('RoomSessionReattachWrongProtocol', () => { - test('should handle joining a room, reattaching with wrong protocol ID and then leaving the room', async ({ + // FIXME we're not cleaning the session set when the WS disconnects just after sending the signalwire.connect + test.skip('should handle joining a room, reattaching with wrong protocol ID and then leaving the room', async ({ createCustomPage, }) => { const page = await createCustomPage({ name: '[reattach-bad-auth]' }) From 67b77c04e975347204f55f0a101d7db744a6cb36 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Mon, 19 May 2025 14:36:15 -0300 Subject: [PATCH 13/18] restore auth retry --- .../roomSessionReattachWrongProtocol.spec.ts | 3 +- packages/core/src/BaseJWTSession.ts | 32 ++++++++++++++++++- packages/js/src/JWTSession.ts | 28 ++++++++++++++-- 3 files changed, 58 insertions(+), 5 deletions(-) diff --git a/internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts b/internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts index d11b840fb..3022bb514 100644 --- a/internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts +++ b/internal/e2e-js/tests/roomSessionReattachWrongProtocol.spec.ts @@ -9,8 +9,7 @@ import { } from '../utils' test.describe('RoomSessionReattachWrongProtocol', () => { - // FIXME we're not cleaning the session set when the WS disconnects just after sending the signalwire.connect - test.skip('should handle joining a room, reattaching with wrong protocol ID and then leaving the room', async ({ + test('should handle joining a room, reattaching with wrong protocol ID and then leaving the room', async ({ createCustomPage, }) => { const page = await createCustomPage({ name: '[reattach-bad-auth]' }) diff --git a/packages/core/src/BaseJWTSession.ts b/packages/core/src/BaseJWTSession.ts index 687e4cc05..29a9be2a6 100644 --- a/packages/core/src/BaseJWTSession.ts +++ b/packages/core/src/BaseJWTSession.ts @@ -22,6 +22,7 @@ export class BaseJWTSession extends BaseSession { */ private readonly _checkTokenExpirationDelay = 20 * 1000 private _checkTokenExpirationTimer: any = null + private _allowAAuthRetry = 1 constructor(public options: SessionOptions) { super(options) @@ -94,8 +95,25 @@ export class BaseJWTSession extends BaseSession { this._rpcConnectResult = await this.execute(RPCConnect(params)) await this.persistRelayProtocol() await this._checkTokenExpiration() + this._allowAAuthRetry = 1 } catch (error) { - this.logger.debug('BaseJWTSession authenticate error', error) + this.logger.debug( + 'BaseJWTSession authenticate error', + error, + this._allowAAuthRetry + ) + if ( + error.message === 'Requester validation failed' && + this._allowAAuthRetry > 0 + ) { + this._allowAAuthRetry -= 1 + // removed the persisted params to try again + this.removeRelayProtocol() + this.removeSwAuthorizationState() + this.removePrevCallId() + await this.authenticate() + return + } throw error } } @@ -109,6 +127,18 @@ export class BaseJWTSession extends BaseSession { // no-op } + removeRelayProtocol() { + // no-op + } + + removeSwAuthorizationState() { + // no-op + } + + removePrevCallId() { + // no-op + } + /** * Reauthenticate with the SignalWire Network * using a newer JWT. If the session has expired diff --git a/packages/js/src/JWTSession.ts b/packages/js/src/JWTSession.ts index 13b130486..a3fe7e1f3 100644 --- a/packages/js/src/JWTSession.ts +++ b/packages/js/src/JWTSession.ts @@ -49,7 +49,7 @@ export class JWTSession extends BaseJWTSession { const { protocolKey } = sessionStorageManager(this.options.token) if (protocolKey) { - this.logger.trace('Search protocol for', protocolKey) + this.logger.debug('Search protocol for', protocolKey) return getStorage()?.getItem(protocolKey) ?? '' } return '' @@ -62,11 +62,27 @@ export class JWTSession extends BaseJWTSession { const { protocolKey } = sessionStorageManager(this.options.token) if (protocolKey) { - this.logger.trace('Persist protocol', protocolKey, this.relayProtocol) + this.logger.debug('Persist protocol', protocolKey, this.relayProtocol) getStorage()?.setItem(protocolKey, this.relayProtocol) } } + override removeRelayProtocol() { + const { protocolKey } = sessionStorageManager(this.options.token) + if (protocolKey) { + this.logger.debug('Remove protocol', protocolKey, this.relayProtocol) + getStorage()?.removeItem(protocolKey) + } + } + + override removePrevCallId() { + const { callIdKey } = sessionStorageManager(this.options.token) + if (callIdKey) { + this.logger.debug('Remove Call', callIdKey) + getStorage()?.removeItem(callIdKey) + } + } + protected override async retrieveSwAuthorizationState() { const { authStateKey } = sessionStorageManager(this.options.token) if (authStateKey) { @@ -89,6 +105,14 @@ export class JWTSession extends BaseJWTSession { } } + override removeSwAuthorizationState() { + const { authStateKey } = sessionStorageManager(this.options.token) + if (authStateKey) { + this.logger.trace('Remove auth state', authStateKey) + getStorage()?.removeItem(authStateKey) + } + } + protected override _onSocketClose(event: SWCloseEvent) { if (this.status === 'disconnected') { const { protocolKey, authStateKey, callIdKey } = sessionStorageManager( From 5c60b1017ca03c21e46ce7d8ffe097974c6f5ea8 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Wed, 21 May 2025 06:54:04 -0300 Subject: [PATCH 14/18] incoming calls reattach --- packages/js/src/fabric/IncomingCallManager.ts | 9 +++++++++ packages/js/src/fabric/interfaces/incomingCallManager.ts | 1 + 2 files changed, 10 insertions(+) diff --git a/packages/js/src/fabric/IncomingCallManager.ts b/packages/js/src/fabric/IncomingCallManager.ts index a306033d5..d6d56efbd 100644 --- a/packages/js/src/fabric/IncomingCallManager.ts +++ b/packages/js/src/fabric/IncomingCallManager.ts @@ -43,11 +43,20 @@ export class IncomingCallManager { return this.options.executeVertoBye(invite.callID, invite.nodeId) } + const reattach = async (params: CallParams) => { + return this._client.reattach({ + ...params, + nodeId: invite.nodeId, + to: '' + }) + } + return { invite: { details: invite, accept: (params) => accept(params), reject: () => reject(), + reattach: (params) => reattach(params), }, } } diff --git a/packages/js/src/fabric/interfaces/incomingCallManager.ts b/packages/js/src/fabric/interfaces/incomingCallManager.ts index 158e1ee87..a8defcd62 100644 --- a/packages/js/src/fabric/interfaces/incomingCallManager.ts +++ b/packages/js/src/fabric/interfaces/incomingCallManager.ts @@ -23,6 +23,7 @@ export interface IncomingCallNotification { details: IncomingInvite accept: (param: CallParams) => Promise reject: () => Promise + reattach: (param: CallParams) => Promise } } export type IncomingCallHandler = ( From 85c58cc1169ebde8b9fb16d158353018803557ef Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Fri, 23 May 2025 15:19:19 -0300 Subject: [PATCH 15/18] cleanup --- packages/webrtc/src/BaseConnection.ts | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index 553a7bbfe..00c84f6eb 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -99,17 +99,10 @@ export class BaseConnection< return this.state === 'active' } - get newish() { - return this.state === 'new' - } get requesting() { return this.state === 'requesting' } - get destroysh() { - return this.state === 'destroy' - } - get trying() { return this.state === 'trying' } @@ -850,18 +843,13 @@ export class BaseConnection< } /** @internal */ - async onLocalSDPReady( - rtcPeer: RTCPeer, - defferedLocalDescription?: RTCSessionDescriptionInit - ) { - const { type, sdp } = - rtcPeer.instance.localDescription || defferedLocalDescription || {} - - if (!type || !sdp) { + async onLocalSDPReady(rtcPeer: RTCPeer) { + if (!rtcPeer.instance.localDescription) { this.logger.error('Missing localDescription', rtcPeer) throw new Error('Invalid RTCPeerConnection localDescription') } + const { type, sdp } = rtcPeer.instance.localDescription const mungedSDP = this._mungeSDP(sdp) this.logger.debug('LOCAL SDP \n', `Type: ${type}`, '\n\n', mungedSDP) switch (type) { From 406877bed66dfd304987cb22bbde2929c5fb4f07 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Tue, 27 May 2025 12:31:22 -0300 Subject: [PATCH 16/18] calee reattach fixed --- packages/js/src/fabric/IncomingCallManager.ts | 9 ------ packages/js/src/fabric/WSClient.ts | 28 +++++++++++-------- .../fabric/interfaces/incomingCallManager.ts | 1 - packages/js/src/fabric/interfaces/wsClient.ts | 5 ++++ 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/packages/js/src/fabric/IncomingCallManager.ts b/packages/js/src/fabric/IncomingCallManager.ts index d6d56efbd..a306033d5 100644 --- a/packages/js/src/fabric/IncomingCallManager.ts +++ b/packages/js/src/fabric/IncomingCallManager.ts @@ -43,20 +43,11 @@ export class IncomingCallManager { return this.options.executeVertoBye(invite.callID, invite.nodeId) } - const reattach = async (params: CallParams) => { - return this._client.reattach({ - ...params, - nodeId: invite.nodeId, - to: '' - }) - } - return { invite: { details: invite, accept: (params) => accept(params), reject: () => reject(), - reattach: (params) => reattach(params), }, } } diff --git a/packages/js/src/fabric/WSClient.ts b/packages/js/src/fabric/WSClient.ts index f946cce8c..4d8b3cddf 100644 --- a/packages/js/src/fabric/WSClient.ts +++ b/packages/js/src/fabric/WSClient.ts @@ -15,6 +15,7 @@ import { buildVideoElement } from '../buildVideoElement' import { CallParams, DialParams, + ReattachParams, IncomingInvite, OnlineParams, HandlePushNotificationParams, @@ -158,20 +159,23 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { return room } - private buildOutboundCall(params: DialParams & { attach?: boolean }) { - const [pathname, query] = params.to.split('?') - if (!pathname) { - throw new Error('Invalid destination address') - } + private buildOutboundCall(params: ReattachParams & { attach?: boolean }) { let video = false let negotiateVideo = false - const queryParams = new URLSearchParams(query) - const channel = queryParams.get('channel') - if (channel === 'video') { - video = true - negotiateVideo = true + if (params.to) { + const [pathname, query] = params.to.split('?') + if (!pathname) { + throw new Error('Invalid destination address') + } + + const queryParams = new URLSearchParams(query) + const channel = queryParams.get('channel') + if (channel === 'video') { + video = true + negotiateVideo = true + } } const call = this.makeFabricObject({ @@ -186,7 +190,7 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { stopMicrophoneWhileMuted: params.stopMicrophoneWhileMuted, mirrorLocalVideoOverlay: params.mirrorLocalVideoOverlay, watchMediaPackets: false, - destinationNumber: params.to, + destinationNumber: params.to ?? '', nodeId: params.nodeId, attach: params.attach ?? false, disableUdpIceServers: params.disableUdpIceServers || false, @@ -319,7 +323,7 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { }) } - public async reattach(params: DialParams) { + public async reattach(params: ReattachParams) { return new Promise(async (resolve, reject) => { try { const call = this.buildOutboundCall({ ...params, attach: true }) diff --git a/packages/js/src/fabric/interfaces/incomingCallManager.ts b/packages/js/src/fabric/interfaces/incomingCallManager.ts index a8defcd62..158e1ee87 100644 --- a/packages/js/src/fabric/interfaces/incomingCallManager.ts +++ b/packages/js/src/fabric/interfaces/incomingCallManager.ts @@ -23,7 +23,6 @@ export interface IncomingCallNotification { details: IncomingInvite accept: (param: CallParams) => Promise reject: () => Promise - reattach: (param: CallParams) => Promise } } export type IncomingCallHandler = ( diff --git a/packages/js/src/fabric/interfaces/wsClient.ts b/packages/js/src/fabric/interfaces/wsClient.ts index fe80faf1a..9cec0baa7 100644 --- a/packages/js/src/fabric/interfaces/wsClient.ts +++ b/packages/js/src/fabric/interfaces/wsClient.ts @@ -124,6 +124,11 @@ export interface DialParams extends CallParams { nodeId?: string } +export interface ReattachParams extends CallParams { + to?: string + nodeId?: string +} + export interface ApiRequestRetriesOptions { /** Increment step for each retry delay */ apiRequestRetriesDelayIncrement?: number From 39f46b022f20b397037f782a419f2f9c5088e796 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Tue, 27 May 2025 12:33:12 -0300 Subject: [PATCH 17/18] cleanup --- packages/webrtc/src/BaseConnection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index 00c84f6eb..8302f38f0 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -268,7 +268,7 @@ export class BaseConnection< dialogParams: { id: rtcPeerId, destinationNumber, - attach: attach, + attach, reattaching: attach, callerName, callerNumber, From 9519fc20ebf2d513d4f71aa8a9f090f5222275f5 Mon Sep 17 00:00:00 2001 From: Joao Santos Date: Fri, 30 May 2025 07:04:22 -0300 Subject: [PATCH 18/18] create a new peer to retry a verto.invite --- .changeset/twelve-worlds-mate.md | 7 - .../callfabric/websocket_reconnect.spec.ts | 335 +++++++++++++++++- packages/core/src/BaseJWTSession.ts | 32 +- packages/js/src/JWTSession.ts | 30 +- packages/js/src/fabric/FabricRoomSession.ts | 3 +- packages/js/src/fabric/SATSession.ts | 6 +- packages/js/src/fabric/WSClient.ts | 22 +- packages/js/src/fabric/interfaces/wsClient.ts | 5 - packages/webrtc/src/BaseConnection.ts | 7 +- 9 files changed, 352 insertions(+), 95 deletions(-) delete mode 100644 .changeset/twelve-worlds-mate.md diff --git a/.changeset/twelve-worlds-mate.md b/.changeset/twelve-worlds-mate.md deleted file mode 100644 index 9c34d911b..000000000 --- a/.changeset/twelve-worlds-mate.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -'@sw-internal/e2e-js': patch -'@signalwire/webrtc': patch -'@signalwire/js': patch ---- - -Fix CF network re-connections diff --git a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts index c0d94a2cd..60fcb09ff 100644 --- a/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts +++ b/internal/e2e-js/tests/callfabric/websocket_reconnect.spec.ts @@ -1 +1,334 @@ -// TODO needs cloud-product/issues/14634 +import { test } from '../../fixtures' +import { SERVER_URL, createCFClient, expectMCUVisible } from '../../utils' + +test.describe('CallFabric Reconnections', () => { + test.skip('Should reconnect the WebSocket as soon it gets onClose event, without media renegotiation', async ({ + createCustomPage, + }) => { + const page = await createCustomPage({ name: '[page]' }) + + await page.goto(SERVER_URL) + + const roomName = 'cf-e2e-test-room' + + await createCFClient(page) + + page.resetWsTraffic() + // Dial an address and join a video room + await page.evaluate( + async ({ roomName }) => { + return new Promise(async (resolve, _reject) => { + const client = window._client! + + const call = await client.dial({ + to: `/public/${roomName}`, + rootElement: document.getElementById('rootElement'), + }) + + call.on('room.joined', resolve) + call.on('room.updated', () => {}) + + // @ts-expect-error + window._roomObj = call + + await call.start() + }) + }, + { roomName } + ) + + await page.expectWsTraffic({ + assertations: [ + { + type: 'send', + name: 'connect', + expect: { + method: 'signalwire.connect', + 'params.version.major': 4, + }, + }, + { + type: 'recv', + name: 'connect-response', + expect: { + 'result.authorization.jti': /.+/, + 'result.authorization.project_id': + 'cb1e91b6-ae04-4be0-89ae-0dffc5ea6aed', + 'result.authorization.fabric_subscriber.subscriber_id': + '48fe0d0c-ac31-4222-93c9-39590ce92d78', + }, + }, + { + type: 'recv', + name: 'authorization-state', + expect: { + method: 'signalwire.event', + 'params.event_type': 'signalwire.authorization.state', + 'params.params.authorization_state': /.+/, + }, + }, + { + type: 'send', + name: 'invite', + expect: { + method: 'webrtc.verto', + 'params.message.method': 'verto.invite', + 'params.message.params.dialogParams.callID': /.+/, + 'params.message.params.dialogParams.destination_number': + '/public/cf-e2e-test-room', + 'params.message.params.sdp': + /^(?=.*a=setup:actpass.*)(?=.*^m=audio.*)(?=.*^m=video.*)/ms, + }, + }, + { + type: 'recv', + name: 'conversation-call_started', + expect: { + method: 'signalwire.event', + 'params.event_type': 'conversation.message', + 'params.params.type': 'message', + 'params.params.kind': 'call_started', + }, + }, + { + type: 'recv', + name: 'call-created', + expect: { + 'result.code': '200', + 'result.result.result.message': 'CALL CREATED', + }, + }, + { + type: 'recv', + name: 'verto-answer', + expect: { + method: 'signalwire.event', + 'params.event_type': 'webrtc.message', + 'params.params.method': 'verto.answer', + 'params.params.params.sdp': + /^(?=.*a=setup:(?:active|passive))(?=.*^m=audio.*)(?=.*^m=video.*)/ms, + }, + }, + { + type: 'recv', + name: 'mediaParams', + expect: { + method: 'signalwire.event', + 'params.event_type': 'webrtc.message', + 'params.params.method': 'verto.mediaParams', + }, + }, + { + type: 'recv', + name: 'mediaParams', + expect: { + method: 'signalwire.event', + 'params.event_type': 'call.joined', + }, + }, + ], + }) + + await expectMCUVisible(page) + + // simulate ws + page.resetWsTraffic() + await page.evaluate(async () => { + //@ts-ignore + window._roomObj._closeWSConnection() + return new Promise((res) => { + setTimeout(() => res(null), 15000) + }) + }) + + await page.expectWsTraffic({ + assertations: [ + { + type: 'send', + name: 'reconnect', + expect: { + method: 'signalwire.connect', + 'params.version.major': 4, + 'params.authorization_state': /.+/, + }, + }, + { + type: 'send', + name: 'invite', + expectNot: { + method: 'webrtc.verto', + 'params.message.method': 'verto.invite', + 'params.message.params.dialogParams.callID': /.+/, + 'params.message.params.dialogParams.destination_number': /^\/.+/, + 'params.message.params.sdp': + /^(?=.*a=setup:actpass.*)(?=.*^m=audio.*)(?=.*^m=video.*)/ms, + }, + }, + ], + }) + }) + + test.skip('Should reconnect the WebSocket when network is up (before FS timeout), without media renegotiation', async ({ + createCustomPage, + }) => { + const page = await createCustomPage({ name: '[page]' }) + + await page.goto(SERVER_URL) + + const roomName = 'cf-e2e-test-room' + + await createCFClient(page) + + page.resetWsTraffic() + // Dial an address and join a video room + await page.evaluate( + async ({ roomName }) => { + return new Promise(async (resolve, _reject) => { + const client = window._client! + + const call = await client.dial({ + to: `/public/${roomName}`, + rootElement: document.getElementById('rootElement'), + }) + + call.on('room.joined', resolve) + call.on('room.updated', () => {}) + + // @ts-expect-error + window._roomObj = call + + await call.start() + }) + }, + { roomName } + ) + + await page.expectWsTraffic({ + assertations: [ + { + type: 'send', + name: 'connect', + expect: { + method: 'signalwire.connect', + 'params.version.major': 4, + }, + }, + { + type: 'recv', + name: 'connect-response', + expect: { + 'result.authorization.jti': /.+/, + 'result.authorization.project_id': + 'cb1e91b6-ae04-4be0-89ae-0dffc5ea6aed', + 'result.authorization.fabric_subscriber.subscriber_id': + '48fe0d0c-ac31-4222-93c9-39590ce92d78', + }, + }, + { + type: 'recv', + name: 'authorization-state', + expect: { + method: 'signalwire.event', + 'params.event_type': 'signalwire.authorization.state', + 'params.params.authorization_state': /.+/, + }, + }, + { + type: 'send', + name: 'invite', + expect: { + method: 'webrtc.verto', + 'params.message.method': 'verto.invite', + 'params.message.params.dialogParams.callID': /.+/, + 'params.message.params.dialogParams.destination_number': + '/public/cf-e2e-test-room', + 'params.message.params.sdp': + /^(?=.*a=setup:actpass.*)(?=.*^m=audio.*)(?=.*^m=video.*)/ms, + }, + }, + { + type: 'recv', + name: 'conversation-call_started', + expect: { + method: 'signalwire.event', + 'params.event_type': 'conversation.message', + 'params.params.type': 'message', + 'params.params.kind': 'call_started', + }, + }, + { + type: 'recv', + name: 'call-created', + expect: { + 'result.code': '200', + 'result.result.result.message': 'CALL CREATED', + }, + }, + { + type: 'recv', + name: 'verto-answer', + expect: { + method: 'signalwire.event', + 'params.event_type': 'webrtc.message', + 'params.params.method': 'verto.answer', + 'params.params.params.sdp': + /^(?=.*a=setup:(?:active|passive))(?=.*^m=audio.*)(?=.*^m=video.*)/ms, + }, + }, + { + type: 'recv', + name: 'mediaParams', + expect: { + method: 'signalwire.event', + 'params.event_type': 'webrtc.message', + 'params.params.method': 'verto.mediaParams', + }, + }, + { + type: 'recv', + name: 'mediaParams', + expect: { + method: 'signalwire.event', + 'params.event_type': 'call.joined', + }, + }, + ], + }) + + await expectMCUVisible(page) + + page.resetWsTraffic() + await page.swNetworkDown() + await page.waitForTimeout(14_500) + await page.swNetworkUp() + + //wait network traffic + await page.waitForTimeout(5000) + + await page.expectWsTraffic({ + assertations: [ + { + type: 'send', + name: 'reconnect', + expect: { + method: 'signalwire.connect', + 'params.version.major': 4, + 'params.authorization_state': /.+/, + }, + }, + { + type: 'send', + name: 'invite', + expectNot: { + method: 'webrtc.verto', + 'params.message.method': 'verto.invite', + 'params.message.params.dialogParams.callID': /.+/, + 'params.message.params.dialogParams.destination_number': /^\/.+/, + 'params.message.params.sdp': + /^(?=.*a=setup:actpass.*)(?=.*^m=audio.*)(?=.*^m=video.*)/ms, + }, + }, + ], + }) + }) +}) diff --git a/packages/core/src/BaseJWTSession.ts b/packages/core/src/BaseJWTSession.ts index 29a9be2a6..687e4cc05 100644 --- a/packages/core/src/BaseJWTSession.ts +++ b/packages/core/src/BaseJWTSession.ts @@ -22,7 +22,6 @@ export class BaseJWTSession extends BaseSession { */ private readonly _checkTokenExpirationDelay = 20 * 1000 private _checkTokenExpirationTimer: any = null - private _allowAAuthRetry = 1 constructor(public options: SessionOptions) { super(options) @@ -95,25 +94,8 @@ export class BaseJWTSession extends BaseSession { this._rpcConnectResult = await this.execute(RPCConnect(params)) await this.persistRelayProtocol() await this._checkTokenExpiration() - this._allowAAuthRetry = 1 } catch (error) { - this.logger.debug( - 'BaseJWTSession authenticate error', - error, - this._allowAAuthRetry - ) - if ( - error.message === 'Requester validation failed' && - this._allowAAuthRetry > 0 - ) { - this._allowAAuthRetry -= 1 - // removed the persisted params to try again - this.removeRelayProtocol() - this.removeSwAuthorizationState() - this.removePrevCallId() - await this.authenticate() - return - } + this.logger.debug('BaseJWTSession authenticate error', error) throw error } } @@ -127,18 +109,6 @@ export class BaseJWTSession extends BaseSession { // no-op } - removeRelayProtocol() { - // no-op - } - - removeSwAuthorizationState() { - // no-op - } - - removePrevCallId() { - // no-op - } - /** * Reauthenticate with the SignalWire Network * using a newer JWT. If the session has expired diff --git a/packages/js/src/JWTSession.ts b/packages/js/src/JWTSession.ts index a3fe7e1f3..72e8d8e90 100644 --- a/packages/js/src/JWTSession.ts +++ b/packages/js/src/JWTSession.ts @@ -49,7 +49,7 @@ export class JWTSession extends BaseJWTSession { const { protocolKey } = sessionStorageManager(this.options.token) if (protocolKey) { - this.logger.debug('Search protocol for', protocolKey) + this.logger.trace('Search protocol for', protocolKey) return getStorage()?.getItem(protocolKey) ?? '' } return '' @@ -62,27 +62,11 @@ export class JWTSession extends BaseJWTSession { const { protocolKey } = sessionStorageManager(this.options.token) if (protocolKey) { - this.logger.debug('Persist protocol', protocolKey, this.relayProtocol) + this.logger.trace('Persist protocol', protocolKey, this.relayProtocol) getStorage()?.setItem(protocolKey, this.relayProtocol) } } - override removeRelayProtocol() { - const { protocolKey } = sessionStorageManager(this.options.token) - if (protocolKey) { - this.logger.debug('Remove protocol', protocolKey, this.relayProtocol) - getStorage()?.removeItem(protocolKey) - } - } - - override removePrevCallId() { - const { callIdKey } = sessionStorageManager(this.options.token) - if (callIdKey) { - this.logger.debug('Remove Call', callIdKey) - getStorage()?.removeItem(callIdKey) - } - } - protected override async retrieveSwAuthorizationState() { const { authStateKey } = sessionStorageManager(this.options.token) if (authStateKey) { @@ -105,16 +89,8 @@ export class JWTSession extends BaseJWTSession { } } - override removeSwAuthorizationState() { - const { authStateKey } = sessionStorageManager(this.options.token) - if (authStateKey) { - this.logger.trace('Remove auth state', authStateKey) - getStorage()?.removeItem(authStateKey) - } - } - protected override _onSocketClose(event: SWCloseEvent) { - if (this.status === 'disconnected') { + if (this.status === 'unknown' || this.status === 'disconnected') { const { protocolKey, authStateKey, callIdKey } = sessionStorageManager( this.options.token ) diff --git a/packages/js/src/fabric/FabricRoomSession.ts b/packages/js/src/fabric/FabricRoomSession.ts index a009529b3..77648388a 100644 --- a/packages/js/src/fabric/FabricRoomSession.ts +++ b/packages/js/src/fabric/FabricRoomSession.ts @@ -181,8 +181,7 @@ export class FabricRoomSessionConnection `[resume] connectionState for ${this.id} is '${connectionState}'` ) if (['closed', 'failed', 'disconnected'].includes(connectionState)) { - // should not resume when selfMember is defined (the SDK didn't lost its state since the `call.joined` was received) - this.resuming = !this.selfMember + this.resuming = true this.peer.restartIce() } } diff --git a/packages/js/src/fabric/SATSession.ts b/packages/js/src/fabric/SATSession.ts index 5d8c0b694..640b50e34 100644 --- a/packages/js/src/fabric/SATSession.ts +++ b/packages/js/src/fabric/SATSession.ts @@ -11,9 +11,11 @@ import { getLogger, isVertoInvite, SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, } from '@signalwire/core' import { JWTSession } from '../JWTSession' import { SATSessionOptions } from './interfaces' + /** * SAT Session is for the Call Fabric SDK */ @@ -89,8 +91,8 @@ export class SATSession extends JWTSession { // `signalwire.connect` retries are handle by the connection return true } - if (isVertoInvite(msg) && error === SYMBOL_EXECUTE_CONNECTION_CLOSED) { - // we can't retry verto.invites in the transport layer + if (isVertoInvite(msg) && ![SYMBOL_EXECUTE_CONNECTION_CLOSED, SYMBOL_EXECUTE_TIMEOUT].includes(error)) { + // we can't retry verto.invites after errors on the transport layer getLogger().debug('skip verto.invite retry on error:', error) return true } diff --git a/packages/js/src/fabric/WSClient.ts b/packages/js/src/fabric/WSClient.ts index 4d8b3cddf..39ec26e45 100644 --- a/packages/js/src/fabric/WSClient.ts +++ b/packages/js/src/fabric/WSClient.ts @@ -15,7 +15,6 @@ import { buildVideoElement } from '../buildVideoElement' import { CallParams, DialParams, - ReattachParams, IncomingInvite, OnlineParams, HandlePushNotificationParams, @@ -26,8 +25,6 @@ import { IncomingCallManager } from './IncomingCallManager' import { wsClientWorker } from './workers' import { createWSClient } from './createWSClient' import { WSClientContract } from './interfaces/wsClient' -import { getStorage } from '../utils/storage' -import { PREVIOUS_CALLID_STORAGE_KEY } from './utils/constants' export class WSClient extends BaseClient<{}> implements WSClientContract { private _incomingCallManager: IncomingCallManager @@ -159,24 +156,21 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { return room } - private buildOutboundCall(params: ReattachParams & { attach?: boolean }) { + private buildOutboundCall(params: DialParams & { attach?: boolean }) { + const [pathname, query] = params.to.split('?') + if (!pathname) { + throw new Error('Invalid destination address') + } let video = false let negotiateVideo = false - if (params.to) { - const [pathname, query] = params.to.split('?') - if (!pathname) { - throw new Error('Invalid destination address') - } - const queryParams = new URLSearchParams(query) const channel = queryParams.get('channel') if (channel === 'video') { video = true negotiateVideo = true } - } const call = this.makeFabricObject({ audio: params.audio ?? true, @@ -190,7 +184,7 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { stopMicrophoneWhileMuted: params.stopMicrophoneWhileMuted, mirrorLocalVideoOverlay: params.mirrorLocalVideoOverlay, watchMediaPackets: false, - destinationNumber: params.to ?? '', + destinationNumber: params.to, nodeId: params.nodeId, attach: params.attach ?? false, disableUdpIceServers: params.disableUdpIceServers || false, @@ -312,8 +306,6 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { public async dial(params: DialParams) { return new Promise(async (resolve, reject) => { try { - // in case the user left the previous call with hangup, and is not reattaching - getStorage()?.removeItem(PREVIOUS_CALLID_STORAGE_KEY) const call = this.buildOutboundCall(params) resolve(call) } catch (error) { @@ -323,7 +315,7 @@ export class WSClient extends BaseClient<{}> implements WSClientContract { }) } - public async reattach(params: ReattachParams) { + public async reattach(params: DialParams) { return new Promise(async (resolve, reject) => { try { const call = this.buildOutboundCall({ ...params, attach: true }) diff --git a/packages/js/src/fabric/interfaces/wsClient.ts b/packages/js/src/fabric/interfaces/wsClient.ts index 9cec0baa7..fe80faf1a 100644 --- a/packages/js/src/fabric/interfaces/wsClient.ts +++ b/packages/js/src/fabric/interfaces/wsClient.ts @@ -124,11 +124,6 @@ export interface DialParams extends CallParams { nodeId?: string } -export interface ReattachParams extends CallParams { - to?: string - nodeId?: string -} - export interface ApiRequestRetriesOptions { /** Increment step for each retry delay */ apiRequestRetriesDelayIncrement?: number diff --git a/packages/webrtc/src/BaseConnection.ts b/packages/webrtc/src/BaseConnection.ts index 8302f38f0..b74e227a4 100644 --- a/packages/webrtc/src/BaseConnection.ts +++ b/packages/webrtc/src/BaseConnection.ts @@ -23,6 +23,7 @@ import { asyncRetry, constDelay, SYMBOL_EXECUTE_CONNECTION_CLOSED, + SYMBOL_EXECUTE_TIMEOUT, } from '@signalwire/core' import type { ReduxComponent, VertoModifyResponse } from '@signalwire/core' import RTCPeer from './RTCPeer' @@ -815,7 +816,7 @@ export class BaseConnection< maxRetries: 5, delayFn: constDelay({ initialDelay: 0 }), expectedErrorHandler: (error) => { - if (this.requesting && error === SYMBOL_EXECUTE_CONNECTION_CLOSED) { + if (this.requesting && [SYMBOL_EXECUTE_CONNECTION_CLOSED, SYMBOL_EXECUTE_TIMEOUT].includes(error)) { // eslint-disable-line max-len, no-nested-ternaryerror === SYMBOL_EXECUTE_CONNECTION_CLOSED) { this.logger.debug('Retrying verto.invite with new RTCPeer') return false // we should retry } @@ -848,7 +849,6 @@ export class BaseConnection< this.logger.error('Missing localDescription', rtcPeer) throw new Error('Invalid RTCPeerConnection localDescription') } - const { type, sdp } = rtcPeer.instance.localDescription const mungedSDP = this._mungeSDP(sdp) this.logger.debug('LOCAL SDP \n', `Type: ${type}`, '\n\n', mungedSDP) @@ -857,7 +857,6 @@ export class BaseConnection< this._watchSessionAuth() // If we have a remoteDescription already, send reinvite if (!this.resuming && rtcPeer.instance.remoteDescription) { - // TODO test auth_state with media updates return this.executeUpdateMedia(mungedSDP, rtcPeer.uuid) } else { return this.executeInvite(mungedSDP, rtcPeer.uuid) @@ -958,9 +957,7 @@ export class BaseConnection< this.resuming = false } catch (error) { - if (typeof error !== 'symbol') { this.setState('hangup') - } throw error } }