Skip to content

Commit

Permalink
get settings from api, support both or just one track
Browse files Browse the repository at this point in the history
  • Loading branch information
brzep committed Nov 22, 2024
1 parent bc5675c commit ac42ba4
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 84 deletions.
35 changes: 31 additions & 4 deletions compositor_api/src/types/from_register_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use compositor_pipeline::pipeline::{
output::{
self,
mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack},
whip::WhipAudioOptions,
},
};

Expand Down Expand Up @@ -193,8 +194,21 @@ impl TryFrom<WhipOutput> for pipeline::RegisterOutputOptions<output::OutputOptio
let video_codec = video.as_ref().map(|v| match v.encoder {
VideoEncoderOptions::FfmpegH264 { .. } => pipeline::VideoCodec::H264,
});
let audio_codec = audio.as_ref().map(|a| match a.encoder {
RtpAudioEncoderOptions::Opus { .. } => pipeline::AudioCodec::Opus,
let audio_options = audio.as_ref().map(|a| match &a.encoder {
WhipAudioEncoderOptions::Opus {
channels,
preset: _,
} => WhipAudioOptions {
codec: pipeline::AudioCodec::Opus,
channels: match channels {
audio::AudioChannels::Mono => {
compositor_pipeline::audio_mixer::AudioChannels::Mono
}
audio::AudioChannels::Stereo => {
compositor_pipeline::audio_mixer::AudioChannels::Stereo
}
},
},
});

if let Some(token) = &bearer_token {
Expand All @@ -205,7 +219,7 @@ impl TryFrom<WhipOutput> for pipeline::RegisterOutputOptions<output::OutputOptio

let (video_encoder_options, output_video_options) = maybe_video_options(video)?;
let (audio_encoder_options, output_audio_options) = match audio {
Some(OutputRtpAudioOptions {
Some(OutputWhipAudioOptions {
mixing_strategy,
send_eos_when,
encoder,
Expand All @@ -229,7 +243,7 @@ impl TryFrom<WhipOutput> for pipeline::RegisterOutputOptions<output::OutputOptio
endpoint_url,
bearer_token,
video: video_codec,
audio: audio_codec,
audio: audio_options,
}),
video: video_encoder_options,
audio: audio_encoder_options,
Expand Down Expand Up @@ -300,6 +314,19 @@ impl From<RtpAudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
}
}

impl From<WhipAudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
fn from(value: WhipAudioEncoderOptions) -> Self {
match value {
WhipAudioEncoderOptions::Opus { channels, preset } => {
AudioEncoderOptions::Opus(encoder::opus::OpusEncoderOptions {
channels: channels.into(),
preset: preset.unwrap_or(OpusEncoderPreset::Voip).into(),
})
}
}
}
}

impl TryFrom<OutputEndCondition> for pipeline::PipelineOutputEndCondition {
type Error = TypeError;

Expand Down
27 changes: 26 additions & 1 deletion compositor_api/src/types/register_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct WhipOutput {
/// Video track configuration.
pub video: Option<OutputVideoOptions>,
/// Audio track configuration.
pub audio: Option<OutputRtpAudioOptions>,
pub audio: Option<OutputWhipAudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
Expand Down Expand Up @@ -90,6 +90,19 @@ pub struct OutputMp4AudioOptions {
pub initial: Audio,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputWhipAudioOptions {
/// (**default="sum_clip"**) Specifies how audio should be mixed.
pub mixing_strategy: Option<MixingStrategy>,
/// Condition for termination of output stream based on the input streams states.
pub send_eos_when: Option<OutputEndCondition>,
/// Audio encoder options.
pub encoder: WhipAudioEncoderOptions,
/// Initial audio mixer configuration for output.
pub initial: Audio,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)]
pub enum VideoEncoderOptions {
Expand Down Expand Up @@ -121,6 +134,18 @@ pub enum Mp4AudioEncoderOptions {
Aac { channels: AudioChannels },
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)]
pub enum WhipAudioEncoderOptions {
Opus {
/// Specifies channels configuration.
channels: AudioChannels,

/// (**default="voip"**) Specifies preset for audio output encoder.
preset: Option<OpusEncoderPreset>,
},
}

/// This type defines when end of an input stream should trigger end of the output stream. Only one of those fields can be set at the time.
/// Unless specified otherwise the input stream is considered finished/ended when:
/// - TCP connection was dropped/closed.
Expand Down
70 changes: 50 additions & 20 deletions compositor_pipeline/src/pipeline/output/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use url::{ParseError, Url};
use webrtc::track::track_local::TrackLocalWriter;

use crate::{
audio_mixer::AudioChannels,
error::OutputInitError,
event::{Event, EventEmitter},
pipeline::{AudioCodec, EncoderOutputEvent, PipelineCtx, VideoCodec},
Expand All @@ -37,13 +38,20 @@ pub struct WhipSenderOptions {
pub endpoint_url: String,
pub bearer_token: Option<Arc<str>>,
pub video: Option<VideoCodec>,
pub audio: Option<AudioCodec>,
pub audio: Option<WhipAudioOptions>,
}

#[derive(Debug, Clone, Copy)]
pub struct WhipAudioOptions {
pub codec: AudioCodec,
pub channels: AudioChannels,
}

#[derive(Debug, Clone)]
pub struct WhipCtx {
options: WhipSenderOptions,
output_id: OutputId,
options: WhipSenderOptions,
sample_rate: u32,
request_keyframe_sender: Option<Sender<()>>,
should_close: Arc<AtomicBool>,
tokio_rt: Arc<Runtime>,
Expand Down Expand Up @@ -106,6 +114,9 @@ pub enum WhipError {

#[error("Entity Tag non-matching")]
EntityTagNonMatching,

#[error("Codec not supported: {0}")]
UnsupportedCodec(&'static str),
}

const WHIP_INIT_TIMEOUT: Duration = Duration::from_secs(60);
Expand All @@ -125,8 +136,9 @@ impl WhipSender {
oneshot::channel::<Result<(), WhipError>>();

let whip_ctx = WhipCtx {
options: options.clone(),
output_id: output_id.clone(),
options: options.clone(),
sample_rate: pipeline_ctx.output_sample_rate,
request_keyframe_sender,
should_close: should_close.clone(),
tokio_rt: pipeline_ctx.tokio_rt.clone(),
Expand Down Expand Up @@ -188,7 +200,7 @@ async fn run_whip_sender_task(
event_emitter: Arc<EventEmitter>,
) {
let client = Arc::new(reqwest::Client::new());
let (peer_connection, video_track, audio_track) = match init_peer_connection().await {
let (peer_connection, video_track, audio_track) = match init_peer_connection(&whip_ctx).await {
Ok(pc) => pc,
Err(err) => {
init_confirmation_sender.send(Err(err)).unwrap();
Expand Down Expand Up @@ -219,26 +231,44 @@ async fn run_whip_sender_task(
};

match chunk {
Payload::Video(video_payload) => match video_payload {
Ok(video_bytes) => {
if video_track.write(&video_bytes).await.is_err() {
error!("Error occurred while writing to video track for session");
Payload::Video(video_payload) => {
match video_track.clone() {
Some(video_track) => match video_payload {
Ok(video_bytes) => {
if video_track.write(&video_bytes).await.is_err() {
error!("Error occurred while writing to video track, closing connection.");
break;
}
}
Err(err) => {
error!("Error while reading video bytes: {err}");
}
},
None => {
error!("Video payload detected on output with no video, shutting down");
break;
}
}
Err(err) => {
error!("Error while reading video bytes: {err}");
}
},
Payload::Audio(audio_payload) => match audio_payload {
Ok(audio_bytes) => {
if audio_track.write(&audio_bytes).await.is_err() {
error!("Error occurred while writing to video track for session");
}
Payload::Audio(audio_payload) => {
match audio_track.clone() {
Some(audio_track) => match audio_payload {
Ok(audio_bytes) => {
if audio_track.write(&audio_bytes).await.is_err() {
error!("Error occurred while writing to audio track, closing connection.");
break;
}
}
Err(err) => {
error!("Error while audio video bytes: {err}");
}
},
None => {
error!("Audio payload detected on output with no audio, shutting down");
break;
}
}
Err(err) => {
error!("Error while reading audio bytes: {err}");
}
},
}
}
}
if let Err(err) = client.delete(whip_session_url).send().await {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub async fn connect(
.create_offer(None)
.await
.map_err(WhipError::OfferCreationError)?;
error!("offer: {offer:?}");

let endpoint_url = Url::parse(&whip_ctx.options.endpoint_url)
.map_err(|e| WhipError::InvalidEndpointUrl(e, whip_ctx.options.endpoint_url.clone()))?;
Expand Down
Loading

0 comments on commit ac42ba4

Please sign in to comment.