diff --git a/Cargo.toml b/Cargo.toml index 339bb41d..9e1f2106 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ num-derive = "0.4" # Concurrency/Parallelism and Synchronization rayon = "1.10.0" -parking_lot = "0.12.3" +parking_lot = { version = "0.12.3", features = ["send_guard"] } crossbeam = "0.8.4" uuid = { version = "1.11.0", features = ["serde", "v3", "v4"] } diff --git a/pumpkin-protocol/src/lib.rs b/pumpkin-protocol/src/lib.rs index 534b5c23..b95ef21c 100644 --- a/pumpkin-protocol/src/lib.rs +++ b/pumpkin-protocol/src/lib.rs @@ -153,6 +153,20 @@ pub enum PacketError { MalformedLength, } +impl PacketError { + pub fn kickable(&self) -> bool { + // We no longer have a connection, so dont try to kick the player, just close + !matches!( + self, + Self::EncodeData + | Self::EncodeFailedWrite + | Self::FailedWrite(_) + | Self::FailedFinish + | Self::ConnectionWrite + ) + } +} + #[derive(Debug, PartialEq, Clone, Copy)] pub enum ConnectionState { HandShake, diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index f0b0b8d0..2821936d 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -1,9 +1,13 @@ use std::{path::PathBuf, sync::Arc}; use dashmap::{DashMap, Entry}; +use num_traits::Zero; use pumpkin_core::math::vector2::Vector2; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; -use tokio::sync::{mpsc, RwLock}; +use tokio::{ + sync::{mpsc, RwLock}, + task::JoinHandle, +}; use crate::{ chunk::{ @@ -12,6 +16,8 @@ use crate::{ world_gen::{get_world_gen, Seed, WorldGenerator}, }; +pub type ConcurrentChunkResult = Vec<(Vector2, JoinHandle<()>)>; + /// The `Level` module provides functionality for working with chunks within or outside a Minecraft world. /// /// Key features include: @@ -77,58 +83,108 @@ impl Level { self.loaded_chunks.len() } + pub fn list_cached(&self) { + for entry in self.loaded_chunks.iter() { + log::debug!("In map: {:?}", entry.key()); + } + } + /// Marks chunks as "watched" by a unique player. When no players are watching a chunk, /// it is removed from memory. Should only be called on chunks the player was not watching /// before pub fn mark_chunks_as_newly_watched(&self, chunks: &[Vector2]) { - chunks - .par_iter() - .for_each(|chunk| match self.chunk_watchers.entry(*chunk) { - Entry::Occupied(mut occupied) => { - let value = occupied.get_mut(); - if let Some(new_value) = value.checked_add(1) { - *value = new_value; - //log::debug!("Watch value for {:?}: {}", chunk, value); - } else { - log::error!("Watching overflow on chunk {:?}", chunk); - } - } - Entry::Vacant(vacant) => { - vacant.insert(1); + chunks.par_iter().for_each(|chunk| { + self.mark_chunk_as_newly_watched(*chunk); + }); + } + + pub fn mark_chunk_as_newly_watched(&self, chunk: Vector2) { + match self.chunk_watchers.entry(chunk) { + Entry::Occupied(mut occupied) => { + let value = occupied.get_mut(); + if let Some(new_value) = value.checked_add(1) { + *value = new_value; + //log::debug!("Watch value for {:?}: {}", chunk, value); + } else { + log::error!("Watching overflow on chunk {:?}", chunk); } - }); + } + Entry::Vacant(vacant) => { + vacant.insert(1); + } + } } /// Marks chunks no longer "watched" by a unique player. When no players are watching a chunk, /// it is removed from memory. Should only be called on chunks the player was watching before - pub async fn mark_chunk_as_not_watched_and_clean(&self, chunks: &[Vector2]) { + pub fn mark_chunks_as_not_watched(&self, chunks: &[Vector2]) -> Vec> { chunks .par_iter() - .filter(|chunk| match self.chunk_watchers.entry(**chunk) { - Entry::Occupied(mut occupied) => { - let value = occupied.get_mut(); - *value = value.saturating_sub(1); - if *value == 0 { - occupied.remove_entry(); - true - } else { - false - } - } - Entry::Vacant(_) => { - log::error!( - "Marking a chunk as not watched, but was vacant! ({:?})", - chunk - ); + .filter(|chunk| self.mark_chunk_as_not_watched(**chunk)) + .map(|chunk| *chunk) + .collect() + } + + /// Returns whether the chunk should be removed from memory + pub fn mark_chunk_as_not_watched(&self, chunk: Vector2) -> bool { + match self.chunk_watchers.entry(chunk) { + Entry::Occupied(mut occupied) => { + let value = occupied.get_mut(); + *value = value.saturating_sub(1); + if *value == 0 { + occupied.remove_entry(); + true + } else { false } - }) - .for_each(|chunk_pos| { - //log::debug!("Unloading {:?}", chunk_pos); - if let Some(data) = self.loaded_chunks.remove(chunk_pos) { - self.write_chunk(data); - }; - }); + } + Entry::Vacant(_) => { + // This can be: + // - Player disconnecting before all packets have been sent + // - Player moving so fast that the chunk leaves the render distance before it + // is loaded into memory + log::error!( + "Marking a chunk as not watched, but was vacant! ({:?})", + chunk + ); + false + } + } + } + + pub fn should_pop_chunk(&self, chunk: &Vector2) -> bool { + if let Some(entry) = self.chunk_watchers.get(chunk) { + if entry.value().is_zero() { + self.chunk_watchers.remove(chunk); + } + } + + self.chunk_watchers.get(chunk).is_none() + } + + pub fn clean_chunks(&self, chunks: &[Vector2]) { + chunks.par_iter().for_each(|chunk_pos| { + //log::debug!("Unloading {:?}", chunk_pos); + if let Some(data) = self.loaded_chunks.remove(chunk_pos) { + self.write_chunk(data); + }; + }); + } + + pub fn clean_memory(&self, chunks_to_check: &[Vector2]) { + chunks_to_check.par_iter().for_each(|chunk| { + if let Some(entry) = self.chunk_watchers.get(chunk) { + if entry.value().is_zero() { + self.chunk_watchers.remove(chunk); + } + } + + if self.chunk_watchers.get(chunk).is_none() { + self.loaded_chunks.remove(chunk); + } + }); + self.loaded_chunks.shrink_to_fit(); + self.chunk_watchers.shrink_to_fit(); } pub fn write_chunk(&self, _chunk_to_write: (Vector2, Arc>)) { @@ -157,59 +213,67 @@ impl Level { /// MUST be called from a tokio runtime thread /// /// Note: The order of the output chunks will almost never be in the same order as the order of input chunks - pub async fn fetch_chunks( + pub fn fetch_chunks( &self, chunks: &[Vector2], channel: mpsc::Sender>>, - ) { - chunks.iter().for_each(|at| { - let channel = channel.clone(); - let loaded_chunks = self.loaded_chunks.clone(); - let chunk_reader = self.chunk_reader.clone(); - let save_file = self.save_file.clone(); - let world_gen = self.world_gen.clone(); - let chunk_pos = *at; - - tokio::spawn(async move { - let chunk = loaded_chunks - .get(&chunk_pos) - .map(|entry| entry.value().clone()) - .unwrap_or_else(|| { - let loaded_chunk = save_file - .and_then(|save_file| { - match Self::load_chunk_from_save(chunk_reader, save_file, chunk_pos) - { - Ok(chunk) => chunk, - Err(err) => { - log::error!( - "Failed to read chunk (regenerating) {:?}: {:?}", - chunk_pos, - err - ); - None + ) -> ConcurrentChunkResult { + chunks + .iter() + .map(|at| { + let channel = channel.clone(); + let loaded_chunks = self.loaded_chunks.clone(); + let chunk_reader = self.chunk_reader.clone(); + let save_file = self.save_file.clone(); + let world_gen = self.world_gen.clone(); + let chunk_pos = *at; + + let join_handle = tokio::spawn(async move { + let chunk = loaded_chunks + .get(&chunk_pos) + .map(|entry| entry.value().clone()) + .unwrap_or_else(|| { + let loaded_chunk = save_file + .and_then(|save_file| { + match Self::load_chunk_from_save( + chunk_reader, + save_file, + chunk_pos, + ) { + Ok(chunk) => chunk, + Err(err) => { + log::error!( + "Failed to read chunk (regenerating) {:?}: {:?}", + chunk_pos, + err + ); + None + } } - } - }) - .unwrap_or_else(|| { - Arc::new(RwLock::new(world_gen.generate_chunk(chunk_pos))) - }); - - if let Some(data) = loaded_chunks.get(&chunk_pos) { - // Another thread populated in between the previous check and now - // We did work, but this is basically like a cache miss, not much we - // can do about it - data.value().clone() - } else { - loaded_chunks.insert(chunk_pos, loaded_chunk.clone()); - loaded_chunk - } - }); - - let _ = channel - .send(chunk) - .await - .inspect_err(|err| log::error!("unable to send chunk to channel: {}", err)); - }); - }) + }) + .unwrap_or_else(|| { + Arc::new(RwLock::new(world_gen.generate_chunk(chunk_pos))) + }); + + if let Some(data) = loaded_chunks.get(&chunk_pos) { + // Another thread populated in between the previous check and now + // We did work, but this is basically like a cache miss, not much we + // can do about it + data.value().clone() + } else { + loaded_chunks.insert(chunk_pos, loaded_chunk.clone()); + loaded_chunk + } + }); + + let _ = channel + .send(chunk) + .await + .inspect_err(|err| log::error!("unable to send chunk to channel: {}", err)); + }); + + (*at, join_handle) + }) + .collect() } } diff --git a/pumpkin/Cargo.toml b/pumpkin/Cargo.toml index 4f155c43..5e83ea91 100644 --- a/pumpkin/Cargo.toml +++ b/pumpkin/Cargo.toml @@ -25,6 +25,7 @@ thiserror.workspace = true num-traits.workspace = true num-derive.workspace = true +parking_lot.workspace = true # config serde.workspace = true diff --git a/pumpkin/src/client/mod.rs b/pumpkin/src/client/mod.rs index 4009f5e2..65948c3d 100644 --- a/pumpkin/src/client/mod.rs +++ b/pumpkin/src/client/mod.rs @@ -1,5 +1,5 @@ use std::{ - collections::VecDeque, + collections::{HashMap, VecDeque}, net::SocketAddr, sync::{ atomic::{AtomicBool, AtomicI32}, @@ -16,7 +16,7 @@ use authentication::GameProfile; use crossbeam::atomic::AtomicCell; use num_traits::FromPrimitive; use pumpkin_config::compression::CompressionInfo; -use pumpkin_core::text::TextComponent; +use pumpkin_core::{math::vector2::Vector2, text::TextComponent}; use pumpkin_protocol::{ bytebuf::DeserializerError, client::{config::CConfigDisconnect, login::CLoginDisconnect, play::CPlayDisconnect}, @@ -36,8 +36,12 @@ use pumpkin_protocol::{ }, ClientPacket, ConnectionState, PacketError, RawPacket, ServerPacket, }; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::Mutex; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::Notify, + task::JoinHandle, +}; use thiserror::Error; @@ -89,9 +93,48 @@ impl Default for PlayerConfig { } } +pub struct ChunkHandleWrapper { + handle: Option>, + aborted: bool, +} + +impl ChunkHandleWrapper { + #[must_use] + pub fn new(handle: JoinHandle<()>) -> Self { + Self { + handle: Some(handle), + aborted: false, + } + } + + pub fn abort(&mut self) { + self.aborted = true; + if let Some(handle) = &self.handle { + handle.abort(); + } else { + log::error!("Trying to abort without a handle!"); + } + } + + pub fn take_handle(&mut self) -> JoinHandle<()> { + self.handle.take().unwrap() + } + + #[must_use] + pub fn aborted(&self) -> bool { + self.aborted + } +} + +pub type PlayerPendingChunks = + Arc, VecDeque>>>; + /// Everything which makes a Connection with our Server is a `Client`. /// Client will become Players when they reach the `Play` state pub struct Client { + /// The client id. This is good for coorelating a connection with a player + /// Only used for logging purposes + pub id: u16, /// The client's game profile information. pub gameprofile: Mutex>, /// The client's configuration settings, Optional @@ -121,13 +164,26 @@ pub struct Client { pub client_packets_queue: Arc>>, /// Indicates whether the client should be converted into a player. pub make_player: AtomicBool, + + //TODO: Is there a way to consolidate these two? + //Need to lookup by chunk, but also would be need to contain all the stuff + //In a PendingBatch struct. Is there a cheap way to map multiple keys to a single element? + // + /// Individual chunk tasks that this client is waiting for + pub pending_chunks: PlayerPendingChunks, + /// Chunk batches that this client is waiting for + pub pending_chunk_batch: parking_lot::Mutex>>, + + /// Tell tasks to stop if we are closing + cancel_tasks: Notify, } impl Client { #[must_use] - pub fn new(connection: tokio::net::TcpStream, address: SocketAddr) -> Self { + pub fn new(connection: tokio::net::TcpStream, address: SocketAddr, id: u16) -> Self { let (connection_reader, connection_writer) = connection.into_split(); Self { + id, protocol_version: AtomicI32::new(0), gameprofile: Mutex::new(None), config: Mutex::new(None), @@ -143,6 +199,9 @@ impl Client { closed: AtomicBool::new(false), client_packets_queue: Arc::new(Mutex::new(VecDeque::new())), make_player: AtomicBool::new(false), + pending_chunks: Arc::new(parking_lot::Mutex::new(HashMap::new())), + pending_chunk_batch: parking_lot::Mutex::new(HashMap::new()), + cancel_tasks: Notify::new(), } } @@ -184,7 +243,9 @@ impl Client { // assert!(!self.closed); let mut enc = self.enc.lock().await; if let Err(error) = enc.append_packet(packet) { - self.kick(&error.to_string()).await; + if error.kickable() { + self.kick(&error.to_string()).await; + } return; } @@ -194,10 +255,20 @@ impl Client { .await .map_err(|_| PacketError::ConnectionWrite) { - self.kick(&error.to_string()).await; - } else if let Err(error) = writer.flush().await { - log::warn!("Failed to flush writer for: {}", error.to_string()); + if error.kickable() { + self.kick(&error.to_string()).await; + } } + + /* + else if let Err(error) = writer.flush().await { + log::warn!( + "Failed to flush writer for id {}: {}", + self.id, + error.to_string() + ); + } + */ } pub async fn try_send_packet(&self, packet: &P) -> Result<(), PacketError> { @@ -219,26 +290,44 @@ impl Client { .await .map_err(|_| PacketError::ConnectionWrite)?; + /* writer .flush() .await .map_err(|_| PacketError::ConnectionWrite)?; + */ Ok(()) } + pub async fn await_cancel(&self) { + self.cancel_tasks.notified().await; + } + /// Processes all packets send by the client pub async fn process_packets(&self, server: &Arc) { let mut packet_queue = self.client_packets_queue.lock().await; while let Some(mut packet) = packet_queue.pop_front() { - if let Err(error) = self.handle_packet(server, &mut packet).await { - let text = format!("Error while reading incoming packet {error}"); - log::error!( - "Failed to read incoming packet with id {}: {}", - i32::from(packet.id), - error - ); - self.kick(&text).await; - }; + if self.closed.load(std::sync::atomic::Ordering::Relaxed) { + log::debug!("Canceling client packet processing (pre)"); + return; + } + tokio::select! { + () = self.cancel_tasks.notified() => { + log::debug!("Canceling client packet processing (interrupt)"); + return; + }, + packet_result = self.handle_packet(server, &mut packet) => { + if let Err(error) = packet_result { + let text = format!("Error while reading incoming packet {error}"); + log::error!( + "Failed to read incoming packet with id {}: {}", + i32::from(packet.id), + error + ); + self.kick(&text).await; + }; + } + } } } @@ -400,6 +489,11 @@ impl Client { /// Returns if connection is still open pub async fn poll(&self) -> bool { loop { + if self.closed.load(std::sync::atomic::Ordering::Relaxed) { + // If we manually close (like a kick) we dont want to keep reading bytes + return false; + } + let mut dec = self.dec.lock().await; match dec.decode() { @@ -438,30 +532,29 @@ impl Client { /// Kicks the Client with a reason depending on the connection state pub async fn kick(&self, reason: &str) { - log::info!("Kicking Client for {}", reason); - match self.connection_state.load() { + log::info!("Kicking Client id {} for {}", self.id, reason); + let result = match self.connection_state.load() { ConnectionState::Login => { self.try_send_packet(&CLoginDisconnect::new( &serde_json::to_string_pretty(&reason).unwrap_or_else(|_| String::new()), )) .await - .unwrap_or_else(|_| self.close()); - } - ConnectionState::Config => { - self.try_send_packet(&CConfigDisconnect::new(reason)) - .await - .unwrap_or_else(|_| self.close()); } + ConnectionState::Config => self.try_send_packet(&CConfigDisconnect::new(reason)).await, // This way players get kicked when players using client functions (e.g. poll, send_packet) ConnectionState::Play => { self.try_send_packet(&CPlayDisconnect::new(&TextComponent::text(reason))) .await - .unwrap_or_else(|_| self.close()); } _ => { log::warn!("Can't kick in {:?} State", self.connection_state); + Ok(()) } + }; + if let Err(err) = result { + log::warn!("Failed to kick {}: {}", self.id, err.to_string()); } + log::debug!("Closing connection for {}", self.id); self.close(); } @@ -469,6 +562,25 @@ impl Client { pub fn close(&self) { self.closed .store(true, std::sync::atomic::Ordering::Relaxed); + + // Abort pending chunks here too because we might clean up before chunk tasks are done + self.abort_chunks("closed"); + + self.cancel_tasks.notify_waiters(); + + log::debug!("Closed connection for {}", self.id); + } + + pub fn abort_chunks(&self, reason: &str) { + let mut pending_chunks = self.pending_chunks.lock(); + pending_chunks.iter_mut().for_each(|(chunk, handles)| { + handles.iter_mut().enumerate().for_each(|(count, handle)| { + if !handle.aborted() { + log::debug!("Aborting chunk {:?} ({}) ({})", chunk, count, reason); + handle.abort(); + } + }); + }); } } diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs index dc3f245b..fa88d8cd 100644 --- a/pumpkin/src/entity/player.rs +++ b/pumpkin/src/entity/player.rs @@ -7,6 +7,7 @@ use std::{ }; use crossbeam::atomic::AtomicCell; +use itertools::Itertools; use num_derive::FromPrimitive; use num_traits::{FromPrimitive, ToPrimitive}; use pumpkin_core::{ @@ -37,7 +38,7 @@ use pumpkin_world::{cylindrical_chunk_iterator::Cylindrical, item::ItemStack}; use super::Entity; use crate::{ - client::{authentication::GameProfile, Client, PlayerConfig}, + client::{authentication::GameProfile, ChunkHandleWrapper, Client, PlayerConfig}, server::Server, world::World, }; @@ -151,33 +152,141 @@ impl Player { } /// Removes the Player out of the current World + #[allow(unused_variables)] pub async fn remove(&self) { self.living_entity.entity.world.remove_player(self).await; let watched = self.watched_section.load(); let view_distance = i32::from(get_view_distance(self).await); let cylindrical = Cylindrical::new(Vector2::new(watched.x, watched.z), view_distance); - let all_chunks = cylindrical.all_chunks_within(); + + // NOTE: This all must be synchronous to make sense! The chunks are handled asynhrously. + // Non-async code is atomic to async code + + // Radial chunks are all of the chunks the player is theoretically viewing + // Giving enough time, all of these chunks will be in memory + let radial_chunks = cylindrical.all_chunks_within(); log::debug!( - "Removing player id {}, unwatching {} chunks", + "Removing player {} ({}), unwatching {} chunks", self.gameprofile.name, - all_chunks.len() + self.client.id, + radial_chunks.len() ); + + let (watched_chunks, to_await) = { + let mut pending_chunks = self.client.pending_chunks.lock(); + + // Don't try to clean chunks that dont exist yet + // If they are still pending, we never sent the client the chunk, + // And the watcher value is not set + // + // The chunk may or may not be in the cache at this point + let watched_chunks = radial_chunks + .iter() + .filter(|chunk| !pending_chunks.contains_key(chunk)) + .copied() + .collect::>(); + + // Mark all pending chunks to be cancelled + // Cant use abort chunk because we use the lock for more + pending_chunks.iter_mut().for_each(|(chunk, handles)| { + handles.iter_mut().enumerate().for_each(|(count, handle)| { + if !handle.aborted() { + log::debug!("Aborting chunk {:?} ({}) (disconnect)", chunk, count); + handle.abort(); + } + }); + }); + + let to_await = pending_chunks + .iter_mut() + .map(|(chunk, pending)| { + ( + *chunk, + pending + .iter_mut() + .map(ChunkHandleWrapper::take_handle) + .collect_vec(), + ) + }) + .collect_vec(); + + // Return chunks to stop watching and what to wait for + (watched_chunks, to_await) + }; + + // Wait for individual chunks to finish after we cancel them + for (chunk, awaitables) in to_await { + for (count, handle) in awaitables.into_iter().enumerate() { + #[cfg(debug_assertions)] + log::debug!("Waiting for chunk {:?} ({})", chunk, count); + let _ = handle.await; + } + } + + // Allow the batch jobs to properly cull stragglers before we do our clean up + log::debug!("Collecting chunk batches..."); + let batches = { + let mut chunk_batches = self.client.pending_chunk_batch.lock(); + let keys = chunk_batches.keys().copied().collect_vec(); + let handles = keys + .iter() + .filter_map(|batch_id| { + #[cfg(debug_assertions)] + log::debug!("Batch id: {}", batch_id); + chunk_batches.remove(batch_id) + }) + .collect_vec(); + assert!(chunk_batches.is_empty()); + handles + }; + + log::debug!("Awaiting chunk batches ({})...", batches.len()); + + for (count, batch) in batches.into_iter().enumerate() { + #[cfg(debug_assertions)] + log::debug!("Awaiting batch {}", count); + let _ = batch.await; + #[cfg(debug_assertions)] + log::debug!("Done awaiting batch {}", count); + } + log::debug!("Done waiting for chunk batches"); + + // Decrement value of watched chunks + let chunks_to_clean = self + .living_entity + .entity + .world + .mark_chunks_as_not_watched(&watched_chunks); + + // Remove chunks with no watchers from the cache self.living_entity .entity .world - .mark_chunks_as_not_watched(&all_chunks) - .await; + .clean_chunks(&chunks_to_clean); + + // Remove left over entries from all possiblily loaded chunks + self.living_entity.entity.world.clean_memory(&radial_chunks); log::debug!( - "Removed player id {} ({} chunks remain cached)", + "Removed player id {} ({}) ({} chunks remain cached)", self.gameprofile.name, - self.living_entity.entity.world.get_cached_chunk_len().await + self.client.id, + self.living_entity.entity.world.get_cached_chunk_len() ); + + //self.living_entity.entity.world.level.list_cached(); } pub async fn tick(&self) { + if self + .client + .closed + .load(std::sync::atomic::Ordering::Relaxed) + { + return; + } let now = Instant::now(); self.last_attacked_ticks .fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -283,18 +392,26 @@ impl Player { /// Kicks the Client with a reason depending on the connection state pub async fn kick<'a>(&self, reason: TextComponent<'a>) { - assert!(!self + if self .client .closed - .load(std::sync::atomic::Ordering::Relaxed)); + .load(std::sync::atomic::Ordering::Relaxed) + { + log::debug!( + "Tried to kick id {} but connection is closed!", + self.client.id + ); + return; + } self.client .try_send_packet(&CPlayDisconnect::new(&reason)) .await .unwrap_or_else(|_| self.client.close()); log::info!( - "Kicked Player {} for {}", + "Kicked Player {} ({}) for {}", self.gameprofile.name, + self.client.id, reason.to_pretty_console() ); self.client.close(); @@ -349,22 +466,34 @@ impl Player { pub async fn process_packets(self: &Arc, server: &Arc) { let mut packets = self.client.client_packets_queue.lock().await; while let Some(mut packet) = packets.pop_back() { - match self.handle_play_packet(server, &mut packet).await { - Ok(()) => {} - Err(e) => { - if e.is_kick() { - if let Some(kick_reason) = e.client_kick_reason() { - self.kick(TextComponent::text(&kick_reason)).await; - } else { - self.kick(TextComponent::text(&format!( - "Error while reading incoming packet {e}" - ))) - .await; + #[cfg(debug_assertions)] + let inst = std::time::Instant::now(); + tokio::select! { + () = self.client.await_cancel() => { + log::debug!("Canceling player packet processing"); + return; + }, + packet_result = self.handle_play_packet(server, &mut packet) => { + #[cfg(debug_assertions)] + log::debug!("Handled play packet in {:?}", inst.elapsed()); + match packet_result { + Ok(()) => {} + Err(e) => { + if e.is_kick() { + if let Some(kick_reason) = e.client_kick_reason() { + self.kick(TextComponent::text(&kick_reason)).await; + } else { + self.kick(TextComponent::text(&format!( + "Error while reading incoming packet {e}" + ))) + .await; + } + } + e.log(); } - } - e.log(); + }; } - }; + } } } diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index 5aece5ac..9823afda 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -146,6 +146,7 @@ async fn main() -> io::Result<()> { }); } + let mut master_client_id: u16 = 0; loop { // Asynchronously wait for an inbound socket. let (connection, address) = listener.accept().await?; @@ -154,12 +155,16 @@ async fn main() -> io::Result<()> { log::warn!("failed to set TCP_NODELAY {e}"); } + let id = master_client_id; + master_client_id = master_client_id.wrapping_add(1); + log::info!( - "Accepted connection from: {} ", + "Accepted connection from: {} (id {})", scrub_address(&format!("{address}")), + id ); - let client = Arc::new(Client::new(connection, addr)); + let client = Arc::new(Client::new(connection, addr, id)); let server = server.clone(); tokio::spawn(async move { @@ -190,6 +195,7 @@ async fn main() -> io::Result<()> { player.process_packets(&server).await; }; } + log::debug!("Cleaning up player for id {}", id); player.remove().await; server.remove_player().await; } diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index a20a009a..86deec4d 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -1,9 +1,12 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + sync::Arc, +}; pub mod player_chunker; use crate::{ - client::Client, + client::{ChunkHandleWrapper, Client}, entity::{player::Player, Entity}, }; use num_traits::ToPrimitive; @@ -28,11 +31,19 @@ use pumpkin_world::coordinates::ChunkRelativeBlockCoordinates; use pumpkin_world::level::Level; use rand::{thread_rng, Rng}; use scoreboard::Scoreboard; -use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc::Receiver, Mutex}; +use tokio::{ + sync::{mpsc, RwLock}, + task::JoinHandle, +}; pub mod scoreboard; +type ChunkReceiver = ( + Vec<(Vector2, JoinHandle<()>)>, + Receiver>>, +); + /// Represents a Minecraft world, containing entities, players, and the underlying level data. /// /// Each dimension (Overworld, Nether, End) typically has its own `World`. @@ -44,7 +55,7 @@ pub mod scoreboard; /// - Provides a central hub for interacting with the world's entities and environment. pub struct World { /// The underlying level, responsible for chunk management and terrain generation. - pub level: Arc>, + pub level: Arc, /// A map of active players within the world, keyed by their unique token. pub current_players: Arc>>>, pub scoreboard: Mutex, @@ -55,7 +66,7 @@ impl World { #[must_use] pub fn load(level: Level) -> Self { Self { - level: Arc::new(Mutex::new(level)), + level: Arc::new(level), current_players: Arc::new(Mutex::new(HashMap::new())), scoreboard: Mutex::new(Scoreboard::new()), } @@ -299,30 +310,67 @@ impl World { player_chunker::player_join(self, player.clone()).await; } - pub async fn mark_chunks_as_not_watched(&self, chunks: &[Vector2]) { - let level = self.level.lock().await; - level.mark_chunk_as_not_watched_and_clean(chunks).await; + pub fn mark_chunks_as_not_watched(&self, chunks: &[Vector2]) -> Vec> { + self.level.mark_chunks_as_not_watched(chunks) } - pub async fn mark_chunks_as_watched(&self, chunks: &[Vector2]) { - let level = self.level.lock().await; - level.mark_chunks_as_newly_watched(chunks); + pub fn clean_chunks(&self, chunks: &[Vector2]) { + self.level.clean_chunks(chunks); } - pub async fn get_cached_chunk_len(&self) -> usize { - let level = self.level.lock().await; - level.loaded_chunk_count() + pub fn clean_memory(&self, chunks_to_check: &[Vector2]) { + self.level.clean_memory(chunks_to_check); } - fn spawn_world_chunks(&self, client: Arc, chunks: Vec>) { + pub fn get_cached_chunk_len(&self) -> usize { + self.level.loaded_chunk_count() + } + + fn spawn_world_chunks(&self, client: Arc, chunks: &[Vector2]) { if client.closed.load(std::sync::atomic::Ordering::Relaxed) { log::info!("The connection has closed before world chunks were spawned",); return; } + #[cfg(debug_assertions)] let inst = std::time::Instant::now(); - let mut receiver = self.receive_chunks(chunks); + // Unique id of this chunk batch for later removal + let id = uuid::Uuid::new_v4(); + + let (pending, mut receiver) = self.receive_chunks(chunks); + { + let mut pending_chunks = client.pending_chunks.lock(); + + for chunk in chunks { + if pending_chunks.contains_key(chunk) { + log::debug!( + "Client id {} is requesting chunk {:?} but its already pending!", + client.id, + chunk + ); + } + } - tokio::spawn(async move { + for (chunk, handle) in pending { + let entry = pending_chunks.entry(chunk); + let wrapper = ChunkHandleWrapper::new(handle); + match entry { + Entry::Occupied(mut entry) => { + entry.get_mut().push_back(wrapper); + } + Entry::Vacant(entry) => { + let mut queue = VecDeque::new(); + queue.push_back(wrapper); + entry.insert(queue); + } + }; + } + } + let pending_chunks = client.pending_chunks.clone(); + let level = self.level.clone(); + let retained_client = client.clone(); + let batch_id = id; + + let handle = tokio::spawn(async move { while let Some(chunk_data) = receiver.recv().await { let chunk_data = chunk_data.read().await; let packet = CChunkData(&chunk_data); @@ -340,15 +388,62 @@ impl World { ); } - // TODO: Queue player packs in a queue so we don't need to check if its closed before - // sending + { + let mut pending_chunks = pending_chunks.lock(); + let handlers = pending_chunks + .get_mut(&chunk_data.position) + .expect("All chunks should be pending"); + let handler = handlers + .pop_front() + .expect("All chunks should have a handler"); + + if handlers.is_empty() { + pending_chunks.remove(&chunk_data.position); + } + + // Chunk loading task was canceled after it was completed + if handler.aborted() { + // We never increment the watch value + if level.should_pop_chunk(&chunk_data.position) { + level.clean_chunks(&[chunk_data.position]); + } + // If ignored, dont send the packet + let loaded_chunks = level.loaded_chunk_count(); + log::debug!( + "Aborted chunk {:?} (post-process) {} cached", + chunk_data.position, + loaded_chunks + ); + + // We dont want to mark this chunk as watched or send it to the client + continue; + } + + // This must be locked with pending + level.mark_chunk_as_newly_watched(chunk_data.position); + }; + if !client.closed.load(std::sync::atomic::Ordering::Relaxed) { client.send_packet(&packet).await; } } - log::debug!("chunks sent after {}ms", inst.elapsed().as_millis()); + { + let mut batch = client.pending_chunk_batch.lock(); + batch.remove(&batch_id); + } + #[cfg(debug_assertions)] + log::debug!( + "chunks sent after {}ms (batch {})", + inst.elapsed().as_millis(), + batch_id + ); }); + + { + let mut batch_handles = retained_client.pending_chunk_batch.lock(); + batch_handles.insert(id, handle); + } } /// Gets a Player by entity id @@ -427,20 +522,14 @@ impl World { } // Stream the chunks (don't collect them and then do stuff with them) - pub fn receive_chunks(&self, chunks: Vec>) -> Receiver>> { + pub fn receive_chunks(&self, chunks: &[Vector2]) -> ChunkReceiver { let (sender, receive) = mpsc::channel(chunks.len()); - { - let level = self.level.clone(); - tokio::spawn(async move { - let level = level.lock().await; - level.fetch_chunks(&chunks, sender).await; - }); - } - receive + let pending_chunks = self.level.fetch_chunks(chunks, sender); + (pending_chunks, receive) } pub async fn receive_chunk(&self, chunk: Vector2) -> Arc> { - let mut receiver = self.receive_chunks(vec![chunk]); + let (_, mut receiver) = self.receive_chunks(&[chunk]); receiver .recv() .await diff --git a/pumpkin/src/world/player_chunker.rs b/pumpkin/src/world/player_chunker.rs index 77e6272a..f129f9c2 100644 --- a/pumpkin/src/world/player_chunker.rs +++ b/pumpkin/src/world/player_chunker.rs @@ -47,8 +47,7 @@ pub async fn player_join(world: &World, player: Arc) { let new_cylindrical = Cylindrical::new(Vector2::new(chunk_pos.x, chunk_pos.z), view_distance); let loading_chunks = new_cylindrical.all_chunks_within(); - world.mark_chunks_as_watched(&loading_chunks).await; - world.spawn_world_chunks(player.client.clone(), loading_chunks); + world.spawn_world_chunks(player.client.clone(), &loading_chunks); } pub async fn update_position(player: &Player) { @@ -91,30 +90,69 @@ pub async fn update_position(player: &Player) { }, ); if !loading_chunks.is_empty() { - entity.world.mark_chunks_as_watched(&loading_chunks).await; + //let inst = std::time::Instant::now(); entity .world - .spawn_world_chunks(player.client.clone(), loading_chunks); + .spawn_world_chunks(player.client.clone(), &loading_chunks); + //log::debug!("Loading chunks took {:?}", inst.elapsed()); } if !unloading_chunks.is_empty() { - entity - .world - .mark_chunks_as_not_watched(&unloading_chunks) - .await; - // we may don't need to iter twice - for chunk in unloading_chunks { - if !player - .client - .closed - .load(std::sync::atomic::Ordering::Relaxed) - { - player - .client + // We want to check if this chunk is still pending + // if it is -> ignore + + //let inst = std::time::Instant::now(); + + let watched_chunks: Vec<_> = { + let mut pending_chunks = player.client.pending_chunks.lock(); + unloading_chunks + .into_iter() + .filter(|chunk| { + if let Some(handles) = pending_chunks.get_mut(chunk) { + if let Some((count, handle)) = handles + .iter_mut() + .rev() + .enumerate() + .find(|(_, handle)| !handle.aborted()) + { + log::debug!("Aborting chunk {:?} ({}) (unload)", chunk, count); + // We want to abort the last queued chunk, that we if a client still + // has a pending request for this chunk, we dont need to do the work + // twice + handle.abort(); + } else { + log::warn!( + "Aborting chunk {:?} but all were already aborted!", + chunk + ); + } + false + } else { + true + } + }) + .collect() + }; + + //log::debug!("Unloading chunks took {:?} (1)", inst.elapsed()); + let chunks_to_clean = entity.world.mark_chunks_as_not_watched(&watched_chunks); + entity.world.clean_chunks(&chunks_to_clean); + + //log::debug!("Unloading chunks took {:?} (2)", inst.elapsed()); + // This can take a little if we are sending a bunch of packets, queue it up :p + let client = player.client.clone(); + tokio::spawn(async move { + for chunk in watched_chunks { + if client.closed.load(std::sync::atomic::Ordering::Relaxed) { + // We will never un-close a connection + break; + } + client .send_packet(&CUnloadChunk::new(chunk.x, chunk.z)) .await; } - } + }); + //log::debug!("Unloading chunks took {:?} (3)", inst.elapsed()); } } }