diff --git a/Cargo.lock b/Cargo.lock index 13ad0ce2..17239472 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -824,6 +824,7 @@ dependencies = [ "specta", "tempfile", "thiserror", + "tokio", "tracing", "windows 0.58.0", "windows-capture", @@ -1689,6 +1690,7 @@ dependencies = [ "cocoa 0.26.0", "core-foundation 0.10.0", "core-graphics 0.24.0", + "cpal 0.15.3 (git+https://github.com/RustAudio/cpal?rev=f43d36e55494993bbbde3299af0c53e5cdf4d4cf)", "device_query", "dotenvy_macro", "fast_image_resize", @@ -1699,6 +1701,7 @@ dependencies = [ "global-hotkey", "image 0.25.2", "indexmap 2.5.0", + "keyed_priority_queue", "mp4", "nix 0.29.0", "num-traits", @@ -3441,6 +3444,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "keyed_priority_queue" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" +dependencies = [ + "indexmap 2.5.0", +] + [[package]] name = "khronos-egl" version = "6.0.0" diff --git a/apps/desktop/src-tauri/Cargo.toml b/apps/desktop/src-tauri/Cargo.toml index 3a213cff..33e62590 100644 --- a/apps/desktop/src-tauri/Cargo.toml +++ b/apps/desktop/src-tauri/Cargo.toml @@ -79,6 +79,8 @@ dotenvy_macro = "0.15.7" global-hotkey = "0.6.3" tauri-plugin-http = "2.0.0-rc.0" rand = "0.8.5" +cpal.workspace = true +keyed_priority_queue = "0.4.2" [target.'cfg(target_os = "macos")'.dependencies] core-graphics = "0.24.0" diff --git a/apps/desktop/src-tauri/src/audio_meter.rs b/apps/desktop/src-tauri/src/audio_meter.rs new file mode 100644 index 00000000..767a7531 --- /dev/null +++ b/apps/desktop/src-tauri/src/audio_meter.rs @@ -0,0 +1,145 @@ +use cap_media::feeds::{AudioInputSamples, AudioInputSamplesReceiver}; +use cpal::{SampleFormat, StreamInstant}; +use keyed_priority_queue::KeyedPriorityQueue; +use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; +use std::collections::VecDeque; +use std::time::Duration; +use tauri::AppHandle; +use tauri_specta::Event; + +const MAX_AMPLITUDE_F32: f64 = (u16::MAX / 2) as f64; // i16 max value +const ZERO_AMPLITUDE: u16 = 0; +const MIN_DB: f64 = -96.0; + +#[derive(Deserialize, specta::Type, Serialize, tauri_specta::Event, Debug, Clone)] +pub struct AudioInputLevelChange(f64); + +pub fn spawn_event_emitter(app_handle: AppHandle, audio_input_rx: AudioInputSamplesReceiver) { + let mut time_window = VolumeMeter::new(0.2); + tokio::spawn(async move { + while let Ok(samples) = audio_input_rx.recv_async().await { + let floats = samples_to_f64(&samples); + + let db = db_fs(floats); + + time_window.push(samples.info.timestamp().capture, db); + + let max = time_window.max(); + + AudioInputLevelChange(max).emit(&app_handle).ok(); + } + }); +} + +// https://github.com/cgbur/meter/blob/master/src/time_window.rs +struct VolumeMeter { + keep_duration: Duration, // secs + maxes: KeyedPriorityQueue, + times: VecDeque, +} + +impl VolumeMeter { + pub fn new(keep_secs: f32) -> Self { + Self { + keep_duration: Duration::from_secs_f32(keep_secs), + maxes: KeyedPriorityQueue::new(), + times: Default::default(), + } + } + + pub fn push(&mut self, time: StreamInstant, value: f64) { + let value = MinNonNan(-value); + self.maxes.push(time, value); + self.times.push_back(time); + + loop { + if let Some(time) = self + .times + .back() + .unwrap() + .duration_since(self.times.front().unwrap()) + { + if time > self.keep_duration { + self.maxes.remove(self.times.front().unwrap()); + self.times.pop_front(); + } else { + break; + } + } else { + break; + } + + if self.times.len() <= 1 { + break; + } + } + } + + pub fn max(&self) -> f64 { + -self.maxes.peek().map(|(_, db)| db.0).unwrap_or(-MIN_DB) + } +} + +#[derive(PartialEq)] +struct MinNonNan(f64); + +impl Eq for MinNonNan {} + +impl PartialOrd for MinNonNan { + fn partial_cmp(&self, other: &Self) -> Option { + other.0.partial_cmp(&self.0) + } +} + +impl Ord for MinNonNan { + fn cmp(&self, other: &MinNonNan) -> Ordering { + self.partial_cmp(other).unwrap() + } +} + +use cpal::Sample; + +fn db_fs(data: impl Iterator) -> f64 { + let max = data + .map(|f| f.to_sample::().unsigned_abs()) + .max() + .unwrap_or(ZERO_AMPLITUDE); + + (20.0 * (max as f64 / MAX_AMPLITUDE_F32).log10()).clamp(MIN_DB, 0.0) +} + +fn samples_to_f64(samples: &AudioInputSamples) -> impl Iterator + use<'_> { + samples + .data + .chunks(samples.format.sample_size()) + .map(|data| match samples.format { + SampleFormat::I8 => i8::from_ne_bytes([data[0]]) as f64 / i8::MAX as f64, + SampleFormat::U8 => u8::from_ne_bytes([data[0]]) as f64 / u8::MAX as f64, + SampleFormat::I16 => i16::from_ne_bytes([data[0], data[1]]) as f64 / i16::MAX as f64, + SampleFormat::U16 => u16::from_ne_bytes([data[0], data[1]]) as f64 / u16::MAX as f64, + SampleFormat::I32 => { + i32::from_ne_bytes([data[0], data[1], data[2], data[3]]) as f64 / i32::MAX as f64 + } + SampleFormat::U32 => { + u32::from_ne_bytes([data[0], data[1], data[2], data[3]]) as f64 / u32::MAX as f64 + } + SampleFormat::U64 => { + u64::from_ne_bytes([ + data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], + ]) as f64 + / u64::MAX as f64 + } + SampleFormat::I64 => { + i64::from_ne_bytes([ + data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], + ]) as f64 + / i64::MAX as f64 + } + SampleFormat::F32 => f32::from_ne_bytes([data[0], data[1], data[2], data[3]]) as f64, + SampleFormat::F64 => f64::from_ne_bytes([ + data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], + ]), + _ => todo!(), + }) +} diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index 97b68a4c..9dfc7af3 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -9,6 +9,7 @@ mod permissions; mod platform; mod recording; // mod resource; +mod audio_meter; mod cursor; mod tray; mod upload; @@ -19,11 +20,12 @@ use audio::AppSounds; use auth::{AuthStore, AuthenticationInvalid}; use cap_editor::{EditorInstance, FRAMES_WS_PATH}; use cap_editor::{EditorState, ProjectRecordings}; +use cap_media::feeds::{AudioInputFeed, AudioInputSamplesSender}; use cap_media::sources::CaptureScreen; use cap_media::{ feeds::{AudioFrameBuffer, CameraFeed, CameraFrameSender}, platform::Bounds, - sources::{AudioInputSource, ScreenCaptureTarget}, + sources::ScreenCaptureTarget, }; use cap_project::{ ProjectConfiguration, RecordingMeta, SharingMeta, TimelineConfiguration, TimelineSegment, @@ -91,6 +93,10 @@ pub struct App { #[serde(skip)] camera_feed: Option, #[serde(skip)] + audio_input_feed: Option, + #[serde(skip)] + audio_input_tx: AudioInputSamplesSender, + #[serde(skip)] handle: AppHandle, #[serde(skip)] current_recording: Option, @@ -167,16 +173,16 @@ impl App { _ => {} } - match &new_options.camera_label { + match new_options.camera_label() { Some(camera_label) => { - if self.camera_feed.is_none() { - self.camera_feed = CameraFeed::init(camera_label, self.camera_tx.clone()) + if let Some(camera_feed) = self.camera_feed.as_mut() { + camera_feed + .switch_cameras(camera_label) .await .map_err(|error| eprintln!("{error}")) .ok(); - } else if let Some(camera_feed) = self.camera_feed.as_mut() { - camera_feed - .switch_cameras(camera_label) + } else { + self.camera_feed = CameraFeed::init(camera_label, self.camera_tx.clone()) .await .map_err(|error| eprintln!("{error}")) .ok(); @@ -187,6 +193,31 @@ impl App { } } + match new_options.audio_input_name() { + Some(audio_input_name) => { + if let Some(audio_input_feed) = self.audio_input_feed.as_mut() { + audio_input_feed + .switch_input(audio_input_name) + .await + .map_err(|error| eprintln!("{error}")) + .ok(); + } else { + self.audio_input_feed = if let Ok(feed) = AudioInputFeed::init(audio_input_name) + .await + .map_err(|error| eprintln!("{error}")) + { + feed.add_sender(self.audio_input_tx.clone()).await.unwrap(); + Some(feed) + } else { + None + }; + } + } + None => { + self.audio_input_feed = None; + } + } + self.start_recording_options = new_options; RecordingOptionsChanged.emit(&self.handle).ok(); @@ -342,6 +373,7 @@ async fn start_recording(app: AppHandle, state: MutableState<'_, App>) -> Result recording_dir, &state.start_recording_options, state.camera_feed.as_ref(), + state.audio_input_feed.as_ref(), ) .await { @@ -1671,14 +1703,7 @@ async fn list_audio_devices() -> Result, ()> { return Ok(vec![]); } - // TODO: Check - is this necessary? `spawn_blocking` is quite a bit of overhead. - tokio::task::spawn_blocking(|| { - let devices = AudioInputSource::get_devices(); - - devices.keys().cloned().collect() - }) - .await - .map_err(|_| ()) + Ok(AudioInputFeed::list_devices().keys().cloned().collect()) } #[tauri::command(async)] @@ -2442,7 +2467,8 @@ pub async fn run() { RequestNewScreenshot, RequestOpenSettings, NewNotification, - AuthenticationInvalid + AuthenticationInvalid, + audio_meter::AudioInputLevelChange ]) .typ::() .typ::() @@ -2461,6 +2487,8 @@ pub async fn run() { let (camera_tx, camera_rx) = CameraFeed::create_channel(); let camera_ws_port = camera::create_camera_ws(camera_rx.clone()).await; + let (audio_input_tx, audio_input_rx) = AudioInputFeed::create_channel(); + tauri::async_runtime::set(tokio::runtime::Handle::current()); #[allow(unused_mut)] @@ -2509,11 +2537,15 @@ pub async fn run() { CapWindow::Main.show(&app_handle).ok(); } + audio_meter::spawn_event_emitter(app_handle.clone(), audio_input_rx); + app.manage(Arc::new(RwLock::new(App { handle: app_handle.clone(), camera_tx, camera_ws_port, camera_feed: None, + audio_input_tx, + audio_input_feed: None, start_recording_options: RecordingOptions { capture_target: ScreenCaptureTarget::Screen(CaptureScreen { id: 1, diff --git a/apps/desktop/src-tauri/src/recording.rs b/apps/desktop/src-tauri/src/recording.rs index 2fced24b..6ad4eb1e 100644 --- a/apps/desktop/src-tauri/src/recording.rs +++ b/apps/desktop/src-tauri/src/recording.rs @@ -186,6 +186,7 @@ pub async fn start( recording_dir: PathBuf, recording_options: &RecordingOptions, camera_feed: Option<&CameraFeed>, + audio_input_feed: Option<&AudioInputFeed>, ) -> Result { let content_dir = recording_dir.join("content"); let cursors_dir = content_dir.join("cursors"); @@ -242,11 +243,7 @@ pub async fn start( .sink("screen_capture_encoder", screen_encoder); } - if let Some(mic_source) = recording_options - .audio_input_name - .as_ref() - .and_then(|name| AudioInputSource::init(name)) - { + if let Some(mic_source) = audio_input_feed.map(AudioInputSource::init) { let mic_config = mic_source.info(); audio_output_path = Some(content_dir.join("audio-input.mp3")); @@ -263,7 +260,7 @@ pub async fn start( .sink("microphone_encoder", mic_encoder); } - if let Some(camera_source) = CameraSource::init(camera_feed) { + if let Some(camera_source) = camera_feed.map(CameraSource::init) { let camera_config = camera_source.info(); let output_config = camera_config.scaled(1920, 30); camera_output_path = Some(content_dir.join("camera.mp4")); diff --git a/apps/desktop/src/routes/(window-chrome)/(main).tsx b/apps/desktop/src/routes/(window-chrome)/(main).tsx index 2d9b5f77..ed128d0c 100644 --- a/apps/desktop/src/routes/(window-chrome)/(main).tsx +++ b/apps/desktop/src/routes/(window-chrome)/(main).tsx @@ -18,7 +18,7 @@ import { createSignal, onMount, } from "solid-js"; -import { createStore } from "solid-js/store"; +import { createStore, produce } from "solid-js/store"; import { fetch } from "@tauri-apps/plugin-http"; import { authStore } from "~/store"; @@ -397,6 +397,8 @@ function MicrophoneSelect(props: { options: ReturnType["options"]["data"]; setOptions: ReturnType["setOptions"]; }) { + const DB_SCALE = 40; + const devices = createQuery(() => listAudioDevices); const permissions = createQuery(() => getPermissions); const currentRecording = createCurrentRecordingQuery(); @@ -422,8 +424,25 @@ function MicrophoneSelect(props: { ...props.options, audioInputName: item.deviceId !== "" ? item.name : null, }); + if (!item.deviceId) setDbs(); }; + // raw db level + const [dbs, setDbs] = createSignal(); + + createEffect(() => { + if (!props.options?.audioInputName) setDbs(); + }); + + events.audioInputLevelChange.listen((dbs) => { + if (!props.options?.audioInputName) setDbs(); + else setDbs(dbs.payload); + }); + + // visual audio level from 0 -> 1 + const audioLevel = () => + Math.pow(1 - Math.max((dbs() ?? 0) + DB_SCALE, 0) / DB_SCALE, 0.5); + return (
@@ -452,7 +471,17 @@ function MicrophoneSelect(props: { setOpen(isOpen); }} > - + + + {(s) => ( +
+ )} + ({ +audioInputLevelChange: "audio-input-level-change", authenticationInvalid: "authentication-invalid", currentRecordingChanged: "current-recording-changed", editorStateChanged: "editor-state-changed", @@ -366,6 +368,7 @@ showCapturesPanel: "show-captures-panel" export type AspectRatio = "wide" | "vertical" | "square" | "classic" | "tall" export type Audio = { duration: number; sample_rate: number; channels: number } export type AudioConfiguration = { mute: boolean; improve: boolean } +export type AudioInputLevelChange = number export type AudioMeta = { path: string } export type AuthStore = { token: string; expires: number; plan: Plan | null } export type AuthenticationInvalid = null diff --git a/crates/media/Cargo.toml b/crates/media/Cargo.toml index 5477a10b..e5e6202e 100644 --- a/crates/media/Cargo.toml +++ b/crates/media/Cargo.toml @@ -26,6 +26,7 @@ tempfile = "3.12.0" thiserror = "1.0" tracing = "0.1" futures = "0.3.31" +tokio.workspace = true [target.'cfg(target_os = "macos")'.dependencies] cocoa = "0.26.0" diff --git a/crates/media/src/feeds/audio_input.rs b/crates/media/src/feeds/audio_input.rs new file mode 100644 index 00000000..d780bd3a --- /dev/null +++ b/crates/media/src/feeds/audio_input.rs @@ -0,0 +1,263 @@ +use cpal::traits::{DeviceTrait, HostTrait}; +use cpal::{Device, InputCallbackInfo, SampleFormat, StreamConfig, SupportedStreamConfig}; +use flume::{Receiver, Sender, TrySendError}; +use indexmap::IndexMap; +use tracing::warn; + +use crate::{ + data::{ffmpeg_sample_format_for, AudioInfo}, + MediaError, +}; + +#[derive(Clone)] +pub struct AudioInputSamples { + pub data: Vec, + pub format: SampleFormat, + pub info: InputCallbackInfo, +} + +enum AudioInputControl { + Switch(String, Sender>), + AttachSender(AudioInputSamplesSender), + Shutdown, +} + +pub struct AudioInputConnection { + control: Sender, +} + +impl AudioInputConnection { + pub fn attach(&self) -> Receiver { + let (sender, receiver) = flume::bounded(5); + self.control + .send(AudioInputControl::AttachSender(sender)) + .unwrap(); + + receiver + } +} + +pub type AudioInputSamplesSender = Sender; +pub type AudioInputSamplesReceiver = Receiver; + +pub type AudioInputDeviceMap = IndexMap; + +pub struct AudioInputFeed { + control_tx: Sender, + audio_info: AudioInfo, + // rx: Receiver, +} + +impl AudioInputFeed { + pub fn create_channel() -> (AudioInputSamplesSender, AudioInputSamplesReceiver) { + flume::bounded(60) + } + + pub async fn init(selected_input: &str) -> Result { + let (device, config) = Self::list_devices() + .swap_remove_entry(selected_input) + .map(|(device_name, (device, config))| { + println!("Using audio device: {}", device_name); + (device, config) + }) + .unwrap(); + + let audio_info = AudioInfo::from_stream_config(&config); + let (control_tx, control_rx) = flume::bounded(1); + + std::thread::spawn(|| start_capturing(device, config, control_rx)); + + Ok(Self { + control_tx, + audio_info, + // rx: samples_rx, + }) + } + + pub fn list_devices() -> AudioInputDeviceMap { + let host = cpal::default_host(); + let mut device_map = IndexMap::new(); + + let get_usable_device = |device: Device| { + device + .supported_input_configs() + .map_err(|error| eprintln!("Error: {error}")) + .ok() + .and_then(|configs| { + let mut configs = configs.collect::>(); + configs.sort_by(|a, b| { + b.sample_format() + .sample_size() + .cmp(&a.sample_format().sample_size()) + }); + configs + .into_iter() + .find(|c| ffmpeg_sample_format_for(c.sample_format()).is_some()) + }) + .and_then(|config| { + device + .name() + .ok() + .map(|name| (name, device, config.with_max_sample_rate())) + }) + }; + + if let Some((name, device, config)) = + host.default_input_device().and_then(get_usable_device) + { + device_map.insert(name, (device, config)); + } + + match host.input_devices() { + Ok(devices) => { + for (name, device, config) in devices.filter_map(get_usable_device) { + device_map.entry(name).or_insert((device, config)); + } + } + Err(error) => { + eprintln!("Could not access audio input devices"); + eprintln!("{error}"); + } + } + + device_map + } + + pub async fn switch_input(&mut self, name: &str) -> Result<(), MediaError> { + let (tx, rx) = flume::bounded(1); + + self.control_tx + .send_async(AudioInputControl::Switch(name.to_string(), tx)) + .await + .map_err(|error| { + eprintln!("Error while switching audio input: {error}"); + MediaError::TaskLaunch("Failed to switch audio input".into()) + })?; + + let config = rx.recv_async().await.map_err(|error| { + eprintln!("Error while switching audio input: {error}"); + MediaError::TaskLaunch("Failed to switch audio input".into()) + })??; + + dbg!(&config); + + self.audio_info = AudioInfo::from_stream_config(&config); + + Ok(()) + } + + pub async fn add_sender(&self, sender: AudioInputSamplesSender) -> Result<(), MediaError> { + self.control_tx + .send_async(AudioInputControl::AttachSender(sender)) + .await + .map_err(|error| { + eprintln!("Error while attaching audio input sender: {error}"); + MediaError::TaskLaunch("Failed to attach audio input sender".into()) + })?; + + Ok(()) + } + + pub fn audio_info(&self) -> AudioInfo { + self.audio_info + } + + pub fn create_connection(&self) -> AudioInputConnection { + AudioInputConnection { + control: self.control_tx.clone(), + } + } +} + +fn start_capturing( + mut device: Device, + mut config: SupportedStreamConfig, + control: Receiver, +) { + let mut senders: Vec = vec![]; + + loop { + let (tx, rx) = flume::bounded(4); + + let stream_config: StreamConfig = config.clone().into(); + let stream = device + .build_input_stream_raw( + &stream_config, + config.sample_format(), + move |data, info| { + tx.send(AudioInputSamples { + data: data.bytes().to_vec(), + format: data.sample_format(), + info: info.clone(), + }) + .ok(); + }, + |_e| {}, + None, + ) + .map_err(|error| { + eprintln!("Error while preparing audio capture: {error}"); + MediaError::TaskLaunch("Failed to start audio capture".into()) + }); + + loop { + match control.try_recv() { + Ok(AudioInputControl::Switch(name, response)) => { + // list_devices hangs if the stream isn't dropped + drop(stream); + let Some(items) = AudioInputFeed::list_devices().swap_remove_entry(&name).map( + |(device_name, (device, config))| { + println!("Using audio device: {}", device_name); + (device, config) + }, + ) else { + response + .send(Err(MediaError::DeviceUnreachable(name))) + .unwrap(); + break; + }; + + device = items.0; + config = items.1; + + response.send(Ok(config.clone())).unwrap(); + break; + } + Ok(AudioInputControl::Shutdown) => { + return; + } + Ok(AudioInputControl::AttachSender(sender)) => { + senders.push(sender); + } + Err(flume::TryRecvError::Disconnected) => { + println!("Control receiver is unreachable! Shutting down"); + return; + } + Err(flume::TryRecvError::Empty) => { + // No signal received, nothing to do + } + } + + match rx.recv() { + Ok(data) => { + let mut to_remove = vec![]; + for (i, sender) in senders.iter().enumerate() { + if let Err(TrySendError::Disconnected(_)) = sender.try_send(data.clone()) { + to_remove.push(i); + }; + } + + for i in to_remove.into_iter().rev() { + senders.swap_remove(i); + } + } + Err(error) => { + warn!("Failed to capture audio sampels: {:?}", error); + // Optionally, add a small delay to avoid busy-waiting + std::thread::sleep(std::time::Duration::from_millis(10)); + continue; + } + } + } + } +} diff --git a/crates/media/src/feeds/camera.rs b/crates/media/src/feeds/camera.rs index 7bef973d..f3fd461a 100644 --- a/crates/media/src/feeds/camera.rs +++ b/crates/media/src/feeds/camera.rs @@ -57,7 +57,7 @@ impl CameraFeed { } pub async fn init( - selected_camera: &String, + selected_camera: &str, rgba_data: Sender>, ) -> Result { println!("Selected camera: {:?}", selected_camera); @@ -95,14 +95,14 @@ impl CameraFeed { self.video_info } - pub async fn switch_cameras(&mut self, camera_name: &String) -> Result<(), MediaError> { + pub async fn switch_cameras(&mut self, camera_name: &str) -> Result<(), MediaError> { let current_camera_name = self.camera_info.human_name(); if camera_name != ¤t_camera_name { let (result_tx, result_rx) = flume::bounded::(1); let _ = self .control - .send_async(CameraControl::Switch(camera_name.clone(), result_tx)) + .send_async(CameraControl::Switch(camera_name.to_string(), result_tx)) .await; let (camera_info, video_info) = result_rx @@ -130,13 +130,13 @@ impl Drop for CameraFeed { } } -fn find_camera(selected_camera: &String) -> Result { +fn find_camera(selected_camera: &str) -> Result { let all_cameras = nokhwa::query(ApiBackend::Auto)?; all_cameras .into_iter() .find(|c| &c.human_name() == selected_camera) - .ok_or(MediaError::DeviceUnreachable(selected_camera.clone())) + .ok_or(MediaError::DeviceUnreachable(selected_camera.to_string())) } fn create_camera(info: &CameraInfo) -> Result { diff --git a/crates/media/src/feeds/mod.rs b/crates/media/src/feeds/mod.rs index 4b3bf358..7be4e726 100644 --- a/crates/media/src/feeds/mod.rs +++ b/crates/media/src/feeds/mod.rs @@ -1,5 +1,7 @@ mod audio; +mod audio_input; mod camera; pub use audio::*; +pub use audio_input::*; pub use camera::*; diff --git a/crates/media/src/sources/audio_input.rs b/crates/media/src/sources/audio_input.rs index b40a08dd..203cb19c 100644 --- a/crates/media/src/sources/audio_input.rs +++ b/crates/media/src/sources/audio_input.rs @@ -1,12 +1,10 @@ -use std::sync::{Arc, Mutex}; - -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{BufferSize, Device, Stream, StreamConfig, StreamInstant, SupportedStreamConfig}; -use flume::Sender; +use cpal::{Device, StreamInstant, SupportedStreamConfig}; +use flume::{Receiver, Sender}; use indexmap::IndexMap; +use crate::feeds::{AudioInputConnection, AudioInputFeed, AudioInputSamples}; use crate::{ - data::{ffmpeg_sample_format_for, AudioInfo, FFAudio}, + data::{AudioInfo, FFAudio}, pipeline::{ clock::{LocalTimestamp, RealTimeClock}, control::Control, @@ -24,132 +22,58 @@ impl LocalTimestamp for StreamInstant { } pub struct AudioInputSource { - device: Device, - device_name: String, - config: SupportedStreamConfig, + feed_connection: AudioInputConnection, + audio_info: AudioInfo, } impl AudioInputSource { - pub fn init(selected_audio_input: &str) -> Option { - println!("Selected audio input: {selected_audio_input}",); - - Self::get_devices() - .swap_remove_entry(selected_audio_input) - .map(|(device_name, (device, config))| { - println!("Using audio device: {}", device_name); - - Self { - device, - device_name, - config, - } - }) + pub fn init(feed: &AudioInputFeed) -> Self { + Self { + feed_connection: feed.create_connection(), + audio_info: feed.audio_info(), + } } pub fn info(&self) -> AudioInfo { - AudioInfo::from_stream_config(&self.config) + self.audio_info } - pub fn get_devices() -> AudioInputDeviceMap { - let host = cpal::default_host(); - let mut device_map = IndexMap::new(); - - let get_usable_device = |device: Device| { - device - .supported_input_configs() - .map_err(|error| eprintln!("Error: {error}")) - .ok() - .and_then(|configs| { - let mut configs = configs.collect::>(); - configs.sort_by(|a, b| { - b.sample_format() - .sample_size() - .cmp(&a.sample_format().sample_size()) - }); - configs - .into_iter() - .find(|c| ffmpeg_sample_format_for(c.sample_format()).is_some()) - }) - .and_then(|config| { - device - .name() - .ok() - .map(|name| (name, device, config.with_max_sample_rate())) - }) - }; - - if let Some((name, device, config)) = - host.default_input_device().and_then(get_usable_device) - { - device_map.insert(name, (device, config)); - } - - match host.input_devices() { - Ok(devices) => { - for (name, device, config) in devices.filter_map(get_usable_device) { - device_map.entry(name).or_insert((device, config)); - } + fn process_frame( + &self, + clock: &mut RealTimeClock, + output: &Sender, + samples: AudioInputSamples, + ) -> Result<(), MediaError> { + match clock.timestamp_for(samples.info.timestamp().capture) { + None => { + eprintln!("Clock is currently stopped. Dropping frames."); } - Err(error) => { - eprintln!("Could not access audio input devices"); - eprintln!("{error}"); + Some(timestamp) => { + let frame = self.audio_info.wrap_frame(&samples.data, timestamp); + if let Err(_) = output.send(frame) { + return Err(MediaError::Any("Pipeline is unreachable! Stopping capture")); + } } } - device_map + Ok(()) } - pub fn build_stream( + fn pause_and_drain_frames( &self, - mut clock: RealTimeClock, - output: Arc>>>, - ) -> Result { - let audio_info = self.info(); - let mut stream_config: StreamConfig = self.config.clone().into(); - stream_config.buffer_size = BufferSize::Fixed(audio_info.buffer_size); - let sample_format = self.config.sample_format(); - - let data_callback = move |data: &cpal::Data, info: &cpal::InputCallbackInfo| { - println!("data_callback"); - let capture_time = info.timestamp().capture; - - let Ok(output) = output.try_lock() else { - return; - }; - let Some(output) = output.as_ref() else { - return; - }; - - let Some(timestamp) = clock.timestamp_for(capture_time) else { - eprintln!("Clock is currently stopped. Dropping samples."); - return; - }; - - let buffer = audio_info.wrap_frame(data.bytes(), timestamp.try_into().unwrap()); - // TODO(PJ): Send error when I bring error infra back online - output.send(buffer).unwrap(); - // if let Err(_) = output.send(buffer) { - // tracing::debug!("Pipeline is unreachable. Recording will shut down."); - // } - }; - - let error_callback = |err| { - // TODO: Handle errors such as device being disconnected. Some kind of fallback or pop-up? - eprintln!("An error occurred on the audio stream: {}", err); - }; - - self.device - .build_input_stream_raw( - &stream_config, - sample_format, - data_callback, - error_callback, - None, - ) - .map_err(|error| { - eprintln!("Error while preparing audio capture: {error}"); - MediaError::TaskLaunch("Failed to start audio capture".into()) - }) + clock: &mut RealTimeClock, + output: &Sender, + frames_rx: Receiver, + ) { + let frames: Vec = frames_rx.drain().collect(); + drop(frames_rx); + + for frame in frames { + if let Err(error) = self.process_frame(clock, output, frame) { + eprintln!("{error}"); + break; + } + } } } @@ -161,49 +85,50 @@ impl PipelineSourceTask for AudioInputSource { // #[tracing::instrument(skip_all)] fn run( &mut self, - clock: Self::Clock, + mut clock: Self::Clock, ready_signal: crate::pipeline::task::PipelineReadySignal, mut control_signal: crate::pipeline::control::PipelineControlSignal, output: Sender, ) { println!("Preparing audio input source thread..."); - let output = Arc::new(Mutex::new(Some(output))); - - match self.build_stream(clock.clone(), output.clone()) { - Err(error) => ready_signal.send(Err(error)).unwrap(), - Ok(stream) => { - println!("Using audio input device {}", self.device_name); - ready_signal.send(Ok(())).unwrap(); - - loop { - // TODO: Handle these more gracefully than crashing (e.g. if user unplugged mic between pausing and resuming). - // Some kind of error stream? - match control_signal.blocking_last() { - Some(Control::Play) => { - stream - .play() - .expect("Failed to start audio input recording"); - println!("Audio input recording started."); - } - Some(Control::Pause) => { - stream - .pause() - .expect("Failed to pause audio input recording"); - } - Some(Control::Shutdown) | None => { - if let Err(error) = stream.pause() { - eprintln!("Error while stopping audio stream: {error}"); + let mut samples_rx: Option> = None; + ready_signal.send(Ok(())).unwrap(); + + loop { + match control_signal.last() { + Some(Control::Play) => { + let samples = samples_rx.get_or_insert_with(|| self.feed_connection.attach()); + + match samples.recv() { + Ok(samples) => { + if let Err(error) = self.process_frame(&mut clock, &output, samples) { + eprintln!("{error}"); + break; } - output.lock().unwrap().take(); - drop(stream); + } + Err(_) => { + eprintln!("Lost connection with the camera feed"); break; } } } - - println!("Shutting down audio input source thread.") + Some(Control::Pause) => { + // TODO: This blocks to process frames in the queue, which may delay resumption + // Some way to prevent this from delaying the listen loop? + if let Some(rx) = samples_rx.take() { + self.pause_and_drain_frames(&mut clock, &output, rx); + } + } + Some(Control::Shutdown) | None => { + if let Some(rx) = samples_rx.take() { + self.pause_and_drain_frames(&mut clock, &output, rx); + } + break; + } } } + + println!("Shutting down audio input source thread."); } } diff --git a/crates/media/src/sources/camera.rs b/crates/media/src/sources/camera.rs index e15d023c..1ca33e64 100644 --- a/crates/media/src/sources/camera.rs +++ b/crates/media/src/sources/camera.rs @@ -14,11 +14,11 @@ pub struct CameraSource { } impl CameraSource { - pub fn init(camera_feed: Option<&CameraFeed>) -> Option { - camera_feed.map(|feed| Self { + pub fn init(feed: &CameraFeed) -> Self { + Self { feed_connection: feed.create_connection(), video_info: feed.video_info(), - }) + } } pub fn info(&self) -> VideoInfo { @@ -82,6 +82,7 @@ impl PipelineSourceTask for CameraSource { output: Sender, ) { println!("Preparing camera source thread..."); + let mut frames_rx: Option> = None; ready_signal.send(Ok(())).unwrap(); @@ -119,6 +120,6 @@ impl PipelineSourceTask for CameraSource { } } - println!("Shutting down screen capture source thread."); + println!("Shutting down camera source thread."); } }