Skip to content

Commit

Permalink
Merge pull request #152 from kralverde/unload_chunks_tokio
Browse files Browse the repository at this point in the history
unload chunks when no players are using them
  • Loading branch information
Snowiiii authored Oct 19, 2024
2 parents 650c28f + 637104c commit 524fecf
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 62 deletions.
4 changes: 2 additions & 2 deletions pumpkin-world/src/chunk/anvil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl ChunkReader for AnvilChunkReader {
fn read_chunk(
&self,
save_file: &SaveFile,
at: pumpkin_core::math::vector2::Vector2<i32>,
at: &pumpkin_core::math::vector2::Vector2<i32>,
) -> Result<super::ChunkData, ChunkReadingError> {
let region = (
((at.x as f32) / 32.0).floor() as i32,
Expand Down Expand Up @@ -158,6 +158,6 @@ impl ChunkReader for AnvilChunkReader {
.decompress_data(chunk_data)
.map_err(ChunkReadingError::Compression)?;

ChunkData::from_bytes(decompressed_chunk, at).map_err(ChunkReadingError::ParsingError)
ChunkData::from_bytes(decompressed_chunk, *at).map_err(ChunkReadingError::ParsingError)
}
}
2 changes: 1 addition & 1 deletion pumpkin-world/src/chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub trait ChunkReader: Sync + Send {
fn read_chunk(
&self,
save_file: &SaveFile,
at: Vector2<i32>,
at: &Vector2<i32>,
) -> Result<ChunkData, ChunkReadingError>;
}

Expand Down
16 changes: 15 additions & 1 deletion pumpkin-world/src/cylindrical_chunk_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl Cylindrical {
self.center.z + self.view_distance + 1
}

#[allow(dead_code)]
fn is_within_distance(&self, x: i32, z: i32) -> bool {
let max_dist_squared = self.view_distance * self.view_distance;
let max_dist = self.view_distance as i64;
Expand All @@ -76,4 +75,19 @@ impl Cylindrical {
let dist_squared = dist_x.pow(2) + (max_dist.min(dist_z as i64) as i32).pow(2);
dist_squared < max_dist_squared
}

/// Returns an iterator of all chunks within this cylinder
pub fn all_chunks_within(&self) -> Vec<Vector2<i32>> {
// This is a naive implementation: start with square and cut out ones that dont fit
let mut all_chunks = Vec::new();
for x in -self.view_distance..=self.view_distance {
for z in -self.view_distance..=self.view_distance {
all_chunks.push(Vector2::new(self.center.x + x, self.center.z + z));
}
}
all_chunks
.into_iter()
.filter(|chunk| self.is_within_distance(chunk.x, chunk.z))
.collect()
}
}
152 changes: 111 additions & 41 deletions pumpkin-world/src/level.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};

use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use pumpkin_core::math::vector2::Vector2;
use rayon::prelude::*;
use tokio::sync::mpsc;
Expand All @@ -23,7 +23,8 @@ use crate::{
/// For more details on world generation, refer to the `WorldGenerator` module.
pub struct Level {
save_file: Option<SaveFile>,
loaded_chunks: Arc<Mutex<HashMap<Vector2<i32>, Arc<ChunkData>>>>,
loaded_chunks: Arc<RwLock<HashMap<Vector2<i32>, Arc<ChunkData>>>>,
chunk_watchers: Arc<Mutex<HashMap<Vector2<i32>, usize>>>,
chunk_reader: Box<dyn ChunkReader>,
world_gen: Box<dyn WorldGenerator>,
}
Expand Down Expand Up @@ -52,7 +53,8 @@ impl Level {
region_folder,
}),
chunk_reader: Box::new(AnvilChunkReader::new()),
loaded_chunks: Arc::new(Mutex::new(HashMap::new())),
loaded_chunks: Arc::new(RwLock::new(HashMap::new())),
chunk_watchers: Arc::new(Mutex::new(HashMap::new())),
}
} else {
log::warn!(
Expand All @@ -63,63 +65,131 @@ impl Level {
world_gen,
save_file: None,
chunk_reader: Box::new(AnvilChunkReader::new()),
loaded_chunks: Arc::new(Mutex::new(HashMap::new())),
loaded_chunks: Arc::new(RwLock::new(HashMap::new())),
chunk_watchers: Arc::new(Mutex::new(HashMap::new())),
}
}
}

pub fn get_block() {}

/// Marks chunks as "watched" by a unique player. When no players are watching a chunk,
/// it is removed from memory. Should only be called on chunks the player was not watching
/// before
pub fn mark_chunk_as_newly_watched(&self, chunks: &[Vector2<i32>]) {
let mut watchers = self.chunk_watchers.lock();
for chunk in chunks {
match watchers.entry(*chunk) {
std::collections::hash_map::Entry::Occupied(mut occupied) => {
let value = occupied.get_mut();
*value = value.saturating_add(1);
}
std::collections::hash_map::Entry::Vacant(vacant) => {
vacant.insert(1);
}
}
}
}

/// Marks chunks no longer "watched" by a unique player. When no players are watching a chunk,
/// it is removed from memory. Should only be called on chunks the player was watching before
pub fn mark_chunk_as_not_watched_and_clean(&self, chunks: &[Vector2<i32>]) {
let dropped_chunks = {
let mut watchers = self.chunk_watchers.lock();
chunks
.iter()
.filter(|chunk| match watchers.entry(**chunk) {
std::collections::hash_map::Entry::Occupied(mut occupied) => {
let value = occupied.get_mut();
*value = value.saturating_sub(1);
if *value == 0 {
occupied.remove_entry();
true
} else {
false
}
}
std::collections::hash_map::Entry::Vacant(_) => {
log::error!(
"Marking a chunk as not watched, but was vacant! ({:?})",
chunk
);
false
}
})
.collect::<Vec<_>>()
};
let mut loaded_chunks = self.loaded_chunks.write();
let dropped_chunk_data = dropped_chunks
.iter()
.filter_map(|chunk| {
log::debug!("Unloading chunk {:?}", chunk);
loaded_chunks.remove_entry(*chunk)
})
.collect();
self.write_chunks(dropped_chunk_data);
}

pub fn write_chunks(&self, _chunks_to_write: Vec<(Vector2<i32>, Arc<ChunkData>)>) {
//TODO
}

/// Reads/Generates many chunks in a world
/// MUST be called from a tokio runtime thread
///
/// Note: The order of the output chunks will almost never be in the same order as the order of input chunks
pub fn fetch_chunks(
&self,
chunks: &[Vector2<i32>],
channel: mpsc::Sender<Arc<ChunkData>>,
is_alive: bool,
) {
pub fn fetch_chunks(&self, chunks: &[Vector2<i32>], channel: mpsc::Sender<Arc<ChunkData>>) {
chunks.into_par_iter().for_each(|at| {
if is_alive {
return;
}
let mut loaded_chunks = self.loaded_chunks.lock();
let channel = channel.clone();

// Check if chunks is already loaded
if loaded_chunks.contains_key(at) {
channel
.blocking_send(loaded_chunks.get(at).unwrap().clone())
.expect("Failed sending ChunkData.");
return;
let maybe_chunk = {
let loaded_chunks = self.loaded_chunks.read();
loaded_chunks.get(at).cloned()
}
let at = *at;
let data = match &self.save_file {
Some(save_file) => {
match self.chunk_reader.read_chunk(save_file, at) {
Err(
ChunkReadingError::ParsingError(ChunkParsingError::ChunkNotGenerated)
| ChunkReadingError::ChunkNotExist,
) => {
// This chunk was not generated yet.
Ok(self.world_gen.generate_chunk(at))
.or_else(|| {
let chunk_data = match &self.save_file {
Some(save_file) => {
match self.chunk_reader.read_chunk(save_file, at) {
Ok(data) => Ok(Arc::new(data)),
Err(
ChunkReadingError::ChunkNotExist
| ChunkReadingError::ParsingError(
ChunkParsingError::ChunkNotGenerated,
),
) => {
// This chunk was not generated yet.
let chunk = Arc::new(self.world_gen.generate_chunk(*at));
Ok(chunk)
}
Err(err) => Err(err),
}
// TODO this doesn't warn the user about the error. fix.
result => result,
}
None => {
// There is no savefile yet -> generate the chunks
let chunk = Arc::new(self.world_gen.generate_chunk(*at));
Ok(chunk)
}
};
match chunk_data {
Ok(data) => Some(data),
Err(err) => {
// TODO: Panic here?
log::warn!("Failed to read chunk {:?}: {:?}", at, err);
None
}
}
});
match maybe_chunk {
Some(chunk) => {
channel
.blocking_send(chunk.clone())
.expect("Failed sending ChunkData.");
}
None => {
// There is no savefile yet -> generate the chunks
Ok(self.world_gen.generate_chunk(at))
log::error!("Unable to send chunk {:?}!", at);
}
}
.unwrap();
let data = Arc::new(data);
channel
.blocking_send(data.clone())
.expect("Failed sending ChunkData.");
loaded_chunks.insert(at, data);
};
})
}
}
17 changes: 13 additions & 4 deletions pumpkin/src/entity/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crossbeam::atomic::AtomicCell;
use num_derive::FromPrimitive;
use num_traits::ToPrimitive;
use pumpkin_core::{
math::{boundingbox::BoundingBox, position::WorldPosition, vector3::Vector3},
math::{boundingbox::BoundingBox, position::WorldPosition, vector2::Vector2, vector3::Vector3},
text::TextComponent,
GameMode,
};
Expand All @@ -30,15 +30,15 @@ use pumpkin_protocol::{
use tokio::sync::Mutex;

use pumpkin_protocol::server::play::{SCloseContainer, SKeepAlive};
use pumpkin_world::item::ItemStack;
use pumpkin_world::{cylindrical_chunk_iterator::Cylindrical, item::ItemStack};

use super::Entity;
use crate::error::PumpkinError;
use crate::{
client::{authentication::GameProfile, Client, PlayerConfig},
server::Server,
world::World,
world::{player_chunker::chunk_section_from_pos, World},
};
use crate::{error::PumpkinError, world::player_chunker::get_view_distance};

use super::living::LivingEntity;

Expand Down Expand Up @@ -139,6 +139,15 @@ impl Player {
/// Removes the Player out of the current World
pub async fn remove(&self) {
self.living_entity.entity.world.remove_player(self).await;

let watched = chunk_section_from_pos(&self.living_entity.entity.block_pos.load());
let view_distance = get_view_distance(self).await as i32;
let cylindrical = Cylindrical::new(Vector2::new(watched.x, watched.z), view_distance);
self.living_entity
.entity
.world
.mark_chunks_as_not_watched(&cylindrical.all_chunks_within())
.await;
}

pub const fn entity_id(&self) -> EntityId {
Expand Down
11 changes: 6 additions & 5 deletions pumpkin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,19 @@ async fn main() -> io::Result<()> {
// Asynchronously wait for an inbound socket.
let (connection, address) = listener.accept().await?;

log::info!(
"Accepted connection from: {}",
scrub_address(&format!("{}", address))
);

if let Err(e) = connection.set_nodelay(true) {
log::warn!("failed to set TCP_NODELAY {e}");
}

unique_id += 1;
let id = unique_id;

log::info!(
"Accepted connection from: {} (id: {})",
scrub_address(&format!("{}", address)),
id
);

let keep_alive = tokio::sync::mpsc::channel(1024);
let client = Arc::new(Client::new(id, connection, addr, keep_alive.0.into()));

Expand Down
24 changes: 21 additions & 3 deletions pumpkin/src/world/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,24 +245,39 @@ impl World {
player_chunker::player_join(self, player.clone()).await;
}

pub async fn mark_chunks_as_not_watched(&self, chunks: &[Vector2<i32>]) {
let level = self.level.lock().await;
level.mark_chunk_as_not_watched_and_clean(chunks);
}

pub async fn mark_chunks_as_watched(&self, chunks: &[Vector2<i32>]) {
let level = self.level.lock().await;
level.mark_chunk_as_newly_watched(chunks);
}

async fn spawn_world_chunks(
&self,
client: Arc<Client>,
chunks: Vec<Vector2<i32>>,
distance: i32,
) {
if client.closed.load(std::sync::atomic::Ordering::Relaxed) {
log::info!(
"The connection with {} has closed before world chunks were spawned",
client.id
);
return;
}
let inst = std::time::Instant::now();
let (sender, mut chunk_receiver) = mpsc::channel(distance as usize);

let level = self.level.clone();
let closed = client.closed.load(std::sync::atomic::Ordering::Relaxed);
let chunks = Arc::new(chunks);
tokio::spawn(async move {
let level = level.lock().await;
level.fetch_chunks(&chunks, sender, closed)
level.fetch_chunks(&chunks, sender)
});

let client = client;
tokio::spawn(async move {
while let Some(chunk_data) = chunk_receiver.recv().await {
// dbg!(chunk_pos);
Expand All @@ -279,6 +294,9 @@ impl World {
len / (1024 * 1024)
);
}

// TODO: Queue player packs in a queue so we don't need to check if its closed before
// sending
if !client.closed.load(std::sync::atomic::Ordering::Relaxed) {
client.send_packet(&CChunkData(&chunk_data)).await;
}
Expand Down
Loading

0 comments on commit 524fecf

Please sign in to comment.