Skip to content

Commit

Permalink
refactor audio/video payload
Browse files Browse the repository at this point in the history
  • Loading branch information
wkazmierczak committed Oct 22, 2024
1 parent 67c1029 commit c893415
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 76 deletions.
7 changes: 4 additions & 3 deletions compositor_api/src/types/from_register_output.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use super::register_output::*;
use super::util::*;
use super::*;
use compositor_pipeline::pipeline::{
self,
encoder::{
Expand All @@ -15,6 +12,10 @@ use compositor_pipeline::pipeline::{
},
};

use super::register_output::*;
use super::util::*;
use super::*;

impl TryFrom<RtpOutput> for pipeline::RegisterOutputOptions<output::OutputOptions> {
type Error = TypeError;

Expand Down
49 changes: 30 additions & 19 deletions compositor_pipeline/src/pipeline/output/whip.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use compositor_render::OutputId;
use crossbeam_channel::Receiver;
use payloader::DataKind;
use payloader::Payload;
use reqwest::{header::HeaderMap, Url};
use std::sync::{atomic::AtomicBool, Arc};
use tracing::{debug, error, info, span, Level};
Expand Down Expand Up @@ -48,6 +48,12 @@ pub struct WhipSenderOptions {
pub audio: Option<AudioCodec>,
}

#[derive(Debug, thiserror::Error)]
pub enum WhipError {
#[error("Missing location header in WHIP response")]
MissingLocationHeader,
}

impl WhipSender {
pub fn new(
output_id: &OutputId,
Expand Down Expand Up @@ -111,17 +117,19 @@ fn start_whip_sender_thread(
tokio_rt.block_on(async {
let client = reqwest::Client::new();
let (peer_connection, video_track, audio_track) = init_pc().await;
let should_close2 = should_close.clone();
let whip_session_url = connect(
let whip_session_url = match connect(
peer_connection,
endpoint_url,
bearer_token,
should_close2,
should_close.clone(),
tokio_rt.clone(),
client.clone(),
)
.await
.unwrap();
{
Ok(val) => val,
Err(_) => return,
};

for chunk in packet_stream {
if should_close.load(std::sync::atomic::Ordering::Relaxed) {
Expand All @@ -135,15 +143,15 @@ fn start_whip_sender_thread(
}
};

match chunk.kind {
DataKind::Audio => {
if audio_track.write(&chunk.data).await.is_err() {
error!("Error occurred while writing to audio track for session");
match chunk {
Payload::Video(bytes) => {
if video_track.write(&bytes).await.is_err() {
error!("Error occurred while writing to video track for session");
}
}
DataKind::Video => {
if video_track.write(&chunk.data).await.is_err() {
error!("Error occurred while writing to video track for session");
Payload::Audio(bytes) => {
if audio_track.write(&bytes).await.is_err() {
error!("Error occurred while writing to audio track for session");
}
}
}
Expand Down Expand Up @@ -238,7 +246,7 @@ async fn connect(
should_close: Arc<AtomicBool>,
tokio_rt: Arc<tokio::runtime::Runtime>,
client: reqwest::Client,
) -> anyhow::Result<Url> {
) -> Result<Url, WhipError> {
peer_connection.on_ice_connection_state_change(Box::new(
move |connection_state: RTCIceConnectionState| {
debug!("Connection State has changed {connection_state}");
Expand Down Expand Up @@ -277,18 +285,21 @@ async fn connect(

let parsed_endpoint_url = Url::parse(&endpoint_url).unwrap();

let location_url_path = response.headers()
.get("location")
.and_then(|url| url.to_str().ok())
.ok_or_else(|| {
error!("Unable to get location endpoint, check correctness of WHIP endpoint and your Bearer token");
WhipError::MissingLocationHeader
})?;

let location_url = Url::try_from(
format!(
"{}://{}:{}{}",
parsed_endpoint_url.scheme(),
parsed_endpoint_url.host_str().unwrap(),
parsed_endpoint_url.port().unwrap(),
response
.headers()
.get("location")
.unwrap()
.to_str()
.unwrap()
location_url_path
)
.as_str(),
)
Expand Down
38 changes: 5 additions & 33 deletions compositor_pipeline/src/pipeline/output/whip/packet_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crossbeam_channel::Receiver;

use crate::pipeline::types::EncoderOutputEvent;

use super::payloader::{DataKind, Payload, Payloader, PayloadingError};
use super::payloader::{Payload, Payloader, PayloadingError};

pub(super) struct PacketStream {
packets_receiver: Receiver<EncoderOutputEvent>,
Expand Down Expand Up @@ -33,51 +33,23 @@ impl PacketStream {
match self.payloader.audio_eos() {
Err(PayloadingError::NoAudioPayloader) => (),
Err(PayloadingError::AudioEOSAlreadySent) => (),
packet => {
return Some(match packet {
Ok(packet) => Ok(Payload {
data: packet,
kind: DataKind::Audio,
}),
Err(err) => Err(err),
})
}
packet => return Some(Ok(Payload::Audio(packet.unwrap()))),
}
match self.payloader.video_eos() {
Err(PayloadingError::NoVideoPayloader) => (),
Err(PayloadingError::VideoEOSAlreadySent) => (),
packet => {
return Some(match packet {
Ok(packet) => Ok(Payload {
data: packet,
kind: DataKind::Video,
}),
Err(err) => Err(err),
})
}
packet => return Some(Ok(Payload::Video(packet.unwrap()))),
}
return None;
};

let encoded_chunk = match packet {
EncoderOutputEvent::Data(packet) => packet,
EncoderOutputEvent::AudioEOS => {
return Some(match self.payloader.audio_eos() {
Ok(packet) => Ok(Payload {
data: packet,
kind: DataKind::Video,
}),
Err(err) => Err(err),
})
return Some(Ok(Payload::Audio(self.payloader.audio_eos().unwrap())));
}
EncoderOutputEvent::VideoEOS => {
return Some(match self.payloader.video_eos() {
Ok(packet) => Ok(Payload {
data: packet,
kind: DataKind::Video,
}),
Err(err) => Err(err),
})
return Some(Ok(Payload::Video(self.payloader.video_eos().unwrap())));
}
};

Expand Down
36 changes: 15 additions & 21 deletions compositor_pipeline/src/pipeline/output/whip/payloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ pub enum PayloadingError {

#[error("Video EOS already sent.")]
VideoEOSAlreadySent,

#[error("Unsupported payload type.")]
UnsupportedPayloadType,
}

pub struct Payloader {
Expand All @@ -95,14 +98,9 @@ enum AudioPayloader {
},
}

pub enum DataKind {
Audio,
Video,
}

pub struct Payload {
pub data: Bytes,
pub kind: DataKind,
pub enum Payload {
Video(Bytes),
Audio(Bytes),
}

impl Payloader {
Expand Down Expand Up @@ -304,19 +302,15 @@ fn payload<T: rtp::packetizer::Payloader>(
};
context.next_sequence_number = context.next_sequence_number.wrapping_add(1);

Ok(Payload {
data: rtp::packet::Packet { header, payload }.marshal()?,
kind: chunk.kind.into(),
})
match payload_type {
VIDEO_PAYLOAD_TYPE => Ok(Payload::Video(
rtp::packet::Packet { header, payload }.marshal()?,
)),
AUDIO_PAYLOAD_TYPE => Ok(Payload::Audio(
rtp::packet::Packet { header, payload }.marshal()?,
)),
_ => Err(PayloadingError::UnsupportedPayloadType),
}
})
.collect()
}

impl From<EncodedChunkKind> for DataKind {
fn from(kind: EncodedChunkKind) -> Self {
match kind {
EncodedChunkKind::Video(_) => DataKind::Video,
EncodedChunkKind::Audio(_) => DataKind::Audio,
}
}
}

0 comments on commit c893415

Please sign in to comment.