Skip to content

Commit

Permalink
Fix chunk loading (#378)
Browse files Browse the repository at this point in the history
* fix chunk loading

* make tokio and rayon work together
  • Loading branch information
kralverde authored Dec 7, 2024
1 parent 3143f73 commit cd6ac14
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 470 deletions.
127 changes: 56 additions & 71 deletions pumpkin-world/src/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use pumpkin_config::BASIC_CONFIG;
use pumpkin_core::math::vector2::Vector2;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use tokio::{
runtime::Handle,
sync::{mpsc, RwLock},
task::JoinHandle,
};

use crate::{
Expand All @@ -17,8 +17,6 @@ use crate::{
world_gen::{get_world_gen, Seed, WorldGenerator},
};

pub type ConcurrentChunkResult = Vec<(Vector2<i32>, JoinHandle<()>)>;

/// The `Level` module provides functionality for working with chunks within or outside a Minecraft world.
///
/// Key features include:
Expand Down Expand Up @@ -140,6 +138,7 @@ impl Level {
Entry::Occupied(mut occupied) => {
let value = occupied.get_mut();
*value = value.saturating_sub(1);

if *value == 0 {
occupied.remove_entry();
true
Expand All @@ -152,34 +151,28 @@ impl Level {
// - Player disconnecting before all packets have been sent
// - Player moving so fast that the chunk leaves the render distance before it
// is loaded into memory
log::error!(
"Marking a chunk as not watched, but was vacant! ({:?})",
chunk
);
false
}
}
}

pub fn should_pop_chunk(&self, chunk: &Vector2<i32>) -> bool {
if let Some(entry) = self.chunk_watchers.get(chunk) {
if entry.value().is_zero() {
self.chunk_watchers.remove(chunk);
}
}

self.chunk_watchers.get(chunk).is_none()
}

pub fn clean_chunks(&self, chunks: &[Vector2<i32>]) {
chunks.par_iter().for_each(|chunk_pos| {
//log::debug!("Unloading {:?}", chunk_pos);
if let Some(data) = self.loaded_chunks.remove(chunk_pos) {
self.write_chunk(data);
};
self.clean_chunk(chunk_pos);
});
}

pub fn clean_chunk(&self, chunk: &Vector2<i32>) {
if let Some(data) = self.loaded_chunks.remove(chunk) {
self.write_chunk(data);
}
}

pub fn is_chunk_watched(&self, chunk: &Vector2<i32>) -> bool {
self.chunk_watchers.get(chunk).is_some()
}

pub fn clean_memory(&self, chunks_to_check: &[Vector2<i32>]) {
chunks_to_check.par_iter().for_each(|chunk| {
if let Some(entry) = self.chunk_watchers.get(chunk) {
Expand Down Expand Up @@ -226,63 +219,55 @@ impl Level {
&self,
chunks: &[Vector2<i32>],
channel: mpsc::Sender<Arc<RwLock<ChunkData>>>,
) -> ConcurrentChunkResult {
chunks
.iter()
.map(|at| {
let channel = channel.clone();
let loaded_chunks = self.loaded_chunks.clone();
let chunk_reader = self.chunk_reader.clone();
let save_file = self.save_file.clone();
let world_gen = self.world_gen.clone();
let chunk_pos = *at;
rt: &Handle,
) {
chunks.par_iter().for_each(|at| {
let channel = channel.clone();
let loaded_chunks = self.loaded_chunks.clone();
let chunk_reader = self.chunk_reader.clone();
let save_file = self.save_file.clone();
let world_gen = self.world_gen.clone();
let chunk_pos = *at;

let join_handle = tokio::spawn(async move {
let chunk = loaded_chunks
.get(&chunk_pos)
.map(|entry| entry.value().clone())
.unwrap_or_else(|| {
let loaded_chunk = save_file
.and_then(|save_file| {
match Self::load_chunk_from_save(
chunk_reader,
save_file,
let chunk = loaded_chunks
.get(&chunk_pos)
.map(|entry| entry.value().clone())
.unwrap_or_else(|| {
let loaded_chunk = save_file
.and_then(|save_file| {
match Self::load_chunk_from_save(chunk_reader, save_file, chunk_pos) {
Ok(chunk) => chunk,
Err(err) => {
log::error!(
"Failed to read chunk (regenerating) {:?}: {:?}",
chunk_pos,
) {
Ok(chunk) => chunk,
Err(err) => {
log::error!(
"Failed to read chunk (regenerating) {:?}: {:?}",
chunk_pos,
err
);
None
}
}
})
.unwrap_or_else(|| {
Arc::new(RwLock::new(world_gen.generate_chunk(chunk_pos)))
});

if let Some(data) = loaded_chunks.get(&chunk_pos) {
// Another thread populated in between the previous check and now
// We did work, but this is basically like a cache miss, not much we
// can do about it
data.value().clone()
} else {
loaded_chunks.insert(chunk_pos, loaded_chunk.clone());
loaded_chunk
err
);
None
}
}
})
.unwrap_or_else(|| {
Arc::new(RwLock::new(world_gen.generate_chunk(chunk_pos)))
});

let _ = channel
.send(chunk)
.await
.inspect_err(|err| log::error!("unable to send chunk to channel: {}", err));
if let Some(data) = loaded_chunks.get(&chunk_pos) {
// Another thread populated in between the previous check and now
// We did work, but this is basically like a cache miss, not much we
// can do about it
data.value().clone()
} else {
loaded_chunks.insert(chunk_pos, loaded_chunk.clone());
loaded_chunk
}
});

(*at, join_handle)
})
.collect()
rt.spawn(async move {
let _ = channel
.send(chunk)
.await
.inspect_err(|err| log::error!("unable to send chunk to channel: {}", err));
});
});
}
}
Loading

0 comments on commit cd6ac14

Please sign in to comment.