Skip to content

Commit

Permalink
Update main.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
JonnyBurger committed Feb 10, 2025
1 parent eaa0ace commit b2c2277
Showing 1 changed file with 74 additions and 65 deletions.
139 changes: 74 additions & 65 deletions packages/compositor/rust/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct LongRunningProcess {
receive_video_stats_in_main_thread_handles: Vec<mpsc::Receiver<OpenVideoStats>>,
receive_close_video_in_main_thread_handles: Vec<mpsc::Receiver<()>>,
receive_free_in_main_thread_handles: Vec<mpsc::Receiver<()>>,
thread_map: select_right_thread::ThreadMap,
}

impl LongRunningProcess {
Expand All @@ -90,6 +91,7 @@ impl LongRunningProcess {
vec![];
let receive_close_video_in_main_thread_handles: Vec<mpsc::Receiver<()>> = vec![];
let receive_free_in_main_thread_handles: Vec<mpsc::Receiver<()>> = vec![];
let thread_map = select_right_thread::ThreadMap::new(threads);

let map = LongRunningProcess {
maximum_frame_cache_size_in_bytes: max_cache_size,
Expand All @@ -98,6 +100,7 @@ impl LongRunningProcess {
receive_video_stats_in_main_thread_handles,
receive_close_video_in_main_thread_handles,
receive_free_in_main_thread_handles,
thread_map,
};
map
}
Expand Down Expand Up @@ -131,7 +134,6 @@ impl LongRunningProcess {

fn start(&mut self) -> Result<(), ErrorWithBacktrace> {
let mut finish_thread_handles = vec![];
let mut thread_map = select_right_thread::ThreadMap::new(self.threads);

for thread_index in 0..self.threads {
let wait_for_thread_finish = self.start_thread(thread_index);
Expand Down Expand Up @@ -165,77 +167,84 @@ impl LongRunningProcess {

let opts = parse_cli(&input)?;
let nonce = opts.nonce.clone();
let _result: Result<(), ErrorWithBacktrace> = match opts.payload {
CliInputCommandPayload::ExtractFrame(command) => {
let thread_id = thread_map.select_right_thread(
&command.src,
command.time,
command.transparent,
)?;
let input_to_send = parse_cli(&input)?;
self.send_to_thread_handles[thread_id].send(input_to_send)?;
Ok(())
}
CliInputCommandPayload::GetOpenVideoStats(_) => {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}

let mut open_video_stats_all: Vec<OpenVideoStats> = vec![];
for handle in &self.receive_video_stats_in_main_thread_handles {
let data = handle.recv()?;
open_video_stats_all.push(data.clone());
}
let concated: OpenVideoStats = OpenVideoStats {
frames_in_cache: open_video_stats_all
.iter()
.map(|x| x.frames_in_cache)
.sum(),
open_streams: open_video_stats_all.iter().map(|x| x.open_streams).sum(),
open_videos: open_video_stats_all.iter().map(|x| x.open_videos).sum(),
};
let str = serde_json::to_string(&concated)?;
global_printer::synchronized_write_buf(0, &nonce, &str.as_bytes())?;
Ok(())
self.run_main_thread_command(opts, input, nonce)?;
}

for handle in finish_thread_handles {
handle.join()?;
}

Ok(())
}

fn run_main_thread_command(
&mut self,
opts: CliInputCommand,
input: String,
nonce: String,
) -> Result<(), ErrorWithBacktrace> {
let _result: Result<(), ErrorWithBacktrace> = match opts.payload {
CliInputCommandPayload::ExtractFrame(command) => {
let thread_id = self.thread_map.select_right_thread(
&command.src,
command.time,
command.transparent,
)?;
let input_to_send = parse_cli(&input)?;
self.send_to_thread_handles[thread_id].send(input_to_send)?;
Ok(())
}
CliInputCommandPayload::GetOpenVideoStats(_) => {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}
CliInputCommandPayload::FreeUpMemory(_) => {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}

for handle in &self.receive_free_in_main_thread_handles {
handle.recv()?;
}
// TODO: Is "Hi" right?
global_printer::synchronized_write_buf(0, &nonce, &format!("hi").as_bytes())?;
Ok(())

let mut open_video_stats_all: Vec<OpenVideoStats> = vec![];
for handle in &self.receive_video_stats_in_main_thread_handles {
let data = handle.recv()?;
open_video_stats_all.push(data.clone());
}
CliInputCommandPayload::CloseAllVideos(_) => {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}

for handle in &self.receive_close_video_in_main_thread_handles {
handle.recv()?;
}
// TODO: Is "Hi" right?
global_printer::synchronized_write_buf(0, &nonce, &format!("hi").as_bytes())?;
Ok(())
let concated: OpenVideoStats = OpenVideoStats {
frames_in_cache: open_video_stats_all.iter().map(|x| x.frames_in_cache).sum(),
open_streams: open_video_stats_all.iter().map(|x| x.open_streams).sum(),
open_videos: open_video_stats_all.iter().map(|x| x.open_videos).sum(),
};
let str = serde_json::to_string(&concated)?;
global_printer::synchronized_write_buf(0, &nonce, &str.as_bytes())?;
Ok(())
}
CliInputCommandPayload::FreeUpMemory(_) => {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}
_ => {
execute_command_and_print(opts)?;
Ok(())

for handle in &self.receive_free_in_main_thread_handles {
handle.recv()?;
}
};
if _result.is_err() {
print_error(&nonce, _result.err().unwrap())
// TODO: Is "Hi" right?
global_printer::synchronized_write_buf(0, &nonce, &format!("hi").as_bytes())?;
Ok(())
}
}
CliInputCommandPayload::CloseAllVideos(_) => {
for handle in &self.send_to_thread_handles {
handle.send(opts.clone())?;
}

for handle in finish_thread_handles {
handle.join()?;
for handle in &self.receive_close_video_in_main_thread_handles {
handle.recv()?;
}
// TODO: Is "Hi" right?
global_printer::synchronized_write_buf(0, &nonce, &format!("hi").as_bytes())?;
Ok(())
}
_ => {
execute_command_and_print(opts)?;
Ok(())
}
};
if _result.is_err() {
print_error(&nonce, _result.err().unwrap())
}

Ok(())
}
}
Expand Down

0 comments on commit b2c2277

Please sign in to comment.