From 388726d9e3d7f26b069596be69f093f39f77a4ae Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Wed, 28 May 2025 21:14:16 -0400 Subject: [PATCH 1/9] init --- agents/src/tts/stream_adapter.ts | 8 ++- agents/src/tts/tts.ts | 106 +++++++++++++++++++++++++------ agents/src/vad.ts | 5 +- 3 files changed, 95 insertions(+), 24 deletions(-) diff --git a/agents/src/tts/stream_adapter.ts b/agents/src/tts/stream_adapter.ts index e1ab402d..58f6d816 100644 --- a/agents/src/tts/stream_adapter.ts +++ b/agents/src/tts/stream_adapter.ts @@ -51,7 +51,9 @@ export class StreamAdapterWrapper extends SynthesizeStream { async #run() { const forwardInput = async () => { - for await (const input of this.input) { + while (true) { + const { done, value: input } = await this.inputReader.read(); + if (done) break; if (input === SynthesizeStream.FLUSH_SENTINEL) { this.#sentenceStream.flush(); } else { @@ -65,10 +67,10 @@ export class StreamAdapterWrapper extends SynthesizeStream { const synthesize = async () => { for await (const ev of this.#sentenceStream) { for await (const audio of this.#tts.synthesize(ev.token)) { - this.output.put(audio); + this.outputWriter.write(audio); } } - this.output.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); }; Promise.all([forwardInput(), synthesize()]); diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index 7826b446..51112ca4 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -4,7 +4,11 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; +import type { ReadableStream } from 'node:stream/web'; +import { log } from '../log.js'; import type { TTSMetrics } from '../metrics/base.js'; +import { DeferredReadableStream } from '../stream/deferred_stream.js'; +import { IdentityTransform } from '../stream/identity_transform.js'; import { AsyncIterableQueue, mergeFrames } from '../utils.js'; /** SynthesizedAudio is a packet of speech synthesis as returned by the TTS. */ @@ -105,13 +109,18 @@ export abstract class SynthesizeStream { protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL'); static readonly END_OF_STREAM = Symbol('END_OF_STREAM'); - protected input = new AsyncIterableQueue(); - protected queue = new AsyncIterableQueue< + protected inputWriter: WritableStreamDefaultWriter< + string | typeof SynthesizeStream.FLUSH_SENTINEL + >; + protected inputReader: ReadableStreamDefaultReader< + string | typeof SynthesizeStream.FLUSH_SENTINEL + >; + protected outputWriter: WritableStreamDefaultWriter< SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM - >(); - protected output = new AsyncIterableQueue< + >; + protected outputReader: ReadableStreamDefaultReader< SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM - >(); + >; protected closed = false; abstract label: string; #tts: TTS; @@ -119,8 +128,57 @@ export abstract class SynthesizeStream #metricsText = ''; #monitorMetricsTask?: Promise; + private deferredInputStream: DeferredReadableStream< + string | typeof SynthesizeStream.FLUSH_SENTINEL + >; + protected metricsStream: ReadableStream; + private input = new IdentityTransform(); + private output = new IdentityTransform< + SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM + >(); + private logger = log(); + private inputClosed = false; + constructor(tts: TTS) { this.#tts = tts; + this.deferredInputStream = new DeferredReadableStream(); + + this.inputWriter = this.input.writable.getWriter(); + this.inputReader = this.input.readable.getReader(); + this.outputWriter = this.output.writable.getWriter(); + + const [outputStream, metricsStream] = this.output.readable.tee(); + this.outputReader = outputStream.getReader(); + this.metricsStream = metricsStream; + + this.pumpDeferredStream(); + this.monitorMetrics(); + } + + /** + * Reads from the deferred input stream and forwards chunks to the input writer. + * + * Note: we can't just do this.deferredInputStream.stream.pipeTo(this.input.writable) + * because the inputWriter locks the this.input.writable stream. All writes must go through + * the inputWriter. + */ + private async pumpDeferredStream() { + const reader = this.deferredInputStream.stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done || value === SynthesizeStream.FLUSH_SENTINEL) { + break; + } + this.inputWriter.write(value); + } + } catch (error) { + this.logger.error(error, 'Error reading deferred input stream'); + } finally { + reader.releaseLock(); + this.flush(); + this.endInput(); + } } protected async monitorMetrics() { @@ -148,9 +206,11 @@ export abstract class SynthesizeStream } }; - for await (const audio of this.queue) { - this.output.put(audio); - if (audio === SynthesizeStream.END_OF_STREAM) continue; + const metricsReader = this.metricsStream.getReader(); + + while (true) { + const { done, value: audio } = await metricsReader.read(); + if (done || audio === SynthesizeStream.END_OF_STREAM) break; requestId = audio.requestId; if (!ttfb) { ttfb = process.hrtime.bigint() - startTime; @@ -164,23 +224,27 @@ export abstract class SynthesizeStream if (requestId) { emit(); } - this.output.close(); + } + + updateInputStream(text: ReadableStream) { + this.deferredInputStream.setSource(text); } /** Push a string of text to the TTS */ + /** @deprecated Use `updateInputStream` instead */ pushText(text: string) { if (!this.#monitorMetricsTask) { this.#monitorMetricsTask = this.monitorMetrics(); } this.#metricsText += text; - if (this.input.closed) { + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.put(text); + this.inputWriter.write(text); } /** Flush the TTS, causing it to process all pending text */ @@ -189,34 +253,40 @@ export abstract class SynthesizeStream this.#metricsPendingTexts.push(this.#metricsText); this.#metricsText = ''; } - if (this.input.closed) { + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.put(SynthesizeStream.FLUSH_SENTINEL); + this.inputWriter.write(SynthesizeStream.FLUSH_SENTINEL); } /** Mark the input as ended and forbid additional pushes */ endInput() { - if (this.input.closed) { + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.close(); + this.inputClosed = true; + this.inputWriter.close(); } next(): Promise> { - return this.output.next(); + return this.outputReader.read().then(({ done, value }) => { + if (done) { + return { done: true, value: undefined }; + } + return { done: false, value }; + }); } /** Close both the input and output of the TTS stream */ close() { - this.input.close(); - this.output.close(); + this.inputWriter.close(); + this.outputWriter.close(); this.closed = true; } diff --git a/agents/src/vad.ts b/agents/src/vad.ts index 2e135df3..9060414e 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -84,8 +84,6 @@ export abstract class VAD extends (EventEmitter as new () => TypedEmitter { protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL'); - protected input = new IdentityTransform(); - protected output = new IdentityTransform(); protected inputWriter: WritableStreamDefaultWriter; protected inputReader: ReadableStreamDefaultReader; protected outputWriter: WritableStreamDefaultWriter; @@ -97,7 +95,8 @@ export abstract class VADStream implements AsyncIterableIterator { #lastActivityTime = BigInt(0); private logger = log(); private deferredInputStream: DeferredReadableStream; - + private input = new IdentityTransform(); + private output = new IdentityTransform(); private metricsStream: ReadableStream; constructor(vad: VAD) { this.#vad = vad; From f6de006146446e6e90891ee87889482332af8fbf Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Wed, 28 May 2025 21:25:35 -0400 Subject: [PATCH 2/9] init --- agents/src/tts/tts.ts | 11 ++++------- agents/src/vad.ts | 6 ++++-- examples/src/pipeline_voice_agent.ts | 3 ++- plugins/cartesia/src/tts.ts | 8 +++++--- plugins/elevenlabs/src/tts.ts | 10 ++++++---- plugins/neuphonic/src/tts.ts | 8 +++++--- plugins/resemble/src/tts.ts | 14 ++++++++------ 7 files changed, 34 insertions(+), 26 deletions(-) diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index 51112ca4..79646207 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -109,18 +109,12 @@ export abstract class SynthesizeStream { protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL'); static readonly END_OF_STREAM = Symbol('END_OF_STREAM'); - protected inputWriter: WritableStreamDefaultWriter< - string | typeof SynthesizeStream.FLUSH_SENTINEL - >; protected inputReader: ReadableStreamDefaultReader< string | typeof SynthesizeStream.FLUSH_SENTINEL >; protected outputWriter: WritableStreamDefaultWriter< SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM >; - protected outputReader: ReadableStreamDefaultReader< - SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM - >; protected closed = false; abstract label: string; #tts: TTS; @@ -136,6 +130,10 @@ export abstract class SynthesizeStream private output = new IdentityTransform< SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM >(); + private inputWriter: WritableStreamDefaultWriter; + private outputReader: ReadableStreamDefaultReader< + SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM + >; private logger = log(); private inputClosed = false; @@ -286,7 +284,6 @@ export abstract class SynthesizeStream /** Close both the input and output of the TTS stream */ close() { this.inputWriter.close(); - this.outputWriter.close(); this.closed = true; } diff --git a/agents/src/vad.ts b/agents/src/vad.ts index 9060414e..025a8a43 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -84,10 +84,9 @@ export abstract class VAD extends (EventEmitter as new () => TypedEmitter { protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL'); - protected inputWriter: WritableStreamDefaultWriter; + protected inputReader: ReadableStreamDefaultReader; protected outputWriter: WritableStreamDefaultWriter; - protected outputReader: ReadableStreamDefaultReader; protected closed = false; protected inputClosed = false; @@ -98,6 +97,9 @@ export abstract class VADStream implements AsyncIterableIterator { private input = new IdentityTransform(); private output = new IdentityTransform(); private metricsStream: ReadableStream; + private outputReader: ReadableStreamDefaultReader; + private inputWriter: WritableStreamDefaultWriter; + constructor(vad: VAD) { this.#vad = vad; this.deferredInputStream = new DeferredReadableStream(); diff --git a/examples/src/pipeline_voice_agent.ts b/examples/src/pipeline_voice_agent.ts index 0ae14a04..8621b2dd 100644 --- a/examples/src/pipeline_voice_agent.ts +++ b/examples/src/pipeline_voice_agent.ts @@ -12,6 +12,7 @@ import { pipeline, } from '@livekit/agents'; import * as deepgram from '@livekit/agents-plugin-deepgram'; +import * as elevenlabs from '@livekit/agents-plugin-elevenlabs'; import * as livekit from '@livekit/agents-plugin-livekit'; import * as openai from '@livekit/agents-plugin-openai'; import * as silero from '@livekit/agents-plugin-silero'; @@ -59,7 +60,7 @@ export default defineAgent({ vad, new deepgram.STT(), new openai.LLM(), - new openai.TTS(), + new elevenlabs.TTS(), { chatCtx: initialContext, fncCtx, turnDetector: new livekit.turnDetector.EOUModel() }, ); agent.start(ctx.room, participant); diff --git a/plugins/cartesia/src/tts.ts b/plugins/cartesia/src/tts.ts index 28c991d7..a0546159 100644 --- a/plugins/cartesia/src/tts.ts +++ b/plugins/cartesia/src/tts.ts @@ -178,7 +178,9 @@ export class SynthesizeStream extends tts.SynthesizeStream { }; const inputTask = async () => { - for await (const data of this.input) { + while (true) { + const { done, value: data } = await this.inputReader.read(); + if (done) break; if (data === SynthesizeStream.FLUSH_SENTINEL) { this.#tokenizer.flush(); continue; @@ -195,7 +197,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { let lastFrame: AudioFrame | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { if (lastFrame) { - this.queue.put({ requestId, segmentId, frame: lastFrame, final }); + this.outputWriter.write({ requestId, segmentId, frame: lastFrame, final }); lastFrame = undefined; } }; @@ -215,7 +217,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { lastFrame = frame; } sendLastFrame(segmentId, true); - this.queue.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); if (segmentId === requestId) { closing = true; diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index eb80664c..4d796c35 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -148,7 +148,9 @@ export class SynthesizeStream extends tts.SynthesizeStream { const tokenizeInput = async () => { let stream: tokenize.WordStream | null = null; - for await (const text of this.input) { + while (true) { + const { done, value: text } = await this.inputReader.read(); + if (done) break; if (text === SynthesizeStream.FLUSH_SENTINEL) { stream?.endInput(); stream = null; @@ -166,7 +168,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { const runStream = async () => { for await (const stream of segments) { await this.#runWS(stream); - this.queue.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); } }; @@ -246,7 +248,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { let lastFrame: AudioFrame | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { if (lastFrame) { - this.queue.put({ requestId, segmentId, frame: lastFrame, final }); + this.outputWriter.write({ requestId, segmentId, frame: lastFrame, final }); lastFrame = undefined; } }; @@ -278,7 +280,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { lastFrame = frame; } sendLastFrame(segmentId, true); - this.queue.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); if (segmentId === requestId) { ws.close(); diff --git a/plugins/neuphonic/src/tts.ts b/plugins/neuphonic/src/tts.ts index 4636da75..99c815e4 100644 --- a/plugins/neuphonic/src/tts.ts +++ b/plugins/neuphonic/src/tts.ts @@ -156,7 +156,9 @@ export class SynthesizeStream extends tts.SynthesizeStream { let closing = false; const sendTask = async (ws: WebSocket) => { - for await (const data of this.input) { + while (true) { + const { done, value: data } = await this.inputReader.read(); + if (done) break; if (data === SynthesizeStream.FLUSH_SENTINEL) { ws.send(JSON.stringify({ text: '' })); continue; @@ -172,7 +174,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { let lastFrame: AudioFrame | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { if (lastFrame) { - this.queue.put({ requestId, segmentId, frame: lastFrame, final }); + this.outputWriter.write({ requestId, segmentId, frame: lastFrame, final }); lastFrame = undefined; } }; @@ -195,7 +197,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { lastFrame = frame; } sendLastFrame(requestId, true); - this.queue.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); closing = true; ws.close(); diff --git a/plugins/resemble/src/tts.ts b/plugins/resemble/src/tts.ts index 59798049..deb26f98 100644 --- a/plugins/resemble/src/tts.ts +++ b/plugins/resemble/src/tts.ts @@ -187,7 +187,9 @@ export class SynthesizeStream extends tts.SynthesizeStream { }; const inputTask = async () => { - for await (const data of this.input) { + while (true) { + const { done, value: data } = await this.inputReader.read(); + if (done) break; if (data === SynthesizeStream.FLUSH_SENTINEL) { this.#tokenizer.flush(); continue; @@ -204,7 +206,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { let lastFrame: AudioFrame | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { if (lastFrame) { - this.queue.put({ requestId, segmentId, frame: lastFrame, final }); + this.outputWriter.write({ requestId, segmentId, frame: lastFrame, final }); lastFrame = undefined; } }; @@ -234,7 +236,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { activeRequests.delete(Number(segmentId)); if (activeRequests.size === 0 && this.#tokenizer.closed) { - this.queue.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); closing = true; ws.close(); } @@ -245,7 +247,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { closing = true; ws.close(); - this.queue.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); } } catch (error) { this.#logger.error(`Error parsing WebSocket message: ${error}`); @@ -256,7 +258,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { this.#logger.error(`WebSocket error: ${error}`); if (!closing) { closing = true; - this.queue.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); ws.close(); } }); @@ -264,7 +266,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { ws.on('close', (code, reason) => { if (!closing) { this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); - this.queue.put(SynthesizeStream.END_OF_STREAM); + this.outputWriter.write(SynthesizeStream.END_OF_STREAM); } ws.removeAllListeners(); }); From 20d1aeb6eb20a8baa6facddc75f490eae110007a Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Wed, 28 May 2025 21:35:46 -0400 Subject: [PATCH 3/9] SyntesizeStream working --- agents/src/tts/tts.ts | 8 +++----- agents/src/vad.ts | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index 79646207..3686e537 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -120,7 +120,6 @@ export abstract class SynthesizeStream #tts: TTS; #metricsPendingTexts: string[] = []; #metricsText = ''; - #monitorMetricsTask?: Promise; private deferredInputStream: DeferredReadableStream< string | typeof SynthesizeStream.FLUSH_SENTINEL @@ -231,9 +230,6 @@ export abstract class SynthesizeStream /** Push a string of text to the TTS */ /** @deprecated Use `updateInputStream` instead */ pushText(text: string) { - if (!this.#monitorMetricsTask) { - this.#monitorMetricsTask = this.monitorMetrics(); - } this.#metricsText += text; if (this.inputClosed) { @@ -283,7 +279,9 @@ export abstract class SynthesizeStream /** Close both the input and output of the TTS stream */ close() { - this.inputWriter.close(); + if (!this.inputClosed) { + this.inputWriter.close(); + } this.closed = true; } diff --git a/agents/src/vad.ts b/agents/src/vad.ts index 025a8a43..c0a0c52b 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -208,7 +208,7 @@ export abstract class VADStream implements AsyncIterableIterator { throw new Error('Stream is closed'); } this.inputClosed = true; - this.input.writable.close(); + this.inputWriter.close(); } async next(): Promise> { @@ -221,7 +221,7 @@ export abstract class VADStream implements AsyncIterableIterator { } close() { - this.input.writable.close(); + this.inputWriter.close(); this.closed = true; } From 0b64413ea02afe87e5adf58eb0e41c7a661d1c60 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Wed, 28 May 2025 21:51:22 -0400 Subject: [PATCH 4/9] refactor chunk stream --- agents/src/tts/tts.ts | 35 ++++++++++++++++++++-------- examples/src/pipeline_voice_agent.ts | 3 +-- plugins/cartesia/src/tts.ts | 6 ++--- plugins/neuphonic/src/tts.ts | 6 ++--- plugins/openai/src/tts.ts | 4 ++-- plugins/resemble/src/tts.ts | 10 ++++---- 6 files changed, 39 insertions(+), 25 deletions(-) diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index 3686e537..a3e32279 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -9,7 +9,7 @@ import { log } from '../log.js'; import type { TTSMetrics } from '../metrics/base.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; import { IdentityTransform } from '../stream/identity_transform.js'; -import { AsyncIterableQueue, mergeFrames } from '../utils.js'; +import { mergeFrames } from '../utils.js'; /** SynthesizedAudio is a packet of speech synthesis as returned by the TTS. */ export interface SynthesizedAudio { @@ -305,17 +305,26 @@ export abstract class SynthesizeStream * exports its own child ChunkedStream class, which inherits this class's methods. */ export abstract class ChunkedStream implements AsyncIterableIterator { - protected queue = new AsyncIterableQueue(); - protected output = new AsyncIterableQueue(); + protected outputWriter: WritableStreamDefaultWriter< + SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM + >; protected closed = false; abstract label: string; #text: string; #tts: TTS; + private output = new IdentityTransform(); + private outputReader: ReadableStreamDefaultReader; + private metricsStream: ReadableStream; constructor(text: string, tts: TTS) { this.#text = text; this.#tts = tts; + this.outputWriter = this.output.writable.getWriter(); + const [outputStream, metricsStream] = this.output.readable.tee(); + this.outputReader = outputStream.getReader(); + this.metricsStream = metricsStream; + this.monitorMetrics(); } @@ -325,15 +334,18 @@ export abstract class ChunkedStream implements AsyncIterableIterator> { - return this.output.next(); + async next(): Promise> { + const { done, value } = await this.outputReader.read(); + if (done) { + return { done: true, value: undefined }; + } + return { done: false, value }; } /** Close both the input and output of the TTS stream */ close() { - this.queue.close(); - this.output.close(); + this.outputWriter.close(); this.closed = true; } diff --git a/examples/src/pipeline_voice_agent.ts b/examples/src/pipeline_voice_agent.ts index 8621b2dd..0ae14a04 100644 --- a/examples/src/pipeline_voice_agent.ts +++ b/examples/src/pipeline_voice_agent.ts @@ -12,7 +12,6 @@ import { pipeline, } from '@livekit/agents'; import * as deepgram from '@livekit/agents-plugin-deepgram'; -import * as elevenlabs from '@livekit/agents-plugin-elevenlabs'; import * as livekit from '@livekit/agents-plugin-livekit'; import * as openai from '@livekit/agents-plugin-openai'; import * as silero from '@livekit/agents-plugin-silero'; @@ -60,7 +59,7 @@ export default defineAgent({ vad, new deepgram.STT(), new openai.LLM(), - new elevenlabs.TTS(), + new openai.TTS(), { chatCtx: initialContext, fncCtx, turnDetector: new livekit.turnDetector.EOUModel() }, ); agent.start(ctx.room, participant); diff --git a/plugins/cartesia/src/tts.ts b/plugins/cartesia/src/tts.ts index a0546159..08f6d81f 100644 --- a/plugins/cartesia/src/tts.ts +++ b/plugins/cartesia/src/tts.ts @@ -107,7 +107,7 @@ export class ChunkedStream extends tts.ChunkedStream { (res) => { res.on('data', (chunk) => { for (const frame of bstream.write(chunk)) { - this.queue.put({ + this.outputWriter.write({ requestId, frame, final: false, @@ -117,14 +117,14 @@ export class ChunkedStream extends tts.ChunkedStream { }); res.on('close', () => { for (const frame of bstream.flush()) { - this.queue.put({ + this.outputWriter.write({ requestId, frame, final: false, segmentId: requestId, }); } - this.queue.close(); + this.outputWriter.close(); }); }, ); diff --git a/plugins/neuphonic/src/tts.ts b/plugins/neuphonic/src/tts.ts index 99c815e4..4802e6d6 100644 --- a/plugins/neuphonic/src/tts.ts +++ b/plugins/neuphonic/src/tts.ts @@ -109,7 +109,7 @@ export class ChunkedStream extends tts.ChunkedStream { if (parsedMessage?.data?.audio) { for (const frame of bstream.write(parsedMessage.data.audio)) { - this.queue.put({ + this.outputWriter.write({ requestId, frame, final: false, @@ -123,14 +123,14 @@ export class ChunkedStream extends tts.ChunkedStream { }); res.on('close', () => { for (const frame of bstream.flush()) { - this.queue.put({ + this.outputWriter.write({ requestId, frame, final: false, segmentId: requestId, }); } - this.queue.close(); + this.outputWriter.close(); }); }, ); diff --git a/plugins/openai/src/tts.ts b/plugins/openai/src/tts.ts index 6fd91053..cde69fdd 100644 --- a/plugins/openai/src/tts.ts +++ b/plugins/openai/src/tts.ts @@ -97,7 +97,7 @@ export class ChunkedStream extends tts.ChunkedStream { let lastFrame: AudioFrame | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { if (lastFrame) { - this.queue.put({ requestId, segmentId, frame: lastFrame, final }); + this.outputWriter.write({ requestId, segmentId, frame: lastFrame, final }); lastFrame = undefined; } }; @@ -108,6 +108,6 @@ export class ChunkedStream extends tts.ChunkedStream { } sendLastFrame(requestId, true); - this.queue.close(); + this.outputWriter.close(); } } diff --git a/plugins/resemble/src/tts.ts b/plugins/resemble/src/tts.ts index deb26f98..dcf9984d 100644 --- a/plugins/resemble/src/tts.ts +++ b/plugins/resemble/src/tts.ts @@ -116,7 +116,7 @@ export class ChunkedStream extends tts.ChunkedStream { const audioBytes = Buffer.from(audioContentB64, 'base64'); for (const frame of bstream.write(audioBytes)) { - this.queue.put({ + this.outputWriter.write({ requestId, frame, final: false, @@ -125,7 +125,7 @@ export class ChunkedStream extends tts.ChunkedStream { } for (const frame of bstream.flush()) { - this.queue.put({ + this.outputWriter.write({ requestId, frame, final: false, @@ -133,16 +133,16 @@ export class ChunkedStream extends tts.ChunkedStream { }); } - this.queue.close(); + this.outputWriter.close(); } catch (error) { this.#logger.error('Error processing Resemble API response:', error); - this.queue.close(); + this.outputWriter.close(); } }); res.on('error', (error) => { this.#logger.error('Resemble API error:', error); - this.queue.close(); + this.outputWriter.close(); }); }, ); From 1b5d49a0724c8c3e0189af8eba0b0caccb3277ee Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Wed, 28 May 2025 21:53:28 -0400 Subject: [PATCH 5/9] bugfix --- agents/src/vad.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/agents/src/vad.ts b/agents/src/vad.ts index c0a0c52b..bdc904ac 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -221,6 +221,7 @@ export abstract class VADStream implements AsyncIterableIterator { } close() { + if (!this.inputClosed) { this.inputWriter.close(); this.closed = true; } From e537cc97f1fd833339a795783acb535227703a1c Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Wed, 28 May 2025 21:55:22 -0400 Subject: [PATCH 6/9] bugfix --- agents/src/vad.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/agents/src/vad.ts b/agents/src/vad.ts index bdc904ac..8ff1798a 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -222,7 +222,8 @@ export abstract class VADStream implements AsyncIterableIterator { close() { if (!this.inputClosed) { - this.inputWriter.close(); + this.inputWriter.close(); + } this.closed = true; } From 497369dfd52e512e13ffbfdfe4146bb633a1edbc Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Wed, 28 May 2025 22:05:33 -0400 Subject: [PATCH 7/9] fix tests --- agents/src/tts/tts.ts | 4 +++- plugins/cartesia/src/tts.ts | 2 +- plugins/neuphonic/src/tts.ts | 2 +- plugins/openai/src/tts.ts | 2 +- plugins/resemble/src/tts.ts | 6 +++--- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index a3e32279..26bbf0e6 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -381,7 +381,9 @@ export abstract class ChunkedStream implements AsyncIterableIterator { this.#logger.error('Resemble API error:', error); - this.outputWriter.close(); + this.close(); }); }, ); From 72816c4af1b9860f28fd71384c3408d9e2b94b67 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Thu, 29 May 2025 16:22:28 -0400 Subject: [PATCH 8/9] nit --- agents/src/tts/tts.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index 26bbf0e6..35c5b989 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -115,16 +115,16 @@ export abstract class SynthesizeStream protected outputWriter: WritableStreamDefaultWriter< SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM >; - protected closed = false; abstract label: string; #tts: TTS; #metricsPendingTexts: string[] = []; #metricsText = ''; + private closed = false; private deferredInputStream: DeferredReadableStream< string | typeof SynthesizeStream.FLUSH_SENTINEL >; - protected metricsStream: ReadableStream; + private metricsStream: ReadableStream; private input = new IdentityTransform(); private output = new IdentityTransform< SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM From 89fb3e304fe897f6c3abae60014fc0a8a0dc634a Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Thu, 29 May 2025 16:39:01 -0400 Subject: [PATCH 9/9] bugfix --- agents/src/tts/tts.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index 35c5b989..da66799e 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -115,12 +115,12 @@ export abstract class SynthesizeStream protected outputWriter: WritableStreamDefaultWriter< SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM >; + protected closed = false; abstract label: string; #tts: TTS; #metricsPendingTexts: string[] = []; #metricsText = ''; - private closed = false; private deferredInputStream: DeferredReadableStream< string | typeof SynthesizeStream.FLUSH_SENTINEL >;