Skip to content

Commit

Permalink
Extract out audio_source
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaschan committed Sep 4, 2024
1 parent 5dd8ba4 commit 3bba420
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 102 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions insanity-core/src/audio_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::future::Future;

pub trait AudioSource {
fn next(&mut self) -> impl Future<Output = Option<f32>> + Send;
fn sample_rate(&self) -> u32;
fn channels(&self) -> u16;
}

pub trait SyncAudioSource: AudioSource {
fn next_sync(&mut self) -> Option<f32>;
}
1 change: 1 addition & 0 deletions insanity-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod audio_source;
pub mod user_input_event;
1 change: 0 additions & 1 deletion insanity-native-tui-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.77"
bincode = "1.3.3"
clap = { version = "^4.5.3", features = ["derive"] }
cpal = "0.13.5"
Expand Down
9 changes: 5 additions & 4 deletions insanity-native-tui-app/src/clerver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{atomic::AtomicBool, Arc, Mutex};

use cpal::traits::{HostTrait, StreamTrait};

use insanity_core::audio_source::AudioSource;
use insanity_tui_adapter::AppEvent;
use opus::{Application, Channels, Decoder, Encoder};
use serde::{Deserialize, Serialize};
Expand All @@ -13,16 +14,16 @@ use crate::{
client::{get_output_config, setup_output_stream},
processor::{AudioChunk, AudioFormat, AudioProcessor, AUDIO_CHUNK_SIZE},
protocol::ProtocolMessage,
resampler::ResampledAudioReceiver,
server::{make_audio_receiver, AudioReceiver},
resampler::ResampledAudioSource,
server::make_audio_receiver,
};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AudioFrame(u128, Vec<u8>);

// A clerver is a CLient + sERVER.

async fn run_audio_sender<R: AudioReceiver + Send + Sync + 'static>(
async fn run_audio_sender<R: AudioSource + Send + Sync + 'static>(
mut conn: VeqSessionAlias,
sender_is_muted: Arc<AtomicBool>,
make_receiver: impl (FnOnce() -> R) + Send + Clone + 'static,
Expand All @@ -32,7 +33,7 @@ async fn run_audio_sender<R: AudioReceiver + Send + Sync + 'static>(
let channels_count = audio_receiver.channels();
let channels = u16_to_channels(channels_count);

let mut audio_receiver = ResampledAudioReceiver::new(audio_receiver, 48000);
let mut audio_receiver = ResampledAudioSource::new(audio_receiver, 48000);
let mut encoder = Encoder::new(48000, channels, Application::Audio).unwrap();
let mut sequence_number = 0;

Expand Down
12 changes: 6 additions & 6 deletions insanity-native-tui-app/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};

use cpal::{Sample, SampleRate};
use insanity_core::audio_source::SyncAudioSource;
use nnnoiseless::DenoiseState;
use serde::{Deserialize, Serialize};

use crate::realtime_buffer::RealTimeBuffer;
use crate::resampler::ResampledAudioReceiver;
use crate::server::RealtimeAudioReceiver;
use crate::server::SyncAudioReceiver;
use crate::resampler::ResampledAudioSource;
use crate::server::RealtimeAudioSource;

pub const AUDIO_CHUNK_SIZE: usize = 480;
pub const AUDIO_CHANNELS: u16 = 2;
Expand Down Expand Up @@ -192,7 +192,7 @@ pub struct AudioProcessor<'a> {
volume: Arc<Mutex<usize>>,
denoiser: Mutex<MultiChannelDenoiser<'a>>,
chunk_buffer: Arc<Mutex<RealTimeBuffer<AudioChunk>>>,
audio_receiver: Mutex<ResampledAudioReceiver<RealtimeAudioReceiver>>,
audio_receiver: Mutex<ResampledAudioSource<RealtimeAudioSource>>,
}

impl AudioProcessor<'_> {
Expand All @@ -202,8 +202,8 @@ impl AudioProcessor<'_> {
output_sample_rate: SampleRate,
) -> Self {
let chunk_buffer = Arc::new(Mutex::new(RealTimeBuffer::new(10)));
let audio_receiver = RealtimeAudioReceiver::new(chunk_buffer.clone(), 48000, 2);
let audio_receiver = ResampledAudioReceiver::new(audio_receiver, output_sample_rate.0);
let audio_receiver = RealtimeAudioSource::new(chunk_buffer.clone(), 48000, 2);
let audio_receiver = ResampledAudioSource::new(audio_receiver, output_sample_rate.0);

AudioProcessor {
enable_denoise,
Expand Down
21 changes: 8 additions & 13 deletions insanity-native-tui-app/src/resampler.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
use std::{collections::VecDeque, sync::Mutex};

use insanity_core::audio_source::{AudioSource, SyncAudioSource};
use log::trace;
use rubato::{Resampler, SincFixedIn};

use async_trait::async_trait;
use crate::processor::AUDIO_CHUNK_SIZE;

use crate::{
processor::AUDIO_CHUNK_SIZE,
server::{AudioReceiver, SyncAudioReceiver},
};

pub struct ResampledAudioReceiver<R: AudioReceiver> {
pub struct ResampledAudioSource<R: AudioSource> {
resampler: Mutex<SincFixedIn<f32>>,
resampled_buffer: VecDeque<f32>,
original_samples_buffer: VecDeque<f32>,
delegate: R,
sample_rate: u32,
}

impl<R: AudioReceiver + Send + Sync> ResampledAudioReceiver<R> {
pub fn new(delegate: R, sample_rate: u32) -> ResampledAudioReceiver<R> {
impl<R: AudioSource + Send + Sync> ResampledAudioSource<R> {
pub fn new(delegate: R, sample_rate: u32) -> ResampledAudioSource<R> {
let params = rubato::InterpolationParameters {
sinc_len: 256,
f_cutoff: 0.95,
Expand All @@ -33,7 +29,7 @@ impl<R: AudioReceiver + Send + Sync> ResampledAudioReceiver<R> {
AUDIO_CHUNK_SIZE,
delegate.channels() as usize,
);
ResampledAudioReceiver {
ResampledAudioSource {
resampler: Mutex::new(resampler),
resampled_buffer: VecDeque::new(),
original_samples_buffer: VecDeque::new(),
Expand Down Expand Up @@ -65,8 +61,7 @@ fn interleave_channels(channels: &[Vec<f32>]) -> Vec<f32> {
samples
}

#[async_trait]
impl<R: AudioReceiver + Send> AudioReceiver for ResampledAudioReceiver<R> {
impl<R: AudioSource + Send> AudioSource for ResampledAudioSource<R> {
async fn next(&mut self) -> Option<f32> {
if self.delegate.sample_rate() == self.sample_rate {
return self.delegate.next().await;
Expand Down Expand Up @@ -113,7 +108,7 @@ impl<R: AudioReceiver + Send> AudioReceiver for ResampledAudioReceiver<R> {
}
}

impl<R: SyncAudioReceiver + Send> SyncAudioReceiver for ResampledAudioReceiver<R> {
impl<R: SyncAudioSource + Send> SyncAudioSource for ResampledAudioSource<R> {
fn next_sync(&mut self) -> Option<f32> {
if self.delegate.sample_rate() == self.sample_rate {
return self.delegate.next_sync();
Expand Down
85 changes: 8 additions & 77 deletions insanity-native-tui-app/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use std::sync::{Arc, Mutex};

use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{BufferSize, Device, Sample, SampleFormat, SampleRate, Stream, StreamConfig};
use insanity_core::audio_source::{AudioSource, SyncAudioSource};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

use async_trait::async_trait;

use crate::processor::{AudioChunk, AUDIO_CHANNELS};
use crate::realtime_buffer::RealTimeBuffer;

Expand Down Expand Up @@ -79,61 +78,6 @@ fn find_stereo_input(
something
}

// async fn start_clerver_with_ui<R: AudioReceiver + Send + 'static>(
// mut conn: NewConnection,
// denoise: bool,
// make_receiver: impl (FnOnce() -> R) + Send + Clone + 'static,
// ui_message_sender: crossbeam::channel::Sender<TuiEvent>,
// ) {
// let peer_address = conn.connection.remote_address();
// if let Some(Ok(mut recv)) = conn.uni_streams.next().await {
// let message = ProtocolMessage::read_from_stream(&mut recv).await.unwrap();
// if let ProtocolMessage::IdentityDeclaration(identity) = message {
// if ui_message_sender
// .send(TuiEvent::Message(TuiMessage::UpdatePeer(
// identity.canonical_name.clone(),
// Peer {
// name: identity.canonical_name,
// status: PeerStatus::Connected(peer_address),
// },
// )))
// .is_ok()
// {}
// }
// }
// start_clerver(conn, denoise, make_receiver).await;
// if ui_message_sender
// .send(TuiEvent::Message(TuiMessage::UpdatePeer(
// peer_address.to_string(),
// Peer {
// name: peer_address.to_string(),
// status: PeerStatus::Disconnected,
// },
// )))
// .is_ok()
// {}
// }

// pub async fn start_server_with_receiver<R: AudioReceiver + Send + 'static>(
// socket: VeqSocket,
// make_receiver: impl (FnOnce() -> R) + Send + Clone + 'static,
// config: InsanityConfig,
// ) {
// loop {
// var incoming
// let incoming_conn = incoming.next().await.expect("1");
// let conn = incoming_conn.await.expect("2");
// let make_receiver_clone = make_receiver.clone();
// let ui_message_sender_clone = config.ui_message_sender.clone();
// tokio::spawn(start_clerver_with_ui(
// conn,
// config.denoise,
// make_receiver_clone,
// ui_message_sender_clone,
// ));
// }
// }

pub struct CpalStreamReceiver {
#[allow(dead_code)]
input_stream: send_safe::SendWrapperThread<Stream>,
Expand All @@ -142,19 +86,7 @@ pub struct CpalStreamReceiver {
channels: u16,
}

#[async_trait]
pub trait AudioReceiver {
async fn next(&mut self) -> Option<f32>;
fn sample_rate(&self) -> u32;
fn channels(&self) -> u16;
}

pub trait SyncAudioReceiver: AudioReceiver {
fn next_sync(&mut self) -> Option<f32>;
}

#[async_trait]
impl AudioReceiver for CpalStreamReceiver {
impl AudioSource for CpalStreamReceiver {
async fn next(&mut self) -> Option<f32> {
self.input_receiver.recv().await
}
Expand Down Expand Up @@ -233,20 +165,20 @@ pub fn make_audio_receiver() -> CpalStreamReceiver {
// }
// }

pub struct RealtimeAudioReceiver {
pub struct RealtimeAudioSource {
chunk_buffer: Arc<Mutex<RealTimeBuffer<AudioChunk>>>,
sample_buffer: VecDeque<f32>,
sample_rate: u32,
channels: u16,
}

impl RealtimeAudioReceiver {
impl RealtimeAudioSource {
pub fn new(
chunk_buffer: Arc<Mutex<RealTimeBuffer<AudioChunk>>>,
sample_rate: u32,
channels: u16,
) -> RealtimeAudioReceiver {
RealtimeAudioReceiver {
) -> RealtimeAudioSource {
RealtimeAudioSource {
chunk_buffer,
sample_buffer: VecDeque::new(),
sample_rate,
Expand All @@ -255,8 +187,7 @@ impl RealtimeAudioReceiver {
}
}

#[async_trait]
impl AudioReceiver for RealtimeAudioReceiver {
impl AudioSource for RealtimeAudioSource {
async fn next(&mut self) -> Option<f32> {
self.next_sync()
}
Expand All @@ -268,7 +199,7 @@ impl AudioReceiver for RealtimeAudioReceiver {
}
}

impl SyncAudioReceiver for RealtimeAudioReceiver {
impl SyncAudioSource for RealtimeAudioSource {
fn next_sync(&mut self) -> Option<f32> {
if self.sample_buffer.is_empty() {
let mut buffer = self.chunk_buffer.lock().unwrap();
Expand Down

0 comments on commit 3bba420

Please sign in to comment.