Skip to content

Commit

Permalink
Add input type that returns data via channel (#621)
Browse files Browse the repository at this point in the history
  • Loading branch information
wkozyra95 authored Jul 11, 2024
1 parent 516ea16 commit f326ea5
Show file tree
Hide file tree
Showing 21 changed files with 826 additions and 263 deletions.
24 changes: 21 additions & 3 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ use compositor_render::WgpuFeatures;
use compositor_render::{error::UpdateSceneError, Renderer};
use compositor_render::{EventLoop, InputId, OutputId, RendererId, RendererSpec};
use crossbeam_channel::{bounded, Receiver};
use input::InputInitInfo;
use input::RawDataInputOptions;
use output::EncodedDataOutputOptions;
use output::OutputOptions;
use output::RawDataOutputOptions;
use tracing::{error, info, trace, warn};
use types::RawDataSender;

use crate::audio_mixer::AudioMixer;
use crate::audio_mixer::MixingStrategy;
Expand All @@ -34,6 +37,7 @@ use crate::error::{
use crate::pipeline::pipeline_output::OutputSender;
use crate::queue::PipelineEvent;
use crate::queue::QueueAudioOutput;
use crate::queue::QueueInputOptions;
use crate::queue::{self, Queue, QueueOptions, QueueVideoOutput};

use self::input::InputOptions;
Expand Down Expand Up @@ -112,7 +116,7 @@ pub struct Options {
pub wgpu_features: WgpuFeatures,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct PipelineCtx {
pub output_sample_rate: u32,
pub output_framerate: Framerate,
Expand Down Expand Up @@ -159,8 +163,22 @@ impl Pipeline {
pipeline: &Arc<Mutex<Self>>,
input_id: InputId,
register_options: RegisterInputOptions,
) -> Result<Option<Port>, RegisterInputError> {
register_pipeline_input(pipeline, input_id, register_options)
) -> Result<InputInitInfo, RegisterInputError> {
register_pipeline_input(
pipeline,
input_id,
&register_options.input_options,
register_options.queue_options,
)
}

pub fn register_raw_data_input(
pipeline: &Arc<Mutex<Self>>,
input_id: InputId,
raw_input_options: RawDataInputOptions,
queue_options: QueueInputOptions,
) -> Result<RawDataSender, RegisterInputError> {
register_pipeline_input(pipeline, input_id, &raw_input_options, queue_options)
}

pub fn unregister_input(&mut self, input_id: &InputId) -> Result<(), UnregisterInputError> {
Expand Down
181 changes: 117 additions & 64 deletions compositor_pipeline/src/pipeline/input.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::path::Path;

use crate::{error::InputInitError, queue::PipelineEvent};
use crate::{
error::{InputInitError, RegisterInputError},
queue::PipelineEvent,
};

use compositor_render::{Frame, InputId};
use crossbeam_channel::{bounded, Receiver};
Expand All @@ -13,8 +14,8 @@ use super::{
start_audio_decoder_thread, start_audio_resampler_only_thread, start_video_decoder_thread,
AudioDecoderOptions, DecodedDataReceiver, VideoDecoderOptions,
},
types::{DecodedSamples, EncodedChunk},
Port,
types::{DecodedSamples, EncodedChunk, RawDataSender},
PipelineCtx, Port,
};

#[cfg(feature = "decklink")]
Expand All @@ -27,23 +28,126 @@ pub enum Input {
Mp4(Mp4),
#[cfg(feature = "decklink")]
DeckLink(decklink::DeckLink),
RawDataInput,
}

#[derive(Debug, Clone)]
pub enum InputOptions {
Rtp(RtpReceiverOptions),
Mp4(Mp4Options),
#[cfg(feature = "decklink")]
DeckLink(decklink::DeckLinkOptions),
}

#[derive(Debug, Clone)]
pub struct RawDataInputOptions {
pub video: bool,
pub audio: bool,
}

pub struct InputInitInfo {
pub port: Option<Port>,
}

struct InputInitResult {
input: Input,
video: Option<VideoInputReceiver>,
audio: Option<AudioInputReceiver>,
init_info: InputInitInfo,
}

pub(super) enum VideoInputReceiver {
#[allow(dead_code)]
Raw {
frame_receiver: Receiver<PipelineEvent<Frame>>,
},
Encoded {
chunk_receiver: Receiver<PipelineEvent<EncodedChunk>>,
decoder_options: VideoDecoderOptions,
},
}

pub(super) enum AudioInputReceiver {
#[allow(dead_code)]
Raw {
sample_receiver: Receiver<PipelineEvent<DecodedSamples>>,
sample_rate: u32,
},
Encoded {
chunk_receiver: Receiver<PipelineEvent<EncodedChunk>>,
decoder_options: AudioDecoderOptions,
},
}

pub(super) trait InputOptionsExt<NewInputResult> {
fn new_input(
&self,
input_id: &InputId,
ctx: &PipelineCtx,
) -> Result<(Input, DecodedDataReceiver, NewInputResult), RegisterInputError>;
}

impl InputOptionsExt<InputInitInfo> for InputOptions {
fn new_input(
&self,
input_id: &InputId,
ctx: &PipelineCtx,
) -> Result<(Input, DecodedDataReceiver, InputInitInfo), RegisterInputError> {
start_input_threads(input_id, self.clone(), ctx)
.map_err(|e| RegisterInputError::InputError(input_id.clone(), e))
}
}

impl InputOptionsExt<RawDataSender> for RawDataInputOptions {
fn new_input(
&self,
_input_id: &InputId,
_ctx: &PipelineCtx,
) -> Result<(Input, DecodedDataReceiver, RawDataSender), RegisterInputError> {
let (video_sender, video_receiver) = match self.video {
true => {
let (sender, receiver) = bounded(1000);
(Some(sender), Some(receiver))
}
false => (None, None),
};
let (audio_sender, audio_receiver) = match self.audio {
true => {
let (sender, receiver) = bounded(1000);
(Some(sender), Some(receiver))
}
false => (None, None),
};
Ok((
Input::RawDataInput,
DecodedDataReceiver {
video: video_receiver,
audio: audio_receiver,
},
RawDataSender {
video: video_sender,
audio: audio_sender,
},
))
}
}

/// Start entire processing pipeline for an input, including decoders and resamplers.
pub(super) fn start_input_threads(
fn start_input_threads(
input_id: &InputId,
options: InputOptions,
download_dir: &Path,
output_sample_rate: u32,
) -> Result<InputStartResult, InputInitError> {
pipeline_ctx: &PipelineCtx,
) -> Result<(Input, DecodedDataReceiver, InputInitInfo), InputInitError> {
let InputInitResult {
input,
video,
audio,
init_info,
} = match options {
InputOptions::Rtp(opts) => RtpReceiver::start_new_input(input_id, opts)?,
InputOptions::Mp4(opts) => Mp4::start_new_input(input_id, opts, download_dir)?,
InputOptions::Mp4(opts) => {
Mp4::start_new_input(input_id, opts, &pipeline_ctx.download_dir)?
}
#[cfg(feature = "decklink")]
InputOptions::DeckLink(opts) => decklink::DeckLink::start_new_input(input_id, opts)?,
};
Expand Down Expand Up @@ -78,7 +182,7 @@ pub(super) fn start_input_threads(
let (sender, receiver) = bounded(10);
start_audio_resampler_only_thread(
sample_rate,
output_sample_rate,
pipeline_ctx.output_sample_rate,
sample_receiver,
sender,
input_id.clone(),
Expand All @@ -92,7 +196,7 @@ pub(super) fn start_input_threads(
let (sender, receiver) = bounded(10);
start_audio_decoder_thread(
decoder_options,
output_sample_rate,
pipeline_ctx.output_sample_rate,
chunk_receiver,
sender,
input_id.clone(),
Expand All @@ -103,56 +207,5 @@ pub(super) fn start_input_threads(
} else {
None
};
Ok(InputStartResult {
input,
receiver: DecodedDataReceiver { video, audio },
init_info,
})
}

pub enum InputOptions {
Rtp(RtpReceiverOptions),
Mp4(Mp4Options),
#[cfg(feature = "decklink")]
DeckLink(decklink::DeckLinkOptions),
}

pub struct InputInitInfo {
pub port: Option<Port>,
}

struct InputInitResult {
input: Input,
video: Option<VideoInputReceiver>,
audio: Option<AudioInputReceiver>,
init_info: InputInitInfo,
}

pub(super) struct InputStartResult {
pub(super) input: Input,
pub(super) receiver: DecodedDataReceiver,
pub(super) init_info: InputInitInfo,
}

pub(super) enum VideoInputReceiver {
#[allow(dead_code)]
Raw {
frame_receiver: Receiver<PipelineEvent<Frame>>,
},
Encoded {
chunk_receiver: Receiver<PipelineEvent<EncodedChunk>>,
decoder_options: VideoDecoderOptions,
},
}

pub(super) enum AudioInputReceiver {
#[allow(dead_code)]
Raw {
sample_receiver: Receiver<PipelineEvent<DecodedSamples>>,
sample_rate: u32,
},
Encoded {
chunk_receiver: Receiver<PipelineEvent<EncodedChunk>>,
decoder_options: AudioDecoderOptions,
},
Ok((input, DecodedDataReceiver { video, audio }, init_info))
}
1 change: 1 addition & 0 deletions compositor_pipeline/src/pipeline/input/decklink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct DeckLinkInfo {
pub subdevice_index: Option<u32>,
}

#[derive(Debug, Clone)]
pub struct DeckLinkOptions {
pub subdevice_index: Option<u32>,
pub display_name: Option<String>,
Expand Down
2 changes: 2 additions & 0 deletions compositor_pipeline/src/pipeline/input/mp4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use super::{AudioInputReceiver, Input, InputInitInfo, InputInitResult, VideoInpu

pub mod mp4_file_reader;

#[derive(Debug, Clone)]
pub struct Mp4Options {
pub source: Source,
}
Expand All @@ -31,6 +32,7 @@ pub(crate) enum Mp4ReaderOptions {
},
}

#[derive(Debug, Clone)]
pub enum Source {
Url(String),
File(PathBuf),
Expand Down
1 change: 1 addition & 0 deletions compositor_pipeline/src/pipeline/input/rtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub enum RtpReceiverError {
DepayloaderError(#[from] DepayloaderNewError),
}

#[derive(Debug, Clone)]
pub struct RtpReceiverOptions {
pub port: RequestedPort,
pub transport_protocol: TransportProtocol,
Expand Down
35 changes: 9 additions & 26 deletions compositor_pipeline/src/pipeline/pipeline_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ use std::sync::{Arc, Mutex};

use compositor_render::InputId;

use crate::{error::RegisterInputError, Pipeline};
use crate::{error::RegisterInputError, queue::QueueInputOptions, Pipeline};

use super::{
input::{self, start_input_threads, InputStartResult},
Port, RegisterInputOptions,
};
use super::input::{self, InputOptionsExt};

pub struct PipelineInput {
pub input: input::Input,
Expand All @@ -20,29 +17,15 @@ pub struct PipelineInput {
pub(super) video_eos_received: Option<bool>,
}

pub(super) fn register_pipeline_input(
pub(super) fn register_pipeline_input<NewInputResult>(
pipeline: &Arc<Mutex<Pipeline>>,
input_id: InputId,
register_options: RegisterInputOptions,
) -> Result<Option<Port>, RegisterInputError> {
let RegisterInputOptions {
input_options,
queue_options,
} = register_options;
let (download_dir, output_sample_rate) = {
let guard = pipeline.lock().unwrap();
if guard.inputs.contains_key(&input_id) {
return Err(RegisterInputError::AlreadyRegistered(input_id));
}
(guard.ctx.download_dir.clone(), guard.ctx.output_sample_rate)
};
input_options: &dyn InputOptionsExt<NewInputResult>,
queue_options: QueueInputOptions,
) -> Result<NewInputResult, RegisterInputError> {
let pipeline_ctx = pipeline.lock().unwrap().ctx.clone();

let InputStartResult {
input,
receiver,
init_info,
} = start_input_threads(&input_id, input_options, &download_dir, output_sample_rate)
.map_err(|e| RegisterInputError::InputError(input_id.clone(), e))?;
let (input, receiver, input_result) = input_options.new_input(&input_id, &pipeline_ctx)?;

let (audio_eos_received, video_eos_received) = (
receiver.audio.as_ref().map(|_| false),
Expand Down Expand Up @@ -77,7 +60,7 @@ pub(super) fn register_pipeline_input(
guard.queue.add_input(&input_id, receiver, queue_options);
guard.renderer.register_input(input_id);

Ok(init_info.port)
Ok(input_result)
}

impl PipelineInput {
Expand Down
Loading

0 comments on commit f326ea5

Please sign in to comment.