From e079ebfc9f6b99adc3f066f5a4e6d1f3870c29d5 Mon Sep 17 00:00:00 2001 From: Jonny Burger Date: Mon, 10 Feb 2025 14:20:35 +0100 Subject: [PATCH] yay --- packages/compositor/rust/main.rs | 249 +++++++++++++++++-------------- 1 file changed, 133 insertions(+), 116 deletions(-) diff --git a/packages/compositor/rust/main.rs b/packages/compositor/rust/main.rs index 085a6f2df0..849ed4348d 100644 --- a/packages/compositor/rust/main.rs +++ b/packages/compositor/rust/main.rs @@ -24,7 +24,7 @@ use frame_cache_manager::make_frame_cache_manager; use global_printer::{_print_verbose, print_error, set_verbose_logging}; use memory::{get_ideal_maximum_frame_cache_size, is_about_to_run_out_of_memory}; use opened_video_manager::make_opened_stream_manager; -use std::sync::mpsc; +use std::sync::mpsc::{self, Sender}; use std::thread; use std::{env, thread::JoinHandle}; @@ -59,7 +59,7 @@ fn mainfn() -> Result<(), ErrorWithBacktrace> { payload.concurrency ))?; - let long_running_process = + let mut long_running_process = LongRunningProcess::new(payload.concurrency, max_video_cache_size); long_running_process.start() } @@ -79,142 +79,159 @@ pub struct LongRunningProcess { threads: usize, maximum_frame_cache_size_in_bytes: u64, finish_thread_handles: Vec>, + send_to_thread_handles: Vec>, + receive_video_stats_in_main_thread_handles: Vec>, + receive_close_video_in_main_thread_handles: Vec>, + receive_free_in_main_thread_handles: Vec>, } impl LongRunningProcess { pub fn new(threads: usize, max_cache_size: u64) -> Self { + let send_to_thread_handles = vec![]; + let receive_video_stats_in_main_thread_handles: Vec> = + vec![]; + let receive_close_video_in_main_thread_handles: Vec> = vec![]; + let receive_free_in_main_thread_handles: Vec> = vec![]; + let map = LongRunningProcess { maximum_frame_cache_size_in_bytes: max_cache_size, threads, finish_thread_handles: vec![], + send_to_thread_handles, + receive_video_stats_in_main_thread_handles, + receive_close_video_in_main_thread_handles, + receive_free_in_main_thread_handles, }; map } - fn start(&self) -> Result<(), ErrorWithBacktrace> { - let mut send_to_thread_handles = vec![]; - let mut finish_thread_handles = vec![]; - let mut receive_video_stats_in_main_thread_handles: Vec> = - vec![]; - let mut receive_close_video_in_main_thread_handles: Vec> = vec![]; - let mut receive_free_in_main_thread_handles: Vec> = vec![]; - let mut thread_map = select_right_thread::ThreadMap::new(self.threads); + fn start_thread(&mut self, thread_index: usize) -> JoinHandle<()> { + let (send_to_thread, receive_in_thread) = mpsc::channel::(); + let (send_video_stats_to_main_thread, receive_video_stats_in_main_thread) = + mpsc::channel::(); + let (send_free_to_main_thread, receive_free_in_main_thread) = mpsc::channel::<()>(); + let (send_close_video_to_main_thread, receive_close_video_in_main_thread) = + mpsc::channel::<()>(); - for thread_index in 0..self.threads { - let (send_to_thread, receive_in_thread) = mpsc::channel::(); - let (send_video_stats_to_main_thread, receive_video_stats_in_main_thread) = - mpsc::channel::(); - let (send_free_to_main_thread, receive_free_in_main_thread) = mpsc::channel::<()>(); - let (send_close_video_to_main_thread, receive_close_video_in_main_thread) = - mpsc::channel::<()>(); - - let wait_for_thread_finish = thread::spawn(move || { - let mut frame_cache_manager = make_frame_cache_manager().unwrap(); - let mut opened_video_manager = make_opened_stream_manager().unwrap(); - - loop { - let _message = receive_in_thread.recv(); - if _message.is_err() { + self.receive_video_stats_in_main_thread_handles + .push(receive_video_stats_in_main_thread); + self.send_to_thread_handles.push(send_to_thread); + self.receive_close_video_in_main_thread_handles + .push(receive_close_video_in_main_thread); + self.receive_free_in_main_thread_handles + .push(receive_free_in_main_thread); + + thread::spawn(move || { + let mut frame_cache_manager = make_frame_cache_manager().unwrap(); + let mut opened_video_manager = make_opened_stream_manager().unwrap(); + + loop { + let _message = receive_in_thread.recv(); + if _message.is_err() { + break; + } + let message = _message.unwrap(); + let current_maximum_cache_size = + max_cache_size::get_instance().lock().unwrap().get_value(); + + if is_about_to_run_out_of_memory() && current_maximum_cache_size.is_some() { + ffmpeg::emergency_memory_free_up(&mut frame_cache_manager, thread_index) + .unwrap(); + max_cache_size::get_instance() + .lock() + .unwrap() + .set_value(Some(current_maximum_cache_size.unwrap() / 2)); + } + + match message.payload { + CliInputCommandPayload::Eof(_) => { break; } - let message = _message.unwrap(); - let current_maximum_cache_size = - max_cache_size::get_instance().lock().unwrap().get_value(); - - if is_about_to_run_out_of_memory() && current_maximum_cache_size.is_some() { - ffmpeg::emergency_memory_free_up(&mut frame_cache_manager, thread_index) - .unwrap(); - max_cache_size::get_instance() - .lock() - .unwrap() - .set_value(Some(current_maximum_cache_size.unwrap() / 2)); - } + _ => {} + }; + let res = (|| -> Result<(), ErrorWithBacktrace> { match message.payload { - CliInputCommandPayload::Eof(_) => { - break; + CliInputCommandPayload::CloseAllVideos(_) => { + opened_video_manager.close_all_videos(&mut frame_cache_manager)?; + send_close_video_to_main_thread.send(()).map_err(|e| { + ErrorWithBacktrace::from(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to send close video message: {}", e), + )) + })?; + Ok(()) } - _ => {} - }; - - let res = (|| -> Result<(), ErrorWithBacktrace> { - match message.payload { - CliInputCommandPayload::CloseAllVideos(_) => { - opened_video_manager.close_all_videos(&mut frame_cache_manager)?; - send_close_video_to_main_thread.send(()).map_err(|e| { - ErrorWithBacktrace::from(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to send close video message: {}", e), - )) - })?; - Ok(()) - } - CliInputCommandPayload::ExtractFrame(command) => { - let res = ffmpeg::extract_frame( - command.src, - command.original_src, - command.time, - command.transparent, - command.tone_mapped, - current_maximum_cache_size, - thread_index, - &mut opened_video_manager, - &mut frame_cache_manager, - )?; - global_printer::synchronized_write_buf(0, &message.nonce, &res)?; - if let Some(cache_size) = current_maximum_cache_size { - ffmpeg::keep_only_latest_frames_and_close_videos( - cache_size, - &mut opened_video_manager, - &mut frame_cache_manager, - thread_index, - )?; - } - Ok(()) - } - CliInputCommandPayload::FreeUpMemory(payload) => { + CliInputCommandPayload::ExtractFrame(command) => { + let res = ffmpeg::extract_frame( + command.src, + command.original_src, + command.time, + command.transparent, + command.tone_mapped, + current_maximum_cache_size, + thread_index, + &mut opened_video_manager, + &mut frame_cache_manager, + )?; + global_printer::synchronized_write_buf(0, &message.nonce, &res)?; + if let Some(cache_size) = current_maximum_cache_size { ffmpeg::keep_only_latest_frames_and_close_videos( - payload.remaining_bytes, + cache_size, &mut opened_video_manager, &mut frame_cache_manager, thread_index, )?; - send_free_to_main_thread.send(()).map_err(|e| { + } + Ok(()) + } + CliInputCommandPayload::FreeUpMemory(payload) => { + ffmpeg::keep_only_latest_frames_and_close_videos( + payload.remaining_bytes, + &mut opened_video_manager, + &mut frame_cache_manager, + thread_index, + )?; + send_free_to_main_thread.send(()).map_err(|e| { + ErrorWithBacktrace::from(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to send free memory message: {}", e), + )) + })?; + Ok(()) + } + CliInputCommandPayload::GetOpenVideoStats(_) => { + let res = ffmpeg::get_open_video_stats( + &mut frame_cache_manager, + &mut opened_video_manager, + )?; + send_video_stats_to_main_thread + .send(res.clone()) + .map_err(|e| { ErrorWithBacktrace::from(std::io::Error::new( std::io::ErrorKind::Other, - format!("Failed to send free memory message: {}", e), + format!("Failed to send video stats message: {}", e), )) })?; - Ok(()) - } - CliInputCommandPayload::GetOpenVideoStats(_) => { - let res = ffmpeg::get_open_video_stats( - &mut frame_cache_manager, - &mut opened_video_manager, - )?; - send_video_stats_to_main_thread - .send(res.clone()) - .map_err(|e| { - ErrorWithBacktrace::from(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to send video stats message: {}", e), - )) - })?; - Ok(()) - } - _ => panic!("Command cannot be executed on thread"), + Ok(()) } - })(); - if res.is_err() { - print_error(&message.nonce, res.err().unwrap()) + _ => panic!("Command cannot be executed on thread"), } + })(); + if res.is_err() { + print_error(&message.nonce, res.err().unwrap()) } - }); - send_to_thread_handles.push(send_to_thread); - receive_video_stats_in_main_thread_handles.push(receive_video_stats_in_main_thread); - receive_close_video_in_main_thread_handles.push(receive_close_video_in_main_thread); - receive_free_in_main_thread_handles.push(receive_free_in_main_thread); - finish_thread_handles.push(wait_for_thread_finish); + } + }) + } + + fn start(&mut self) -> Result<(), ErrorWithBacktrace> { + let mut finish_thread_handles = vec![]; + let mut thread_map = select_right_thread::ThreadMap::new(self.threads); + + for thread_index in 0..self.threads { + let wait_for_thread_finish = self.start_thread(thread_index); + finish_thread_handles.push(wait_for_thread_finish) } max_cache_size::get_instance() @@ -233,7 +250,7 @@ impl LongRunningProcess { input = matched.trim().to_string(); if input == "EOF" { - for send_handle in send_to_thread_handles { + for send_handle in self.send_to_thread_handles { send_handle.send(CliInputCommand { payload: CliInputCommandPayload::Eof(Eof {}), nonce: "".to_string(), @@ -252,16 +269,16 @@ impl LongRunningProcess { command.transparent, )?; let input_to_send = parse_cli(&input)?; - send_to_thread_handles[thread_id].send(input_to_send)?; + self.send_to_thread_handles[thread_id].send(input_to_send)?; Ok(()) } CliInputCommandPayload::GetOpenVideoStats(_) => { - for handle in &send_to_thread_handles { + for handle in &self.send_to_thread_handles { handle.send(opts.clone())?; } let mut open_video_stats_all: Vec = vec![]; - for handle in &receive_video_stats_in_main_thread_handles { + for handle in &self.receive_video_stats_in_main_thread_handles { let data = handle.recv()?; open_video_stats_all.push(data.clone()); } @@ -278,11 +295,11 @@ impl LongRunningProcess { Ok(()) } CliInputCommandPayload::FreeUpMemory(_) => { - for handle in &send_to_thread_handles { + for handle in &self.send_to_thread_handles { handle.send(opts.clone())?; } - for handle in &receive_free_in_main_thread_handles { + for handle in &self.receive_free_in_main_thread_handles { handle.recv()?; } // TODO: Is "Hi" right? @@ -290,11 +307,11 @@ impl LongRunningProcess { Ok(()) } CliInputCommandPayload::CloseAllVideos(_) => { - for handle in &send_to_thread_handles { + for handle in &self.send_to_thread_handles { handle.send(opts.clone())?; } - for handle in &receive_close_video_in_main_thread_handles { + for handle in &self.receive_close_video_in_main_thread_handles { handle.recv()?; } // TODO: Is "Hi" right?