Skip to content

Commit

Permalink
yay
Browse files Browse the repository at this point in the history
  • Loading branch information
JonnyBurger committed Feb 10, 2025
1 parent 43b40fa commit e079ebf
Showing 1 changed file with 133 additions and 116 deletions.
249 changes: 133 additions & 116 deletions packages/compositor/rust/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use frame_cache_manager::make_frame_cache_manager;
use global_printer::{_print_verbose, print_error, set_verbose_logging};
use memory::{get_ideal_maximum_frame_cache_size, is_about_to_run_out_of_memory};
use opened_video_manager::make_opened_stream_manager;
use std::sync::mpsc;
use std::sync::mpsc::{self, Sender};
use std::thread;
use std::{env, thread::JoinHandle};

Expand Down Expand Up @@ -59,7 +59,7 @@ fn mainfn() -> Result<(), ErrorWithBacktrace> {
payload.concurrency
))?;

let long_running_process =
let mut long_running_process =
LongRunningProcess::new(payload.concurrency, max_video_cache_size);
long_running_process.start()
}
Expand All @@ -79,142 +79,159 @@ pub struct LongRunningProcess {
threads: usize,
maximum_frame_cache_size_in_bytes: u64,
finish_thread_handles: Vec<JoinHandle<()>>,
send_to_thread_handles: Vec<Sender<CliInputCommand>>,
receive_video_stats_in_main_thread_handles: Vec<mpsc::Receiver<OpenVideoStats>>,
receive_close_video_in_main_thread_handles: Vec<mpsc::Receiver<()>>,
receive_free_in_main_thread_handles: Vec<mpsc::Receiver<()>>,
}

impl LongRunningProcess {
pub fn new(threads: usize, max_cache_size: u64) -> Self {
let send_to_thread_handles = vec![];
let receive_video_stats_in_main_thread_handles: Vec<mpsc::Receiver<OpenVideoStats>> =
vec![];
let receive_close_video_in_main_thread_handles: Vec<mpsc::Receiver<()>> = vec![];
let receive_free_in_main_thread_handles: Vec<mpsc::Receiver<()>> = vec![];

let map = LongRunningProcess {
maximum_frame_cache_size_in_bytes: max_cache_size,
threads,
finish_thread_handles: vec![],
send_to_thread_handles,
receive_video_stats_in_main_thread_handles,
receive_close_video_in_main_thread_handles,
receive_free_in_main_thread_handles,
};
map
}

fn start(&self) -> Result<(), ErrorWithBacktrace> {
let mut send_to_thread_handles = vec![];
let mut finish_thread_handles = vec![];
let mut receive_video_stats_in_main_thread_handles: Vec<mpsc::Receiver<OpenVideoStats>> =
vec![];
let mut receive_close_video_in_main_thread_handles: Vec<mpsc::Receiver<()>> = vec![];
let mut receive_free_in_main_thread_handles: Vec<mpsc::Receiver<()>> = vec![];
let mut thread_map = select_right_thread::ThreadMap::new(self.threads);
fn start_thread(&mut self, thread_index: usize) -> JoinHandle<()> {
let (send_to_thread, receive_in_thread) = mpsc::channel::<CliInputCommand>();
let (send_video_stats_to_main_thread, receive_video_stats_in_main_thread) =
mpsc::channel::<OpenVideoStats>();
let (send_free_to_main_thread, receive_free_in_main_thread) = mpsc::channel::<()>();
let (send_close_video_to_main_thread, receive_close_video_in_main_thread) =
mpsc::channel::<()>();

for thread_index in 0..self.threads {
let (send_to_thread, receive_in_thread) = mpsc::channel::<CliInputCommand>();
let (send_video_stats_to_main_thread, receive_video_stats_in_main_thread) =
mpsc::channel::<OpenVideoStats>();
let (send_free_to_main_thread, receive_free_in_main_thread) = mpsc::channel::<()>();
let (send_close_video_to_main_thread, receive_close_video_in_main_thread) =
mpsc::channel::<()>();

let wait_for_thread_finish = thread::spawn(move || {
let mut frame_cache_manager = make_frame_cache_manager().unwrap();
let mut opened_video_manager = make_opened_stream_manager().unwrap();

loop {
let _message = receive_in_thread.recv();
if _message.is_err() {
self.receive_video_stats_in_main_thread_handles
.push(receive_video_stats_in_main_thread);
self.send_to_thread_handles.push(send_to_thread);
self.receive_close_video_in_main_thread_handles
.push(receive_close_video_in_main_thread);
self.receive_free_in_main_thread_handles
.push(receive_free_in_main_thread);

thread::spawn(move || {
let mut frame_cache_manager = make_frame_cache_manager().unwrap();
let mut opened_video_manager = make_opened_stream_manager().unwrap();

loop {
let _message = receive_in_thread.recv();
if _message.is_err() {
break;
}
let message = _message.unwrap();
let current_maximum_cache_size =
max_cache_size::get_instance().lock().unwrap().get_value();

if is_about_to_run_out_of_memory() && current_maximum_cache_size.is_some() {
ffmpeg::emergency_memory_free_up(&mut frame_cache_manager, thread_index)
.unwrap();
max_cache_size::get_instance()
.lock()
.unwrap()
.set_value(Some(current_maximum_cache_size.unwrap() / 2));
}

match message.payload {
CliInputCommandPayload::Eof(_) => {
break;
}
let message = _message.unwrap();
let current_maximum_cache_size =
max_cache_size::get_instance().lock().unwrap().get_value();

if is_about_to_run_out_of_memory() && current_maximum_cache_size.is_some() {
ffmpeg::emergency_memory_free_up(&mut frame_cache_manager, thread_index)
.unwrap();
max_cache_size::get_instance()
.lock()
.unwrap()
.set_value(Some(current_maximum_cache_size.unwrap() / 2));
}
_ => {}
};

let res = (|| -> Result<(), ErrorWithBacktrace> {
match message.payload {
CliInputCommandPayload::Eof(_) => {
break;
CliInputCommandPayload::CloseAllVideos(_) => {
opened_video_manager.close_all_videos(&mut frame_cache_manager)?;
send_close_video_to_main_thread.send(()).map_err(|e| {
ErrorWithBacktrace::from(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to send close video message: {}", e),
))
})?;
Ok(())
}
_ => {}
};

let res = (|| -> Result<(), ErrorWithBacktrace> {
match message.payload {
CliInputCommandPayload::CloseAllVideos(_) => {
opened_video_manager.close_all_videos(&mut frame_cache_manager)?;
send_close_video_to_main_thread.send(()).map_err(|e| {
ErrorWithBacktrace::from(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to send close video message: {}", e),
))
})?;
Ok(())
}
CliInputCommandPayload::ExtractFrame(command) => {
let res = ffmpeg::extract_frame(
command.src,
command.original_src,
command.time,
command.transparent,
command.tone_mapped,
current_maximum_cache_size,
thread_index,
&mut opened_video_manager,
&mut frame_cache_manager,
)?;
global_printer::synchronized_write_buf(0, &message.nonce, &res)?;
if let Some(cache_size) = current_maximum_cache_size {
ffmpeg::keep_only_latest_frames_and_close_videos(
cache_size,
&mut opened_video_manager,
&mut frame_cache_manager,
thread_index,
)?;
}
Ok(())
}
CliInputCommandPayload::FreeUpMemory(payload) => {
CliInputCommandPayload::ExtractFrame(command) => {
let res = ffmpeg::extract_frame(
command.src,
command.original_src,
command.time,
command.transparent,
command.tone_mapped,
current_maximum_cache_size,
thread_index,
&mut opened_video_manager,
&mut frame_cache_manager,
)?;
global_printer::synchronized_write_buf(0, &message.nonce, &res)?;
if let Some(cache_size) = current_maximum_cache_size {
ffmpeg::keep_only_latest_frames_and_close_videos(
payload.remaining_bytes,
cache_size,
&mut opened_video_manager,
&mut frame_cache_manager,
thread_index,
)?;
send_free_to_main_thread.send(()).map_err(|e| {
}
Ok(())
}
CliInputCommandPayload::FreeUpMemory(payload) => {
ffmpeg::keep_only_latest_frames_and_close_videos(
payload.remaining_bytes,
&mut opened_video_manager,
&mut frame_cache_manager,
thread_index,
)?;
send_free_to_main_thread.send(()).map_err(|e| {
ErrorWithBacktrace::from(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to send free memory message: {}", e),
))
})?;
Ok(())
}
CliInputCommandPayload::GetOpenVideoStats(_) => {
let res = ffmpeg::get_open_video_stats(
&mut frame_cache_manager,
&mut opened_video_manager,
)?;
send_video_stats_to_main_thread
.send(res.clone())
.map_err(|e| {
ErrorWithBacktrace::from(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to send free memory message: {}", e),
format!("Failed to send video stats message: {}", e),
))
})?;
Ok(())
}
CliInputCommandPayload::GetOpenVideoStats(_) => {
let res = ffmpeg::get_open_video_stats(
&mut frame_cache_manager,
&mut opened_video_manager,
)?;
send_video_stats_to_main_thread
.send(res.clone())
.map_err(|e| {
ErrorWithBacktrace::from(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to send video stats message: {}", e),
))
})?;
Ok(())
}
_ => panic!("Command cannot be executed on thread"),
Ok(())
}
})();
if res.is_err() {
print_error(&message.nonce, res.err().unwrap())
_ => panic!("Command cannot be executed on thread"),
}
})();
if res.is_err() {
print_error(&message.nonce, res.err().unwrap())
}
});
send_to_thread_handles.push(send_to_thread);
receive_video_stats_in_main_thread_handles.push(receive_video_stats_in_main_thread);
receive_close_video_in_main_thread_handles.push(receive_close_video_in_main_thread);
receive_free_in_main_thread_handles.push(receive_free_in_main_thread);
finish_thread_handles.push(wait_for_thread_finish);
}
})
}

fn start(&mut self) -> Result<(), ErrorWithBacktrace> {
let mut finish_thread_handles = vec![];
let mut thread_map = select_right_thread::ThreadMap::new(self.threads);

for thread_index in 0..self.threads {
let wait_for_thread_finish = self.start_thread(thread_index);
finish_thread_handles.push(wait_for_thread_finish)
}

max_cache_size::get_instance()
Expand All @@ -233,7 +250,7 @@ impl LongRunningProcess {

input = matched.trim().to_string();
if input == "EOF" {
for send_handle in send_to_thread_handles {
for send_handle in self.send_to_thread_handles {
send_handle.send(CliInputCommand {
payload: CliInputCommandPayload::Eof(Eof {}),
nonce: "".to_string(),
Expand All @@ -252,16 +269,16 @@ impl LongRunningProcess {
command.transparent,
)?;
let input_to_send = parse_cli(&input)?;
send_to_thread_handles[thread_id].send(input_to_send)?;
self.send_to_thread_handles[thread_id].send(input_to_send)?;
Ok(())
}
CliInputCommandPayload::GetOpenVideoStats(_) => {
for handle in &send_to_thread_handles {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}

let mut open_video_stats_all: Vec<OpenVideoStats> = vec![];
for handle in &receive_video_stats_in_main_thread_handles {
for handle in &self.receive_video_stats_in_main_thread_handles {
let data = handle.recv()?;
open_video_stats_all.push(data.clone());
}
Expand All @@ -278,23 +295,23 @@ impl LongRunningProcess {
Ok(())
}
CliInputCommandPayload::FreeUpMemory(_) => {
for handle in &send_to_thread_handles {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}

for handle in &receive_free_in_main_thread_handles {
for handle in &self.receive_free_in_main_thread_handles {
handle.recv()?;
}
// TODO: Is "Hi" right?
global_printer::synchronized_write_buf(0, &nonce, &format!("hi").as_bytes())?;
Ok(())
}
CliInputCommandPayload::CloseAllVideos(_) => {
for handle in &send_to_thread_handles {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}

for handle in &receive_close_video_in_main_thread_handles {
for handle in &self.receive_close_video_in_main_thread_handles {
handle.recv()?;
}
// TODO: Is "Hi" right?
Expand Down

0 comments on commit e079ebf

Please sign in to comment.