Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use all cpus for intensive tasks #381

Merged
merged 3 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,17 @@ There are several ways you can contribute to Pumpkin:
Write clear and concise commit messages that describe your changes.

### Docs
The Documentation of Pumpkin can be found at https://snowiiii.github.io/Pumpkin/

The Documentation of Pumpkin can be found at <https://snowiiii.github.io/Pumpkin/>

### Coding Guidelines

- **Working with Tokio and Rayon:**
When invoking CPU intensive work, this work should be done in the Rayon thread pool via `rayon::spawn`, Rayon's
parallel iterators, or otherwise instead of from the Tokio runtime. However, it is important that the
Tokio runtime not block on these Rayon calls. Instead, the data should be passed to the Tokio runtime
via async means, for example: `tokio::sync::mpsc`. An example of how to properly do this can be found
in `pumpkin_world::level::Level::fetch_chunks`.

### Additional Information

Expand Down
15 changes: 13 additions & 2 deletions pumpkin-world/src/cylindrical_chunk_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use itertools::Itertools;
use pumpkin_core::math::vector2::Vector2;

#[derive(Debug, Clone, Copy, PartialEq)]
Expand Down Expand Up @@ -80,10 +81,20 @@ impl Cylindrical {
all_chunks.push(Vector2::new(x, z));
}
}
all_chunks

let mut result = all_chunks
.into_iter()
.filter(|chunk| self.is_within_distance(chunk.x, chunk.z))
.collect()
.collect_vec();

// Sort such that the first chunks are closest to the center
result.sort_unstable_by_key(|pos| {
let rel_x = pos.x - self.center.x;
let rel_z = pos.z - self.center.z;
rel_x * rel_x + rel_z * rel_z
});

result
}
}

Expand Down
12 changes: 5 additions & 7 deletions pumpkin-world/src/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl Level {
/// it is removed from memory. Should only be called on chunks the player was not watching
/// before
pub fn mark_chunks_as_newly_watched(&self, chunks: &[Vector2<i32>]) {
chunks.par_iter().for_each(|chunk| {
chunks.iter().for_each(|chunk| {
self.mark_chunk_as_newly_watched(*chunk);
});
}
Expand All @@ -126,9 +126,9 @@ impl Level {
/// it is removed from memory. Should only be called on chunks the player was watching before
pub fn mark_chunks_as_not_watched(&self, chunks: &[Vector2<i32>]) -> Vec<Vector2<i32>> {
chunks
.par_iter()
.iter()
.filter(|chunk| self.mark_chunk_as_not_watched(**chunk))
.map(|chunk| *chunk)
.copied()
.collect()
}

Expand Down Expand Up @@ -157,7 +157,7 @@ impl Level {
}

pub fn clean_chunks(&self, chunks: &[Vector2<i32>]) {
chunks.par_iter().for_each(|chunk_pos| {
chunks.iter().for_each(|chunk_pos| {
//log::debug!("Unloading {:?}", chunk_pos);
self.clean_chunk(chunk_pos);
});
Expand All @@ -174,7 +174,7 @@ impl Level {
}

pub fn clean_memory(&self, chunks_to_check: &[Vector2<i32>]) {
chunks_to_check.par_iter().for_each(|chunk| {
chunks_to_check.iter().for_each(|chunk| {
if let Some(entry) = self.chunk_watchers.get(chunk) {
if entry.value().is_zero() {
self.chunk_watchers.remove(chunk);
Expand Down Expand Up @@ -212,8 +212,6 @@ impl Level {
}

/// Reads/Generates many chunks in a world
/// MUST be called from a tokio runtime thread
///
/// Note: The order of the output chunks will almost never be in the same order as the order of input chunks
pub fn fetch_chunks(
&self,
Expand Down
3 changes: 3 additions & 0 deletions pumpkin/src/client/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ impl Player {
if combined.crafted_item_slot().is_none() && crafted_item.is_some() {
combined.recipe_used();
}

// TODO: `combined.craft` uses rayon! It should be called from `rayon::spawn` and its
// result passed to the tokio runtime via a channel!
if combined.craft() {
drop(inventory);
self.set_container_content(opened_container.as_deref_mut())
Expand Down
224 changes: 102 additions & 122 deletions pumpkin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,15 @@ const fn convert_logger_filter(level: pumpkin_config::logging::LevelFilter) -> L
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
const GIT_VERSION: &str = env!("GIT_VERSION");

// WARNING: All rayon calls from the tokio runtime must be non-blocking! This includes things
// like `par_iter`. These should be spawned in the the rayon pool and then passed to the tokio
// runtime with a channel! See `Level::fetch_chunks` as an example!
#[tokio::main]
#[expect(clippy::too_many_lines)]
fn main() {
async fn main() {
let time = Instant::now();
init_logger();

//NOTE: ensure rayon is built outside of tokio scope AND the tokio runtime is a rayon task
rayon::ThreadPoolBuilder::new().build_global().unwrap();

let default_panic = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
default_panic(info);
Expand All @@ -131,137 +133,115 @@ fn main() {
log::info!("Report Issues on https://github.com/Snowiiii/Pumpkin/issues");
log::info!("Join our Discord for community support https://discord.com/invite/wT8XjrjKkf");

let (tokio_stop_send, tokio_stop_recv) = std::sync::mpsc::sync_channel(1);
rayon::spawn(move || {
let time = Instant::now();

// NOTE: The tokio runtime must be known by rayon, otherwise the cpu intensive tasks will
// choke the async tasks. Also, there is not need for the tokio to span multiple threads,
// one thread should be sufficient as cpu intensive work should be passed to rayon with the
// runtime only waiting on the results. If you need to call async code from a thread, pass a
// tokio handle from `Handle::current()` from the tokio thread to the code being
// parallelized
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async move {
tokio::spawn(async {
setup_sighandler()
.await
.expect("Unable to setup signal handlers");
});

// Setup the TCP server socket.
let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
.await
.expect("Failed to start TcpListener");
// In the event the user puts 0 for their port, this will allow us to know what port it is running on
let addr = listener
.local_addr()
.expect("Unable to get the address of server!");
tokio::spawn(async {
setup_sighandler()
.await
.expect("Unable to setup signal handlers");
});

let use_console = ADVANCED_CONFIG.commands.use_console;
let rcon = ADVANCED_CONFIG.rcon.clone();
// Setup the TCP server socket.
let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
.await
.expect("Failed to start TcpListener");
// In the event the user puts 0 for their port, this will allow us to know what port it is running on
let addr = listener
.local_addr()
.expect("Unable to get the address of server!");

let server = Arc::new(Server::new());
let mut ticker = Ticker::new(BASIC_CONFIG.tps);
let use_console = ADVANCED_CONFIG.commands.use_console;
let rcon = ADVANCED_CONFIG.rcon.clone();

log::info!("Started Server took {}ms", time.elapsed().as_millis());
log::info!("You now can connect to the server, Listening on {}", addr);
let server = Arc::new(Server::new());
let mut ticker = Ticker::new(BASIC_CONFIG.tps);

if use_console {
setup_console(server.clone());
}
if rcon.enabled {
let server = server.clone();
tokio::spawn(async move {
RCONServer::new(&rcon, server).await.unwrap();
});
}
log::info!("Started Server took {}ms", time.elapsed().as_millis());
log::info!("You now can connect to the server, Listening on {}", addr);

if ADVANCED_CONFIG.query.enabled {
log::info!("Query protocol enabled. Starting...");
tokio::spawn(query::start_query_handler(server.clone(), addr));
}
if use_console {
setup_console(server.clone());
}
if rcon.enabled {
let server = server.clone();
tokio::spawn(async move {
RCONServer::new(&rcon, server).await.unwrap();
});
}

if ADVANCED_CONFIG.lan_broadcast.enabled {
log::info!("LAN broadcast enabled. Starting...");
tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
}
if ADVANCED_CONFIG.query.enabled {
log::info!("Query protocol enabled. Starting...");
tokio::spawn(query::start_query_handler(server.clone(), addr));
}

if ADVANCED_CONFIG.lan_broadcast.enabled {
log::info!("LAN broadcast enabled. Starting...");
tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
}

{
let server = server.clone();
tokio::spawn(async move {
ticker.run(&server).await;
});
}

let mut master_client_id: u16 = 0;
loop {
// Asynchronously wait for an inbound socket.
let (connection, address) = listener.accept().await.unwrap();

if let Err(e) = connection.set_nodelay(true) {
log::warn!("failed to set TCP_NODELAY {e}");
}

let id = master_client_id;
master_client_id = master_client_id.wrapping_add(1);

log::info!(
"Accepted connection from: {} (id {})",
scrub_address(&format!("{address}")),
id
);

let client = Arc::new(Client::new(connection, addr, id));

let server = server.clone();
tokio::spawn(async move {
while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
&& !client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let server = server.clone();
tokio::spawn(async move {
ticker.run(&server).await;
});
let open = client.poll().await;
if open {
client.process_packets(&server).await;
};
}
if client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let (player, world) = server.add_player(client).await;
world
.spawn_player(&BASIC_CONFIG, player.clone(), &server)
.await;

let mut master_client_id: u16 = 0;
loop {
// Asynchronously wait for an inbound socket.
let (connection, address) = listener.accept().await.unwrap();

if let Err(e) = connection.set_nodelay(true) {
log::warn!("failed to set TCP_NODELAY {e}");
// poll Player
while !player
.client
.closed
.load(core::sync::atomic::Ordering::Relaxed)
{
let open = player.client.poll().await;
if open {
player.process_packets(&server).await;
};
}

let id = master_client_id;
master_client_id = master_client_id.wrapping_add(1);

log::info!(
"Accepted connection from: {} (id {})",
scrub_address(&format!("{address}")),
id
);

let client = Arc::new(Client::new(connection, addr, id));

let server = server.clone();
tokio::spawn(async move {
while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
&& !client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let open = client.poll().await;
if open {
client.process_packets(&server).await;
};
}
if client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let (player, world) = server.add_player(client).await;
world
.spawn_player(&BASIC_CONFIG, player.clone(), &server)
.await;

// poll Player
while !player
.client
.closed
.load(core::sync::atomic::Ordering::Relaxed)
{
let open = player.client.poll().await;
if open {
player.process_packets(&server).await;
};
}
log::debug!("Cleaning up player for id {}", id);
player.remove().await;
server.remove_player().await;
}
});
log::debug!("Cleaning up player for id {}", id);
player.remove().await;
server.remove_player().await;
}
});

tokio_stop_send.send(()).unwrap();
});

tokio_stop_recv.recv().unwrap();
}
}

fn handle_interrupt() {
Expand Down
Loading