Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 96 additions & 9 deletions packages/livekit-rtc/src/participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,15 +353,7 @@ export class LocalParticipant extends Participant {
return writer.info;
}

async streamBytes(options?: {
name?: string;
topic?: string;
attributes?: Record<string, string>;
destinationIdentities?: Array<string>;
streamId?: string;
mimeType?: string;
totalSize?: number;
}) {
async streamBytes(options?: ByteStreamOptions & { streamId?: string; totalSize?: number }) {
const senderIdentity = this.identity;
const streamId = options?.streamId ?? crypto.randomUUID();
const destinationIdentities = options?.destinationIdentities;
Expand Down Expand Up @@ -481,6 +473,101 @@ export class LocalParticipant extends Participant {
}
}

/** Sends raw bytes or byte-like data to specified recipients */
async sendBytes(
data:
| Uint8Array
| ArrayBuffer
| Blob
| ReadableStream<Uint8Array>
| NodeJS.ReadableStream,
options?: ByteStreamOptions,
) {
const streamId = crypto.randomUUID();
const destinationIdentities = options?.destinationIdentities;

// Determine total size if available and infer mime when possible
let totalSize: number | undefined;
let inferredMime: string | undefined;

if (data instanceof Uint8Array) {
totalSize = data.byteLength;
} else if (data instanceof ArrayBuffer) {
totalSize = data.byteLength;
} else if (typeof Blob !== 'undefined' && data instanceof Blob) {
totalSize = data.size;
inferredMime = (data as Blob).type || undefined;
}

const writer = await this.streamBytes({
streamId,
name: options?.name ?? 'unknown',
totalSize,
destinationIdentities,
topic: options?.topic,
mimeType: options?.mimeType ?? inferredMime,
attributes: options?.attributes,
});

let bytesSent = 0;
const maybeReportProgress = (increment: number) => {
bytesSent += increment;
if (options?.onProgress && totalSize !== undefined && totalSize > 0) {
const progress = Math.min(1, bytesSent / totalSize);
options.onProgress(progress);
}
};

// Helper to write from a Web ReadableStream
const writeFromWebStream = async (rs: ReadableStream<Uint8Array>) => {
const reader = rs.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
if (value) {
await writer.write(value);
maybeReportProgress(value.byteLength);
}
}
} finally {
reader.releaseLock();
}
};

if (data instanceof Uint8Array) {
await writer.write(data);
maybeReportProgress(data.byteLength);
} else if (data instanceof ArrayBuffer) {
const bytes = new Uint8Array(data);
await writer.write(bytes);
maybeReportProgress(bytes.byteLength);
} else if (typeof Blob !== 'undefined' && data instanceof Blob) {
await writeFromWebStream(data.stream() as ReadableStream<Uint8Array>);
} else if (
typeof data === 'object' &&
data !== null &&
'getReader' in data &&
typeof (data as ReadableStream<Uint8Array>).getReader === 'function'
) {
// Treat as Web ReadableStream
await writeFromWebStream(data as ReadableStream<Uint8Array>);
} else if (typeof data === 'object' && data !== null && Symbol.asyncIterator in data) {
// Treat as AsyncIterable (e.g., Node.js Readable of Uint8Array/Buffer)
for await (const chunk of data as AsyncIterable<Uint8Array>) {
await writer.write(chunk);
maybeReportProgress(chunk.byteLength);
}
} else {
throw new Error('Unsupported data type for sendBytes');
}

await writer.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be in a finally clause as you're throwing an error just above and the stream has already been opened

if (options?.onProgress && totalSize !== undefined) {
options.onProgress(1);
}
}

private async sendStreamHeader(req: SendStreamHeaderRequest) {
const type = 'sendStreamHeader';
const res = FfiClient.instance.request<SendStreamHeaderResponse>({
Expand Down
Loading