From a84ce8fbcff2fbae04db27c8da40d57e15dbf993 Mon Sep 17 00:00:00 2001 From: Wojciech Kozyra Date: Tue, 1 Oct 2024 11:48:13 +0200 Subject: [PATCH] Add pipline scoped event emitter (#800) --- compositor_pipeline/src/event.rs | 39 ++++++++++++++++++- compositor_pipeline/src/pipeline.rs | 11 +++++- compositor_pipeline/src/pipeline/output.rs | 11 ++---- .../src/pipeline/output/mp4.rs | 12 +++--- .../src/pipeline/output/rtp.rs | 10 +++-- compositor_pipeline/src/queue.rs | 7 ++-- compositor_pipeline/src/queue/audio_queue.rs | 22 ++++++++--- compositor_pipeline/src/queue/utils.rs | 22 ++++++++--- compositor_pipeline/src/queue/video_queue.rs | 17 ++++++-- compositor_render/src/event_handler.rs | 32 +++++++++------ 10 files changed, 136 insertions(+), 47 deletions(-) diff --git a/compositor_pipeline/src/event.rs b/compositor_pipeline/src/event.rs index 0c9016612..58fc4f002 100644 --- a/compositor_pipeline/src/event.rs +++ b/compositor_pipeline/src/event.rs @@ -1,6 +1,13 @@ -use compositor_render::{event_handler, InputId, OutputId}; +use std::fmt::Debug; -pub(crate) enum Event { +use compositor_render::{ + event_handler::{self, emit_event, Emitter}, + InputId, OutputId, +}; +use crossbeam_channel::Receiver; + +#[derive(Debug, Clone)] +pub enum Event { AudioInputStreamDelivered(InputId), VideoInputStreamDelivered(InputId), AudioInputStreamPlaying(InputId), @@ -37,3 +44,31 @@ impl From for event_handler::Event { } } } + +pub struct EventEmitter { + emitter: Emitter, +} + +impl Debug for EventEmitter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EventEmitter").finish() + } +} + +impl EventEmitter { + pub(super) fn new() -> Self { + Self { + emitter: Emitter::new(), + } + } + + pub(super) fn emit(&self, event: Event) { + self.emitter.send_event(event.clone()); + // emit global event + emit_event(event) + } + + pub(super) fn subscribe(&self) -> Receiver { + self.emitter.subscribe() + } +} diff --git a/compositor_pipeline/src/pipeline.rs b/compositor_pipeline/src/pipeline.rs index e07ae2297..34da59cb9 100644 --- a/compositor_pipeline/src/pipeline.rs +++ b/compositor_pipeline/src/pipeline.rs @@ -36,6 +36,8 @@ use crate::error::{ RegisterInputError, RegisterOutputError, UnregisterInputError, UnregisterOutputError, }; +use crate::event::Event; +use crate::event::EventEmitter; use crate::pipeline::pipeline_output::OutputSender; use crate::queue::PipelineEvent; use crate::queue::QueueAudioOutput; @@ -125,6 +127,7 @@ pub struct PipelineCtx { pub output_sample_rate: u32, pub output_framerate: Framerate, pub download_dir: Arc, + pub event_emitter: Arc, } impl Pipeline { @@ -144,10 +147,11 @@ impl Pipeline { .join(format!("live-compositor-{}", rand::random::())); std::fs::create_dir_all(&download_dir).map_err(InitPipelineError::CreateDownloadDir)?; + let event_emitter = Arc::new(EventEmitter::new()); let pipeline = Pipeline { outputs: HashMap::new(), inputs: HashMap::new(), - queue: Queue::new(opts.queue_options), + queue: Queue::new(opts.queue_options, &event_emitter), renderer, audio_mixer: AudioMixer::new(opts.output_sample_rate), is_started: false, @@ -155,6 +159,7 @@ impl Pipeline { output_sample_rate: opts.output_sample_rate, output_framerate: opts.queue_options.output_framerate, download_dir: download_dir.into(), + event_emitter, }, }; @@ -165,6 +170,10 @@ impl Pipeline { &self.queue } + pub fn subscribe_pipeline_events(&self) -> Receiver { + self.ctx.event_emitter.subscribe() + } + pub fn register_input( pipeline: &Arc>, input_id: InputId, diff --git a/compositor_pipeline/src/pipeline/output.rs b/compositor_pipeline/src/pipeline/output.rs index f16806217..e9e988ad0 100644 --- a/compositor_pipeline/src/pipeline/output.rs +++ b/compositor_pipeline/src/pipeline/output.rs @@ -103,19 +103,14 @@ impl OutputOptionsExt> for OutputOptions { match &self.output_protocol { OutputProtocolOptions::Rtp(rtp_options) => { let (sender, port) = - rtp::RtpSender::new(output_id, rtp_options.clone(), packets) + rtp::RtpSender::new(output_id, rtp_options.clone(), packets, ctx) .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; Ok((Output::Rtp { sender, encoder }, Some(port))) } OutputProtocolOptions::Mp4(mp4_opt) => { - let writer = Mp4FileWriter::new( - output_id.clone(), - mp4_opt.clone(), - packets, - ctx.output_sample_rate, - ) - .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; + let writer = Mp4FileWriter::new(output_id.clone(), mp4_opt.clone(), packets, ctx) + .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; Ok((Output::Mp4 { writer, encoder }, None)) } diff --git a/compositor_pipeline/src/pipeline/output/mp4.rs b/compositor_pipeline/src/pipeline/output/mp4.rs index ecfa7633e..bba8c80c1 100644 --- a/compositor_pipeline/src/pipeline/output/mp4.rs +++ b/compositor_pipeline/src/pipeline/output/mp4.rs @@ -1,6 +1,6 @@ use std::{fs, path::PathBuf, ptr, time::Duration}; -use compositor_render::{event_handler::emit_event, OutputId}; +use compositor_render::OutputId; use crossbeam_channel::Receiver; use ffmpeg_next as ffmpeg; use log::error; @@ -10,7 +10,7 @@ use crate::{ audio_mixer::AudioChannels, error::OutputInitError, event::Event, - pipeline::{EncodedChunk, EncodedChunkKind, EncoderOutputEvent, VideoCodec}, + pipeline::{EncodedChunk, EncodedChunkKind, EncoderOutputEvent, PipelineCtx, VideoCodec}, }; #[derive(Debug, Clone)] @@ -48,7 +48,7 @@ impl Mp4FileWriter { output_id: OutputId, options: Mp4OutputOptions, packets_receiver: Receiver, - sample_rate: u32, + pipeline_ctx: &PipelineCtx, ) -> Result { if options.output_path.exists() { let mut old_index = 0; @@ -75,8 +75,10 @@ impl Mp4FileWriter { }; } - let (output_ctx, video_stream, audio_stream) = init_ffmpeg_output(options, sample_rate)?; + let (output_ctx, video_stream, audio_stream) = + init_ffmpeg_output(options, pipeline_ctx.output_sample_rate)?; + let event_emitter = pipeline_ctx.event_emitter.clone(); std::thread::Builder::new() .name(format!("MP4 writer thread for output {}", output_id)) .spawn(move || { @@ -84,7 +86,7 @@ impl Mp4FileWriter { tracing::info_span!("MP4 writer", output_id = output_id.to_string()).entered(); run_ffmpeg_output_thread(output_ctx, video_stream, audio_stream, packets_receiver); - emit_event(Event::OutputDone(output_id)); + event_emitter.emit(Event::OutputDone(output_id)); debug!("Closing MP4 writer thread."); }) .unwrap(); diff --git a/compositor_pipeline/src/pipeline/output/rtp.rs b/compositor_pipeline/src/pipeline/output/rtp.rs index 2c07f8f2a..b898e3f45 100644 --- a/compositor_pipeline/src/pipeline/output/rtp.rs +++ b/compositor_pipeline/src/pipeline/output/rtp.rs @@ -1,4 +1,4 @@ -use compositor_render::{event_handler::emit_event, OutputId}; +use compositor_render::OutputId; use crossbeam_channel::Receiver; use std::sync::{atomic::AtomicBool, Arc}; use tracing::{debug, span, Level}; @@ -6,7 +6,9 @@ use tracing::{debug, span, Level}; use crate::{ error::OutputInitError, event::Event, - pipeline::{rtp::RequestedPort, types::EncoderOutputEvent, AudioCodec, Port, VideoCodec}, + pipeline::{ + rtp::RequestedPort, types::EncoderOutputEvent, AudioCodec, PipelineCtx, Port, VideoCodec, + }, }; use self::{packet_stream::PacketStream, payloader::Payloader}; @@ -48,6 +50,7 @@ impl RtpSender { output_id: &OutputId, options: RtpSenderOptions, packets_receiver: Receiver, + pipeline_ctx: &PipelineCtx, ) -> Result<(Self, Port), OutputInitError> { let payloader = Payloader::new(options.video, options.audio); let mtu = match options.connection_options { @@ -65,6 +68,7 @@ impl RtpSender { let connection_options = options.connection_options.clone(); let output_id = output_id.clone(); let should_close2 = should_close.clone(); + let event_emitter = pipeline_ctx.event_emitter.clone(); std::thread::Builder::new() .name(format!("RTP sender for output {}", output_id)) .spawn(move || { @@ -78,7 +82,7 @@ impl RtpSender { tcp_server::run_tcp_sender_thread(socket, should_close2, packet_stream) } } - emit_event(Event::OutputDone(output_id)); + event_emitter.emit(Event::OutputDone(output_id)); debug!("Closing RTP sender thread.") }) .unwrap(); diff --git a/compositor_pipeline/src/queue.rs b/compositor_pipeline/src/queue.rs index 04cb0d93e..78233227d 100644 --- a/compositor_pipeline/src/queue.rs +++ b/compositor_pipeline/src/queue.rs @@ -18,6 +18,7 @@ use crossbeam_channel::{bounded, Sender}; use crate::{ audio_mixer::{InputSamples, InputSamplesSet}, + event::EventEmitter, pipeline::decoder::DecodedDataReceiver, }; @@ -170,14 +171,14 @@ impl Clone for PipelineEvent { } impl Queue { - pub fn new(opts: QueueOptions) -> Arc { + pub(crate) fn new(opts: QueueOptions, event_emitter: &Arc) -> Arc { let (queue_start_sender, queue_start_receiver) = bounded(0); let (scheduled_event_sender, scheduled_event_receiver) = bounded(0); let queue = Arc::new(Queue { - video_queue: Mutex::new(VideoQueue::new()), + video_queue: Mutex::new(VideoQueue::new(event_emitter.clone())), output_framerate: opts.output_framerate, - audio_queue: Mutex::new(AudioQueue::new()), + audio_queue: Mutex::new(AudioQueue::new(event_emitter.clone())), audio_chunk_duration: DEFAULT_AUDIO_CHUNK_DURATION, scheduled_event_sender, diff --git a/compositor_pipeline/src/queue/audio_queue.rs b/compositor_pipeline/src/queue/audio_queue.rs index 94ee5694e..e3b6d2a47 100644 --- a/compositor_pipeline/src/queue/audio_queue.rs +++ b/compositor_pipeline/src/queue/audio_queue.rs @@ -1,27 +1,33 @@ use std::{ collections::{HashMap, VecDeque}, + sync::Arc, time::{Duration, Instant}, vec, }; -use crate::{audio_mixer::InputSamples, event::Event}; +use crate::{ + audio_mixer::InputSamples, + event::{Event, EventEmitter}, +}; use super::{ utils::{Clock, InputProcessor}, InputOptions, PipelineEvent, QueueAudioOutput, }; -use compositor_render::{event_handler::emit_event, InputId}; +use compositor_render::InputId; use crossbeam_channel::{Receiver, TryRecvError}; #[derive(Debug)] pub struct AudioQueue { inputs: HashMap, + event_emitter: Arc, } impl AudioQueue { - pub fn new() -> Self { + pub fn new(event_emitter: Arc) -> Self { AudioQueue { inputs: HashMap::new(), + event_emitter, } } @@ -42,11 +48,13 @@ impl AudioQueue { opts.buffer_duration, clock, input_id.clone(), + self.event_emitter.clone(), ), required: opts.required, offset: opts.offset, eos_sent: false, first_samples_sent: false, + event_emitter: self.event_emitter.clone(), }, ); } @@ -142,6 +150,8 @@ struct AudioQueueInput { eos_sent: bool, first_samples_sent: bool, + + event_emitter: Arc, } impl AudioQueueInput { @@ -217,11 +227,13 @@ impl AudioQueueInput { && !self.eos_sent { self.eos_sent = true; - emit_event(Event::AudioInputStreamEos(self.input_id.clone())); + self.event_emitter + .emit(Event::AudioInputStreamEos(self.input_id.clone())); PipelineEvent::EOS } else { if !self.first_samples_sent && !popped_samples.is_empty() { - emit_event(Event::AudioInputStreamPlaying(self.input_id.clone())); + self.event_emitter + .emit(Event::AudioInputStreamPlaying(self.input_id.clone())); self.first_samples_sent = true } PipelineEvent::Data(popped_samples) diff --git a/compositor_pipeline/src/queue/utils.rs b/compositor_pipeline/src/queue/utils.rs index 63f27e851..70794b1f8 100644 --- a/compositor_pipeline/src/queue/utils.rs +++ b/compositor_pipeline/src/queue/utils.rs @@ -8,11 +8,11 @@ use std::{ time::{Duration, Instant}, }; -use compositor_render::{event_handler::emit_event, Frame, InputId}; +use compositor_render::{Frame, InputId}; use tracing::warn; -use crate::audio_mixer::InputSamples; use crate::event::Event; +use crate::{audio_mixer::InputSamples, event::EventEmitter}; use super::PipelineEvent; @@ -42,6 +42,8 @@ pub(super) struct InputProcessor { state: InputState, clock: Clock, + + event_emitter: Arc, } #[derive(Debug)] @@ -59,13 +61,19 @@ pub(super) enum InputState { } impl InputProcessor { - pub(super) fn new(buffer_duration: Duration, clock: Clock, input_id: InputId) -> Self { + pub(super) fn new( + buffer_duration: Duration, + clock: Clock, + input_id: InputId, + event_emitter: Arc, + ) -> Self { Self { buffer_duration, start_time: None, state: InputState::WaitingForStart, clock, input_id, + event_emitter, } } @@ -163,8 +171,12 @@ impl InputProcessor { fn on_ready(&self) { match Payload::media_type() { - MediaType::Audio => emit_event(Event::AudioInputStreamDelivered(self.input_id.clone())), - MediaType::Video => emit_event(Event::VideoInputStreamDelivered(self.input_id.clone())), + MediaType::Audio => self + .event_emitter + .emit(Event::AudioInputStreamDelivered(self.input_id.clone())), + MediaType::Video => self + .event_emitter + .emit(Event::VideoInputStreamDelivered(self.input_id.clone())), } } } diff --git a/compositor_pipeline/src/queue/video_queue.rs b/compositor_pipeline/src/queue/video_queue.rs index cfc4f335f..fffcfb0c2 100644 --- a/compositor_pipeline/src/queue/video_queue.rs +++ b/compositor_pipeline/src/queue/video_queue.rs @@ -1,4 +1,3 @@ -use compositor_render::event_handler::emit_event; use compositor_render::Frame; use compositor_render::InputId; use crossbeam_channel::Receiver; @@ -6,10 +5,12 @@ use crossbeam_channel::TryRecvError; use std::collections::HashMap; use std::collections::VecDeque; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use crate::event::Event; +use crate::event::EventEmitter; use super::utils::Clock; use super::utils::InputProcessor; @@ -19,12 +20,14 @@ use super::QueueVideoOutput; pub struct VideoQueue { inputs: HashMap, + event_emitter: Arc, } impl VideoQueue { - pub fn new() -> Self { + pub fn new(event_emitter: Arc) -> Self { VideoQueue { inputs: HashMap::new(), + event_emitter, } } @@ -45,11 +48,13 @@ impl VideoQueue { opts.buffer_duration, clock, input_id.clone(), + self.event_emitter.clone(), ), required: opts.required, offset: opts.offset, eos_sent: false, first_frame_sent: false, + event_emitter: self.event_emitter.clone(), }, ); } @@ -148,6 +153,8 @@ pub struct VideoQueueInput { eos_sent: bool, first_frame_sent: bool, + + event_emitter: Arc, } impl VideoQueueInput { @@ -185,11 +192,13 @@ impl VideoQueueInput { if self.input_frames_processor.did_receive_eos() && frame.is_none() && !self.eos_sent { self.eos_sent = true; - emit_event(Event::VideoInputStreamEos(self.input_id.clone())); + self.event_emitter + .emit(Event::VideoInputStreamEos(self.input_id.clone())); Some(PipelineEvent::EOS) } else { if !self.first_frame_sent && frame.is_some() { - emit_event(Event::VideoInputStreamPlaying(self.input_id.clone())); + self.event_emitter + .emit(Event::VideoInputStreamPlaying(self.input_id.clone())); self.first_frame_sent = true } frame.map(PipelineEvent::Data) diff --git a/compositor_render/src/event_handler.rs b/compositor_render/src/event_handler.rs index dc788d496..5bf8ab315 100644 --- a/compositor_render/src/event_handler.rs +++ b/compositor_render/src/event_handler.rs @@ -14,26 +14,30 @@ pub struct Event { } pub fn emit_event>(event: T) { - Emitter::instance().send_event(event) + global_instance().send_event(event) } pub fn subscribe() -> Receiver { - Emitter::instance().subscribe() + global_instance().subscribe() } -struct Emitter { - subscribers: RwLock>>, +pub struct Emitter { + subscribers: RwLock>>, } -impl Emitter { - fn instance() -> &'static Self { - static EMITTER: OnceLock = OnceLock::new(); - EMITTER.get_or_init(|| Self { +fn global_instance() -> &'static Emitter { + static EMITTER: OnceLock> = OnceLock::new(); + EMITTER.get_or_init(Emitter::new) +} + +impl Emitter { + pub fn new() -> Self { + Self { subscribers: vec![].into(), - }) + } } - fn send_event>(&self, event: T) { + pub fn send_event>(&self, event: T) { let event = event.into(); let mut disconnected_subscriber_indexes = HashSet::new(); for (index, subscriber) in self.subscribers.read().unwrap().iter().enumerate() { @@ -47,7 +51,7 @@ impl Emitter { } } - fn subscribe(&self) -> Receiver { + pub fn subscribe(&self) -> Receiver { let (sender, receiver) = unbounded(); self.subscribers.write().unwrap().push(sender); receiver @@ -68,3 +72,9 @@ impl Emitter { .collect() } } + +impl Default for Emitter { + fn default() -> Self { + Self::new() + } +}