Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"@protobuf-ts/runtime": "^2.11.1",
"@protobuf-ts/runtime-rpc": "^2.11.1",
"@protobuf-ts/twirp-transport": "^2.11.1",
"@stream-io/logger": "^1.2.2",
"@stream-io/worker-timer": "^1.2.4",
"axios": "^1.12.2",
"rxjs": "~7.8.2",
Expand Down
85 changes: 37 additions & 48 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
createSubscription,
getCurrentValue,
} from './store/rxUtils';
import type { ScopedLogger } from './logger';
import type {
AcceptCallResponse,
BlockUserRequest,
Expand Down Expand Up @@ -136,12 +137,11 @@ import {
AllCallEvents,
CallEventListener,
ErrorFromResponse,
Logger,
RejectReason,
StreamCallEvent,
} from './coordinator/connection/types';
import { getClientDetails } from './helpers/client-details';
import { getLogger } from './logger';
import { getLogger } from '@stream-io/logger';
import {
CameraManager,
MicrophoneManager,
Expand Down Expand Up @@ -226,7 +226,7 @@ export class Call {
*/
readonly permissionsContext = new PermissionsContext();
readonly tracer = new Tracer(null);
readonly logger: Logger;
readonly logger: ScopedLogger;

/**
* The event dispatcher instance dedicated to this Call instance.
Expand Down Expand Up @@ -309,7 +309,7 @@ export class Call {
this.streamClient = streamClient;
this.clientStore = clientStore;
this.streamClientBasePath = `/call/${this.type}/${this.id}`;
this.logger = getLogger(['Call']);
this.logger = getLogger('Call');

const callTypeConfig = CallTypes.get(type);
const participantSorter =
Expand Down Expand Up @@ -393,9 +393,9 @@ export class Call {
if (!blockedUserIds || blockedUserIds.length === 0) return;
const currentUserId = this.currentUserId;
if (currentUserId && blockedUserIds.includes(currentUserId)) {
this.logger('info', 'Leaving call because of being blocked');
this.logger.info('Leaving call because of being blocked');
await this.leave({ message: 'user blocked' }).catch((err) => {
this.logger('error', 'Error leaving call after being blocked', err);
this.logger.error('Error leaving call after being blocked', err);
});
}
}),
Expand Down Expand Up @@ -439,8 +439,7 @@ export class Call {
!hasPending(this.joinLeaveConcurrencyTag)
) {
this.leave().catch(() => {
this.logger(
'error',
this.logger.error(
'Could not leave a call that was accepted or rejected elsewhere',
);
});
Expand Down Expand Up @@ -523,8 +522,7 @@ export class Call {
break;
}
} catch (err) {
this.logger(
'error',
this.logger.error(
`Can't disable mic/camera/screenshare after revoked permissions`,
err,
);
Expand Down Expand Up @@ -883,12 +881,12 @@ export class Call {
maxJoinRetries = Math.max(maxJoinRetries, 1);
for (let attempt = 0; attempt < maxJoinRetries; attempt++) {
try {
this.logger('trace', `Joining call (${attempt})`, this.cid);
this.logger.trace(`Joining call (${attempt})`, this.cid);
await this.doJoin(data);
delete joinData.migrating_from;
break;
} catch (err) {
this.logger('warn', `Failed to join call (${attempt})`, this.cid);
this.logger.warn(`Failed to join call (${attempt})`, this.cid);
if (err instanceof ErrorFromResponse && err.unrecoverable) {
// if the error is unrecoverable, we should not retry as that signals
// that connectivity is good, but the coordinator doesn't allow the user
Expand Down Expand Up @@ -924,7 +922,7 @@ export class Call {

this.joinCallData = data;

this.logger('debug', 'Starting join flow');
this.logger.debug('Starting join flow');
this.state.setCallingState(CallingState.JOINING);

const performingMigration =
Expand Down Expand Up @@ -1028,7 +1026,7 @@ export class Call {
);
}
} catch (error) {
this.logger('warn', 'Join SFU request failed', error);
this.logger.warn('Join SFU request failed', error);
sfuClient.close(
StreamSfuClient.JOIN_FAILED,
'Join request failed, connection considered unhealthy',
Expand Down Expand Up @@ -1100,7 +1098,7 @@ export class Call {
this.reconnectStrategy = WebsocketReconnectStrategy.UNSPECIFIED;
this.reconnectReason = '';

this.logger('info', `Joined call ${this.cid}`);
this.logger.info(`Joined call ${this.cid}`);
};

/**
Expand Down Expand Up @@ -1247,7 +1245,7 @@ export class Call {
onReconnectionNeeded: (kind, reason) => {
this.reconnect(kind, reason).catch((err) => {
const message = `[Reconnect] Error reconnecting after a subscriber error: ${reason}`;
this.logger('warn', message, err);
this.logger.warn(message, err);
});
},
});
Expand All @@ -1270,7 +1268,7 @@ export class Call {
onReconnectionNeeded: (kind, reason) => {
this.reconnect(kind, reason).catch((err) => {
const message = `[Reconnect] Error reconnecting after a publisher error: ${reason}`;
this.logger('warn', message, err);
this.logger.warn(message, err);
});
},
});
Expand Down Expand Up @@ -1355,7 +1353,7 @@ export class Call {
sfuClient: StreamSfuClient,
reason: string,
) => {
this.logger('debug', '[Reconnect] SFU signal connection closed');
this.logger.debug('[Reconnect] SFU signal connection closed');
const { callingState } = this.state;
if (
// SFU WS closed before we finished current join,
Expand All @@ -1378,7 +1376,7 @@ export class Call {
? WebsocketReconnectStrategy.FAST
: WebsocketReconnectStrategy.REJOIN;
this.reconnect(strategy, reason).catch((err) => {
this.logger('warn', '[Reconnect] Error reconnecting', err);
this.logger.warn('[Reconnect] Error reconnecting', err);
});
};

Expand Down Expand Up @@ -1424,8 +1422,7 @@ export class Call {
reconnectingTime / 1000 > this.disconnectionTimeoutSeconds;

if (shouldGiveUpReconnecting) {
this.logger(
'warn',
this.logger.warn(
'[Reconnect] Stopping reconnection attempts after reaching disconnection timeout',
);
await markAsReconnectingFailed();
Expand All @@ -1442,16 +1439,14 @@ export class Call {
// wait until the network is available
await this.networkAvailableTask?.promise;

this.logger(
'info',
this.logger.info(
`[Reconnect] Reconnecting with strategy ${WebsocketReconnectStrategy[this.reconnectStrategy]}`,
);

switch (this.reconnectStrategy) {
case WebsocketReconnectStrategy.UNSPECIFIED:
case WebsocketReconnectStrategy.DISCONNECT:
this.logger(
'debug',
this.logger.debug(
`[Reconnect] No-op strategy ${currentStrategy}`,
);
break;
Expand All @@ -1474,17 +1469,15 @@ export class Call {
break; // do-while loop, reconnection worked, exit the loop
} catch (error) {
if (this.state.callingState === CallingState.OFFLINE) {
this.logger(
'debug',
this.logger.debug(
`[Reconnect] Can't reconnect while offline, stopping reconnection attempts`,
);
break;
// we don't need to handle the error if the call is offline
// network change event will trigger the reconnection
}
if (error instanceof ErrorFromResponse && error.unrecoverable) {
this.logger(
'warn',
this.logger.warn(
`[Reconnect] Can't reconnect due to coordinator unrecoverable error`,
error,
);
Expand Down Expand Up @@ -1517,8 +1510,7 @@ export class Call {
: WebsocketReconnectStrategy.FAST;
this.reconnectStrategy = nextStrategy;

this.logger(
'info',
this.logger.info(
`[Reconnect] ${currentStrategy} (${this.reconnectAttempts}) failed. Attempting with ${WebsocketReconnectStrategy[nextStrategy]}`,
error,
);
Expand All @@ -1528,7 +1520,7 @@ export class Call {
this.state.callingState !== CallingState.RECONNECTING_FAILED &&
this.state.callingState !== CallingState.LEFT
);
this.logger('info', '[Reconnect] Reconnection flow finished');
this.logger.info('[Reconnect] Reconnection flow finished');
});
};

Expand Down Expand Up @@ -1630,7 +1622,7 @@ export class Call {
// handles the legacy "goAway" event
const unregisterGoAway = this.on('goAway', () => {
this.reconnect(WebsocketReconnectStrategy.MIGRATE, 'goAway').catch(
(err) => this.logger('warn', '[Reconnect] Error reconnecting', err),
(err) => this.logger.warn('[Reconnect] Error reconnecting', err),
);
});

Expand All @@ -1640,11 +1632,11 @@ export class Call {
if (strategy === WebsocketReconnectStrategy.UNSPECIFIED) return;
if (strategy === WebsocketReconnectStrategy.DISCONNECT) {
this.leave({ message: 'SFU instructed to disconnect' }).catch((err) => {
this.logger('warn', `Can't leave call after disconnect request`, err);
this.logger.warn(`Can't leave call after disconnect request`, err);
});
} else {
this.reconnect(strategy, error?.message || 'SFU Error').catch((err) => {
this.logger('warn', '[Reconnect] Error reconnecting', err);
this.logger.warn('[Reconnect] Error reconnecting', err);
});
}
});
Expand All @@ -1654,7 +1646,7 @@ export class Call {
(e) => {
this.tracer.trace('network.changed', e);
if (!e.online) {
this.logger('debug', '[Reconnect] Going offline');
this.logger.debug('[Reconnect] Going offline');
if (!this.hasJoinedOnce) return;
this.lastOfflineTimestamp = Date.now();
// create a new task that would resolve when the network is available
Expand All @@ -1671,8 +1663,7 @@ export class Call {
}

this.reconnect(strategy, 'Going online').catch((err) => {
this.logger(
'warn',
this.logger.warn(
'[Reconnect] Error reconnecting after going online',
err,
);
Expand All @@ -1682,7 +1673,7 @@ export class Call {
this.sfuStatsReporter?.stop();
this.state.setCallingState(CallingState.OFFLINE);
} else {
this.logger('debug', '[Reconnect] Going online');
this.logger.debug('[Reconnect] Going online');
this.sfuClient?.close(
StreamSfuClient.DISPOSE_OLD_SOCKET,
'Closing WS to reconnect after going online',
Expand Down Expand Up @@ -1872,14 +1863,12 @@ export class Call {
* @param options the options to use.
*/
updatePublishOptions = (options: ClientPublishOptions) => {
this.logger(
'warn',
this.logger.warn(
'[call.updatePublishOptions]: You are manually overriding the publish options for this call. ' +
'This is not recommended, and it can cause call stability/compatibility issues. Use with caution.',
);
if (this.state.callingState === CallingState.JOINED) {
this.logger(
'warn',
this.logger.warn(
'Updating publish options after joining the call does not have an effect',
);
}
Expand All @@ -1893,7 +1882,7 @@ export class Call {
*/
notifyNoiseCancellationStarting = async () => {
return this.sfuClient?.startNoiseCancellation().catch((err) => {
this.logger('warn', 'Failed to notify start of noise cancellation', err);
this.logger.warn('Failed to notify start of noise cancellation', err);
});
};

Expand All @@ -1904,7 +1893,7 @@ export class Call {
*/
notifyNoiseCancellationStopped = async () => {
return this.sfuClient?.stopNoiseCancellation().catch((err) => {
this.logger('warn', 'Failed to notify stop of noise cancellation', err);
this.logger.warn('Failed to notify stop of noise cancellation', err);
});
};

Expand Down Expand Up @@ -2489,7 +2478,7 @@ export class Call {
reason: 'timeout',
message: `ringing timeout - ${this.isCreatedByMe ? 'no one accepted' : `user didn't interact with incoming call screen`}`,
}).catch((err) => {
this.logger('error', 'Failed to drop call', err);
this.logger.error('Failed to drop call', err);
});
}, timeoutInMs);
};
Expand Down Expand Up @@ -2616,10 +2605,10 @@ export class Call {
publish: boolean,
) => {
await this.camera.apply(settings.video, publish).catch((err) => {
this.logger('warn', 'Camera init failed', err);
this.logger.warn('Camera init failed', err);
});
await this.microphone.apply(settings.audio, publish).catch((err) => {
this.logger('warn', 'Mic init failed', err);
this.logger.warn('Mic init failed', err);
});
};

Expand Down
Loading
Loading