Skip to content

Commit

Permalink
Add pipline scoped event emitter (#800)
Browse files Browse the repository at this point in the history
  • Loading branch information
wkozyra95 authored Oct 1, 2024
1 parent 37657af commit a84ce8f
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 47 deletions.
39 changes: 37 additions & 2 deletions compositor_pipeline/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use compositor_render::{event_handler, InputId, OutputId};
use std::fmt::Debug;

pub(crate) enum Event {
use compositor_render::{
event_handler::{self, emit_event, Emitter},
InputId, OutputId,
};
use crossbeam_channel::Receiver;

#[derive(Debug, Clone)]
pub enum Event {
AudioInputStreamDelivered(InputId),
VideoInputStreamDelivered(InputId),
AudioInputStreamPlaying(InputId),
Expand Down Expand Up @@ -37,3 +44,31 @@ impl From<Event> for event_handler::Event {
}
}
}

pub struct EventEmitter {
emitter: Emitter<Event>,
}

impl Debug for EventEmitter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventEmitter").finish()
}
}

impl EventEmitter {
pub(super) fn new() -> Self {
Self {
emitter: Emitter::new(),
}
}

pub(super) fn emit(&self, event: Event) {
self.emitter.send_event(event.clone());
// emit global event
emit_event(event)
}

pub(super) fn subscribe(&self) -> Receiver<Event> {
self.emitter.subscribe()
}
}
11 changes: 10 additions & 1 deletion compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use crate::error::{
RegisterInputError, RegisterOutputError, UnregisterInputError, UnregisterOutputError,
};

use crate::event::Event;
use crate::event::EventEmitter;
use crate::pipeline::pipeline_output::OutputSender;
use crate::queue::PipelineEvent;
use crate::queue::QueueAudioOutput;
Expand Down Expand Up @@ -125,6 +127,7 @@ pub struct PipelineCtx {
pub output_sample_rate: u32,
pub output_framerate: Framerate,
pub download_dir: Arc<PathBuf>,
pub event_emitter: Arc<EventEmitter>,
}

impl Pipeline {
Expand All @@ -144,17 +147,19 @@ impl Pipeline {
.join(format!("live-compositor-{}", rand::random::<u64>()));
std::fs::create_dir_all(&download_dir).map_err(InitPipelineError::CreateDownloadDir)?;

let event_emitter = Arc::new(EventEmitter::new());
let pipeline = Pipeline {
outputs: HashMap::new(),
inputs: HashMap::new(),
queue: Queue::new(opts.queue_options),
queue: Queue::new(opts.queue_options, &event_emitter),
renderer,
audio_mixer: AudioMixer::new(opts.output_sample_rate),
is_started: false,
ctx: PipelineCtx {
output_sample_rate: opts.output_sample_rate,
output_framerate: opts.queue_options.output_framerate,
download_dir: download_dir.into(),
event_emitter,
},
};

Expand All @@ -165,6 +170,10 @@ impl Pipeline {
&self.queue
}

pub fn subscribe_pipeline_events(&self) -> Receiver<Event> {
self.ctx.event_emitter.subscribe()
}

pub fn register_input(
pipeline: &Arc<Mutex<Self>>,
input_id: InputId,
Expand Down
11 changes: 3 additions & 8 deletions compositor_pipeline/src/pipeline/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,14 @@ impl OutputOptionsExt<Option<Port>> for OutputOptions {
match &self.output_protocol {
OutputProtocolOptions::Rtp(rtp_options) => {
let (sender, port) =
rtp::RtpSender::new(output_id, rtp_options.clone(), packets)
rtp::RtpSender::new(output_id, rtp_options.clone(), packets, ctx)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;

Ok((Output::Rtp { sender, encoder }, Some(port)))
}
OutputProtocolOptions::Mp4(mp4_opt) => {
let writer = Mp4FileWriter::new(
output_id.clone(),
mp4_opt.clone(),
packets,
ctx.output_sample_rate,
)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;
let writer = Mp4FileWriter::new(output_id.clone(), mp4_opt.clone(), packets, ctx)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;

Ok((Output::Mp4 { writer, encoder }, None))
}
Expand Down
12 changes: 7 additions & 5 deletions compositor_pipeline/src/pipeline/output/mp4.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{fs, path::PathBuf, ptr, time::Duration};

use compositor_render::{event_handler::emit_event, OutputId};
use compositor_render::OutputId;
use crossbeam_channel::Receiver;
use ffmpeg_next as ffmpeg;
use log::error;
Expand All @@ -10,7 +10,7 @@ use crate::{
audio_mixer::AudioChannels,
error::OutputInitError,
event::Event,
pipeline::{EncodedChunk, EncodedChunkKind, EncoderOutputEvent, VideoCodec},
pipeline::{EncodedChunk, EncodedChunkKind, EncoderOutputEvent, PipelineCtx, VideoCodec},
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -48,7 +48,7 @@ impl Mp4FileWriter {
output_id: OutputId,
options: Mp4OutputOptions,
packets_receiver: Receiver<EncoderOutputEvent>,
sample_rate: u32,
pipeline_ctx: &PipelineCtx,
) -> Result<Self, OutputInitError> {
if options.output_path.exists() {
let mut old_index = 0;
Expand All @@ -75,16 +75,18 @@ impl Mp4FileWriter {
};
}

let (output_ctx, video_stream, audio_stream) = init_ffmpeg_output(options, sample_rate)?;
let (output_ctx, video_stream, audio_stream) =
init_ffmpeg_output(options, pipeline_ctx.output_sample_rate)?;

let event_emitter = pipeline_ctx.event_emitter.clone();
std::thread::Builder::new()
.name(format!("MP4 writer thread for output {}", output_id))
.spawn(move || {
let _span =
tracing::info_span!("MP4 writer", output_id = output_id.to_string()).entered();

run_ffmpeg_output_thread(output_ctx, video_stream, audio_stream, packets_receiver);
emit_event(Event::OutputDone(output_id));
event_emitter.emit(Event::OutputDone(output_id));
debug!("Closing MP4 writer thread.");
})
.unwrap();
Expand Down
10 changes: 7 additions & 3 deletions compositor_pipeline/src/pipeline/output/rtp.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use compositor_render::{event_handler::emit_event, OutputId};
use compositor_render::OutputId;
use crossbeam_channel::Receiver;
use std::sync::{atomic::AtomicBool, Arc};
use tracing::{debug, span, Level};

use crate::{
error::OutputInitError,
event::Event,
pipeline::{rtp::RequestedPort, types::EncoderOutputEvent, AudioCodec, Port, VideoCodec},
pipeline::{
rtp::RequestedPort, types::EncoderOutputEvent, AudioCodec, PipelineCtx, Port, VideoCodec,
},
};

use self::{packet_stream::PacketStream, payloader::Payloader};
Expand Down Expand Up @@ -48,6 +50,7 @@ impl RtpSender {
output_id: &OutputId,
options: RtpSenderOptions,
packets_receiver: Receiver<EncoderOutputEvent>,
pipeline_ctx: &PipelineCtx,
) -> Result<(Self, Port), OutputInitError> {
let payloader = Payloader::new(options.video, options.audio);
let mtu = match options.connection_options {
Expand All @@ -65,6 +68,7 @@ impl RtpSender {
let connection_options = options.connection_options.clone();
let output_id = output_id.clone();
let should_close2 = should_close.clone();
let event_emitter = pipeline_ctx.event_emitter.clone();
std::thread::Builder::new()
.name(format!("RTP sender for output {}", output_id))
.spawn(move || {
Expand All @@ -78,7 +82,7 @@ impl RtpSender {
tcp_server::run_tcp_sender_thread(socket, should_close2, packet_stream)
}
}
emit_event(Event::OutputDone(output_id));
event_emitter.emit(Event::OutputDone(output_id));
debug!("Closing RTP sender thread.")
})
.unwrap();
Expand Down
7 changes: 4 additions & 3 deletions compositor_pipeline/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crossbeam_channel::{bounded, Sender};

use crate::{
audio_mixer::{InputSamples, InputSamplesSet},
event::EventEmitter,
pipeline::decoder::DecodedDataReceiver,
};

Expand Down Expand Up @@ -170,14 +171,14 @@ impl<T: Clone> Clone for PipelineEvent<T> {
}

impl Queue {
pub fn new(opts: QueueOptions) -> Arc<Self> {
pub(crate) fn new(opts: QueueOptions, event_emitter: &Arc<EventEmitter>) -> Arc<Self> {
let (queue_start_sender, queue_start_receiver) = bounded(0);
let (scheduled_event_sender, scheduled_event_receiver) = bounded(0);
let queue = Arc::new(Queue {
video_queue: Mutex::new(VideoQueue::new()),
video_queue: Mutex::new(VideoQueue::new(event_emitter.clone())),
output_framerate: opts.output_framerate,

audio_queue: Mutex::new(AudioQueue::new()),
audio_queue: Mutex::new(AudioQueue::new(event_emitter.clone())),
audio_chunk_duration: DEFAULT_AUDIO_CHUNK_DURATION,

scheduled_event_sender,
Expand Down
22 changes: 17 additions & 5 deletions compositor_pipeline/src/queue/audio_queue.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::{Duration, Instant},
vec,
};

use crate::{audio_mixer::InputSamples, event::Event};
use crate::{
audio_mixer::InputSamples,
event::{Event, EventEmitter},
};

use super::{
utils::{Clock, InputProcessor},
InputOptions, PipelineEvent, QueueAudioOutput,
};
use compositor_render::{event_handler::emit_event, InputId};
use compositor_render::InputId;
use crossbeam_channel::{Receiver, TryRecvError};

#[derive(Debug)]
pub struct AudioQueue {
inputs: HashMap<InputId, AudioQueueInput>,
event_emitter: Arc<EventEmitter>,
}

impl AudioQueue {
pub fn new() -> Self {
pub fn new(event_emitter: Arc<EventEmitter>) -> Self {
AudioQueue {
inputs: HashMap::new(),
event_emitter,
}
}

Expand All @@ -42,11 +48,13 @@ impl AudioQueue {
opts.buffer_duration,
clock,
input_id.clone(),
self.event_emitter.clone(),
),
required: opts.required,
offset: opts.offset,
eos_sent: false,
first_samples_sent: false,
event_emitter: self.event_emitter.clone(),
},
);
}
Expand Down Expand Up @@ -142,6 +150,8 @@ struct AudioQueueInput {

eos_sent: bool,
first_samples_sent: bool,

event_emitter: Arc<EventEmitter>,
}

impl AudioQueueInput {
Expand Down Expand Up @@ -217,11 +227,13 @@ impl AudioQueueInput {
&& !self.eos_sent
{
self.eos_sent = true;
emit_event(Event::AudioInputStreamEos(self.input_id.clone()));
self.event_emitter
.emit(Event::AudioInputStreamEos(self.input_id.clone()));
PipelineEvent::EOS
} else {
if !self.first_samples_sent && !popped_samples.is_empty() {
emit_event(Event::AudioInputStreamPlaying(self.input_id.clone()));
self.event_emitter
.emit(Event::AudioInputStreamPlaying(self.input_id.clone()));
self.first_samples_sent = true
}
PipelineEvent::Data(popped_samples)
Expand Down
22 changes: 17 additions & 5 deletions compositor_pipeline/src/queue/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use std::{
time::{Duration, Instant},
};

use compositor_render::{event_handler::emit_event, Frame, InputId};
use compositor_render::{Frame, InputId};
use tracing::warn;

use crate::audio_mixer::InputSamples;
use crate::event::Event;
use crate::{audio_mixer::InputSamples, event::EventEmitter};

use super::PipelineEvent;

Expand Down Expand Up @@ -42,6 +42,8 @@ pub(super) struct InputProcessor<Payload: InputProcessorMediaExt> {
state: InputState<Payload>,

clock: Clock,

event_emitter: Arc<EventEmitter>,
}

#[derive(Debug)]
Expand All @@ -59,13 +61,19 @@ pub(super) enum InputState<Payload: InputProcessorMediaExt> {
}

impl<Payload: InputProcessorMediaExt> InputProcessor<Payload> {
pub(super) fn new(buffer_duration: Duration, clock: Clock, input_id: InputId) -> Self {
pub(super) fn new(
buffer_duration: Duration,
clock: Clock,
input_id: InputId,
event_emitter: Arc<EventEmitter>,
) -> Self {
Self {
buffer_duration,
start_time: None,
state: InputState::WaitingForStart,
clock,
input_id,
event_emitter,
}
}

Expand Down Expand Up @@ -163,8 +171,12 @@ impl<Payload: InputProcessorMediaExt> InputProcessor<Payload> {

fn on_ready(&self) {
match Payload::media_type() {
MediaType::Audio => emit_event(Event::AudioInputStreamDelivered(self.input_id.clone())),
MediaType::Video => emit_event(Event::VideoInputStreamDelivered(self.input_id.clone())),
MediaType::Audio => self
.event_emitter
.emit(Event::AudioInputStreamDelivered(self.input_id.clone())),
MediaType::Video => self
.event_emitter
.emit(Event::VideoInputStreamDelivered(self.input_id.clone())),
}
}
}
Expand Down
Loading

0 comments on commit a84ce8f

Please sign in to comment.