Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
brzep committed Nov 20, 2024
1 parent f39dcd5 commit 8de27b4
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 19 deletions.
6 changes: 3 additions & 3 deletions compositor_pipeline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ pub enum OutputInitError {
#[error("Failed to register output. FFmpeg error: {0}.")]
FfmpegMp4Error(ffmpeg_next::Error),

#[error("Unkown Whip output error, channel unexpectedly closed")]
#[error("Unkown Whip output error.")]
UnknownWhipError,

#[error("Whip init timeout exceeded")]
WhipInitTimeout,

#[error("Failed to init whip output: {0}")]
WhipInitError(Box<whip::WhipError>),
#[error("Failed to init whip output")]
WhipInitError(#[source] Box<whip::WhipError>),
}

#[derive(Debug, thiserror::Error)]
Expand Down
5 changes: 3 additions & 2 deletions compositor_pipeline/src/pipeline/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,15 @@ impl Output {
Output::RawData { .. } => return Err(RequestKeyframeError::RawOutput(output_id)),
};

if let Err(err) = encoder
if encoder
.video
.as_ref()
.ok_or(RequestKeyframeError::NoVideoOutput(output_id))?
.keyframe_request_sender()
.send(())
.is_err()
{
debug!(%err, "Failed to send keyframe request to the encoder.");
debug!("Failed to send keyframe request to the encoder. Channel closed.");
};

Ok(())
Expand Down
7 changes: 5 additions & 2 deletions compositor_pipeline/src/pipeline/output/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct WhipSenderOptions {
pub audio: Option<AudioCodec>,
}

#[derive(Debug, Clone)]
pub struct WhipCtx {
options: WhipSenderOptions,
output_id: OutputId,
Expand Down Expand Up @@ -196,7 +197,7 @@ async fn run_whip_sender_task(
};
let output_id_clone = whip_ctx.output_id.clone();
let should_close_clone = whip_ctx.should_close.clone();
let whip_session_url = match connect(peer_connection, client.clone(), whip_ctx).await {
let whip_session_url = match connect(peer_connection, client.clone(), &whip_ctx).await {
Ok(val) => val,
Err(err) => {
init_confirmation_sender.send(Err(err)).unwrap();
Expand Down Expand Up @@ -240,7 +241,9 @@ async fn run_whip_sender_task(
},
}
}
let _ = client.delete(whip_session_url).send().await;
if let Err(err) = client.delete(whip_session_url).send().await {
error!("Error while sending delete whip session request: {}", err);
}
event_emitter.emit(Event::OutputDone(output_id_clone));
debug!("Closing WHIP sender thread.")
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{WhipCtx, WhipError};
use compositor_render::error::ErrorStack;
use reqwest::{
header::{HeaderMap, HeaderValue},
Client, Method, StatusCode, Url,
Expand All @@ -17,8 +18,9 @@ use webrtc::{
pub async fn connect(
peer_connection: Arc<RTCPeerConnection>,
client: Arc<reqwest::Client>,
whip_ctx: WhipCtx,
whip_ctx: &WhipCtx,
) -> Result<Url, WhipError> {
let whip_ctx = whip_ctx.clone();
peer_connection.on_ice_connection_state_change(Box::new(
move |connection_state: RTCIceConnectionState| {
debug!("Connection State has changed {connection_state}.");
Expand Down Expand Up @@ -74,7 +76,14 @@ pub async fn connect(
header_map.append("Content-Type", HeaderValue::from_static("application/sdp"));

if let Some(token) = &whip_ctx.options.bearer_token {
header_map.append("Authorization", format!("Bearer {token}").parse().unwrap());
let header_value_str: HeaderValue = match format!("Bearer {token}").parse() {
Ok(val) => val,
Err(err) => {
error!("Ivalid header token, couldn't parse: {}", err);
HeaderValue::from_static("Bearer")
}
};
header_map.append("Authorization", header_value_str);
}

let response = client
Expand All @@ -99,6 +108,7 @@ pub async fn connect(
.get("location")
.and_then(|url| url.to_str().ok())
.ok_or_else(|| WhipError::MissingLocationHeader)?;
error!("location header: {location_url_path}");

let scheme = endpoint_url.scheme();
let host = endpoint_url
Expand Down Expand Up @@ -154,7 +164,7 @@ pub async fn connect(
info!("Entity tags not supported by WHIP output");
should_stop_clone.store(true, Ordering::Relaxed);
}
_ => error!("{err}"),
_ => error!("{}", ErrorStack::new(&err).into_string()),
}
}
}
Expand All @@ -181,7 +191,14 @@ async fn handle_candidate(
);

if let Some(token) = bearer_token {
header_map.append("Authorization", format!("Bearer {token}").parse().unwrap());
let header_value_str: HeaderValue = match format!("Bearer {token}").parse() {
Ok(val) => val,
Err(err) => {
error!("Ivalid header token, couldn't parse: {}", err);
HeaderValue::from_static("Bearer")
}
};
header_map.append("Authorization", header_value_str);
}

let response = client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ pub async fn init_peer_connection() -> Result<
),
WhipError,
> {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
m.register_codec(
let mut media_engine = MediaEngine::default();
media_engine.register_default_codecs()?;
media_engine.register_codec(
RTCRtpCodecParameters {
capability: RTCRtpCodecCapability {
mime_type: MIME_TYPE_H264.to_owned(),
Expand All @@ -46,7 +46,7 @@ pub async fn init_peer_connection() -> Result<
},
RTPCodecType::Video,
)?;
m.register_codec(
media_engine.register_codec(
RTCRtpCodecParameters {
capability: RTCRtpCodecCapability {
mime_type: MIME_TYPE_OPUS.to_owned(),
Expand All @@ -61,9 +61,9 @@ pub async fn init_peer_connection() -> Result<
RTPCodecType::Audio,
)?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
registry = register_default_interceptors(registry, &mut media_engine)?;
let api = APIBuilder::new()
.with_media_engine(m)
.with_media_engine(media_engine)
.with_interceptor_registry(registry)
.build();

Expand Down Expand Up @@ -109,8 +109,14 @@ pub async fn init_peer_connection() -> Result<
"audio".to_owned(),
"webrtc-rs".to_owned(),
));
let _ = peer_connection.add_track(video_track.clone()).await;
let _ = peer_connection.add_track(audio_track.clone()).await;
peer_connection
.add_track(video_track.clone())
.await
.map_err(WhipError::PeerConnectionInitError)?;
peer_connection
.add_track(audio_track.clone())
.await
.map_err(WhipError::PeerConnectionInitError)?;
let transceivers = peer_connection.get_transceivers().await;
for transceiver in transceivers {
transceiver
Expand Down

0 comments on commit 8de27b4

Please sign in to comment.