Skip to content

Commit

Permalink
fixes #158 for good
Browse files Browse the repository at this point in the history
  • Loading branch information
allow authored and allow committed May 11, 2024
1 parent 4d737b9 commit 8b3e514
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 138 deletions.
144 changes: 42 additions & 102 deletions crates/mockingbird/src/deemix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,27 +273,7 @@ async fn _deemix_stream(uri: &str, pipesize: i32) -> Result<(std::process::Child
Ok((deemix, metadata_from_deemix_output(&metadata_raw)))
}

fn _balloon_loader(proc: &mut std::process::Child, pipesize: i32) -> Result<std::process::Child, DeemixError> {
let balloon = std::process::Command::new("balloon")
.stdin(
proc.stdout.take()
.ok_or(SongbirdError::Stdout)?
)
.stderr(Stdio::null())
.stdout(Stdio::piped())
.spawn()
.expect("Failed to start child process");

let balloon_ptr = balloon.stdout.as_ref()
.ok_or(SongbirdError::Stdout)?
.as_raw_fd();

unsafe { bigpipe(balloon_ptr, pipesize); }

Ok(balloon)
}

fn _ffmpeg(proc: &mut std::process::Child, pre_args: &[&str], pipesize: i32) -> Result<std::process::Child, DeemixError> {
fn _ffmpeg(proc: &mut std::process::Child, pre_args: &[&str], pipesize: i32) -> Result<std::process::Child, DeemixError> {
let ffmpeg_args = [
"-f",
"s16le",
Expand Down Expand Up @@ -330,104 +310,64 @@ fn _ffmpeg(proc: &mut std::process::Child, pre_args: &[&str], pipesize: i32) ->
Ok(ffmpeg)
}

pub struct PreloadInput {
ffmpeg_stdout: RawFd,
pub children: PreloadChildContainer,
pub metadata: Option<DeemixMetadata>,
balloon: bool,
}

pub async fn _deemix_preload(
uri: &str,
pre_args: &[&str],
balloon: bool,
pipesize: i32
) -> Result<PreloadInput, DeemixError>
{
let mut children = Vec::with_capacity(3);

tracing::info!("Running: deemix-stream {} {}", pre_args.join(" "), uri);
let (mut deemix, metadata) = _deemix_stream(uri, pipesize).await?;

let mut balloon_proc = if balloon {
tracing::info!("running balloon");
Some(_balloon_loader(&mut deemix, pipesize)?)
} else { None };

let output = balloon_proc.as_mut()
.unwrap_or(&mut deemix);

let ffmpeg = _ffmpeg(output, pre_args, pipesize)?;

children.push(deemix);

if let Some(balloon) = balloon_proc {
children.push(balloon);
}

children.push(ffmpeg);

let stdout_fd = children.last().unwrap().stdout.as_ref()
.ok_or(SongbirdError::Stdout)?
.as_raw_fd();

return Ok(PreloadInput {
balloon,
ffmpeg_stdout: stdout_fd,
children: PreloadChildContainer::new(children),
metadata: Some(metadata),
})
}


pub async fn _deemix(
uri: &str,
pre_args: &[&str],
balloon: bool,
wait: bool,
) -> Result<(Input, Option<DeemixMetadata>), DeemixError>
{
let pipesize = max_pipe_size().await.unwrap();

let mut preload_input = _deemix_preload(uri, pre_args, balloon, pipesize).await?;
let now = std::time::Instant::now();

let pipe_threshold = std::env::var("MKBIRD_PIPE_THRESHOLD")
.unwrap_or_else(|_| "0.8".to_string())
.parse::<f32>()
.unwrap_or(0.8);

loop {
let avail = unsafe { availbytes(preload_input.ffmpeg_stdout) };
let mut percentage = 0.0;
if 0 > avail {
break
}
if avail > 0 {
percentage = pipesize as f32 / avail as f32;
}
let pipesize = max_pipe_size().await.unwrap();

if pipe_threshold > percentage {
tokio::time::sleep(std::time::Duration::from_micros(200)).await;
tracing::debug!("availbytes: {}", avail);
tracing::debug!("pipesize: {}", pipesize);
}
else {
tracing::info!("load time: {}", now.elapsed().as_secs_f64());
tracing::debug!("availbytes: {}", avail);
tracing::debug!("pipesize: {}", pipesize);
break
tracing::info!("Running: deemix-stream {} {}", pre_args.join(" "), uri);
let (mut deemix, metadata) = _deemix_stream(uri, pipesize).await?;

let ffmpeg = _ffmpeg(&mut deemix, pre_args, pipesize)?;
let stdout_fd = ffmpeg.stdout.as_ref()
.ok_or(SongbirdError::Stdout)?
.as_raw_fd();

if wait {
let now = std::time::Instant::now();
loop {
let avail = unsafe { availbytes(stdout_fd) };
let mut percentage = 0.0;
if 0 > avail {
break
}
if avail > 0 {
percentage = pipesize as f32 / avail as f32;
}

if pipe_threshold > percentage {
tokio::time::sleep(std::time::Duration::from_micros(200)).await;
tracing::debug!("availbytes: {}", avail);
tracing::debug!("pipesize: {}", pipesize);
}
else {
tracing::info!("load time: {}", now.elapsed().as_secs_f64());
tracing::debug!("availbytes: {}", avail);
tracing::debug!("pipesize: {}", pipesize);
break
}
}
}

Ok((
Input::new(
true,
children_to_reader::<f32>(preload_input.children.inner()),
Codec::FloatPcm,
Container::Raw,
preload_input.metadata.clone().map(|x| x.into()),
),
preload_input.metadata.clone()))
true,
children_to_reader::<f32>(vec![deemix, ffmpeg]),
Codec::FloatPcm,
Container::Raw,
Some(metadata.clone().into()),
),
Some(metadata.clone())
))
}

#[derive(Debug, Clone)]
Expand Down
83 changes: 47 additions & 36 deletions crates/mockingbird/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ use tokio::{

};

use songbird::input::cached::Compressed;
use std::sync::{Mutex};


use cutils::{availbytes, bigpipe, max_pipe_size};

#[cfg(feature = "deemix")]
use crate::deemix::{DeemixMetadata, PreloadInput, _deemix_preload};
use crate::deemix::{DeemixMetadata, _deemix};

#[group]
#[commands(join, leave, queue, now_playing, skip, list)]
Expand All @@ -64,6 +68,12 @@ const TS_PRELOAD_OFFSET: Duration = Duration::from_secs(20);
const TS_ABANDONED_HB: Duration = Duration::from_secs(720);
const HASPLAYED_MAX_LEN: usize = 10;

struct DeemixPreloadCache;

impl TypeMapKey for DeemixPreloadCache {
type Value = Arc<Mutex<HashMap<String, Compressed>>>;
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum EventEnd {
Skipped,
Expand Down Expand Up @@ -93,7 +103,7 @@ struct ColdQueue {
pub use_radio: bool,
// urls
pub radio_queue: VecDeque<String>,
pub radio_next: Option<PreloadInput>,
pub radio_next: Option<(Compressed, Option<MetadataType>)>,
}

pub struct QueueContext {
Expand All @@ -106,6 +116,7 @@ pub struct QueueContext {
manager: Arc<Songbird>,
cold_queue: Arc<RwLock<ColdQueue>>,
}

#[derive(Debug, Clone)]
enum MetadataType {
#[cfg(feature = "deemix")]
Expand Down Expand Up @@ -177,10 +188,15 @@ async fn preload_radio_track(
};

if let Some(uri) = uri {
let pipesize = max_pipe_size().await.unwrap();
match _deemix_preload(&uri, &[], true, pipesize).await {
Ok(preload_input) => {
cold_queue.radio_next = Some(preload_input);
match _deemix(&uri, &[], false).await {
Ok((preload_input, metadata)) => {
cold_queue.radio_next = Some((Compressed::new(
preload_input,
songbird::driver::Bitrate::BitsPerSecond(128_000)
).unwrap(),

metadata.map(|x| x.into())
));
return Ok(())
}

Expand All @@ -200,11 +216,12 @@ async fn preload_radio_track(

async fn play_preload_radio_track(
call: &mut Call,
radio_preload: PreloadInput,
radio_preload: Compressed,
metadata: Option<MetadataType>,
qctx: Arc<QueueContext>
)
{
let preload_result = Players::play_preload(call, radio_preload).await;
let preload_result = Players::play_preload(call, radio_preload.new_handle().into(), metadata).await;

match preload_result {
Err(why) =>{
Expand All @@ -221,7 +238,9 @@ async fn play_preload_radio_track(
).unwrap()
}
}

struct TrackEndLoader(Arc<QueueContext>);

#[async_trait]
impl VoiceEventHandler for TrackEndLoader {
async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
Expand All @@ -242,9 +261,8 @@ impl VoiceEventHandler for TrackEndLoader {

else if cold_queue.use_radio {
// if the user queue is empty, try the preloaded radio track
if let Some(radio_preload) = cold_queue.radio_next.take() {
play_preload_radio_track(&mut call, radio_preload, self.0.clone()).await;
cold_queue.radio_next = None;
if let Some((radio_preload, metadata)) = cold_queue.radio_next.take() {
play_preload_radio_track(&mut call, radio_preload, metadata, self.0.clone()).await;
let _ = preload_radio_track(&mut cold_queue).await;
return None;
}
Expand Down Expand Up @@ -520,23 +538,16 @@ impl Players {

async fn play_preload(
handler: &mut Call,
preload: PreloadInput // &mut Vec<std::process::Child>,
// metadata: Option<MetadataType>
preload: Input, // &mut Vec<std::process::Child>,
metadata: Option<MetadataType>
)
-> Result<(TrackHandle, Option<MetadataType>), HandlerError>
{
let input = Input::new(
true,
children_to_reader::<f32>(preload.children.inner()),
Codec::FloatPcm,
Container::Raw,
preload.metadata.clone().map(|x| x.into())
);

let (track, track_handle) = create_player(input);
let (track, track_handle) = create_player(preload);
handler.enqueue(track);

Ok((track_handle, preload.metadata.map(|x| x.into())))
Ok((track_handle, metadata
//TODO: FIXME!: preload.metadata.map(|x| x.into())
))
}

async fn fan_collection(&self, uri: &str) -> Result<VecDeque<String>, HandlerError> {
Expand Down Expand Up @@ -1139,18 +1150,18 @@ async fn list(ctx: &Context, msg: &Message) -> CommandResult {
.queue.clone()
.drain(..)
.chain(cold_queue.radio_queue.clone().drain(..))
.chain(
cold_queue.radio_next
.iter()
.filter_map(
|next|
next.metadata
.clone()
.unwrap()
.metadata
.source_url
.map(|x| x.to_string())
))
// .chain(
// cold_queue.radio_next
// .iter()
// .filter_map(
// |next|
// next.metadata
// .clone()
// .unwrap()
// .metadata
// .source_url
// .map(|x| x.to_string())
// ))
.collect::<Vec<_>>()
.join("\n"),

Expand Down

0 comments on commit 8b3e514

Please sign in to comment.