From e7b179ae8212043e9ce43c3d8038a920ed0eccf7 Mon Sep 17 00:00:00 2001 From: kralverde Date: Sat, 7 Dec 2024 09:37:31 -0500 Subject: [PATCH] use all cpus to load chunks --- CONTRIBUTING.md | 12 +- .../src/cylindrical_chunk_iterator.rs | 15 +- pumpkin-world/src/level.rs | 12 +- pumpkin/src/client/container.rs | 3 + pumpkin/src/main.rs | 225 +++++++++--------- 5 files changed, 142 insertions(+), 125 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 956f0ca67..06bed628c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -26,7 +26,17 @@ There are several ways you can contribute to Pumpkin: Write clear and concise commit messages that describe your changes. ### Docs -The Documentation of Pumpkin can be found at https://snowiiii.github.io/Pumpkin/ + +The Documentation of Pumpkin can be found at + +### Coding Guidelines + +- **Working with Tokio and Rayon:** + When invoking CPU intensive work, this work should be done in the Rayon thread pool via `rayon::spawn`, Rayon's + parallel iterators, or otherwise instead of from the Tokio runtime. However, it is important that the + Tokio runtime not block on these Rayon calls. Instead, the data should be passed to the Tokio runtime + via async means, for example: `tokio::sync::mpsc`. An example of how to properly do this can be found + in `pumpkin_world::level::Level::fetch_chunks`. ### Additional Information diff --git a/pumpkin-world/src/cylindrical_chunk_iterator.rs b/pumpkin-world/src/cylindrical_chunk_iterator.rs index f7ee02e9b..1f6d7b006 100644 --- a/pumpkin-world/src/cylindrical_chunk_iterator.rs +++ b/pumpkin-world/src/cylindrical_chunk_iterator.rs @@ -1,3 +1,4 @@ +use itertools::Itertools; use pumpkin_core::math::vector2::Vector2; #[derive(Debug, Clone, Copy, PartialEq)] @@ -80,10 +81,20 @@ impl Cylindrical { all_chunks.push(Vector2::new(x, z)); } } - all_chunks + + let mut result = all_chunks .into_iter() .filter(|chunk| self.is_within_distance(chunk.x, chunk.z)) - .collect() + .collect_vec(); + + // Sort such that the first chunks are closest to the center + result.sort_unstable_by_key(|pos| { + let rel_x = pos.x - self.center.x; + let rel_z = pos.z - self.center.z; + rel_x * rel_x + rel_z * rel_z + }); + + result } } diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index 0c2cea384..51dc3aec2 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -100,7 +100,7 @@ impl Level { /// it is removed from memory. Should only be called on chunks the player was not watching /// before pub fn mark_chunks_as_newly_watched(&self, chunks: &[Vector2]) { - chunks.par_iter().for_each(|chunk| { + chunks.iter().for_each(|chunk| { self.mark_chunk_as_newly_watched(*chunk); }); } @@ -126,9 +126,9 @@ impl Level { /// it is removed from memory. Should only be called on chunks the player was watching before pub fn mark_chunks_as_not_watched(&self, chunks: &[Vector2]) -> Vec> { chunks - .par_iter() + .iter() .filter(|chunk| self.mark_chunk_as_not_watched(**chunk)) - .map(|chunk| *chunk) + .copied() .collect() } @@ -157,7 +157,7 @@ impl Level { } pub fn clean_chunks(&self, chunks: &[Vector2]) { - chunks.par_iter().for_each(|chunk_pos| { + chunks.iter().for_each(|chunk_pos| { //log::debug!("Unloading {:?}", chunk_pos); self.clean_chunk(chunk_pos); }); @@ -174,7 +174,7 @@ impl Level { } pub fn clean_memory(&self, chunks_to_check: &[Vector2]) { - chunks_to_check.par_iter().for_each(|chunk| { + chunks_to_check.iter().for_each(|chunk| { if let Some(entry) = self.chunk_watchers.get(chunk) { if entry.value().is_zero() { self.chunk_watchers.remove(chunk); @@ -212,8 +212,6 @@ impl Level { } /// Reads/Generates many chunks in a world - /// MUST be called from a tokio runtime thread - /// /// Note: The order of the output chunks will almost never be in the same order as the order of input chunks pub fn fetch_chunks( &self, diff --git a/pumpkin/src/client/container.rs b/pumpkin/src/client/container.rs index 04361b551..2ebbb4a17 100644 --- a/pumpkin/src/client/container.rs +++ b/pumpkin/src/client/container.rs @@ -179,6 +179,9 @@ impl Player { if combined.crafted_item_slot().is_none() && crafted_item.is_some() { combined.recipe_used(); } + + // TODO: `combined.craft` uses rayon! It should be called from `rayon::spawn` and its + // result passed to the tokio runtime via a channel! if combined.craft() { drop(inventory); self.set_container_content(opened_container.as_deref_mut()) diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index 9358d5378..04752d8cd 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -101,11 +101,9 @@ const GIT_VERSION: &str = env!("GIT_VERSION"); #[expect(clippy::too_many_lines)] fn main() { + let time = Instant::now(); init_logger(); - //NOTE: ensure rayon is built outside of tokio scope AND the tokio runtime is a rayon task - rayon::ThreadPoolBuilder::new().build_global().unwrap(); - let default_panic = std::panic::take_hook(); std::panic::set_hook(Box::new(move |info| { default_panic(info); @@ -131,137 +129,134 @@ fn main() { log::info!("Report Issues on https://github.com/Snowiiii/Pumpkin/issues"); log::info!("Join our Discord for community support https://discord.com/invite/wT8XjrjKkf"); - let (tokio_stop_send, tokio_stop_recv) = std::sync::mpsc::sync_channel(1); - rayon::spawn(move || { - let time = Instant::now(); - - // NOTE: The tokio runtime must be known by rayon, otherwise the cpu intensive tasks will - // choke the async tasks. Also, there is not need for the tokio to span multiple threads, - // one thread should be sufficient as cpu intensive work should be passed to rayon with the - // runtime only waiting on the results. If you need to call async code from a thread, pass a - // tokio handle from `Handle::current()` from the tokio thread to the code being - // parallelized - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - rt.block_on(async move { - tokio::spawn(async { - setup_sighandler() - .await - .expect("Unable to setup signal handlers"); - }); - - // Setup the TCP server socket. - let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address) + // NOTE: The tokio runtime must be seperate from rayon, otherwise the cpu intensive tasks will + // choke the async tasks. THIS MEANS THERE SHOULD BE NO BLOCKING CALLS TO RAYON FROM THE TOKIO + // RUNTIME TO INCLUDE `par_iter`!!! + // Also, there is no need for the tokio to span multiple threads, + // one thread should be sufficient as cpu intensive work should be passed to rayon with the + // runtime only waiting on the results. If you need to call async code from a thread, pass a + // tokio handle from `Handle::current()` from the tokio thread to the code being + // parallelized + // + // WARNING: All rayon calls from the tokio runtime must be non-blocking! This includes things + // like `par_iter`. These should be spawned in the the rayon pool and then passed to the tokio + // runtime with a channel! See `Level::fetch_chunks` as an example! + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async move { + tokio::spawn(async { + setup_sighandler() .await - .expect("Failed to start TcpListener"); - // In the event the user puts 0 for their port, this will allow us to know what port it is running on - let addr = listener - .local_addr() - .expect("Unable to get the address of server!"); + .expect("Unable to setup signal handlers"); + }); - let use_console = ADVANCED_CONFIG.commands.use_console; - let rcon = ADVANCED_CONFIG.rcon.clone(); + // Setup the TCP server socket. + let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address) + .await + .expect("Failed to start TcpListener"); + // In the event the user puts 0 for their port, this will allow us to know what port it is running on + let addr = listener + .local_addr() + .expect("Unable to get the address of server!"); - let server = Arc::new(Server::new()); - let mut ticker = Ticker::new(BASIC_CONFIG.tps); + let use_console = ADVANCED_CONFIG.commands.use_console; + let rcon = ADVANCED_CONFIG.rcon.clone(); - log::info!("Started Server took {}ms", time.elapsed().as_millis()); - log::info!("You now can connect to the server, Listening on {}", addr); + let server = Arc::new(Server::new()); + let mut ticker = Ticker::new(BASIC_CONFIG.tps); - if use_console { - setup_console(server.clone()); - } - if rcon.enabled { - let server = server.clone(); - tokio::spawn(async move { - RCONServer::new(&rcon, server).await.unwrap(); - }); - } + log::info!("Started Server took {}ms", time.elapsed().as_millis()); + log::info!("You now can connect to the server, Listening on {}", addr); - if ADVANCED_CONFIG.query.enabled { - log::info!("Query protocol enabled. Starting..."); - tokio::spawn(query::start_query_handler(server.clone(), addr)); - } + if use_console { + setup_console(server.clone()); + } + if rcon.enabled { + let server = server.clone(); + tokio::spawn(async move { + RCONServer::new(&rcon, server).await.unwrap(); + }); + } - if ADVANCED_CONFIG.lan_broadcast.enabled { - log::info!("LAN broadcast enabled. Starting..."); - tokio::spawn(lan_broadcast::start_lan_broadcast(addr)); - } + if ADVANCED_CONFIG.query.enabled { + log::info!("Query protocol enabled. Starting..."); + tokio::spawn(query::start_query_handler(server.clone(), addr)); + } - { - let server = server.clone(); - tokio::spawn(async move { - ticker.run(&server).await; - }); - } + if ADVANCED_CONFIG.lan_broadcast.enabled { + log::info!("LAN broadcast enabled. Starting..."); + tokio::spawn(lan_broadcast::start_lan_broadcast(addr)); + } - let mut master_client_id: u16 = 0; - loop { - // Asynchronously wait for an inbound socket. - let (connection, address) = listener.accept().await.unwrap(); + { + let server = server.clone(); + tokio::spawn(async move { + ticker.run(&server).await; + }); + } - if let Err(e) = connection.set_nodelay(true) { - log::warn!("failed to set TCP_NODELAY {e}"); - } + let mut master_client_id: u16 = 0; + loop { + // Asynchronously wait for an inbound socket. + let (connection, address) = listener.accept().await.unwrap(); - let id = master_client_id; - master_client_id = master_client_id.wrapping_add(1); + if let Err(e) = connection.set_nodelay(true) { + log::warn!("failed to set TCP_NODELAY {e}"); + } - log::info!( - "Accepted connection from: {} (id {})", - scrub_address(&format!("{address}")), - id - ); + let id = master_client_id; + master_client_id = master_client_id.wrapping_add(1); - let client = Arc::new(Client::new(connection, addr, id)); + log::info!( + "Accepted connection from: {} (id {})", + scrub_address(&format!("{address}")), + id + ); - let server = server.clone(); - tokio::spawn(async move { - while !client.closed.load(std::sync::atomic::Ordering::Relaxed) - && !client - .make_player - .load(std::sync::atomic::Ordering::Relaxed) - { - let open = client.poll().await; - if open { - client.process_packets(&server).await; - }; - } - if client + let client = Arc::new(Client::new(connection, addr, id)); + + let server = server.clone(); + tokio::spawn(async move { + while !client.closed.load(std::sync::atomic::Ordering::Relaxed) + && !client .make_player .load(std::sync::atomic::Ordering::Relaxed) + { + let open = client.poll().await; + if open { + client.process_packets(&server).await; + }; + } + if client + .make_player + .load(std::sync::atomic::Ordering::Relaxed) + { + let (player, world) = server.add_player(client).await; + world + .spawn_player(&BASIC_CONFIG, player.clone(), &server) + .await; + + // poll Player + while !player + .client + .closed + .load(core::sync::atomic::Ordering::Relaxed) { - let (player, world) = server.add_player(client).await; - world - .spawn_player(&BASIC_CONFIG, player.clone(), &server) - .await; - - // poll Player - while !player - .client - .closed - .load(core::sync::atomic::Ordering::Relaxed) - { - let open = player.client.poll().await; - if open { - player.process_packets(&server).await; - }; - } - log::debug!("Cleaning up player for id {}", id); - player.remove().await; - server.remove_player().await; + let open = player.client.poll().await; + if open { + player.process_packets(&server).await; + }; } - }); - } - }); - - tokio_stop_send.send(()).unwrap(); + log::debug!("Cleaning up player for id {}", id); + player.remove().await; + server.remove_player().await; + } + }); + } }); - - tokio_stop_recv.recv().unwrap(); } fn handle_interrupt() {