Skip to content

Commit

Permalink
unable streaming simultaneously to one input, add validation to sessi…
Browse files Browse the repository at this point in the history
…on endpoints
  • Loading branch information
wkazmierczak committed Nov 29, 2024
1 parent 2dffd7f commit 4c00ec7
Showing 1 changed file with 42 additions and 1 deletion.
43 changes: 42 additions & 1 deletion compositor_pipeline/src/pipeline/whip_whep/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ pub async fn handle_whip(

if let Ok(connections) = state_clone.input_connections.lock() {
if let Some(connection) = connections.get(&input_id) {
if connection.peer_connection.is_some() {
error!("Something else is streaming for given input {input_id:?}");
return Err(WhipServerError::InternalError(format!(
"Something else is streaming for given input {input_id:?}"
)));
}
video_sender = connection.video_sender.clone();
audio_sender = connection.audio_sender.clone();
depayloader = connection.depayloader.clone();
Expand Down Expand Up @@ -176,10 +182,27 @@ pub async fn handle_whip(
pub async fn whip_ice_candidates_handler(
Path(id): Path<String>,
State(state): State<Arc<WhipWhepState>>,
headers: HeaderMap,
candidate: String,
) -> Result<(StatusCode, impl IntoResponse), WhipServerError> {
let bearer_token: Option<String>;
let input_id = InputId(Arc::from(id));

if let Ok(connections) = state.input_connections.lock() {
if let Some(connection) = connections.get(&input_id) {
bearer_token = connection.bearer_token.clone();
} else {
return Err(WhipServerError::NotFound(format!(
"InputID {input_id:?} not found"
)));
}
} else {
return Err(WhipServerError::InternalError(
"Input connections lock error".to_string(),
));
}
validate_token(bearer_token, headers.get("Authorization")).await?;

let candidate: Value = serde_json::from_str(&candidate)?;

let candidate_str = candidate["candidate"].as_str().unwrap_or("");
Expand Down Expand Up @@ -231,13 +254,31 @@ pub async fn handle_options() -> Result<Response<Body>, WhipServerError> {
pub async fn terminate_whip_session(
Path(id): Path<String>,
State(state): State<Arc<WhipWhepState>>,
headers: HeaderMap,
) -> Result<(StatusCode, impl IntoResponse), WhipServerError> {
let bearer_token: Option<String>;
let input_id = InputId(Arc::from(id));
let peer_connection;

if let Ok(connections) = state.input_connections.lock() {
if let Some(connection) = connections.get(&input_id) {
bearer_token = connection.bearer_token.clone();
} else {
return Err(WhipServerError::NotFound(format!(
"InputID {input_id:?} not found"
)));
}
} else {
return Err(WhipServerError::InternalError(
"Input connections lock error".to_string(),
));
}
validate_token(bearer_token, headers.get("Authorization")).await?;
let peer_connection;

if let Ok(mut connections) = state.input_connections.lock() {
if let Some(connection) = connections.get_mut(&input_id) {
peer_connection = connection.peer_connection.clone();
connection.peer_connection = None;
drop(connection.audio_sender.clone());
drop(connection.video_sender.clone());
} else {
Expand Down

0 comments on commit 4c00ec7

Please sign in to comment.