diff --git a/pumpkin-world/src/chunk/anvil.rs b/pumpkin-world/src/chunk/anvil.rs index db3ce9032..983b3fea1 100644 --- a/pumpkin-world/src/chunk/anvil.rs +++ b/pumpkin-world/src/chunk/anvil.rs @@ -88,7 +88,7 @@ impl ChunkReader for AnvilChunkReader { fn read_chunk( &self, save_file: &SaveFile, - at: pumpkin_core::math::vector2::Vector2, + at: &pumpkin_core::math::vector2::Vector2, ) -> Result { let region = ( ((at.x as f32) / 32.0).floor() as i32, @@ -158,6 +158,6 @@ impl ChunkReader for AnvilChunkReader { .decompress_data(chunk_data) .map_err(ChunkReadingError::Compression)?; - ChunkData::from_bytes(decompressed_chunk, at).map_err(ChunkReadingError::ParsingError) + ChunkData::from_bytes(decompressed_chunk, *at).map_err(ChunkReadingError::ParsingError) } } diff --git a/pumpkin-world/src/chunk/mod.rs b/pumpkin-world/src/chunk/mod.rs index 645c4c3cb..c75de68a6 100644 --- a/pumpkin-world/src/chunk/mod.rs +++ b/pumpkin-world/src/chunk/mod.rs @@ -24,7 +24,7 @@ pub trait ChunkReader: Sync + Send { fn read_chunk( &self, save_file: &SaveFile, - at: Vector2, + at: &Vector2, ) -> Result; } diff --git a/pumpkin-world/src/cylindrical_chunk_iterator.rs b/pumpkin-world/src/cylindrical_chunk_iterator.rs index 185505d28..0f08f4286 100644 --- a/pumpkin-world/src/cylindrical_chunk_iterator.rs +++ b/pumpkin-world/src/cylindrical_chunk_iterator.rs @@ -67,7 +67,6 @@ impl Cylindrical { self.center.z + self.view_distance + 1 } - #[allow(dead_code)] fn is_within_distance(&self, x: i32, z: i32) -> bool { let max_dist_squared = self.view_distance * self.view_distance; let max_dist = self.view_distance as i64; @@ -76,4 +75,19 @@ impl Cylindrical { let dist_squared = dist_x.pow(2) + (max_dist.min(dist_z as i64) as i32).pow(2); dist_squared < max_dist_squared } + + /// Returns an iterator of all chunks within this cylinder + pub fn all_chunks_within(&self) -> Vec> { + // This is a naive implementation: start with square and cut out ones that dont fit + let mut all_chunks = Vec::new(); + for x in -self.view_distance..=self.view_distance { + for z in -self.view_distance..=self.view_distance { + all_chunks.push(Vector2::new(self.center.x + x, self.center.z + z)); + } + } + all_chunks + .into_iter() + .filter(|chunk| self.is_within_distance(chunk.x, chunk.z)) + .collect() + } } diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index e7e5327cb..5cfae743d 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc}; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use pumpkin_core::math::vector2::Vector2; use rayon::prelude::*; use tokio::sync::mpsc; @@ -23,7 +23,8 @@ use crate::{ /// For more details on world generation, refer to the `WorldGenerator` module. pub struct Level { save_file: Option, - loaded_chunks: Arc, Arc>>>, + loaded_chunks: Arc, Arc>>>, + chunk_watchers: Arc, usize>>>, chunk_reader: Box, world_gen: Box, } @@ -52,7 +53,8 @@ impl Level { region_folder, }), chunk_reader: Box::new(AnvilChunkReader::new()), - loaded_chunks: Arc::new(Mutex::new(HashMap::new())), + loaded_chunks: Arc::new(RwLock::new(HashMap::new())), + chunk_watchers: Arc::new(Mutex::new(HashMap::new())), } } else { log::warn!( @@ -63,63 +65,131 @@ impl Level { world_gen, save_file: None, chunk_reader: Box::new(AnvilChunkReader::new()), - loaded_chunks: Arc::new(Mutex::new(HashMap::new())), + loaded_chunks: Arc::new(RwLock::new(HashMap::new())), + chunk_watchers: Arc::new(Mutex::new(HashMap::new())), } } } pub fn get_block() {} + /// Marks chunks as "watched" by a unique player. When no players are watching a chunk, + /// it is removed from memory. Should only be called on chunks the player was not watching + /// before + pub fn mark_chunk_as_newly_watched(&self, chunks: &[Vector2]) { + let mut watchers = self.chunk_watchers.lock(); + for chunk in chunks { + match watchers.entry(*chunk) { + std::collections::hash_map::Entry::Occupied(mut occupied) => { + let value = occupied.get_mut(); + *value = value.saturating_add(1); + } + std::collections::hash_map::Entry::Vacant(vacant) => { + vacant.insert(1); + } + } + } + } + + /// Marks chunks no longer "watched" by a unique player. When no players are watching a chunk, + /// it is removed from memory. Should only be called on chunks the player was watching before + pub fn mark_chunk_as_not_watched_and_clean(&self, chunks: &[Vector2]) { + let dropped_chunks = { + let mut watchers = self.chunk_watchers.lock(); + chunks + .iter() + .filter(|chunk| match watchers.entry(**chunk) { + std::collections::hash_map::Entry::Occupied(mut occupied) => { + let value = occupied.get_mut(); + *value = value.saturating_sub(1); + if *value == 0 { + occupied.remove_entry(); + true + } else { + false + } + } + std::collections::hash_map::Entry::Vacant(_) => { + log::error!( + "Marking a chunk as not watched, but was vacant! ({:?})", + chunk + ); + false + } + }) + .collect::>() + }; + let mut loaded_chunks = self.loaded_chunks.write(); + let dropped_chunk_data = dropped_chunks + .iter() + .filter_map(|chunk| { + log::debug!("Unloading chunk {:?}", chunk); + loaded_chunks.remove_entry(*chunk) + }) + .collect(); + self.write_chunks(dropped_chunk_data); + } + + pub fn write_chunks(&self, _chunks_to_write: Vec<(Vector2, Arc)>) { + //TODO + } + /// Reads/Generates many chunks in a world /// MUST be called from a tokio runtime thread /// /// Note: The order of the output chunks will almost never be in the same order as the order of input chunks - pub fn fetch_chunks( - &self, - chunks: &[Vector2], - channel: mpsc::Sender>, - is_alive: bool, - ) { + + pub fn fetch_chunks(&self, chunks: &[Vector2], channel: mpsc::Sender>) { chunks.into_par_iter().for_each(|at| { - if is_alive { - return; - } - let mut loaded_chunks = self.loaded_chunks.lock(); let channel = channel.clone(); - // Check if chunks is already loaded - if loaded_chunks.contains_key(at) { - channel - .blocking_send(loaded_chunks.get(at).unwrap().clone()) - .expect("Failed sending ChunkData."); - return; + let maybe_chunk = { + let loaded_chunks = self.loaded_chunks.read(); + loaded_chunks.get(at).cloned() } - let at = *at; - let data = match &self.save_file { - Some(save_file) => { - match self.chunk_reader.read_chunk(save_file, at) { - Err( - ChunkReadingError::ParsingError(ChunkParsingError::ChunkNotGenerated) - | ChunkReadingError::ChunkNotExist, - ) => { - // This chunk was not generated yet. - Ok(self.world_gen.generate_chunk(at)) + .or_else(|| { + let chunk_data = match &self.save_file { + Some(save_file) => { + match self.chunk_reader.read_chunk(save_file, at) { + Ok(data) => Ok(Arc::new(data)), + Err( + ChunkReadingError::ChunkNotExist + | ChunkReadingError::ParsingError( + ChunkParsingError::ChunkNotGenerated, + ), + ) => { + // This chunk was not generated yet. + let chunk = Arc::new(self.world_gen.generate_chunk(*at)); + Ok(chunk) + } + Err(err) => Err(err), } - // TODO this doesn't warn the user about the error. fix. - result => result, + } + None => { + // There is no savefile yet -> generate the chunks + let chunk = Arc::new(self.world_gen.generate_chunk(*at)); + Ok(chunk) + } + }; + match chunk_data { + Ok(data) => Some(data), + Err(err) => { + // TODO: Panic here? + log::warn!("Failed to read chunk {:?}: {:?}", at, err); + None } } + }); + match maybe_chunk { + Some(chunk) => { + channel + .blocking_send(chunk.clone()) + .expect("Failed sending ChunkData."); + } None => { - // There is no savefile yet -> generate the chunks - Ok(self.world_gen.generate_chunk(at)) + log::error!("Unable to send chunk {:?}!", at); } - } - .unwrap(); - let data = Arc::new(data); - channel - .blocking_send(data.clone()) - .expect("Failed sending ChunkData."); - loaded_chunks.insert(at, data); + }; }) } } diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs index bdb3640d0..6927abfc1 100644 --- a/pumpkin/src/entity/player.rs +++ b/pumpkin/src/entity/player.rs @@ -7,7 +7,7 @@ use crossbeam::atomic::AtomicCell; use num_derive::FromPrimitive; use num_traits::ToPrimitive; use pumpkin_core::{ - math::{boundingbox::BoundingBox, position::WorldPosition, vector3::Vector3}, + math::{boundingbox::BoundingBox, position::WorldPosition, vector2::Vector2, vector3::Vector3}, text::TextComponent, GameMode, }; @@ -30,15 +30,15 @@ use pumpkin_protocol::{ use tokio::sync::Mutex; use pumpkin_protocol::server::play::{SCloseContainer, SKeepAlive}; -use pumpkin_world::item::ItemStack; +use pumpkin_world::{cylindrical_chunk_iterator::Cylindrical, item::ItemStack}; use super::Entity; -use crate::error::PumpkinError; use crate::{ client::{authentication::GameProfile, Client, PlayerConfig}, server::Server, - world::World, + world::{player_chunker::chunk_section_from_pos, World}, }; +use crate::{error::PumpkinError, world::player_chunker::get_view_distance}; use super::living::LivingEntity; @@ -139,6 +139,15 @@ impl Player { /// Removes the Player out of the current World pub async fn remove(&self) { self.living_entity.entity.world.remove_player(self).await; + + let watched = chunk_section_from_pos(&self.living_entity.entity.block_pos.load()); + let view_distance = get_view_distance(self).await as i32; + let cylindrical = Cylindrical::new(Vector2::new(watched.x, watched.z), view_distance); + self.living_entity + .entity + .world + .mark_chunks_as_not_watched(&cylindrical.all_chunks_within()) + .await; } pub const fn entity_id(&self) -> EntityId { diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index 7445161ec..8eb3922e0 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -159,11 +159,6 @@ async fn main() -> io::Result<()> { // Asynchronously wait for an inbound socket. let (connection, address) = listener.accept().await?; - log::info!( - "Accepted connection from: {}", - scrub_address(&format!("{}", address)) - ); - if let Err(e) = connection.set_nodelay(true) { log::warn!("failed to set TCP_NODELAY {e}"); } @@ -171,6 +166,12 @@ async fn main() -> io::Result<()> { unique_id += 1; let id = unique_id; + log::info!( + "Accepted connection from: {} (id: {})", + scrub_address(&format!("{}", address)), + id + ); + let keep_alive = tokio::sync::mpsc::channel(1024); let client = Arc::new(Client::new(id, connection, addr, keep_alive.0.into())); diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index e68d5bfd6..99662fb0e 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -245,24 +245,39 @@ impl World { player_chunker::player_join(self, player.clone()).await; } + pub async fn mark_chunks_as_not_watched(&self, chunks: &[Vector2]) { + let level = self.level.lock().await; + level.mark_chunk_as_not_watched_and_clean(chunks); + } + + pub async fn mark_chunks_as_watched(&self, chunks: &[Vector2]) { + let level = self.level.lock().await; + level.mark_chunk_as_newly_watched(chunks); + } + async fn spawn_world_chunks( &self, client: Arc, chunks: Vec>, distance: i32, ) { + if client.closed.load(std::sync::atomic::Ordering::Relaxed) { + log::info!( + "The connection with {} has closed before world chunks were spawned", + client.id + ); + return; + } let inst = std::time::Instant::now(); let (sender, mut chunk_receiver) = mpsc::channel(distance as usize); let level = self.level.clone(); - let closed = client.closed.load(std::sync::atomic::Ordering::Relaxed); let chunks = Arc::new(chunks); tokio::spawn(async move { let level = level.lock().await; - level.fetch_chunks(&chunks, sender, closed) + level.fetch_chunks(&chunks, sender) }); - let client = client; tokio::spawn(async move { while let Some(chunk_data) = chunk_receiver.recv().await { // dbg!(chunk_pos); @@ -279,6 +294,9 @@ impl World { len / (1024 * 1024) ); } + + // TODO: Queue player packs in a queue so we don't need to check if its closed before + // sending if !client.closed.load(std::sync::atomic::Ordering::Relaxed) { client.send_packet(&CChunkData(&chunk_data)).await; } diff --git a/pumpkin/src/world/player_chunker.rs b/pumpkin/src/world/player_chunker.rs index e37d5a15e..2ef29d65d 100644 --- a/pumpkin/src/world/player_chunker.rs +++ b/pumpkin/src/world/player_chunker.rs @@ -4,14 +4,14 @@ use pumpkin_config::BASIC_CONFIG; use pumpkin_core::math::{ get_section_cord, position::WorldPosition, vector2::Vector2, vector3::Vector3, }; -use pumpkin_protocol::client::play::CCenterChunk; +use pumpkin_protocol::client::play::{CCenterChunk, CUnloadChunk}; use pumpkin_world::cylindrical_chunk_iterator::Cylindrical; use crate::entity::player::Player; use super::World; -async fn get_view_distance(player: &Player) -> i8 { +pub async fn get_view_distance(player: &Player) -> i8 { player .config .lock() @@ -40,13 +40,15 @@ pub async fn player_join(world: &World, player: Arc) { ); let new_cylindrical = Cylindrical::new(Vector2::new(chunk_pos.x, chunk_pos.z), view_distance); let mut loading_chunks = Vec::new(); + let mut unloading_chunks = Vec::new(); Cylindrical::for_each_changed_chunk( old_cylindrical, new_cylindrical, |chunk_pos| { loading_chunks.push(chunk_pos); }, - |_| { + |chunk_pos| { + unloading_chunks.push(chunk_pos); // player // .client // .send_packet(&CUnloadChunk::new(chunk_pos.x, chunk_pos.z)); @@ -54,10 +56,27 @@ pub async fn player_join(world: &World, player: Arc) { true, ); if !loading_chunks.is_empty() { + world.mark_chunks_as_watched(&loading_chunks).await; world .spawn_world_chunks(player.client.clone(), loading_chunks, view_distance) .await; } + + if !unloading_chunks.is_empty() { + world.mark_chunks_as_not_watched(&unloading_chunks).await; + for chunk in unloading_chunks { + if !player + .client + .closed + .load(std::sync::atomic::Ordering::Relaxed) + { + player + .client + .send_packet(&CUnloadChunk::new(chunk.x, chunk.z)) + .await; + } + } + } } pub async fn update_position(player: &Player) { @@ -84,13 +103,15 @@ pub async fn update_position(player: &Player) { player.watched_section.store(new_watched); let mut loading_chunks = Vec::new(); + let mut unloading_chunks = Vec::new(); Cylindrical::for_each_changed_chunk( old_cylindrical, new_cylindrical, |chunk_pos| { loading_chunks.push(chunk_pos); }, - |_| { + |chunk_pos| { + unloading_chunks.push(chunk_pos); // player // .client // .send_packet(&CUnloadChunk::new(chunk_pos.x, chunk_pos.z)); @@ -98,15 +119,35 @@ pub async fn update_position(player: &Player) { false, ); if !loading_chunks.is_empty() { + entity.world.mark_chunks_as_watched(&loading_chunks).await; entity .world .spawn_world_chunks(player.client.clone(), loading_chunks, view_distance) .await; } + + if !unloading_chunks.is_empty() { + entity + .world + .mark_chunks_as_not_watched(&unloading_chunks) + .await; + for chunk in unloading_chunks { + if !player + .client + .closed + .load(std::sync::atomic::Ordering::Relaxed) + { + player + .client + .send_packet(&CUnloadChunk::new(chunk.x, chunk.z)) + .await; + } + } + } } } -const fn chunk_section_from_pos(block_pos: &WorldPosition) -> Vector3 { +pub const fn chunk_section_from_pos(block_pos: &WorldPosition) -> Vector3 { let block_pos = block_pos.0; Vector3::new( get_section_cord(block_pos.x),