From f326ea59d70e7aa9b11965b0ed546343c3341e81 Mon Sep 17 00:00:00 2001 From: Wojciech Kozyra Date: Thu, 11 Jul 2024 17:28:03 +0200 Subject: [PATCH] Add input type that returns data via channel (#621) --- compositor_pipeline/src/pipeline.rs | 24 +- compositor_pipeline/src/pipeline/input.rs | 181 +++++++----- .../src/pipeline/input/decklink.rs | 1 + compositor_pipeline/src/pipeline/input/mp4.rs | 2 + compositor_pipeline/src/pipeline/input/rtp.rs | 1 + .../src/pipeline/pipeline_input.rs | 35 +-- compositor_pipeline/src/pipeline/types.rs | 13 +- compositor_render/src/state/render_loop.rs | 32 ++- compositor_render/src/wgpu.rs | 1 + compositor_render/src/wgpu/ctx.rs | 7 +- compositor_render/src/wgpu/texture.rs | 271 ++++++++---------- compositor_render/src/wgpu/texture/base.rs | 61 +++- compositor_render/src/wgpu/utils.rs | 22 ++ .../src/wgpu/utils/r8_fill_value.wgsl | 26 ++ .../src/wgpu/utils/r8_fill_with_color.rs | 87 ++++++ .../examples/encoded_channel_output.rs | 2 +- .../examples/raw_channel_input.rs | 223 ++++++++++++++ .../examples/raw_channel_output.rs | 2 +- integration_tests/src/lib.rs | 4 +- integration_tests/src/test_input.rs | 92 ++++++ src/routes/register_request.rs | 2 +- 21 files changed, 826 insertions(+), 263 deletions(-) create mode 100644 compositor_render/src/wgpu/utils.rs create mode 100644 compositor_render/src/wgpu/utils/r8_fill_value.wgsl create mode 100644 compositor_render/src/wgpu/utils/r8_fill_with_color.rs create mode 100644 integration_tests/examples/raw_channel_input.rs create mode 100644 integration_tests/src/test_input.rs diff --git a/compositor_pipeline/src/pipeline.rs b/compositor_pipeline/src/pipeline.rs index 1e6e1089f..5565b8373 100644 --- a/compositor_pipeline/src/pipeline.rs +++ b/compositor_pipeline/src/pipeline.rs @@ -19,10 +19,13 @@ use compositor_render::WgpuFeatures; use compositor_render::{error::UpdateSceneError, Renderer}; use compositor_render::{EventLoop, InputId, OutputId, RendererId, RendererSpec}; use 
crossbeam_channel::{bounded, Receiver}; +use input::InputInitInfo; +use input::RawDataInputOptions; use output::EncodedDataOutputOptions; use output::OutputOptions; use output::RawDataOutputOptions; use tracing::{error, info, trace, warn}; +use types::RawDataSender; use crate::audio_mixer::AudioMixer; use crate::audio_mixer::MixingStrategy; @@ -34,6 +37,7 @@ use crate::error::{ use crate::pipeline::pipeline_output::OutputSender; use crate::queue::PipelineEvent; use crate::queue::QueueAudioOutput; +use crate::queue::QueueInputOptions; use crate::queue::{self, Queue, QueueOptions, QueueVideoOutput}; use self::input::InputOptions; @@ -112,7 +116,7 @@ pub struct Options { pub wgpu_features: WgpuFeatures, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PipelineCtx { pub output_sample_rate: u32, pub output_framerate: Framerate, @@ -159,8 +163,22 @@ impl Pipeline { pipeline: &Arc>, input_id: InputId, register_options: RegisterInputOptions, - ) -> Result, RegisterInputError> { - register_pipeline_input(pipeline, input_id, register_options) + ) -> Result { + register_pipeline_input( + pipeline, + input_id, + ®ister_options.input_options, + register_options.queue_options, + ) + } + + pub fn register_raw_data_input( + pipeline: &Arc>, + input_id: InputId, + raw_input_options: RawDataInputOptions, + queue_options: QueueInputOptions, + ) -> Result { + register_pipeline_input(pipeline, input_id, &raw_input_options, queue_options) } pub fn unregister_input(&mut self, input_id: &InputId) -> Result<(), UnregisterInputError> { diff --git a/compositor_pipeline/src/pipeline/input.rs b/compositor_pipeline/src/pipeline/input.rs index c02f50724..e0342df4d 100644 --- a/compositor_pipeline/src/pipeline/input.rs +++ b/compositor_pipeline/src/pipeline/input.rs @@ -1,6 +1,7 @@ -use std::path::Path; - -use crate::{error::InputInitError, queue::PipelineEvent}; +use crate::{ + error::{InputInitError, RegisterInputError}, + queue::PipelineEvent, +}; use compositor_render::{Frame, 
InputId}; use crossbeam_channel::{bounded, Receiver}; @@ -13,8 +14,8 @@ use super::{ start_audio_decoder_thread, start_audio_resampler_only_thread, start_video_decoder_thread, AudioDecoderOptions, DecodedDataReceiver, VideoDecoderOptions, }, - types::{DecodedSamples, EncodedChunk}, - Port, + types::{DecodedSamples, EncodedChunk, RawDataSender}, + PipelineCtx, Port, }; #[cfg(feature = "decklink")] @@ -27,15 +28,116 @@ pub enum Input { Mp4(Mp4), #[cfg(feature = "decklink")] DeckLink(decklink::DeckLink), + RawDataInput, +} + +#[derive(Debug, Clone)] +pub enum InputOptions { + Rtp(RtpReceiverOptions), + Mp4(Mp4Options), + #[cfg(feature = "decklink")] + DeckLink(decklink::DeckLinkOptions), +} + +#[derive(Debug, Clone)] +pub struct RawDataInputOptions { + pub video: bool, + pub audio: bool, +} + +pub struct InputInitInfo { + pub port: Option, +} + +struct InputInitResult { + input: Input, + video: Option, + audio: Option, + init_info: InputInitInfo, +} + +pub(super) enum VideoInputReceiver { + #[allow(dead_code)] + Raw { + frame_receiver: Receiver>, + }, + Encoded { + chunk_receiver: Receiver>, + decoder_options: VideoDecoderOptions, + }, +} + +pub(super) enum AudioInputReceiver { + #[allow(dead_code)] + Raw { + sample_receiver: Receiver>, + sample_rate: u32, + }, + Encoded { + chunk_receiver: Receiver>, + decoder_options: AudioDecoderOptions, + }, +} + +pub(super) trait InputOptionsExt { + fn new_input( + &self, + input_id: &InputId, + ctx: &PipelineCtx, + ) -> Result<(Input, DecodedDataReceiver, NewInputResult), RegisterInputError>; +} + +impl InputOptionsExt for InputOptions { + fn new_input( + &self, + input_id: &InputId, + ctx: &PipelineCtx, + ) -> Result<(Input, DecodedDataReceiver, InputInitInfo), RegisterInputError> { + start_input_threads(input_id, self.clone(), ctx) + .map_err(|e| RegisterInputError::InputError(input_id.clone(), e)) + } +} + +impl InputOptionsExt for RawDataInputOptions { + fn new_input( + &self, + _input_id: &InputId, + _ctx: &PipelineCtx, + ) 
-> Result<(Input, DecodedDataReceiver, RawDataSender), RegisterInputError> { + let (video_sender, video_receiver) = match self.video { + true => { + let (sender, receiver) = bounded(1000); + (Some(sender), Some(receiver)) + } + false => (None, None), + }; + let (audio_sender, audio_receiver) = match self.audio { + true => { + let (sender, receiver) = bounded(1000); + (Some(sender), Some(receiver)) + } + false => (None, None), + }; + Ok(( + Input::RawDataInput, + DecodedDataReceiver { + video: video_receiver, + audio: audio_receiver, + }, + RawDataSender { + video: video_sender, + audio: audio_sender, + }, + )) + } } /// Start entire processing pipeline for an input, including decoders and resamplers. -pub(super) fn start_input_threads( +fn start_input_threads( input_id: &InputId, options: InputOptions, - download_dir: &Path, - output_sample_rate: u32, -) -> Result { + pipeline_ctx: &PipelineCtx, +) -> Result<(Input, DecodedDataReceiver, InputInitInfo), InputInitError> { let InputInitResult { input, video, @@ -43,7 +145,9 @@ pub(super) fn start_input_threads( init_info, } = match options { InputOptions::Rtp(opts) => RtpReceiver::start_new_input(input_id, opts)?, - InputOptions::Mp4(opts) => Mp4::start_new_input(input_id, opts, download_dir)?, + InputOptions::Mp4(opts) => { + Mp4::start_new_input(input_id, opts, &pipeline_ctx.download_dir)? 
+ } #[cfg(feature = "decklink")] InputOptions::DeckLink(opts) => decklink::DeckLink::start_new_input(input_id, opts)?, }; @@ -78,7 +182,7 @@ pub(super) fn start_input_threads( let (sender, receiver) = bounded(10); start_audio_resampler_only_thread( sample_rate, - output_sample_rate, + pipeline_ctx.output_sample_rate, sample_receiver, sender, input_id.clone(), @@ -92,7 +196,7 @@ pub(super) fn start_input_threads( let (sender, receiver) = bounded(10); start_audio_decoder_thread( decoder_options, - output_sample_rate, + pipeline_ctx.output_sample_rate, chunk_receiver, sender, input_id.clone(), @@ -103,56 +207,5 @@ pub(super) fn start_input_threads( } else { None }; - Ok(InputStartResult { - input, - receiver: DecodedDataReceiver { video, audio }, - init_info, - }) -} - -pub enum InputOptions { - Rtp(RtpReceiverOptions), - Mp4(Mp4Options), - #[cfg(feature = "decklink")] - DeckLink(decklink::DeckLinkOptions), -} - -pub struct InputInitInfo { - pub port: Option, -} - -struct InputInitResult { - input: Input, - video: Option, - audio: Option, - init_info: InputInitInfo, -} - -pub(super) struct InputStartResult { - pub(super) input: Input, - pub(super) receiver: DecodedDataReceiver, - pub(super) init_info: InputInitInfo, -} - -pub(super) enum VideoInputReceiver { - #[allow(dead_code)] - Raw { - frame_receiver: Receiver>, - }, - Encoded { - chunk_receiver: Receiver>, - decoder_options: VideoDecoderOptions, - }, -} - -pub(super) enum AudioInputReceiver { - #[allow(dead_code)] - Raw { - sample_receiver: Receiver>, - sample_rate: u32, - }, - Encoded { - chunk_receiver: Receiver>, - decoder_options: AudioDecoderOptions, - }, + Ok((input, DecodedDataReceiver { video, audio }, init_info)) } diff --git a/compositor_pipeline/src/pipeline/input/decklink.rs b/compositor_pipeline/src/pipeline/input/decklink.rs index e98343853..92538b417 100644 --- a/compositor_pipeline/src/pipeline/input/decklink.rs +++ b/compositor_pipeline/src/pipeline/input/decklink.rs @@ -33,6 +33,7 @@ pub struct 
DeckLinkInfo { pub subdevice_index: Option, } +#[derive(Debug, Clone)] pub struct DeckLinkOptions { pub subdevice_index: Option, pub display_name: Option, diff --git a/compositor_pipeline/src/pipeline/input/mp4.rs b/compositor_pipeline/src/pipeline/input/mp4.rs index 3ea666918..7d64b94c8 100644 --- a/compositor_pipeline/src/pipeline/input/mp4.rs +++ b/compositor_pipeline/src/pipeline/input/mp4.rs @@ -16,6 +16,7 @@ use super::{AudioInputReceiver, Input, InputInitInfo, InputInitResult, VideoInpu pub mod mp4_file_reader; +#[derive(Debug, Clone)] pub struct Mp4Options { pub source: Source, } @@ -31,6 +32,7 @@ pub(crate) enum Mp4ReaderOptions { }, } +#[derive(Debug, Clone)] pub enum Source { Url(String), File(PathBuf), diff --git a/compositor_pipeline/src/pipeline/input/rtp.rs b/compositor_pipeline/src/pipeline/input/rtp.rs index 43cbc720b..5520e7e29 100644 --- a/compositor_pipeline/src/pipeline/input/rtp.rs +++ b/compositor_pipeline/src/pipeline/input/rtp.rs @@ -45,6 +45,7 @@ pub enum RtpReceiverError { DepayloaderError(#[from] DepayloaderNewError), } +#[derive(Debug, Clone)] pub struct RtpReceiverOptions { pub port: RequestedPort, pub transport_protocol: TransportProtocol, diff --git a/compositor_pipeline/src/pipeline/pipeline_input.rs b/compositor_pipeline/src/pipeline/pipeline_input.rs index 05a514c8a..ae572e922 100644 --- a/compositor_pipeline/src/pipeline/pipeline_input.rs +++ b/compositor_pipeline/src/pipeline/pipeline_input.rs @@ -2,12 +2,9 @@ use std::sync::{Arc, Mutex}; use compositor_render::InputId; -use crate::{error::RegisterInputError, Pipeline}; +use crate::{error::RegisterInputError, queue::QueueInputOptions, Pipeline}; -use super::{ - input::{self, start_input_threads, InputStartResult}, - Port, RegisterInputOptions, -}; +use super::input::{self, InputOptionsExt}; pub struct PipelineInput { pub input: input::Input, @@ -20,29 +17,15 @@ pub struct PipelineInput { pub(super) video_eos_received: Option, } -pub(super) fn register_pipeline_input( +pub(super) 
fn register_pipeline_input( pipeline: &Arc>, input_id: InputId, - register_options: RegisterInputOptions, -) -> Result, RegisterInputError> { - let RegisterInputOptions { - input_options, - queue_options, - } = register_options; - let (download_dir, output_sample_rate) = { - let guard = pipeline.lock().unwrap(); - if guard.inputs.contains_key(&input_id) { - return Err(RegisterInputError::AlreadyRegistered(input_id)); - } - (guard.ctx.download_dir.clone(), guard.ctx.output_sample_rate) - }; + input_options: &dyn InputOptionsExt, + queue_options: QueueInputOptions, +) -> Result { + let pipeline_ctx = pipeline.lock().unwrap().ctx.clone(); - let InputStartResult { - input, - receiver, - init_info, - } = start_input_threads(&input_id, input_options, &download_dir, output_sample_rate) - .map_err(|e| RegisterInputError::InputError(input_id.clone(), e))?; + let (input, receiver, input_result) = input_options.new_input(&input_id, &pipeline_ctx)?; let (audio_eos_received, video_eos_received) = ( receiver.audio.as_ref().map(|_| false), @@ -77,7 +60,7 @@ pub(super) fn register_pipeline_input( guard.queue.add_input(&input_id, receiver, queue_options); guard.renderer.register_input(input_id); - Ok(init_info.port) + Ok(input_result) } impl PipelineInput { diff --git a/compositor_pipeline/src/pipeline/types.rs b/compositor_pipeline/src/pipeline/types.rs index 852810adb..18d4f62f7 100644 --- a/compositor_pipeline/src/pipeline/types.rs +++ b/compositor_pipeline/src/pipeline/types.rs @@ -2,9 +2,12 @@ use std::{fmt, sync::Arc, time::Duration}; use bytes::Bytes; use compositor_render::Frame; -use crossbeam_channel::Receiver; +use crossbeam_channel::{Receiver, Sender}; -use crate::{audio_mixer::OutputSamples, queue::PipelineEvent}; +use crate::{ + audio_mixer::{InputSamples, OutputSamples}, + queue::PipelineEvent, +}; /// A struct representing a chunk of encoded data. 
/// @@ -45,6 +48,12 @@ pub struct RawDataReceiver { pub audio: Option>>, } +#[derive(Debug)] +pub struct RawDataSender { + pub video: Option>>, + pub audio: Option>>, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum VideoCodec { H264, diff --git a/compositor_render/src/state/render_loop.rs b/compositor_render/src/state/render_loop.rs index 0c2a2e7b7..29e627076 100644 --- a/compositor_render/src/state/render_loop.rs +++ b/compositor_render/src/state/render_loop.rs @@ -1,8 +1,9 @@ use std::{collections::HashMap, time::Duration}; -use tracing::{error, warn}; +use tracing::error; use crate::{ + scene::RGBColor, state::{node::RenderNode, render_graph::RenderGraph, RenderCtx}, wgpu::texture::{InputTexture, NodeTexture, PlanarYuvPendingDownload}, Frame, FrameData, FrameSet, InputId, OutputFrameFormat, OutputId, Resolution, @@ -74,7 +75,7 @@ pub(super) fn read_outputs( let texture = node .rgba_texture() .texture() - .copy_to_wgpu_texture(ctx.wgpu_ctx); + .copy_wgpu_texture(ctx.wgpu_ctx); let size = texture.size(); let frame = Frame { data: FrameData::Rgba8UnormWgpuTexture(texture.into()), @@ -91,10 +92,29 @@ pub(super) fn read_outputs( } }, None => { - warn!(?output_id, "Missing output texture"); - // registering output is not possible without a scene so this - // should never happen - continue; + let (y, u, v) = RGBColor::BLACK.to_yuv(); + ctx.wgpu_ctx.utils.fill_r8_with_value( + ctx.wgpu_ctx, + output.output_texture.yuv_textures().plane(0), + y, + ); + ctx.wgpu_ctx.utils.fill_r8_with_value( + ctx.wgpu_ctx, + output.output_texture.yuv_textures().plane(1), + u, + ); + ctx.wgpu_ctx.utils.fill_r8_with_value( + ctx.wgpu_ctx, + output.output_texture.yuv_textures().plane(2), + v, + ); + + let pending_download = output.output_texture.start_download(ctx.wgpu_ctx); + partial_textures.push(PartialOutputFrame::PendingYuvDownload { + output_id: output_id.clone(), + pending_download, + resolution: output.output_texture.resolution().to_owned(), + }); } }; } diff --git 
a/compositor_render/src/wgpu.rs b/compositor_render/src/wgpu.rs index d5930a300..13c7d145e 100644 --- a/compositor_render/src/wgpu.rs +++ b/compositor_render/src/wgpu.rs @@ -4,6 +4,7 @@ pub(crate) mod common_pipeline; mod ctx; pub(crate) mod format; pub(crate) mod texture; +pub(crate) mod utils; pub use ctx::use_global_wgpu_ctx; pub(crate) use ctx::WgpuCtx; diff --git a/compositor_render/src/wgpu/ctx.rs b/compositor_render/src/wgpu/ctx.rs index 3b4d13dc9..0ab10c8e0 100644 --- a/compositor_render/src/wgpu/ctx.rs +++ b/compositor_render/src/wgpu/ctx.rs @@ -3,8 +3,8 @@ use std::sync::{atomic::AtomicBool, Arc, OnceLock}; use log::{error, info}; use super::{ - common_pipeline::plane::Plane, format::TextureFormat, texture::Texture, CreateWgpuCtxError, - WgpuErrorScope, + common_pipeline::plane::Plane, format::TextureFormat, texture::Texture, utils::TextureUtils, + CreateWgpuCtxError, WgpuErrorScope, }; static USE_GLOBAL_WGPU_CTX: AtomicBool = AtomicBool::new(false); @@ -31,6 +31,7 @@ pub struct WgpuCtx { pub shader_header: naga::Module, pub format: TextureFormat, + pub utils: TextureUtils, pub uniform_bgl: wgpu::BindGroupLayout, pub plane: Plane, @@ -98,6 +99,7 @@ impl WgpuCtx { let scope = WgpuErrorScope::push(&device); let format = TextureFormat::new(&device); + let utils = TextureUtils::new(&device); let uniform_bgl = uniform_bind_group_layout(&device); @@ -115,6 +117,7 @@ impl WgpuCtx { queue: queue.into(), shader_header, format, + utils, uniform_bgl, plane, empty_texture, diff --git a/compositor_render/src/wgpu/texture.rs b/compositor_render/src/wgpu/texture.rs index 8d7daf301..280545dff 100644 --- a/compositor_render/src/wgpu/texture.rs +++ b/compositor_render/src/wgpu/texture.rs @@ -1,11 +1,11 @@ -use std::{io::Write, mem}; +use std::{io::Write, mem, sync::Arc}; use bytes::{BufMut, Bytes, BytesMut}; use crossbeam_channel::bounded; use log::error; use wgpu::{Buffer, BufferAsyncError, MapMode}; -use crate::{Frame, FrameData, Resolution}; +use crate::{Frame, 
FrameData, Resolution, YuvPlanes}; use self::utils::{pad_to_256, texture_size_to_resolution}; @@ -28,179 +28,164 @@ pub type Texture = base::Texture; pub use planar_yuv::YuvPendingDownload as PlanarYuvPendingDownload; -struct InputTextureState { - textures: InnerInputTexture, - bind_group: wgpu::BindGroup, -} - -enum InnerInputTexture { - PlanarYuvTextures(PlanarYuvTextures), - InterleavedYuv422Texture(InterleavedYuv422Texture), -} - -#[derive(Debug, Clone, Copy, PartialEq)] -enum InputTextureKind { - PlanarYuvTextures, - InterleavedYuv422Texture, +enum InputTextureState { + PlanarYuvTextures { + textures: PlanarYuvTextures, + bind_group: wgpu::BindGroup, + }, + InterleavedYuv422Texture { + texture: InterleavedYuv422Texture, + bind_group: wgpu::BindGroup, + }, + Rgba8UnormWgpuTexture(Arc), } impl InputTextureState { fn resolution(&self) -> Resolution { - match &self.textures { - InnerInputTexture::PlanarYuvTextures(texture) => texture.resolution, - InnerInputTexture::InterleavedYuv422Texture(texture) => texture.resolution, - } - } -} - -impl InnerInputTexture { - fn kind(&self) -> InputTextureKind { - match self { - InnerInputTexture::PlanarYuvTextures(_) => InputTextureKind::PlanarYuvTextures, - InnerInputTexture::InterleavedYuv422Texture(_) => { - InputTextureKind::InterleavedYuv422Texture + match &self { + InputTextureState::PlanarYuvTextures { textures, .. } => textures.resolution, + InputTextureState::InterleavedYuv422Texture { texture, .. 
} => texture.resolution, + InputTextureState::Rgba8UnormWgpuTexture(texture) => { + let size = texture.size(); + Resolution { + width: size.width as usize, + height: size.height as usize, + } } } } } -pub struct InputTexture(OptionalState); +pub struct InputTexture(Option); impl InputTexture { pub fn new() -> Self { - Self(OptionalState::new()) + Self(None) } pub fn clear(&mut self) { - self.0.clear() + self.0 = None; } pub fn upload(&mut self, ctx: &WgpuCtx, frame: Frame) { match frame.data { - FrameData::PlanarYuv420(planes) => { - let state = self.ensure_type_and_size( - ctx, - frame.resolution, - InputTextureKind::PlanarYuvTextures, - ); - - let InnerInputTexture::PlanarYuvTextures(textures) = &mut state.textures else { - error!("Invalid texture format."); - return; - }; - textures.upload(ctx, &planes, planar_yuv::YuvVariant::YUV420) - } - FrameData::PlanarYuvJ420(planes) => { - let state = self.ensure_type_and_size( - ctx, - frame.resolution, - InputTextureKind::PlanarYuvTextures, - ); - let InnerInputTexture::PlanarYuvTextures(textures) = &mut state.textures else { - error!("Invalid texture format."); - return; - }; - textures.upload(ctx, &planes, planar_yuv::YuvVariant::YUVJ420) - } + FrameData::PlanarYuv420(planes) => self.upload_planar_yuv( + ctx, + planes, + frame.resolution, + planar_yuv::YuvVariant::YUV420, + ), + FrameData::PlanarYuvJ420(planes) => self.upload_planar_yuv( + ctx, + planes, + frame.resolution, + planar_yuv::YuvVariant::YUVJ420, + ), FrameData::InterleavedYuv422(data) => { - let state = self.ensure_type_and_size( - ctx, - frame.resolution, - InputTextureKind::InterleavedYuv422Texture, - ); - let InnerInputTexture::InterleavedYuv422Texture(textures) = &mut state.textures - else { - error!("Invalid texture format."); - return; - }; - textures.upload(ctx, &data) + self.upload_interleaved_yuv(ctx, data, frame.resolution) } - FrameData::Rgba8UnormWgpuTexture(_) => { - // TODO: Implement wgpu texture input - error!("Unsupported input format") + 
FrameData::Rgba8UnormWgpuTexture(texture) => { + self.0 = Some(InputTextureState::Rgba8UnormWgpuTexture(texture)) } } } - pub fn convert_to_node_texture(&self, ctx: &WgpuCtx, dest: &mut NodeTexture) { - match self.state() { - Some(input_texture) => { - let dest_state = dest.ensure_size(ctx, input_texture.resolution()); - match &input_texture.textures { - InnerInputTexture::PlanarYuvTextures(textures) => { - ctx.format.convert_planar_yuv_to_rgba( - ctx, - (textures, &input_texture.bind_group), - dest_state.rgba_texture(), - ) - } - InnerInputTexture::InterleavedYuv422Texture(texture) => { - ctx.format.convert_interleaved_yuv_to_rgba( - ctx, - (texture, &input_texture.bind_group), - dest_state.rgba_texture(), - ) - } - } + fn upload_planar_yuv( + &mut self, + ctx: &WgpuCtx, + planes: YuvPlanes, + resolution: Resolution, + variant: planar_yuv::YuvVariant, + ) { + let should_recreate = match &self.0 { + Some(state) => { + !matches!(state, InputTextureState::PlanarYuvTextures { .. }) + || resolution != state.resolution() } - None => dest.clear(), + None => true, + }; + + if should_recreate { + let textures = PlanarYuvTextures::new(ctx, resolution); + let bind_group = textures.new_bind_group(ctx, ctx.format.planar_yuv_layout()); + self.0 = Some(InputTextureState::PlanarYuvTextures { + textures, + bind_group, + }) } + let Some(InputTextureState::PlanarYuvTextures { textures, .. 
}) = self.0.as_mut() else { + error!("Invalid texture format."); + return; + }; + textures.upload(ctx, &planes, variant) } - fn ensure_type_and_size<'a>( - &'a mut self, + fn upload_interleaved_yuv( + &mut self, ctx: &WgpuCtx, - new_resolution: Resolution, - new_texture_kind: InputTextureKind, - ) -> &'a mut InputTextureState { - fn new_state( - ctx: &WgpuCtx, - new_resolution: Resolution, - new_texture_kind: InputTextureKind, - ) -> InputTextureState { - match new_texture_kind { - InputTextureKind::PlanarYuvTextures => { - let textures = PlanarYuvTextures::new(ctx, new_resolution); - let bind_group = textures.new_bind_group(ctx, ctx.format.planar_yuv_layout()); - InputTextureState { - textures: InnerInputTexture::PlanarYuvTextures(textures), - bind_group, - } - } - InputTextureKind::InterleavedYuv422Texture => { - let textures = InterleavedYuv422Texture::new(ctx, new_resolution); - let bind_group = - textures.new_bind_group(ctx, ctx.format.interleaved_yuv_layout()); - InputTextureState { - textures: InnerInputTexture::InterleavedYuv422Texture(textures), - bind_group, - } - } + data: bytes::Bytes, + resolution: Resolution, + ) { + let should_recreate = match &self.0 { + Some(state) => { + !matches!(state, InputTextureState::InterleavedYuv422Texture { .. 
}) + || resolution != state.resolution() } + None => true, + }; + + if should_recreate { + let texture = InterleavedYuv422Texture::new(ctx, resolution); + let bind_group = texture.new_bind_group(ctx, ctx.format.interleaved_yuv_layout()); + + self.0 = Some(InputTextureState::InterleavedYuv422Texture { + texture, + bind_group, + }); } - self.0 = match self.0.replace(OptionalState::None) { - OptionalState::Some(state) | OptionalState::NoneWithOldState(state) => { - if state.resolution() == new_resolution && state.textures.kind() == new_texture_kind - { - OptionalState::Some(state) - } else { - OptionalState::Some(new_state(ctx, new_resolution, new_texture_kind)) - } - } - OptionalState::None => { - OptionalState::Some(new_state(ctx, new_resolution, new_texture_kind)) - } + let Some(InputTextureState::InterleavedYuv422Texture { texture, .. }) = self.0.as_mut() + else { + error!("Invalid texture format."); + return; }; - self.state_mut().unwrap() + texture.upload(ctx, &data) } - fn state(&self) -> Option<&InputTextureState> { - self.0.state() - } - - fn state_mut(&mut self) -> Option<&mut InputTextureState> { - self.0.state_mut() + pub fn convert_to_node_texture(&self, ctx: &WgpuCtx, dest: &mut NodeTexture) { + match &self.0 { + Some(input_texture) => { + let dest_state = dest.ensure_size(ctx, input_texture.resolution()); + match &input_texture { + InputTextureState::PlanarYuvTextures { + textures, + bind_group, + } => ctx.format.convert_planar_yuv_to_rgba( + ctx, + (textures, bind_group), + dest_state.rgba_texture(), + ), + InputTextureState::InterleavedYuv422Texture { + texture, + bind_group, + } => ctx.format.convert_interleaved_yuv_to_rgba( + ctx, + (texture, bind_group), + dest_state.rgba_texture(), + ), + InputTextureState::Rgba8UnormWgpuTexture(texture) => { + if let Err(err) = dest_state + .rgba_texture() + .texture() + .fill_from_wgpu_texture(ctx, texture) + { + error!("Invalid texture passed as an input: {err}") + } + } + } + } + None => dest.clear(), + } } } 
@@ -394,14 +379,6 @@ impl OptionalState { } } - fn state_mut(&mut self) -> Option<&mut State> { - match self { - OptionalState::None => None, - OptionalState::NoneWithOldState(_) => None, - OptionalState::Some(ref mut state) => Some(state), - } - } - fn replace(&mut self, replacement: Self) -> Self { mem::replace(self, replacement) } diff --git a/compositor_render/src/wgpu/texture/base.rs b/compositor_render/src/wgpu/texture/base.rs index cd9c655b8..8cb6e415b 100644 --- a/compositor_render/src/wgpu/texture/base.rs +++ b/compositor_render/src/wgpu/texture/base.rs @@ -38,10 +38,8 @@ impl Texture { Self { texture, view } } - pub fn copy_to_wgpu_texture(&self, ctx: &WgpuCtx) -> wgpu::Texture { - let mut encoder = ctx.device.create_command_encoder(&Default::default()); - - let dst_texture = ctx.device.create_texture(&wgpu::TextureDescriptor { + pub fn copy_wgpu_texture(&self, ctx: &WgpuCtx) -> wgpu::Texture { + let destination = ctx.device.create_texture(&wgpu::TextureDescriptor { label: None, size: self.texture.size(), mip_level_count: self.texture.mip_level_count(), @@ -51,13 +49,40 @@ impl Texture { usage: self.texture.usage(), view_formats: &[self.texture.format()], }); - encoder.copy_texture_to_texture( - self.texture.as_image_copy(), - dst_texture.as_image_copy(), + copy_texture_to_texture(ctx, &self.texture, &destination); + destination + } + + pub fn fill_from_wgpu_texture( + &self, + ctx: &WgpuCtx, + source: &wgpu::Texture, + ) -> Result<(), TextureCopyError> { + let expected = ( self.texture.size(), + self.texture.mip_level_count(), + self.texture.sample_count(), + self.texture.dimension(), + self.texture.format(), + self.texture.usage(), ); - ctx.queue.submit(Some(encoder.finish())); - dst_texture + let actual = ( + source.size(), + source.mip_level_count(), + source.sample_count(), + source.dimension(), + source.format(), + source.usage(), + ); + + if expected != actual { + return Err(TextureCopyError { + expected: format!("{expected:?}"), + actual: 
format!("{actual:?}"), + }); + } + copy_texture_to_texture(ctx, source, &self.texture); + Ok(()) } pub fn empty(device: &wgpu::Device) -> Self { @@ -137,3 +162,21 @@ impl Texture { ); } } + +#[derive(Debug, thiserror::Error)] +#[error("Passed invalid texture. Expected: {expected}, Actual: {actual}")] +pub struct TextureCopyError { + expected: String, + actual: String, +} + +fn copy_texture_to_texture(ctx: &WgpuCtx, source: &wgpu::Texture, destination: &wgpu::Texture) { + let mut encoder = ctx.device.create_command_encoder(&Default::default()); + + encoder.copy_texture_to_texture( + source.as_image_copy(), + destination.as_image_copy(), + source.size(), + ); + ctx.queue.submit(Some(encoder.finish())); +} diff --git a/compositor_render/src/wgpu/utils.rs b/compositor_render/src/wgpu/utils.rs new file mode 100644 index 000000000..15011c744 --- /dev/null +++ b/compositor_render/src/wgpu/utils.rs @@ -0,0 +1,22 @@ +use self::r8_fill_with_color::R8FillWithValue; + +use super::{texture::Texture, WgpuCtx}; + +mod r8_fill_with_color; + +#[derive(Debug)] +pub struct TextureUtils { + pub r8_fill_with_value: R8FillWithValue, +} + +impl TextureUtils { + pub fn new(device: &wgpu::Device) -> Self { + Self { + r8_fill_with_value: R8FillWithValue::new(device), + } + } + + pub fn fill_r8_with_value(&self, ctx: &WgpuCtx, dst: &Texture, value: f32) { + self.r8_fill_with_value.fill(ctx, dst, value) + } +} diff --git a/compositor_render/src/wgpu/utils/r8_fill_value.wgsl b/compositor_render/src/wgpu/utils/r8_fill_value.wgsl new file mode 100644 index 000000000..80cb7c9da --- /dev/null +++ b/compositor_render/src/wgpu/utils/r8_fill_value.wgsl @@ -0,0 +1,26 @@ +struct VertexInput { + @location(0) position: vec3, + @location(1) tex_coords: vec2, +} + +struct VertexOutput { + @builtin(position) position: vec4, + @location(0) tex_coords: vec2, +} + +@vertex +fn vs_main(input: VertexInput) -> VertexOutput { + var output: VertexOutput; + + output.position = vec4(input.position, 1.0); + 
output.tex_coords = input.tex_coords; + + return output; +} + +var value: f32; + +@fragment +fn fs_main(input: VertexOutput) -> @location(0) f32 { + return value; +} diff --git a/compositor_render/src/wgpu/utils/r8_fill_with_color.rs b/compositor_render/src/wgpu/utils/r8_fill_with_color.rs new file mode 100644 index 000000000..8d5ac5a14 --- /dev/null +++ b/compositor_render/src/wgpu/utils/r8_fill_with_color.rs @@ -0,0 +1,87 @@ +use wgpu::ShaderStages; + +use crate::wgpu::{ + common_pipeline::{Vertex, PRIMITIVE_STATE}, + texture::Texture, + WgpuCtx, +}; + +#[derive(Debug)] +pub struct R8FillWithValue { + pipeline: wgpu::RenderPipeline, +} + +impl R8FillWithValue { + pub fn new(device: &wgpu::Device) -> Self { + let shader_module = device.create_shader_module(wgpu::include_wgsl!("r8_fill_value.wgsl")); + + let pipeline_layout = device.create_pipeline_layout(&wgpu::PipelineLayoutDescriptor { + label: Some("Fill with value render pipeline layout"), + bind_group_layouts: &[], + push_constant_ranges: &[wgpu::PushConstantRange { + stages: wgpu::ShaderStages::FRAGMENT, + range: 0..4, + }], + }); + + let pipeline = device.create_render_pipeline(&wgpu::RenderPipelineDescriptor { + label: Some("Fill with value pipeline"), + layout: Some(&pipeline_layout), + primitive: PRIMITIVE_STATE, + vertex: wgpu::VertexState { + module: &shader_module, + entry_point: "vs_main", + buffers: &[Vertex::LAYOUT], + }, + fragment: Some(wgpu::FragmentState { + module: &shader_module, + entry_point: "fs_main", + targets: &[Some(wgpu::ColorTargetState { + format: wgpu::TextureFormat::R8Unorm, + write_mask: wgpu::ColorWrites::all(), + blend: None, + })], + }), + depth_stencil: None, + multisample: wgpu::MultisampleState { + count: 1, + mask: !0, + alpha_to_coverage_enabled: false, + }, + multiview: None, + }); + + Self { pipeline } + } + + pub fn fill(&self, ctx: &WgpuCtx, dst: &Texture, value: f32) { + let mut encoder = ctx + .device + .create_command_encoder(&wgpu::CommandEncoderDescriptor { + 
label: Some("Fill R8 texture with value command encoder"), + }); + + { + let mut render_pass = encoder.begin_render_pass(&wgpu::RenderPassDescriptor { + label: Some("Fill R8 texture with value render pass"), + color_attachments: &[Some(wgpu::RenderPassColorAttachment { + ops: wgpu::Operations { + load: wgpu::LoadOp::Clear(wgpu::Color::BLACK), + store: wgpu::StoreOp::Store, + }, + view: &dst.view, + resolve_target: None, + })], + depth_stencil_attachment: None, + timestamp_writes: None, + occlusion_query_set: None, + }); + + render_pass.set_pipeline(&self.pipeline); + render_pass.set_push_constants(ShaderStages::FRAGMENT, 0, bytemuck::bytes_of(&value)); + ctx.plane.draw(&mut render_pass); + } + + ctx.queue.submit(Some(encoder.finish())); + } +} diff --git a/integration_tests/examples/encoded_channel_output.rs b/integration_tests/examples/encoded_channel_output.rs index aa674e2a4..3c37b8a66 100644 --- a/integration_tests/examples/encoded_channel_output.rs +++ b/integration_tests/examples/encoded_channel_output.rs @@ -49,7 +49,7 @@ fn main() { // no chromium support, so we can ignore _event_loop let (state, _event_loop) = ApiState::new(config).unwrap_or_else(|err| { panic!( - "Failed to start event loop.\n{}", + "Failed to start compositor.\n{}", ErrorStack::new(&err).into_string() ) }); diff --git a/integration_tests/examples/raw_channel_input.rs b/integration_tests/examples/raw_channel_input.rs new file mode 100644 index 000000000..fa0f8a1ae --- /dev/null +++ b/integration_tests/examples/raw_channel_input.rs @@ -0,0 +1,223 @@ +use core::panic; +use std::{process::Command, sync::Arc, thread, time::Duration}; + +use compositor_pipeline::{ + pipeline::{ + encoder::{ + ffmpeg_h264::{self, EncoderPreset}, + VideoEncoderOptions, + }, + input::RawDataInputOptions, + output::{ + rtp::{RtpConnectionOptions, RtpSenderOptions}, + OutputOptions, OutputProtocolOptions, + }, + rtp::RequestedPort, + Pipeline, PipelineOutputEndCondition, RegisterOutputOptions, VideoCodec, + }, + 
queue::{PipelineEvent, QueueInputOptions}, +}; +use compositor_render::{ + error::ErrorStack, + scene::{Component, InputStreamComponent}, + Frame, FrameData, InputId, OutputId, Resolution, +}; +use integration_tests::test_input::TestInput; +use live_compositor::{ + config::{read_config, LoggerConfig, LoggerFormat}, + logger::{self, FfmpegLogLevel}, + state::ApiState, +}; + +const VIDEO_OUTPUT_PORT: u16 = 8002; + +// Start simple pipeline with an input that sends wgpu::Textures via Rust channel (video only; audio is not registered in this example). +fn main() { + ffmpeg_next::format::network::init(); + logger::init_logger(LoggerConfig { + ffmpeg_logger_level: FfmpegLogLevel::Info, + format: LoggerFormat::Compact, + level: "info,wgpu_hal=warn,wgpu_core=warn".to_string(), + }); + let config = read_config(); + // no chromium support, so we can ignore _event_loop + let (state, _event_loop) = ApiState::new(config).unwrap_or_else(|err| { + panic!( + "Failed to start compositor.\n{}", + ErrorStack::new(&err).into_string() + ) + }); + let output_id = OutputId("output_1".into()); + let input_id = InputId("input_id".into()); + + let output_options = RegisterOutputOptions { + output_options: OutputOptions { + output_protocol: OutputProtocolOptions::Rtp(RtpSenderOptions { + connection_options: RtpConnectionOptions::TcpServer { + port: RequestedPort::Exact(VIDEO_OUTPUT_PORT), + }, + video: Some(VideoCodec::H264), + audio: None, + }), + video: Some(VideoEncoderOptions::H264(ffmpeg_h264::Options { + preset: EncoderPreset::Ultrafast, + resolution: Resolution { + width: 1280, + height: 720, + }, + raw_options: vec![], + })), + audio: None, + }, + video: Some(compositor_pipeline::pipeline::OutputVideoOptions { + initial: Component::InputStream(InputStreamComponent { + id: None, + input_id: input_id.clone(), + }), + end_condition: PipelineOutputEndCondition::Never, + }), + audio: None, // TODO: add audio example + }; + + let sender = Pipeline::register_raw_data_input( + &state.pipeline, + input_id.clone(), + 
RawDataInputOptions { + video: true, + audio: false, + }, + QueueInputOptions { + required: true, + offset: Some(Duration::ZERO), + buffer_duration: None, + }, + ) + .unwrap(); + + let (wgpu_device, wgpu_queue) = state.pipeline.lock().unwrap().wgpu_ctx(); + + state + .pipeline + .lock() + .unwrap() + .register_output(output_id.clone(), output_options) + .unwrap(); + + let frames = generate_frames(&wgpu_device, &wgpu_queue); + + let gst_output_command = format!("gst-launch-1.0 -v tcpclientsrc host=127.0.0.1 port={VIDEO_OUTPUT_PORT} ! \"application/x-rtp-stream\" ! rtpstreamdepay ! rtph264depay ! decodebin ! videoconvert ! autovideosink"); + Command::new("bash") + .arg("-c") + .arg(gst_output_command) + .spawn() + .unwrap(); + + Pipeline::start(&state.pipeline); + + let video_sender = sender.video.unwrap(); + for frame in frames { + video_sender.send(PipelineEvent::Data(frame)).unwrap(); + } + thread::sleep(Duration::from_millis(30000)); +} + +fn generate_frames(device: &wgpu::Device, queue: &wgpu::Queue) -> Vec<Frame> { + let texture_a = create_texture(0, device, queue); + let texture_b = create_texture(1, device, queue); + let texture_c = create_texture(2, device, queue); + let resolution = Resolution { + width: 640, + height: 360, + }; + let mut frames = vec![]; + + for i in 0..200 { + frames.push(Frame { + data: FrameData::Rgba8UnormWgpuTexture(texture_a.clone()), + resolution, + pts: Duration::from_millis(i * 20), + }) + } + + for i in 200..400 { + frames.push(Frame { + data: FrameData::Rgba8UnormWgpuTexture(texture_b.clone()), + resolution, + pts: Duration::from_millis(i * 20), + }) + } + + for i in 400..600 { + frames.push(Frame { + data: FrameData::Rgba8UnormWgpuTexture(texture_c.clone()), + resolution, + pts: Duration::from_millis(i * 20), + }) + } + + for i in 600..800 { + frames.push(Frame { + data: FrameData::Rgba8UnormWgpuTexture(texture_a.clone()), + resolution, + pts: Duration::from_millis(i * 20), + }) + } + + for i in 800..1000 { + frames.push(Frame { + 
data: FrameData::Rgba8UnormWgpuTexture(texture_b.clone()), + resolution, + pts: Duration::from_millis(i * 20), + }) + } + + for i in 1000..1200 { + frames.push(Frame { + data: FrameData::Rgba8UnormWgpuTexture(texture_c.clone()), + resolution, + pts: Duration::from_millis(i * 20), + }) + } + + frames +} + +fn create_texture(index: usize, device: &wgpu::Device, queue: &wgpu::Queue) -> Arc<wgpu::Texture> { + let input = TestInput::new(index); + let size = wgpu::Extent3d { + width: input.resolution.width as u32, + height: input.resolution.height as u32, + depth_or_array_layers: 1, + }; + let texture = device.create_texture(&wgpu::TextureDescriptor { + label: None, + size, + mip_level_count: 1, + sample_count: 1, + dimension: wgpu::TextureDimension::D2, + format: wgpu::TextureFormat::Rgba8Unorm, + usage: wgpu::TextureUsages::RENDER_ATTACHMENT + | wgpu::TextureUsages::COPY_DST + | wgpu::TextureUsages::COPY_SRC + | wgpu::TextureUsages::TEXTURE_BINDING, + view_formats: &[wgpu::TextureFormat::Rgba8Unorm], + }); + + queue.write_texture( + wgpu::ImageCopyTexture { + aspect: wgpu::TextureAspect::All, + mip_level: 0, + origin: wgpu::Origin3d::ZERO, + texture: &texture, + }, + &input.data, + wgpu::ImageDataLayout { + offset: 0, + bytes_per_row: Some(texture.width() * 4), + rows_per_image: Some(texture.height()), + }, + size, + ); + queue.submit([]); + texture.into() +} diff --git a/integration_tests/examples/raw_channel_output.rs b/integration_tests/examples/raw_channel_output.rs index fecdb7a41..8e4b4269a 100644 --- a/integration_tests/examples/raw_channel_output.rs +++ b/integration_tests/examples/raw_channel_output.rs @@ -53,7 +53,7 @@ fn main() { // no chromium support, so we can ignore _event_loop let (state, _event_loop) = ApiState::new(config).unwrap_or_else(|err| { panic!( - "Failed to start event loop.\n{}", + "Failed to start compositor.\n{}", ErrorStack::new(&err).into_string() ) }); diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 7992782ce..9473d6257 
100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -1,13 +1,15 @@ mod audio_decoder; mod common; mod compositor_instance; -pub mod examples; mod output_receiver; mod packet_sender; mod texture; mod validation; mod video_decoder; +pub mod examples; +pub mod test_input; + #[cfg(test)] mod tests; diff --git a/integration_tests/src/test_input.rs b/integration_tests/src/test_input.rs new file mode 100644 index 000000000..2896f1a6d --- /dev/null +++ b/integration_tests/src/test_input.rs @@ -0,0 +1,92 @@ +use compositor_render::{scene::RGBColor, Resolution}; + +#[derive(Debug, Clone)] +pub struct TestInput { + pub resolution: Resolution, + pub data: Vec<u8>, +} + +impl TestInput { + const COLOR_VARIANTS: [RGBColor; 17] = [ + // RED, input_0 + RGBColor(255, 0, 0), + // GREEN, input_1 + RGBColor(0, 255, 0), + // YELLOW, input_2 + RGBColor(255, 255, 0), + // MAGENTA, input_3 + RGBColor(255, 0, 255), + // BLUE, input_4 + RGBColor(0, 0, 255), + // CYAN, input_5 + RGBColor(0, 255, 255), + // ORANGE, input_6 + RGBColor(255, 165, 0), + // WHITE, input_7 + RGBColor(255, 255, 255), + // GRAY, input_8 + RGBColor(128, 128, 128), + // LIGHT_RED, input_9 + RGBColor(255, 128, 128), + // LIGHT_BLUE, input_10 + RGBColor(128, 128, 255), + // LIGHT_GREEN, input_11 + RGBColor(128, 255, 128), + // PINK, input_12 + RGBColor(255, 192, 203), + // PURPLE, input_13 + RGBColor(128, 0, 128), + // BROWN, input_14 + RGBColor(165, 42, 42), + // YELLOW_GREEN, input_15 + RGBColor(154, 205, 50), + // LIGHT_YELLOW, input_16 + RGBColor(255, 255, 224), + ]; + + pub fn new(index: usize) -> Self { + Self::new_with_resolution( + index, + Resolution { + width: 640, + height: 360, + }, + ) + } + + pub fn new_with_resolution(index: usize, resolution: Resolution) -> Self { + let primary_color = Self::COLOR_VARIANTS[index]; + let secondary_color = Self::COLOR_VARIANTS[(index + 6) % 17]; + let mut data = vec![0; resolution.width * resolution.height * 4]; + + let color_for_pixel = |x: usize, 
y: usize| { + const BORDER_SIZE: usize = 18; + const GRID_SIZE: usize = 72; + + let is_border_in_x = + x <= BORDER_SIZE || (x <= resolution.width && x >= resolution.width - BORDER_SIZE); + let is_border_in_y: bool = y <= BORDER_SIZE + || (y <= resolution.height && y >= resolution.height - BORDER_SIZE); + let is_on_grid = (x / GRID_SIZE + y / GRID_SIZE) % 2 == 0; + + if is_border_in_x || is_border_in_y || is_on_grid { + secondary_color + } else { + primary_color + } + }; + + for x_coord in 0..resolution.width { + for y_coord in 0..resolution.height { + let RGBColor(r, g, b) = color_for_pixel(x_coord, y_coord); + + data[(y_coord * resolution.width + x_coord) * 4] = r; + data[(y_coord * resolution.width + x_coord) * 4 + 1] = g; + data[(y_coord * resolution.width + x_coord) * 4 + 2] = b; + data[(y_coord * resolution.width + x_coord) * 4 + 3] = u8::MAX; + } + } + + Self { resolution, data } + } +} diff --git a/src/routes/register_request.rs b/src/routes/register_request.rs index aee1e80a1..4f9a2b5d9 100644 --- a/src/routes/register_request.rs +++ b/src/routes/register_request.rs @@ -48,7 +48,7 @@ pub(super) async fn handle_input( Pipeline::register_input(&api.pipeline, input_id.into(), decklink.try_into()?)? } }; - match response { + match response.port { Some(Port(port)) => Ok(Response::RegisteredPort { port }), None => Ok(Response::Ok {}), }