From 03f9b377d58e3a016795613abd208f4ab9568965 Mon Sep 17 00:00:00 2001
From: lukas0008
Date: Tue, 13 Aug 2024 23:30:20 +0200
Subject: [PATCH] Optimize chunk loading

- Chunk loading is now done in a simpler manner. Instead of splitting the
  requested chunks into regions and sharing one file handle for all chunks
  in the same region, each chunk now opens its own handle to its region
  file. The overhead of the shared handle was not worth it: dropping it
  seemed to increase performance.
- Speed-up of ~15%.
- The `.read_chunks` function no longer returns anything; instead it sends
  each result as an event to an mpsc channel.
- Dramatically decreased memory footprint.
---
 pumpkin-world/Cargo.toml   |   2 +-
 pumpkin-world/src/chunk.rs |   3 +-
 pumpkin-world/src/world.rs | 231 +++++++++++++++++--------------------
 pumpkin/src/server.rs      |  37 +++---
 4 files changed, 131 insertions(+), 142 deletions(-)

diff --git a/pumpkin-world/Cargo.toml b/pumpkin-world/Cargo.toml
index 38dfc8a01..7e073f6c7 100644
--- a/pumpkin-world/Cargo.toml
+++ b/pumpkin-world/Cargo.toml
@@ -8,6 +8,7 @@ edition.workspace = true
 fastnbt = { git = "https://github.com/owengage/fastnbt.git" }
 fastsnbt = "0.2"
 tokio.workspace = true
+rayon.workspace = true
 itertools = "0.13.0"
 thiserror = "1.0.63"
 futures = "0.3.30"
@@ -15,4 +16,3 @@ flate2 = "1.0.31"
 serde = { version = "1.0.205", features = ["derive"] }
 lazy_static = "1.5.0"
 serde_json = "1.0.122"
-rayon = "1.10.0"

diff --git a/pumpkin-world/src/chunk.rs b/pumpkin-world/src/chunk.rs
index 60729c83f..f41034d58 100644
--- a/pumpkin-world/src/chunk.rs
+++ b/pumpkin-world/src/chunk.rs
@@ -32,7 +32,7 @@ use fastnbt::LongArray;
 use crate::{world::WorldError, WORLD_HEIGHT};

 pub struct ChunkData {
-    pub blocks: Box<[i64; 16 * 16 * WORLD_HEIGHT]>,
+    pub blocks: Box<[i32; 16 * 16 * WORLD_HEIGHT]>,
     pub position: (i32, i32),
     pub heightmaps: ChunkHeightmaps,
 }
@@ -96,6 +96,7 @@ impl ChunkData {
                         &entry.name,
                         entry.properties.as_ref(),
                     )
+                    .map(|v| v as i32)
                 })
                 .collect::<Result<Vec<_>, _>>()?;
             let block_data = match block_states.data {
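Side note on the claims above: the `i64` -> `i32` change in `ChunkData::blocks` halves the per-chunk block array and is one obvious source of the reduced memory footprint. A minimal sketch of the arithmetic, assuming `WORLD_HEIGHT` is 384 (an illustrative assumption; the actual constant is defined in pumpkin-world):

    // Per-chunk block storage before and after narrowing block ids to i32.
    // WORLD_HEIGHT = 384 is assumed here for illustration only.
    const WORLD_HEIGHT: usize = 384;
    const BLOCKS_PER_CHUNK: usize = 16 * 16 * WORLD_HEIGHT; // 98_304 blocks

    fn main() {
        let before = BLOCKS_PER_CHUNK * std::mem::size_of::<i64>(); // 786_432 B = 768 KiB
        let after = BLOCKS_PER_CHUNK * std::mem::size_of::<i32>(); // 393_216 B = 384 KiB
        println!("{} KiB -> {} KiB per chunk", before / 1024, after / 1024);
    }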
diff --git a/pumpkin-world/src/world.rs b/pumpkin-world/src/world.rs
index f297ccdfd..2f2c0d407 100644
--- a/pumpkin-world/src/world.rs
+++ b/pumpkin-world/src/world.rs
@@ -1,4 +1,8 @@
-use std::{io::Read, path::PathBuf, sync::Arc};
+use std::{
+    io::{Read, Seek},
+    path::PathBuf,
+    sync::{atomic::AtomicUsize, Arc},
+};

 use flate2::bufread::ZlibDecoder;
 use itertools::Itertools;
@@ -7,7 +11,9 @@ use thiserror::Error;
 use tokio::{
     fs::File,
     io::{AsyncReadExt, AsyncSeekExt},
-    sync::Mutex,
+    runtime::Handle,
+    sync::{mpsc, oneshot, Mutex},
+    task::spawn_blocking,
 };

 use crate::chunk::ChunkData;
@@ -50,107 +56,81 @@ impl Level {
         Level { root_folder }
     }

-    /// Read one chunk in the world
-    ///
-    /// Do not use this function if reading many chunks is required, since in case those two chunks which are read seperately using `.read_chunk` are in the same region file, it will need to be opened and closed separately for both of them, leading to a performance loss.
-    pub async fn read_chunk(&self, chunk: (i32, i32)) -> Result<ChunkData, WorldError> {
-        self.read_chunks(vec![chunk])
-            .await
-            .pop()
-            .expect("Read chunks must return a chunk")
-            .1
-    }
+    // /// Read one chunk in the world
+    // ///
+    // /// Do not use this function if reading many chunks is required, since if two chunks which are read separately using `.read_chunk` are in the same region file, it will need to be opened and closed separately for both of them, leading to a performance loss.
+    // pub async fn read_chunk(&self, chunk: (i32, i32)) -> Result<ChunkData, WorldError> {
+    //     self.read_chunks(vec![chunk])
+    //         .await
+    //         .pop()
+    //         .expect("Read chunks must return a chunk")
+    //         .1
+    // }

     /// Read many chunks in a world
+    /// MUST be called from a tokio runtime thread
     ///
     /// Note: The order of the output chunks will almost never be in the same order as the order of input chunks
     pub async fn read_chunks(
         &self,
         chunks: Vec<(i32, i32)>,
-    ) -> Vec<((i32, i32), Result<ChunkData, WorldError>)> {
-        futures::future::join_all(
-            chunks
-                .into_iter()
-                // split chunks into their corresponding region files to be able to read all of them at once, instead of reopening the file multiple times
-                .chunk_by(|chunk| {
-                    (
-                        ((chunk.0 as f32) / 32.0).floor() as i32,
-                        ((chunk.1 as f32) / 32.0).floor() as i32,
-                    )
-                })
-                .into_iter()
-                .map(|(region, chunk_vec)| {
-                    let mut path = self.root_folder.clone();
-                    let chunk_vec = chunk_vec.collect_vec();
-                    path.push("region");
-                    path.push(format!("r.{}.{}.mca", region.0, region.1));
-                    self.read_region_chunks(path, chunk_vec)
-                }),
-        )
-        .await
-        .into_iter()
-        .flatten()
-        .collect_vec()
-    }
-    async fn read_region_chunks(
-        &self,
-        region_file_path: PathBuf,
-        chunks: Vec<(i32, i32)>,
-    ) -> Vec<((i32, i32), Result<ChunkData, WorldError>)> {
-        // dbg!(at);
-        // return different error when file is not found (because that means that the chunks have just not been generated yet)
-        let mut region_file = match File::open(&region_file_path).await {
-            Ok(f) => f,
-            Err(err) => match err.kind() {
-                std::io::ErrorKind::NotFound => {
-                    return chunks
-                        .into_iter()
-                        .map(|c| (c, Err(WorldError::RegionNotFound)))
-                        .collect_vec()
-                }
-                _ => {
-                    return chunks
-                        .into_iter()
-                        .map(|c| (c, Err(WorldError::IoError(err.kind()))))
-                        .collect_vec()
-                }
-            },
-        };
-
-        let mut location_table: [u8; 4096] = [0; 4096];
-        let mut timestamp_table: [u8; 4096] = [0; 4096];
-
-        // fill the location and timestamp tables
-        {
-            match region_file.read_exact(&mut location_table).await {
-                Ok(_) => {}
-                Err(err) => {
-                    return chunks
-                        .into_iter()
-                        .map(|c| (c, Err(WorldError::IoError(err.kind()))))
-                        .collect_vec()
-                }
-            }
-            match region_file.read_exact(&mut timestamp_table).await {
-                Ok(_) => {}
-                Err(err) => {
-                    return chunks
-                        .into_iter()
-                        .map(|c| (c, Err(WorldError::IoError(err.kind()))))
-                        .collect_vec()
+        channel: mpsc::Sender<((i32, i32), Result<ChunkData, WorldError>)>,
+    ) {
+        chunks
+            .into_par_iter()
+            .map(|chunk| {
+                let region = (
+                    ((chunk.0 as f32) / 32.0).floor() as i32,
+                    ((chunk.1 as f32) / 32.0).floor() as i32,
+                );
+                let channel = channel.clone();
+                let mut region_file_path = self.root_folder.clone();
+                region_file_path.push("region");
+                region_file_path.push(format!("r.{}.{}.mca", region.0, region.1));
+
+                // return a different error when the file is not found (because that means the chunks have just not been generated yet)
+                let mut region_file = match std::fs::File::options().read(true).write(false).open(&region_file_path) {
+                    Ok(f) => f,
+                    Err(err) => match err.kind() {
+                        std::io::ErrorKind::NotFound => {
+                            let _ = channel.blocking_send((chunk, Err(WorldError::RegionNotFound)));
+                            return;
+                        }
+                        _ => {
+                            let _ = channel
+                                .blocking_send((chunk, Err(WorldError::IoError(err.kind()))));
+                            return;
+                        }
+                    },
+                };
+
+                let mut location_table: [u8; 4096] = [0; 4096];
+                let mut timestamp_table: [u8; 4096] = [0; 4096];
+
+                // fill the location and timestamp tables
+                {
+                    match region_file.read_exact(&mut location_table) {
+                        Ok(_) => {}
+                        Err(err) => {
+                            let _ = channel
+                                .blocking_send((chunk, Err(WorldError::IoError(err.kind()))));
+                            return;
+                        }
+                    }
+                    match region_file.read_exact(&mut timestamp_table) {
+                        Ok(_) => {}
+                        Err(err) => {
+                            let _ = channel
+                                .blocking_send((chunk, Err(WorldError::IoError(err.kind()))));
+                            return;
+                        }
+                    }
                 }
-            }
-        }
-        // println!("Location table: {:?}", &location_table);
-
-        // wrap file with arc mutex to allow for multithreading
-        let region_file = Arc::new(Mutex::new(region_file));
-        futures::future::join_all(chunks.into_iter().map(|(old_chunk_x, old_chunk_z)| {
-            let region_file = region_file.clone();
-            let modulus = |a: i32, b: i32| ((a % b) + b) % b;
-            let chunk_x = modulus(old_chunk_x, 32) as u32;
-            let chunk_z = modulus(old_chunk_z, 32) as u32;
-            async move {
+
+                let modulus = |a: i32, b: i32| ((a % b) + b) % b;
+                let chunk_x = modulus(chunk.0, 32) as u32;
+                let chunk_z = modulus(chunk.1, 32) as u32;
+                let channel = channel.clone();
                 let table_entry = (chunk_x + chunk_z * 32) * 4;
                 let mut offset = vec![0u8];
@@ -161,19 +141,24 @@
                 let size = location_table[table_entry as usize + 3] as usize * 4096;

                 if offset == 0 && size == 0 {
-                    return ((old_chunk_x, old_chunk_z), Err(WorldError::ChunkNotFound));
+                    let _ =
+                        channel.blocking_send(((chunk.0, chunk.1), Err(WorldError::ChunkNotFound)));
+                    return;
                 }
                 // Read the file using the offset and size
                 let mut file_buf = {
-                    let mut region_file = region_file.lock().await;
-                    let seek_result = region_file.seek(std::io::SeekFrom::Start(offset)).await;
+                    let seek_result = region_file.seek(std::io::SeekFrom::Start(offset));
                     if seek_result.is_err() {
-                        return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid));
+                        let _ = channel
+                            .blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid)));
+                        return;
                     }
                     let mut out = vec![0; size];
-                    let read_result = region_file.read_exact(&mut out).await;
+                    let read_result = region_file.read_exact(&mut out);
                     if read_result.is_err() {
-                        return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid));
+                        let _ = channel
+                            .blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid)));
+                        return;
                     }
                     out
                 };
@@ -186,7 +171,11 @@
                     2 => Compression::Zlib,
                     3 => Compression::None,
                     4 => Compression::LZ4,
-                    _ => return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid)),
+                    _ => {
+                        let _ = channel
+                            .blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid)));
+                        return;
+                    }
                 };

                 match compression {
@@ -195,32 +184,22 @@
                 }

                 let size = u32::from_be_bytes(header[0..4].try_into().unwrap());
-
+                // size includes the compression scheme byte, so we need to subtract 1
                 let chunk_data = file_buf.drain(0..size as usize - 1).collect_vec();
-                ((old_chunk_x, old_chunk_z), Ok(chunk_data))
-            }
-        }))
-        .await
-        .into_par_iter()
-        .map(|((old_chunk_x, old_chunk_z), chunk_data)| {
-            let chunk_data = match chunk_data {
-                Ok(c) => c,
-                Err(e) => return ((old_chunk_x, old_chunk_z), Err(e)),
-            };
-            let mut z = ZlibDecoder::new(&chunk_data[..]);
-            let mut chunk_data = Vec::new();
-            match z.read_to_end(&mut chunk_data) {
-                Ok(_) => {}
-                Err(err) => return ((old_chunk_x, old_chunk_z), Err(WorldError::ZlibError(err))),
-            }
-
-            (
-                (old_chunk_x, old_chunk_z),
-                ChunkData::from_bytes(chunk_data, (old_chunk_x, old_chunk_z)),
-            )
-        })
-        .collect()
+                let mut z = ZlibDecoder::new(&chunk_data[..]);
+                let mut chunk_data = Vec::new();
+                match z.read_to_end(&mut chunk_data) {
+                    Ok(_) => {}
+                    Err(e) => {
+                        let _ = channel.blocking_send((chunk, Err(WorldError::ZlibError(e))));
+                        return;
+                    }
+                }
+
+                let _ = channel.blocking_send((chunk, ChunkData::from_bytes(chunk_data, chunk)));
+            })
+            .collect::<Vec<()>>();
     }
 }
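For context on the header math in the hunks above: an `.mca` region file starts with a 4096-byte location table holding one 4-byte entry for each chunk of the 32x32 region, and the new code reads it once per chunk instead of once per region. A self-contained sketch of that lookup (the `chunk_location` helper is illustrative, not part of the patch):

    // Decode one location-table entry of an Anvil region file.
    // `location_table` holds the first 4096 bytes of the .mca file.
    // Returns the chunk record's (byte offset, byte size), or None if ungenerated.
    fn chunk_location(location_table: &[u8; 4096], chunk: (i32, i32)) -> Option<(u64, usize)> {
        // Same coordinate wrapping as the patch: chunk position modulo 32.
        let modulus = |a: i32, b: i32| ((a % b) + b) % b;
        let (x, z) = (modulus(chunk.0, 32) as usize, modulus(chunk.1, 32) as usize);
        let entry = (x + z * 32) * 4;
        // 3 big-endian bytes: offset in 4 KiB sectors; 1 byte: length in sectors.
        let offset = u32::from_be_bytes([
            0,
            location_table[entry],
            location_table[entry + 1],
            location_table[entry + 2],
        ]) as u64
            * 4096;
        let size = location_table[entry + 3] as usize * 4096;
        if offset == 0 && size == 0 {
            return None; // chunk has not been generated yet
        }
        Some((offset, size))
    }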
diff --git a/pumpkin/src/server.rs b/pumpkin/src/server.rs
index 4c2820204..3b885882c 100644
--- a/pumpkin/src/server.rs
+++ b/pumpkin/src/server.rs
@@ -30,6 +30,7 @@ use pumpkin_world::{dimension::Dimension, radial_chunk_iterator::RadialIterator};
 use pumpkin_registry::Registry;
 use rsa::{traits::PublicKeyParts, RsaPrivateKey, RsaPublicKey};
 use serde::{Deserialize, Serialize};
+use tokio::sync::mpsc;

 use crate::{
     client::Client,
@@ -330,24 +331,33 @@ impl Server {

     // TODO: do this in a world
     async fn spawn_test_chunk(client: &mut Client) {
-        let chunks = Dimension::OverWorld
-            .into_level(
+        let inst = std::time::Instant::now();
+        let (sender, mut chunk_receiver) = mpsc::channel(1024);
+        tokio::spawn(async move {
+            let level = Dimension::OverWorld.into_level(
                 // TODO: load from config
                 "./world".parse().unwrap(),
-            )
-            .read_chunks(RadialIterator::new(32).collect())
-            .await;
+            );
+            level
+                .read_chunks(RadialIterator::new(32).collect(), sender)
+                .await;
+        });

         client.send_packet(&CCenterChunk {
             chunk_x: 0.into(),
             chunk_z: 0.into(),
         });
-
-        chunks.iter().for_each(|chunk| {
+
+        while let Some((chunk_pos, chunk_data)) = chunk_receiver.recv().await {
+            // dbg!(chunk_pos);
+            let chunk_data = match chunk_data {
+                Ok(d) => d,
+                Err(_) => continue,
+            };
             #[cfg(debug_assertions)]
-            if chunk.0 == (0, 0) {
+            if chunk_pos == (0, 0) {
                 let mut test = ByteBuffer::empty();
-                CChunkData(chunk.1.as_ref().unwrap()).write(&mut test);
+                CChunkData(&chunk_data).write(&mut test);
                 let len = test.buf().len();
                 log::debug!(
                     "Chunk packet size: {}B {}KB {}MB",
@@ -356,11 +366,10 @@
                     len / 1024,
                     len / (1024 * 1024)
                 );
             }
-            match &chunk.1 {
-                Err(err) => {},
-                Ok(data) => client.send_packet(&CChunkData(data)),
-            }
-        });
+            client.send_packet(&CChunkData(&chunk_data));
+        }
+        let t = std::time::Instant::now().duration_since(inst);
+        dbg!("DONE", t);
     }

     // move to world
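The new calling convention, reduced to its essentials, looks like the sketch below. This is not the server code verbatim: `stream_chunks` is a hypothetical helper, the imports assume `Level` and `WorldError` are exported from `pumpkin_world::world` and `ChunkData` from `pumpkin_world::chunk`, and `WorldError: Debug` is assumed for logging.

    use pumpkin_world::chunk::ChunkData;
    use pumpkin_world::world::{Level, WorldError};
    use tokio::sync::mpsc;

    async fn stream_chunks(level: Level, wanted: Vec<(i32, i32)>) {
        let (sender, mut receiver) =
            mpsc::channel::<((i32, i32), Result<ChunkData, WorldError>)>(1024);

        // Producer: read_chunks fans the work out over rayon and only returns
        // once every chunk has been sent, so give it its own task.
        tokio::spawn(async move {
            level.read_chunks(wanted, sender).await;
        });

        // Consumer: results arrive in completion order, not request order.
        while let Some((pos, result)) = receiver.recv().await {
            match result {
                Ok(_chunk) => { /* e.g. encode and send a CChunkData packet */ }
                Err(err) => log::warn!("chunk {:?} failed to load: {:?}", pos, err),
            }
        }
    }

One design note: because the workers use `blocking_send`, a bounded channel (1024 here, matching the server change) applies backpressure to the rayon pool instead of collecting every loaded chunk into one `Vec` before returning, as the old `read_chunks` did; that streaming behaviour is the other part of the memory-footprint reduction.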