diff --git a/agent-sdk/agent-session/Agent.ts b/agent-sdk/agent-session/Agent.ts new file mode 100644 index 000000000..48c77aa85 --- /dev/null +++ b/agent-sdk/agent-session/Agent.ts @@ -0,0 +1,440 @@ +import { ConnectionState, ParticipantEvent, ParticipantKind, RemoteParticipant, RemoteTrackPublication, Room, RoomEvent, Track } from 'livekit-client'; +import type TypedEventEmitter from 'typed-emitter'; +import { EventEmitter } from 'events'; +import { getParticipantTrackRefs, participantTrackEvents, roomTrackEvents } from '@/agent-sdk/external-deps/components-js'; +import { ParticipantEventCallbacks, RoomEventCallbacks } from '@/agent-sdk/external-deps/client-sdk-js'; +import { ParticipantAttributes } from '@/agent-sdk/lib/participant-attributes'; +import { createRemoteTrack, RemoteTrackInstance } from './RemoteTrack'; +import { createScopedGetSet } from '../lib/scoped-get-set'; + +/** State representing the current status of the agent, whether it is ready for speach, etc */ +export type AgentConversationalState = 'disconnected' | 'initializing' | 'idle' | 'listening' | 'thinking' | 'speaking'; + +export enum AgentEvent { + CameraChanged = 'cameraChanged', + MicrophoneChanged = 'microphoneChanged', + AttributesChanged = 'attributesChanged', + ConversationalStateChanged = 'conversationalStateChanged', +} + +export type AgentCallbacks = { + [AgentEvent.CameraChanged]: (newTrack: RemoteTrackInstance | null) => void; + [AgentEvent.MicrophoneChanged]: (newTrack: RemoteTrackInstance | null) => void; + [AgentEvent.AttributesChanged]: (newAttributes: Record) => void; + [AgentEvent.ConversationalStateChanged]: (newAgentConversationalState: AgentConversationalState) => void; +}; + + + +type AgentInstanceCommon = { + [Symbol.toStringTag]: "AgentInstance"; + + /** Returns a promise that resolves once the agent is available for interaction */ + waitUntilAvailable: (signal?: AbortSignal) => Promise; + + /** Returns a promise that resolves once the agent has published a camera track */ + waitUntilCamera: (signal?: AbortSignal) => Promise; + + /** Returns a promise that resolves once the agent has published a microphone track */ + waitUntilMicrophone: (signal?: AbortSignal) => Promise; + + // FIXME: maybe add some sort of schema to this? + attributes: Record; + + subtle: { + emitter: TypedEventEmitter; + initialize: () => void; + teardown: () => void; + + agentParticipant: RemoteParticipant | null; + workerParticipant: RemoteParticipant | null; + }; +}; + +type AgentInstanceAvailable = AgentInstanceCommon & { + conversationalState: "listening" | "thinking" | "speaking"; + + /** Is the agent ready for user interaction? */ + isAvailable: true; + + camera: RemoteTrackInstance | null; + microphone: RemoteTrackInstance | null; +}; + +type AgentInstanceUnAvailable = AgentInstanceCommon & { + conversationalState: "disconnected" | "initializing" | "idle"; + + /** Is the agent ready for user interaction? */ + isAvailable: false; + + camera: null; + microphone: null; +}; + +export type AgentInstance = AgentInstanceAvailable | AgentInstanceUnAvailable; + +/** + * Agent encapculates all agent state, normalizing some quirks around how LiveKit Agents work. 
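+ *
+ * A hedged usage sketch (not part of the original source): wiring the factory to a
+ * plain mutable holder. In the SDK itself, AgentSession supplies `get`/`set` through
+ * createScopedGetSet, so treat the holder below as an illustrative assumption only.
+ * @example
+ * let state: AgentInstance;
+ * const agent = createAgent(
+ *   room,                            // an already-constructed livekit-client Room
+ *   () => state,                     // read the latest snapshot
+ *   (fn) => { state = fn(state); },  // swap in the next snapshot
+ * );
+ * state = agent;
+ * agent.subtle.initialize();
+ * await agent.waitUntilAvailable();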
+ */ +export function createAgent( + room: Room, + get: () => AgentInstance, + set: (fn: (old: AgentInstance) => AgentInstance) => void, +): AgentInstance { + const emitter = new EventEmitter() as TypedEventEmitter; + + const handleParticipantConnected = () => { + updateParticipants(); + }; + + const handleParticipantDisconnected = () => { + updateParticipants(); + }; + + const handleConnectionStateChanged = () => { + set((old) => generateConversationalStateUpdate(old, old.camera, old.microphone)); + }; + + const handleLocalParticipantTrackPublished = () => { + set((old) => generateConversationalStateUpdate(old, old.camera, old.microphone)); + }; + + const initialize = () => { + set((old) => generateConversationalStateUpdate(old, old.camera, old.microphone)); + + updateParticipants(); + + room.on(RoomEvent.ParticipantConnected, handleParticipantConnected); + room.on(RoomEvent.ParticipantDisconnected, handleParticipantDisconnected); + room.on(RoomEvent.ConnectionStateChanged, handleConnectionStateChanged); + room.localParticipant.on(ParticipantEvent.TrackPublished, handleLocalParticipantTrackPublished) + }; + + const teardown = () => { + room.localParticipant.off(ParticipantEvent.TrackPublished, handleLocalParticipantTrackPublished) + room.off(RoomEvent.ConnectionStateChanged, handleConnectionStateChanged); + room.off(RoomEvent.ParticipantDisconnected, handleParticipantDisconnected); + room.off(RoomEvent.ParticipantConnected, handleParticipantConnected); + + updateParticipants(); // Detaches any participant related event handlers + + get().camera?.subtle.teardown(); + get().microphone?.subtle.teardown(); + set((old) => generateConversationalStateUpdate(old, null, null)); + }; + + const waitUntilAvailable = async (signal?: AbortSignal) => { + return new Promise((resolve, reject) => { + const stateChangedHandler = () => { + if (!get().isAvailable) { + return; + } + cleanup(); + resolve(); + }; + const abortHandler = () => { + cleanup(); + reject(new Error('AgentInstance.waitUntilAgentIsAvailable - signal aborted')); + }; + + const cleanup = () => { + emitter.off(AgentEvent.ConversationalStateChanged, stateChangedHandler); + signal?.removeEventListener('abort', abortHandler); + }; + + emitter.on(AgentEvent.ConversationalStateChanged, stateChangedHandler); + signal?.addEventListener('abort', abortHandler); + }); + }; + + const waitUntilMediaTrack = async (trackType: 'camera' | 'microphone', signal?: AbortSignal) => { + return new Promise((resolve, reject) => { + const stateChangedHandler = () => { + if (!get()[trackType]) { + return; + } + cleanup(); + resolve(); + }; + const abortHandler = () => { + cleanup(); + reject(new Error('AgentInstance.waitUntilMediaTrack - signal aborted')); + }; + + const cleanup = () => { + switch (trackType) { + case 'camera': + emitter.off(AgentEvent.CameraChanged, stateChangedHandler); + break; + case 'microphone': + emitter.off(AgentEvent.MicrophoneChanged, stateChangedHandler); + break; + } + signal?.removeEventListener('abort', abortHandler); + }; + + switch (trackType) { + case 'camera': + emitter.on(AgentEvent.CameraChanged, stateChangedHandler); + break; + case 'microphone': + emitter.on(AgentEvent.MicrophoneChanged, stateChangedHandler); + break; + } + signal?.addEventListener('abort', abortHandler); + }); + }; + const waitUntilCamera = (signal?: AbortSignal) => waitUntilMediaTrack('camera', signal); + const waitUntilMicrophone = (signal?: AbortSignal) => waitUntilMediaTrack('microphone', signal); + + const handleAttributesChanged = (attributes: Record) => 
{ + set((old) => ({ ...old, attributes })); + emitter.emit(AgentEvent.AttributesChanged, attributes); + + set((old) => generateConversationalStateUpdate(old, old.camera, old.microphone)); + }; + + const handleUpdateTracks = () => { + const { + camera: oldCamera, + microphone: oldMicrophone, + subtle: { agentParticipant, workerParticipant } + } = get(); + + const agentTracks = agentParticipant ? getParticipantTrackRefs( + agentParticipant, + { sources: [Track.Source.Microphone, Track.Source.Camera] } + ) : []; + const workerTracks = workerParticipant ? getParticipantTrackRefs( + workerParticipant, + { sources: [Track.Source.Microphone, Track.Source.Camera] } + ) : []; + + const newVideoTrack = ( + agentTracks.find((t) => t.source === Track.Source.Camera) ?? + workerTracks.find((t) => t.source === Track.Source.Camera) ?? null + ); + + let camera = oldCamera; + if (oldCamera?.subtle.publication !== newVideoTrack?.publication) { + if (newVideoTrack) { + const { get: cameraGet, set: cameraSet } = createScopedGetSet(get, set, 'camera', 'Agent'); + camera = createRemoteTrack({ + publication: newVideoTrack.publication as RemoteTrackPublication, + participant: newVideoTrack.participant, + }, cameraGet, cameraSet); + } else { + camera = null; + } + } + if (camera !== oldCamera) { + emitter.emit(AgentEvent.CameraChanged, camera); + } + + const newAudioTrack = ( + agentTracks.find((t) => t.source === Track.Source.Microphone) ?? + workerTracks.find((t) => t.source === Track.Source.Microphone) ?? null + ); + let microphone = oldMicrophone; + if (oldMicrophone?.subtle.publication !== newAudioTrack?.publication) { + if (newAudioTrack) { + const { get: microphoneGet, set: microphoneSet } = createScopedGetSet(get, set, 'microphone', 'Agent'); + microphone = createRemoteTrack({ + publication: newAudioTrack.publication as RemoteTrackPublication, + participant: newAudioTrack.participant, + }, microphoneGet, microphoneSet); + } else { + microphone = null; + } + } + if (microphone !== oldMicrophone) { + emitter.emit(AgentEvent.MicrophoneChanged, microphone); + } + + set((old) => generateConversationalStateUpdate(old, camera, microphone)); + + if (camera !== oldCamera) { + camera?.subtle.initialize(); + } + if (microphone !== oldMicrophone) { + microphone?.subtle.initialize(); + } + }; + + const updateParticipants = () => { + const { + agentParticipant: oldAgentParticipant, + workerParticipant: oldWorkerParticipant, + } = get().subtle; + + const roomRemoteParticipants = Array.from(room.remoteParticipants.values()); + const newAgentParticipant = roomRemoteParticipants.find( + (p) => p.kind === ParticipantKind.AGENT && !(ParticipantAttributes.publishOnBehalf in p.attributes), + ) ?? null; + const newWorkerParticipant = newAgentParticipant ? ( + roomRemoteParticipants.find( + (p) => + p.kind === ParticipantKind.AGENT && p.attributes[ParticipantAttributes.publishOnBehalf] === newAgentParticipant.identity, + ) ?? null + ) : null; + + // 1. Listen for attribute changes + if (oldAgentParticipant !== newAgentParticipant) { + oldAgentParticipant?.off(ParticipantEvent.AttributesChanged, handleAttributesChanged); + + if (newAgentParticipant) { + newAgentParticipant.on(ParticipantEvent.AttributesChanged, handleAttributesChanged); + handleAttributesChanged(newAgentParticipant.attributes); + } + } + + // 2. 
Listen for track updates + if (oldAgentParticipant !== newAgentParticipant) { + set((old) => ({ ...old, subtle: { ...old.subtle, agentParticipant: newAgentParticipant } })); + + for (const event of participantTrackEvents) { + oldAgentParticipant?.off(event as keyof ParticipantEventCallbacks, handleUpdateTracks); + } + for (const event of roomTrackEvents) { + room.off(event as keyof RoomEventCallbacks, handleUpdateTracks); + } + + if (newAgentParticipant) { + for (const event of participantTrackEvents) { + newAgentParticipant.on(event as keyof ParticipantEventCallbacks, handleUpdateTracks); + } + for (const event of roomTrackEvents) { + room.on(event as keyof RoomEventCallbacks, handleUpdateTracks); + } + handleUpdateTracks(); + } + } + if (oldWorkerParticipant !== newWorkerParticipant) { + set((old) => ({ ...old, subtle: { ...old.subtle, workerParticipant: newWorkerParticipant } })); + + for (const event of participantTrackEvents) { + oldWorkerParticipant?.off(event as keyof ParticipantEventCallbacks, handleUpdateTracks); + } + for (const event of roomTrackEvents) { + room.off(event as keyof RoomEventCallbacks, handleUpdateTracks); + } + + if (newWorkerParticipant) { + for (const event of participantTrackEvents) { + newWorkerParticipant.on(event as keyof ParticipantEventCallbacks, handleUpdateTracks); + } + for (const event of roomTrackEvents) { + room.on(event as keyof RoomEventCallbacks, handleUpdateTracks); + } + handleUpdateTracks(); + } + } + }; + + const generateConversationalState = (attributes: Record, agentParticipant: RemoteParticipant | null): AgentConversationalState => { + let newConversationalState: AgentConversationalState = 'disconnected'; + + if (room.state !== ConnectionState.Disconnected) { + newConversationalState = 'initializing'; + } + + // If the microphone preconnect buffer is active, then the state should be "listening" rather + // than "initializing" + const micTrack = room.localParticipant.getTrackPublication(Track.Source.Microphone); + if (micTrack) { + newConversationalState = 'listening'; + } + + if (agentParticipant && attributes[ParticipantAttributes.state]) { + // ref: https://github.com/livekit/agents/blob/65170238db197f62f479eb7aaef1c0e18bfad6e7/livekit-agents/livekit/agents/voice/events.py#L97 + const agentState = attributes[ParticipantAttributes.state] as 'initializing' | 'idle' | 'listening' | 'thinking' | 'speaking'; + newConversationalState = agentState; + } + + return newConversationalState; + }; + const generateDerivedConversationalStateValues = (conversationalState: ConversationalState) => ({ + isAvailable: ( + conversationalState === 'listening' || + conversationalState === 'thinking' || + conversationalState === 'speaking' + ), + } as { + isAvailable: ConversationalState extends 'listening' | 'thinking' | 'speaking' ? 
true : false, + }); + + const generateConversationalStateUpdate = ( + old: AgentInstance, + camera: RemoteTrackInstance | null, + microphone: RemoteTrackInstance | null, + ): AgentInstance => { + const newConversationalState = generateConversationalState(old.attributes, old.subtle.agentParticipant); + + if (old.conversationalState !== newConversationalState) { + emitter.emit(AgentEvent.ConversationalStateChanged, newConversationalState); + } + switch (newConversationalState) { + case 'listening': + case 'thinking': + case 'speaking': + // if (camera || !microphone) { + // throw new Error(`AgentInstance.generateConversationalStateUpdate - attempted to transition to conversational state ${newConversationalState}, but camera / microphone not found.`); + // } + if (old.conversationalState === newConversationalState && old.camera === camera && old.microphone === microphone) { + return old; + } + + return { + ...old, + + conversationalState: newConversationalState, + ...generateDerivedConversationalStateValues(newConversationalState), + + camera, + microphone, + }; + + case 'disconnected': + case 'initializing': + case 'idle': + if (old.conversationalState === newConversationalState) { + return old; + } + return { + ...old, + + conversationalState: newConversationalState, + ...generateDerivedConversationalStateValues(newConversationalState), + + // Clear inner values if no longer connected + camera: null, + microphone: null, + }; + } + }; + + return { + [Symbol.toStringTag]: "AgentInstance", + + conversationalState: 'disconnected', + ...generateDerivedConversationalStateValues('disconnected'), + + waitUntilAvailable, + waitUntilCamera, + waitUntilMicrophone, + + microphone: null, + camera: null, + + attributes: {}, + + subtle: { + emitter, + initialize, + teardown, + + agentParticipant: null, + workerParticipant: null, + }, + }; +} diff --git a/agent-sdk/agent-session/AgentSession.ts b/agent-sdk/agent-session/AgentSession.ts new file mode 100644 index 000000000..14b217d8e --- /dev/null +++ b/agent-sdk/agent-session/AgentSession.ts @@ -0,0 +1,466 @@ +import type TypedEventEmitter from 'typed-emitter'; +import { Room, RoomEvent, ConnectionState, TrackPublishOptions } from 'livekit-client'; +import { EventEmitter } from 'events'; + +import { type ReceivedMessage } from "./message"; +import { AgentConversationalState, AgentEvent, AgentInstance, createAgent } from './Agent'; +import { ConnectionCredentials } from './ConnectionCredentialsProvider'; +import { ParticipantAttributes } from '../lib/participant-attributes'; +import { createMessages, MessagesInstance } from './Messages'; +import { createLocal, LocalInstance } from './Local'; +import { createScopedGetSet } from '../lib/scoped-get-set'; + +/** State representing the current connection status to the server hosted agent */ +// FIXME: maybe just make this ConnectionState? 
+export type AgentSessionConnectionState = 'disconnected' | 'connecting' | 'connected' | 'reconnecting' | 'signalReconnecting'; + +export enum AgentSessionEvent { + ConnectionStateChanged = 'agentConnectionStateChanged', + AgentConversationalStateChanged = 'agentConversationalStateChanged', + AgentAttributesChanged = 'agentAttributesChanged', + MessageReceived = 'messageReceived', + AgentConnectionFailure = 'agentConnectionFailure', + AudioPlaybackStatusChanged = 'AudioPlaybackStatusChanged', + MediaDevicesError = 'MediaDevicesError', +} + +export type AgentSessionCallbacks = { + [AgentSessionEvent.ConnectionStateChanged]: (newAgentConnectionState: AgentSessionConnectionState) => void; + [AgentSessionEvent.AgentConversationalStateChanged]: (newAgentConversationalState: AgentConversationalState) => void; + [AgentSessionEvent.MessageReceived]: (newMessage: ReceivedMessage) => void; + [AgentSessionEvent.AgentConnectionFailure]: (reason: string) => void; + [AgentSessionEvent.AudioPlaybackStatusChanged]: (audioPlaybackPermitted: boolean) => void; + [AgentSessionEvent.MediaDevicesError]: (error: Error) => void; +}; + +export type AgentSessionOptions = { + credentials: ConnectionCredentials; +}; + +export type AgentSessionConnectOptions = { + /** Optional abort signal which if triggered will stop waiting for the room to be disconnected + * prior to connecting + * + * FIXME: is this a confusing property to expose? Maybe expose one `signal` that universally + * could apply across the whole agentSession.connect(...) call? + */ + waitForDisconnectSignal?: AbortSignal; + + /** + * Amount of time in milliseonds the system will wait for an agent to join the room, before + * emitting an AgentSessionEvent.AgentConnectionFailure event. + */ + agentConnectTimeoutMilliseconds?: number; + + // FIXME: not sure about this pattern, background thinking is that it would be good to be able to + // abstract away enabling relevant media tracks to the caller so they don't have to interface with + // the room. + tracks?: { + microphone?: { + enabled?: boolean; + publishOptions?: TrackPublishOptions; + }; + }; +}; + +// FIXME: make this 10 seconds once room dispatch booting info is discoverable +const DEFAULT_AGENT_CONNECT_TIMEOUT_MILLISECONDS = 20_000; + + +export type SwitchActiveDeviceOptions = { + /** + * If true, adds an `exact` constraint to the getUserMedia request. + * The request will fail if this option is true and the device specified is not actually available + */ + exact?: boolean; +}; + +type AgentSessionInstanceCommon = { + [Symbol.toStringTag]: "AgentSessionInstance", + + credentials: ConnectionCredentials; + + /** Returns a promise that resolves once the room connects. 
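+   *
+   * A hedged sketch (assumes `session` is an AgentSessionInstance created via createAgentSession):
+   * @example
+   * const abort = new AbortController();
+   * void session.connect();                          // connect() itself awaits connection + agent availability
+   * await session.waitUntilConnected(abort.signal);  // or wait on the room connection alone, cancellable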
*/ + waitUntilConnected: (signal?: AbortSignal) => void; + /** Returns a promise that resolves once the room disconnects */ + waitUntilDisconnected: (signal?: AbortSignal) => void; + + agentConnectTimeout: { + delayInMilliseconds: number; + timeoutId: ReturnType | null; + } | null, + + prepareConnection: () => Promise, + connect: (options?: AgentSessionConnectOptions) => Promise; + disconnect: () => Promise; + + canPlayAudio: boolean; + startAudio: () => Promise; + + subtle: { + emitter: TypedEventEmitter; + room: Room; + }; +}; + +type AgentSessionInstanceConnecting = AgentSessionInstanceCommon & { + connectionState: "connecting"; + isConnected: false; + isReconnecting: false; + + agent: AgentInstance | null; + local: LocalInstance | null; + messages: MessagesInstance | null; +}; + +type AgentSessionInstanceConnected = AgentSessionInstanceCommon & { + connectionState: "connected" | "reconnecting" | "signalReconnecting"; + isConnected: true; + isReconnecting: boolean; + + agent: AgentInstance; + local: LocalInstance; + messages: MessagesInstance; +}; + +type AgentSessionInstanceDisconnected = AgentSessionInstanceCommon & { + connectionState: "disconnected"; + isConnected: false; + isReconnecting: false; + + agent: null; + local: null; + messages: null; +}; + +export type AgentSessionInstance = AgentSessionInstanceConnecting | AgentSessionInstanceConnected | AgentSessionInstanceDisconnected; + + /** + * AgentSession represents a connection to a LiveKit Agent, providing abstractions to make 1:1 + * agent/participant rooms easier to work with. + */ +export function createAgentSession( + options: AgentSessionOptions, + get: () => AgentSessionInstance, + set: (fn: (old: AgentSessionInstance) => AgentSessionInstance) => void, +): AgentSessionInstance { + const emitter = new EventEmitter() as TypedEventEmitter; + const room = new Room(); + + const handleAgentAttributesChanged = () => { + set((old) => generateConnectionStateUpdate(old, old.agent, old.local, old.messages)); + }; + + const handleRoomConnected = async () => { + console.log('!! CONNECTED'); + + const { get: agentGet, set: agentSet } = createScopedGetSet(get, set, 'agent', 'AgentSession'); + const agent = createAgent(room, agentGet, agentSet); + agent.subtle.emitter.on(AgentEvent.AttributesChanged, handleAgentAttributesChanged); + + const { get: localGet, set: localSet } = createScopedGetSet(get, set, 'local', 'AgentSession'); + const local = createLocal(room, localGet, localSet); + + const { get: messagesGet, set: messagesSet } = createScopedGetSet(get, set, 'messages', 'AgentSession'); + const messages = createMessages(room, messagesGet, messagesSet); + + set((old) => generateConnectionStateUpdate(old, agent, local, messages)); + agent.subtle.initialize(); + local.subtle.initialize(); + messages.subtle.initialize(); + + set((old) => { + if (!old.agentConnectTimeout) { + // (this case shoudln't in practice ever happen) + throw new Error('AgentSessionInstance.connect - agentConnectTimeout not set, aborting!'); + } + + return { + ...old, + agentConnectTimeout: { + delayInMilliseconds: old.agentConnectTimeout.delayInMilliseconds, + timeoutId: startAgentConnectedTimeout(old.agentConnectTimeout.delayInMilliseconds), + }, + }; + }); + }; + room.on(RoomEvent.Connected, handleRoomConnected); + + const handleRoomDisconnected = () => { + console.log('!! 
DISCONNECTED'); + // old.subtle.agent?.off(AgentEvent.AgentConnectionStateChanged, this.handleAgentConnectionStateChanged); + // old.subtle.agent?.off(AgentEvent.AgentConversationalStateChanged, this.handleAgentConversationalStateChanged); + get().agent?.subtle.teardown(); + get().agent?.subtle.emitter.off(AgentEvent.AttributesChanged, handleAgentAttributesChanged); + + get().local?.subtle.teardown(); + get().messages?.subtle.teardown(); + + set((old) => generateConnectionStateUpdate(old, null, null, null)); + + set((old) => { + if (old.agentConnectTimeout?.timeoutId) { + clearTimeout(old.agentConnectTimeout?.timeoutId); + } + return { ...old, agentConnectTimeout: null }; + }); + + options.credentials.refresh(); + }; + room.on(RoomEvent.Disconnected, handleRoomDisconnected); + + const handleAudioPlaybackStatusChanged = async () => { + const canPlayAudio = get().subtle.room.canPlaybackAudio; + set((old) => ({ ...old, canPlayAudio })); + emitter.emit(AgentSessionEvent.AudioPlaybackStatusChanged, canPlayAudio); + }; + room.on(RoomEvent.AudioPlaybackStatusChanged, handleAudioPlaybackStatusChanged); + + const handleMediaDevicesError = async (error: Error) => { + emitter.emit(AgentSessionEvent.MediaDevicesError, error); + }; + room.on(RoomEvent.MediaDevicesError, handleMediaDevicesError); + + const handleConnectionStateChanged = () => { + // Skip handling connected / disconnected, because handleRoomConnected / handleRoomDisconnected + // already run for these state changes + if (room.state === ConnectionState.Connected || room.state === ConnectionState.Disconnected) { + return; + } + + set((old) => generateConnectionStateUpdate(old, old.agent, old.local, old.messages)); + }; + room.on(RoomEvent.ConnectionStateChanged, handleConnectionStateChanged); + + + const connect = async (connectOptions: AgentSessionConnectOptions = {}) => { + const { + waitForDisconnectSignal, + agentConnectTimeoutMilliseconds = DEFAULT_AGENT_CONNECT_TIMEOUT_MILLISECONDS, + tracks = { microphone: { enabled: true, publishOptions: { preConnectBuffer: true } } }, + } = connectOptions; + + await waitUntilDisconnected(waitForDisconnectSignal); + + set((old) => ({ + ...old, + agentConnectTimeout: { + delayInMilliseconds: agentConnectTimeoutMilliseconds, + timeoutId: null, + }, + })); + + const state = get(); + await Promise.all([ + options.credentials.generate().then(connection => ( + state.subtle.room.connect(connection.serverUrl, connection.participantToken) + )), + + // Start microphone (with preconnect buffer) by default + tracks.microphone?.enabled ? ( + state.subtle.room.localParticipant.setMicrophoneEnabled(true, undefined, tracks.microphone?.publishOptions ?? {}) + ) : Promise.resolve(), + ]); + + await waitUntilConnected(); + await get().agent!.waitUntilAvailable(); + }; + const disconnect = async () => { + await get().subtle.room.disconnect(); + }; + + const prepareConnection = async () => { + const credentials = await options.credentials.generate(); + await room.prepareConnection(credentials.serverUrl, credentials.participantToken); + }; + prepareConnection().catch(err => { + // FIXME: figure out a better logging solution? 
+ console.warn('WARNING: Room.prepareConnection failed:', err); + }); + + const startAudio = async () => get().subtle.room.startAudio(); + + const startAgentConnectedTimeout = (agentConnectTimeoutMilliseconds: AgentSessionConnectOptions["agentConnectTimeoutMilliseconds"] | null) => { + return setTimeout(() => { + const { connectionState, agent, disconnect } = get(); + if (!agent?.isAvailable) { + const reason = + connectionState === 'connecting' + ? 'Agent did not join the room. ' + : 'Agent connected but did not complete initializing. '; + + emitter.emit(AgentSessionEvent.AgentConnectionFailure, reason); + console.error('!! AGENT WAS NOT CONNECTED WITHIN TIMEOUT!'); + disconnect(); + } + }, agentConnectTimeoutMilliseconds ?? DEFAULT_AGENT_CONNECT_TIMEOUT_MILLISECONDS); + }; + + const waitUntilConnected = async (signal?: AbortSignal) => { + return waitUntilConnectionState( + ConnectionState.Connected, /* FIXME: should I check for other states too? */ + signal, + ); + }; + + const waitUntilDisconnected = async (signal?: AbortSignal) => { + return waitUntilConnectionState( + ConnectionState.Disconnected, + signal, + ); + }; + + const waitUntilConnectionState = async (state: ConnectionState, signal?: AbortSignal) => { + const { connectionState } = get(); + if (connectionState === state) { + return; + } + + return new Promise((resolve, reject) => { + const onceEventOccurred = (newState: AgentSessionConnectionState) => { + if (newState !== state) { + return; + } + cleanup(); + resolve(); + }; + const abortHandler = () => { + cleanup(); + reject(new Error(`AgentSession.waitUntilRoomState(${state}, ...) - signal aborted`)); + }; + + const cleanup = () => { + emitter.off(AgentSessionEvent.ConnectionStateChanged, onceEventOccurred); + signal?.removeEventListener('abort', abortHandler); + }; + + emitter.on(AgentSessionEvent.ConnectionStateChanged, onceEventOccurred); + signal?.addEventListener('abort', abortHandler); + }); + }; + + const generateConnectionState = (agent: AgentInstance | null): AgentSessionConnectionState => { + const roomConnectionState = room.state; + if (roomConnectionState === ConnectionState.Disconnected) { + return 'disconnected'; + } else if ( + roomConnectionState === ConnectionState.Connecting || + !agent?.subtle.agentParticipant || + !agent?.attributes[ParticipantAttributes.state] + ) { + return 'connecting'; + } else { + return roomConnectionState; + } + }; + const generateDerivedConnectionStateValues = (connectionState: ConnectionState) => ({ + isConnected: ( + connectionState === 'connected' || + connectionState === 'reconnecting' || + connectionState === 'signalReconnecting' + ), + isReconnecting: ( + connectionState === 'reconnecting' || + connectionState === 'signalReconnecting' + ), + } as { + isConnected: ConnectionState extends 'connected' | 'reconnecting' | 'signalReconnecting' ? true : false, + isReconnecting: ConnectionState extends 'reconnecting' | 'signalReconnecting' ? 
true : false, + }); + + const generateConnectionStateUpdate = ( + old: AgentSessionInstance, + agent: AgentInstance | null, + local: LocalInstance | null, + messages: MessagesInstance | null, + ): AgentSessionInstance => { + const newConnectionState = generateConnectionState(agent); + + if (old.connectionState !== newConnectionState) { + emitter.emit(AgentSessionEvent.ConnectionStateChanged, newConnectionState); + } + + switch (newConnectionState) { + case 'connecting': + if (old.connectionState === 'connecting' && old.local === local && old.agent === agent && old.messages === messages) { + return old; + } + return { + ...old, + + connectionState: 'connecting', + ...generateDerivedConnectionStateValues('connecting'), + + local, + agent, + messages, + }; + + case 'connected': + case 'reconnecting': + case 'signalReconnecting': + if (!local || !agent || !messages) { + throw new Error(`AgentSessionInstance.generateConnectionStateUpdate - attempted to transition to connection state ${newConnectionState}, but local / agent / messages not found.`); + } + if (old.connectionState === newConnectionState && old.local === local && old.agent === agent && old.messages === messages) { + return old; + } + return { + ...old, + + connectionState: newConnectionState, + ...generateDerivedConnectionStateValues(newConnectionState), + + local, + agent, + messages, + }; + + case 'disconnected': + if (old.connectionState === newConnectionState) { + return old; + } + return { + ...old, + + connectionState: newConnectionState, + ...generateDerivedConnectionStateValues(newConnectionState), + + // Clear inner values if no longer connected + local: null, + agent: null, + messages: null, + }; + } + }; + + return { + [Symbol.toStringTag]: "AgentSessionInstance", + + credentials: options.credentials, + + connectionState: 'disconnected', + ...generateDerivedConnectionStateValues('disconnected'), + + agent: null, + local: null, + messages: null, + + waitUntilConnected, + waitUntilDisconnected, + + agentConnectTimeout: null, + + prepareConnection, + connect, + disconnect, + + canPlayAudio: room.canPlaybackAudio, + startAudio, + + subtle: { + emitter, + room, + }, + }; +} diff --git a/agent-sdk/agent-session/ConnectionCredentialsProvider.ts b/agent-sdk/agent-session/ConnectionCredentialsProvider.ts new file mode 100644 index 000000000..25565b996 --- /dev/null +++ b/agent-sdk/agent-session/ConnectionCredentialsProvider.ts @@ -0,0 +1,118 @@ +import { decodeJwt } from 'jose'; + +import { ConnectionDetails } from "@/app/api/connection-details/route"; + +const ONE_MINUTE_IN_MILLISECONDS = 60 * 1000; + +/** + * ConnectionDetails handles getting credentials for connecting to a new Room, caching + * the last result and using it until it expires. 
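+ *
+ * A hedged usage sketch (assumes an app-provided token endpoint that returns ConnectionDetails):
+ * @example
+ * const credentials = new ManualConnectionCredentials(async () => {
+ *   const res = await fetch('/api/connection-details');
+ *   return res.json();
+ * });
+ * const { serverUrl, participantToken } = await credentials.generate();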
*/ +export abstract class ConnectionCredentials { + private cachedConnectionDetails: ConnectionDetails | null = null; + + protected isCachedConnectionDetailsExpired() { + const token = this.cachedConnectionDetails?.participantToken; + if (!token) { + return true; + } + + const jwtPayload = decodeJwt(token); + if (!jwtPayload.exp) { + return true; + } + const expiresAt = new Date(jwtPayload.exp - ONE_MINUTE_IN_MILLISECONDS); + + const now = new Date(); + return expiresAt >= now; + } + + async generate() { + if (this.isCachedConnectionDetailsExpired()) { + await this.refresh(); + } + + return this.cachedConnectionDetails!; + } + + async refresh() { + this.cachedConnectionDetails = await this.fetch(); + } + + protected abstract fetch(): Promise; +}; + +export class ManualConnectionCredentials extends ConnectionCredentials { + protected fetch: () => Promise; + + constructor(handler: () => Promise) { + super(); + this.fetch = handler; + } +} + +export class LiteralConnectionCredentials extends ConnectionCredentials { + payload: ConnectionDetails; + + constructor(payload: ConnectionDetails) { + super(); + this.payload = payload; + } + + async fetch() { + if (this.isCachedConnectionDetailsExpired()) { + // FIXME: figure out a better logging solution? + console.warn('WARNING: The credentials within LiteralConnectionCredentials have expired, so any upcoming room connections will fail.'); + } + return this.payload; + } + + async refresh() { /* cannot refresh a literal set of credentials! */ } +} + + +type SandboxConnectionCredentialsOptions = { + sandboxId: string; + baseUrl?: string; + + /** The name of the room to join. If omitted, a random new room name will be generated instead. */ + roomName?: string; + + /** The identity of the participant the token should connect as connect as. If omitted, a random + * identity will be used instead. */ + participantName?: string; +}; + +export class SandboxConnectionCredentials extends ConnectionCredentials { + protected options: SandboxConnectionCredentialsOptions; + + constructor(options: SandboxConnectionCredentialsOptions) { + super(); + this.options = options; + + if (process.env.NODE_ENV === 'production') { + // FIXME: figure out a better logging solution? + console.warn('WARNING: SandboxConnectionCredentials is meant for development, and is not security hardened. In production, implement your own token generation solution.'); + } + } + + async fetch() { + const baseUrl = this.options.baseUrl ?? 
"https://cloud-api.livekit.io"; + const response = await fetch(`${baseUrl}/api/sandbox/connection-details`, { + method: "POST", + headers: { + "X-Sandbox-ID": this.options.sandboxId, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + roomName: this.options.roomName, + participantName: this.options.participantName, + }), + }); + + if (!response.ok) { + throw new Error(`Error generting token from sandbox token server: ${response.status} ${await response.text()}`); + } + + return response.json(); + } +} diff --git a/agent-sdk/agent-session/Local.ts b/agent-sdk/agent-session/Local.ts new file mode 100644 index 000000000..3f5e431a6 --- /dev/null +++ b/agent-sdk/agent-session/Local.ts @@ -0,0 +1,134 @@ +import { ParticipantEvent, Room, RoomEvent } from 'livekit-client'; +import type TypedEventEmitter from 'typed-emitter'; +import { EventEmitter } from "events"; +import { LocalParticipant, Track } from 'livekit-client'; +import { createLocalTrack, LocalTrackInstance } from './LocalTrack'; +import { ParticipantPermission } from 'livekit-server-sdk'; +import { trackSourceToProtocol } from '../external-deps/components-js'; +import { createScopedGetSet } from '../lib/scoped-get-set'; + +export enum LocalEvent { + PermissionsChanged = 'permissionsChanged', +}; + +export type LocalCallbacks = { + [LocalEvent.PermissionsChanged]: (permissions: ParticipantPermission | null) => void; +}; + +export type LocalInstance = { + [Symbol.toStringTag]: "LocalInstance"; + + permissions: ParticipantPermission | null; + publishPermissions: { + camera: boolean | null; + microphone: boolean | null; + screenShare: boolean | null; + data: boolean; + }; + + camera: LocalTrackInstance; + microphone: LocalTrackInstance; + screenShare: LocalTrackInstance; + + subtle: { + emitter: TypedEventEmitter; + initialize: () => void; + teardown: () => void; + + localParticipant: LocalParticipant; + }; +}; + +export function createLocal( + room: Room, + get: () => LocalInstance, + set: (fn: (old: LocalInstance) => LocalInstance) => void, +): LocalInstance { + const emitter = new EventEmitter() as TypedEventEmitter; + + const handleParticipantPermissionsChanged = () => { + const permissions = room.localParticipant.permissions ?? null; + + const canPublishSource = (source: Track.Source) => { + return ( + permissions?.canPublish && + (permissions.canPublishSources.length === 0 || + permissions.canPublishSources.includes(trackSourceToProtocol(source))) + ); + }; + + set((old) => ({ + ...old, + permissions, + publishPermissions: { // FIXME: figure out a better place to put this? Maybe in with tracks? + camera: canPublishSource(Track.Source.Camera) ?? null, + microphone: canPublishSource(Track.Source.Microphone) ?? null, + screenShare: canPublishSource(Track.Source.ScreenShare) ?? null, + data: permissions?.canPublishData ?? 
false, + }, + })); + + emitter.emit(LocalEvent.PermissionsChanged, permissions); + }; + + const initialize = () => { + get().camera.subtle.initialize(); + get().microphone.subtle.initialize(); + get().screenShare.subtle.initialize(); + + room.on(RoomEvent.ParticipantPermissionsChanged, handleParticipantPermissionsChanged); + }; + + const teardown = () => { + room.localParticipant.off(ParticipantEvent.ParticipantPermissionsChanged, handleParticipantPermissionsChanged); + + get().camera.subtle.teardown(); + get().microphone.subtle.teardown(); + get().screenShare.subtle.teardown(); + }; + + const { get: trackGet, set: trackSet } = createScopedGetSet(get, set, 'camera', 'LocalTrack'); + const camera = createLocalTrack({ + room, + trackSource: Track.Source.Camera, + preventUserChoicesSave: false, + }, trackGet, trackSet); + + const { get: microphoneTrackGet, set: microphoneTrackSet } = createScopedGetSet(get, set, 'microphone', 'LocalTrack'); + const microphone = createLocalTrack({ + room, + trackSource: Track.Source.Microphone, + preventUserChoicesSave: false, + }, microphoneTrackGet, microphoneTrackSet); + + const { get: screenShareTrackGet, set: screenShareTrackSet } = createScopedGetSet(get, set, 'screenShare', 'LocalTrack'); + const screenShare = createLocalTrack({ + room, + trackSource: Track.Source.ScreenShare, + preventUserChoicesSave: false, + }, screenShareTrackGet, screenShareTrackSet); + + return { + [Symbol.toStringTag]: "LocalInstance", + + permissions: room.localParticipant.permissions ?? null, + publishPermissions: { + camera: null, + microphone: null, + screenShare: null, + data: false, + }, + + camera, + microphone, + screenShare, + + subtle: { + emitter, + initialize, + teardown, + + localParticipant: room.localParticipant, + }, + }; +} diff --git a/agent-sdk/agent-session/LocalTrack.ts b/agent-sdk/agent-session/LocalTrack.ts new file mode 100644 index 000000000..df23f4f29 --- /dev/null +++ b/agent-sdk/agent-session/LocalTrack.ts @@ -0,0 +1,436 @@ +import { LocalTrack, Room, RoomEvent } from 'livekit-client'; +import { EventEmitter } from 'events'; +import type TypedEventEmitter from 'typed-emitter'; +import { AudioCaptureOptions, LocalTrackPublication, ParticipantEvent, ScreenShareCaptureOptions, Track, TrackPublishOptions, VideoCaptureOptions } from 'livekit-client'; +import { ParticipantEventCallbacks } from '@/agent-sdk/external-deps/client-sdk-js'; +import { SwitchActiveDeviceOptions } from './AgentSession'; +import { type LocalUserChoices, loadUserChoices, saveUserChoices } from '../external-deps/components-js'; + +// FIXME: rename this +export const participantEvents: Array = [ + ParticipantEvent.TrackMuted, + ParticipantEvent.TrackUnmuted, + ParticipantEvent.ParticipantPermissionsChanged, + // ParticipantEvent.IsSpeakingChanged, + ParticipantEvent.TrackPublished, + ParticipantEvent.TrackUnpublished, + ParticipantEvent.LocalTrackPublished, + ParticipantEvent.LocalTrackUnpublished, + ParticipantEvent.MediaDevicesError, + ParticipantEvent.TrackSubscriptionStatusChanged, + // ParticipantEvent.ConnectionQualityChanged, +]; + +type CaptureOptions = + | (TrackSource extends Track.Source.Microphone ? AudioCaptureOptions : never) + | (TrackSource extends Track.Source.Camera ? VideoCaptureOptions : never) + | (TrackSource extends Track.Source.ScreenShare ? 
ScreenShareCaptureOptions : never); + +export enum LocalTrackEvent { + DeviceError = 'deviceError', + PendingDisabled = 'pendingDisabled', + DeviceListError = 'deviceListError', + ActiveDeviceChangeError = 'activeDeviceChangeError', +}; + +export type LocalTrackCallbacks = { + [LocalTrackEvent.DeviceError]: (error: Error, source: TrackSource) => void; + [LocalTrackEvent.PendingDisabled]: () => void; + [LocalTrackEvent.DeviceListError]: (error: Error, source: TrackSource) => void; + [LocalTrackEvent.ActiveDeviceChangeError]: (error: Error, source: TrackSource) => void; +}; + +export type LocalTrackInstance = { + [Symbol.toStringTag]: "LocalTrackInstance"; + isLocal: true, + + /** The type of track reprsented (ie, camera, microphone, screen share, etc) */ + source: TrackSource; + + /** Is the track currently enabled? */ + enabled: boolean; + + /** Is the track currently in the midst of being enabled or disabled? */ + pending: boolean; + + /** Returns a promise which resolves once the track is no longer pending. */ + waitUntilNotPending: (signal?: AbortSignal) => void; + + set: (enabled: boolean, captureOptions?: CaptureOptions, publishOptions?: TrackPublishOptions) => Promise; + toggle: (captureOptions?: CaptureOptions, publishOptions?: TrackPublishOptions) => Promise; + devices: TrackSource extends Track.Source.Camera | Track.Source.Microphone ? { + kind: TrackSource extends Track.Source.Camera ? "videoinput" : "audioinput"; + activeId: string; + changeActive: TrackSource extends Track.Source.Camera | Track.Source.Microphone ? (deviceId?: string) => void : undefined; + list: Array; + subtle: { + listDevices: (requestPermissions?: boolean) => Promise>, + }, + } : undefined, + + attachToMediaElement: (element: TrackSource extends Track.Source.Microphone | Track.Source.ScreenShareAudio ? HTMLAudioElement : HTMLVideoElement) => () => void; + + dimensions: Track.Dimensions | null; + orientation: 'landscape' | 'portrait' | null; + + subtle: { + emitter: TypedEventEmitter>, + initialize: () => void; + teardown: () => void; + + publication: LocalTrackPublication | null, + userChoices: LocalUserChoices, + }; +}; + +export function createLocalTrack( + options: { + room: Room; + trackSource: TrackSource; + preventUserChoicesSave: boolean; + }, + get: () => LocalTrackInstance, + set: (fn: (old: LocalTrackInstance) => LocalTrackInstance) => void, +): LocalTrackInstance { + const emitter = new EventEmitter() as TypedEventEmitter>; + + let mediaDeviceKind = null; + switch (options.trackSource) { + case Track.Source.Camera: + mediaDeviceKind = 'videoinput' as const; + break; + case Track.Source.Microphone: + mediaDeviceKind = 'audioinput' as const; + break; + } + + const handleParticipantEvent = () => { + // FIXME: is the rest of this stuff needed? + // const { isMicrophoneEnabled, isCameraEnabled, isScreenShareEnabled } = p; + const publication = options.room.localParticipant.getTrackPublication(options.trackSource); + + let enabled = false; + switch (options.trackSource) { + case Track.Source.Camera: + enabled = options.room.localParticipant.isCameraEnabled; + break; + case Track.Source.Microphone: + enabled = options.room.localParticipant.isMicrophoneEnabled; + break; + case Track.Source.ScreenShare: + enabled = options.room.localParticipant.isScreenShareEnabled; + break; + default: + throw new Error(`LocalTrackInstance.handleParticipantEvent - Unable to handle processing track source ${options.trackSource}.`); + } + + let orientation = null; + // Set the orientation of the video track. 
+ // TODO: This does not handle changes in orientation after a track got published (e.g when rotating a phone camera from portrait to landscape). + if ( + typeof publication?.dimensions?.width === 'number' && + typeof publication?.dimensions?.height === 'number' + ) { + orientation = + publication.dimensions.width > publication.dimensions.height ? 'landscape' as const : 'portrait' as const; + } + + set((old) => ({ + ...old, + enabled, + dimensions: publication?.dimensions ?? null, + orientation, + subtle: { + ...old.subtle, + publication: publication ?? null, + }, + })); + }; + + const initialize = () => { + handleParticipantEvent(); + + for (const eventName of participantEvents) { + options.room.localParticipant.on(eventName, handleParticipantEvent); + } + + if (mediaDeviceKind !== null) { + handleDeviceChange(); + + if (typeof window !== 'undefined') { + if (!window.isSecureContext) { + throw new Error( + `Accessing media devices is available only in secure contexts (HTTPS and localhost), in some or all supporting browsers. See: https://developer.mozilla.org/en-US/docs/Web/API/Navigator/mediaDevices`, + ); + } + navigator?.mediaDevices?.addEventListener('devicechange', handleDeviceChange); + } + + // When the device changes, refetch devices + // This is required because if a user activates a device for the first time, this is what causes + // the permission check to occur and after permissions have been granted, the devices list may + // now return a non empty list + options.room.on(RoomEvent.ActiveDeviceChanged, handleDeviceChange); + } + }; + + const teardown = () => { + for (const eventName of participantEvents) { + options.room.localParticipant.off(eventName, handleParticipantEvent); + } + + if (mediaDeviceKind !== null) { + navigator?.mediaDevices?.removeEventListener('devicechange', handleDeviceChange); + options.room.off(RoomEvent.ActiveDeviceChanged, handleDeviceChange); + } + }; + + const setEnabled = async ( + enabled: boolean, + captureOptions?: CaptureOptions, + publishOptions?: TrackPublishOptions, + ) => { + await waitUntilNotPending(); + + set((old) => ({ ...old, pending: true })); + + let setEnabledPromise; + let getterKey; + switch (options.trackSource) { + case Track.Source.Camera: + setEnabledPromise = options.room.localParticipant.setCameraEnabled( + enabled, + captureOptions as CaptureOptions, + publishOptions, + ); + getterKey = 'isCameraEnabled' as const; + break; + case Track.Source.Microphone: + setEnabledPromise = options.room.localParticipant.setMicrophoneEnabled( + enabled, + captureOptions as CaptureOptions, + publishOptions, + ); + getterKey = 'isMicrophoneEnabled' as const; + break; + case Track.Source.ScreenShare: + setEnabledPromise = options.room.localParticipant.setScreenShareEnabled( + enabled, + captureOptions as CaptureOptions, + publishOptions, + ); + getterKey = 'isScreenShareEnabled' as const; + break; + default: + throw new Error(`LocalTrackInstance.setEnabled - Unable to handle enabling track source ${options.trackSource}.`); + } + + try { + await setEnabledPromise; + } catch (err) { + if (err instanceof Error) { + emitter.emit(LocalTrackEvent.DeviceError, err, options.trackSource); + } + throw err; + } finally { + set((old) => ({ ...old, pending: false })); + } + + switch (options.trackSource) { + case Track.Source.Camera: + updateUserChoices('videoEnabled', enabled); + break; + case Track.Source.Microphone: + updateUserChoices('audioEnabled', enabled); + break; + } + + set((old) => ({ ...old, enabled })); // FIXME: is this needed given the 
event handler should fire? + + emitter.emit(LocalTrackEvent.PendingDisabled); + return options.room.localParticipant[getterKey]; + }; + + const toggleEnabled = (captureOptions?: CaptureOptions, publishOptions?: TrackPublishOptions) => { + return setEnabled(!get().enabled, captureOptions, publishOptions); + }; + + const updateUserChoices = (key: Key, value: LocalUserChoices[Key]) => { + set((old) => ({ + ...old, + subtle: { + ...old.subtle, + userChoices: { + ...old.subtle.userChoices, + [key]: value, + }, + }, + })); + saveUserChoices(get().subtle.userChoices, options.preventUserChoicesSave); + }; + + const changeActiveDevice = async ( + id: string = 'default', + changeActiveDeviceOptions: SwitchActiveDeviceOptions = {}, + ) => { + if (!mediaDeviceKind) { + throw new Error(`LocalTrackInstance.devices.change - Unable to change active device for track source ${options.trackSource}.`); + } + + let userChoicesKey; + switch (options.trackSource) { + case Track.Source.Camera: + userChoicesKey = 'videoDeviceId' as const; + break; + case Track.Source.Microphone: + userChoicesKey = 'audioDeviceId' as const; + break; + default: + throw new Error(`LocalTrackInstance.devices.change - Unable to change active device for track source ${options.trackSource}.`); + } + + // FIXME: use actual logger of some sort? + console.debug(`Switching active device of kind "${mediaDeviceKind}" with id ${id}.`); + + // FIXME: is there a way to do this that doesn't require reaching all the way back to the room? + try { + await options.room.switchActiveDevice(mediaDeviceKind, id, changeActiveDeviceOptions?.exact); + } catch (err) { + if (err instanceof Error) { + emitter.emit(LocalTrackEvent.ActiveDeviceChangeError, err, options.trackSource); + } + throw err; + } + + const actualDeviceId: string | undefined = options.room.getActiveDevice(mediaDeviceKind) ?? id; + if (actualDeviceId !== id && id !== 'default') { + // FIXME: use actual logger of some sort? + console.info( + `We tried to select the device with id (${id}), but the browser decided to select the device with id (${actualDeviceId}) instead.`, + ); + } + + let targetTrack: LocalTrack | undefined = undefined; + if (mediaDeviceKind === 'audioinput') { + targetTrack = options.room.localParticipant.getTrackPublication(Track.Source.Microphone)?.track; + } else if (mediaDeviceKind === 'videoinput') { + targetTrack = options.room.localParticipant.getTrackPublication(Track.Source.Camera)?.track; + } + + const useDefault = + (id === 'default' && !targetTrack) || + (id === 'default' && targetTrack?.mediaStreamTrack.label.startsWith('Default')); + + let newCurrentDeviceId = useDefault ? 
id : actualDeviceId; + if (newCurrentDeviceId) { + set((old) => ({ + ...old, + devices: { + ...old.devices, + activeId: newCurrentDeviceId, + }, + })); + updateUserChoices(userChoicesKey, id); + } + return newCurrentDeviceId; + }; + + const listDevices = async (requestPermissions = false) => { + if (!mediaDeviceKind) { + throw new Error(`LocalTrackInstance.devices.list - Unable to list devices for track source ${options.trackSource}.`); + } + + const devices = await Room.getLocalDevices(mediaDeviceKind, requestPermissions); + return devices.filter(d => d.deviceId !== ''); + }; + + const handleDeviceChange = async () => { + let list; + try { + list = await listDevices(); + } catch (err) { + if (err instanceof Error) { + emitter.emit(LocalTrackEvent.DeviceListError, err, options.trackSource); + } + throw err; + } + + set((old) => { + if (!old.devices) { + throw new Error(`LocalTrackInstance.handleDeviceChang - Error storing device list for track source ${options.trackSource}`); + } + return { ...old, devices: { ...old.devices, list } }; + }); + }; + + const waitUntilNotPending = async (signal?: AbortSignal) => { + const { pending } = get(); + if (!pending) { + return; + } + + return new Promise((resolve, reject) => { + const onceEventOccurred = () => { + cleanup(); + resolve(); + }; + const abortHandler = () => { + cleanup(); + reject(new Error(`LocalTrackEvent.waitUntilNotPending - signal aborted`)); + }; + + const cleanup = () => { + emitter.off(LocalTrackEvent.PendingDisabled, onceEventOccurred); + signal?.removeEventListener('abort', abortHandler); + }; + + emitter.on(LocalTrackEvent.PendingDisabled, onceEventOccurred); + signal?.addEventListener('abort', abortHandler); + }); + }; + + const attachToMediaElement = (element: HTMLMediaElement) => { + const track = get().subtle.publication?.track; + if (!track) { + throw new Error('LocalTrackInstance.attachToMediaElement - track publication not set'); + } + + track.attach(element); + return () => { + track.detach(element); + }; + }; + + return { + [Symbol.toStringTag]: "LocalTrackInstance", + isLocal: true, + + source: options.trackSource, + enabled: false, + pending: false, + waitUntilNotPending, + + set: setEnabled, + toggle: toggleEnabled, + devices: mediaDeviceKind ? { + kind: mediaDeviceKind, + activeId: options.room.getActiveDevice(mediaDeviceKind) ?? 'default', + changeActive: changeActiveDevice, + list: [], + subtle: { listDevices }, + } : undefined as any, // FIXME: I can't get this type logic to work... + + attachToMediaElement, + dimensions: null, + orientation: null, + + subtle: { + emitter, + initialize, + teardown, + + publication: options.room.localParticipant.getTrackPublication(options.trackSource) ?? 
null, + userChoices: loadUserChoices(), + }, + }; +} diff --git a/agent-sdk/agent-session/Messages.ts b/agent-sdk/agent-session/Messages.ts new file mode 100644 index 000000000..4521f7505 --- /dev/null +++ b/agent-sdk/agent-session/Messages.ts @@ -0,0 +1,184 @@ +import type TypedEventEmitter from 'typed-emitter'; +import { Room } from 'livekit-client'; +import { EventEmitter } from "events"; + +import { generateId } from '../lib/uuid'; +import { + type ReceivedMessage, + type SentMessage, + MessageSender, + MessageReceiver, + ChatMessageSender, + CombinedMessageSender, + CombinedMessageReceiver, + TranscriptionMessageReceiver, + ReceivedMessageAggregator, + type ReceivedMessageAggregatorOptions, + ReceivedMessageAggregatorEvent, + SentMessageOptions, + SentChatMessageOptions, +} from "./message"; + +type SendMessageFunction = { + // Plain chat messages + (message: string): Promise; + (message: string, options: SentChatMessageOptions): Promise; + + // Custom messages + // (if SentMessageOptions can be undefined, then options is optional, otherwise it is required) + < + Message extends SentMessage, + OptionalOptions extends SentMessageOptions | undefined + >(message: Message): Promise; + (message: Message, options: SentMessageOptions): Promise; +}; + +export type MessagesInstance = { + [Symbol.toStringTag]: "MessagesInstance", + + list: Array; + + /** Is a send operation currently in progress? */ + sendPending: boolean; + + send: SendMessageFunction, + + /** + * Create a ReceivedMessageAggregator, which allows one to view a snapshot of all received + * messages at the current time. + */ + createMessageAggregator: (options?: ReceivedMessageAggregatorOptions) => ReceivedMessageAggregator; + + subtle: { + emitter: TypedEventEmitter; + initialize: () => void; + teardown: () => void; + + messageSender: MessageSender | null; + messageReceiver: MessageReceiver | null; + defaultMessageAggreggator: ReceivedMessageAggregator | null; + }; +} + +export enum MessagesEvent { + MessageReceived = 'messageReceived', + Disconnected = 'disconnected', +} + +export type MessagesCallbacks = { + [MessagesEvent.MessageReceived]: (message: ReceivedMessage) => void; + [MessagesEvent.Disconnected]: () => void; +}; + +export function createMessages( + room: Room, + get: () => MessagesInstance, + set: (fn: (old: MessagesInstance) => MessagesInstance) => void, +): MessagesInstance { + const emitter = new EventEmitter() as TypedEventEmitter; + + const handleIncomingMessage = (incomingMessage: ReceivedMessage) => { + emitter.emit(MessagesEvent.MessageReceived, incomingMessage); + }; + + + const handleDefaultMessageAggregatorUpdated = () => { + set((old) => ({ + ...old, + list: old.subtle.defaultMessageAggreggator?.toArray() ?? [], + })); + }; + + const initialize = () => { + const chatMessageSender = new ChatMessageSender(room.localParticipant); + const messageSender = new CombinedMessageSender( + chatMessageSender, + // TODO: other types of messages that can be sent + ); + set((old) => ({ ...old, subtle: { ...old.subtle, messageSender } })); + + const messageReceiver = new CombinedMessageReceiver( + new TranscriptionMessageReceiver(room), + chatMessageSender.generateLoopbackMessageReceiver(), + // TODO: images? attachments? rpc? + ); + set((old) => ({ ...old, subtle: { ...old.subtle, messageReceiver } })); + (async () => { + // FIXME: is this sort of pattern a better idea than just making MessageReceiver an EventEmitter? 
+ // FIXME: this probably doesn't handle errors properly right now + for await (const message of messageReceiver.messages()) { + handleIncomingMessage(message); + } + })(); + + const defaultMessageAggreggator = createMessageAggregator(); + defaultMessageAggreggator.on(ReceivedMessageAggregatorEvent.Updated, handleDefaultMessageAggregatorUpdated); + set((old) => ({ ...old, subtle: { ...old.subtle, defaultMessageAggreggator } })); + }; + + const teardown = () => { + get().subtle.messageReceiver?.close(); + set((old) => ({ ...old, subtle: { ...old.subtle, messageReceiver: null } })); + + get().subtle.defaultMessageAggreggator?.off(ReceivedMessageAggregatorEvent.Updated, handleDefaultMessageAggregatorUpdated); + set((old) => ({ ...old, subtle: { ...old.subtle, defaultMessageAggreggator: null } })); + }; + + const sendMessage: SendMessageFunction = async ( + message: Message, + options?: Message extends SentMessage ? SentMessageOptions : SentChatMessageOptions, + ) => { + const messageSender = get().subtle.messageSender; + if (!messageSender) { + throw new Error('AgentSession.sendMessage - cannot send message until room is connected and MessageSender initialized!'); + } + + set((old) => ({ ...old, sendPending: true })); + + const constructedMessage: SentMessage = typeof message === 'string' ? { + id: generateId(), + direction: 'outbound', + timestamp: new Date(), + content: { type: 'chat', text: message }, + } : message; + try { + await messageSender.send(constructedMessage, options); + } finally { + set((old) => ({ ...old, sendPending: false })); + } + }; + + const createMessageAggregator = (options: ReceivedMessageAggregatorOptions = {}) => { + const aggregator = new ReceivedMessageAggregator(options); + emitter.on(MessagesEvent.MessageReceived, aggregator.upsert); + emitter.on(MessagesEvent.Disconnected, aggregator.close); + + const closeHandler = () => { + emitter.off(MessagesEvent.MessageReceived, aggregator.upsert); + emitter.off(MessagesEvent.Disconnected, aggregator.close); + aggregator.off(ReceivedMessageAggregatorEvent.Close, closeHandler); + }; + aggregator.on(ReceivedMessageAggregatorEvent.Close, closeHandler); + + return aggregator; + }; + + return { + [Symbol.toStringTag]: "MessagesInstance", + + list: [], + sendPending: false, + send: sendMessage, + createMessageAggregator, + + subtle: { + emitter, + initialize, + teardown, + + messageSender: null, + messageReceiver: null, + defaultMessageAggreggator: null, + }, + }; +} diff --git a/agent-sdk/agent-session/RemoteTrack.ts b/agent-sdk/agent-session/RemoteTrack.ts new file mode 100644 index 000000000..3e103db88 --- /dev/null +++ b/agent-sdk/agent-session/RemoteTrack.ts @@ -0,0 +1,177 @@ +import { Participant, RemoteAudioTrack, RemoteTrackPublication, Track } from 'livekit-client'; +import { participantEvents } from './LocalTrack'; + +export type RemoteTrackInstance = { + [Symbol.toStringTag]: "RemoteTrackInstance"; + isLocal: false, + + /** Given a media element, properly plumb the media stream through to it so media can be shown / heard in the app. */ + attachToMediaElement: (element: TrackSource extends Track.Source.Microphone | Track.Source.ScreenShareAudio ? 
HTMLAudioElement : HTMLVideoElement) => () => void; + + setSubscribed: (subscribed: boolean) => void; + waitUntilSubscribed: (signal?: AbortSignal) => Promise; + setEnabled: (enabled: boolean) => void; + setVolume: (volume: number) => void; + + /** The type of track reprsented (ie, camera, microphone, screen share, etc) */ + source: TrackSource; + + /** Is the track currently enabled? */ + enabled: boolean; + + /** Is the track currently muted? */ + muted: boolean; + + /** Is the app currently receiving data from the SFU for this track? */ + subscribed: boolean; + + dimensions: Track.Dimensions | null; + orientation: 'landscape' | 'portrait' | null; + + subtle: { + initialize: () => void; + teardown: () => void; + publication: RemoteTrackPublication, + }; +}; + +export function createRemoteTrack( + options: { participant: Participant, publication: RemoteTrackPublication}, + get: () => RemoteTrackInstance, + set: (fn: (old: RemoteTrackInstance) => RemoteTrackInstance) => void, +): RemoteTrackInstance { + const attachToMediaElement = (element: HTMLMediaElement) => { + const track = get().subtle.publication.track; + if (!track) { + throw new Error('RemoteTrackInstance.attachToMediaElement - track not set'); + } + + track.attach(element); + return () => { + track.detach(element); + }; + }; + + const setSubscribed = (subscribed: boolean) => { + options.publication.setSubscribed(subscribed) + }; + + const waitUntilSubscribed = async (signal?: AbortSignal) => { + const publication = get().subtle.publication; + if (publication.isSubscribed) { + return; + } + + return new Promise((resolve, reject) => { + const subscribedChangedHandler = () => { + if (!publication.isSubscribed) { + return; + } + cleanup(); + resolve(); + }; + const abortHandler = () => { + cleanup(); + reject(new Error('RemoteTrack.waitUntilSubscribed - signal aborted')); + }; + + const cleanup = () => { + publication.off("subscribed", subscribedChangedHandler); + signal?.removeEventListener('abort', abortHandler); + }; + + publication.on("subscribed", subscribedChangedHandler); + signal?.addEventListener('abort', abortHandler); + }); + }; + + const setEnabled = (enabled: boolean) => { + // FIXME: add warning for other side of if? + if (options.publication instanceof RemoteTrackPublication) { + options.publication.setEnabled(enabled) + } + }; + + const setVolume = (volume: number) => { + // FIXME: add warning for other side of if? + if (options.publication instanceof RemoteTrackPublication && options.publication.track instanceof RemoteAudioTrack) { + options.publication.track.setVolume(volume); + } + }; + + const handleParticipantEvent = () => { + let enabled = false; + switch (options.publication.source) { + case Track.Source.Camera: + enabled = options.participant.isCameraEnabled; + break; + case Track.Source.Microphone: + enabled = options.participant.isMicrophoneEnabled; + break; + case Track.Source.ScreenShare: + enabled = options.participant.isScreenShareEnabled; + break; + default: + throw new Error(`RemoteTrackInstance.handleParticipantEvent - Unable to handle processing track source ${options.publication.source}.`); + } + + let orientation = null; + // Set the orientation of the video track. + // TODO: This does not handle changes in orientation after a track got published (e.g when rotating a phone camera from portrait to landscape). 
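The `RemoteTrackInstance` surface is easiest to see from consumer code. A rough sketch, assuming `agent` is an `AgentInstance` whose microphone track has already been published; the DOM wiring is illustrative only:

```ts
// Rough consumer sketch of RemoteTrackInstance; `agent` is assumed to be an
// AgentInstance with a non-null microphone (see AgentInstance.waitUntilMicrophone).
const microphone = agent.microphone;
if (microphone) {
  // Ask the SFU to deliver media for this track, then wait until it is flowing.
  microphone.setSubscribed(true);
  await microphone.waitUntilSubscribed();

  const element = document.createElement('audio');
  document.body.appendChild(element);

  // attachToMediaElement returns a detach function for later cleanup.
  const detach = microphone.attachToMediaElement(element);
  microphone.setVolume(0.8);

  // ...and when tearing the element down:
  detach();
  microphone.setSubscribed(false);
}
```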
+ if (
+ typeof options.publication.dimensions?.width === 'number' &&
+ typeof options.publication.dimensions?.height === 'number'
+ ) {
+ orientation =
+ options.publication.dimensions.width > options.publication.dimensions.height ? 'landscape' as const : 'portrait' as const;
+ }
+
+ set((old) => ({
+ ...old,
+ enabled,
+ muted: options.publication.isMuted,
+ dimensions: options.publication.dimensions ?? null,
+ subscribed: options.publication.isSubscribed,
+ orientation,
+ }));
+ };
+ // Register participant event handlers so the state above stays in sync; teardown() removes them again.
+ for (const eventname of participantEvents) {
+ options.participant.on(eventname, handleParticipantEvent);
+ }
+
+ const initialize = () => {
+ handleParticipantEvent();
+ };
+
+ const teardown = () => {
+ for (const eventname of participantEvents) {
+ options.participant.off(eventname, handleParticipantEvent);
+ };
+ };
+
+ return {
+ [Symbol.toStringTag]: "RemoteTrackInstance",
+ isLocal: false,
+
+ attachToMediaElement,
+
+ setSubscribed,
+ waitUntilSubscribed,
+ setEnabled,
+ setVolume,
+
+ source: options.publication.source as TrackSource,
+ enabled: false,
+ muted: false,
+ subscribed: false,
+ dimensions: null,
+ orientation: null,
+
+ subtle: {
+ initialize,
+ teardown,
+
+ publication: options.publication,
+ },
+ };
+}
diff --git a/agent-sdk/agent-session/message/ReceivedMessageAggregator.ts b/agent-sdk/agent-session/message/ReceivedMessageAggregator.ts new file mode 100644 index 000000000..96b2dce77 --- /dev/null +++ b/agent-sdk/agent-session/message/ReceivedMessageAggregator.ts @@ -0,0 +1,120 @@
+import { EventEmitter } from "events";
+import TypedEventEmitter from "typed-emitter";
+import { ReceivedMessage } from ".";
+
+export type ReceivedMessageAggregatorOptions = {
+ /**
+ * Number of messages to buffer internally before old messages are discarded. If not set, the
+ * buffer size is unlimited.
+ */
+ bufferSize?: number;
+
+ // FIXME: other options?
+};
+
+export enum ReceivedMessageAggregatorEvent {
+ Updated = 'updated',
+ Close = 'close',
+}
+
+type ReceivedMessageAggregatorCallbacks = {
+ [ReceivedMessageAggregatorEvent.Updated]: () => void;
+ [ReceivedMessageAggregatorEvent.Close]: () => void;
+};
+
+/** A container for storing an ordered list of messages that can be easily changed */
+export default class ReceivedMessageAggregator extends (EventEmitter as new () => TypedEventEmitter) {
+ private messageById: Map = new Map();
+ private messageIds: Array = [];
+
+ private options: ReceivedMessageAggregatorOptions;
+ private closed: boolean = false;
+
+ constructor(options?: ReceivedMessageAggregatorOptions) {
+ super();
+ this.options = options ??
{}; + } + + /** Create a new aggregator pre-populated with the included messages */ + static fromIterator(input: Iterable, options?: ReceivedMessageAggregatorOptions) { + const aggregator = new this(options); + aggregator.extend(input); + return aggregator; + } + + upsert = (message: Message) => { + this.internalBulkUpsert([message]); + this.emit(ReceivedMessageAggregatorEvent.Updated); + } + + delete = (message: Message) => { + this.internalBulkDelete([message.id]); + this.emit(ReceivedMessageAggregatorEvent.Updated); + } + + extend = (input: Iterable) => { + this.internalBulkUpsert(input); + this.emit(ReceivedMessageAggregatorEvent.Updated); + } + + clear = () => { + this.messageById.clear(); + this.messageIds = []; + } + + private internalBulkUpsert(messages: Iterable) { + if (this.closed) { + throw new Error('ReceivedMessageAggregator is closed and is now immutable, no more messages can be ingested!'); + } + + // FIXME: think through this scenario: + // 1. Message `a` is upserted + // 2. `options.bufferSize` messages are upserted, evicting message `a` + // 3. Another message `a` upsert happens, should this somehow get rejected (via bloom filter / etc?) + // or just end up in the list again as a seemingly brand new message? + for (const message of messages) { + this.messageById.set(message.id, message); + if (!this.messageIds.includes(message.id)) { + this.messageIds.push(message.id); + } + + // Truncate message buffer if it is now too large + const numberOfMessagesToRemove = typeof this.options.bufferSize === 'number' ? ( + this.messageIds.length - this.options.bufferSize + ) : 0; + if (numberOfMessagesToRemove > 0) { + const idsToDelete = this.messageIds.slice(0, numberOfMessagesToRemove); + this.internalBulkDelete(idsToDelete); + } + } + } + private internalBulkDelete(messageIdsToDelete: Array) { + if (this.closed) { + throw new Error('ReceivedMessageAggregator is closed and is now immutable, no more messages can be deleted!'); + } + + for (const id of messageIdsToDelete) { + this.messageById.delete(id); + } + this.messageIds = this.messageIds.filter(id => !messageIdsToDelete.includes(id)); + } + + *[Symbol.iterator]() { + for (const id of this.messageIds) { + const message = this.messageById.get(id); + if (!message) { + continue; + } + yield message; + } + } + + toArray() { + return Array.from(this); + } + + close = () => { + this.closed = true; + this.emit(ReceivedMessageAggregatorEvent.Close); + } +} diff --git a/agent-sdk/agent-session/message/index.ts b/agent-sdk/agent-session/message/index.ts new file mode 100644 index 000000000..371d0d62d --- /dev/null +++ b/agent-sdk/agent-session/message/index.ts @@ -0,0 +1,47 @@ +import { SendTextOptions } from 'livekit-client'; +import { TextStreamInfo } from '@/agent-sdk/external-deps/client-sdk-js'; + +export type BaseMessageId = string; +export type BaseMessage = { + id: BaseMessageId; + direction: Direction; + timestamp: Date; + content: Content; +}; + +export type ReceivedTranscriptionMessage = BaseMessage<'inbound', { + type: 'transcription'; + text: string; + participantInfo: { identity: string }; + streamInfo: TextStreamInfo; +}>; + +export type ReceivedChatLoopbackMessage = BaseMessage<'inbound', { type: 'chat'; text: string }>; + +export type ReceivedMessage = + | ReceivedTranscriptionMessage + | ReceivedChatLoopbackMessage; + // TODO: images? attachments? rpc? 
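A short sketch of how `ReceivedMessageAggregator` is meant to be consumed; `previousMessages`, `incoming`, and the import path are placeholders for illustration, not part of this patch:

```ts
import {
  ReceivedMessageAggregator,
  ReceivedMessageAggregatorEvent,
} from '@/agent-sdk/agent-session/message'; // assumed import path

// Keep only the most recent 100 messages and react to snapshot changes.
const aggregator = ReceivedMessageAggregator.fromIterator(previousMessages, { bufferSize: 100 });

aggregator.on(ReceivedMessageAggregatorEvent.Updated, () => {
  console.log('messages in buffer:', aggregator.toArray().length);
});

// upsert() replaces an existing message with the same id, or appends a new one.
aggregator.upsert(incoming);

// close() marks the aggregator immutable and emits Close, which
// createMessageAggregator() uses to detach it from the live message stream.
aggregator.close();
```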
+ +export type SentChatMessage = BaseMessage<'outbound', | { type: 'chat', text: string }>; +export type SentChatMessageOptions = SendTextOptions | undefined; + +export type SentMessage = + | SentChatMessage; + +export type SentMessageOptions = + | (Message extends SentChatMessage ? SentChatMessageOptions : never); + +// FIXME: maybe update all these functions to not have default exports as to avoid the duplicate +// names being written here? +export { default as MessageSender } from './send/MessageSender'; +export { default as ChatMessageSender } from './send/ChatMessageSender'; +export { default as CombinedMessageSender } from './send/CombinedMessageSender'; +export { default as MessageReceiver } from './receive/MessageReceiver'; +export { default as CombinedMessageReceiver } from './receive/CombinedMessageReceiver'; +export { default as TranscriptionMessageReceiver } from './receive/TranscriptionMessageReceiver'; +export { + default as ReceivedMessageAggregator, + type ReceivedMessageAggregatorOptions, + ReceivedMessageAggregatorEvent, +} from './ReceivedMessageAggregator'; diff --git a/agent-sdk/agent-session/message/receive/CombinedMessageReceiver.ts b/agent-sdk/agent-session/message/receive/CombinedMessageReceiver.ts new file mode 100644 index 000000000..0e3389868 --- /dev/null +++ b/agent-sdk/agent-session/message/receive/CombinedMessageReceiver.ts @@ -0,0 +1,31 @@ +import { parallelMerge } from "streaming-iterables"; +import MessageReceiver from "./MessageReceiver"; + +/** + * A `MessageReceiver` that zips together multiple underlying `MessageReceiver`s into one unified source. + */ +export default class CombinedMessageReceiver extends MessageReceiver { + private messageReceivers: Array; + + constructor(...messageReceivers: Array) { + super(); + this.messageReceivers = messageReceivers; + } + + async start() { + const messagesAsyncIterators = this.messageReceivers.map(mr => mr.messages()); + (async () => { + for await (const inboundMessage of parallelMerge(...messagesAsyncIterators)) { + this.enqueue(inboundMessage); + } + })().catch(err => { + this.closeWithError(err); + }); + + return () => { + for (const messageReceiver of this.messageReceivers) { + messageReceiver.close(); + } + }; + } +} diff --git a/agent-sdk/agent-session/message/receive/MessageReceiver.ts b/agent-sdk/agent-session/message/receive/MessageReceiver.ts new file mode 100644 index 000000000..9265d5d46 --- /dev/null +++ b/agent-sdk/agent-session/message/receive/MessageReceiver.ts @@ -0,0 +1,57 @@ +import Future from "@/agent-sdk/lib/future"; +import { type ReceivedMessage } from ".."; + +/** Thrown to signal that a MessageReceiver.messages() generator invocation was terminated out of band */ +export class MessageReceiverTerminationError extends Error {} + +/** + * A MessageReceiver acts as a source for all messages in the system. + */ +export default abstract class MessageReceiver { + private signallingFuture = new Future(); + private queue: Array = []; + + // This returns a cleanup function like useEffect maybe? That could be a good pattern? 
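To make the start()/cleanup pattern mentioned in the comment above concrete, here is a hedged sketch of a custom receiver built on this base class; the timer-based source and the import paths are illustrative only:

```ts
// Illustrative MessageReceiver subclass: start() wires up a source, enqueue()
// publishes messages, and the returned function runs once messages() iteration
// is terminated via close(). The timer "source" is made up for the example.
import MessageReceiver from '@/agent-sdk/agent-session/message/receive/MessageReceiver'; // assumed path
import { generateId } from '@/agent-sdk/lib/uuid'; // assumed path

class TickMessageReceiver extends MessageReceiver {
  async start() {
    const interval = setInterval(() => {
      this.enqueue({
        id: generateId(),
        direction: 'inbound',
        timestamp: new Date(),
        content: { type: 'chat', text: 'tick' },
      });
    }, 1000);

    return () => clearInterval(interval);
  }
}

// Consuming the stream; the loop ends once receiver.close() is called elsewhere.
const receiver = new TickMessageReceiver();
for await (const message of receiver.messages()) {
  console.log(message.content.text);
}
```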
+ abstract start(): Promise void)>; + + /** Submit new IncomingMessages to be received by anybody reading from messages() */ + protected enqueue(...messages: Array) { + for (const message of messages) { + this.queue.push(message); + } + const oldSignallingFuture = this.signallingFuture; + this.signallingFuture = new Future(); + oldSignallingFuture.resolve?.(null); + } + + /** Terminate the messages() iteration from an external source */ + close() { + const name: string = (this as any).constructor.name ?? 'MessageReceiver'; + this.signallingFuture.reject?.( + new MessageReceiverTerminationError(`${name} terminated messages() iteration`) + ); + } + + closeWithError(error: Error) { + this.signallingFuture.reject?.(error); + } + + /** A stream of newly generated `IncomingMessage`s */ + async *messages(): AsyncGenerator { + const cleanup = await this.start(); + try { + while (true) { + await this.signallingFuture.promise; + yield* this.queue; + this.queue = []; + } + } catch (err) { + if (err instanceof MessageReceiverTerminationError) { + cleanup?.(); + return; + } + } finally { + cleanup?.(); + } + } +} diff --git a/agent-sdk/agent-session/message/receive/TranscriptionMessageReceiver.ts b/agent-sdk/agent-session/message/receive/TranscriptionMessageReceiver.ts new file mode 100644 index 000000000..601e54788 --- /dev/null +++ b/agent-sdk/agent-session/message/receive/TranscriptionMessageReceiver.ts @@ -0,0 +1,134 @@ +import { Room, TextStreamReader } from "livekit-client"; +import { DataTopic } from "@/agent-sdk/external-deps/components-js"; +import { TextStreamInfo } from "@/agent-sdk/external-deps/client-sdk-js"; + +import { type ReceivedMessage, type ReceivedTranscriptionMessage } from ".."; +import MessageReceiver from "./MessageReceiver"; +import { ParticipantAttributes } from "@/agent-sdk/lib/participant-attributes"; + +/** + * Processes new `lk.transcription` data stream events generated by the agent for both user and + * LLM generated speach and generates corresponding `TranscriptionReceivedMessage`s. + * + * For agent messages, a new text stream is emitted for each message, and the stream is closed when the message is finalized. + * Each agent message is delivered in chunks which must be accumulated and published into the message stream. + * + * For user messages, the full transcription is sent each time, but may be updated until finalized. + * + * The `lk.segment_id` attribute is stable and unique across the lifetime of the message. + * + * Example agent generated transcriptions: + * ``` + * { segment_id: "1", content: "Hello" } + * { segment_id: "1", content: " world" } + * { segment_id: "1", content: "!" } + * { segment_id: "2", content: "Hello" } + * { segment_id: "2", content: " Apple" } + * { segment_id: "2", content: "!" } + * ``` + * + * Example user generated transcriptions: + * ``` + * { segment_id: "3", content: "Hello" } + * { segment_id: "3", content: "Hello world!" } + * { segment_id: "4", content: "Hello" } + * { segment_id: "4", content: "Hello Apple!" 
} + * ``` + */ +export default class TranscriptionMessageReceiver extends MessageReceiver { + room: Room; + inFlightMessages: Array = []; + + constructor(room: Room) { + super(); + this.room = room; + } + + async start() { + const textStreamHandler = async (reader: TextStreamReader, participantInfo: { identity: string }) => { + const transcriptionSegmentId = reader.info.attributes?.[ParticipantAttributes.TranscriptionSegmentId]; + const isTranscription = Boolean(transcriptionSegmentId); + const isFinal = reader.info.attributes?.[ParticipantAttributes.TranscriptionFinal] === 'true'; + + let currentStreamId = reader.info.id; + + // Find and update the stream in our array + let messageIndex = this.inFlightMessages.findIndex((message) => { + if (message.content.streamInfo.id === reader.info.id) { + return true; + } + if (isTranscription && transcriptionSegmentId === message.content.streamInfo.attributes?.[ParticipantAttributes.TranscriptionSegmentId]) { + return true; + } + return false; + }); + + // FIXME: I think there may need to be some error handling logic to ensure the below for await + // properly exposes errors via `this.closeWithError` + for await (const chunk of reader) { + const existingMessage = this.inFlightMessages[messageIndex]; + if (existingMessage) { + if (existingMessage.content.streamInfo.id === currentStreamId) { + // Stream hasn't changed, just append content + const updatedMessage = this.appendInFlightMessageText(messageIndex, chunk, reader.info); + this.enqueue(updatedMessage); + } else { + // Stream has changed, so fully replace content + const updatedMessage = this.replaceInFlightMessageText(messageIndex, chunk, reader.info); + this.enqueue(updatedMessage); + } + + } else { + // Handle case where stream ID wasn't found (new message) + const message: ReceivedMessage = { + id: reader.info.id, + direction: 'inbound', + timestamp: new Date(reader.info.timestamp), + content: { + type: 'transcription', + text: chunk, + participantInfo, + streamInfo: reader.info, + }, + }; + this.inFlightMessages.push(message); + messageIndex = this.inFlightMessages.length-1; + this.enqueue(message); + } + } + + if (isFinal) { + this.inFlightMessages.splice(messageIndex, 1); + console.log('!! 
MESSAGE DONE!', this.inFlightMessages); + } + }; + this.room.registerTextStreamHandler(DataTopic.TRANSCRIPTION, textStreamHandler); + + return () => { + this.room.unregisterTextStreamHandler(DataTopic.TRANSCRIPTION); + }; + } + + private replaceInFlightMessageText(messageIndex: number, text: string, streamInfo: TextStreamInfo) { + this.inFlightMessages[messageIndex] = { + ...this.inFlightMessages[messageIndex], + content: { + ...this.inFlightMessages[messageIndex].content, + text, + streamInfo, + }, + }; + return this.inFlightMessages[messageIndex]; + } + private appendInFlightMessageText(messageIndex: number, text: string, streamInfo: TextStreamInfo) { + this.inFlightMessages[messageIndex] = { + ...this.inFlightMessages[messageIndex], + content: { + ...this.inFlightMessages[messageIndex].content, + text: this.inFlightMessages[messageIndex].content.text + text, + streamInfo, + }, + }; + return this.inFlightMessages[messageIndex]; + } +} diff --git a/agent-sdk/agent-session/message/send/ChatMessageSender.ts b/agent-sdk/agent-session/message/send/ChatMessageSender.ts new file mode 100644 index 000000000..0c14a166c --- /dev/null +++ b/agent-sdk/agent-session/message/send/ChatMessageSender.ts @@ -0,0 +1,82 @@ +import { LocalParticipant } from "livekit-client"; + +import { SentMessage, SentMessageOptions, type ReceivedChatLoopbackMessage, type SentChatMessage } from ".."; +import MessageSender from "./MessageSender"; +import MessageReceiver from "../receive/MessageReceiver"; +import { DataTopic } from "@/agent-sdk/external-deps/components-js"; + + +/** A `MessageSender` for sending chat messages via the `lk.chat` datastream topic. */ +export default class ChatMessageSender extends MessageSender { + private localParticipant: LocalParticipant; + private loopbackReceiverCallbacks: Set<(incomingMessage: SentChatMessage) => void> = new Set(); + + constructor(localParticipant: LocalParticipant) { + super(); + this.localParticipant = localParticipant; + } + + isSentChatMessage(message: SentMessage): message is SentChatMessage { + return message.content.type === 'chat'; + } + + async send(message: SentChatMessage, options: SentMessageOptions) { + if (!this.isSentChatMessage(message)) { + return; + } + const chatMessageOptions = { + // FIXME: maybe there's a more elegant way of doing this, where it also + // gets checked as part of `isSentChatMessage`? + ...options as SentMessageOptions, + topic: DataTopic.CHAT, + }; + + for (const callback of this.loopbackReceiverCallbacks) { + callback(message); + } + + await this.localParticipant.sendText(message.content.text, chatMessageOptions); + + // FIXME: do I need to handle sending legacy chat messages too? + // const legacyChatMsg: LegacyChatMessage = { + // id: message.id, + // timestamp: message.timestamp.getTime(), + // message: message.content.text, + // }; + // const encodeLegacyMsg = (message: LegacyChatMessage) => new TextEncoder().encode(JSON.stringify(message)); + // await this.localParticipant.publishData(encodeLegacyMsg(legacyChatMsg), { + // topic: "lk-chat-topic",//LegacyDataTopic.CHAT, + // reliable: true, + // }); + } + + /** + * Generates a corresponding MessageReceiver which will emit "received" versions of each chat + * message, that can be correspondingly merged into the message list. + * + * FIXME: should this be on the MessageSender instead, so this can be done for any sender? 
+ */ + generateLoopbackMessageReceiver() { + const chatMessageSender = this; + class ChatMessageLoopbackReceiver extends MessageReceiver { + async start() { + const callback = (incomingMessage: SentChatMessage) => { + const outgoingMessage: ReceivedChatLoopbackMessage = { + id: incomingMessage.id, + direction: 'inbound', + timestamp: incomingMessage.timestamp, + content: { type: 'chat', text: incomingMessage.content.text }, + }; + this.enqueue(outgoingMessage); + }; + + chatMessageSender.loopbackReceiverCallbacks.add(callback); + return () => { + chatMessageSender.loopbackReceiverCallbacks.delete(callback); + }; + } + } + + return new ChatMessageLoopbackReceiver(); + } +} diff --git a/agent-sdk/agent-session/message/send/CombinedMessageSender.ts b/agent-sdk/agent-session/message/send/CombinedMessageSender.ts new file mode 100644 index 000000000..5cba83161 --- /dev/null +++ b/agent-sdk/agent-session/message/send/CombinedMessageSender.ts @@ -0,0 +1,21 @@ +import { SentMessageOptions, type SentMessage } from ".."; +import MessageSender from "./MessageSender"; + +/** + * A `MessageSender` that routes any `SentMessage` to the first underlying `MessageSender` which + * can accept it. + */ +export default class CombinedMessageSender extends MessageSender { + private messageSenders: Array; + + constructor(...messageSenders: Array) { + super(); + this.messageSenders = messageSenders; + } + + async send(message: SentMessage, options: SentMessageOptions) { + await Promise.all(this.messageSenders.map(async (sender) => { + return sender.send(message, options); + })); + } +} diff --git a/agent-sdk/agent-session/message/send/MessageSender.ts b/agent-sdk/agent-session/message/send/MessageSender.ts new file mode 100644 index 000000000..6f8a874c5 --- /dev/null +++ b/agent-sdk/agent-session/message/send/MessageSender.ts @@ -0,0 +1,5 @@ +import { SentMessageOptions, type SentMessage } from ".."; + +export default abstract class MessageSender { + abstract send(message: SentMessage, options: SentMessageOptions): Promise; +} diff --git a/agent-sdk/external-deps/client-sdk-js.tsx b/agent-sdk/external-deps/client-sdk-js.tsx new file mode 100644 index 000000000..4765dcbc7 --- /dev/null +++ b/agent-sdk/external-deps/client-sdk-js.tsx @@ -0,0 +1,160 @@ +// This file contains pieces copied and pasted from the livekit-client package, largely internal +// things that aren't currently being exported. 
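For context on how the sender classes defined above compose, a rough sketch mirroring the wiring in `createMessages`; `DebugMessageSender`, `room`, and the import paths are assumptions for illustration:

```ts
// Hedged composition sketch of MessageSender / CombinedMessageSender / ChatMessageSender.
import {
  ChatMessageSender,
  CombinedMessageSender,
  MessageSender,
  type SentMessage,
} from '@/agent-sdk/agent-session/message'; // assumed import path
import { generateId } from '@/agent-sdk/lib/uuid'; // assumed path

// Made-up sender that just logs outbound messages.
class DebugMessageSender extends MessageSender {
  async send(message: SentMessage) {
    console.debug('outbound message', message.id, message.content);
  }
}

const sender = new CombinedMessageSender(
  new ChatMessageSender(room.localParticipant), // `room` is an existing livekit-client Room
  new DebugMessageSender(),
);

await sender.send(
  {
    id: generateId(),
    direction: 'outbound',
    timestamp: new Date(),
    content: { type: 'chat', text: 'Hi there!' },
  },
  {},
);
```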
+// +// FIXME: export this stuff in livekit-client or explicitly vendor this stuff into the agents sdk + +import { ChatMessage, ConnectionQuality, ConnectionState, DataPacket_Kind, DisconnectReason, LocalParticipant, LocalTrackPublication, LocalVideoTrack, Participant, RemoteParticipant, RemoteTrack, RemoteTrackPublication, SubscriptionError, Track, TrackPublication, TranscriptionSegment } from "livekit-client"; +// import { type SipDtmf, type MetricsBatch } from '@livekit/protocol'; +import { ParticipantPermission } from "livekit-server-sdk"; + +export interface BaseStreamInfo { + id: string; + mimeType: string; + topic: string; + timestamp: number; + /** total size in bytes for finite streams and undefined for streams of unknown size */ + size?: number; + attributes?: Record; +} +export interface ByteStreamInfo extends BaseStreamInfo { + name: string; +} + +export interface TextStreamInfo extends BaseStreamInfo {} + +export type ParticipantEventCallbacks = { + trackPublished: (publication: RemoteTrackPublication) => void; + trackSubscribed: (track: RemoteTrack, publication: RemoteTrackPublication) => void; + trackSubscriptionFailed: (trackSid: string, reason?: SubscriptionError) => void; + trackUnpublished: (publication: RemoteTrackPublication) => void; + trackUnsubscribed: (track: RemoteTrack, publication: RemoteTrackPublication) => void; + trackMuted: (publication: TrackPublication) => void; + trackUnmuted: (publication: TrackPublication) => void; + localTrackPublished: (publication: LocalTrackPublication) => void; + localTrackUnpublished: (publication: LocalTrackPublication) => void; + localTrackCpuConstrained: (track: LocalVideoTrack, publication: LocalTrackPublication) => void; + localSenderCreated: (sender: RTCRtpSender, track: Track) => void; + participantMetadataChanged: (prevMetadata: string | undefined, participant?: any) => void; + participantNameChanged: (name: string) => void; + dataReceived: (payload: Uint8Array, kind: DataPacket_Kind) => void; + sipDTMFReceived: (dtmf: unknown /* SipDTMF */) => void; + transcriptionReceived: ( + transcription: TranscriptionSegment[], + publication?: TrackPublication, + ) => void; + isSpeakingChanged: (speaking: boolean) => void; + connectionQualityChanged: (connectionQuality: ConnectionQuality) => void; + trackStreamStateChanged: ( + publication: RemoteTrackPublication, + streamState: Track.StreamState, + ) => void; + trackSubscriptionPermissionChanged: ( + publication: RemoteTrackPublication, + status: TrackPublication.PermissionStatus, + ) => void; + mediaDevicesError: (error: Error, kind?: MediaDeviceKind) => void; + audioStreamAcquired: () => void; + participantPermissionsChanged: (prevPermissions?: ParticipantPermission) => void; + trackSubscriptionStatusChanged: ( + publication: RemoteTrackPublication, + status: TrackPublication.SubscriptionStatus, + ) => void; + attributesChanged: (changedAttributes: Record) => void; + localTrackSubscribed: (trackPublication: LocalTrackPublication) => void; + chatMessage: (msg: ChatMessage) => void; + active: () => void; +}; + +export type RoomEventCallbacks = { + connected: () => void; + reconnecting: () => void; + signalReconnecting: () => void; + reconnected: () => void; + disconnected: (reason?: DisconnectReason) => void; + connectionStateChanged: (state: ConnectionState) => void; + moved: (name: string) => void; + mediaDevicesChanged: () => void; + participantConnected: (participant: RemoteParticipant) => void; + participantDisconnected: (participant: RemoteParticipant) => void; + 
trackPublished: (publication: RemoteTrackPublication, participant: RemoteParticipant) => void; + trackSubscribed: ( + track: RemoteTrack, + publication: RemoteTrackPublication, + participant: RemoteParticipant, + ) => void; + trackSubscriptionFailed: ( + trackSid: string, + participant: RemoteParticipant, + reason?: SubscriptionError, + ) => void; + trackUnpublished: (publication: RemoteTrackPublication, participant: RemoteParticipant) => void; + trackUnsubscribed: ( + track: RemoteTrack, + publication: RemoteTrackPublication, + participant: RemoteParticipant, + ) => void; + trackMuted: (publication: TrackPublication, participant: Participant) => void; + trackUnmuted: (publication: TrackPublication, participant: Participant) => void; + localTrackPublished: (publication: LocalTrackPublication, participant: LocalParticipant) => void; + localTrackUnpublished: ( + publication: LocalTrackPublication, + participant: LocalParticipant, + ) => void; + localAudioSilenceDetected: (publication: LocalTrackPublication) => void; + participantMetadataChanged: ( + metadata: string | undefined, + participant: RemoteParticipant | LocalParticipant, + ) => void; + participantNameChanged: (name: string, participant: RemoteParticipant | LocalParticipant) => void; + participantPermissionsChanged: ( + prevPermissions: ParticipantPermission | undefined, + participant: RemoteParticipant | LocalParticipant, + ) => void; + participantAttributesChanged: ( + changedAttributes: Record, + participant: RemoteParticipant | LocalParticipant, + ) => void; + activeSpeakersChanged: (speakers: Array) => void; + roomMetadataChanged: (metadata: string) => void; + dataReceived: ( + payload: Uint8Array, + participant?: RemoteParticipant, + kind?: DataPacket_Kind, + topic?: string, + ) => void; + sipDTMFReceived: (dtmf: unknown /* SipDTMF */, participant?: RemoteParticipant) => void; + transcriptionReceived: ( + transcription: TranscriptionSegment[], + participant?: Participant, + publication?: TrackPublication, + ) => void; + connectionQualityChanged: (quality: ConnectionQuality, participant: Participant) => void; + mediaDevicesError: (error: Error, kind?: MediaDeviceKind) => void; + trackStreamStateChanged: ( + publication: RemoteTrackPublication, + streamState: Track.StreamState, + participant: RemoteParticipant, + ) => void; + trackSubscriptionPermissionChanged: ( + publication: RemoteTrackPublication, + status: TrackPublication.PermissionStatus, + participant: RemoteParticipant, + ) => void; + trackSubscriptionStatusChanged: ( + publication: RemoteTrackPublication, + status: TrackPublication.SubscriptionStatus, + participant: RemoteParticipant, + ) => void; + audioPlaybackChanged: (playing: boolean) => void; + videoPlaybackChanged: (playing: boolean) => void; + signalConnected: () => void; + recordingStatusChanged: (recording: boolean) => void; + participantEncryptionStatusChanged: (encrypted: boolean, participant?: Participant) => void; + encryptionError: (error: Error) => void; + dcBufferStatusChanged: (isLow: boolean, kind: DataPacket_Kind) => void; + activeDeviceChanged: (kind: MediaDeviceKind, deviceId: string) => void; + chatMessage: (message: ChatMessage, participant?: RemoteParticipant | LocalParticipant) => void; + localTrackSubscribed: (publication: LocalTrackPublication, participant: LocalParticipant) => void; + metricsReceived: (metrics: unknown /* MetricsBatch */, participant?: Participant) => void; + participantActive: (participant: Participant) => void; +}; diff --git a/agent-sdk/external-deps/components-js.tsx 
b/agent-sdk/external-deps/components-js.tsx new file mode 100644 index 000000000..ea0919bf8 --- /dev/null +++ b/agent-sdk/external-deps/components-js.tsx @@ -0,0 +1,312 @@ +import { Participant, ParticipantEvent, RoomEvent, Track, TrackPublication, TranscriptionSegment } from "livekit-client"; + +// This file contains pieces copied and pasted from the components-js repository +// Something is messed up with my local development environment and I can't figure out how to import +// these properly +// +// FIXME: figure out what is going on here or explicitly vendor this stuff into the agents sdk + +/** @public */ +export type TrackReference = { + participant: Participant; + publication: TrackPublication; + source: Track.Source; +}; + +export const participantTrackEvents = [ + ParticipantEvent.TrackPublished, + ParticipantEvent.TrackUnpublished, + ParticipantEvent.TrackMuted, + ParticipantEvent.TrackUnmuted, + ParticipantEvent.TrackStreamStateChanged, + ParticipantEvent.TrackSubscribed, + ParticipantEvent.TrackUnsubscribed, + ParticipantEvent.TrackSubscriptionPermissionChanged, + ParticipantEvent.TrackSubscriptionFailed, + ParticipantEvent.LocalTrackPublished, + ParticipantEvent.LocalTrackUnpublished, +]; + +export const roomTrackEvents = [ + RoomEvent.ParticipantConnected, + RoomEvent.ParticipantDisconnected, + RoomEvent.ConnectionStateChanged, + RoomEvent.LocalTrackPublished, + RoomEvent.LocalTrackUnpublished, + RoomEvent.TrackPublished, + RoomEvent.TrackUnpublished, + RoomEvent.TrackSubscriptionStatusChanged, + RoomEvent.LocalTrackPublished, + RoomEvent.LocalTrackUnpublished, + + RoomEvent.RoomMetadataChanged, + + RoomEvent.ActiveSpeakersChanged, + RoomEvent.ConnectionQualityChanged, + + RoomEvent.ParticipantPermissionsChanged, + RoomEvent.ParticipantMetadataChanged, + RoomEvent.ParticipantNameChanged, + RoomEvent.ParticipantAttributesChanged, + + RoomEvent.TrackMuted, + RoomEvent.TrackUnmuted, + RoomEvent.TrackStreamStateChanged, + RoomEvent.TrackSubscriptionFailed, + RoomEvent.TrackSubscriptionPermissionChanged, + RoomEvent.TrackSubscriptionStatusChanged, +]; + +export type ReceivedTranscriptionSegment = TranscriptionSegment & { + receivedAtMediaTimestamp: number; + receivedAt: number; +}; + +export function addMediaTimestampToTranscription( + segment: TranscriptionSegment, + timestamps: { timestamp: number; rtpTimestamp?: number }, +): ReceivedTranscriptionSegment { + return { + ...segment, + receivedAtMediaTimestamp: timestamps.rtpTimestamp ?? 0, + receivedAt: timestamps.timestamp, + }; +} + +/** + * @returns An array of unique (by id) `TranscriptionSegment`s. Latest wins. If the resulting array would be longer than `windowSize`, the array will be reduced to `windowSize` length + */ +export function dedupeSegments( + prevSegments: T[], + newSegments: T[], + windowSize: number, +) { + return [...prevSegments, ...newSegments] + .reduceRight((acc, segment) => { + if (!acc.find((val) => val.id === segment.id)) { + acc.unshift(segment); + } + return acc; + }, [] as Array) + .slice(0 - windowSize); +} + +/** + * Create `TrackReferences` for all tracks that are included in the sources property. 
+ * */ +export function getParticipantTrackRefs( + participant: Participant, + identifier: any/* ParticipantTrackIdentifier */, + onlySubscribedTracks = false, +): TrackReference[] { + const { sources, kind, name } = identifier; + const sourceReferences = Array.from(participant.trackPublications.values()) + .filter( + (pub) => + (!sources || sources.includes(pub.source)) && + (!kind || pub.kind === kind) && + (!name || pub.trackName === name) && + // either return all or only the ones that are subscribed + (!onlySubscribedTracks || pub.track), + ) + .map((track): TrackReference => { + return { + participant: participant, + publication: track, + source: track.source, + }; + }); + + return sourceReferences; +} + +export interface TextStreamData { + text: string; + participantInfo: { identity: string }; // Replace with the correct type from livekit-client + streamInfo: any /* TextStreamInfo */; +} + +export const DataTopic = { + CHAT: 'lk.chat', + TRANSCRIPTION: 'lk.transcription', +} as const; + +export const trackSourceToProtocol = (source: Track.Source) => { + // NOTE: this mapping avoids importing the protocol package as that leads to a significant bundle size increase + switch (source) { + case Track.Source.Camera: + return 1; + case Track.Source.Microphone: + return 2; + case Track.Source.ScreenShare: + return 3; + default: + return 0; + } +}; + +const cssPrefix = 'lk'; + +type JsonPrimitive = string | number | boolean | null; +type JsonArray = JsonValue[]; +type JsonObject = { [key: string]: JsonValue }; +type JsonValue = JsonPrimitive | JsonArray | JsonObject; + +/** + * Persists a serializable object to local storage associated with the specified key. + * @internal + */ +function saveToLocalStorage(key: string, value: T): void { + if (typeof localStorage === 'undefined') { + console.error('Local storage is not available.'); + return; + } + + try { + if (value) { + const nonEmptySettings = Object.fromEntries( + Object.entries(value).filter(([, value]) => value !== ''), + ); + localStorage.setItem(key, JSON.stringify(nonEmptySettings)); + } + } catch (error) { + console.error(`Error setting item to local storage: ${error}`); + } +} + +/** + * Retrieves a serializable object from local storage by its key. + * @internal + */ +function loadFromLocalStorage(key: string): T | undefined { + if (typeof localStorage === 'undefined') { + console.error('Local storage is not available.'); + return undefined; + } + + try { + const item = localStorage.getItem(key); + if (!item) { + console.warn(`Item with key ${key} does not exist in local storage.`); + return undefined; + } + return JSON.parse(item); + } catch (error) { + console.error(`Error getting item from local storage: ${error}`); + return undefined; + } +} + +/** + * Generate a pair of functions to load and save a value of type T to local storage. + * @internal + */ +export function createLocalStorageInterface( + key: string, +): { load: () => T | undefined; save: (value: T) => void } { + return { + load: () => loadFromLocalStorage(key), + save: (value: T) => saveToLocalStorage(key, value), + }; +} + +const USER_CHOICES_KEY = `${cssPrefix}-user-choices` as const; + +/** + * @public + * Represents the user's choices for video and audio input devices, + * as well as their username. + */ +export type LocalUserChoices = { + /** + * Whether video input is enabled. + * @defaultValue `true` + */ + videoEnabled: boolean; + /** + * Whether audio input is enabled. 
+ * @defaultValue `true` + */ + audioEnabled: boolean; + /** + * The device ID of the video input device to use. + * @defaultValue `''` + */ + videoDeviceId: string; + /** + * The device ID of the audio input device to use. + * @defaultValue `''` + */ + audioDeviceId: string; + /** + * The username to use. + * @defaultValue `''` + */ + username: string; +}; + +export const defaultUserChoices: LocalUserChoices = { + videoEnabled: true, + audioEnabled: true, + videoDeviceId: 'default', + audioDeviceId: 'default', + username: '', +} as const; + +/** + * The type of the object stored in local storage. + * @remarks + * TODO: Replace this type with `LocalUserChoices` after removing the deprecated properties from `LocalUserChoices`. + * @internal + */ +type TempStorageType = Omit; +const { load, save } = createLocalStorageInterface(USER_CHOICES_KEY); + +/** + * Saves user choices to local storage. + * @alpha + */ +export function saveUserChoices( + userChoices: LocalUserChoices, + /** + * Whether to prevent saving user choices to local storage. + */ + preventSave: boolean = false, +): void { + if (preventSave === true) { + return; + } + save(userChoices); +} + +/** + * Reads the user choices from local storage, or returns the default settings if none are found. + * @remarks + * The deprecated parameters `e2ee` and `sharedPassphrase` are not read from local storage + * and always return the value from the passed `defaults` or internal defaults. + * @alpha + */ +export function loadUserChoices( + defaults?: Partial, + /** + * Whether to prevent loading from local storage and return default values instead. + * @defaultValue false + */ + preventLoad: boolean = false, +): LocalUserChoices { + const fallback: LocalUserChoices = { + videoEnabled: defaults?.videoEnabled ?? defaultUserChoices.videoEnabled, + audioEnabled: defaults?.audioEnabled ?? defaultUserChoices.audioEnabled, + videoDeviceId: defaults?.videoDeviceId ?? defaultUserChoices.videoDeviceId, + audioDeviceId: defaults?.audioDeviceId ?? defaultUserChoices.audioDeviceId, + username: defaults?.username ?? defaultUserChoices.username, + }; + + if (preventLoad) { + return fallback; + } else { + const maybeLoadedObject = load(); + const result = { ...fallback, ...(maybeLoadedObject ?? 
{}) }; + return result; + } +} diff --git a/agent-sdk/index.tsx b/agent-sdk/index.tsx new file mode 100644 index 000000000..d91f4b500 --- /dev/null +++ b/agent-sdk/index.tsx @@ -0,0 +1,241 @@ +import * as React from "react"; +import { useEffect, useCallback, useMemo, useRef } from "react"; +import { create } from 'zustand'; +import { Participant, Track, TrackPublication } from "livekit-client"; +import { AgentSessionConnectionState, AgentSessionInstance, AgentSessionOptions, createAgentSession } from "./agent-session/AgentSession"; +import { AgentInstance } from "./agent-session/Agent"; +import { RemoteTrackInstance } from "./agent-session/RemoteTrack"; +import TypedEventEmitter, { EventMap } from "typed-emitter"; +import { LocalTrackInstance } from "./agent-session/LocalTrack"; +import { AgentState, BarVisualizer, BarVisualizerProps, TrackReference } from "@livekit/components-react"; +import { ManualConnectionCredentials } from "./agent-session/ConnectionCredentialsProvider"; + +export const AgentVideoTrack: React.FunctionComponent<{ + className?: string, + track: LocalTrackInstance | RemoteTrackInstance, +} & React.HTMLAttributes> = ({ track, ...rest }) => { + // FIXME: imperative handle logic + const mediaElementRef = useRef(null); + + useEffect(() => { + if (!mediaElementRef.current) { + return; + } + const mediaElement = mediaElementRef.current; + + if (!track.enabled) { + return; + } + + let cleanup: (() => void) | null = null; + (async () => { + if (!track.isLocal) { + // FIXME: intersection observer logic + track.setSubscribed(true); + await track.waitUntilSubscribed(); // FIXME: move inside of attachToMediaElement + } + + cleanup = track.attachToMediaElement(mediaElement); + })() + + return () => { + if (!track.isLocal) { + track.setSubscribed(false); + } + cleanup?.(); + }; + }, [track]); + + return ( +