Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WHIP output #834

Merged
merged 33 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3c63ac4
first attempts to add whip output
wkazmierczak Oct 14, 2024
2459806
Merge branch 'master' into @wkazmierczak/WHIP-client
wkazmierczak Oct 15, 2024
615d812
messy, but sending video working
wkazmierczak Oct 15, 2024
33d3983
looks like audio is sent also, but sometimes some problems appear
wkazmierczak Oct 15, 2024
4c2b871
whip client example, refactor
wkazmierczak Oct 16, 2024
7140e0a
moved tokio runtime to pipeline_ctx
brzep Oct 17, 2024
8c6ca13
added very basic delete request
brzep Oct 17, 2024
52f587f
add Bearer Token
wkazmierczak Oct 21, 2024
b96f664
add missing port to location url
wkazmierczak Oct 21, 2024
49c1e72
refactor
wkazmierczak Oct 21, 2024
67c1029
changed bearer token to be optional
brzep Oct 22, 2024
c893415
refactor audio/video payload
wkazmierczak Oct 22, 2024
36398e5
added PLI handling
brzep Oct 23, 2024
0789069
error handling
wkazmierczak Oct 24, 2024
1fd3d93
review fixes
brzep Oct 29, 2024
e286d6d
enhanced error handling
brzep Oct 31, 2024
4bb1d74
changed trickle ice logging
brzep Nov 7, 2024
f84f8cc
fixes
brzep Nov 12, 2024
004cdd4
get stun servers from env
brzep Nov 14, 2024
f39dcd5
support trickle ice error handling
brzep Nov 15, 2024
8de27b4
fixes
brzep Nov 20, 2024
bc5675c
handle variable url
brzep Nov 20, 2024
ac42ba4
get settings from api, support both or just one track
brzep Nov 22, 2024
e2e1d61
Merge branch 'master' into @wkazmierczak/WHIP-client
brzep Nov 26, 2024
7742fb8
Merge branch 'master' into @wkazmierczak/WHIP-client
brzep Nov 26, 2024
89e88ad
extend disc space druging github build and test job
brzep Nov 27, 2024
87ca86d
Merge remote-tracking branch 'origin/master' into @wkazmierczak/WHIP-…
brzep Nov 28, 2024
4bf6d09
initializing runtime once
brzep Nov 29, 2024
4ef38b0
temporary fix runtime lifetime errors
brzep Nov 29, 2024
ea0fd09
try to extend ci disk space
brzep Nov 29, 2024
d1c6d77
remove new ci jobs, cleanup
brzep Dec 2, 2024
c63b999
fixes
brzep Dec 3, 2024
cbeae91
fixes
brzep Dec 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,162 changes: 1,136 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions compositor_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ compositor_render = { workspace = true }
serde = { workspace = true }
schemars = { workspace = true }
bytes = { workspace = true }
tracing = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
compositor_pipeline = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions compositor_api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use component::WebView;
pub use register_input::Mp4Input;
pub use register_output::Mp4Output;
pub use register_output::RtpOutput;
pub use register_output::WhipOutput;

pub use register_input::DeckLink;
pub use register_input::RtpInput;
Expand Down
97 changes: 97 additions & 0 deletions compositor_api/src/types/from_register_output.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use axum::http::HeaderValue;
use compositor_pipeline::pipeline::{
self,
encoder::{
Expand All @@ -9,6 +10,7 @@ use compositor_pipeline::pipeline::{
output::{
self,
mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack},
whip::WhipAudioOptions,
},
};

Expand Down Expand Up @@ -173,6 +175,88 @@ impl TryFrom<Mp4Output> for pipeline::RegisterOutputOptions<output::OutputOption
}
}

impl TryFrom<WhipOutput> for pipeline::RegisterOutputOptions<output::OutputOptions> {
type Error = TypeError;

fn try_from(request: WhipOutput) -> Result<Self, Self::Error> {
let WhipOutput {
endpoint_url,
bearer_token,
video,
audio,
} = request;

if video.is_none() && audio.is_none() {
return Err(TypeError::new(
"At least one of \"video\" and \"audio\" fields have to be specified.",
));
}
let video_codec = video.as_ref().map(|v| match v.encoder {
VideoEncoderOptions::FfmpegH264 { .. } => pipeline::VideoCodec::H264,
});
let audio_options = audio.as_ref().map(|a| match &a.encoder {
WhipAudioEncoderOptions::Opus {
channels,
preset: _,
} => WhipAudioOptions {
codec: pipeline::AudioCodec::Opus,
channels: match channels {
audio::AudioChannels::Mono => {
compositor_pipeline::audio_mixer::AudioChannels::Mono
}
audio::AudioChannels::Stereo => {
compositor_pipeline::audio_mixer::AudioChannels::Stereo
}
},
},
});

if let Some(token) = &bearer_token {
if HeaderValue::from_str(format!("Bearer {token}").as_str()).is_err() {
return Err(TypeError::new("Bearer token string is not valid. It must contain only 32-127 ASCII characters"));
};
}

let (video_encoder_options, output_video_options) = maybe_video_options(video)?;
let (audio_encoder_options, output_audio_options) = match audio {
Some(OutputWhipAudioOptions {
mixing_strategy,
send_eos_when,
encoder,
initial,
}) => {
let audio_encoder_options: AudioEncoderOptions = encoder.into();
let output_audio_options = pipeline::OutputAudioOptions {
initial: initial.try_into()?,
end_condition: send_eos_when.unwrap_or_default().try_into()?,
mixing_strategy: mixing_strategy.unwrap_or(MixingStrategy::SumClip).into(),
channels: audio_encoder_options.channels(),
};

(Some(audio_encoder_options), Some(output_audio_options))
}
None => (None, None),
};

let output_options = output::OutputOptions {
output_protocol: output::OutputProtocolOptions::Whip(output::whip::WhipSenderOptions {
endpoint_url,
bearer_token,
video: video_codec,
audio: audio_options,
}),
video: video_encoder_options,
audio: audio_encoder_options,
};

Ok(Self {
output_options,
video: output_video_options,
audio: output_audio_options,
})
}
}

fn maybe_video_options(
options: Option<OutputVideoOptions>,
) -> Result<
Expand Down Expand Up @@ -230,6 +314,19 @@ impl From<RtpAudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
}
}

impl From<WhipAudioEncoderOptions> for pipeline::encoder::AudioEncoderOptions {
fn from(value: WhipAudioEncoderOptions) -> Self {
match value {
WhipAudioEncoderOptions::Opus { channels, preset } => {
AudioEncoderOptions::Opus(encoder::opus::OpusEncoderOptions {
channels: channels.into(),
preset: preset.unwrap_or(OpusEncoderPreset::Voip).into(),
})
}
}
}
}

impl TryFrom<OutputEndCondition> for pipeline::PipelineOutputEndCondition {
type Error = TypeError;

Expand Down
38 changes: 38 additions & 0 deletions compositor_api/src/types/register_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ pub struct Mp4Output {
pub audio: Option<OutputMp4AudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct WhipOutput {
/// WHIP server endpoint
pub endpoint_url: String,
// Bearer token
pub bearer_token: Option<Arc<str>>,
brzep marked this conversation as resolved.
Show resolved Hide resolved
/// Video track configuration.
pub video: Option<OutputVideoOptions>,
/// Audio track configuration.
pub audio: Option<OutputWhipAudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputVideoOptions {
Expand Down Expand Up @@ -77,6 +90,19 @@ pub struct OutputMp4AudioOptions {
pub initial: Audio,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputWhipAudioOptions {
/// (**default="sum_clip"**) Specifies how audio should be mixed.
pub mixing_strategy: Option<MixingStrategy>,
/// Condition for termination of output stream based on the input streams states.
pub send_eos_when: Option<OutputEndCondition>,
/// Audio encoder options.
pub encoder: WhipAudioEncoderOptions,
/// Initial audio mixer configuration for output.
pub initial: Audio,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)]
pub enum VideoEncoderOptions {
Expand Down Expand Up @@ -108,6 +134,18 @@ pub enum Mp4AudioEncoderOptions {
Aac { channels: AudioChannels },
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case", deny_unknown_fields)]
pub enum WhipAudioEncoderOptions {
Opus {
/// Specifies channels configuration.
channels: AudioChannels,

/// (**default="voip"**) Specifies preset for audio output encoder.
preset: Option<OpusEncoderPreset>,
},
}

/// This type defines when end of an input stream should trigger end of the output stream. Only one of those fields can be set at the time.
/// Unless specified otherwise the input stream is considered finished/ended when:
/// - TCP connection was dropped/closed.
Expand Down
5 changes: 5 additions & 0 deletions compositor_pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ rubato = "0.15.0"
wgpu = { workspace = true }
vk-video = { path = "../vk-video/", optional = true }
glyphon = { workspace = true }
webrtc = "0.11.0"
tokio = {workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
url = "2.5.2"

[target.x86_64-unknown-linux-gnu.dependencies]
decklink = { path = "../decklink", optional = true }
14 changes: 13 additions & 1 deletion compositor_pipeline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use compositor_render::{
InputId, OutputId,
};

use crate::pipeline::{decoder::AacDecoderError, VideoCodec};
use crate::pipeline::{decoder::AacDecoderError, output::whip, VideoCodec};
use fdk_aac_sys as fdk;

#[derive(Debug, thiserror::Error)]
Expand All @@ -20,6 +20,9 @@ pub enum InitPipelineError {
#[cfg(feature = "vk-video")]
#[error(transparent)]
VulkanCtxError(#[from] vk_video::VulkanCtxError),

#[error("Failed to create tokio::Runtime.")]
CreateTokioRuntime(#[source] std::io::Error),
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -90,6 +93,15 @@ pub enum OutputInitError {

#[error("Failed to register output. FFmpeg error: {0}.")]
FfmpegMp4Error(ffmpeg_next::Error),

#[error("Unkown Whip output error.")]
UnknownWhipError,

#[error("Whip init timeout exceeded")]
WhipInitTimeout,

#[error("Failed to init whip output")]
WhipInitError(#[source] Box<whip::WhipError>),
}

#[derive(Debug, thiserror::Error)]
Expand Down
20 changes: 14 additions & 6 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use input::RawDataInputOptions;
use output::EncodedDataOutputOptions;
use output::OutputOptions;
use output::RawDataOutputOptions;
use pipeline_output::register_pipeline_output;
use tokio::runtime::Runtime;
use tracing::{error, info, trace, warn};
use types::RawDataSender;

Expand Down Expand Up @@ -124,6 +126,7 @@ pub struct Options {
pub wgpu_features: WgpuFeatures,
pub load_system_fonts: Option<bool>,
pub wgpu_ctx: Option<GraphicsContext>,
pub tokio_rt: Arc<Runtime>,
brzep marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Clone)]
Expand All @@ -132,6 +135,7 @@ pub struct PipelineCtx {
pub output_framerate: Framerate,
pub download_dir: Arc<PathBuf>,
pub event_emitter: Arc<EventEmitter>,
pub tokio_rt: Arc<Runtime>,
#[cfg(feature = "vk-video")]
pub vulkan_ctx: Option<graphics_context::VulkanCtx>,
}
Expand Down Expand Up @@ -194,6 +198,7 @@ impl Pipeline {
output_framerate: opts.queue_options.output_framerate,
download_dir: download_dir.into(),
event_emitter,
tokio_rt: opts.tokio_rt,
#[cfg(feature = "vk-video")]
vulkan_ctx: preinitialized_ctx.and_then(|ctx| ctx.vulkan_ctx),
},
Expand Down Expand Up @@ -252,11 +257,12 @@ impl Pipeline {
}

pub fn register_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<OutputOptions>,
) -> Result<Option<Port>, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand All @@ -265,11 +271,12 @@ impl Pipeline {
}

pub fn register_encoded_data_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<EncodedDataOutputOptions>,
) -> Result<Receiver<EncoderOutputEvent>, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand All @@ -278,11 +285,12 @@ impl Pipeline {
}

pub fn register_raw_data_output(
&mut self,
pipeline: &Arc<Mutex<Self>>,
output_id: OutputId,
register_options: RegisterOutputOptions<RawDataOutputOptions>,
) -> Result<RawDataReceiver, RegisterOutputError> {
self.register_pipeline_output(
register_pipeline_output(
pipeline,
output_id,
&register_options.output_options,
register_options.video,
Expand Down
14 changes: 12 additions & 2 deletions compositor_pipeline/src/pipeline/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ impl Encoder {
}
}

pub fn keyframe_request_sender(&self) -> Option<Sender<()>> {
match self.video.as_ref() {
Some(VideoEncoder::H264(encoder)) => Some(encoder.keyframe_request_sender().clone()),
None => {
error!("Non video encoder received keyframe request.");
None
}
}
}

pub fn samples_batch_sender(&self) -> Option<&Sender<PipelineEvent<OutputSamples>>> {
match &self.audio {
Some(encoder) => Some(encoder.samples_batch_sender()),
Expand Down Expand Up @@ -138,9 +148,9 @@ impl VideoEncoder {
}
}

pub fn request_keyframe(&self) {
pub fn keyframe_request_sender(&self) -> Sender<()> {
match self {
Self::H264(encoder) => encoder.request_keyframe(),
Self::H264(encoder) => encoder.keyframe_request_sender(),
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,8 @@ impl LibavH264Encoder {
self.resolution
}

pub fn request_keyframe(&self) {
if let Err(err) = self.keyframe_req_sender.send(()) {
debug!(%err, "Failed to send keyframe request to the encoder.");
}
pub fn keyframe_request_sender(&self) -> Sender<()> {
self.keyframe_req_sender.clone()
}
}

Expand Down
Loading