diff --git a/pumpkin-world/Cargo.toml b/pumpkin-world/Cargo.toml index 38dfc8a01..7e073f6c7 100644 --- a/pumpkin-world/Cargo.toml +++ b/pumpkin-world/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true fastnbt = { git = "https://github.com/owengage/fastnbt.git" } fastsnbt = "0.2" tokio.workspace = true +rayon.workspace = true itertools = "0.13.0" thiserror = "1.0.63" futures = "0.3.30" @@ -15,4 +16,3 @@ flate2 = "1.0.31" serde = { version = "1.0.205", features = ["derive"] } lazy_static = "1.5.0" serde_json = "1.0.122" -rayon = "1.10.0" diff --git a/pumpkin-world/src/chunk.rs b/pumpkin-world/src/chunk.rs index 60729c83f..f41034d58 100644 --- a/pumpkin-world/src/chunk.rs +++ b/pumpkin-world/src/chunk.rs @@ -32,7 +32,7 @@ use fastnbt::LongArray; use crate::{world::WorldError, WORLD_HEIGHT}; pub struct ChunkData { - pub blocks: Box<[i64; 16 * 16 * WORLD_HEIGHT]>, + pub blocks: Box<[i32; 16 * 16 * WORLD_HEIGHT]>, pub position: (i32, i32), pub heightmaps: ChunkHeightmaps, } @@ -96,6 +96,7 @@ impl ChunkData { &entry.name, entry.properties.as_ref(), ) + .map(|v| v as i32) }) .collect::, _>>()?; let block_data = match block_states.data { diff --git a/pumpkin-world/src/world.rs b/pumpkin-world/src/world.rs index f297ccdfd..2f2c0d407 100644 --- a/pumpkin-world/src/world.rs +++ b/pumpkin-world/src/world.rs @@ -1,4 +1,8 @@ -use std::{io::Read, path::PathBuf, sync::Arc}; +use std::{ + io::{Read, Seek}, + path::PathBuf, + sync::{atomic::AtomicUsize, Arc}, +}; use flate2::bufread::ZlibDecoder; use itertools::Itertools; @@ -7,7 +11,9 @@ use thiserror::Error; use tokio::{ fs::File, io::{AsyncReadExt, AsyncSeekExt}, - sync::Mutex, + runtime::Handle, + sync::{mpsc, oneshot, Mutex}, + task::spawn_blocking, }; use crate::chunk::ChunkData; @@ -50,107 +56,81 @@ impl Level { Level { root_folder } } - /// Read one chunk in the world - /// - /// Do not use this function if reading many chunks is required, since in case those two chunks which are read seperately using `.read_chunk` are in the same region file, it will need to be opened and closed separately for both of them, leading to a performance loss. - pub async fn read_chunk(&self, chunk: (i32, i32)) -> Result { - self.read_chunks(vec![chunk]) - .await - .pop() - .expect("Read chunks must return a chunk") - .1 - } + // /// Read one chunk in the world + // /// + // /// Do not use this function if reading many chunks is required, since in case those two chunks which are read seperately using `.read_chunk` are in the same region file, it will need to be opened and closed separately for both of them, leading to a performance loss. + // pub async fn read_chunk(&self, chunk: (i32, i32)) -> Result { + // self.read_chunks(vec![chunk]) + // .await + // .pop() + // .expect("Read chunks must return a chunk") + // .1 + // } /// Read many chunks in a world + /// MUST be called from a tokio runtime thread /// /// Note: The order of the output chunks will almost never be in the same order as the order of input chunks pub async fn read_chunks( &self, chunks: Vec<(i32, i32)>, - ) -> Vec<((i32, i32), Result)> { - futures::future::join_all( - chunks - .into_iter() - // split chunks into their corresponding region files to be able to read all of them at once, instead of reopening the file multiple times - .chunk_by(|chunk| { - ( - ((chunk.0 as f32) / 32.0).floor() as i32, - ((chunk.1 as f32) / 32.0).floor() as i32, - ) - }) - .into_iter() - .map(|(region, chunk_vec)| { - let mut path = self.root_folder.clone(); - let chunk_vec = chunk_vec.collect_vec(); - path.push("region"); - path.push(format!("r.{}.{}.mca", region.0, region.1)); - self.read_region_chunks(path, chunk_vec) - }), - ) - .await - .into_iter() - .flatten() - .collect_vec() - } - async fn read_region_chunks( - &self, - region_file_path: PathBuf, - chunks: Vec<(i32, i32)>, - ) -> Vec<((i32, i32), Result)> { - // dbg!(at); - // return different error when file is not found (because that means that the chunks have just not been generated yet) - let mut region_file = match File::open(®ion_file_path).await { - Ok(f) => f, - Err(err) => match err.kind() { - std::io::ErrorKind::NotFound => { - return chunks - .into_iter() - .map(|c| (c, Err(WorldError::RegionNotFound))) - .collect_vec() - } - _ => { - return chunks - .into_iter() - .map(|c| (c, Err(WorldError::IoError(err.kind())))) - .collect_vec() - } - }, - }; - - let mut location_table: [u8; 4096] = [0; 4096]; - let mut timestamp_table: [u8; 4096] = [0; 4096]; - - // fill the location and timestamp tables - { - match region_file.read_exact(&mut location_table).await { - Ok(_) => {} - Err(err) => { - return chunks - .into_iter() - .map(|c| (c, Err(WorldError::IoError(err.kind())))) - .collect_vec() - } - } - match region_file.read_exact(&mut timestamp_table).await { - Ok(_) => {} - Err(err) => { - return chunks - .into_iter() - .map(|c| (c, Err(WorldError::IoError(err.kind())))) - .collect_vec() + channel: mpsc::Sender<((i32, i32), Result)>, + ) { + chunks + .into_par_iter() + .map(|chunk| { + let region = ( + ((chunk.0 as f32) / 32.0).floor() as i32, + ((chunk.1 as f32) / 32.0).floor() as i32, + ); + let channel = channel.clone(); + let mut region_file_path = self.root_folder.clone(); + region_file_path.push("region"); + region_file_path.push(format!("r.{}.{}.mca", region.0, region.1)); + + // return different error when file is not found (because that means that the chunks have just not been generated yet) + let mut region_file = match std::fs::File::options().read(true).write(false).open(®ion_file_path) { + Ok(f) => f, + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => { + let _ = channel.blocking_send((chunk, Err(WorldError::RegionNotFound))); + return; + } + _ => { + let _ = channel + .blocking_send((chunk, Err(WorldError::IoError(err.kind())))); + return; + } + }, + }; + + let mut location_table: [u8; 4096] = [0; 4096]; + let mut timestamp_table: [u8; 4096] = [0; 4096]; + + // fill the location and timestamp tables + { + match region_file.read_exact(&mut location_table) { + Ok(_) => {} + Err(err) => { + let _ = channel + .blocking_send((chunk, Err(WorldError::IoError(err.kind())))); + return; + } + } + match region_file.read_exact(&mut timestamp_table) { + Ok(_) => {} + Err(err) => { + let _ = channel + .blocking_send((chunk, Err(WorldError::IoError(err.kind())))); + return; + } + } } - } - } - // println!("Location table: {:?}", &location_table); - - // wrap file with arc mutex to allow for multithreading - let region_file = Arc::new(Mutex::new(region_file)); - futures::future::join_all(chunks.into_iter().map(|(old_chunk_x, old_chunk_z)| { - let region_file = region_file.clone(); - let modulus = |a: i32, b: i32| ((a % b) + b) % b; - let chunk_x = modulus(old_chunk_x, 32) as u32; - let chunk_z = modulus(old_chunk_z, 32) as u32; - async move { + + let modulus = |a: i32, b: i32| ((a % b) + b) % b; + let chunk_x = modulus(chunk.0, 32) as u32; + let chunk_z = modulus(chunk.1, 32) as u32; + let channel = channel.clone(); let table_entry = (chunk_x + chunk_z * 32) * 4; let mut offset = vec![0u8]; @@ -161,19 +141,24 @@ impl Level { let size = location_table[table_entry as usize + 3] as usize * 4096; if offset == 0 && size == 0 { - return ((old_chunk_x, old_chunk_z), Err(WorldError::ChunkNotFound)); + let _ = + channel.blocking_send(((chunk.0, chunk.1), Err(WorldError::ChunkNotFound))); + return; } // Read the file using the offset and size let mut file_buf = { - let mut region_file = region_file.lock().await; - let seek_result = region_file.seek(std::io::SeekFrom::Start(offset)).await; + let seek_result = region_file.seek(std::io::SeekFrom::Start(offset)); if seek_result.is_err() { - return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid)); + let _ = channel + .blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid))); + return; } let mut out = vec![0; size]; - let read_result = region_file.read_exact(&mut out).await; + let read_result = region_file.read_exact(&mut out); if read_result.is_err() { - return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid)); + let _ = channel + .blocking_send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid))); + return; } out }; @@ -186,7 +171,11 @@ impl Level { 2 => Compression::Zlib, 3 => Compression::None, 4 => Compression::LZ4, - _ => return ((old_chunk_x, old_chunk_z), Err(WorldError::RegionIsInvalid)), + _ => { + let _ = + channel.send(((chunk.0, chunk.1), Err(WorldError::RegionIsInvalid))); + return; + } }; match compression { @@ -195,32 +184,22 @@ impl Level { } let size = u32::from_be_bytes(header[0..4].try_into().unwrap()); - + // size includes the compression scheme byte, so we need to subtract 1 let chunk_data = file_buf.drain(0..size as usize - 1).collect_vec(); - ((old_chunk_x, old_chunk_z), Ok(chunk_data)) - } - })) - .await - .into_par_iter() - .map(|((old_chunk_x, old_chunk_z), chunk_data)| { - let chunk_data = match chunk_data { - Ok(c) => c, - Err(e) => return ((old_chunk_x, old_chunk_z), Err(e)), - }; - let mut z = ZlibDecoder::new(&chunk_data[..]); - let mut chunk_data = Vec::new(); - match z.read_to_end(&mut chunk_data) { - Ok(_) => {} - Err(err) => return ((old_chunk_x, old_chunk_z), Err(WorldError::ZlibError(err))), - } - - ( - (old_chunk_x, old_chunk_z), - ChunkData::from_bytes(chunk_data, (old_chunk_x, old_chunk_z)), - ) - }) - .collect() + let mut z = ZlibDecoder::new(&chunk_data[..]); + let mut chunk_data = Vec::new(); + match z.read_to_end(&mut chunk_data) { + Ok(_) => {} + Err(e) => { + let _ = channel.blocking_send((chunk, Err(WorldError::ZlibError(e)))); + return; + } + } + + let _ = channel.blocking_send((chunk, ChunkData::from_bytes(chunk_data, chunk))); + }) + .collect::>(); } } diff --git a/pumpkin/src/server.rs b/pumpkin/src/server.rs index 4c2820204..3b885882c 100644 --- a/pumpkin/src/server.rs +++ b/pumpkin/src/server.rs @@ -30,6 +30,7 @@ use pumpkin_world::{dimension::Dimension, radial_chunk_iterator::RadialIterator} use pumpkin_registry::Registry; use rsa::{traits::PublicKeyParts, RsaPrivateKey, RsaPublicKey}; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; use crate::{ client::Client, @@ -330,24 +331,33 @@ impl Server { // TODO: do this in a world async fn spawn_test_chunk(client: &mut Client) { - let chunks = Dimension::OverWorld - .into_level( + let inst = std::time::Instant::now(); + let (sender, mut chunk_receiver) = mpsc::channel(1024); + tokio::spawn(async move { + let level = Dimension::OverWorld.into_level( // TODO: load form config "./world".parse().unwrap(), - ) - .read_chunks(RadialIterator::new(32).collect()) - .await; + ); + level + .read_chunks(RadialIterator::new(32).collect(), sender) + .await; + }); client.send_packet(&CCenterChunk { chunk_x: 0.into(), chunk_z: 0.into(), }); - - chunks.iter().for_each(|chunk| { + + while let Some((chunk_pos, chunk_data)) = chunk_receiver.recv().await { + // dbg!(chunk_pos); + let chunk_data = match chunk_data { + Ok(d) => d, + Err(_) => continue, + }; #[cfg(debug_assertions)] - if chunk.0 == (0, 0) { + if chunk_pos == (0, 0) { let mut test = ByteBuffer::empty(); - CChunkData(chunk.1.as_ref().unwrap()).write(&mut test); + CChunkData(&chunk_data).write(&mut test); let len = test.buf().len(); log::debug!( "Chunk packet size: {}B {}KB {}MB", @@ -356,11 +366,10 @@ impl Server { len / (1024 * 1024) ); } - match &chunk.1 { - Err(err) => {}, - Ok(data) => client.send_packet(&CChunkData(data)), - } - }); + client.send_packet(&CChunkData(&chunk_data)); + } + let t = std::time::Instant::now().duration_since(inst); + dbg!("DONE", t); } // move to world