diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 956f0ca6..06bed628 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -26,7 +26,17 @@ There are several ways you can contribute to Pumpkin:
Write clear and concise commit messages that describe your changes.
### Docs
-The Documentation of Pumpkin can be found at https://snowiiii.github.io/Pumpkin/
+
+The Documentation of Pumpkin can be found at https://snowiiii.github.io/Pumpkin/
+
+### Coding Guidelines
+
+- **Working with Tokio and Rayon:**
+  CPU-intensive work should be done in the Rayon thread pool (via `rayon::spawn`, Rayon's
+  parallel iterators, or similar) rather than on the Tokio runtime. The Tokio runtime must not
+  block on these Rayon calls; instead, results should be passed back to the Tokio runtime
+  asynchronously, for example through a `tokio::sync::mpsc` channel, as sketched below.
+  A real-world example of this pattern is `pumpkin_world::level::Level::fetch_chunks`.
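+
+  A minimal sketch of the pattern (the `expensive_work` function below is only a placeholder
+  for real CPU-bound work):
+
+  ```rust
+  use tokio::sync::mpsc;
+
+  fn expensive_work(i: u32) -> u32 {
+      i * i // stand-in for real CPU-bound work
+  }
+
+  #[tokio::main(flavor = "current_thread")]
+  async fn main() {
+      let (tx, mut rx) = mpsc::channel(16);
+
+      // Off-load the heavy loop to the rayon thread pool.
+      rayon::spawn(move || {
+          for i in 0..8u32 {
+              // `blocking_send` is fine here: this runs on a rayon thread, not the tokio runtime.
+              if tx.blocking_send(expensive_work(i)).is_err() {
+                  break;
+              }
+          }
+      });
+
+      // The tokio runtime only awaits the channel and never blocks on rayon.
+      while let Some(result) = rx.recv().await {
+          println!("result: {result}");
+      }
+  }
+  ```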
### Additional Information
diff --git a/pumpkin-world/src/cylindrical_chunk_iterator.rs b/pumpkin-world/src/cylindrical_chunk_iterator.rs
index f7ee02e9..1f6d7b00 100644
--- a/pumpkin-world/src/cylindrical_chunk_iterator.rs
+++ b/pumpkin-world/src/cylindrical_chunk_iterator.rs
@@ -1,3 +1,4 @@
+use itertools::Itertools;
use pumpkin_core::math::vector2::Vector2;
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -80,10 +81,20 @@ impl Cylindrical {
all_chunks.push(Vector2::new(x, z));
}
}
- all_chunks
+
+ let mut result = all_chunks
.into_iter()
.filter(|chunk| self.is_within_distance(chunk.x, chunk.z))
- .collect()
+ .collect_vec();
+
+ // Sort such that the first chunks are closest to the center
+ result.sort_unstable_by_key(|pos| {
+ let rel_x = pos.x - self.center.x;
+ let rel_z = pos.z - self.center.z;
+ rel_x * rel_x + rel_z * rel_z
+ });
+
+ result
}
}
diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs
index 0c2cea38..51dc3aec 100644
--- a/pumpkin-world/src/level.rs
+++ b/pumpkin-world/src/level.rs
@@ -100,7 +100,7 @@ impl Level {
/// it is removed from memory. Should only be called on chunks the player was not watching
/// before
pub fn mark_chunks_as_newly_watched(&self, chunks: &[Vector2<i32>]) {
- chunks.par_iter().for_each(|chunk| {
+ chunks.iter().for_each(|chunk| {
self.mark_chunk_as_newly_watched(*chunk);
});
}
@@ -126,9 +126,9 @@ impl Level {
/// it is removed from memory. Should only be called on chunks the player was watching before
pub fn mark_chunks_as_not_watched(&self, chunks: &[Vector2<i32>]) -> Vec<Vector2<i32>> {
chunks
- .par_iter()
+ .iter()
.filter(|chunk| self.mark_chunk_as_not_watched(**chunk))
- .map(|chunk| *chunk)
+ .copied()
.collect()
}
@@ -157,7 +157,7 @@ impl Level {
}
pub fn clean_chunks(&self, chunks: &[Vector2<i32>]) {
- chunks.par_iter().for_each(|chunk_pos| {
+ chunks.iter().for_each(|chunk_pos| {
//log::debug!("Unloading {:?}", chunk_pos);
self.clean_chunk(chunk_pos);
});
@@ -174,7 +174,7 @@ impl Level {
}
pub fn clean_memory(&self, chunks_to_check: &[Vector2<i32>]) {
- chunks_to_check.par_iter().for_each(|chunk| {
+ chunks_to_check.iter().for_each(|chunk| {
if let Some(entry) = self.chunk_watchers.get(chunk) {
if entry.value().is_zero() {
self.chunk_watchers.remove(chunk);
@@ -212,8 +212,6 @@ impl Level {
}
/// Reads/Generates many chunks in a world
- /// MUST be called from a tokio runtime thread
- ///
/// Note: The order of the output chunks will almost never be in the same order as the order of input chunks
pub fn fetch_chunks(
&self,
diff --git a/pumpkin/src/client/container.rs b/pumpkin/src/client/container.rs
index 04361b55..2ebbb4a1 100644
--- a/pumpkin/src/client/container.rs
+++ b/pumpkin/src/client/container.rs
@@ -179,6 +179,9 @@ impl Player {
if combined.crafted_item_slot().is_none() && crafted_item.is_some() {
combined.recipe_used();
}
+
+ // TODO: `combined.craft` uses rayon! It should be called from `rayon::spawn` and its
+ // result passed to the tokio runtime via a channel!
if combined.craft() {
drop(inventory);
self.set_container_content(opened_container.as_deref_mut())
diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs
index 9358d537..04752d8c 100644
--- a/pumpkin/src/main.rs
+++ b/pumpkin/src/main.rs
@@ -101,11 +101,9 @@ const GIT_VERSION: &str = env!("GIT_VERSION");
#[expect(clippy::too_many_lines)]
fn main() {
+ let time = Instant::now();
init_logger();
- //NOTE: ensure rayon is built outside of tokio scope AND the tokio runtime is a rayon task
- rayon::ThreadPoolBuilder::new().build_global().unwrap();
-
let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
@@ -131,137 +129,134 @@ fn main() {
log::info!("Report Issues on https://github.com/Snowiiii/Pumpkin/issues");
log::info!("Join our Discord for community support https://discord.com/invite/wT8XjrjKkf");
- let (tokio_stop_send, tokio_stop_recv) = std::sync::mpsc::sync_channel(1);
- rayon::spawn(move || {
- let time = Instant::now();
-
- // NOTE: The tokio runtime must be known by rayon, otherwise the cpu intensive tasks will
- // choke the async tasks. Also, there is not need for the tokio to span multiple threads,
- // one thread should be sufficient as cpu intensive work should be passed to rayon with the
- // runtime only waiting on the results. If you need to call async code from a thread, pass a
- // tokio handle from `Handle::current()` from the tokio thread to the code being
- // parallelized
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
-
- rt.block_on(async move {
- tokio::spawn(async {
- setup_sighandler()
- .await
- .expect("Unable to setup signal handlers");
- });
-
- // Setup the TCP server socket.
- let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
+ // NOTE: The tokio runtime must be separate from rayon, otherwise the cpu intensive tasks will
+ // choke the async tasks. THIS MEANS THERE MUST BE NO BLOCKING CALLS TO RAYON FROM THE TOKIO
+ // RUNTIME, INCLUDING `par_iter`!
+ // Also, there is no need for tokio to spawn multiple threads; one thread should be sufficient,
+ // as cpu intensive work should be passed to rayon with the runtime only waiting on the results.
+ // If you need to call async code from a rayon thread, pass a tokio handle obtained via
+ // `Handle::current()` on the tokio thread to the code being parallelized.
+ //
+ // WARNING: All rayon calls from the tokio runtime must be non-blocking! This includes things
+ // like `par_iter`. Such work should be spawned on the rayon pool and its results passed back to
+ // the tokio runtime through a channel! See `Level::fetch_chunks` as an example, or the sketch
+ // below.
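+ //
+ // For instance, a hypothetical sketch of handing rayon work a runtime handle (the
+ // `expensive_compute` and `consume` names are purely illustrative):
+ //
+ //     let handle = tokio::runtime::Handle::current();
+ //     rayon::spawn(move || {
+ //         let data = expensive_compute(); // CPU-bound work, runs on the rayon pool
+ //         // Hop back onto the tokio runtime without blocking the rayon thread.
+ //         handle.spawn(async move { consume(data).await });
+ //     });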
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+
+ rt.block_on(async move {
+ tokio::spawn(async {
+ setup_sighandler()
.await
- .expect("Failed to start TcpListener");
- // In the event the user puts 0 for their port, this will allow us to know what port it is running on
- let addr = listener
- .local_addr()
- .expect("Unable to get the address of server!");
+ .expect("Unable to setup signal handlers");
+ });
- let use_console = ADVANCED_CONFIG.commands.use_console;
- let rcon = ADVANCED_CONFIG.rcon.clone();
+ // Setup the TCP server socket.
+ let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
+ .await
+ .expect("Failed to start TcpListener");
+ // In the event the user puts 0 for their port, this will allow us to know what port it is running on
+ let addr = listener
+ .local_addr()
+ .expect("Unable to get the address of server!");
- let server = Arc::new(Server::new());
- let mut ticker = Ticker::new(BASIC_CONFIG.tps);
+ let use_console = ADVANCED_CONFIG.commands.use_console;
+ let rcon = ADVANCED_CONFIG.rcon.clone();
- log::info!("Started Server took {}ms", time.elapsed().as_millis());
- log::info!("You now can connect to the server, Listening on {}", addr);
+ let server = Arc::new(Server::new());
+ let mut ticker = Ticker::new(BASIC_CONFIG.tps);
- if use_console {
- setup_console(server.clone());
- }
- if rcon.enabled {
- let server = server.clone();
- tokio::spawn(async move {
- RCONServer::new(&rcon, server).await.unwrap();
- });
- }
+ log::info!("Started Server took {}ms", time.elapsed().as_millis());
+ log::info!("You now can connect to the server, Listening on {}", addr);
- if ADVANCED_CONFIG.query.enabled {
- log::info!("Query protocol enabled. Starting...");
- tokio::spawn(query::start_query_handler(server.clone(), addr));
- }
+ if use_console {
+ setup_console(server.clone());
+ }
+ if rcon.enabled {
+ let server = server.clone();
+ tokio::spawn(async move {
+ RCONServer::new(&rcon, server).await.unwrap();
+ });
+ }
- if ADVANCED_CONFIG.lan_broadcast.enabled {
- log::info!("LAN broadcast enabled. Starting...");
- tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
- }
+ if ADVANCED_CONFIG.query.enabled {
+ log::info!("Query protocol enabled. Starting...");
+ tokio::spawn(query::start_query_handler(server.clone(), addr));
+ }
- {
- let server = server.clone();
- tokio::spawn(async move {
- ticker.run(&server).await;
- });
- }
+ if ADVANCED_CONFIG.lan_broadcast.enabled {
+ log::info!("LAN broadcast enabled. Starting...");
+ tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
+ }
- let mut master_client_id: u16 = 0;
- loop {
- // Asynchronously wait for an inbound socket.
- let (connection, address) = listener.accept().await.unwrap();
+ {
+ let server = server.clone();
+ tokio::spawn(async move {
+ ticker.run(&server).await;
+ });
+ }
- if let Err(e) = connection.set_nodelay(true) {
- log::warn!("failed to set TCP_NODELAY {e}");
- }
+ let mut master_client_id: u16 = 0;
+ loop {
+ // Asynchronously wait for an inbound socket.
+ let (connection, address) = listener.accept().await.unwrap();
- let id = master_client_id;
- master_client_id = master_client_id.wrapping_add(1);
+ if let Err(e) = connection.set_nodelay(true) {
+ log::warn!("failed to set TCP_NODELAY {e}");
+ }
- log::info!(
- "Accepted connection from: {} (id {})",
- scrub_address(&format!("{address}")),
- id
- );
+ let id = master_client_id;
+ master_client_id = master_client_id.wrapping_add(1);
- let client = Arc::new(Client::new(connection, addr, id));
+ log::info!(
+ "Accepted connection from: {} (id {})",
+ scrub_address(&format!("{address}")),
+ id
+ );
- let server = server.clone();
- tokio::spawn(async move {
- while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
- && !client
- .make_player
- .load(std::sync::atomic::Ordering::Relaxed)
- {
- let open = client.poll().await;
- if open {
- client.process_packets(&server).await;
- };
- }
- if client
+ let client = Arc::new(Client::new(connection, addr, id));
+
+ let server = server.clone();
+ tokio::spawn(async move {
+ while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
+ && !client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
+ {
+ let open = client.poll().await;
+ if open {
+ client.process_packets(&server).await;
+ };
+ }
+ if client
+ .make_player
+ .load(std::sync::atomic::Ordering::Relaxed)
+ {
+ let (player, world) = server.add_player(client).await;
+ world
+ .spawn_player(&BASIC_CONFIG, player.clone(), &server)
+ .await;
+
+ // poll Player
+ while !player
+ .client
+ .closed
+ .load(core::sync::atomic::Ordering::Relaxed)
{
- let (player, world) = server.add_player(client).await;
- world
- .spawn_player(&BASIC_CONFIG, player.clone(), &server)
- .await;
-
- // poll Player
- while !player
- .client
- .closed
- .load(core::sync::atomic::Ordering::Relaxed)
- {
- let open = player.client.poll().await;
- if open {
- player.process_packets(&server).await;
- };
- }
- log::debug!("Cleaning up player for id {}", id);
- player.remove().await;
- server.remove_player().await;
+ let open = player.client.poll().await;
+ if open {
+ player.process_packets(&server).await;
+ };
}
- });
- }
- });
-
- tokio_stop_send.send(()).unwrap();
+ log::debug!("Cleaning up player for id {}", id);
+ player.remove().await;
+ server.remove_player().await;
+ }
+ });
+ }
});
-
- tokio_stop_recv.recv().unwrap();
}
fn handle_interrupt() {