From dbc9b452ca874780132ed2d55084882577398e08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Ka=C5=BAmierczak?= Date: Thu, 16 Jan 2025 16:34:02 +0100 Subject: [PATCH] WHIP-server (#881) Co-authored-by: bartosz rzepa --- Cargo.lock | 207 ++++++++------ Cargo.toml | 6 +- compositor_api/src/types.rs | 6 +- .../src/types/from_register_input.rs | 80 +++++- compositor_api/src/types/register_input.rs | 35 +++ compositor_pipeline/Cargo.toml | 6 +- compositor_pipeline/src/error.rs | 6 + compositor_pipeline/src/pipeline.rs | 47 ++- .../src/pipeline/decoder/video/ffmpeg_h264.rs | 2 +- compositor_pipeline/src/pipeline/input.rs | 9 + .../src/pipeline/input/whip.rs | 215 ++++++++++++++ .../src/pipeline/input/whip/depayloader.rs | 268 ++++++++++++++++++ compositor_pipeline/src/pipeline/whip_whep.rs | 179 ++++++++++++ .../src/pipeline/whip_whep/bearer_token.rs | 52 ++++ .../src/pipeline/whip_whep/error.rs | 47 +++ .../whip_whep/init_peer_connection.rs | 100 +++++++ .../src/pipeline/whip_whep/whip_handlers.rs | 3 + .../whip_handlers/create_whip_session.rs | 154 ++++++++++ .../whip_handlers/new_whip_ice_candidates.rs | 81 ++++++ .../whip_handlers/terminate_whip_session.rs | 48 ++++ generate/src/compositor_instance.rs | 1 + .../manual_graphics_initialization.rs | 2 + .../examples/raw_channel_input.rs | 2 + .../examples/raw_channel_output.rs | 2 + integration_tests/examples/whip_server.rs | 131 +++++++++ integration_tests/src/bin/benchmark/main.rs | 2 + integration_tests/src/compositor_instance.rs | 3 +- src/config.rs | 16 ++ src/routes/register_request.rs | 7 +- src/state.rs | 7 + 30 files changed, 1626 insertions(+), 98 deletions(-) create mode 100644 compositor_pipeline/src/pipeline/input/whip.rs create mode 100644 compositor_pipeline/src/pipeline/input/whip/depayloader.rs create mode 100644 compositor_pipeline/src/pipeline/whip_whep.rs create mode 100644 compositor_pipeline/src/pipeline/whip_whep/bearer_token.rs create mode 100644 compositor_pipeline/src/pipeline/whip_whep/error.rs create mode 100644 compositor_pipeline/src/pipeline/whip_whep/init_peer_connection.rs create mode 100644 compositor_pipeline/src/pipeline/whip_whep/whip_handlers.rs create mode 100644 compositor_pipeline/src/pipeline/whip_whep/whip_handlers/create_whip_session.rs create mode 100644 compositor_pipeline/src/pipeline/whip_whep/whip_handlers/new_whip_ice_candidates.rs create mode 100644 compositor_pipeline/src/pipeline/whip_whep/whip_handlers/terminate_whip_session.rs create mode 100644 integration_tests/examples/whip_server.rs diff --git a/Cargo.lock b/Cargo.lock index c4b3ae267..628e1f1bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,13 +363,14 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.7.5" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" dependencies = [ "async-trait", "axum-core", - "base64 0.21.2", + "axum-macros", + "base64 0.22.1", "bytes", "futures-util", "http", @@ -392,8 +393,8 @@ dependencies = [ "sha1", "sync_wrapper 1.0.1", "tokio", - "tokio-tungstenite", - "tower", + "tokio-tungstenite 0.24.0", + "tower 0.5.1", "tower-layer", "tower-service", "tracing", @@ -401,9 +402,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" dependencies = [ "async-trait", "bytes", @@ -414,12 +415,23 @@ dependencies = [ "mime", "pin-project-lite", "rustversion", - "sync_wrapper 0.1.2", + "sync_wrapper 1.0.1", "tower-layer", "tower-service", "tracing", ] +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -912,6 +924,7 @@ name = "compositor_pipeline" version = "0.1.0" dependencies = [ "anyhow", + "axum", "bytes", "compositor_render", "crossbeam-channel", @@ -924,19 +937,22 @@ dependencies = [ "opus", "rand", "reqwest", - "rtcp 0.10.1", - "rtp 0.9.0", + "rtcp", + "rtp", "rubato", "serde", "serde_json", "socket2", "thiserror", "tokio", + "tower-http", "tracing", + "tracing-subscriber 0.3.18", "url", + "urlencoding", "vk-video", "webrtc", - "webrtc-util 0.8.1", + "webrtc-util", "wgpu", ] @@ -1854,8 +1870,8 @@ dependencies = [ "pitch-detection", "rand", "reqwest", - "rtcp 0.10.1", - "rtp 0.9.0", + "rtcp", + "rtp", "schemars", "serde", "serde_json", @@ -1863,7 +1879,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber 0.3.18", - "webrtc-util 0.8.1", + "webrtc-util", ] [[package]] @@ -2286,7 +2302,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", + "tower 0.4.13", "tower-service", "tracing", ] @@ -2413,16 +2429,16 @@ dependencies = [ "rand", "regex", "reqwest", - "rtcp 0.10.1", - "rtp 0.9.0", + "rtcp", + "rtp", "serde", "serde_json", "signal-hook", "socket2", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.21.0", "tracing", - "webrtc-util 0.8.1", + "webrtc-util", "wgpu", ] @@ -2437,13 +2453,13 @@ dependencies = [ "log", "portable-atomic", "rand", - "rtcp 0.11.0", - "rtp 0.11.0", + "rtcp", + "rtp", "thiserror", "tokio", "waitgroup", "webrtc-srtp", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -2652,7 +2668,7 @@ dependencies = [ "log", "rand", "reqwest", - "rtp 0.9.0", + "rtp", "rubato", "schemars", "serde", @@ -2664,7 +2680,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber 0.3.18", - "webrtc-util 0.8.1", + "webrtc-util", "wgpu", ] @@ -3877,17 +3893,6 @@ dependencies = [ "xmlparser", ] -[[package]] -name = "rtcp" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33648a781874466a62d89e265fee9f17e32bc7d05a256e6cca41bf97eadcd8aa" -dependencies = [ - "bytes", - "thiserror", - "webrtc-util 0.8.1", -] - [[package]] name = "rtcp" version = "0.11.0" @@ -3896,20 +3901,7 @@ checksum = "fc9f775ff89c5fe7f0cc0abafb7c57688ae25ce688f1a52dd88e277616c76ab2" dependencies = [ "bytes", "thiserror", - "webrtc-util 0.9.0", -] - -[[package]] -name = "rtp" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e60482acbe8afb31edf6b1413103b7bca7a65004c423b3c3993749a083994fbe" -dependencies = [ - "bytes", - "rand", - "serde", - "thiserror", - "webrtc-util 0.8.1", + "webrtc-util", ] [[package]] @@ -3923,7 +3915,7 @@ dependencies = [ "rand", "serde", "thiserror", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -4568,7 +4560,7 @@ dependencies = [ "thiserror", "tokio", "url", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -4935,7 +4927,19 @@ dependencies = [ "futures-util", "log", "tokio", 
- "tungstenite", + "tungstenite 0.21.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.24.0", ] [[package]] @@ -4982,9 +4986,38 @@ dependencies = [ "tokio", "tower-layer", "tower-service", +] + +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + "tower-service", "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97" +dependencies = [ + "bitflags 2.6.0", + "bytes", + "http", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -5181,6 +5214,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "turn" version = "0.8.0" @@ -5199,7 +5250,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -5328,6 +5379,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "usvg" version = "0.35.0" @@ -5719,8 +5776,8 @@ dependencies = [ "rcgen", "regex", "ring", - "rtcp 0.11.0", - "rtp 0.11.0", + "rtcp", + "rtp", "rustls", "sdp", "serde", @@ -5741,7 +5798,7 @@ dependencies = [ "webrtc-media", "webrtc-sctp", "webrtc-srtp", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -5756,7 +5813,7 @@ dependencies = [ "thiserror", "tokio", "webrtc-sctp", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -5791,7 +5848,7 @@ dependencies = [ "subtle", "thiserror", "tokio", - "webrtc-util 0.9.0", + "webrtc-util", "x25519-dalek", "x509-parser", ] @@ -5818,7 +5875,7 @@ dependencies = [ "uuid", "waitgroup", "webrtc-mdns", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -5831,7 +5888,7 @@ dependencies = [ "socket2", "thiserror", "tokio", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -5843,7 +5900,7 @@ dependencies = [ "byteorder", "bytes", "rand", - "rtp 0.11.0", + "rtp", "thiserror", ] @@ -5862,7 +5919,7 @@ dependencies = [ "rand", "thiserror", "tokio", - "webrtc-util 0.9.0", + "webrtc-util", ] [[package]] @@ -5879,33 +5936,13 @@ dependencies = [ "ctr", "hmac", "log", - "rtcp 0.11.0", - "rtp 0.11.0", + "rtcp", + "rtp", "sha1", "subtle", "thiserror", "tokio", - "webrtc-util 0.9.0", -] - -[[package]] -name = "webrtc-util" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e85154ef743d9a2a116d104faaaa82740a281b8b4bed5ee691a2df6c133d873" -dependencies = [ - "async-trait", - "bitflags 1.3.2", - "bytes", - "ipnet", - "lazy_static", 
- "libc", - "log", - "nix 0.26.4", - "rand", - "thiserror", - "tokio", - "winapi", + "webrtc-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b7c10c00f..e3ab92494 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,12 +43,12 @@ signal-hook = "0.3.15" ffmpeg-next = "7.1.0" anyhow = "1.0.71" image = { version = "0.24.7", features = ["jpeg", "png"] } -rtp = "0.9.0" -rtcp = "0.10.0" +rtp = "0.11.0" +rtcp = "0.11.0" rand = "0.8.5" tracing = "0.1.40" socket2 = "0.5.5" -webrtc-util = "0.8.0" +webrtc-util = "0.9.0" opus = "0.3.0" rubato = "0.15.0" glyphon = "0.6.0" diff --git a/compositor_api/src/types.rs b/compositor_api/src/types.rs index 68b6450eb..b4a15ffb8 100644 --- a/compositor_api/src/types.rs +++ b/compositor_api/src/types.rs @@ -38,14 +38,14 @@ pub use component::Tiles; pub use component::View; pub use component::WebView; +pub use register_input::DeckLink; pub use register_input::Mp4Input; +pub use register_input::RtpInput; +pub use register_input::WhipInput; pub use register_output::Mp4Output; pub use register_output::RtpOutput; pub use register_output::WhipOutput; -pub use register_input::DeckLink; -pub use register_input::RtpInput; - pub use renderer::ImageSpec; pub use renderer::ShaderSpec; pub use renderer::WebRendererSpec; diff --git a/compositor_api/src/types/from_register_input.rs b/compositor_api/src/types/from_register_input.rs index f11332c43..6829803a7 100644 --- a/compositor_api/src/types/from_register_input.rs +++ b/compositor_api/src/types/from_register_input.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use compositor_pipeline::{ pipeline::{ self, decoder, - input::{self, rtp::InputAudioStream}, + input::{self, rtp, whip}, }, queue, }; @@ -37,7 +37,7 @@ fn parse_hexadecimal_octet_string(s: &str) -> Result { .collect() } -impl TryFrom for InputAudioStream { +impl TryFrom for rtp::InputAudioStream { type Error = TypeError; fn try_from(audio: InputRtpAudioOptions) -> Result { @@ -150,6 +150,82 @@ impl TryFrom for pipeline::RegisterInputOptions { } } +impl TryFrom for whip::InputAudioStream { + type Error = TypeError; + + fn try_from(audio: InputWhipAudioOptions) -> Result { + match audio { + InputWhipAudioOptions::Opus { + forward_error_correction, + } => { + let forward_error_correction = forward_error_correction.unwrap_or(false); + Ok(input::whip::InputAudioStream { + options: decoder::OpusDecoderOptions { + forward_error_correction, + }, + }) + } + } + } +} + +impl TryFrom for pipeline::RegisterInputOptions { + type Error = TypeError; + + fn try_from(value: WhipInput) -> Result { + let WhipInput { + video, + audio, + required, + offset_ms, + } = value; + + const NO_VIDEO_AUDIO_SPEC: &str = + "At least one of `video` and `audio` has to be specified in `register_input` request."; + + if video.is_none() && audio.is_none() { + return Err(TypeError::new(NO_VIDEO_AUDIO_SPEC)); + } + + let whip_receiver_options = input::whip::WhipReceiverOptions { + video: video + .as_ref() + .map(|video| { + Ok(input::whip::InputVideoStream { + options: match video.decoder { + VideoDecoder::FfmpegH264 => decoder::VideoDecoderOptions { + decoder: pipeline::VideoDecoder::FFmpegH264, + }, + #[cfg(feature = "vk-video")] + VideoDecoder::VulkanVideo => decoder::VideoDecoderOptions { + decoder: pipeline::VideoDecoder::VulkanVideoH264, + }, + #[cfg(not(feature = "vk-video"))] + VideoDecoder::VulkanVideo => { + return Err(TypeError::new(NO_VULKAN_VIDEO)) + } + }, + }) + }) + .transpose()?, + audio: audio.map(TryFrom::try_from).transpose()?, + }; + + let input_options = 
input::InputOptions::Whip(whip_receiver_options); + + let queue_options = queue::QueueInputOptions { + required: required.unwrap_or(false), + offset: offset_ms.map(|offset_ms| Duration::from_secs_f64(offset_ms / 1000.0)), + buffer_duration: None, + }; + + Ok(pipeline::RegisterInputOptions { + input_options, + queue_options, + }) + } +} + impl TryFrom for pipeline::RegisterInputOptions { type Error = TypeError; diff --git a/compositor_api/src/types/register_input.rs b/compositor_api/src/types/register_input.rs index 93d919f78..05eab37fa 100644 --- a/compositor_api/src/types/register_input.rs +++ b/compositor_api/src/types/register_input.rs @@ -27,6 +27,24 @@ pub struct RtpInput { pub offset_ms: Option, } +/// Parameters for an input stream for WHIP server. +/// At least one of `video` and `audio` has to be defined. +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct WhipInput { + /// Parameters of a video source included in the RTP stream. + pub video: Option, + /// Parameters of an audio source included in the RTP stream. + pub audio: Option, + /// (**default=`false`**) If input is required and the stream is not delivered + /// on time, then LiveCompositor will delay producing output frames. + pub required: Option, + /// Offset in milliseconds relative to the pipeline start (start request). If the offset is + /// not defined then the stream will be synchronized based on the delivery time of the initial + /// frames. + pub offset_ms: Option, +} + /// Input stream from MP4 file. /// Exactly one of `url` and `path` has to be defined. #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] @@ -123,12 +141,29 @@ pub enum InputRtpAudioOptions { }, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(tag = "decoder", rename_all = "snake_case", deny_unknown_fields)] +pub enum InputWhipAudioOptions { + Opus { + /// (**default=`false`**) Specifies whether the stream uses forward error correction. + /// It's specific for Opus codec. + /// For more information, check out [RFC](https://datatracker.ietf.org/doc/html/rfc6716#section-2.1.7). 
+ forward_error_correction: Option, + }, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub struct InputRtpVideoOptions { pub decoder: VideoDecoder, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub struct InputWhipVideoOptions { + pub decoder: VideoDecoder, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub enum VideoDecoder { diff --git a/compositor_pipeline/Cargo.toml b/compositor_pipeline/Cargo.toml index 37c905f98..160f602cf 100644 --- a/compositor_pipeline/Cargo.toml +++ b/compositor_pipeline/Cargo.toml @@ -32,10 +32,14 @@ wgpu = { workspace = true } vk-video = { path = "../vk-video/", optional = true } glyphon = { workspace = true } webrtc = "0.11.0" -tokio = {workspace = true } +tokio = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } +axum = { version = "0.7.7", features = ["macros"] } +tower-http = { version = "0.6.1", features = ["cors"] } +tracing-subscriber = "0.3.18" url = "2.5.2" +urlencoding = "2.1.3" [target.x86_64-unknown-linux-gnu.dependencies] decklink = { path = "../decklink", optional = true } diff --git a/compositor_pipeline/src/error.rs b/compositor_pipeline/src/error.rs index 3f8d95c7a..1d7183faa 100644 --- a/compositor_pipeline/src/error.rs +++ b/compositor_pipeline/src/error.rs @@ -23,6 +23,9 @@ pub enum InitPipelineError { #[error("Failed to create tokio::Runtime.")] CreateTokioRuntime(#[source] std::io::Error), + + #[error("Failed to initialize WHIP WHEP server.")] + WhipWhepServerInitError, } #[derive(Debug, thiserror::Error)] @@ -127,6 +130,9 @@ pub enum InputInitError { #[error(transparent)] Mp4(#[from] crate::pipeline::input::mp4::Mp4Error), + #[error(transparent)] + Whip(#[from] crate::pipeline::input::whip::WhipReceiverError), + #[cfg(feature = "decklink")] #[error(transparent)] DeckLink(#[from] crate::pipeline::input::decklink::DeckLinkError), diff --git a/compositor_pipeline/src/pipeline.rs b/compositor_pipeline/src/pipeline.rs index fe0bf3211..18dbe7fed 100644 --- a/compositor_pipeline/src/pipeline.rs +++ b/compositor_pipeline/src/pipeline.rs @@ -27,8 +27,11 @@ use output::OutputOptions; use output::RawDataOutputOptions; use pipeline_output::register_pipeline_output; use tokio::runtime::Runtime; +use tokio::sync::oneshot; use tracing::{error, info, trace, warn}; use types::RawDataSender; +use whip_whep::run_whip_whep_server; +use whip_whep::WhipWhepState; use crate::audio_mixer::AudioMixer; use crate::audio_mixer::MixingStrategy; @@ -57,6 +60,7 @@ mod pipeline_input; mod pipeline_output; pub mod rtp; mod types; +pub mod whip_whep; use self::pipeline_input::register_pipeline_input; use self::pipeline_input::PipelineInput; @@ -113,6 +117,7 @@ pub struct Pipeline { renderer: Renderer, audio_mixer: AudioMixer, is_started: bool, + shutdown_whip_whep_sender: Option>, } #[derive(Debug)] @@ -127,6 +132,8 @@ pub struct Options { pub wgpu_features: WgpuFeatures, pub load_system_fonts: Option, pub wgpu_ctx: Option, + pub whip_whep_server_port: Option, + pub start_whip_whep: bool, pub tokio_rt: Option>, } @@ -137,7 +144,9 @@ pub struct PipelineCtx { pub stun_servers: Arc>, pub download_dir: Arc, pub event_emitter: Arc, + pub whip_whep_state: Arc, pub tokio_rt: Arc, + pub start_whip_whep: bool, #[cfg(feature = "vk-video")] pub vulkan_ctx: Option, } @@ -191,6 +200,34 @@ impl Pipeline { Some(tokio_rt) => 
tokio_rt, None => Arc::new(Runtime::new().map_err(InitPipelineError::CreateTokioRuntime)?), }; + let stun_servers = opts.stun_servers; + let whip_whep_state = WhipWhepState::new(stun_servers.clone()); + let start_whip_whep = opts.start_whip_whep; + let shutdown_whip_whep_sender = if start_whip_whep { + if let Some(port) = opts.whip_whep_server_port { + let whip_whep_state = whip_whep_state.clone(); + let (sender, receiver) = oneshot::channel(); + let (init_result_sender, init_result_receiver) = oneshot::channel(); + tokio_rt.spawn(async move { + run_whip_whep_server(port, whip_whep_state, receiver, init_result_sender).await + }); + match init_result_receiver.blocking_recv() { + Ok(init_result) => init_result?, + Err(err) => { + error!( + "Error while receiving WHIP WHEP server initialization result {:?}", + err + ) + } + } + Some(sender) + } else { + error!("WHIP WHEP server port not specified."); + None + } + } else { + None + }; let event_emitter = Arc::new(EventEmitter::new()); let pipeline = Pipeline { outputs: HashMap::new(), @@ -199,13 +236,16 @@ impl Pipeline { renderer, audio_mixer: AudioMixer::new(opts.output_sample_rate), is_started: false, + shutdown_whip_whep_sender, ctx: PipelineCtx { output_sample_rate: opts.output_sample_rate, output_framerate: opts.queue_options.output_framerate, - stun_servers: opts.stun_servers, + stun_servers, download_dir: download_dir.into(), event_emitter, tokio_rt, + whip_whep_state, + start_whip_whep, #[cfg(feature = "vk-video")] vulkan_ctx: preinitialized_ctx.and_then(|ctx| ctx.vulkan_ctx), }, @@ -467,6 +507,11 @@ impl Pipeline { impl Drop for Pipeline { fn drop(&mut self) { + if let Some(sender) = self.shutdown_whip_whep_sender.take() { + if sender.send(()).is_err() { + error!("Cannot send shutdown signal to WHIP WHEP server") + } + } self.queue.shutdown() } } diff --git a/compositor_pipeline/src/pipeline/decoder/video/ffmpeg_h264.rs b/compositor_pipeline/src/pipeline/decoder/video/ffmpeg_h264.rs index 8f6c2cdd1..7628b4285 100644 --- a/compositor_pipeline/src/pipeline/decoder/video/ffmpeg_h264.rs +++ b/compositor_pipeline/src/pipeline/decoder/video/ffmpeg_h264.rs @@ -127,7 +127,7 @@ fn run_decoder_thread( match decoder.send_packet(&av_packet) { Ok(()) => {} Err(e) => { - warn!("Failed to send a packet to decoder: {}", e); + warn!("Failed to send a packet to decoder: {:?}", e); continue; } } diff --git a/compositor_pipeline/src/pipeline/input.rs b/compositor_pipeline/src/pipeline/input.rs index 7705be939..69848f812 100644 --- a/compositor_pipeline/src/pipeline/input.rs +++ b/compositor_pipeline/src/pipeline/input.rs @@ -8,6 +8,7 @@ use crate::{ use compositor_render::{Frame, InputId}; use crossbeam_channel::{bounded, Receiver}; use rtp::{RtpReceiver, RtpReceiverOptions}; +use whip::{WhipReceiver, WhipReceiverOptions}; use self::mp4::{Mp4, Mp4Options}; @@ -24,10 +25,12 @@ use super::{ pub mod decklink; pub mod mp4; pub mod rtp; +pub mod whip; pub enum Input { Rtp(RtpReceiver), Mp4(Mp4), + Whip(WhipReceiver), #[cfg(feature = "decklink")] DeckLink(decklink::DeckLink), RawDataInput, @@ -37,6 +40,7 @@ pub enum Input { pub enum InputOptions { Rtp(RtpReceiverOptions), Mp4(Mp4Options), + Whip(WhipReceiverOptions), #[cfg(feature = "decklink")] DeckLink(decklink::DeckLinkOptions), } @@ -55,6 +59,9 @@ pub enum InputInitInfo { video_duration: Option, audio_duration: Option, }, + Whip { + bearer_token: String, + }, Other, } @@ -65,6 +72,7 @@ struct InputInitResult { init_info: InputInitInfo, } +#[derive(Debug)] pub(super) enum VideoInputReceiver { 
#[allow(dead_code)] Raw { @@ -157,6 +165,7 @@ fn start_input_threads( InputOptions::Mp4(opts) => { Mp4::start_new_input(input_id, opts, &pipeline_ctx.download_dir)? } + InputOptions::Whip(opts) => WhipReceiver::start_new_input(input_id, opts, pipeline_ctx)?, #[cfg(feature = "decklink")] InputOptions::DeckLink(opts) => decklink::DeckLink::start_new_input(input_id, opts)?, }; diff --git a/compositor_pipeline/src/pipeline/input/whip.rs b/compositor_pipeline/src/pipeline/input/whip.rs new file mode 100644 index 000000000..4319e12ff --- /dev/null +++ b/compositor_pipeline/src/pipeline/input/whip.rs @@ -0,0 +1,215 @@ +use std::{ + sync::{Arc, Mutex}, + thread, + time::Duration, +}; +use tokio::sync::mpsc; +use webrtc::track::track_remote::TrackRemote; + +use depayloader::Depayloader; +use tracing::{error, warn, Span}; + +use crate::{ + pipeline::{ + decoder, + types::EncodedChunk, + whip_whep::{bearer_token::generate_token, WhipInputConnectionOptions, WhipWhepState}, + PipelineCtx, + }, + queue::PipelineEvent, +}; +use compositor_render::InputId; +use crossbeam_channel::Sender; +use tracing::{debug, span, Level}; + +use super::{AudioInputReceiver, Input, InputInitInfo, InputInitResult, VideoInputReceiver}; + +pub mod depayloader; + +#[derive(Debug, thiserror::Error)] +pub enum WhipReceiverError { + #[error("WHIP WHEP server is not running, cannot start WHIP input")] + WhipWhepServerNotRunning, +} + +#[derive(Debug, Clone)] +pub struct WhipReceiverOptions { + pub video: Option, + pub audio: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct InputVideoStream { + pub options: decoder::VideoDecoderOptions, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct InputAudioStream { + pub options: decoder::OpusDecoderOptions, +} + +pub struct WhipReceiver { + whip_whep_state: Arc, + input_id: InputId, +} + +impl WhipReceiver { + pub(super) fn start_new_input( + input_id: &InputId, + opts: WhipReceiverOptions, + pipeline_ctx: &PipelineCtx, + ) -> Result { + if !pipeline_ctx.start_whip_whep { + return Err(WhipReceiverError::WhipWhepServerNotRunning); + } + let bearer_token = generate_token(); + let whip_whep_state = pipeline_ctx.whip_whep_state.clone(); + let depayloader = Arc::from(Mutex::new(Depayloader::new(&opts))); + + let (video_sender_async, video) = match opts.video { + Some(stream) => { + let (async_sender, async_receiver) = mpsc::channel(100); + let (sync_sender, sync_receiver) = crossbeam_channel::bounded(100); + let span = span!( + Level::INFO, + "WHIP server video async-to-sync bridge", + input_id = input_id.to_string() + ); + Self::start_forwarding_thread(async_receiver, sync_sender, span); + ( + Some(async_sender), + Some(VideoInputReceiver::Encoded { + chunk_receiver: sync_receiver, + decoder_options: stream.options, + }), + ) + } + None => (None, None), + }; + + let (audio_sender_async, audio) = match opts.audio { + Some(stream) => { + let (async_sender, async_receiver) = mpsc::channel(100); + let (sync_sender, sync_receiver) = crossbeam_channel::bounded(100); + let span = span!( + Level::INFO, + "WHIP server audio async-to-sync bridge", + input_id = input_id.to_string(), + ); + Self::start_forwarding_thread(async_receiver, sync_sender, span); + ( + Some(async_sender), + Some(AudioInputReceiver::Encoded { + chunk_receiver: sync_receiver, + decoder_options: decoder::AudioDecoderOptions::Opus(stream.options), + }), + ) + } + None => (None, None), + }; + + let mut input_connections = whip_whep_state.input_connections.lock().unwrap(); + input_connections.insert( + 
input_id.clone(), + WhipInputConnectionOptions { + audio_sender: audio_sender_async.clone(), + video_sender: video_sender_async.clone(), + bearer_token: Some(bearer_token.clone()), + peer_connection: None, + start_time_vid: None, + start_time_aud: None, + depayloader, + }, + ); + + Ok(InputInitResult { + input: Input::Whip(Self { + whip_whep_state: whip_whep_state.clone(), + input_id: input_id.clone(), + }), + video, + audio, + init_info: InputInitInfo::Whip { bearer_token }, + }) + } + + fn start_forwarding_thread( + mut async_receiver: mpsc::Receiver>, + sync_sender: Sender>, + span: Span, + ) { + thread::spawn(move || { + let _span = span.entered(); + loop { + let Some(chunk) = async_receiver.blocking_recv() else { + debug!("Closing WHIP async-to-sync bridge."); + break; + }; + + if let Err(err) = sync_sender.send(chunk) { + debug!("Failed to send Encoded Chunk. Channel closed: {:?}", err); + break; + } + } + }); + } +} + +impl Drop for WhipReceiver { + fn drop(&mut self) { + let mut connections = self.whip_whep_state.input_connections.lock().unwrap(); + if let Some(connection) = connections.get_mut(&self.input_id) { + if let Some(peer_connection) = connection.peer_connection.clone() { + let input_id = self.input_id.clone(); + tokio::spawn(async move { + if let Err(err) = peer_connection.close().await { + error!("Cannot close peer_connection for {:?}: {:?}", input_id, err); + }; + }); + } + } + connections.remove(&self.input_id); + } +} + +pub async fn process_track_stream( + track: Arc, + state: Arc, + input_id: InputId, + depayloader: Arc>, + sender: mpsc::Sender>, +) { + let track_kind = track.kind(); + let time_elapsed_from_input_start = + state.get_time_elapsed_from_input_start(input_id, track_kind); + + //TODO send PipelineEvent::NewPeerConnection to reset queue and decoder(drop remaining frames from previous stream) + + let mut first_pts_current_stream = None; + + while let Ok((rtp_packet, _)) = track.read_rtp().await { + let chunks = match depayloader + .lock() + .unwrap() + .depayload(rtp_packet, track_kind) + { + Ok(chunks) => chunks, + Err(err) => { + warn!("RTP depayloading error: {err:?}"); + continue; + } + }; + + if let Some(first_chunk) = chunks.first() { + first_pts_current_stream.get_or_insert(first_chunk.pts); + } + + for mut chunk in chunks { + chunk.pts = chunk.pts + time_elapsed_from_input_start.unwrap_or(Duration::ZERO) + - first_pts_current_stream.unwrap_or(Duration::ZERO); + if let Err(e) = sender.send(PipelineEvent::Data(chunk)).await { + debug!("Failed to send audio RTP packet: {e}"); + } + } + } +} diff --git a/compositor_pipeline/src/pipeline/input/whip/depayloader.rs b/compositor_pipeline/src/pipeline/input/whip/depayloader.rs new file mode 100644 index 000000000..ec875a956 --- /dev/null +++ b/compositor_pipeline/src/pipeline/input/whip/depayloader.rs @@ -0,0 +1,268 @@ +use std::{mem, time::Duration}; + +use bytes::Bytes; +use log::error; +use rtp::{ + codecs::{h264::H264Packet, opus::OpusPacket}, + packetizer::Depacketizer, +}; +use webrtc::rtp_transceiver::rtp_codec::RTPCodecType; + +use crate::pipeline::{ + decoder, + types::{AudioCodec, EncodedChunk, EncodedChunkKind, IsKeyframe, VideoCodec}, + VideoDecoder, +}; + +use super::WhipReceiverOptions; + +#[derive(Debug, thiserror::Error)] +pub enum DepayloadingError { + #[error("Bad payload type {0}")] + BadPayloadType(u8), + #[error(transparent)] + Rtp(#[from] rtp::Error), +} + +#[derive(Debug)] +pub struct Depayloader { + pub video: Option, + pub audio: Option, +} + +impl Depayloader { + pub fn new(stream: 
&WhipReceiverOptions) -> Self { + let video = stream + .video + .as_ref() + .map(|video| VideoDepayloader::new(&video.options)); + + let audio = stream.audio.as_ref().map(|_| AudioDepayloader::new()); + + Self { video, audio } + } + + pub fn depayload( + &mut self, + packet: rtp::packet::Packet, + track_kind: RTPCodecType, + ) -> Result, DepayloadingError> { + match track_kind { + RTPCodecType::Video => match self.video.as_mut() { + Some(video_depayloader) => video_depayloader.depayload(packet), + None => Err(DepayloadingError::BadPayloadType( + packet.header.payload_type, + )), + }, + RTPCodecType::Audio => match self.audio.as_mut() { + Some(audio_depayloader) => audio_depayloader.depayload(packet), + None => Err(DepayloadingError::BadPayloadType( + packet.header.payload_type, + )), + }, + _ => Err(DepayloadingError::BadPayloadType( + packet.header.payload_type, + )), + } + } +} + +#[derive(Debug)] +pub enum VideoDepayloader { + H264 { + depayloader: H264Packet, + buffer: Vec, + rollover_state: RolloverState, + }, +} + +impl VideoDepayloader { + pub fn new(options: &decoder::VideoDecoderOptions) -> Self { + match options.decoder { + VideoDecoder::FFmpegH264 => VideoDepayloader::H264 { + depayloader: H264Packet::default(), + buffer: vec![], + rollover_state: RolloverState::default(), + }, + + #[cfg(feature = "vk-video")] + VideoDecoder::VulkanVideoH264 => VideoDepayloader::H264 { + depayloader: H264Packet::default(), + buffer: vec![], + rollover_state: RolloverState::default(), + }, + } + } + + fn depayload( + &mut self, + packet: rtp::packet::Packet, + ) -> Result, DepayloadingError> { + match self { + VideoDepayloader::H264 { + depayloader, + buffer, + rollover_state, + } => { + let kind = EncodedChunkKind::Video(VideoCodec::H264); + let h264_chunk = depayloader.depacketize(&packet.payload)?; + + if h264_chunk.is_empty() { + return Ok(Vec::new()); + } + + buffer.push(h264_chunk); + if !packet.header.marker { + // the marker bit is set on the last packet of an access unit + return Ok(Vec::new()); + } + + let timestamp = rollover_state.timestamp(packet.header.timestamp); + let new_chunk = EncodedChunk { + data: mem::take(buffer).concat().into(), + pts: Duration::from_secs_f64(timestamp as f64 / 90000.0), + dts: None, + is_keyframe: IsKeyframe::Unknown, + kind, + }; + + Ok(vec![new_chunk]) + } + } + } +} + +#[derive(Debug)] +pub enum AudioDepayloader { + Opus { + depayloader: OpusPacket, + rollover_state: RolloverState, + }, +} + +impl Default for AudioDepayloader { + fn default() -> Self { + Self::new() + } +} + +impl AudioDepayloader { + pub fn new() -> Self { + AudioDepayloader::Opus { + depayloader: OpusPacket, + rollover_state: RolloverState::default(), + } + } + fn depayload( + &mut self, + packet: rtp::packet::Packet, + ) -> Result, DepayloadingError> { + match self { + AudioDepayloader::Opus { + depayloader, + rollover_state, + } => { + let kind = EncodedChunkKind::Audio(AudioCodec::Opus); + let opus_packet = depayloader.depacketize(&packet.payload)?; + + if opus_packet.is_empty() { + return Ok(Vec::new()); + } + + let timestamp = rollover_state.timestamp(packet.header.timestamp); + Ok(vec![EncodedChunk { + data: opus_packet, + pts: Duration::from_secs_f64(timestamp as f64 / 48000.0), + dts: None, + is_keyframe: IsKeyframe::NoKeyframes, + kind, + }]) + } + } + } +} + +#[derive(Default, Debug)] +pub struct RolloverState { + previous_timestamp: Option, + rollover_count: usize, +} + +impl RolloverState { + fn timestamp(&mut self, current_timestamp: u32) -> u64 { + let 
Some(previous_timestamp) = self.previous_timestamp else { + self.previous_timestamp = Some(current_timestamp); + return current_timestamp as u64; + }; + + let timestamp_diff = u32::abs_diff(previous_timestamp, current_timestamp); + if timestamp_diff >= u32::MAX / 2 { + if previous_timestamp > current_timestamp { + self.rollover_count += 1; + } else { + // We received a packet from before the rollover, so we need to decrement the count + self.rollover_count = self.rollover_count.saturating_sub(1); + } + } + + self.previous_timestamp = Some(current_timestamp); + + (self.rollover_count as u64) * (u32::MAX as u64 + 1) + current_timestamp as u64 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn timestamp_rollover() { + let mut rollover_state = RolloverState::default(); + + let current_timestamp = 1; + assert_eq!( + rollover_state.timestamp(current_timestamp), + current_timestamp as u64 + ); + + let current_timestamp = u32::MAX / 2 + 1; + assert_eq!( + rollover_state.timestamp(current_timestamp), + current_timestamp as u64 + ); + + let current_timestamp = 0; + assert_eq!( + rollover_state.timestamp(current_timestamp), + u32::MAX as u64 + 1 + current_timestamp as u64 + ); + + rollover_state.previous_timestamp = Some(u32::MAX); + let current_timestamp = 1; + assert_eq!( + rollover_state.timestamp(current_timestamp), + 2 * (u32::MAX as u64 + 1) + current_timestamp as u64 + ); + + rollover_state.previous_timestamp = Some(1); + let current_timestamp = u32::MAX; + assert_eq!( + rollover_state.timestamp(current_timestamp), + u32::MAX as u64 + 1 + current_timestamp as u64 + ); + + rollover_state.previous_timestamp = Some(u32::MAX); + let current_timestamp = u32::MAX - 1; + assert_eq!( + rollover_state.timestamp(current_timestamp), + u32::MAX as u64 + 1 + current_timestamp as u64 + ); + + rollover_state.previous_timestamp = Some(u32::MAX - 1); + let current_timestamp = u32::MAX; + assert_eq!( + rollover_state.timestamp(current_timestamp), + u32::MAX as u64 + 1 + current_timestamp as u64 + ); + } +} diff --git a/compositor_pipeline/src/pipeline/whip_whep.rs b/compositor_pipeline/src/pipeline/whip_whep.rs new file mode 100644 index 000000000..4fa1d6053 --- /dev/null +++ b/compositor_pipeline/src/pipeline/whip_whep.rs @@ -0,0 +1,179 @@ +use crate::{error::InitPipelineError, pipeline::input::whip::depayloader::Depayloader}; +use axum::{ + routing::{delete, get, patch, post}, + Router, +}; +use compositor_render::InputId; +use error::WhipServerError; +use reqwest::StatusCode; +use serde_json::json; +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; +use tokio::sync::{ + mpsc, + oneshot::{self, Sender}, +}; +use tower_http::cors::CorsLayer; +use tracing::error; +use webrtc::{ + peer_connection::{peer_connection_state::RTCPeerConnectionState, RTCPeerConnection}, + rtp_transceiver::rtp_codec::RTPCodecType, +}; +use whip_handlers::{ + create_whip_session::handle_create_whip_session, + new_whip_ice_candidates::handle_new_whip_ice_candidates, + terminate_whip_session::handle_terminate_whip_session, +}; + +pub mod bearer_token; +mod error; +mod init_peer_connection; +mod whip_handlers; + +use crate::queue::PipelineEvent; + +use super::EncodedChunk; + +pub async fn run_whip_whep_server( + port: u16, + state: Arc, + shutdown_signal_receiver: oneshot::Receiver<()>, + init_result_sender: Sender>, +) { + let app = Router::new() + .route("/status", get((StatusCode::OK, axum::Json(json!({}))))) + .route("/whip/:id", 
post(handle_create_whip_session)) + .route("/session/:id", patch(handle_new_whip_ice_candidates)) + .route("/session/:id", delete(handle_terminate_whip_session)) + .layer(CorsLayer::permissive()) + .with_state(state.clone()); + + let Ok(listener) = tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await + else { + if let Err(err) = init_result_sender.send(Err(InitPipelineError::WhipWhepServerInitError)) { + error!("Cannot send init WHIP WHEP server result {err:?}"); + } + return; + }; + + if let Err(err) = init_result_sender.send(Ok(())) { + error!("Cannot send init WHIP WHEP server result {err:?}"); + }; + + if let Err(err) = axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal(shutdown_signal_receiver)) + .await + { + error!("Cannot start WHIP WHEP server: {err:?}"); + }; +} + +async fn shutdown_signal(receiver: oneshot::Receiver<()>) { + if let Err(err) = receiver.await { + error!( + "Error while receiving WHIP WHEP server shutdown signal {:?}", + err + ); + } +} + +#[derive(Debug, Clone)] +pub struct WhipInputConnectionOptions { + pub video_sender: Option>>, + pub audio_sender: Option>>, + pub bearer_token: Option, + pub peer_connection: Option>, + pub start_time_vid: Option, + pub start_time_aud: Option, + pub depayloader: Arc>, +} + +impl WhipInputConnectionOptions { + pub fn get_or_initialize_elapsed_start_time( + &mut self, + track_kind: RTPCodecType, + ) -> Option { + match track_kind { + RTPCodecType::Video => { + let start_time = self.start_time_vid.get_or_insert_with(Instant::now); + Some(start_time.elapsed()) + } + RTPCodecType::Audio => { + let start_time = self.start_time_aud.get_or_insert_with(Instant::now); + Some(start_time.elapsed()) + } + _ => None, + } + } +} + +#[derive(Debug)] +pub struct WhipWhepState { + pub input_connections: Arc>>, + pub stun_servers: Arc>, +} + +impl WhipWhepState { + pub fn new(stun_servers: Arc>) -> Arc { + Arc::new(WhipWhepState { + input_connections: Arc::from(Mutex::new(HashMap::new())), + stun_servers, + }) + } + + pub fn get_input_connection_options( + &self, + input_id: InputId, + ) -> Result { + let connections = self.input_connections.lock().unwrap(); + if let Some(connection) = connections.get(&input_id) { + if let Some(peer_connection) = connection.peer_connection.clone() { + if peer_connection.connection_state() == RTCPeerConnectionState::Connected { + return Err(WhipServerError::InternalError(format!( + "Another stream is currently connected to the given input_id: {input_id:?}. \ + Disconnect the existing stream before starting a new one, or check if the input_id is correct." 
+ ))); + } + } + Ok(connection.clone()) + } else { + Err(WhipServerError::NotFound(format!("{input_id:?} not found"))) + } + } + + pub async fn update_peer_connection( + &self, + input_id: InputId, + peer_connection: Arc, + ) -> Result<(), WhipServerError> { + let mut connections = self.input_connections.lock().unwrap(); + if let Some(connection) = connections.get_mut(&input_id) { + connection.peer_connection = Some(peer_connection); + Ok(()) + } else { + Err(WhipServerError::InternalError(format!( + "Peer connection with input_id: {:?} does not exist", + input_id.0 + ))) + } + } + + pub fn get_time_elapsed_from_input_start( + &self, + input_id: InputId, + track_kind: RTPCodecType, + ) -> Option { + let mut connections = self.input_connections.lock().unwrap(); + match connections.get_mut(&input_id) { + Some(connection) => connection.get_or_initialize_elapsed_start_time(track_kind), + None => { + error!("{input_id:?} not found"); + None + } + } + } +} diff --git a/compositor_pipeline/src/pipeline/whip_whep/bearer_token.rs b/compositor_pipeline/src/pipeline/whip_whep/bearer_token.rs new file mode 100644 index 000000000..5f5e8aa9e --- /dev/null +++ b/compositor_pipeline/src/pipeline/whip_whep/bearer_token.rs @@ -0,0 +1,52 @@ +use std::{fmt::Write, time::Duration}; + +use axum::http::HeaderValue; +use rand::{rngs::StdRng, thread_rng, Rng, RngCore, SeedableRng}; +use tokio::time::sleep; +use tracing::error; + +use crate::pipeline::whip_whep::error::WhipServerError; + +pub fn generate_token() -> String { + let mut bytes = [0u8; 16]; + thread_rng().fill_bytes(&mut bytes); + bytes.iter().fold(String::new(), |mut acc, byte| { + if let Err(err) = write!(acc, "{byte:02X}") { + error!("Cannot generate token: {err:?}") + } + acc + }) +} + +pub async fn validate_token( + expected_token: Option, + auth_header_value: Option<&HeaderValue>, +) -> Result<(), WhipServerError> { + match (expected_token, auth_header_value) { + (Some(bearer_token), Some(auth_str)) => { + let auth_str = auth_str.to_str().map_err(|_| { + WhipServerError::Unauthorized("Invalid UTF-8 in header".to_string()) + })?; + + if let Some(token_from_header) = auth_str.strip_prefix("Bearer ") { + if token_from_header == bearer_token { + Ok(()) + } else { + let mut rng = StdRng::from_entropy(); + let nanos = rng.gen_range(0..1000); + sleep(Duration::from_nanos(nanos)).await; + Err(WhipServerError::Unauthorized( + "Invalid or mismatched token provided".to_string(), + )) + } + } else { + Err(WhipServerError::Unauthorized( + "Authorization header format incorrect".to_string(), + )) + } + } + _ => Err(WhipServerError::Unauthorized( + "Expected token and authorization header required".to_string(), + )), + } +} diff --git a/compositor_pipeline/src/pipeline/whip_whep/error.rs b/compositor_pipeline/src/pipeline/whip_whep/error.rs new file mode 100644 index 000000000..d57bee2c4 --- /dev/null +++ b/compositor_pipeline/src/pipeline/whip_whep/error.rs @@ -0,0 +1,47 @@ +use axum::response::{IntoResponse, Response}; +use reqwest::StatusCode; + +#[derive(Debug)] +pub enum WhipServerError { + BadRequest(String), + InternalError(String), + Unauthorized(String), + NotFound(String), +} + +impl From for WhipServerError +where + T: std::error::Error + 'static, +{ + fn from(err: T) -> Self { + WhipServerError::InternalError(err.to_string()) + } +} + +impl std::fmt::Display for WhipServerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + WhipServerError::InternalError(message) => message, + 
WhipServerError::BadRequest(message) => message, + WhipServerError::Unauthorized(message) => message, + WhipServerError::NotFound(message) => message, + }) + } +} + +impl IntoResponse for WhipServerError { + fn into_response(self) -> Response { + match self { + WhipServerError::InternalError(message) => { + (StatusCode::INTERNAL_SERVER_ERROR, message).into_response() + } + WhipServerError::BadRequest(message) => { + (StatusCode::BAD_REQUEST, message).into_response() + } + WhipServerError::Unauthorized(message) => { + (StatusCode::UNAUTHORIZED, message).into_response() + } + WhipServerError::NotFound(message) => (StatusCode::NOT_FOUND, message).into_response(), + } + } +} diff --git a/compositor_pipeline/src/pipeline/whip_whep/init_peer_connection.rs b/compositor_pipeline/src/pipeline/whip_whep/init_peer_connection.rs new file mode 100644 index 000000000..e5c241cdc --- /dev/null +++ b/compositor_pipeline/src/pipeline/whip_whep/init_peer_connection.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use webrtc::{ + api::{ + interceptor_registry::register_default_interceptors, + media_engine::{MediaEngine, MIME_TYPE_H264, MIME_TYPE_OPUS}, + APIBuilder, + }, + ice_transport::ice_server::RTCIceServer, + interceptor::registry::Registry, + peer_connection::{configuration::RTCConfiguration, RTCPeerConnection}, + rtp_transceiver::{ + rtp_codec::{RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType}, + rtp_transceiver_direction::RTCRtpTransceiverDirection, + RTCRtpTransceiverInit, + }, +}; + +use super::error::WhipServerError; + +pub async fn init_peer_connection( + add_video_track: bool, + add_audio_track: bool, + stun_servers: Vec, +) -> Result, WhipServerError> { + let mut media_engine = MediaEngine::default(); + + media_engine.register_codec( + RTCRtpCodecParameters { + capability: RTCRtpCodecCapability { + mime_type: MIME_TYPE_H264.to_owned(), + clock_rate: 90000, + channels: 0, + sdp_fmtp_line: "".to_owned(), + rtcp_feedback: vec![], + }, + payload_type: 96, + ..Default::default() + }, + RTPCodecType::Video, + )?; + + media_engine.register_codec( + RTCRtpCodecParameters { + capability: RTCRtpCodecCapability { + mime_type: MIME_TYPE_OPUS.to_owned(), + clock_rate: 48000, + channels: 2, + sdp_fmtp_line: "".to_owned(), + rtcp_feedback: vec![], + }, + payload_type: 97, + ..Default::default() + }, + RTPCodecType::Audio, + )?; + + let mut registry = Registry::new(); + + registry = register_default_interceptors(registry, &mut media_engine)?; + + let api = APIBuilder::new() + .with_media_engine(media_engine) + .with_interceptor_registry(registry) + .build(); + + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: stun_servers, + ..Default::default() + }], + ..Default::default() + }; + + let peer_connection = Arc::new(api.new_peer_connection(config).await?); + if add_video_track { + peer_connection + .add_transceiver_from_kind( + RTPCodecType::Audio, + Some(RTCRtpTransceiverInit { + direction: RTCRtpTransceiverDirection::Recvonly, + send_encodings: vec![], + }), + ) + .await?; + } + if add_audio_track { + peer_connection + .add_transceiver_from_kind( + RTPCodecType::Video, + Some(RTCRtpTransceiverInit { + direction: RTCRtpTransceiverDirection::Recvonly, + send_encodings: vec![], + }), + ) + .await?; + } + + Ok(peer_connection) +} diff --git a/compositor_pipeline/src/pipeline/whip_whep/whip_handlers.rs b/compositor_pipeline/src/pipeline/whip_whep/whip_handlers.rs new file mode 100644 index 000000000..f46cc38e8 --- /dev/null +++ 
b/compositor_pipeline/src/pipeline/whip_whep/whip_handlers.rs @@ -0,0 +1,3 @@ +pub mod create_whip_session; +pub mod new_whip_ice_candidates; +pub mod terminate_whip_session; diff --git a/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/create_whip_session.rs b/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/create_whip_session.rs new file mode 100644 index 000000000..8b424adc7 --- /dev/null +++ b/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/create_whip_session.rs @@ -0,0 +1,154 @@ +use crate::pipeline::{ + input::whip::process_track_stream, + whip_whep::{ + bearer_token::validate_token, error::WhipServerError, init_peer_connection, WhipWhepState, + }, +}; +use axum::{ + body::Body, + extract::{Path, State}, + http::{HeaderMap, Response, StatusCode}, +}; +use compositor_render::InputId; +use init_peer_connection::init_peer_connection; +use std::{sync::Arc, time::Duration}; +use tokio::{sync::watch, time::timeout}; +use tracing::{debug, info}; +use urlencoding::encode; +use webrtc::{ + ice_transport::ice_gatherer_state::RTCIceGathererState, + peer_connection::{sdp::session_description::RTCSessionDescription, RTCPeerConnection}, + rtp_transceiver::rtp_codec::RTPCodecType, +}; + +pub async fn handle_create_whip_session( + Path(id): Path, + State(state): State>, + headers: HeaderMap, + offer: String, +) -> Result, WhipServerError> { + let input_id = InputId(Arc::from(id.clone())); + + validate_sdp_content_type(&headers)?; + + let input_components = state.get_input_connection_options(input_id.clone())?; + + validate_token(input_components.bearer_token, headers.get("Authorization")).await?; + + //Deleting previous peer_connection on this input which was not in Connected state + if let Some(connection) = input_components.peer_connection { + if let Err(err) = connection.close().await { + return Err(WhipServerError::InternalError(format!( + "Cannot close previously existing peer connection {input_id:?}: {err:?}" + ))); + } + } + + let peer_connection = init_peer_connection( + input_components.video_sender.is_some(), + input_components.audio_sender.is_some(), + state.stun_servers.to_vec(), + ) + .await?; + + state + .update_peer_connection(input_id.clone(), peer_connection.clone()) + .await?; + + peer_connection.on_track(Box::new(move |track, _, _| { + let track_kind = track.kind(); + let state_clone = state.clone(); + let input_id_clone = input_id.clone(); + let depayloader_clone = input_components.depayloader.clone(); + let sender = match track_kind { + RTPCodecType::Video => input_components.video_sender.clone(), + RTPCodecType::Audio => input_components.audio_sender.clone(), + _ => { + debug!("RTPCodecType not supported!"); + None + } + }; + + //tokio::spawn is necessary to concurrently process audio and video track + Box::pin(async { + if let Some(sender) = sender { + tokio::spawn(async move { + process_track_stream( + track, + state_clone, + input_id_clone, + depayloader_clone, + sender, + ) + .await; + }); + } + }) + })); + + peer_connection.on_ice_connection_state_change(Box::new(move |state| { + info!("ICE connection state changed: {state:?}"); + Box::pin(async {}) + })); + + let description = RTCSessionDescription::offer(offer)?; + + peer_connection.set_remote_description(description).await?; + let answer = peer_connection.create_answer(None).await?; + peer_connection.set_local_description(answer).await?; + gather_ice_candidates_for_one_second(peer_connection.clone()).await; + + let Some(sdp) = peer_connection.local_description().await else { + return 
Err(WhipServerError::InternalError( + "Local description is not set, cannot read it".to_string(), + )); + }; + debug!("Sending SDP answer: {sdp:?}"); + + let body = Body::from(sdp.sdp.to_string()); + let response = Response::builder() + .status(StatusCode::CREATED) + .header("Content-Type", "application/sdp") + .header("Access-Control-Expose-Headers", "Location") + .header("Location", format!("/session/{}", encode(&id))) + .body(body)?; + Ok(response) +} + +pub fn validate_sdp_content_type(headers: &HeaderMap) -> Result<(), WhipServerError> { + if let Some(content_type) = headers.get("Content-Type") { + if content_type.as_bytes() != b"application/sdp" { + return Err(WhipServerError::InternalError( + "Invalid Content-Type".to_string(), + )); + } + } else { + return Err(WhipServerError::BadRequest( + "Missing Content-Type header".to_string(), + )); + } + Ok(()) +} + +pub async fn gather_ice_candidates_for_one_second(peer_connection: Arc) { + let (sender, mut receiver) = watch::channel(RTCIceGathererState::Unspecified); + + peer_connection.on_ice_gathering_state_change(Box::new(move |gatherer_state| { + if let Err(err) = sender.send(gatherer_state) { + debug!("Cannot send gathering state: {err:?}"); + }; + Box::pin(async {}) + })); + + let gather_candidates = async { + while receiver.changed().await.is_ok() { + if *receiver.borrow() == RTCIceGathererState::Complete { + break; + } + } + }; + + if let Err(err) = timeout(Duration::from_secs(1), gather_candidates).await { + debug!("Maximum time for gathering candidate has elapsed: {err:?}"); + } +} diff --git a/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/new_whip_ice_candidates.rs b/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/new_whip_ice_candidates.rs new file mode 100644 index 000000000..53458104e --- /dev/null +++ b/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/new_whip_ice_candidates.rs @@ -0,0 +1,81 @@ +use crate::pipeline::whip_whep::{ + bearer_token::validate_token, error::WhipServerError, WhipWhepState, +}; +use axum::{ + extract::{Path, State}, + http::{HeaderMap, StatusCode}, +}; +use compositor_render::InputId; + +use std::sync::Arc; +use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit; + +pub async fn handle_new_whip_ice_candidates( + Path(id): Path, + State(state): State>, + headers: HeaderMap, + sdp_fragment_content: String, +) -> Result { + let content_type = headers + .get("Content-Type") + .and_then(|value| value.to_str().ok()) + .unwrap_or(""); + + if content_type != "application/trickle-ice-sdpfrag" { + return Err(WhipServerError::BadRequest( + "Invalid Content-Type".to_owned(), + )); + } + + let input_id = InputId(Arc::from(id)); + let (bearer_token, peer_connection) = { + let connections = state.input_connections.lock().unwrap(); + connections + .get(&input_id) + .map(|conn| (conn.bearer_token.clone(), conn.peer_connection.clone())) + .ok_or_else(|| WhipServerError::NotFound(format!("{input_id:?} not found")))? 
+ }; + + validate_token(bearer_token, headers.get("Authorization")).await?; + + if let Some(peer_connection) = peer_connection { + for candidate in ice_fragment_unmarshal(&sdp_fragment_content) { + if let Err(err) = peer_connection.add_ice_candidate(candidate.clone()).await { + return Err(WhipServerError::BadRequest(format!( + "Cannot add ice_candidate {candidate:?} for input {input_id:?}: {err:?}" + ))); + } + } + } else { + return Err(WhipServerError::InternalError(format!( + "None peer connection for {input_id:?}" + ))); + } + + Ok(StatusCode::NO_CONTENT) +} + +pub fn ice_fragment_unmarshal(sdp_fragment_content: &str) -> Vec { + let lines = sdp_fragment_content.split("\n"); + let mut candidates = Vec::new(); + let mut mid = None; + let mut mid_num = None; + + for line in lines { + if line.starts_with("a=mid:") { + mid = line + .split_once(':') + .map(|(_, value)| value.trim().to_string()); + mid_num = mid_num.map_or(Some(0), |index| Some(index + 1)); + } + if line.starts_with("a=candidate:") { + candidates.push(RTCIceCandidateInit { + candidate: line[2..].to_string(), + sdp_mid: mid.clone(), + sdp_mline_index: mid_num, + ..Default::default() + }); + } + } + candidates +} diff --git a/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/terminate_whip_session.rs b/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/terminate_whip_session.rs new file mode 100644 index 000000000..1d09ae001 --- /dev/null +++ b/compositor_pipeline/src/pipeline/whip_whep/whip_handlers/terminate_whip_session.rs @@ -0,0 +1,48 @@ +use crate::pipeline::whip_whep::{ + bearer_token::validate_token, error::WhipServerError, WhipWhepState, +}; +use axum::{ + extract::{Path, State}, + http::{HeaderMap, StatusCode}, +}; +use compositor_render::InputId; +use std::sync::Arc; +use tracing::info; + +pub async fn handle_terminate_whip_session( + Path(id): Path, + State(state): State>, + headers: HeaderMap, +) -> Result { + let input_id = InputId(Arc::from(id)); + + let bearer_token = { + let connections = state.input_connections.lock().unwrap(); + connections + .get(&input_id) + .map(|connection| connection.bearer_token.clone()) + .ok_or_else(|| WhipServerError::NotFound(format!("{input_id:?} not found")))? 
+ }; + + validate_token(bearer_token, headers.get("Authorization")).await?; + + let peer_connection = { + let mut connections = state.input_connections.lock().unwrap(); + if let Some(connection) = connections.get_mut(&input_id) { + connection.peer_connection.take() + } else { + return Err(WhipServerError::NotFound(format!("{input_id:?} not found"))); + } + }; + + if let Some(peer_connection) = peer_connection { + peer_connection.close().await?; + } else { + return Err(WhipServerError::InternalError(format!( + "None peer connection for {input_id:?}" + ))); + } + + info!("WHIP session terminated for input: {:?}", input_id); + Ok(StatusCode::OK) +} diff --git a/generate/src/compositor_instance.rs b/generate/src/compositor_instance.rs index 8e5c961e5..1841982ce 100644 --- a/generate/src/compositor_instance.rs +++ b/generate/src/compositor_instance.rs @@ -34,6 +34,7 @@ impl CompositorInstance { config.api_port = api_port; config.queue_options.ahead_of_time_processing = true; config.queue_options.never_drop_output_frames = true; + config.start_whip_whep = false; info!("Starting LiveCompositor Integration Test with config:\n{config:#?}",); diff --git a/integration_tests/examples/manual_graphics_initialization.rs b/integration_tests/examples/manual_graphics_initialization.rs index ab958a415..759d97cdd 100644 --- a/integration_tests/examples/manual_graphics_initialization.rs +++ b/integration_tests/examples/manual_graphics_initialization.rs @@ -38,6 +38,8 @@ fn main() { wgpu_features: config.required_wgpu_features, load_system_fonts: Some(true), tokio_rt: Some(Arc::new(Runtime::new().unwrap())), + whip_whep_server_port: Some(config.whip_whep_server_port), + start_whip_whep: config.start_whip_whep, }) .unwrap(); } diff --git a/integration_tests/examples/raw_channel_input.rs b/integration_tests/examples/raw_channel_input.rs index e2cf3d716..0a321f012 100644 --- a/integration_tests/examples/raw_channel_input.rs +++ b/integration_tests/examples/raw_channel_input.rs @@ -55,6 +55,8 @@ fn main() { wgpu_features: config.required_wgpu_features, load_system_fonts: Some(true), wgpu_ctx: Some(ctx), + whip_whep_server_port: Some(config.whip_whep_server_port), + start_whip_whep: config.start_whip_whep, tokio_rt: Some(Arc::new(Runtime::new().unwrap())), }) .unwrap_or_else(|err| { diff --git a/integration_tests/examples/raw_channel_output.rs b/integration_tests/examples/raw_channel_output.rs index fdac94e48..c16641ec2 100644 --- a/integration_tests/examples/raw_channel_output.rs +++ b/integration_tests/examples/raw_channel_output.rs @@ -68,6 +68,8 @@ fn main() { wgpu_features: config.required_wgpu_features, load_system_fonts: Some(true), wgpu_ctx: Some(ctx), + whip_whep_server_port: Some(config.whip_whep_server_port), + start_whip_whep: config.start_whip_whep, tokio_rt: Some(Arc::new(Runtime::new().unwrap())), }) .unwrap_or_else(|err| { diff --git a/integration_tests/examples/whip_server.rs b/integration_tests/examples/whip_server.rs new file mode 100644 index 000000000..63fd8c0c0 --- /dev/null +++ b/integration_tests/examples/whip_server.rs @@ -0,0 +1,131 @@ +use anyhow::Result; +use compositor_api::types::Resolution; +use serde_json::json; +use std::{thread::sleep, time::Duration}; +use tracing::info; + +use integration_tests::{ + examples::{self, run_example}, + gstreamer::start_gst_receive_tcp, +}; + +const VIDEO_RESOLUTION: Resolution = Resolution { + width: 1280, + height: 720, +}; + +const IP: &str = "127.0.0.1"; +const OUTPUT_PORT: u16 = 8012; + +fn main() { + run_example(client_code); +} + +fn 
+    let token_input_1 = examples::post(
+        "input/input_1/register",
+        &json!({
+            "type": "whip",
+            "video": {
+                "decoder": "ffmpeg_h264"
+            },
+            "audio": {
+                "decoder": "opus"
+            },
+        }),
+    )?
+    .json::<serde_json::Value>();
+
+    if let Ok(token) = token_input_1 {
+        info!("Bearer token for input_1: {}", token["bearer_token"]);
+    }
+
+    let token_input_2 = examples::post(
+        "input/input_2/register",
+        &json!({
+            "type": "whip",
+            "video": {
+                "decoder": "ffmpeg_h264"
+            },
+        }),
+    )?
+    .json::<serde_json::Value>();
+
+    if let Ok(token) = token_input_2 {
+        info!("Bearer token for input_2: {}", token["bearer_token"]);
+    }
+
+    let token_input_3 = examples::post(
+        "input/input_3/register",
+        &json!({
+            "type": "whip",
+            "video": {
+                "decoder": "ffmpeg_h264"
+            },
+        }),
+    )?
+    .json::<serde_json::Value>();
+
+    if let Ok(token) = token_input_3 {
+        info!("Bearer token for input_3: {}", token["bearer_token"]);
+    }
+
+    examples::post(
+        "output/output_1/register",
+        &json!({
+            "type": "rtp_stream",
+            "port": OUTPUT_PORT,
+            "transport_protocol": "tcp_server",
+            "video": {
+                "resolution": {
+                    "width": VIDEO_RESOLUTION.width,
+                    "height": VIDEO_RESOLUTION.height,
+                },
+                "encoder": {
+                    "type": "ffmpeg_h264",
+                    "preset": "ultrafast"
+                },
+                "initial": {
+                    "root": {
+                        "type": "view",
+                        "background_color": "#4d4d4dff",
+                        "children": [
+                            {
+                                "type": "rescaler",
+                                "child": { "type": "input_stream", "input_id": "input_1" }
+                            },
+                            {
+                                "type": "rescaler",
+                                "child": { "type": "input_stream", "input_id": "input_2" }
+                            },
+                            {
+                                "type": "rescaler",
+                                "child": { "type": "input_stream", "input_id": "input_3" }
+                            }
+                        ]
+                    }
+                },
+
+            },
+            "audio": {
+                "encoder": {
+                    "type": "opus",
+                    "channels": "stereo",
+                },
+                "initial": {
+                    "inputs": [
+                        {"input_id": "input_1"}
+                    ]
+                }
+            }
+        }),
+    )?;
+
+    std::thread::sleep(Duration::from_millis(500));
+    start_gst_receive_tcp(IP, OUTPUT_PORT, true, true)?;
+    examples::post("start", &json!({}))?;
+    sleep(Duration::from_secs(300));
+    examples::post("output/output_1/unregister", &json!({}))?;
+
+    Ok(())
+}
diff --git a/integration_tests/src/bin/benchmark/main.rs b/integration_tests/src/bin/benchmark/main.rs
index eea731ad5..501590fb1 100644
--- a/integration_tests/src/bin/benchmark/main.rs
+++ b/integration_tests/src/bin/benchmark/main.rs
@@ -222,6 +222,8 @@ fn run_single_test(ctx: GraphicsContext, bench_config: SingleBenchConfig) -> boo
         stream_fallback_timeout: Duration::from_millis(500),
         tokio_rt: None,
         stun_servers: Vec::new().into(),
+        whip_whep_server_port: None,
+        start_whip_whep: false,
     });
 
     let Ok((pipeline, _event_loop)) = pipeline_result else {
diff --git a/integration_tests/src/compositor_instance.rs b/integration_tests/src/compositor_instance.rs
index bb3967ff6..ef4288c8e 100644
--- a/integration_tests/src/compositor_instance.rs
+++ b/integration_tests/src/compositor_instance.rs
@@ -32,12 +32,13 @@ impl Drop for CompositorInstance {
 }
 
 impl CompositorInstance {
-    /// api port in config is overwritten
+    /// api port and start_whip_whep are overwritten in config
     pub fn start(config: Option<Config>) -> Self {
         init_compositor_prerequisites();
         let api_port = get_free_port();
         let mut config = config.unwrap_or(read_config());
         config.api_port = api_port;
+        config.start_whip_whep = false;
 
         info!("Starting LiveCompositor Integration Test with config:\n{config:#?}",);
 
diff --git a/src/config.rs b/src/config.rs
index 4d24287b7..180a5119b 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -27,6 +27,8 @@ pub struct Config {
     pub stun_servers: Arc<Vec<String>>,
     pub required_wgpu_features: WgpuFeatures,
     pub load_system_fonts: bool,
+    pub whip_whep_server_port: u16,
+    pub start_whip_whep: bool,
 }
 
 #[derive(Debug, Clone)]
@@ -197,6 +199,18 @@ fn try_read_config() -> Result<Config, String> {
         Err(_) => true,
     };
 
+    let whip_whep_server_port = match env::var("LIVE_COMPOSITOR_WHIP_WHEP_SERVER_PORT") {
+        Ok(whip_whep_port) => whip_whep_port
+            .parse::<u16>()
+            .map_err(|_| "LIVE_COMPOSITOR_WHIP_WHEP_SERVER_PORT has to be valid port number")?,
+        Err(_) => 9000,
+    };
+
+    let start_whip_whep = match env::var("LIVE_COMPOSITOR_START_WHIP_WHEP_SERVER") {
+        Ok(enable) => bool_env_from_str(&enable).unwrap_or(true),
+        Err(_) => true,
+    };
+
     let log_file = match env::var("LIVE_COMPOSITOR_LOG_FILE") {
         Ok(path) => Some(Arc::from(PathBuf::from(path))),
         Err(_) => None,
@@ -243,6 +257,8 @@ fn try_read_config() -> Result<Config, String> {
         stun_servers,
         required_wgpu_features,
         load_system_fonts,
+        whip_whep_server_port,
+        start_whip_whep,
     };
     Ok(config)
 }
diff --git a/src/routes/register_request.rs b/src/routes/register_request.rs
index a0f0b50d8..61cce9c95 100644
--- a/src/routes/register_request.rs
+++ b/src/routes/register_request.rs
@@ -14,7 +14,7 @@ use compositor_api::{
     error::ApiError,
     types::{
         DeckLink, ImageSpec, InputId, Mp4Input, Mp4Output, OutputId, RendererId, RtpInput,
-        RtpOutput, ShaderSpec, WebRendererSpec, WhipOutput,
+        RtpOutput, ShaderSpec, WebRendererSpec, WhipInput, WhipOutput,
     },
 };
 
@@ -25,6 +25,7 @@ use super::ApiState;
 pub enum RegisterInput {
     RtpStream(RtpInput),
     Mp4(Mp4Input),
+    Whip(WhipInput),
     #[serde(rename = "decklink")]
     DeckLink(DeckLink),
 }
@@ -54,6 +55,9 @@ pub(super) async fn handle_input(
             RegisterInput::DeckLink(decklink) => {
                 Pipeline::register_input(&api.pipeline, input_id.into(), decklink.try_into()?)?
             }
+            RegisterInput::Whip(whip) => {
+                Pipeline::register_input(&api.pipeline, input_id.into(), whip.try_into()?)?
+            }
         };
         match response {
             InputInitInfo::Rtp { port } => Ok(Response::RegisteredPort {
@@ -66,6 +70,7 @@ pub(super) async fn handle_input(
                 video_duration_ms: video_duration.map(|v| v.as_millis() as u64),
                 audio_duration_ms: audio_duration.map(|a| a.as_millis() as u64),
             }),
+            InputInitInfo::Whip { bearer_token } => Ok(Response::BearerToken { bearer_token }),
             InputInitInfo::Other => Ok(Response::Ok {}),
         }
     })
diff --git a/src/state.rs b/src/state.rs
index a6b46a25c..363102dd4 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -22,6 +22,9 @@ pub enum Response {
         video_duration_ms: Option<u64>,
         audio_duration_ms: Option<u64>,
     },
+    BearerToken {
+        bearer_token: String,
+    },
 }
 
 impl IntoResponse for Response {
@@ -51,6 +54,8 @@ impl ApiState {
             stun_servers,
             required_wgpu_features,
             load_system_fonts,
+            start_whip_whep,
+            whip_whep_server_port,
             ..
         } = config.clone();
         let (pipeline, event_loop) = Pipeline::new(pipeline::Options {
@@ -64,6 +69,8 @@ impl ApiState {
             wgpu_features: required_wgpu_features,
             wgpu_ctx: None,
             load_system_fonts: Some(load_system_fonts),
+            start_whip_whep,
+            whip_whep_server_port: Some(whip_whep_server_port),
             tokio_rt: Some(runtime),
         })?;
         Ok((