From 1d060ead3e300e6040ab678ab737a74c88f80b0f Mon Sep 17 00:00:00 2001 From: Jerzy Wilczek Date: Tue, 19 Nov 2024 17:24:34 +0100 Subject: [PATCH] Add hardware encoding to the compositor API. --- .../src/types/from_register_input.rs | 23 ++- compositor_api/src/types/register_input.rs | 25 ++- compositor_pipeline/src/pipeline/input/mp4.rs | 11 +- .../src/pipeline/input/mp4/mp4_file_reader.rs | 8 +- .../examples/encoded_channel_output.rs | 1 + integration_tests/examples/mp4_hw_dec.rs | 66 ++++++ .../examples/raw_channel_output.rs | 1 + integration_tests/examples/vulkan.rs | 192 +++++------------- 8 files changed, 171 insertions(+), 156 deletions(-) create mode 100644 integration_tests/examples/mp4_hw_dec.rs diff --git a/compositor_api/src/types/from_register_input.rs b/compositor_api/src/types/from_register_input.rs index fa2e0ab51..985924d01 100644 --- a/compositor_api/src/types/from_register_input.rs +++ b/compositor_api/src/types/from_register_input.rs @@ -101,15 +101,27 @@ impl TryFrom for pipeline::RegisterInputOptions { return Err(TypeError::new(NO_VIDEO_AUDIO_SPEC)); } + #[cfg(not(feature = "vk-video"))] + const NO_VULKAN_VIDEO: &str = + "Requested `vulkan_video` decoder, but this binary was compiled without the `vk-video` feature."; + let rtp_stream = input::rtp::RtpStream { video: video .as_ref() .map(|video| { Ok(input::rtp::InputVideoStream { - options: match video { - InputRtpVideoOptions::FfmepgH264 => decoder::VideoDecoderOptions { + options: match video.decoder { + VideoDecoder::FfmpegH264 => decoder::VideoDecoderOptions { decoder: pipeline::VideoDecoder::FFmpegH264, }, + + #[cfg(feature = "vk-video")] + VideoDecoder::VulkanVideo => decoder::VideoDecoderOptions { + decoder: pipeline::VideoDecoder::VulkanVideoH264, + }, + + #[cfg(not(feature = "vk-video"))] + VideoDecoder::VulkanVideo => return Err(TypeError::new()), }, }) }) @@ -146,6 +158,7 @@ impl TryFrom for pipeline::RegisterInputOptions { required, offset_ms, should_loop, + video_decoder, } = value; const BAD_URL_PATH_SPEC: &str = @@ -165,10 +178,16 @@ impl TryFrom for pipeline::RegisterInputOptions { buffer_duration: None, }; + let video_decoder = match video_decoder.unwrap_or(VideoDecoder::FfmpegH264) { + VideoDecoder::FfmpegH264 => pipeline::VideoDecoder::FFmpegH264, + VideoDecoder::VulkanVideo => pipeline::VideoDecoder::VulkanVideoH264, + }; + Ok(pipeline::RegisterInputOptions { input_options: input::InputOptions::Mp4(input::mp4::Mp4Options { source, should_loop: should_loop.unwrap_or(false), + video_decoder, }), queue_options, }) diff --git a/compositor_api/src/types/register_input.rs b/compositor_api/src/types/register_input.rs index e66f14fc4..d4c068ec1 100644 --- a/compositor_api/src/types/register_input.rs +++ b/compositor_api/src/types/register_input.rs @@ -45,6 +45,8 @@ pub struct Mp4Input { /// Offset in milliseconds relative to the pipeline start (start request). If offset is /// not defined then stream is synchronized based on the first frames delivery time. pub offset_ms: Option, + /// (**default=`ffmpeg_h264`**) The decoder to use for decoding video. + pub video_decoder: Option, } /// Capture streams from devices connected to Blackmagic DeckLink card. @@ -122,8 +124,25 @@ pub enum InputRtpAudioOptions { } #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] -#[serde(tag = "decoder", rename_all = "snake_case", deny_unknown_fields)] -pub enum InputRtpVideoOptions { +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub struct InputRtpVideoOptions { + pub decoder: VideoDecoder, +} + +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum VideoDecoder { + /// Use the software decoder based on ffmpeg. #[serde(rename = "ffmpeg_h264")] - FfmepgH264, + FfmpegH264, + + /// Use hardware decoder based on Vulkan Video. + /// + /// This should be faster and more scalable than teh ffmpeg decoder, if the hardware and OS + /// support it. + /// + /// This requires hardware that supports Vulkan Video. Another requirement is this program has + /// to be compiled with the `vk-video` feature enabled. + #[serde(rename = "vulkan_video")] + VulkanVideo, } diff --git a/compositor_pipeline/src/pipeline/input/mp4.rs b/compositor_pipeline/src/pipeline/input/mp4.rs index c23ef87f4..1f1306f9a 100644 --- a/compositor_pipeline/src/pipeline/input/mp4.rs +++ b/compositor_pipeline/src/pipeline/input/mp4.rs @@ -6,7 +6,10 @@ use crossbeam_channel::Receiver; use tracing::error; use crate::{ - pipeline::decoder::{AudioDecoderOptions, VideoDecoderOptions}, + pipeline::{ + decoder::{AudioDecoderOptions, VideoDecoderOptions}, + VideoDecoder, + }, queue::PipelineEvent, }; @@ -20,6 +23,7 @@ pub mod mp4_file_reader; pub struct Mp4Options { pub source: Source, pub should_loop: bool, + pub video_decoder: VideoDecoder, } pub(crate) enum Mp4ReaderOptions { @@ -96,13 +100,16 @@ impl Mp4 { should_loop: options.should_loop, }, input_id.clone(), + options.video_decoder, )?; let (video_reader, video_receiver) = match video { Some((reader, receiver)) => { let input_receiver = VideoInputReceiver::Encoded { chunk_receiver: receiver, - decoder_options: reader.decoder_options(), + decoder_options: VideoDecoderOptions { + decoder: options.video_decoder, + }, }; (Some(reader), Some(input_receiver)) } diff --git a/compositor_pipeline/src/pipeline/input/mp4/mp4_file_reader.rs b/compositor_pipeline/src/pipeline/input/mp4/mp4_file_reader.rs index 205c76d6d..fedd15b2e 100644 --- a/compositor_pipeline/src/pipeline/input/mp4/mp4_file_reader.rs +++ b/compositor_pipeline/src/pipeline/input/mp4/mp4_file_reader.rs @@ -125,6 +125,7 @@ impl Mp4FileReader { pub(crate) fn new_video( options: Mp4ReaderOptions, input_id: InputId, + video_decoder: VideoDecoder, ) -> Result, ChunkReceiver)>, Mp4Error> { let stop_thread = Arc::new(AtomicBool::new(false)); let span = span!(Level::INFO, "MP4 video", input_id = input_id.to_string()); @@ -136,7 +137,7 @@ impl Mp4FileReader { Self::new( input_file, size, - Self::find_h264_info, + |r| Self::find_h264_info(r, video_decoder), None, stop_thread, span, @@ -153,7 +154,7 @@ impl Mp4FileReader { Self::new( reader, size, - Self::find_h264_info, + |r| Self::find_h264_info(r, video_decoder), Some(fragment_receiver), stop_thread, span, @@ -165,6 +166,7 @@ impl Mp4FileReader { fn find_h264_info( reader: &mp4::Mp4Reader, + video_decoder: VideoDecoder, ) -> Option Bytes>> { let (&track_id, track, avc) = reader.tracks().iter().find_map(|(id, track)| { let track_type = track.track_type().ok()?; @@ -234,7 +236,7 @@ impl Mp4FileReader { }; let decoder_options = VideoDecoderOptions { - decoder: VideoDecoder::FFmpegH264, + decoder: video_decoder, }; Some(TrackInfo { diff --git a/integration_tests/examples/encoded_channel_output.rs b/integration_tests/examples/encoded_channel_output.rs index fdf5e6d26..715235591 100644 --- a/integration_tests/examples/encoded_channel_output.rs +++ b/integration_tests/examples/encoded_channel_output.rs @@ -91,6 +91,7 @@ fn main() { input_options: InputOptions::Mp4(Mp4Options { source: Source::File(root_dir.join(BUNNY_FILE_PATH)), should_loop: false, + video_decoder: compositor_pipeline::pipeline::VideoDecoder::FFmpegH264, }), queue_options: QueueInputOptions { required: true, diff --git a/integration_tests/examples/mp4_hw_dec.rs b/integration_tests/examples/mp4_hw_dec.rs new file mode 100644 index 000000000..cbbd4c5b1 --- /dev/null +++ b/integration_tests/examples/mp4_hw_dec.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use compositor_api::types::Resolution; +use serde_json::json; +use std::time::Duration; + +use integration_tests::{ + examples::{self, run_example}, + ffmpeg::start_ffmpeg_receive, +}; + +const VIDEO_RESOLUTION: Resolution = Resolution { + width: 1280, + height: 720, +}; + +const IP: &str = "127.0.0.1"; +const OUTPUT_PORT: u16 = 8004; + +fn main() { + run_example(client_code); +} + +fn client_code() -> Result<()> { + start_ffmpeg_receive(Some(OUTPUT_PORT), None)?; + + examples::post( + "input/input_1/register", + &json!({ + "type": "mp4", + "path": "examples/assets/BigBuckBunny.mp4", + "video_decoder": "vulkan_video", + }), + )?; + + examples::post( + "output/output_1/register", + &json!({ + "type": "rtp_stream", + "port": OUTPUT_PORT, + "ip": IP, + "video": { + "resolution": { + "width": VIDEO_RESOLUTION.width, + "height": VIDEO_RESOLUTION.height, + }, + "encoder": { + "type": "ffmpeg_h264", + "preset": "ultrafast" + }, + "initial": { + "root": { + "id": "input_1", + "type": "input_stream", + "input_id": "input_1", + } + } + } + }), + )?; + + std::thread::sleep(Duration::from_millis(500)); + + examples::post("start", &json!({}))?; + + Ok(()) +} diff --git a/integration_tests/examples/raw_channel_output.rs b/integration_tests/examples/raw_channel_output.rs index 3b21da594..ec8e11b74 100644 --- a/integration_tests/examples/raw_channel_output.rs +++ b/integration_tests/examples/raw_channel_output.rs @@ -113,6 +113,7 @@ fn main() { input_options: InputOptions::Mp4(Mp4Options { source: Source::File(root_dir().join(BUNNY_FILE_PATH)), should_loop: false, + video_decoder: compositor_pipeline::pipeline::VideoDecoder::FFmpegH264, }), queue_options: QueueInputOptions { required: true, diff --git a/integration_tests/examples/vulkan.rs b/integration_tests/examples/vulkan.rs index 46e8e14e7..6e9ed6c11 100644 --- a/integration_tests/examples/vulkan.rs +++ b/integration_tests/examples/vulkan.rs @@ -1,17 +1,8 @@ use anyhow::Result; -use integration_tests::examples::download_all_assets; -use live_compositor::{ - config::read_config, - logger::{self}, -}; +use integration_tests::examples::run_example; fn main() { - ffmpeg_next::format::network::init(); - logger::init_logger(read_config().logger); - - download_all_assets().unwrap(); - - client_code().unwrap(); + run_example(client_code); } #[cfg(target_os = "macos")] @@ -22,45 +13,10 @@ fn client_code() -> Result<()> { #[cfg(target_os = "linux")] fn client_code() -> Result<()> { use compositor_api::types::Resolution; - use compositor_pipeline::{ - pipeline::{ - decoder::VideoDecoderOptions, - encoder::{ - ffmpeg_h264::{EncoderPreset, Options as H264Options}, - VideoEncoderOptions, - }, - input::{ - rtp::{InputVideoStream, RtpReceiverOptions, RtpStream}, - InputOptions, - }, - output::{ - rtp::{RtpConnectionOptions, RtpSenderOptions}, - OutputOptions, OutputProtocolOptions, - }, - rtp::{RequestedPort, TransportProtocol}, - Options, OutputVideoOptions, PipelineOutputEndCondition, Port, RegisterInputOptions, - RegisterOutputOptions, VideoCodec, VideoDecoder, - }, - queue::QueueInputOptions, - Pipeline, - }; - use compositor_render::{ - error::ErrorStack, - scene::{ - Component, ComponentId, HorizontalAlign, InputStreamComponent, RGBAColor, - TilesComponent, VerticalAlign, - }, - InputId, OutputId, - }; - use live_compositor::config::read_config; - use signal_hook::{consts, iterator::Signals}; - use std::{ - sync::{Arc, Mutex}, - time::Duration, - }; + use serde_json::json; use integration_tests::{ - examples::TestSample, + examples::{self, TestSample}, ffmpeg::{start_ffmpeg_receive, start_ffmpeg_send}, }; @@ -76,119 +32,63 @@ fn client_code() -> Result<()> { const VIDEOS: u16 = 6; start_ffmpeg_receive(Some(OUTPUT_PORT), None)?; - let config = read_config(); - let (pipeline, event_loop) = Pipeline::new(Options { - queue_options: config.queue_options, - stream_fallback_timeout: config.stream_fallback_timeout, - web_renderer: config.web_renderer, - force_gpu: config.force_gpu, - download_root: config.download_root, - output_sample_rate: config.output_sample_rate, - wgpu_features: config.required_wgpu_features, - load_system_fonts: Some(true), - wgpu_ctx: None, - }) - .unwrap_or_else(|err| { - panic!( - "Failed to start compositor.\n{}", - ErrorStack::new(&err).into_string() - ) - }); - - let pipeline = Arc::new(Mutex::new(pipeline)); - let mut children = Vec::new(); for i in 0..VIDEOS { - let input_id = InputId(format!("input_{i}").into()); - - let input_options = RegisterInputOptions { - input_options: InputOptions::Rtp(RtpReceiverOptions { - port: RequestedPort::Exact(INPUT_PORT + 2 * i), - transport_protocol: TransportProtocol::Udp, - stream: RtpStream { - video: Some(InputVideoStream { - options: VideoDecoderOptions { - decoder: VideoDecoder::VulkanVideoH264, - }, - }), - audio: None, - }, + let input_id = format!("input_{i}"); + + examples::post( + &format!("input/{input_id}/register"), + &json!({ + "type": "rtp_stream", + "port": INPUT_PORT + i * 2, + "video": { + "decoder": "vulkan_video" + } }), - queue_options: QueueInputOptions { - offset: Some(Duration::ZERO), - required: false, - buffer_duration: None, - }, - }; - - Pipeline::register_input(&pipeline, input_id.clone(), input_options).unwrap(); + )?; - children.push(Component::InputStream(InputStreamComponent { - id: None, - input_id, + children.push(json!({ + "type": "input_stream", + "input_id": input_id, })); } - let output_options = RegisterOutputOptions { - output_options: OutputOptions { - output_protocol: OutputProtocolOptions::Rtp(RtpSenderOptions { - video: Some(VideoCodec::H264), - audio: None, - connection_options: RtpConnectionOptions::Udp { - port: Port(OUTPUT_PORT), - ip: IP.into(), - }, - }), - video: Some(VideoEncoderOptions::H264(H264Options { - preset: EncoderPreset::Ultrafast, - resolution: VIDEO_RESOLUTION.into(), - raw_options: Vec::new(), - })), - audio: None, - }, - video: Some(OutputVideoOptions { - initial: Component::Tiles(TilesComponent { - id: Some(ComponentId("tiles".into())), - padding: 5.0, - background_color: RGBAColor(0x44, 0x44, 0x44, 0xff), - children, - width: None, - height: None, - margin: 0.0, - transition: None, - vertical_align: VerticalAlign::Center, - horizontal_align: HorizontalAlign::Center, - tile_aspect_ratio: (16, 9), - }), + let scene = json!({ + "type": "tiles", + "id": "tile", + "padding": 5, + "background_color_rgba": "#444444FF", + "children": children, + }); - end_condition: PipelineOutputEndCondition::Never, + examples::post( + "output/output_1/register", + &json!({ + "type": "rtp_stream", + "ip": IP, + "port": OUTPUT_PORT, + "video": { + "resolution": { + "width": VIDEO_RESOLUTION.width, + "height": VIDEO_RESOLUTION.height, + }, + "encoder": { + "type": "ffmpeg_h264", + "preset": "ultrafast", + }, + "initial": { + "root": scene, + }, + }, }), - audio: None, - }; - - pipeline - .lock() - .unwrap() - .register_output(OutputId("output_1".into()), output_options) - .unwrap(); + )?; - Pipeline::start(&pipeline); + examples::post("start", &json!({}))?; for i in 0..VIDEOS { start_ffmpeg_send(IP, Some(INPUT_PORT + 2 * i), None, TestSample::BigBuckBunny)?; } - let event_loop_fallback = || { - let mut signals = Signals::new([consts::SIGINT]).unwrap(); - signals.forever().next(); - }; - if let Err(err) = event_loop.run_with_fallback(&event_loop_fallback) { - panic!( - "Failed to start event loop.\n{}", - ErrorStack::new(&err).into_string() - ) - } - Ok(()) }