From 8b3e514563a59b71c7cddc4ef7483fd32ea99032 Mon Sep 17 00:00:00 2001 From: allow Date: Fri, 10 May 2024 23:19:48 -0500 Subject: [PATCH] fixes #158 for good --- crates/mockingbird/src/deemix.rs | 144 +++++++++---------------------- crates/mockingbird/src/player.rs | 83 ++++++++++-------- 2 files changed, 89 insertions(+), 138 deletions(-) diff --git a/crates/mockingbird/src/deemix.rs b/crates/mockingbird/src/deemix.rs index d3b5410..80b6644 100644 --- a/crates/mockingbird/src/deemix.rs +++ b/crates/mockingbird/src/deemix.rs @@ -273,27 +273,7 @@ async fn _deemix_stream(uri: &str, pipesize: i32) -> Result<(std::process::Child Ok((deemix, metadata_from_deemix_output(&metadata_raw))) } -fn _balloon_loader(proc: &mut std::process::Child, pipesize: i32) -> Result { - let balloon = std::process::Command::new("balloon") - .stdin( - proc.stdout.take() - .ok_or(SongbirdError::Stdout)? - ) - .stderr(Stdio::null()) - .stdout(Stdio::piped()) - .spawn() - .expect("Failed to start child process"); - - let balloon_ptr = balloon.stdout.as_ref() - .ok_or(SongbirdError::Stdout)? - .as_raw_fd(); - - unsafe { bigpipe(balloon_ptr, pipesize); } - - Ok(balloon) -} - -fn _ffmpeg(proc: &mut std::process::Child, pre_args: &[&str], pipesize: i32) -> Result { +fn _ffmpeg(proc: &mut std::process::Child, pre_args: &[&str], pipesize: i32) -> Result { let ffmpeg_args = [ "-f", "s16le", @@ -330,104 +310,64 @@ fn _ffmpeg(proc: &mut std::process::Child, pre_args: &[&str], pipesize: i32) -> Ok(ffmpeg) } -pub struct PreloadInput { - ffmpeg_stdout: RawFd, - pub children: PreloadChildContainer, - pub metadata: Option, - balloon: bool, -} - -pub async fn _deemix_preload( - uri: &str, - pre_args: &[&str], - balloon: bool, - pipesize: i32 -) -> Result -{ - let mut children = Vec::with_capacity(3); - - tracing::info!("Running: deemix-stream {} {}", pre_args.join(" "), uri); - let (mut deemix, metadata) = _deemix_stream(uri, pipesize).await?; - - let mut balloon_proc = if balloon { - tracing::info!("running balloon"); - Some(_balloon_loader(&mut deemix, pipesize)?) - } else { None }; - - let output = balloon_proc.as_mut() - .unwrap_or(&mut deemix); - - let ffmpeg = _ffmpeg(output, pre_args, pipesize)?; - - children.push(deemix); - - if let Some(balloon) = balloon_proc { - children.push(balloon); - } - - children.push(ffmpeg); - - let stdout_fd = children.last().unwrap().stdout.as_ref() - .ok_or(SongbirdError::Stdout)? - .as_raw_fd(); - - return Ok(PreloadInput { - balloon, - ffmpeg_stdout: stdout_fd, - children: PreloadChildContainer::new(children), - metadata: Some(metadata), - }) -} - pub async fn _deemix( uri: &str, pre_args: &[&str], - balloon: bool, + wait: bool, ) -> Result<(Input, Option), DeemixError> { - let pipesize = max_pipe_size().await.unwrap(); - - let mut preload_input = _deemix_preload(uri, pre_args, balloon, pipesize).await?; - let now = std::time::Instant::now(); - let pipe_threshold = std::env::var("MKBIRD_PIPE_THRESHOLD") .unwrap_or_else(|_| "0.8".to_string()) .parse::() .unwrap_or(0.8); - loop { - let avail = unsafe { availbytes(preload_input.ffmpeg_stdout) }; - let mut percentage = 0.0; - if 0 > avail { - break - } - if avail > 0 { - percentage = pipesize as f32 / avail as f32; - } + let pipesize = max_pipe_size().await.unwrap(); - if pipe_threshold > percentage { - tokio::time::sleep(std::time::Duration::from_micros(200)).await; - tracing::debug!("availbytes: {}", avail); - tracing::debug!("pipesize: {}", pipesize); - } - else { - tracing::info!("load time: {}", now.elapsed().as_secs_f64()); - tracing::debug!("availbytes: {}", avail); - tracing::debug!("pipesize: {}", pipesize); - break + tracing::info!("Running: deemix-stream {} {}", pre_args.join(" "), uri); + let (mut deemix, metadata) = _deemix_stream(uri, pipesize).await?; + + let ffmpeg = _ffmpeg(&mut deemix, pre_args, pipesize)?; + let stdout_fd = ffmpeg.stdout.as_ref() + .ok_or(SongbirdError::Stdout)? + .as_raw_fd(); + + if wait { + let now = std::time::Instant::now(); + loop { + let avail = unsafe { availbytes(stdout_fd) }; + let mut percentage = 0.0; + if 0 > avail { + break + } + if avail > 0 { + percentage = pipesize as f32 / avail as f32; + } + + if pipe_threshold > percentage { + tokio::time::sleep(std::time::Duration::from_micros(200)).await; + tracing::debug!("availbytes: {}", avail); + tracing::debug!("pipesize: {}", pipesize); + } + else { + tracing::info!("load time: {}", now.elapsed().as_secs_f64()); + tracing::debug!("availbytes: {}", avail); + tracing::debug!("pipesize: {}", pipesize); + break + } } } Ok(( Input::new( - true, - children_to_reader::(preload_input.children.inner()), - Codec::FloatPcm, - Container::Raw, - preload_input.metadata.clone().map(|x| x.into()), - ), - preload_input.metadata.clone())) + true, + children_to_reader::(vec![deemix, ffmpeg]), + Codec::FloatPcm, + Container::Raw, + Some(metadata.clone().into()), + ), + Some(metadata.clone()) + )) } #[derive(Debug, Clone)] diff --git a/crates/mockingbird/src/player.rs b/crates/mockingbird/src/player.rs index 8f2b198..00cf4fe 100644 --- a/crates/mockingbird/src/player.rs +++ b/crates/mockingbird/src/player.rs @@ -45,10 +45,14 @@ use tokio::{ }; +use songbird::input::cached::Compressed; +use std::sync::{Mutex}; + + use cutils::{availbytes, bigpipe, max_pipe_size}; #[cfg(feature = "deemix")] -use crate::deemix::{DeemixMetadata, PreloadInput, _deemix_preload}; +use crate::deemix::{DeemixMetadata, _deemix}; #[group] #[commands(join, leave, queue, now_playing, skip, list)] @@ -64,6 +68,12 @@ const TS_PRELOAD_OFFSET: Duration = Duration::from_secs(20); const TS_ABANDONED_HB: Duration = Duration::from_secs(720); const HASPLAYED_MAX_LEN: usize = 10; +struct DeemixPreloadCache; + +impl TypeMapKey for DeemixPreloadCache { + type Value = Arc>>; +} + #[derive(Debug, PartialEq, Eq, Clone, Copy)] enum EventEnd { Skipped, @@ -93,7 +103,7 @@ struct ColdQueue { pub use_radio: bool, // urls pub radio_queue: VecDeque, - pub radio_next: Option, + pub radio_next: Option<(Compressed, Option)>, } pub struct QueueContext { @@ -106,6 +116,7 @@ pub struct QueueContext { manager: Arc, cold_queue: Arc>, } + #[derive(Debug, Clone)] enum MetadataType { #[cfg(feature = "deemix")] @@ -177,10 +188,15 @@ async fn preload_radio_track( }; if let Some(uri) = uri { - let pipesize = max_pipe_size().await.unwrap(); - match _deemix_preload(&uri, &[], true, pipesize).await { - Ok(preload_input) => { - cold_queue.radio_next = Some(preload_input); + match _deemix(&uri, &[], false).await { + Ok((preload_input, metadata)) => { + cold_queue.radio_next = Some((Compressed::new( + preload_input, + songbird::driver::Bitrate::BitsPerSecond(128_000) + ).unwrap(), + + metadata.map(|x| x.into()) + )); return Ok(()) } @@ -200,11 +216,12 @@ async fn preload_radio_track( async fn play_preload_radio_track( call: &mut Call, - radio_preload: PreloadInput, + radio_preload: Compressed, + metadata: Option, qctx: Arc ) { - let preload_result = Players::play_preload(call, radio_preload).await; + let preload_result = Players::play_preload(call, radio_preload.new_handle().into(), metadata).await; match preload_result { Err(why) =>{ @@ -221,7 +238,9 @@ async fn play_preload_radio_track( ).unwrap() } } + struct TrackEndLoader(Arc); + #[async_trait] impl VoiceEventHandler for TrackEndLoader { async fn act(&self, _ctx: &EventContext<'_>) -> Option { @@ -242,9 +261,8 @@ impl VoiceEventHandler for TrackEndLoader { else if cold_queue.use_radio { // if the user queue is empty, try the preloaded radio track - if let Some(radio_preload) = cold_queue.radio_next.take() { - play_preload_radio_track(&mut call, radio_preload, self.0.clone()).await; - cold_queue.radio_next = None; + if let Some((radio_preload, metadata)) = cold_queue.radio_next.take() { + play_preload_radio_track(&mut call, radio_preload, metadata, self.0.clone()).await; let _ = preload_radio_track(&mut cold_queue).await; return None; } @@ -520,23 +538,16 @@ impl Players { async fn play_preload( handler: &mut Call, - preload: PreloadInput // &mut Vec, - // metadata: Option + preload: Input, // &mut Vec, + metadata: Option ) -> Result<(TrackHandle, Option), HandlerError> { - let input = Input::new( - true, - children_to_reader::(preload.children.inner()), - Codec::FloatPcm, - Container::Raw, - preload.metadata.clone().map(|x| x.into()) - ); - - let (track, track_handle) = create_player(input); + let (track, track_handle) = create_player(preload); handler.enqueue(track); - - Ok((track_handle, preload.metadata.map(|x| x.into()))) + Ok((track_handle, metadata + //TODO: FIXME!: preload.metadata.map(|x| x.into()) + )) } async fn fan_collection(&self, uri: &str) -> Result, HandlerError> { @@ -1139,18 +1150,18 @@ async fn list(ctx: &Context, msg: &Message) -> CommandResult { .queue.clone() .drain(..) .chain(cold_queue.radio_queue.clone().drain(..)) - .chain( - cold_queue.radio_next - .iter() - .filter_map( - |next| - next.metadata - .clone() - .unwrap() - .metadata - .source_url - .map(|x| x.to_string()) - )) + // .chain( + // cold_queue.radio_next + // .iter() + // .filter_map( + // |next| + // next.metadata + // .clone() + // .unwrap() + // .metadata + // .source_url + // .map(|x| x.to_string()) + // )) .collect::>() .join("\n"),