diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index 4df1a4fd22..ee91c25ff2 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -12,6 +12,7 @@ import { } from '../proto/livekit_models'; import { AddTrackRequest, + AudioTrackMuxUpdate, ConnectionQualityUpdate, JoinResponse, LeaveRequest, @@ -134,6 +135,8 @@ export class SignalClient { onLeave?: (leave: LeaveRequest) => void; + onAudioMuxUpdate?: (update: AudioTrackMuxUpdate) => void; + connectOptions?: ConnectOpts; ws?: WebSocket; @@ -588,6 +591,10 @@ export class SignalClient { } } else if (msg.$case === 'pong') { this.resetPingTimeout(); + } else if (msg.$case === 'audioMuxUpdate') { + if (this.onAudioMuxUpdate) { + this.onAudioMuxUpdate(msg.audioMuxUpdate); + } } else if (msg.$case === 'pongResp') { this.rtt = Date.now() - msg.pongResp.lastPingTimestamp; this.resetPingTimeout(); diff --git a/src/proto/livekit_rtc.ts b/src/proto/livekit_rtc.ts index 45fd359fc7..d05c7bf099 100644 --- a/src/proto/livekit_rtc.ts +++ b/src/proto/livekit_rtc.ts @@ -176,7 +176,8 @@ export interface SignalResponse { | { $case: "trackUnpublished"; trackUnpublished: TrackUnpublishedResponse } | { $case: "pong"; pong: number } | { $case: "reconnect"; reconnect: ReconnectResponse } - | { $case: "pongResp"; pongResp: Pong }; + | { $case: "pongResp"; pongResp: Pong } + | { $case: "audioMuxUpdate"; audioMuxUpdate: AudioTrackMuxUpdate }; } export interface SimulcastCodec { @@ -406,6 +407,16 @@ export interface SimulateScenario { | { $case: "switchCandidateProtocol"; switchCandidateProtocol: CandidateProtocol }; } +export interface AudioTrackMuxUpdate { + audioTrackMuxes: AudioTrackMuxInfo[]; +} + +export interface AudioTrackMuxInfo { + sdpTrackId: string; + trackSid: string; + participantSid: string; +} + export interface Ping { timestamp: number; /** rtt in milliseconds calculated by client */ @@ -794,6 +805,9 @@ export const SignalResponse = { Pong.encode(message.message.pongResp, writer.uint32(162).fork()).ldelim(); break; } + if (message.message?.$case === "audioMuxUpdate") { + AudioTrackMuxUpdate.encode(message.message.audioMuxUpdate, writer.uint32(170).fork()).ldelim(); + } return writer; }, @@ -882,6 +896,12 @@ export const SignalResponse = { case 20: message.message = { $case: "pongResp", pongResp: Pong.decode(reader, reader.uint32()) }; break; + case 21: + message.message = { + $case: "audioMuxUpdate", + audioMuxUpdate: AudioTrackMuxUpdate.decode(reader, reader.uint32()), + }; + break; default: reader.skipType(tag & 7); break; @@ -936,6 +956,8 @@ export const SignalResponse = { ? { $case: "reconnect", reconnect: ReconnectResponse.fromJSON(object.reconnect) } : isSet(object.pongResp) ? { $case: "pongResp", pongResp: Pong.fromJSON(object.pongResp) } + : isSet(object.audioMuxUpdate) + ? { $case: "audioMuxUpdate", audioMuxUpdate: AudioTrackMuxUpdate.fromJSON(object.audioMuxUpdate) } : undefined, }; }, @@ -987,6 +1009,9 @@ export const SignalResponse = { (obj.reconnect = message.message?.reconnect ? ReconnectResponse.toJSON(message.message?.reconnect) : undefined); message.message?.$case === "pongResp" && (obj.pongResp = message.message?.pongResp ? Pong.toJSON(message.message?.pongResp) : undefined); + message.message?.$case === "audioMuxUpdate" && (obj.audioMuxUpdate = message.message?.audioMuxUpdate + ? AudioTrackMuxUpdate.toJSON(message.message?.audioMuxUpdate) + : undefined); return obj; }, @@ -1122,6 +1147,16 @@ export const SignalResponse = { ) { message.message = { $case: "pongResp", pongResp: Pong.fromPartial(object.message.pongResp) }; } + if ( + object.message?.$case === "audioMuxUpdate" && + object.message?.audioMuxUpdate !== undefined && + object.message?.audioMuxUpdate !== null + ) { + message.message = { + $case: "audioMuxUpdate", + audioMuxUpdate: AudioTrackMuxUpdate.fromPartial(object.message.audioMuxUpdate), + }; + } return message; }, }; @@ -3572,6 +3607,128 @@ export const SimulateScenario = { }, }; +function createBaseAudioTrackMuxUpdate(): AudioTrackMuxUpdate { + return { audioTrackMuxes: [] }; +} + +export const AudioTrackMuxUpdate = { + encode(message: AudioTrackMuxUpdate, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.audioTrackMuxes) { + AudioTrackMuxInfo.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): AudioTrackMuxUpdate { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseAudioTrackMuxUpdate(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.audioTrackMuxes.push(AudioTrackMuxInfo.decode(reader, reader.uint32())); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): AudioTrackMuxUpdate { + return { + audioTrackMuxes: Array.isArray(object?.audioTrackMuxes) + ? object.audioTrackMuxes.map((e: any) => AudioTrackMuxInfo.fromJSON(e)) + : [], + }; + }, + + toJSON(message: AudioTrackMuxUpdate): unknown { + const obj: any = {}; + if (message.audioTrackMuxes) { + obj.audioTrackMuxes = message.audioTrackMuxes.map((e) => e ? AudioTrackMuxInfo.toJSON(e) : undefined); + } else { + obj.audioTrackMuxes = []; + } + return obj; + }, + + fromPartial, I>>(object: I): AudioTrackMuxUpdate { + const message = createBaseAudioTrackMuxUpdate(); + message.audioTrackMuxes = object.audioTrackMuxes?.map((e) => AudioTrackMuxInfo.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseAudioTrackMuxInfo(): AudioTrackMuxInfo { + return { sdpTrackId: "", trackSid: "", participantSid: "" }; +} + +export const AudioTrackMuxInfo = { + encode(message: AudioTrackMuxInfo, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.sdpTrackId !== "") { + writer.uint32(10).string(message.sdpTrackId); + } + if (message.trackSid !== "") { + writer.uint32(18).string(message.trackSid); + } + if (message.participantSid !== "") { + writer.uint32(26).string(message.participantSid); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): AudioTrackMuxInfo { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseAudioTrackMuxInfo(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.sdpTrackId = reader.string(); + break; + case 2: + message.trackSid = reader.string(); + break; + case 3: + message.participantSid = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): AudioTrackMuxInfo { + return { + sdpTrackId: isSet(object.sdpTrackId) ? String(object.sdpTrackId) : "", + trackSid: isSet(object.trackSid) ? String(object.trackSid) : "", + participantSid: isSet(object.participantSid) ? String(object.participantSid) : "", + }; + }, + + toJSON(message: AudioTrackMuxInfo): unknown { + const obj: any = {}; + message.sdpTrackId !== undefined && (obj.sdpTrackId = message.sdpTrackId); + message.trackSid !== undefined && (obj.trackSid = message.trackSid); + message.participantSid !== undefined && (obj.participantSid = message.participantSid); + return obj; + }, + + fromPartial, I>>(object: I): AudioTrackMuxInfo { + const message = createBaseAudioTrackMuxInfo(); + message.sdpTrackId = object.sdpTrackId ?? ""; + message.trackSid = object.trackSid ?? ""; + message.participantSid = object.participantSid ?? ""; + return message; + }, +}; + function createBasePing(): Ping { return { timestamp: 0, rtt: 0 }; } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 5f8e7d8d78..73ca572616 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -61,6 +61,12 @@ enum PCState { Closed, } +interface AudioMuxTrack { + track: MediaStreamTrack; + stream: MediaStream; + receiver: RTCRtpReceiver; +} + /** @internal */ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmitter) { publisher?: PCTransport; @@ -73,6 +79,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout; + audioMuxTracks: Map = new Map(); + get isClosed() { return this._isClosed; } @@ -361,6 +369,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }; this.subscriber.pc.ontrack = (ev: RTCTrackEvent) => { + // todo-mux: firefox can't get TR_AX... track id + if (ev.track.kind === 'audio' && ev.track.id.includes('TR_AX')) { + this.audioMuxTracks.set(ev.track.id, {track: ev.track, stream: ev.streams[0], receiver: ev.receiver}) + } this.emit(EngineEvent.MediaTrackAdded, ev.track, ev.streams[0], ev.receiver); }; diff --git a/src/room/Room.ts b/src/room/Room.ts index bc1289799a..e2d91df1f7 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -23,6 +23,7 @@ import { UserPacket, } from '../proto/livekit_models'; import { + AudioTrackMuxUpdate, ConnectionQualityUpdate, JoinResponse, SimulateScenario, @@ -50,6 +51,7 @@ import LocalTrackPublication from './track/LocalTrackPublication'; import LocalVideoTrack from './track/LocalVideoTrack'; import type RemoteTrack from './track/RemoteTrack'; import RemoteTrackPublication from './track/RemoteTrackPublication'; +import MuxedRemoteAudioTrack from './track/MuxedRemoteAudioTrack'; import { Track } from './track/Track'; import type { TrackPublication } from './track/TrackPublication'; import type { AdaptiveStreamSettings } from './track/types'; @@ -133,6 +135,11 @@ class Room extends (EventEmitter as new () => TypedEmitter) private disconnectLock: Mutex; + private lastMuxUpdate?: AudioTrackMuxUpdate; + + /** mapping of track id -> MuxedRemoteAudioTrack */ + private muxedTracks: Map = new Map(); + /** * Creates a new Room, the primary construct for a LiveKit session. * @param options @@ -177,6 +184,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.engine.client.onStreamStateUpdate = this.handleStreamStateUpdate; this.engine.client.onSubscriptionPermissionUpdate = this.handleSubscriptionPermissionUpdate; this.engine.client.onConnectionQuality = this.handleConnectionQualityUpdate; + this.engine.client.onAudioMuxUpdate = this.handleAudioMuxUpdate; this.engine .on( @@ -657,6 +665,8 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.options.audioOutput.deviceId = prevDeviceId; throw e; } + + // TODO: set audio output for room audio tracks } } @@ -1058,6 +1068,61 @@ class Room extends (EventEmitter as new () => TypedEmitter) }); }; + private handleAudioMuxUpdate = (update: AudioTrackMuxUpdate) => { + + const newlySubTracks = update.audioTrackMuxes.filter(val => !(this.lastMuxUpdate?.audioTrackMuxes.some(lval => lval.trackSid === val.trackSid))); + const newlyUnsubTracks = this.lastMuxUpdate?.audioTrackMuxes.filter(lval => !(update.audioTrackMuxes.some(val => val.trackSid === lval.trackSid))); + this.lastMuxUpdate = update; + + newlySubTracks.forEach(track => { + const muxTrack = this.engine.audioMuxTracks.get(track.sdpTrackId); + if (track.sdpTrackId !== '' && !muxTrack) { + log.error(`can't find mux track for sdp track id ${track.sdpTrackId}`); + } + + const remoteTrack = new MuxedRemoteAudioTrack(track.trackSid, muxTrack?.track,muxTrack?.stream, muxTrack?.receiver, this.audioContext, this.options.audioOutput); + + const participant = this.participants.get(track.participantSid); + if (!participant) { + log.error(`can't find participant for ${track.participantSid}`); + return; + } + + participant.addMuxAudioTrack(remoteTrack, track.trackSid); + this.muxedTracks.set(track.trackSid, remoteTrack); + }); + + newlyUnsubTracks?.forEach(track => { + log.info(`unsub track ${track.trackSid}`); + const muxedTrack = this.muxedTracks.get(track.trackSid); + if (!muxedTrack) { + log.error(`can't find muxed track for participant ${track.participantSid}, track ${track.trackSid}`); + return + } + muxedTrack.close(); + this.muxedTracks.delete(track.trackSid); + }); + + update.audioTrackMuxes.forEach(track => { + const muxedTrack = this.muxedTracks.get(track.trackSid); + if(!muxedTrack) { + log.error(`can't find muxed track for participant ${track.participantSid}, track ${track.trackSid}`); + return + } + if (track.sdpTrackId === '') { + muxedTrack.unbind(); + } else { + const muxTrack = this.engine.audioMuxTracks.get(track.sdpTrackId) + if (!muxTrack) { + log.error(`can't find mux track for sdp track id ${track.sdpTrackId}`); + return + } + log.info(`track ${track.trackSid} muxed to ${track.sdpTrackId}`); + muxedTrack.bind(muxTrack.track, muxTrack.stream,muxTrack.receiver); + } + }); + } + private async acquireAudioContext() { if ( typeof this.options.expWebAudioMix !== 'boolean' && diff --git a/src/room/participant/RemoteParticipant.ts b/src/room/participant/RemoteParticipant.ts index 8e49603579..1744eb3718 100644 --- a/src/room/participant/RemoteParticipant.ts +++ b/src/room/participant/RemoteParticipant.ts @@ -119,6 +119,35 @@ export default class RemoteParticipant extends Participant { return this.volume; } + /** @internal */ + addMuxAudioTrack( + track: RemoteAudioTrack, + sid: Track.SID, + ) { + let publication = this.getTrackPublication(sid); + if (!publication) { + log.error('could not find published track', { participant: this.sid, trackSid: sid }); + this.emit(ParticipantEvent.TrackSubscriptionFailed, sid); + return; + } + track.source = publication.source; + // keep publication's muted status + track.isMuted = publication.isMuted; + track.start(); + + publication.setTrack(track); + // set participant volume on new microphone tracks + if ( + this.volume !== undefined && + track instanceof RemoteAudioTrack && + track.source === Track.Source.Microphone + ) { + track.setVolume(this.volume); + } + + return publication; + } + /** @internal */ addSubscribedMediaTrack( mediaTrack: MediaStreamTrack, diff --git a/src/room/track/MuxedRemoteAudioTrack.ts b/src/room/track/MuxedRemoteAudioTrack.ts new file mode 100644 index 0000000000..840bd5d8ca --- /dev/null +++ b/src/room/track/MuxedRemoteAudioTrack.ts @@ -0,0 +1,87 @@ +import log from '../../logger'; +import RemoteAudioTrack from "./RemoteAudioTrack"; +import type { AudioOutputOptions } from './options'; + +export default class MuxedRemoteAudioTrack extends RemoteAudioTrack { + private audioCtx: AudioContext; + + private streamDst: MediaStreamAudioDestinationNode; + + private streamDstTrack: MediaStreamTrack; + + private muxingTrack?: MediaStreamTrack; + + private muxingSourceNode?: AudioNode; + + private audioForWebRTCStream?: HTMLAudioElement; + + constructor( + sid: string, + mediaTrack?: MediaStreamTrack, + mediaStream?: MediaStream, + receiver?: RTCRtpReceiver, + audioContext?: AudioContext, + audioOutput?: AudioOutputOptions, + ) { + if (!audioContext) { + audioContext = new AudioContext() + } + const dst = audioContext.createMediaStreamDestination(); + // const dst = audioContext.destination; + const [streamDstTrack] = dst.stream.getAudioTracks(); + if (!streamDstTrack) { + throw Error('Could not get media stream audio track'); + } + super(streamDstTrack, sid, receiver, audioContext, audioOutput); + this.audioCtx = audioContext; + this.streamDst = dst; + this.streamDstTrack = streamDstTrack; + this.audioForWebRTCStream = new Audio(); + // this.streamDst.connect(audioContext.destination); + + if (mediaTrack && mediaStream) { + this.bind(mediaTrack, mediaStream, receiver); + } + this.setMediaStream(dst.stream); + } + + bind(track: MediaStreamTrack, stream: MediaStream, receiver?: RTCRtpReceiver) { + if (track === this.muxingTrack) { + return; + } + if (this.muxingSourceNode) { + this.muxingSourceNode.disconnect(); + } + log.info(`bind ${this.sid}`); + + // for chrome known bug: webrtc stream must attach to an element to play. + // https://bugs.chromium.org/p/chromium/issues/detail?id=933677&q=webrtc%20silent&can=2 + if (this.audioForWebRTCStream) { + this.audioForWebRTCStream.srcObject = stream; + } + + const srcNode = this.audioCtx.createMediaStreamSource(stream); + srcNode.connect(this.streamDst); + this.muxingSourceNode = srcNode; + this.muxingTrack = track; + this.receiver = receiver; + // this.streamDstTrack.enabled = true; + } + + unbind() { + if (this.muxingSourceNode) { + log.info(`unbind ${this.sid}`); + this.muxingSourceNode?.disconnect(); + this.muxingSourceNode = undefined; + this.muxingTrack = undefined; + this.receiver = undefined; + } + // this.streamDstTrack.enabled = false; + } + + close() { + this.unbind(); + // fire onremovetrack for Track Ended event + this.streamDst.stream.removeTrack(this.streamDstTrack); + } +} \ No newline at end of file