Commit: reduce client id to u16 (#172)
kralverde authored Oct 25, 2024
1 parent a395e61 commit c152329
Showing 9 changed files with 645 additions and 192 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -38,7 +38,7 @@ num-derive = "0.4"

# Concurrency/Parallelism and Synchronization
rayon = "1.10.0"
-parking_lot = "0.12.3"
+parking_lot = { version = "0.12.3", features = ["send_guard"] }
crossbeam = "0.8.4"

uuid = { version = "1.11.0", features = ["serde", "v3", "v4"] }
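
For context: parking_lot's `send_guard` feature makes its lock guards `Send`, so a guard can be moved to (and dropped on) a different thread than the one that acquired it. The sketch below is not from this commit; it only illustrates what the feature permits, and the motivation assumed here (guards crossing rayon/tokio task boundaries in the server) is an inference.

```rust
use parking_lot::Mutex;

fn main() {
    let data = Mutex::new(vec![1, 2, 3]);
    // Acquire the guard on the main thread...
    let guard = data.lock();
    std::thread::scope(|s| {
        // ...and move it into another thread. This only compiles when the
        // `send_guard` feature is enabled, because the guard must be `Send`.
        s.spawn(move || println!("len = {}", guard.len()));
    });
}
```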
14 changes: 14 additions & 0 deletions pumpkin-protocol/src/lib.rs
@@ -153,6 +153,20 @@ pub enum PacketError {
    MalformedLength,
}

+impl PacketError {
+    pub fn kickable(&self) -> bool {
+        // We no longer have a connection, so don't try to kick the player, just close
+        !matches!(
+            self,
+            Self::EncodeData
+                | Self::EncodeFailedWrite
+                | Self::FailedWrite(_)
+                | Self::FailedFinish
+                | Self::ConnectionWrite
+        )
+    }
+}

#[derive(Debug, PartialEq, Clone, Copy)]
pub enum ConnectionState {
HandShake,
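
A hypothetical use of the new `kickable()` helper (not part of this diff): errors raised while writing to the socket cannot be followed by a kick packet, so the connection is simply closed. The `kick`/`close` callbacks are stand-ins for whatever the real client connection code does.

```rust
use pumpkin_protocol::PacketError;

// Illustrative only: `kick` and `close` are placeholders, not real client methods.
fn handle_packet_error(err: &PacketError, kick: impl FnOnce(&str), close: impl FnOnce()) {
    if err.kickable() {
        // The connection can still carry a disconnect reason.
        kick("internal packet error");
    } else {
        // The write side already failed, so a kick packet could never arrive.
        close();
    }
}
```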
244 changes: 154 additions & 90 deletions pumpkin-world/src/level.rs
@@ -1,9 +1,13 @@
use std::{path::PathBuf, sync::Arc};

use dashmap::{DashMap, Entry};
+use num_traits::Zero;
use pumpkin_core::math::vector2::Vector2;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
-use tokio::sync::{mpsc, RwLock};
+use tokio::{
+    sync::{mpsc, RwLock},
+    task::JoinHandle,
+};

use crate::{
chunk::{
@@ -12,6 +16,8 @@ use crate::{
world_gen::{get_world_gen, Seed, WorldGenerator},
};

+pub type ConcurrentChunkResult = Vec<(Vector2<i32>, JoinHandle<()>)>;

/// The `Level` module provides functionality for working with chunks within or outside a Minecraft world.
///
/// Key features include:
@@ -77,58 +83,108 @@ impl Level {
        self.loaded_chunks.len()
    }

+    pub fn list_cached(&self) {
+        for entry in self.loaded_chunks.iter() {
+            log::debug!("In map: {:?}", entry.key());
+        }
+    }

    /// Marks chunks as "watched" by a unique player. When no players are watching a chunk,
    /// it is removed from memory. Should only be called on chunks the player was not watching
    /// before
    pub fn mark_chunks_as_newly_watched(&self, chunks: &[Vector2<i32>]) {
-        chunks
-            .par_iter()
-            .for_each(|chunk| match self.chunk_watchers.entry(*chunk) {
-                Entry::Occupied(mut occupied) => {
-                    let value = occupied.get_mut();
-                    if let Some(new_value) = value.checked_add(1) {
-                        *value = new_value;
-                        //log::debug!("Watch value for {:?}: {}", chunk, value);
-                    } else {
-                        log::error!("Watching overflow on chunk {:?}", chunk);
-                    }
-                }
-                Entry::Vacant(vacant) => {
-                    vacant.insert(1);
-                }
-            });
+        chunks.par_iter().for_each(|chunk| {
+            self.mark_chunk_as_newly_watched(*chunk);
+        });
+    }
+
+    pub fn mark_chunk_as_newly_watched(&self, chunk: Vector2<i32>) {
+        match self.chunk_watchers.entry(chunk) {
+            Entry::Occupied(mut occupied) => {
+                let value = occupied.get_mut();
+                if let Some(new_value) = value.checked_add(1) {
+                    *value = new_value;
+                    //log::debug!("Watch value for {:?}: {}", chunk, value);
+                } else {
+                    log::error!("Watching overflow on chunk {:?}", chunk);
+                }
+            }
+            Entry::Vacant(vacant) => {
+                vacant.insert(1);
+            }
+        }
    }

    /// Marks chunks no longer "watched" by a unique player. When no players are watching a chunk,
    /// it is removed from memory. Should only be called on chunks the player was watching before
-    pub async fn mark_chunk_as_not_watched_and_clean(&self, chunks: &[Vector2<i32>]) {
+    pub fn mark_chunks_as_not_watched(&self, chunks: &[Vector2<i32>]) -> Vec<Vector2<i32>> {
        chunks
            .par_iter()
-            .filter(|chunk| match self.chunk_watchers.entry(**chunk) {
-                Entry::Occupied(mut occupied) => {
-                    let value = occupied.get_mut();
-                    *value = value.saturating_sub(1);
-                    if *value == 0 {
-                        occupied.remove_entry();
-                        true
-                    } else {
-                        false
-                    }
-                }
-                Entry::Vacant(_) => {
-                    log::error!(
-                        "Marking a chunk as not watched, but was vacant! ({:?})",
-                        chunk
-                    );
-                    false
-                }
-            })
-            .for_each(|chunk_pos| {
-                //log::debug!("Unloading {:?}", chunk_pos);
-                if let Some(data) = self.loaded_chunks.remove(chunk_pos) {
-                    self.write_chunk(data);
-                };
-            });
+            .filter(|chunk| self.mark_chunk_as_not_watched(**chunk))
+            .map(|chunk| *chunk)
+            .collect()
+    }
+
+    /// Returns whether the chunk should be removed from memory
+    pub fn mark_chunk_as_not_watched(&self, chunk: Vector2<i32>) -> bool {
+        match self.chunk_watchers.entry(chunk) {
+            Entry::Occupied(mut occupied) => {
+                let value = occupied.get_mut();
+                *value = value.saturating_sub(1);
+                if *value == 0 {
+                    occupied.remove_entry();
+                    true
+                } else {
+                    false
+                }
+            }
+            Entry::Vacant(_) => {
+                // This can be:
+                // - Player disconnecting before all packets have been sent
+                // - Player moving so fast that the chunk leaves the render distance before it
+                // is loaded into memory
+                log::error!(
+                    "Marking a chunk as not watched, but was vacant! ({:?})",
+                    chunk
+                );
+                false
+            }
+        }
+    }
+
+    pub fn should_pop_chunk(&self, chunk: &Vector2<i32>) -> bool {
+        if let Some(entry) = self.chunk_watchers.get(chunk) {
+            if entry.value().is_zero() {
+                self.chunk_watchers.remove(chunk);
+            }
+        }
+
+        self.chunk_watchers.get(chunk).is_none()
+    }
+
+    pub fn clean_chunks(&self, chunks: &[Vector2<i32>]) {
+        chunks.par_iter().for_each(|chunk_pos| {
+            //log::debug!("Unloading {:?}", chunk_pos);
+            if let Some(data) = self.loaded_chunks.remove(chunk_pos) {
+                self.write_chunk(data);
+            };
+        });
+    }
+
+    pub fn clean_memory(&self, chunks_to_check: &[Vector2<i32>]) {
+        chunks_to_check.par_iter().for_each(|chunk| {
+            if let Some(entry) = self.chunk_watchers.get(chunk) {
+                if entry.value().is_zero() {
+                    self.chunk_watchers.remove(chunk);
+                }
+            }
+
+            if self.chunk_watchers.get(chunk).is_none() {
+                self.loaded_chunks.remove(chunk);
+            }
+        });
+        self.loaded_chunks.shrink_to_fit();
+        self.chunk_watchers.shrink_to_fit();
    }
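
For orientation, a hypothetical caller of the watcher API above, inferred only from the new signatures in this hunk (the module path and the surrounding player logic are assumptions, not part of this commit):

```rust
use pumpkin_core::math::vector2::Vector2;
use pumpkin_world::level::Level; // assumed public path to `Level`

// Inferred flow: count a player in, count them out, then unload chunks nobody watches.
fn update_view(level: &Level, entering: &[Vector2<i32>], leaving: &[Vector2<i32>]) {
    // Chunks newly inside the render distance get their watcher count bumped.
    level.mark_chunks_as_newly_watched(entering);

    // Chunks that left the render distance: collect the ones whose count hit zero...
    let unwatched = level.mark_chunks_as_not_watched(leaving);

    // ...and unload exactly those (clean_chunks hands them to write_chunk).
    level.clean_chunks(&unwatched);
}
```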

pub fn write_chunk(&self, _chunk_to_write: (Vector2<i32>, Arc<RwLock<ChunkData>>)) {
@@ -157,59 +213,67 @@ impl Level {
    /// MUST be called from a tokio runtime thread
    ///
    /// Note: The order of the output chunks will almost never be in the same order as the order of input chunks
-    pub async fn fetch_chunks(
+    pub fn fetch_chunks(
        &self,
        chunks: &[Vector2<i32>],
        channel: mpsc::Sender<Arc<RwLock<ChunkData>>>,
-    ) {
-        chunks.iter().for_each(|at| {
-            let channel = channel.clone();
-            let loaded_chunks = self.loaded_chunks.clone();
-            let chunk_reader = self.chunk_reader.clone();
-            let save_file = self.save_file.clone();
-            let world_gen = self.world_gen.clone();
-            let chunk_pos = *at;
-
-            tokio::spawn(async move {
-                let chunk = loaded_chunks
-                    .get(&chunk_pos)
-                    .map(|entry| entry.value().clone())
-                    .unwrap_or_else(|| {
-                        let loaded_chunk = save_file
-                            .and_then(|save_file| {
-                                match Self::load_chunk_from_save(chunk_reader, save_file, chunk_pos)
-                                {
-                                    Ok(chunk) => chunk,
-                                    Err(err) => {
-                                        log::error!(
-                                            "Failed to read chunk (regenerating) {:?}: {:?}",
-                                            chunk_pos,
-                                            err
-                                        );
-                                        None
-                                    }
-                                }
-                            })
-                            .unwrap_or_else(|| {
-                                Arc::new(RwLock::new(world_gen.generate_chunk(chunk_pos)))
-                            });
-
-                        if let Some(data) = loaded_chunks.get(&chunk_pos) {
-                            // Another thread populated in between the previous check and now
-                            // We did work, but this is basically like a cache miss, not much we
-                            // can do about it
-                            data.value().clone()
-                        } else {
-                            loaded_chunks.insert(chunk_pos, loaded_chunk.clone());
-                            loaded_chunk
-                        }
-                    });
-
-                let _ = channel
-                    .send(chunk)
-                    .await
-                    .inspect_err(|err| log::error!("unable to send chunk to channel: {}", err));
-            });
-        })
+    ) -> ConcurrentChunkResult {
+        chunks
+            .iter()
+            .map(|at| {
+                let channel = channel.clone();
+                let loaded_chunks = self.loaded_chunks.clone();
+                let chunk_reader = self.chunk_reader.clone();
+                let save_file = self.save_file.clone();
+                let world_gen = self.world_gen.clone();
+                let chunk_pos = *at;
+
+                let join_handle = tokio::spawn(async move {
+                    let chunk = loaded_chunks
+                        .get(&chunk_pos)
+                        .map(|entry| entry.value().clone())
+                        .unwrap_or_else(|| {
+                            let loaded_chunk = save_file
+                                .and_then(|save_file| {
+                                    match Self::load_chunk_from_save(
+                                        chunk_reader,
+                                        save_file,
+                                        chunk_pos,
+                                    ) {
+                                        Ok(chunk) => chunk,
+                                        Err(err) => {
+                                            log::error!(
+                                                "Failed to read chunk (regenerating) {:?}: {:?}",
+                                                chunk_pos,
+                                                err
+                                            );
+                                            None
+                                        }
+                                    }
+                                })
+                                .unwrap_or_else(|| {
+                                    Arc::new(RwLock::new(world_gen.generate_chunk(chunk_pos)))
+                                });
+
+                            if let Some(data) = loaded_chunks.get(&chunk_pos) {
+                                // Another thread populated in between the previous check and now
+                                // We did work, but this is basically like a cache miss, not much we
+                                // can do about it
+                                data.value().clone()
+                            } else {
+                                loaded_chunks.insert(chunk_pos, loaded_chunk.clone());
+                                loaded_chunk
+                            }
+                        });
+
+                    let _ = channel
+                        .send(chunk)
+                        .await
+                        .inspect_err(|err| log::error!("unable to send chunk to channel: {}", err));
+                });
+
+                (*at, join_handle)
+            })
+            .collect()
    }
}
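
The reworked `fetch_chunks` no longer awaits anything itself: it spawns one tokio task per requested chunk and returns the `(position, JoinHandle)` pairs, while finished chunks arrive over the mpsc channel. A hypothetical consumer is sketched below; only the `fetch_chunks` signature and `ConcurrentChunkResult` come from this diff, and the module paths are assumptions.

```rust
use std::sync::Arc;

use pumpkin_core::math::vector2::Vector2;
use pumpkin_world::{chunk::ChunkData, level::Level}; // assumed public paths
use tokio::sync::{mpsc, RwLock};

async fn stream_chunks(level: Arc<Level>, positions: Vec<Vector2<i32>>) {
    let (tx, mut rx) = mpsc::channel::<Arc<RwLock<ChunkData>>>(32);

    // Spawns one tokio task per chunk and hands back the join handles.
    let handles = level.fetch_chunks(&positions, tx);

    // Receive chunks as they become ready (order is not guaranteed).
    while let Some(chunk_lock) = rx.recv().await {
        let _chunk = chunk_lock.read().await;
        // ... encode and send the chunk to the client here ...
    }

    // `recv()` returns None once every task has dropped its sender clone;
    // awaiting the handles makes completion explicit anyway.
    for (_pos, handle) in handles {
        let _ = handle.await;
    }
}
```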
1 change: 1 addition & 0 deletions pumpkin/Cargo.toml
@@ -25,6 +25,7 @@ thiserror.workspace = true

num-traits.workspace = true
num-derive.workspace = true
+parking_lot.workspace = true

# config
serde.workspace = true