Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
name: Build
on:
push:
branches: [next, main]
branches: [next, main, dev-1.0]
pull_request:
branches: [next, main]
branches: [next, main, dev-1.0]

jobs:
reuse:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
name: Test
on:
push:
branches: [next, main]
branches: [next, main, dev-1.0]
pull_request:
branches: [next, main]
branches: [next, main, dev-1.0]

jobs:
build:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,6 @@ docs

# direnv
.direnv

# vscode workspace config
agents-js.code-workspace
3 changes: 2 additions & 1 deletion agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import * as pipeline from './pipeline/index.js';
import * as stt from './stt/index.js';
import * as tokenize from './tokenize/index.js';
import * as tts from './tts/index.js';
import * as voice from './voice/index.js';

export * from './vad.js';
export * from './plugin.js';
Expand All @@ -31,4 +32,4 @@ export * from './audio.js';
export * from './transcription.js';
export * from './inference_runner.js';

export { cli, stt, tts, llm, pipeline, multimodal, tokenize, metrics, ipc };
export { cli, stt, tts, llm, pipeline, multimodal, tokenize, metrics, ipc, voice };
27 changes: 27 additions & 0 deletions agents/src/stream/deferred_stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { type ReadableStream } from 'node:stream/web';
import { IdentityTransform } from './identity_transform.js';

export class DeferredReadableStream<T> {
private transform: IdentityTransform<T>;

get stream() {
return this.transform.readable;
}

constructor() {
this.transform = new IdentityTransform<T>();
}

/**
* Call once the actual source is ready.
*/
setSource(source: ReadableStream<T>) {
if (this.transform.writable.locked) {
throw new Error('Stream is already locked');
}
source.pipeTo(this.transform.writable);
}
}
12 changes: 12 additions & 0 deletions agents/src/stream/identity_transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { TransformStream } from 'node:stream/web';

export class IdentityTransform<T> extends TransformStream<T, T> {
constructor() {
super({
transform: (chunk, controller) => controller.enqueue(chunk),
});
}
}
15 changes: 9 additions & 6 deletions agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ export class StreamAdapterWrapper extends SpeechStream {

async #run() {
const forwardInput = async () => {
for await (const input of this.input) {
if (input === SpeechStream.FLUSH_SENTINEL) {
while (true) {
const { done, value } = await this.inputReader.read();
if (done) break;

if (value === SpeechStream.FLUSH_SENTINEL) {
this.#vadStream.flush();
} else {
this.#vadStream.pushFrame(input);
this.#vadStream.pushFrame(value);
}
}
this.#vadStream.endInput();
Expand All @@ -67,18 +70,18 @@ export class StreamAdapterWrapper extends SpeechStream {
for await (const ev of this.#vadStream) {
switch (ev.type) {
case VADEventType.START_OF_SPEECH:
this.output.put({ type: SpeechEventType.START_OF_SPEECH });
this.outputWriter.write({ type: SpeechEventType.START_OF_SPEECH });
break;
case VADEventType.END_OF_SPEECH:
this.output.put({ type: SpeechEventType.END_OF_SPEECH });
this.outputWriter.write({ type: SpeechEventType.END_OF_SPEECH });

try {
const event = await this.#stt.recognize(ev.frames);
if (!event.alternatives![0].text) {
continue;
}

this.output.put(event);
this.outputWriter.write(event);
break;
} catch (error) {
let logger = log();
Expand Down
111 changes: 88 additions & 23 deletions agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
import type { AudioFrame } from '@livekit/rtc-node';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import { EventEmitter } from 'node:events';
import type {
ReadableStream,
ReadableStreamDefaultReader,
WritableStreamDefaultWriter,
} from 'node:stream/web';
import { log } from '../log.js';
import type { STTMetrics } from '../metrics/base.js';
import { DeferredReadableStream } from '../stream/deferred_stream.js';
import { IdentityTransform } from '../stream/identity_transform.js';
import type { AudioBuffer } from '../utils.js';
import { AsyncIterableQueue } from '../utils.js';

/** Indicates start/middle/end of speech */
export enum SpeechEventType {
Expand Down Expand Up @@ -137,80 +144,138 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter<STTCal
*/
export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent> {
protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL');
protected input = new AsyncIterableQueue<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>();
protected output = new AsyncIterableQueue<SpeechEvent>();
protected queue = new AsyncIterableQueue<SpeechEvent>();
abstract label: string;
protected input = new IdentityTransform<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>();
protected output = new IdentityTransform<SpeechEvent>();

protected inputReader: ReadableStreamDefaultReader<
AudioFrame | typeof SpeechStream.FLUSH_SENTINEL
>;
protected outputWriter: WritableStreamDefaultWriter<SpeechEvent>;
protected closed = false;
protected inputClosed = false;
abstract label: string;
#stt: STT;
private deferredInputStream: DeferredReadableStream<AudioFrame>;
private logger = log();
private inputWriter: WritableStreamDefaultWriter<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>;
private outputReader: ReadableStreamDefaultReader<SpeechEvent>;
private metricsStream: ReadableStream<SpeechEvent>;

constructor(stt: STT) {
this.#stt = stt;
this.deferredInputStream = new DeferredReadableStream<AudioFrame>();

this.inputWriter = this.input.writable.getWriter();
this.inputReader = this.input.readable.getReader();
this.outputWriter = this.output.writable.getWriter();

const [outputStream, metricsStream] = this.output.readable.tee();
this.metricsStream = metricsStream;
this.outputReader = outputStream.getReader();

this.pumpDeferredStream();
this.monitorMetrics();
}

/**
* Reads from the deferred input stream and forwards chunks to the input writer.
*
* Note: we can't just do this.deferredInputStream.stream.pipeTo(this.input.writable)
* because the inputWriter locks the this.input.writable stream. All writes must go through
* the inputWriter.
*/
private async pumpDeferredStream() {
const reader = this.deferredInputStream.stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
await this.inputWriter.write(value);
}
} catch (e) {
this.logger.error(`Error pumping deferred stream: ${e}`);
throw e;
} finally {
reader.releaseLock();
}
}

protected async monitorMetrics() {
const startTime = process.hrtime.bigint();
const metricsReader = this.metricsStream.getReader();

while (true) {
const { done, value } = await metricsReader.read();
if (done) {
break;
}

if (value.type !== SpeechEventType.RECOGNITION_USAGE) continue;

for await (const event of this.queue) {
this.output.put(event);
if (event.type !== SpeechEventType.RECOGNITION_USAGE) continue;
const duration = process.hrtime.bigint() - startTime;
const metrics: STTMetrics = {
timestamp: Date.now(),
requestId: event.requestId!,
requestId: value.requestId!,
duration: Math.trunc(Number(duration / BigInt(1000000))),
label: this.label,
audioDuration: event.recognitionUsage!.audioDuration,
audioDuration: value.recognitionUsage!.audioDuration,
streamed: true,
};
this.#stt.emit(SpeechEventType.METRICS_COLLECTED, metrics);
}
this.output.close();
}

/** Push an audio frame to the STT */
updateInputStream(audioStream: ReadableStream<AudioFrame>) {
this.deferredInputStream.setSource(audioStream);
}

/** @deprecated Use `updateInputStream` instead */
pushFrame(frame: AudioFrame) {
if (this.input.closed) {
// TODO: remove this method in future version
if (this.inputClosed) {
throw new Error('Input is closed');
}
if (this.closed) {
throw new Error('Stream is closed');
}
this.input.put(frame);
this.inputWriter.write(frame);
}

/** Flush the STT, causing it to process all pending text */
flush() {
if (this.input.closed) {
if (this.inputClosed) {
throw new Error('Input is closed');
}
if (this.closed) {
throw new Error('Stream is closed');
}
this.input.put(SpeechStream.FLUSH_SENTINEL);
this.inputWriter.write(SpeechStream.FLUSH_SENTINEL);
}

/** Mark the input as ended and forbid additional pushes */
endInput() {
if (this.input.closed) {
if (this.inputClosed) {
throw new Error('Input is closed');
}
if (this.closed) {
throw new Error('Stream is closed');
}
this.input.close();
this.inputClosed = true;
this.inputWriter.close();
}

next(): Promise<IteratorResult<SpeechEvent>> {
return this.output.next();
async next(): Promise<IteratorResult<SpeechEvent>> {
return this.outputReader.read().then(({ done, value }) => {
if (done) {
return { done: true, value: undefined };
}
return { done: false, value };
});
}

/** Close both the input and output of the STT stream */
close() {
this.input.close();
this.queue.close();
this.output.close();
this.input.writable.close();
this.closed = true;
}

Expand Down
12 changes: 6 additions & 6 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ export class Queue<T> {
}

/** @internal */
export class Future {
#await: Promise<void>;
#resolvePromise!: () => void;
export class Future<T = void> {
#await: Promise<T>;
#resolvePromise!: (value: T) => void;
#rejectPromise!: (error: Error) => void;
#done: boolean = false;

constructor() {
this.#await = new Promise<void>((resolve, reject) => {
this.#await = new Promise<T>((resolve, reject) => {
this.#resolvePromise = resolve;
this.#rejectPromise = reject;
});
Expand All @@ -138,9 +138,9 @@ export class Future {
return this.#done;
}

resolve() {
resolve(value: T) {
this.#done = true;
this.#resolvePromise();
this.#resolvePromise(value);
}

reject(error: Error) {
Expand Down
Loading
Loading