Skip to content

Commit

Permalink
make main async again
Browse files Browse the repository at this point in the history
  • Loading branch information
kralverde committed Dec 7, 2024
1 parent e7b179a commit d6a02c6
Showing 1 changed file with 105 additions and 111 deletions.
216 changes: 105 additions & 111 deletions pumpkin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,21 @@ const fn convert_logger_filter(level: pumpkin_config::logging::LevelFilter) -> L
const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
const GIT_VERSION: &str = env!("GIT_VERSION");

// NOTE: The tokio runtime must be separate from rayon, otherwise the cpu intensive tasks will
// choke the async tasks. THIS MEANS THERE SHOULD BE NO BLOCKING CALLS TO RAYON FROM THE TOKIO
// RUNTIME TO INCLUDE `par_iter`!!!
// Also, there is no need for tokio to spawn multiple threads,
// one thread should be sufficient as cpu intensive work should be passed to rayon with the
// runtime only waiting on the results. If you need to call async code from a thread, pass a
// tokio handle from `Handle::current()` from the tokio thread to the code being
// parallelized
//
// WARNING: All rayon calls from the tokio runtime must be non-blocking! This includes things
// like `par_iter`. These should be spawned in the rayon pool and then passed to the tokio
// runtime with a channel! See `Level::fetch_chunks` as an example!
#[tokio::main]
#[expect(clippy::too_many_lines)]
fn main() {
async fn main() {
let time = Instant::now();
init_logger();

Expand Down Expand Up @@ -129,134 +142,115 @@ fn main() {
log::info!("Report Issues on https://github.com/Snowiiii/Pumpkin/issues");
log::info!("Join our Discord for community support https://discord.com/invite/wT8XjrjKkf");

// NOTE: The tokio runtime must be separate from rayon, otherwise the cpu intensive tasks will
// choke the async tasks. THIS MEANS THERE SHOULD BE NO BLOCKING CALLS TO RAYON FROM THE TOKIO
// RUNTIME TO INCLUDE `par_iter`!!!
// Also, there is no need for tokio to spawn multiple threads,
// one thread should be sufficient as cpu intensive work should be passed to rayon with the
// runtime only waiting on the results. If you need to call async code from a thread, pass a
// tokio handle from `Handle::current()` from the tokio thread to the code being
// parallelized
//
// WARNING: All rayon calls from the tokio runtime must be non-blocking! This includes things
// like `par_iter`. These should be spawned in the rayon pool and then passed to the tokio
// runtime with a channel! See `Level::fetch_chunks` as an example!
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async move {
tokio::spawn(async {
setup_sighandler()
.await
.expect("Unable to setup signal handlers");
});

// Setup the TCP server socket.
let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
tokio::spawn(async {
setup_sighandler()
.await
.expect("Failed to start TcpListener");
// In the event the user puts 0 for their port, this will allow us to know what port it is running on
let addr = listener
.local_addr()
.expect("Unable to get the address of server!");
.expect("Unable to setup signal handlers");
});

let use_console = ADVANCED_CONFIG.commands.use_console;
let rcon = ADVANCED_CONFIG.rcon.clone();
// Setup the TCP server socket.
let listener = tokio::net::TcpListener::bind(BASIC_CONFIG.server_address)
.await
.expect("Failed to start TcpListener");
// In the event the user puts 0 for their port, this will allow us to know what port it is running on
let addr = listener
.local_addr()
.expect("Unable to get the address of server!");

let server = Arc::new(Server::new());
let mut ticker = Ticker::new(BASIC_CONFIG.tps);
let use_console = ADVANCED_CONFIG.commands.use_console;
let rcon = ADVANCED_CONFIG.rcon.clone();

log::info!("Started Server took {}ms", time.elapsed().as_millis());
log::info!("You now can connect to the server, Listening on {}", addr);
let server = Arc::new(Server::new());
let mut ticker = Ticker::new(BASIC_CONFIG.tps);

if use_console {
setup_console(server.clone());
}
if rcon.enabled {
let server = server.clone();
tokio::spawn(async move {
RCONServer::new(&rcon, server).await.unwrap();
});
}
log::info!("Started Server took {}ms", time.elapsed().as_millis());
log::info!("You now can connect to the server, Listening on {}", addr);

if ADVANCED_CONFIG.query.enabled {
log::info!("Query protocol enabled. Starting...");
tokio::spawn(query::start_query_handler(server.clone(), addr));
}
if use_console {
setup_console(server.clone());
}
if rcon.enabled {
let server = server.clone();
tokio::spawn(async move {
RCONServer::new(&rcon, server).await.unwrap();
});
}

if ADVANCED_CONFIG.lan_broadcast.enabled {
log::info!("LAN broadcast enabled. Starting...");
tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
}
if ADVANCED_CONFIG.query.enabled {
log::info!("Query protocol enabled. Starting...");
tokio::spawn(query::start_query_handler(server.clone(), addr));
}

{
let server = server.clone();
tokio::spawn(async move {
ticker.run(&server).await;
});
}
if ADVANCED_CONFIG.lan_broadcast.enabled {
log::info!("LAN broadcast enabled. Starting...");
tokio::spawn(lan_broadcast::start_lan_broadcast(addr));
}

let mut master_client_id: u16 = 0;
loop {
// Asynchronously wait for an inbound socket.
let (connection, address) = listener.accept().await.unwrap();
{
let server = server.clone();
tokio::spawn(async move {
ticker.run(&server).await;
});
}

if let Err(e) = connection.set_nodelay(true) {
log::warn!("failed to set TCP_NODELAY {e}");
}
let mut master_client_id: u16 = 0;
loop {
// Asynchronously wait for an inbound socket.
let (connection, address) = listener.accept().await.unwrap();

let id = master_client_id;
master_client_id = master_client_id.wrapping_add(1);
if let Err(e) = connection.set_nodelay(true) {
log::warn!("failed to set TCP_NODELAY {e}");
}

log::info!(
"Accepted connection from: {} (id {})",
scrub_address(&format!("{address}")),
id
);
let id = master_client_id;
master_client_id = master_client_id.wrapping_add(1);

let client = Arc::new(Client::new(connection, addr, id));
log::info!(
"Accepted connection from: {} (id {})",
scrub_address(&format!("{address}")),
id
);

let server = server.clone();
tokio::spawn(async move {
while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
&& !client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let open = client.poll().await;
if open {
client.process_packets(&server).await;
};
}
if client
let client = Arc::new(Client::new(connection, addr, id));

let server = server.clone();
tokio::spawn(async move {
while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
&& !client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let open = client.poll().await;
if open {
client.process_packets(&server).await;
};
}
if client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
let (player, world) = server.add_player(client).await;
world
.spawn_player(&BASIC_CONFIG, player.clone(), &server)
.await;

// poll Player
while !player
.client
.closed
.load(core::sync::atomic::Ordering::Relaxed)
{
let (player, world) = server.add_player(client).await;
world
.spawn_player(&BASIC_CONFIG, player.clone(), &server)
.await;

// poll Player
while !player
.client
.closed
.load(core::sync::atomic::Ordering::Relaxed)
{
let open = player.client.poll().await;
if open {
player.process_packets(&server).await;
};
}
log::debug!("Cleaning up player for id {}", id);
player.remove().await;
server.remove_player().await;
let open = player.client.poll().await;
if open {
player.process_packets(&server).await;
};
}
});
}
});
log::debug!("Cleaning up player for id {}", id);
player.remove().await;
server.remove_player().await;
}
});
}
}

fn handle_interrupt() {
Expand Down

0 comments on commit d6a02c6

Please sign in to comment.