From 8de27b42714ad68f3ca148e2ec37976c0b7a6e1c Mon Sep 17 00:00:00 2001 From: bartosz rzepa Date: Wed, 20 Nov 2024 13:19:59 +0100 Subject: [PATCH] fixes --- compositor_pipeline/src/error.rs | 6 ++--- compositor_pipeline/src/pipeline/output.rs | 5 ++-- .../src/pipeline/output/whip.rs | 7 ++++-- .../output/whip/establish_peer_connection.rs | 25 ++++++++++++++++--- .../output/whip/init_peer_connection.rs | 22 ++++++++++------ 5 files changed, 46 insertions(+), 19 deletions(-) diff --git a/compositor_pipeline/src/error.rs b/compositor_pipeline/src/error.rs index 3a5c8d992..3f8d95c7a 100644 --- a/compositor_pipeline/src/error.rs +++ b/compositor_pipeline/src/error.rs @@ -94,14 +94,14 @@ pub enum OutputInitError { #[error("Failed to register output. FFmpeg error: {0}.")] FfmpegMp4Error(ffmpeg_next::Error), - #[error("Unkown Whip output error, channel unexpectedly closed")] + #[error("Unkown Whip output error.")] UnknownWhipError, #[error("Whip init timeout exceeded")] WhipInitTimeout, - #[error("Failed to init whip output: {0}")] - WhipInitError(Box), + #[error("Failed to init whip output")] + WhipInitError(#[source] Box), } #[derive(Debug, thiserror::Error)] diff --git a/compositor_pipeline/src/pipeline/output.rs b/compositor_pipeline/src/pipeline/output.rs index 2e97d3514..423870c85 100644 --- a/compositor_pipeline/src/pipeline/output.rs +++ b/compositor_pipeline/src/pipeline/output.rs @@ -230,14 +230,15 @@ impl Output { Output::RawData { .. } => return Err(RequestKeyframeError::RawOutput(output_id)), }; - if let Err(err) = encoder + if encoder .video .as_ref() .ok_or(RequestKeyframeError::NoVideoOutput(output_id))? .keyframe_request_sender() .send(()) + .is_err() { - debug!(%err, "Failed to send keyframe request to the encoder."); + debug!("Failed to send keyframe request to the encoder. Channel closed."); }; Ok(()) diff --git a/compositor_pipeline/src/pipeline/output/whip.rs b/compositor_pipeline/src/pipeline/output/whip.rs index e43988a29..98d3e07f1 100644 --- a/compositor_pipeline/src/pipeline/output/whip.rs +++ b/compositor_pipeline/src/pipeline/output/whip.rs @@ -40,6 +40,7 @@ pub struct WhipSenderOptions { pub audio: Option, } +#[derive(Debug, Clone)] pub struct WhipCtx { options: WhipSenderOptions, output_id: OutputId, @@ -196,7 +197,7 @@ async fn run_whip_sender_task( }; let output_id_clone = whip_ctx.output_id.clone(); let should_close_clone = whip_ctx.should_close.clone(); - let whip_session_url = match connect(peer_connection, client.clone(), whip_ctx).await { + let whip_session_url = match connect(peer_connection, client.clone(), &whip_ctx).await { Ok(val) => val, Err(err) => { init_confirmation_sender.send(Err(err)).unwrap(); @@ -240,7 +241,9 @@ async fn run_whip_sender_task( }, } } - let _ = client.delete(whip_session_url).send().await; + if let Err(err) = client.delete(whip_session_url).send().await { + error!("Error while sending delete whip session request: {}", err); + } event_emitter.emit(Event::OutputDone(output_id_clone)); debug!("Closing WHIP sender thread.") } diff --git a/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs b/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs index 3a1ff0414..f5597892b 100644 --- a/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs +++ b/compositor_pipeline/src/pipeline/output/whip/establish_peer_connection.rs @@ -1,4 +1,5 @@ use super::{WhipCtx, WhipError}; +use compositor_render::error::ErrorStack; use reqwest::{ header::{HeaderMap, HeaderValue}, Client, Method, StatusCode, Url, @@ -17,8 +18,9 @@ use webrtc::{ pub async fn connect( peer_connection: Arc, client: Arc, - whip_ctx: WhipCtx, + whip_ctx: &WhipCtx, ) -> Result { + let whip_ctx = whip_ctx.clone(); peer_connection.on_ice_connection_state_change(Box::new( move |connection_state: RTCIceConnectionState| { debug!("Connection State has changed {connection_state}."); @@ -74,7 +76,14 @@ pub async fn connect( header_map.append("Content-Type", HeaderValue::from_static("application/sdp")); if let Some(token) = &whip_ctx.options.bearer_token { - header_map.append("Authorization", format!("Bearer {token}").parse().unwrap()); + let header_value_str: HeaderValue = match format!("Bearer {token}").parse() { + Ok(val) => val, + Err(err) => { + error!("Ivalid header token, couldn't parse: {}", err); + HeaderValue::from_static("Bearer") + } + }; + header_map.append("Authorization", header_value_str); } let response = client @@ -99,6 +108,7 @@ pub async fn connect( .get("location") .and_then(|url| url.to_str().ok()) .ok_or_else(|| WhipError::MissingLocationHeader)?; + error!("location header: {location_url_path}"); let scheme = endpoint_url.scheme(); let host = endpoint_url @@ -154,7 +164,7 @@ pub async fn connect( info!("Entity tags not supported by WHIP output"); should_stop_clone.store(true, Ordering::Relaxed); } - _ => error!("{err}"), + _ => error!("{}", ErrorStack::new(&err).into_string()), } } } @@ -181,7 +191,14 @@ async fn handle_candidate( ); if let Some(token) = bearer_token { - header_map.append("Authorization", format!("Bearer {token}").parse().unwrap()); + let header_value_str: HeaderValue = match format!("Bearer {token}").parse() { + Ok(val) => val, + Err(err) => { + error!("Ivalid header token, couldn't parse: {}", err); + HeaderValue::from_static("Bearer") + } + }; + header_map.append("Authorization", header_value_str); } let response = client diff --git a/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs b/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs index 2cd9cc2a3..e4e979550 100644 --- a/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs +++ b/compositor_pipeline/src/pipeline/output/whip/init_peer_connection.rs @@ -30,9 +30,9 @@ pub async fn init_peer_connection() -> Result< ), WhipError, > { - let mut m = MediaEngine::default(); - m.register_default_codecs()?; - m.register_codec( + let mut media_engine = MediaEngine::default(); + media_engine.register_default_codecs()?; + media_engine.register_codec( RTCRtpCodecParameters { capability: RTCRtpCodecCapability { mime_type: MIME_TYPE_H264.to_owned(), @@ -46,7 +46,7 @@ pub async fn init_peer_connection() -> Result< }, RTPCodecType::Video, )?; - m.register_codec( + media_engine.register_codec( RTCRtpCodecParameters { capability: RTCRtpCodecCapability { mime_type: MIME_TYPE_OPUS.to_owned(), @@ -61,9 +61,9 @@ pub async fn init_peer_connection() -> Result< RTPCodecType::Audio, )?; let mut registry = Registry::new(); - registry = register_default_interceptors(registry, &mut m)?; + registry = register_default_interceptors(registry, &mut media_engine)?; let api = APIBuilder::new() - .with_media_engine(m) + .with_media_engine(media_engine) .with_interceptor_registry(registry) .build(); @@ -109,8 +109,14 @@ pub async fn init_peer_connection() -> Result< "audio".to_owned(), "webrtc-rs".to_owned(), )); - let _ = peer_connection.add_track(video_track.clone()).await; - let _ = peer_connection.add_track(audio_track.clone()).await; + peer_connection + .add_track(video_track.clone()) + .await + .map_err(WhipError::PeerConnectionInitError)?; + peer_connection + .add_track(audio_track.clone()) + .await + .map_err(WhipError::PeerConnectionInitError)?; let transceivers = peer_connection.get_transceivers().await; for transceiver in transceivers { transceiver