From 8d01d7297b136161bcf2c15eb9f07497e09cb219 Mon Sep 17 00:00:00 2001 From: noituri Date: Sat, 19 Oct 2024 14:48:35 +0200 Subject: [PATCH] Handle it better --- compositor_pipeline/src/pipeline.rs | 14 ++++++--- compositor_pipeline/src/pipeline/encoder.rs | 13 -------- .../src/pipeline/encoder/fdk_aac.rs | 13 -------- .../src/pipeline/encoder/ffmpeg_h264.rs | 18 +---------- .../src/pipeline/encoder/opus.rs | 14 --------- compositor_pipeline/src/pipeline/output.rs | 30 ------------------- .../src/pipeline/pipeline_output.rs | 4 +++ 7 files changed, 15 insertions(+), 91 deletions(-) diff --git a/compositor_pipeline/src/pipeline.rs b/compositor_pipeline/src/pipeline.rs index 61cbeac1d..ae07a6c81 100644 --- a/compositor_pipeline/src/pipeline.rs +++ b/compositor_pipeline/src/pipeline.rs @@ -379,9 +379,12 @@ impl Pipeline { .get(&output_id) .ok_or_else(|| UpdateSceneError::OutputNotRegistered(output_id.clone()))?; - if output.output.is_video_finished() { - return Err(UpdateSceneError::UpdateAfterEOS(output_id)); + if let Some(cond) = &output.video_end_condition { + if cond.did_output_end() { + return Err(UpdateSceneError::UpdateAfterEOS(output_id.clone())); + } } + let (Some(resolution), Some(frame_format)) = ( output.output.resolution(), output.output.output_frame_format(), @@ -404,8 +407,11 @@ impl Pipeline { .outputs .get(output_id) .ok_or_else(|| UpdateSceneError::OutputNotRegistered(output_id.clone()))?; - if output.output.is_audio_finished() { - return Err(UpdateSceneError::UpdateAfterEOS(output_id.clone())); + + if let Some(cond) = &output.audio_end_condition { + if cond.did_output_end() { + return Err(UpdateSceneError::UpdateAfterEOS(output_id.clone())); + } } info!(?output_id, "Update audio mixer {:#?}", audio); diff --git a/compositor_pipeline/src/pipeline/encoder.rs b/compositor_pipeline/src/pipeline/encoder.rs index 34485cc44..ba81efa04 100644 --- a/compositor_pipeline/src/pipeline/encoder.rs +++ b/compositor_pipeline/src/pipeline/encoder.rs @@ -143,12 +143,6 @@ impl VideoEncoder { Self::H264(encoder) => encoder.request_keyframe(), } } - - pub fn is_finished(&self) -> bool { - match self { - Self::H264(encoder) => encoder.is_finished(), - } - } } impl AudioEncoder { @@ -174,13 +168,6 @@ impl AudioEncoder { Self::Aac(encoder) => encoder.samples_batch_sender(), } } - - pub fn is_finished(&self) -> bool { - match self { - AudioEncoder::Opus(encoder) => encoder.is_finished(), - AudioEncoder::Aac(encoder) => encoder.is_finished(), - } - } } impl AudioEncoderOptions { diff --git a/compositor_pipeline/src/pipeline/encoder/fdk_aac.rs b/compositor_pipeline/src/pipeline/encoder/fdk_aac.rs index 0ed8bea4d..a3e25cc8c 100644 --- a/compositor_pipeline/src/pipeline/encoder/fdk_aac.rs +++ b/compositor_pipeline/src/pipeline/encoder/fdk_aac.rs @@ -2,10 +2,6 @@ use std::{ mem::{self, MaybeUninit}, os::raw::{c_int, c_void}, ptr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, time::Duration, }; @@ -27,7 +23,6 @@ use crate::{ /// https://github.com/mstorsjo/fdk-aac/blob/master/documentation/aacEncoder.pdf pub struct AacEncoder { samples_batch_sender: Sender>, - is_finished: Arc, } #[derive(Debug, Clone)] @@ -43,12 +38,10 @@ impl AacEncoder { packets_sender: Sender, ) -> Result { let (samples_batch_sender, samples_batch_receiver) = bounded(5); - let is_finished = Arc::new(AtomicBool::new(false)); // Since AAC encoder holds ref to internal structure (handler), it's unsafe to send it between threads. let (init_result_sender, init_result_receiver) = bounded(0); let output_id = output_id.to_string(); - let is_finished_clone = is_finished.clone(); std::thread::Builder::new() .name("AAC encoder thread".to_string()) @@ -62,7 +55,6 @@ impl AacEncoder { samples_batch_receiver, packets_sender, ); - is_finished_clone.store(true, Ordering::Relaxed); debug!("Closing AAC encoder thread."); }) .unwrap(); @@ -74,17 +66,12 @@ impl AacEncoder { Ok(Self { samples_batch_sender, - is_finished, }) } pub fn samples_batch_sender(&self) -> &Sender> { &self.samples_batch_sender } - - pub fn is_finished(&self) -> bool { - self.is_finished.load(Ordering::Relaxed) - } } struct AacEncoderInner { diff --git a/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs b/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs index 6f9ad5a0a..4f0f46264 100644 --- a/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs +++ b/compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs @@ -1,10 +1,4 @@ -use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::Duration, -}; +use std::time::Duration; use compositor_render::{Frame, FrameData, OutputId, Resolution}; use crossbeam_channel::{Receiver, Sender}; @@ -95,7 +89,6 @@ pub struct LibavH264Encoder { resolution: Resolution, frame_sender: Sender>, keyframe_req_sender: Sender<()>, - is_finished: Arc, } impl LibavH264Encoder { @@ -107,11 +100,9 @@ impl LibavH264Encoder { let (frame_sender, frame_receiver) = crossbeam_channel::bounded(5); let (result_sender, result_receiver) = crossbeam_channel::bounded(0); let (keyframe_req_sender, keyframe_req_receiver) = crossbeam_channel::unbounded(); - let is_finished = Arc::new(AtomicBool::new(false)); let options_clone = options.clone(); let output_id = output_id.clone(); - let is_finished_clone = is_finished.clone(); std::thread::Builder::new() .name(format!("Encoder thread for output {}", output_id)) @@ -130,8 +121,6 @@ impl LibavH264Encoder { &result_sender, ); - is_finished_clone.store(true, Ordering::Relaxed); - if let Err(err) = encoder_result { warn!(%err, "Encoder thread finished with an error."); if let Err(err) = result_sender.send(Err(err)) { @@ -148,7 +137,6 @@ impl LibavH264Encoder { frame_sender, resolution: options.resolution, keyframe_req_sender, - is_finished, }) } @@ -165,10 +153,6 @@ impl LibavH264Encoder { debug!(%err, "Failed to send keyframe request to the encoder."); } } - - pub fn is_finished(&self) -> bool { - self.is_finished.load(Ordering::Relaxed) - } } fn run_encoder_thread( diff --git a/compositor_pipeline/src/pipeline/encoder/opus.rs b/compositor_pipeline/src/pipeline/encoder/opus.rs index fc4b31a67..7960e9623 100644 --- a/compositor_pipeline/src/pipeline/encoder/opus.rs +++ b/compositor_pipeline/src/pipeline/encoder/opus.rs @@ -1,8 +1,3 @@ -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; - use crossbeam_channel::{bounded, Receiver, Sender}; use log::error; use tracing::{span, trace, warn, Level}; @@ -27,7 +22,6 @@ pub struct OpusEncoderOptions { pub struct OpusEncoder { samples_batch_sender: Sender>, - is_finished: Arc, } impl OpusEncoder { @@ -37,34 +31,26 @@ impl OpusEncoder { packets_sender: Sender, ) -> Result { let (samples_batch_sender, samples_batch_receiver) = bounded(2); - let is_finished = Arc::new(AtomicBool::new(false)); let encoder = opus::Encoder::new(sample_rate, options.channels.into(), options.preset.into())?; - let is_finished_clone = is_finished.clone(); std::thread::Builder::new() .name("Opus encoder thread".to_string()) .spawn(move || { let _span = span!(Level::INFO, "Opus encoder thread",).entered(); run_encoder_thread(encoder, samples_batch_receiver, packets_sender); - is_finished_clone.store(true, Ordering::Relaxed); }) .unwrap(); Ok(Self { samples_batch_sender, - is_finished, }) } pub fn samples_batch_sender(&self) -> &Sender> { &self.samples_batch_sender } - - pub fn is_finished(&self) -> bool { - self.is_finished.load(Ordering::Relaxed) - } } fn run_encoder_thread( diff --git a/compositor_pipeline/src/pipeline/output.rs b/compositor_pipeline/src/pipeline/output.rs index 910039e43..e9e988ad0 100644 --- a/compositor_pipeline/src/pipeline/output.rs +++ b/compositor_pipeline/src/pipeline/output.rs @@ -215,36 +215,6 @@ impl Output { Ok(()) } - pub fn is_video_finished(&self) -> bool { - let encoder = match self { - Output::Rtp { encoder, .. } => encoder, - Output::Mp4 { encoder, .. } => encoder, - Output::EncodedData { encoder, .. } => encoder, - Output::RawData { .. } => return false, - }; - - encoder - .video - .as_ref() - .map(|v| v.is_finished()) - .unwrap_or(true) - } - - pub fn is_audio_finished(&self) -> bool { - let encoder = match self { - Output::Rtp { encoder, .. } => encoder, - Output::Mp4 { encoder, .. } => encoder, - Output::EncodedData { encoder, .. } => encoder, - Output::RawData { .. } => return false, - }; - - encoder - .audio - .as_ref() - .map(|a| a.is_finished()) - .unwrap_or(true) - } - pub(super) fn output_frame_format(&self) -> Option { match &self { Output::Rtp { encoder, .. } => encoder diff --git a/compositor_pipeline/src/pipeline/pipeline_output.rs b/compositor_pipeline/src/pipeline/pipeline_output.rs index eb9b79aaa..cfb695886 100644 --- a/compositor_pipeline/src/pipeline/pipeline_output.rs +++ b/compositor_pipeline/src/pipeline/pipeline_output.rs @@ -234,6 +234,10 @@ impl PipelineOutputEndConditionState { EosStatus::None } + pub(super) fn did_output_end(&self) -> bool { + self.did_end + } + pub(super) fn on_input_registered(&mut self, input_id: &InputId) { self.on_event(StateChange::AddInput(input_id)) }