Skip to content

Commit

Permalink
use all cpus to load chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
kralverde committed Dec 7, 2024
1 parent a04b23f commit e7b179a
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 125 deletions.
12 changes: 11 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,17 @@ There are several ways you can contribute to Pumpkin:
Write clear and concise commit messages that describe your changes.

### Docs
The Documentation of Pumpkin can be found at https://snowiiii.github.io/Pumpkin/

The Documentation of Pumpkin can be found at <https://snowiiii.github.io/Pumpkin/>

### Coding Guidelines

- **Working with Tokio and Rayon:**
When invoking CPU intensive work, this work should be done in the Rayon thread pool via `rayon::spawn`, Rayon's
parallel iterators, or otherwise instead of from the Tokio runtime. However, it is important that the
Tokio runtime not block on these Rayon calls. Instead, the data should be passed to the Tokio runtime
via async means, for example: `tokio::sync::mpsc`. An example of how to properly do this can be found
in `pumpkin_world::level::Level::fetch_chunks`.

### Additional Information

Expand Down
15 changes: 13 additions & 2 deletions pumpkin-world/src/cylindrical_chunk_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use itertools::Itertools;
use pumpkin_core::math::vector2::Vector2;

#[derive(Debug, Clone, Copy, PartialEq)]
Expand Down Expand Up @@ -80,10 +81,20 @@ impl Cylindrical {
all_chunks.push(Vector2::new(x, z));
}
}
all_chunks

let mut result = all_chunks
.into_iter()
.filter(|chunk| self.is_within_distance(chunk.x, chunk.z))
.collect()
.collect_vec();

// Sort such that the first chunks are closest to the center
result.sort_unstable_by_key(|pos| {
let rel_x = pos.x - self.center.x;
let rel_z = pos.z - self.center.z;
rel_x * rel_x + rel_z * rel_z
});

result
}
}

Expand Down
12 changes: 5 additions & 7 deletions pumpkin-world/src/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl Level {
/// it is removed from memory. Should only be called on chunks the player was not watching
/// before
pub fn mark_chunks_as_newly_watched(&self, chunks: &[Vector2<i32>]) {
chunks.par_iter().for_each(|chunk| {
chunks.iter().for_each(|chunk| {
self.mark_chunk_as_newly_watched(*chunk);
});
}
Expand All @@ -126,9 +126,9 @@ impl Level {
/// it is removed from memory. Should only be called on chunks the player was watching before
pub fn mark_chunks_as_not_watched(&self, chunks: &[Vector2<i32>]) -> Vec<Vector2<i32>> {
chunks
.par_iter()
.iter()
.filter(|chunk| self.mark_chunk_as_not_watched(**chunk))
.map(|chunk| *chunk)
.copied()
.collect()
}

Expand Down Expand Up @@ -157,7 +157,7 @@ impl Level {
}

pub fn clean_chunks(&self, chunks: &[Vector2<i32>]) {
chunks.par_iter().for_each(|chunk_pos| {
chunks.iter().for_each(|chunk_pos| {
//log::debug!("Unloading {:?}", chunk_pos);
self.clean_chunk(chunk_pos);
});
Expand All @@ -174,7 +174,7 @@ impl Level {
}

pub fn clean_memory(&self, chunks_to_check: &[Vector2<i32>]) {
chunks_to_check.par_iter().for_each(|chunk| {
chunks_to_check.iter().for_each(|chunk| {
if let Some(entry) = self.chunk_watchers.get(chunk) {
if entry.value().is_zero() {
self.chunk_watchers.remove(chunk);
Expand Down Expand Up @@ -212,8 +212,6 @@ impl Level {
}

/// Reads/Generates many chunks in a world
/// MUST be called from a tokio runtime thread
///
/// Note: The order of the output chunks will almost never be in the same order as the order of input chunks
pub fn fetch_chunks(
&self,
Expand Down
3 changes: 3 additions & 0 deletions pumpkin/src/client/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ impl Player {
if combined.crafted_item_slot().is_none() && crafted_item.is_some() {
combined.recipe_used();
}

// TODO: `combined.craft` uses rayon! It should be called from `rayon::spawn` and its
// result passed to the tokio runtime via a channel!
if combined.craft() {
drop(inventory);
self.set_container_content(opened_container.as_deref_mut())
Expand Down
225 changes: 110 additions & 115 deletions pumpkin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,9 @@ const GIT_VERSION: &str = env!("GIT_VERSION");

#[expect(clippy::too_many_lines)]
fn main() {
let time = Instant::now();
init_logger();

//NOTE: ensure rayon is built outside of tokio scope AND the tokio runtime is a rayon task
rayon::ThreadPoolBuilder::new().build_global().unwrap();

let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
Expand All @@ -131,137 +129,134 @@ fn main() {
log::info!("Report Issues on https://github.com/Snowiiii/Pumpkin/issues");
log::info!("Join our Discord for community support https://discord.com/invite/wT8XjrjKkf");

let (tokio_stop_send, tokio_stop_recv) = std::sync::mpsc::sync_channel(1);
rayon::spawn(move || {
let time = Instant::now();

// NOTE: The tokio runtime must be known by rayon, otherwise the cpu intensive tasks will
// choke the async tasks. Also, there is not need for the tokio to span multiple threads,
// one thread should be sufficient as cpu intensive work should be passed to rayon with the
// runtime only waiting on the results. If you need to call async code from a thread, pass a
// tokio handle from `Handle::current()` from the tokio thread to the code being
// parallelized
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async move {
tokio::spawn(async {
setup_sighandler()
.await
.expect("Unable to setup signal handlers");
});

// Setup the TCP server socket.
let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
// NOTE: The tokio runtime must be seperate from rayon, otherwise the cpu intensive tasks will
// choke the async tasks. THIS MEANS THERE SHOULD BE NO BLOCKING CALLS TO RAYON FROM THE TOKIO
// RUNTIME TO INCLUDE `par_iter`!!!
// Also, there is no need for the tokio to span multiple threads,
// one thread should be sufficient as cpu intensive work should be passed to rayon with the
// runtime only waiting on the results. If you need to call async code from a thread, pass a
// tokio handle from `Handle::current()` from the tokio thread to the code being
// parallelized
//
// WARNING: All rayon calls from the tokio runtime must be non-blocking! This includes things
// like `par_iter`. These should be spawned in the the rayon pool and then passed to the tokio
// runtime with a channel! See `Level::fetch_chunks` as an example!
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async move {
tokio::spawn(async {
setup_sighandler()
.await
.expect("Failed to start TcpListener");
// In the event the user puts 0 for their port, this will allow us to know what port it is running on
let addr = listener
.local_addr()
.expect("Unable to get the address of server!");
.expect("Unable to setup signal handlers");
});

let use_console = ADVANCED_CONFIG.commands.use_console;
let rcon = ADVANCED_CONFIG.rcon.clone();
// Setup the TCP server socket.
let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
.await
.expect("Failed to start TcpListener");
// In the event the user puts 0 for their port, this will allow us to know what port it is running on
let addr = listener
.local_addr()
.expect("Unable to get the address of server!");

let server = Arc::new(Server::new());
let mut ticker = Ticker::new(BASIC_CONFIG.tps);
let use_console = ADVANCED_CONFIG.commands.use_console;
let rcon = ADVANCED_CONFIG.rcon.clone();

log::info!("Started Server took {}ms", time.elapsed().as_millis());
log::info!("You now can connect to the server, Listening on {}", addr);
let server = Arc::new(Server::new());
let mut ticker = Ticker::new(BASIC_CONFIG.tps);

if use_console {
setup_console(server.clone());
}
if rcon.enabled {
let server = server.clone();
tokio::spawn(async move {
RCONServer::new(&rcon, server).await.unwrap();
});
}
log::info!("Started Server took {}ms", time.elapsed().as_millis());
log::info!("You now can connect to the server, Listening on {}", addr);

if ADVANCED_CONFIG.query.enabled {
log::info!("Query protocol enabled. Starting...");
tokio::spawn(query::start_query_handler(server.clone(), addr));
}
if use_console {
setup_console(server.clone());
}
if rcon.enabled {
let server = server.clone();
tokio::spawn(async move {
RCONServer::new(&rcon, server).await.unwrap();
});
}

if ADVANCED_CONFIG.lan_broadcast.enabled {
log::info!("LAN broadcast enabled. Starting...");
tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
}
if ADVANCED_CONFIG.query.enabled {
log::info!("Query protocol enabled. Starting...");
tokio::spawn(query::start_query_handler(server.clone(), addr));
}

{
let server = server.clone();
tokio::spawn(async move {
ticker.run(&server).await;
});
}
if ADVANCED_CONFIG.lan_broadcast.enabled {
log::info!("LAN broadcast enabled. Starting...");
tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
}

let mut master_client_id: u16 = 0;
loop {
// Asynchronously wait for an inbound socket.
let (connection, address) = listener.accept().await.unwrap();
{
let server = server.clone();
tokio::spawn(async move {
ticker.run(&server).await;
});
}

if let Err(e) = connection.set_nodelay(true) {
log::warn!("failed to set TCP_NODELAY {e}");
}
let mut master_client_id: u16 = 0;
loop {
// Asynchronously wait for an inbound socket.
let (connection, address) = listener.accept().await.unwrap();

let id = master_client_id;
master_client_id = master_client_id.wrapping_add(1);
if let Err(e) = connection.set_nodelay(true) {
log::warn!("failed to set TCP_NODELAY {e}");
}

log::info!(
"Accepted connection from: {} (id {})",
scrub_address(&format!("{address}")),
id
);
let id = master_client_id;
master_client_id = master_client_id.wrapping_add(1);

let client = Arc::new(Client::new(connection, addr, id));
log::info!(
"Accepted connection from: {} (id {})",
scrub_address(&format!("{address}")),
id
);

let server = server.clone();
tokio::spawn(async move {
while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
&& !client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let open = client.poll().await;
if open {
client.process_packets(&server).await;
};
}
if client
let client = Arc::new(Client::new(connection, addr, id));

let server = server.clone();
tokio::spawn(async move {
while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
&& !client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let open = client.poll().await;
if open {
client.process_packets(&server).await;
};
}
if client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let (player, world) = server.add_player(client).await;
world
.spawn_player(&BASIC_CONFIG, player.clone(), &server)
.await;

// poll Player
while !player
.client
.closed
.load(core::sync::atomic::Ordering::Relaxed)
{
let (player, world) = server.add_player(client).await;
world
.spawn_player(&BASIC_CONFIG, player.clone(), &server)
.await;

// poll Player
while !player
.client
.closed
.load(core::sync::atomic::Ordering::Relaxed)
{
let open = player.client.poll().await;
if open {
player.process_packets(&server).await;
};
}
log::debug!("Cleaning up player for id {}", id);
player.remove().await;
server.remove_player().await;
let open = player.client.poll().await;
if open {
player.process_packets(&server).await;
};
}
});
}
});

tokio_stop_send.send(()).unwrap();
log::debug!("Cleaning up player for id {}", id);
player.remove().await;
server.remove_player().await;
}
});
}
});

tokio_stop_recv.recv().unwrap();
}

fn handle_interrupt() {
Expand Down

0 comments on commit e7b179a

Please sign in to comment.