Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
error handling
Browse files Browse the repository at this point in the history
wkazmierczak committed Oct 24, 2024
1 parent 36398e5 commit 4cc8444
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions compositor_pipeline/src/pipeline/output/whip.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use compositor_render::OutputId;
use crossbeam_channel::{Receiver, Sender};
use payloader::Payload;
use reqwest::{header::HeaderMap, Url};
use reqwest::{header::HeaderMap, Method, StatusCode, Url};
use std::sync::{atomic::AtomicBool, Arc};
use tracing::{debug, error, info, span, Level};
use webrtc::{
@@ -51,6 +51,12 @@ pub struct WhipSenderOptions {

#[derive(Debug, thiserror::Error)]
pub enum WhipError {
#[error("Bad status in WHIP response\nStatus: {0}\nBody: {1}")]
BadStatus(StatusCode, String),

#[error("WHIP request failed!\nMethod: {0}\nURL: {1}")]
RequestFailed(Method, String),

#[error("Missing location header in WHIP response")]
MissingLocationHeader,
}
@@ -134,7 +140,10 @@ fn start_whip_sender_thread(
.await
{
Ok(val) => val,
Err(_) => return,
Err(err) => {
error!("{err}");
return;
}
};

for chunk in packet_stream {
@@ -303,13 +312,22 @@ async fn connect(
header_map.append("Authorization", format!("Bearer {token}").parse().unwrap());
}

let response = client
let response = match client
.post(endpoint_url.clone())
.headers(header_map)
.body(offer.sdp.clone())
.send()
.await
.unwrap();
{
Ok(res) => res,
Err(_) => return Err(WhipError::RequestFailed(Method::POST, endpoint_url)),
};

if response.status() >= StatusCode::BAD_REQUEST {
let status = response.status();
let body_str = response.text().await.unwrap();
return Err(WhipError::BadStatus(status, body_str));
}

info!("[WHIP] response: {:?}", &response);

@@ -353,7 +371,7 @@ async fn connect(
peer_connection.on_ice_candidate(Box::new(move |candidate| {
if let Some(candidate) = candidate {
let client_clone = client.clone();
let location2 = location1.clone();
let location2 = location1.clone().to_string();
let bearer_token1 = bearer_token.clone();
tokio_rt.spawn(async move {
let ice_candidate = candidate.to_json().unwrap();
@@ -368,13 +386,23 @@ async fn connect(
header_map.append("Authorization", format!("Bearer {token}").parse().unwrap());
}

let _ = client_clone
.patch(location2)
let response = match client_clone
.patch(location2.clone())
.headers(header_map)
.body(serde_json::to_string(&ice_candidate).unwrap())
.send()
.await
.unwrap();
{
Ok(res) => res,
Err(_) => return Err(WhipError::RequestFailed(Method::PATCH, location2)),
};

if response.status() >= StatusCode::BAD_REQUEST {
let status = response.status();
let body_str = response.text().await.unwrap();
return Err(WhipError::BadStatus(status, body_str));
};
Ok(response)
});
}
Box::pin(async {})

0 comments on commit 4cc8444

Please sign in to comment.