From ac42ba4aca110bdd3a1ba9dd5452df65e1900a8b Mon Sep 17 00:00:00 2001 From: bartosz rzepa Date: Fri, 22 Nov 2024 12:31:48 +0100 Subject: [PATCH] get settings from api, support both or just one track --- .../src/types/from_register_output.rs | 35 +++- compositor_api/src/types/register_output.rs | 27 ++- .../src/pipeline/output/whip.rs | 70 +++++--- .../output/whip/establish_peer_connection.rs | 1 + .../output/whip/init_peer_connection.rs | 170 ++++++++++++------ .../src/pipeline/output/whip/payloader.rs | 6 +- 6 files changed, 225 insertions(+), 84 deletions(-) diff --git a/compositor_api/src/types/from_register_output.rs b/compositor_api/src/types/from_register_output.rs index ff7a3ebe2..8aba4815b 100644 --- a/compositor_api/src/types/from_register_output.rs +++ b/compositor_api/src/types/from_register_output.rs @@ -10,6 +10,7 @@ use compositor_pipeline::pipeline::{ output::{ self, mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack}, + whip::WhipAudioOptions, }, }; @@ -193,8 +194,21 @@ impl TryFrom for pipeline::RegisterOutputOptions pipeline::VideoCodec::H264, }); - let audio_codec = audio.as_ref().map(|a| match a.encoder { - RtpAudioEncoderOptions::Opus { .. 
} => pipeline::AudioCodec::Opus, + let audio_options = audio.as_ref().map(|a| match &a.encoder { + WhipAudioEncoderOptions::Opus { + channels, + preset: _, + } => WhipAudioOptions { + codec: pipeline::AudioCodec::Opus, + channels: match channels { + audio::AudioChannels::Mono => { + compositor_pipeline::audio_mixer::AudioChannels::Mono + } + audio::AudioChannels::Stereo => { + compositor_pipeline::audio_mixer::AudioChannels::Stereo + } + }, + }, }); if let Some(token) = &bearer_token { @@ -205,7 +219,7 @@ impl TryFrom for pipeline::RegisterOutputOptions for pipeline::RegisterOutputOptions for pipeline::encoder::AudioEncoderOptions { } } +impl From for pipeline::encoder::AudioEncoderOptions { + fn from(value: WhipAudioEncoderOptions) -> Self { + match value { + WhipAudioEncoderOptions::Opus { channels, preset } => { + AudioEncoderOptions::Opus(encoder::opus::OpusEncoderOptions { + channels: channels.into(), + preset: preset.unwrap_or(OpusEncoderPreset::Voip).into(), + }) + } + } + } +} + impl TryFrom for pipeline::PipelineOutputEndCondition { type Error = TypeError; diff --git a/compositor_api/src/types/register_output.rs b/compositor_api/src/types/register_output.rs index c70fad968..cd8cc26f8 100644 --- a/compositor_api/src/types/register_output.rs +++ b/compositor_api/src/types/register_output.rs @@ -48,7 +48,7 @@ pub struct WhipOutput { /// Video track configuration. pub video: Option, /// Audio track configuration. - pub audio: Option, + pub audio: Option, } #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] @@ -90,6 +90,19 @@ pub struct OutputMp4AudioOptions { pub initial: Audio, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct OutputWhipAudioOptions { + /// (**default="sum_clip"**) Specifies how audio should be mixed. + pub mixing_strategy: Option, + /// Condition for termination of output stream based on the input streams states. + pub send_eos_when: Option, + /// Audio encoder options. 
+ pub encoder: WhipAudioEncoderOptions, + /// Initial audio mixer configuration for output. + pub initial: Audio, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)] pub enum VideoEncoderOptions { @@ -121,6 +134,18 @@ pub enum Mp4AudioEncoderOptions { Aac { channels: AudioChannels }, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)] +pub enum WhipAudioEncoderOptions { + Opus { + /// Specifies channels configuration. + channels: AudioChannels, + + /// (**default="voip"**) Specifies preset for audio output encoder. + preset: Option, + }, +} + /// This type defines when end of an input stream should trigger end of the output stream. Only one of those fields can be set at the time. /// Unless specified otherwise the input stream is considered finished/ended when: /// - TCP connection was dropped/closed. diff --git a/compositor_pipeline/src/pipeline/output/whip.rs b/compositor_pipeline/src/pipeline/output/whip.rs index 98d3e07f1..9be315cd7 100644 --- a/compositor_pipeline/src/pipeline/output/whip.rs +++ b/compositor_pipeline/src/pipeline/output/whip.rs @@ -16,6 +16,7 @@ use url::{ParseError, Url}; use webrtc::track::track_local::TrackLocalWriter; use crate::{ + audio_mixer::AudioChannels, error::OutputInitError, event::{Event, EventEmitter}, pipeline::{AudioCodec, EncoderOutputEvent, PipelineCtx, VideoCodec}, @@ -37,13 +38,20 @@ pub struct WhipSenderOptions { pub endpoint_url: String, pub bearer_token: Option>, pub video: Option, - pub audio: Option, + pub audio: Option, +} + +#[derive(Debug, Clone, Copy)] +pub struct WhipAudioOptions { + pub codec: AudioCodec, + pub channels: AudioChannels, } #[derive(Debug, Clone)] pub struct WhipCtx { - options: WhipSenderOptions, output_id: OutputId, + options: WhipSenderOptions, + sample_rate: u32, request_keyframe_sender: Option>, should_close: Arc, tokio_rt: Arc, 
@@ -106,6 +114,9 @@ pub enum WhipError { #[error("Entity Tag non-matching")] EntityTagNonMatching, + + #[error("Codec not supported: {0}")] + UnsupportedCodec(&'static str), } const WHIP_INIT_TIMEOUT: Duration = Duration::from_secs(60); @@ -125,8 +136,9 @@ impl WhipSender { oneshot::channel::>(); let whip_ctx = WhipCtx { - options: options.clone(), output_id: output_id.clone(), + options: options.clone(), + sample_rate: pipeline_ctx.output_sample_rate, request_keyframe_sender, should_close: should_close.clone(), tokio_rt: pipeline_ctx.tokio_rt.clone(), @@ -188,7 +200,7 @@ async fn run_whip_sender_task( event_emitter: Arc, ) { let client = Arc::new(reqwest::Client::new()); - let (peer_connection, video_track, audio_track) = match init_peer_connection().await { + let (peer_connection, video_track, audio_track) = match init_peer_connection(&whip_ctx).await { Ok(pc) => pc, Err(err) => { init_confirmation_sender.send(Err(err)).unwrap(); @@ -219,26 +231,44 @@ async fn run_whip_sender_task( }; match chunk { - Payload::Video(video_payload) => match video_payload { - Ok(video_bytes) => { - if video_track.write(&video_bytes).await.is_err() { - error!("Error occurred while writing to video track for session"); + Payload::Video(video_payload) => { + match video_track.clone() { + Some(video_track) => match video_payload { + Ok(video_bytes) => { + if video_track.write(&video_bytes).await.is_err() { + error!("Error occurred while writing to video track, closing connection."); + break; + } + } + Err(err) => { + error!("Error while reading video bytes: {err}"); + } + }, + None => { + error!("Video payload detected on output with no video, shutting down"); + break; } } - Err(err) => { - error!("Error while reading video bytes: {err}"); - } - }, - Payload::Audio(audio_payload) => match audio_payload { - Ok(audio_bytes) => { - if audio_track.write(&audio_bytes).await.is_err() { - error!("Error occurred while writing to video track for session"); + } + Payload::Audio(audio_payload) => 
{ + match audio_track.clone() { + Some(audio_track) => match audio_payload { + Ok(audio_bytes) => { + if audio_track.write(&audio_bytes).await.is_err() { + error!("Error occurred while writing to audio track, closing connection."); + break; + } + } + Err(err) => { + error!("Error while reading audio bytes: {err}"); + } + }, + None => { + error!("Audio payload detected on output with no audio, shutting down"); + break; + } + } - Err(err) => { - error!("Error while reading audio bytes: {err}"); - } - }, + } } } if let Err(err) = client.delete(whip_session_url).send().await { diff --git a/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs b/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs index 9b074bac0..098cfd1b2 100644 --- a/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs +++ b/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs @@ -67,6 +67,7 @@ pub async fn connect( .create_offer(None) .await .map_err(WhipError::OfferCreationError)?; + tracing::debug!("WHIP offer: {offer:?}"); let endpoint_url = Url::parse(&whip_ctx.options.endpoint_url) .map_err(|e| WhipError::InvalidEndpointUrl(e, whip_ctx.options.endpoint_url.clone()))?; diff --git a/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs b/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs index e4e979550..8e37d99cc 100644 --- a/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs +++ b/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs @@ -1,9 +1,14 @@ -use super::WhipError; +use crate::{ + audio_mixer::AudioChannels, + pipeline::{AudioCodec, VideoCodec}, +}; + +use super::{WhipAudioOptions, WhipCtx, WhipError}; use std::{ env::{self, VarError}, sync::Arc, }; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use webrtc::{ api::{ interceptor_registry::register_default_interceptors, @@ -22,44 +27,29 @@ use webrtc::{ const STUN_SERVER_ENV: &str =
"LIVE_COMPOSITOR_STUN_SERVERS"; -pub async fn init_peer_connection() -> Result< +pub async fn init_peer_connection( + whip_ctx: &WhipCtx, +) -> Result< ( Arc, - Arc, - Arc, + Option>, + Option>, ), WhipError, > { let mut media_engine = MediaEngine::default(); media_engine.register_default_codecs()?; - media_engine.register_codec( - RTCRtpCodecParameters { - capability: RTCRtpCodecCapability { - mime_type: MIME_TYPE_H264.to_owned(), - clock_rate: 90000, - channels: 0, - sdp_fmtp_line: "".to_owned(), - rtcp_feedback: vec![], - }, - payload_type: 96, - ..Default::default() - }, - RTPCodecType::Video, - )?; - media_engine.register_codec( - RTCRtpCodecParameters { - capability: RTCRtpCodecCapability { - mime_type: MIME_TYPE_OPUS.to_owned(), - clock_rate: 48000, - channels: 2, - sdp_fmtp_line: "".to_owned(), - rtcp_feedback: vec![], - }, - payload_type: 111, - ..Default::default() - }, - RTPCodecType::Audio, - )?; + + if let Some(video) = whip_ctx.options.video { + media_engine.register_codec(video_codec_parameters(video), RTPCodecType::Video)?; + } + + if let Some(audio) = whip_ctx.options.audio { + media_engine.register_codec( + audio_codec_parameters(audio, whip_ctx.sample_rate)?, + RTPCodecType::Audio, + )?; + } let mut registry = Registry::new(); registry = register_default_interceptors(registry, &mut media_engine)?; let api = APIBuilder::new() @@ -93,30 +83,37 @@ pub async fn init_peer_connection() -> Result< ..Default::default() }; let peer_connection = Arc::new(api.new_peer_connection(config).await?); - let video_track = Arc::new(TrackLocalStaticRTP::new( - RTCRtpCodecCapability { - mime_type: MIME_TYPE_H264.to_owned(), - ..Default::default() - }, - "video".to_owned(), - "webrtc-rs".to_owned(), - )); - let audio_track = Arc::new(TrackLocalStaticRTP::new( - RTCRtpCodecCapability { - mime_type: MIME_TYPE_OPUS.to_owned(), - ..Default::default() - }, - "audio".to_owned(), - "webrtc-rs".to_owned(), - )); - peer_connection - .add_track(video_track.clone()) - .await - 
.map_err(WhipError::PeerConnectionInitError)?; - peer_connection - .add_track(audio_track.clone()) - .await - .map_err(WhipError::PeerConnectionInitError)?; + + let video_track = match whip_ctx.options.video { + Some(video) => { + let video_track = Arc::new(TrackLocalStaticRTP::new( + video_codec_capability(video), + "video".to_owned(), + format!("live-compositor-{}-video", whip_ctx.output_id), + )); + peer_connection + .add_track(video_track.clone()) + .await + .map_err(WhipError::PeerConnectionInitError)?; + Some(video_track) + } + None => None, + }; + let audio_track = match whip_ctx.options.audio { + Some(audio_options) => { + let audio_track = Arc::new(TrackLocalStaticRTP::new( + audio_codec_capability(audio_options, whip_ctx.sample_rate)?, + "audio".to_owned(), + format!("live-compositor-{}-audio", whip_ctx.output_id), + )); + peer_connection + .add_track(audio_track.clone()) + .await + .map_err(WhipError::PeerConnectionInitError)?; + Some(audio_track) + } + None => None, + }; let transceivers = peer_connection.get_transceivers().await; for transceiver in transceivers { transceiver @@ -125,3 +122,62 @@ } Ok((peer_connection, video_track, audio_track)) } + +fn video_codec_capability(video: VideoCodec) -> RTCRtpCodecCapability { + match video { + VideoCodec::H264 => RTCRtpCodecCapability { + mime_type: MIME_TYPE_H264.to_owned(), + clock_rate: 90000, + channels: 0, + sdp_fmtp_line: "".to_owned(), + rtcp_feedback: vec![], + }, + } +} + +fn audio_codec_capability( + audio_options: WhipAudioOptions, + sample_rate: u32, +) -> Result<RTCRtpCodecCapability, WhipError> { + match audio_options.codec { + AudioCodec::Opus => Ok(RTCRtpCodecCapability { + mime_type: MIME_TYPE_OPUS.to_owned(), + clock_rate: sample_rate, /* NOTE(review): RFC 7587 fixes the Opus RTP clock rate at 48000, and payloader.rs stamps RTP timestamps with OPUS_CLOCK_RATE = 48000 — confirm output_sample_rate is always 48 kHz or hardcode 48000 here */ + channels: match audio_options.channels { + AudioChannels::Mono => 1, + AudioChannels::Stereo => 2, + }, + sdp_fmtp_line: "".to_owned(), + rtcp_feedback: vec![], + }), + AudioCodec::Aac =>
Err(WhipError::UnsupportedCodec("AAC")), + } +} + +fn video_codec_parameters(video: VideoCodec) -> RTCRtpCodecParameters { + let capability = video_codec_capability(video); + let payload_type = match video { + VideoCodec::H264 => 96, + }; + RTCRtpCodecParameters { + capability, + payload_type, + ..Default::default() + } +} + +fn audio_codec_parameters( + audio_options: WhipAudioOptions, + sample_rate: u32, +) -> Result { + let capability = audio_codec_capability(audio_options, sample_rate)?; + let payload_type = match audio_options.codec { + AudioCodec::Aac => return Err(WhipError::UnsupportedCodec("AAC")), + AudioCodec::Opus => 111, + }; + Ok(RTCRtpCodecParameters { + capability, + payload_type, + ..Default::default() + }) +} diff --git a/compositor_pipeline/src/pipeline/output/whip/payloader.rs b/compositor_pipeline/src/pipeline/output/whip/payloader.rs index 720dddb32..f64a51772 100644 --- a/compositor_pipeline/src/pipeline/output/whip/payloader.rs +++ b/compositor_pipeline/src/pipeline/output/whip/payloader.rs @@ -12,6 +12,8 @@ use crate::pipeline::{ AudioCodec, VideoCodec, }; +use super::WhipAudioOptions; + const H264_CLOCK_RATE: u32 = 90000; const OPUS_CLOCK_RATE: u32 = 48000; @@ -104,10 +106,10 @@ pub enum Payload { } impl Payloader { - pub fn new(video: Option, audio: Option) -> Self { + pub fn new(video: Option, audio: Option) -> Self { Self { video: video.map(VideoPayloader::new), - audio: audio.map(AudioPayloader::new), + audio: audio.map(|audio| AudioPayloader::new(audio.codec)), } }