From 60f1bb619576bb434168749d5a9c2533ec68eda7 Mon Sep 17 00:00:00 2001 From: kralverde <80051564+kralverde@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:06:50 -0400 Subject: [PATCH 1/9] More debug messages for packets (#156) * rebase to master * rebase to master * rebase to master * rebase to master * add debug messages for chunk sender * add more debug messages for chunk sender * add more debug messages for chunk sender * fix dead lock * rebase to master * revert fixs leaving only debug messages * better logging --- pumpkin-world/src/level.rs | 2 +- pumpkin/src/client/client_packet.rs | 32 +++++---- pumpkin/src/client/mod.rs | 102 +++++++++++++++++++--------- pumpkin/src/entity/player.rs | 33 +++------ pumpkin/src/main.rs | 1 + pumpkin/src/world/mod.rs | 21 +++++- pumpkin/src/world/player_chunker.rs | 15 +++- 7 files changed, 136 insertions(+), 70 deletions(-) diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index 5cfae743d..8fff3a6cf 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -123,7 +123,7 @@ impl Level { let dropped_chunk_data = dropped_chunks .iter() .filter_map(|chunk| { - log::debug!("Unloading chunk {:?}", chunk); + //log::debug!("Unloading chunk {:?}", chunk); loaded_chunks.remove_entry(*chunk) }) .collect(); diff --git a/pumpkin/src/client/client_packet.rs b/pumpkin/src/client/client_packet.rs index fe43d2cbf..05e57d5c2 100644 --- a/pumpkin/src/client/client_packet.rs +++ b/pumpkin/src/client/client_packet.rs @@ -34,12 +34,16 @@ use super::{authentication::AuthError, Client, PlayerConfig}; /// NEVER TRUST THE CLIENT. HANDLE EVERY ERROR, UNWRAP/EXPECT impl Client { pub async fn handle_handshake(&self, handshake: SHandShake) { - log::debug!("handshake"); let version = handshake.protocol_version.0; self.protocol_version .store(version, std::sync::atomic::Ordering::Relaxed); *self.server_address.lock().await = handshake.server_address; + log::debug!( + "Handshake: id {} is now in state {:?}", + self.id, + &handshake.next_state + ); self.connection_state.store(handshake.next_state); if self.connection_state.load() != ConnectionState::Status { let protocol = version; @@ -56,11 +60,12 @@ impl Client { } pub async fn handle_status_request(&self, server: &Server, _status_request: SStatusRequest) { + log::debug!("Handling status request for id {}", self.id); self.send_packet(&server.get_status()).await; } pub async fn handle_ping_request(&self, ping_request: SStatusPingRequest) { - log::debug!("recieved ping request"); + log::debug!("Handling ping request for id {}", self.id); self.send_packet(&CPingResponse::new(ping_request.payload)) .await; self.close(); @@ -74,7 +79,11 @@ impl Client { } pub async fn handle_login_start(&self, server: &Server, login_start: SLoginStart) { - log::debug!("login start, State {:?}", self.connection_state); + log::debug!( + "login start for id {}, State {:?}", + self.id, + self.connection_state + ); if !Self::is_valid_player_name(&login_start.name) { self.kick("Invalid characters in username").await; @@ -127,13 +136,8 @@ impl Client { server: &Server, encryption_response: SEncryptionResponse, ) { - let shared_secret = match server.decrypt(&encryption_response.shared_secret) { - Ok(shared_secret) => shared_secret, - Err(error) => { - self.kick(&error.to_string()).await; - return; - } - }; + log::debug!("Handling encryption for id {}", self.id); + let shared_secret = server.decrypt(&encryption_response.shared_secret).unwrap(); if let Err(error) = self.set_encryption(Some(&shared_secret)).await { self.kick(&error.to_string()).await; @@ -223,6 +227,7 @@ impl Client { } pub async fn handle_plugin_response(&self, plugin_response: SLoginPluginResponse) { + log::debug!("Handling plugin for id {}", self.id); let velocity_config = &ADVANCED_CONFIG.proxy.velocity; if velocity_config.enabled { let mut address = self.address.lock().await; @@ -246,6 +251,7 @@ impl Client { server: &Server, _login_acknowledged: SLoginAcknowledged, ) { + log::debug!("Handling login acknowledged for id {}", self.id); self.connection_state.store(ConnectionState::Config); self.send_packet(&server.get_branding()).await; @@ -282,7 +288,7 @@ impl Client { &self, client_information: SClientInformationConfig, ) { - log::debug!("got client settings"); + log::debug!("Handling client settings for id {}", self.id); if let (Some(main_hand), Some(chat_mode)) = ( Hand::from_i32(client_information.main_hand.into()), ChatMode::from_i32(client_information.chat_mode.into()), @@ -303,6 +309,7 @@ impl Client { } pub async fn handle_plugin_message(&self, plugin_message: SPluginMessage) { + log::debug!("Handling plugin message for id {}", self.id); if plugin_message.channel.starts_with("minecraft:brand") || plugin_message.channel.starts_with("MC|Brand") { @@ -315,6 +322,7 @@ impl Client { } pub async fn handle_known_packs(&self, server: &Server, _config_acknowledged: SKnownPacks) { + log::debug!("Handling known packs for id {}", self.id); for registry in &server.cached_registry { self.send_packet(&CRegistryData::new( ®istry.registry_id, @@ -329,7 +337,7 @@ impl Client { } pub fn handle_config_acknowledged(&self, _config_acknowledged: &SAcknowledgeFinishConfig) { - log::debug!("config acknowledged"); + log::debug!("Handling config acknowledge for id {}", self.id); self.connection_state.store(ConnectionState::Play); self.make_player .store(true, std::sync::atomic::Ordering::Relaxed); diff --git a/pumpkin/src/client/mod.rs b/pumpkin/src/client/mod.rs index 5bbfc76be..4294bd7ef 100644 --- a/pumpkin/src/client/mod.rs +++ b/pumpkin/src/client/mod.rs @@ -175,44 +175,65 @@ impl Client { /// Send a Clientbound Packet to the Client pub async fn send_packet(&self, packet: &P) { + log::debug!("Sending packet with id {} to {}", P::PACKET_ID, self.id); // assert!(!self.closed); let mut enc = self.enc.lock().await; if let Err(error) = enc.append_packet(packet) { self.kick(&error.to_string()).await; return; } - if let Err(error) = self - .connection_writer - .lock() - .await + + let mut writer = self.connection_writer.lock().await; + if let Err(error) = writer .write_all(&enc.take()) .await .map_err(|_| PacketError::ConnectionWrite) { self.kick(&error.to_string()).await; + } else if let Err(error) = writer.flush().await { + log::warn!( + "Failed to flush writer for id {}: {}", + self.id, + error.to_string() + ); } } pub async fn try_send_packet(&self, packet: &P) -> Result<(), PacketError> { // assert!(!self.closed); + log::debug!( + "Trying to send packet with id {} to {}", + P::PACKET_ID, + self.id + ); let mut enc = self.enc.lock().await; enc.append_packet(packet)?; - self.connection_writer - .lock() - .await + + let mut writer = self.connection_writer.lock().await; + writer .write_all(&enc.take()) .await .map_err(|_| PacketError::ConnectionWrite)?; + + writer + .flush() + .await + .map_err(|_| PacketError::ConnectionWrite)?; Ok(()) } /// Processes all packets send by the client pub async fn process_packets(&self, server: &Arc) { - while let Some(mut packet) = self.client_packets_queue.lock().await.pop_front() { + let mut packet_queue = self.client_packets_queue.lock().await; + while let Some(mut packet) = packet_queue.pop_front() { if let Err(error) = self.handle_packet(server, &mut packet).await { let text = format!("Error while reading incoming packet {error}"); - log::error!("{text}"); + log::error!( + "Failed to read incoming packet with id {}: {}", + i32::from(packet.id), + error + ); self.kick(&text).await; }; } @@ -250,15 +271,19 @@ impl Client { &self, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling handshake group for id {}", self.id); let bytebuf = &mut packet.bytebuf; - if packet.id.0 == SHandShake::PACKET_ID { - self.handle_handshake(SHandShake::read(bytebuf)?).await; - } else { - log::error!( - "Failed to handle packet id {} while in Handshake state", - packet.id.0 - ); - } + match packet.id.0 { + SHandShake::PACKET_ID => { + self.handle_handshake(SHandShake::read(bytebuf)?).await; + } + _ => { + log::error!( + "Failed to handle packet id {} while in Handshake state", + packet.id.0 + ); + } + }; Ok(()) } @@ -267,6 +292,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling status group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SStatusRequest::PACKET_ID => { @@ -283,7 +309,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -292,6 +318,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling login group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SLoginStart::PACKET_ID => { @@ -316,7 +343,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -325,6 +352,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling config group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SClientInformationConfig::PACKET_ID => { @@ -348,7 +376,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -358,26 +386,38 @@ impl Client { pub async fn poll(&self) -> bool { loop { let mut dec = self.dec.lock().await; - if let Ok(Some(packet)) = dec.decode() { - self.add_packet(packet).await; - return true; - }; + + match dec.decode() { + Ok(Some(packet)) => { + self.add_packet(packet).await; + return true; + } + Ok(None) => log::debug!("Waiting for more data to complete packet..."), + Err(err) => log::warn!( + "Failed to decode packet for id {}: {}", + self.id, + err.to_string() + ), + } dec.reserve(4096); let mut buf = dec.take_capacity(); - match self.connection_reader.lock().await.read_buf(&mut buf).await { - Ok(0) => { - self.close(); - return false; + let bytes_read = self.connection_reader.lock().await.read_buf(&mut buf).await; + match bytes_read { + Ok(cnt) => { + log::debug!("Read {} bytes", cnt); + if cnt == 0 { + self.close(); + return false; + } } Err(error) => { log::error!("Error while reading incoming packet {}", error); self.close(); return false; } - _ => {} - } + }; // This should always be an O(1) unsplit because we reserved space earlier and // the call to `read_buf` shouldn't have grown the allocation. @@ -387,7 +427,7 @@ impl Client { /// Kicks the Client with a reason depending on the connection state pub async fn kick(&self, reason: &str) { - log::debug!("Kicking client with reason: {}", reason); + log::info!("Kicking for id {} for {}", self.id, reason); match self.connection_state.load() { ConnectionState::Login => { self.try_send_packet(&CLoginDisconnect::new( diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs index dd8c23b41..ab388f3cd 100644 --- a/pumpkin/src/entity/player.rs +++ b/pumpkin/src/entity/player.rs @@ -353,110 +353,97 @@ impl Player { server: &Arc, packet: &mut RawPacket, ) -> Result<(), Box> { + /* + log::debug!( + "Handling player packet with id {} for {}", + packet.id.0, + self.client.id + ); + */ + let bytebuf = &mut packet.bytebuf; match packet.id.0 { SConfirmTeleport::PACKET_ID => { self.handle_confirm_teleport(SConfirmTeleport::read(bytebuf)?) .await; - Ok(()) } SChatCommand::PACKET_ID => { self.handle_chat_command(server, SChatCommand::read(bytebuf)?) .await; - Ok(()) } SPlayerPosition::PACKET_ID => { self.handle_position(SPlayerPosition::read(bytebuf)?).await; - Ok(()) } SPlayerPositionRotation::PACKET_ID => { self.handle_position_rotation(SPlayerPositionRotation::read(bytebuf)?) .await; - Ok(()) } SPlayerRotation::PACKET_ID => { self.handle_rotation(SPlayerRotation::read(bytebuf)?).await; - Ok(()) } SSetPlayerGround::PACKET_ID => { self.handle_player_ground(&SSetPlayerGround::read(bytebuf)?); - Ok(()) } SPlayerCommand::PACKET_ID => { self.handle_player_command(SPlayerCommand::read(bytebuf)?) .await; - Ok(()) } SSwingArm::PACKET_ID => { self.handle_swing_arm(SSwingArm::read(bytebuf)?).await; - Ok(()) } SChatMessage::PACKET_ID => { self.handle_chat_message(SChatMessage::read(bytebuf)?).await; - Ok(()) } SClientInformationPlay::PACKET_ID => { self.handle_client_information_play(SClientInformationPlay::read(bytebuf)?) .await; - Ok(()) } SInteract::PACKET_ID => { self.handle_interact(server, SInteract::read(bytebuf)?) .await; - Ok(()) } SPlayerAction::PACKET_ID => { self.handle_player_action(SPlayerAction::read(bytebuf)?) .await; - Ok(()) } SPlayerAbilities::PACKET_ID => { self.handle_player_abilities(SPlayerAbilities::read(bytebuf)?) .await; - Ok(()) } SUseItemOn::PACKET_ID => { self.handle_use_item_on(SUseItemOn::read(bytebuf)?).await; - Ok(()) } SUseItem::PACKET_ID => { self.handle_use_item(&SUseItem::read(bytebuf)?); - Ok(()) } SSetHeldItem::PACKET_ID => { self.handle_set_held_item(SSetHeldItem::read(bytebuf)?) .await; - Ok(()) } SSetCreativeSlot::PACKET_ID => { self.handle_set_creative_slot(SSetCreativeSlot::read(bytebuf)?) .await?; - Ok(()) } SPlayPingRequest::PACKET_ID => { self.handle_play_ping_request(SPlayPingRequest::read(bytebuf)?) .await; - Ok(()) } SClickContainer::PACKET_ID => { self.handle_click_container(server, SClickContainer::read(bytebuf)?) .await?; - Ok(()) } SCloseContainer::PACKET_ID => { self.handle_close_container(server, SCloseContainer::read(bytebuf)?) .await; - Ok(()) } SKeepAlive::PACKET_ID => { self.handle_keep_alive(SKeepAlive::read(bytebuf)?).await; - Ok(()) } _ => { log::error!("Failed to handle player packet id {:#04x}", packet.id.0); - Ok(()) } - } + }; + Ok(()) } } diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index a25ed3a4d..74c51e921 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -205,6 +205,7 @@ async fn main() -> io::Result<()> { .load(std::sync::atomic::Ordering::Relaxed) { let id = client.id; + log::debug!("Creating player for id {}", id); let (player, world) = server.add_player(id, client).await; world.spawn_player(&BASIC_CONFIG, player.clone()).await; // poll Player diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 5626862f0..7aec512b8 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -93,8 +93,13 @@ impl World { // This code follows the vanilla packet order let entity_id = player.entity_id(); let gamemode = player.gamemode.load(); - log::debug!("spawning player, entity id {}", entity_id); + log::debug!( + "spawning player {}, entity id {}", + player.client.id, + entity_id + ); + log::debug!("Sending login packet to {}", player.client.id); // login packet for our new player player .client @@ -120,9 +125,10 @@ impl World { false, )) .await; - log::debug!("sending abilities"); + // player abilities // TODO: this is for debug purpose, remove later + log::debug!("Sending player abilities to {}", player.client.id); player .client .send_packet(&CPlayerAbilities::new(0x02, 0.4, 0.1)) @@ -134,6 +140,8 @@ impl World { let z = 10.0; let yaw = 10.0; let pitch = 10.0; + + log::debug!("Sending player teleport to {}", player.client.id); player.teleport(x, y, z, yaw, pitch).await; let pos = player.living_entity.entity.pos.load(); @@ -142,6 +150,7 @@ impl World { let gameprofile = &player.gameprofile; // first send info update to our new player, So he can see his Skin // also send his info to everyone else + log::debug!("Broadcasting player info for {}", player.client.id); self.broadcast_packet_all(&CPlayerInfoUpdate::new( 0x01 | 0x08, &[pumpkin_protocol::client::play::Player { @@ -177,6 +186,7 @@ impl World { ], }); } + log::debug!("Sending player info to {}", player.client.id); player .client .send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)) @@ -185,6 +195,7 @@ impl World { let gameprofile = &player.gameprofile; + log::debug!("Broadcasting player spawn for {}", player.client.id); // spawn player for every client self.broadcast_packet_expect( &[player.client.id], @@ -218,6 +229,7 @@ impl World { let entity = &existing_player.living_entity.entity; let pos = entity.pos.load(); let gameprofile = &existing_player.gameprofile; + log::debug!("Sending player entities to {}", player.client.id); player .client .send_packet(&CSpawnEntity::new( @@ -244,10 +256,12 @@ impl World { entity_id.into(), Metadata::new(17, VarInt(0), config.skin_parts), ); + log::debug!("Broadcasting skin for {}", player.client.id); self.broadcast_packet_all(&packet).await; } // Start waiting for level chunks, Sets the "Loading Terrain" screen + log::debug!("Sending waiting chunks to {}", player.client.id); player .client .send_packet(&CGameEvent::new(GameEvent::StartWaitingChunks, 0.0)) @@ -277,15 +291,18 @@ impl World { } let inst = std::time::Instant::now(); let (sender, mut chunk_receiver) = mpsc::channel(distance as usize); + let client_id = client.id; let level = self.level.clone(); let chunks = Arc::new(chunks); tokio::spawn(async move { + log::debug!("Spawned chunk fetcher for {}", client_id); let level = level.lock().await; level.fetch_chunks(&chunks, sender); }); tokio::spawn(async move { + log::debug!("Spawned chunk sender for {}", client_id); while let Some(chunk_data) = chunk_receiver.recv().await { // dbg!(chunk_pos); #[cfg(debug_assertions)] diff --git a/pumpkin/src/world/player_chunker.rs b/pumpkin/src/world/player_chunker.rs index 5cfc89fa6..3965ba560 100644 --- a/pumpkin/src/world/player_chunker.rs +++ b/pumpkin/src/world/player_chunker.rs @@ -25,6 +25,8 @@ pub async fn player_join(world: &World, player: Arc) { player.watched_section.store(new_watched); let watched_section = new_watched; let chunk_pos = player.living_entity.entity.chunk_pos.load(); + + log::debug!("Sending center chunk to {}", player.client.id); player .client .send_packet(&CCenterChunk { @@ -34,10 +36,12 @@ pub async fn player_join(world: &World, player: Arc) { .await; let view_distance = i32::from(get_view_distance(&player).await); log::debug!( - "Player {} joined with view distance: {}", + "Player {} ({}) joined with view distance: {}", player.gameprofile.name, + player.client.id, view_distance ); + let old_cylindrical = Cylindrical::new( Vector2::new(watched_section.x, watched_section.z), view_distance, @@ -59,6 +63,15 @@ pub async fn player_join(world: &World, player: Arc) { }, true, ); + + log::debug!( + "{} added {} remove ({}) for {}", + loading_chunks.len(), + unloading_chunks.len(), + view_distance, + player.client.id + ); + if !loading_chunks.is_empty() { world.mark_chunks_as_watched(&loading_chunks).await; world.spawn_world_chunks(player.client.clone(), loading_chunks, view_distance); From 1e15f8b3baad022b7c243219516a75ee9e798c46 Mon Sep 17 00:00:00 2001 From: Edvin Bryntesson Date: Sun, 20 Oct 2024 16:15:34 +0200 Subject: [PATCH 2/9] implemented persistent block breaking --- .../src/client/play/c_chunk_data.rs | 4 +- pumpkin-world/src/chunk/anvil.rs | 1 + pumpkin-world/src/chunk/mod.rs | 3 +- pumpkin-world/src/level.rs | 152 ++++++++++-------- pumpkin/src/client/player_packet.rs | 18 +-- pumpkin/src/world/mod.rs | 68 ++++++-- 6 files changed, 148 insertions(+), 98 deletions(-) diff --git a/pumpkin-protocol/src/client/play/c_chunk_data.rs b/pumpkin-protocol/src/client/play/c_chunk_data.rs index 0a68eace2..85c12effd 100644 --- a/pumpkin-protocol/src/client/play/c_chunk_data.rs +++ b/pumpkin-protocol/src/client/play/c_chunk_data.rs @@ -6,9 +6,9 @@ use pumpkin_macros::packet; use pumpkin_world::{chunk::ChunkData, DIRECT_PALETTE_BITS}; #[packet(0x27)] -pub struct CChunkData<'a>(pub &'a ChunkData); +pub struct CChunkData(pub ChunkData); -impl<'a> ClientPacket for CChunkData<'a> { +impl ClientPacket for CChunkData { fn write(&self, buf: &mut crate::bytebuf::ByteBuffer) { // Chunk X buf.put_i32(self.0.position.x); diff --git a/pumpkin-world/src/chunk/anvil.rs b/pumpkin-world/src/chunk/anvil.rs index 983b3fea1..06705fb08 100644 --- a/pumpkin-world/src/chunk/anvil.rs +++ b/pumpkin-world/src/chunk/anvil.rs @@ -10,6 +10,7 @@ use crate::level::SaveFile; use super::{ChunkData, ChunkReader, ChunkReadingError, CompressionError}; +#[derive(Clone)] pub struct AnvilChunkReader {} impl Default for AnvilChunkReader { diff --git a/pumpkin-world/src/chunk/mod.rs b/pumpkin-world/src/chunk/mod.rs index c75de68a6..9136de62b 100644 --- a/pumpkin-world/src/chunk/mod.rs +++ b/pumpkin-world/src/chunk/mod.rs @@ -54,11 +54,12 @@ pub enum CompressionError { LZ4Error(std::io::Error), } +#[derive(Clone)] pub struct ChunkData { pub blocks: ChunkBlocks, pub position: Vector2, } - +#[derive(Clone)] pub struct ChunkBlocks { // TODO make this a Vec that doesn't store the upper layers that only contain air diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index 8fff3a6cf..b4207583a 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -1,16 +1,16 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc}; -use parking_lot::{Mutex, RwLock}; -use pumpkin_core::math::vector2::Vector2; -use rayon::prelude::*; -use tokio::sync::mpsc; - use crate::{ chunk::{ anvil::AnvilChunkReader, ChunkData, ChunkParsingError, ChunkReader, ChunkReadingError, }, world_gen::{get_world_gen, Seed, WorldGenerator}, }; +use pumpkin_core::math::vector2::Vector2; +use tokio::sync::mpsc; +use tokio::sync::{Mutex, RwLock}; + +type RAMChunkStorage = Arc, Arc>>>>; /// The `Level` module provides functionality for working with chunks within or outside a Minecraft world. /// @@ -23,12 +23,12 @@ use crate::{ /// For more details on world generation, refer to the `WorldGenerator` module. pub struct Level { save_file: Option, - loaded_chunks: Arc, Arc>>>, + loaded_chunks: RAMChunkStorage, chunk_watchers: Arc, usize>>>, - chunk_reader: Box, - world_gen: Box, + chunk_reader: Arc>, + world_gen: Arc>, } - +#[derive(Clone)] pub struct SaveFile { #[expect(dead_code)] root_folder: PathBuf, @@ -37,7 +37,7 @@ pub struct SaveFile { impl Level { pub fn from_root_folder(root_folder: PathBuf) -> Self { - let world_gen = get_world_gen(Seed(0)); // TODO Read Seed from config. + let world_gen = get_world_gen(Seed(0)).into(); // TODO Read Seed from config. if root_folder.exists() { let region_folder = root_folder.join("region"); @@ -52,7 +52,7 @@ impl Level { root_folder, region_folder, }), - chunk_reader: Box::new(AnvilChunkReader::new()), + chunk_reader: Arc::new(Box::new(AnvilChunkReader::new())), loaded_chunks: Arc::new(RwLock::new(HashMap::new())), chunk_watchers: Arc::new(Mutex::new(HashMap::new())), } @@ -64,7 +64,7 @@ impl Level { Self { world_gen, save_file: None, - chunk_reader: Box::new(AnvilChunkReader::new()), + chunk_reader: Arc::new(Box::new(AnvilChunkReader::new())), loaded_chunks: Arc::new(RwLock::new(HashMap::new())), chunk_watchers: Arc::new(Mutex::new(HashMap::new())), } @@ -76,8 +76,8 @@ impl Level { /// Marks chunks as "watched" by a unique player. When no players are watching a chunk, /// it is removed from memory. Should only be called on chunks the player was not watching /// before - pub fn mark_chunk_as_newly_watched(&self, chunks: &[Vector2]) { - let mut watchers = self.chunk_watchers.lock(); + pub async fn mark_chunk_as_newly_watched(&self, chunks: &[Vector2]) { + let mut watchers = self.chunk_watchers.lock().await; for chunk in chunks { match watchers.entry(*chunk) { std::collections::hash_map::Entry::Occupied(mut occupied) => { @@ -93,9 +93,9 @@ impl Level { /// Marks chunks no longer "watched" by a unique player. When no players are watching a chunk, /// it is removed from memory. Should only be called on chunks the player was watching before - pub fn mark_chunk_as_not_watched_and_clean(&self, chunks: &[Vector2]) { + pub async fn mark_chunk_as_not_watched_and_clean(&self, chunks: &[Vector2]) { let dropped_chunks = { - let mut watchers = self.chunk_watchers.lock(); + let mut watchers = self.chunk_watchers.lock().await; chunks .iter() .filter(|chunk| match watchers.entry(**chunk) { @@ -119,7 +119,7 @@ impl Level { }) .collect::>() }; - let mut loaded_chunks = self.loaded_chunks.write(); + let mut loaded_chunks = self.loaded_chunks.write().await; let dropped_chunk_data = dropped_chunks .iter() .filter_map(|chunk| { @@ -130,7 +130,7 @@ impl Level { self.write_chunks(dropped_chunk_data); } - pub fn write_chunks(&self, _chunks_to_write: Vec<(Vector2, Arc)>) { + pub fn write_chunks(&self, _chunks_to_write: Vec<(Vector2, Arc>)>) { //TODO } @@ -138,58 +138,76 @@ impl Level { /// MUST be called from a tokio runtime thread /// /// Note: The order of the output chunks will almost never be in the same order as the order of input chunks + pub fn fetch_chunks( + &self, + chunks: &[Vector2], + channel: mpsc::Sender>>, + ) { + for chunk in chunks { + { + let chunk_location = *chunk; + let channel = channel.clone(); + let loaded_chunks = self.loaded_chunks.clone(); + let chunk_reader = self.chunk_reader.clone(); + let save_file = self.save_file.clone(); + let world_gen = self.world_gen.clone(); + tokio::spawn(async move { + let loaded_chunks_read = loaded_chunks.read().await; + let possibly_loaded_chunk = loaded_chunks_read.get(&chunk_location).cloned(); + drop(loaded_chunks_read); + match possibly_loaded_chunk { + Some(chunk) => { + let chunk = chunk.clone(); + channel.send(chunk).await.unwrap(); + } + None => { + let chunk_data = match save_file { + Some(save_file) => { + match chunk_reader.read_chunk(&save_file, &chunk_location) { + Ok(data) => Ok(Arc::new(RwLock::new(data))), + Err( + ChunkReadingError::ChunkNotExist + | ChunkReadingError::ParsingError( + ChunkParsingError::ChunkNotGenerated, + ), + ) => { + // This chunk was not generated yet. + let chunk = Arc::new(RwLock::new( + world_gen.generate_chunk(chunk_location), + )); + let mut loaded_chunks = loaded_chunks.write().await; + loaded_chunks.insert(chunk_location, chunk.clone()); + drop(loaded_chunks); + Ok(chunk) + } + Err(err) => Err(err), + } + } + None => { + // There is no savefile yet -> generate the chunks + let chunk = Arc::new(RwLock::new( + world_gen.generate_chunk(chunk_location), + )); - pub fn fetch_chunks(&self, chunks: &[Vector2], channel: mpsc::Sender>) { - chunks.into_par_iter().for_each(|at| { - let channel = channel.clone(); - - let maybe_chunk = { - let loaded_chunks = self.loaded_chunks.read(); - loaded_chunks.get(at).cloned() - } - .or_else(|| { - let chunk_data = match &self.save_file { - Some(save_file) => { - match self.chunk_reader.read_chunk(save_file, at) { - Ok(data) => Ok(Arc::new(data)), - Err( - ChunkReadingError::ChunkNotExist - | ChunkReadingError::ParsingError( - ChunkParsingError::ChunkNotGenerated, - ), - ) => { - // This chunk was not generated yet. - let chunk = Arc::new(self.world_gen.generate_chunk(*at)); - Ok(chunk) + let mut loaded_chunks = loaded_chunks.write().await; + loaded_chunks.insert(chunk_location, chunk.clone()); + Ok(chunk) + } + }; + match chunk_data { + Ok(data) => channel.send(data).await.unwrap(), + Err(err) => { + log::warn!( + "Failed to read chunk {:?}: {:?}", + chunk_location, + err + ); + } } - Err(err) => Err(err), } } - None => { - // There is no savefile yet -> generate the chunks - let chunk = Arc::new(self.world_gen.generate_chunk(*at)); - Ok(chunk) - } - }; - match chunk_data { - Ok(data) => Some(data), - Err(err) => { - // TODO: Panic here? - log::warn!("Failed to read chunk {:?}: {:?}", at, err); - None - } - } - }); - match maybe_chunk { - Some(chunk) => { - channel - .blocking_send(chunk.clone()) - .expect("Failed sending ChunkData."); - } - None => { - log::error!("Unable to send chunk {:?}!", at); - } - }; - }) + }); + } + } } } diff --git a/pumpkin/src/client/player_packet.rs b/pumpkin/src/client/player_packet.rs index feec47f28..30bd72518 100644 --- a/pumpkin/src/client/player_packet.rs +++ b/pumpkin/src/client/player_packet.rs @@ -20,7 +20,7 @@ use pumpkin_protocol::{ client::play::{ Animation, CAcknowledgeBlockChange, CBlockUpdate, CEntityAnimation, CEntityVelocity, CHeadRot, CHurtAnimation, CPingResponse, CPlayerChatMessage, CUpdateEntityPos, - CUpdateEntityPosRot, CUpdateEntityRot, CWorldEvent, FilterType, + CUpdateEntityPosRot, CUpdateEntityRot, FilterType, }, server::play::{ Action, ActionType, SChatCommand, SChatMessage, SClientInformationPlay, SConfirmTeleport, @@ -486,13 +486,7 @@ impl Player { // TODO: currently this is always dirt replace it let entity = &self.living_entity.entity; let world = &entity.world; - world - .broadcast_packet_all(&CWorldEvent::new(2001, &location, 11, false)) - .await; - // AIR - world - .broadcast_packet_all(&CBlockUpdate::new(&location, 0.into())) - .await; + world.break_block(location).await; } } Status::CancelledDigging => { @@ -522,13 +516,7 @@ impl Player { // TODO: currently this is always dirt replace it let entity = &self.living_entity.entity; let world = &entity.world; - world - .broadcast_packet_all(&CWorldEvent::new(2001, &location, 11, false)) - .await; - // AIR - world - .broadcast_packet_all(&CBlockUpdate::new(&location, 0.into())) - .await; + world.break_block(location).await; // TODO: Send this every tick self.client .send_packet(&CAcknowledgeBlockChange::new(player_action.sequence)) diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 7aec512b8..607244cdd 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -1,3 +1,4 @@ +use std::ops::Deref; use std::{collections::HashMap, sync::Arc}; pub mod player_chunker; @@ -6,10 +7,12 @@ use crate::{ client::Client, entity::{player::Player, Entity}, }; -use num_traits::ToPrimitive; +use num_traits::{Euclid, ToPrimitive}; use pumpkin_config::BasicConfiguration; +use pumpkin_core::math::position::WorldPosition; use pumpkin_core::math::vector2::Vector2; use pumpkin_entity::{entity_type::EntityType, EntityId}; +use pumpkin_protocol::client::play::{CBlockUpdate, CWorldEvent}; use pumpkin_protocol::{ client::play::{ CChunkData, CGameEvent, CLogin, CPlayerAbilities, CPlayerInfoUpdate, CRemoveEntities, @@ -17,6 +20,8 @@ use pumpkin_protocol::{ }, ClientPacket, VarInt, }; +use pumpkin_world::block::BlockId; +use pumpkin_world::coordinates::ChunkRelativeBlockCoordinates; use pumpkin_world::level::Level; use scoreboard::Scoreboard; use tokio::sync::mpsc; @@ -273,12 +278,12 @@ impl World { pub async fn mark_chunks_as_not_watched(&self, chunks: &[Vector2]) { let level = self.level.lock().await; - level.mark_chunk_as_not_watched_and_clean(chunks); + level.mark_chunk_as_not_watched_and_clean(chunks).await; } pub async fn mark_chunks_as_watched(&self, chunks: &[Vector2]) { let level = self.level.lock().await; - level.mark_chunk_as_newly_watched(chunks); + level.mark_chunk_as_newly_watched(chunks).await; } fn spawn_world_chunks(&self, client: Arc, chunks: Vec>, distance: i32) { @@ -293,23 +298,24 @@ impl World { let (sender, mut chunk_receiver) = mpsc::channel(distance as usize); let client_id = client.id; - let level = self.level.clone(); - let chunks = Arc::new(chunks); - tokio::spawn(async move { - log::debug!("Spawned chunk fetcher for {}", client_id); - let level = level.lock().await; - level.fetch_chunks(&chunks, sender); - }); + { + let level = self.level.clone(); + tokio::spawn(async move { + log::debug!("Spawned chunk fetcher for {}", client_id); + let level = level.lock().await; + level.fetch_chunks(&chunks, sender); + }); + } tokio::spawn(async move { log::debug!("Spawned chunk sender for {}", client_id); while let Some(chunk_data) = chunk_receiver.recv().await { - // dbg!(chunk_pos); + let chunk_data = chunk_data.read().await.deref().clone(); #[cfg(debug_assertions)] if chunk_data.position == (0, 0).into() { use pumpkin_protocol::bytebuf::ByteBuffer; let mut test = ByteBuffer::empty(); - CChunkData(&chunk_data).write(&mut test); + CChunkData(chunk_data.clone()).write(&mut test); let len = test.buf().len(); log::debug!( "Chunk packet size: {}B {}KB {}MB", @@ -318,11 +324,12 @@ impl World { len / (1024 * 1024) ); } + let packet = CChunkData(chunk_data); // TODO: Queue player packs in a queue so we don't need to check if its closed before // sending if !client.closed.load(std::sync::atomic::Ordering::Relaxed) { - client.send_packet(&CChunkData(&chunk_data)).await; + client.send_packet(&packet).await; } } @@ -374,4 +381,39 @@ impl World { self.broadcast_packet_all(&CRemoveEntities::new(&[entity.entity_id.into()])) .await; } + pub async fn set_block(&self, position: WorldPosition, block_id: BlockId) { + let (z_chunk, z_rem) = position.0.z.div_rem_euclid(&16); + let (x_chunk, x_rem) = position.0.x.div_rem_euclid(&16); + let chunk_coordinate = Vector2 { + x: x_chunk, + z: z_chunk, + }; + + // Since we divide by 16 remnant can never exceed u8 + let relative = ChunkRelativeBlockCoordinates { + x: (x_rem as u8).into(), + z: (z_rem as u8).into(), + y: position.0.y.into(), + }; + + let (sender, mut receive) = mpsc::channel(1024); + { + let level = self.level.clone(); + tokio::spawn( + async move { level.lock().await.fetch_chunks(&[chunk_coordinate], sender) }, + ); + } + if let Some(data) = receive.recv().await { + data.write().await.blocks.set_block(relative, block_id); + self.broadcast_packet_all(&CBlockUpdate::new(&position, (block_id.data as i32).into())) + .await; + } + } + + pub async fn break_block(&self, position: WorldPosition) { + self.set_block(position, BlockId { data: 0 }).await; + + self.broadcast_packet_all(&CWorldEvent::new(2001, &position, 11, false)) + .await; + } } From 843a1d751915d3ecc7c0cfb050bdcebbdd2ff6e5 Mon Sep 17 00:00:00 2001 From: Edvin Bryntesson Date: Sun, 20 Oct 2024 16:25:31 +0200 Subject: [PATCH 3/9] add block placing --- pumpkin/src/client/player_packet.rs | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/pumpkin/src/client/player_packet.rs b/pumpkin/src/client/player_packet.rs index 30bd72518..3d893018d 100644 --- a/pumpkin/src/client/player_packet.rs +++ b/pumpkin/src/client/player_packet.rs @@ -8,8 +8,9 @@ use crate::{ }; use num_traits::FromPrimitive; use pumpkin_config::ADVANCED_CONFIG; +use pumpkin_core::math::position::WorldPosition; use pumpkin_core::{ - math::{position::WorldPosition, vector3::Vector3, wrap_degrees}, + math::{vector3::Vector3, wrap_degrees}, text::TextComponent, GameMode, }; @@ -18,9 +19,9 @@ use pumpkin_inventory::{InventoryError, WindowType}; use pumpkin_protocol::server::play::{SCloseContainer, SKeepAlive, SSetPlayerGround, SUseItem}; use pumpkin_protocol::{ client::play::{ - Animation, CAcknowledgeBlockChange, CBlockUpdate, CEntityAnimation, CEntityVelocity, - CHeadRot, CHurtAnimation, CPingResponse, CPlayerChatMessage, CUpdateEntityPos, - CUpdateEntityPosRot, CUpdateEntityRot, FilterType, + Animation, CAcknowledgeBlockChange, CEntityAnimation, CEntityVelocity, CHeadRot, + CHurtAnimation, CPingResponse, CPlayerChatMessage, CUpdateEntityPos, CUpdateEntityPosRot, + CUpdateEntityRot, FilterType, }, server::play::{ Action, ActionType, SChatCommand, SChatMessage, SClientInformationPlay, SConfirmTeleport, @@ -29,7 +30,7 @@ use pumpkin_protocol::{ SSwingArm, SUseItemOn, Status, }, }; -use pumpkin_world::block::{BlockFace, BlockState}; +use pumpkin_world::block::{BlockFace, BlockId, BlockState}; use pumpkin_world::global_registry; use super::PlayerConfig; @@ -580,17 +581,14 @@ impl Player { if let Ok(block_state_id) = BlockState::new(minecraft_id, None) { let entity = &self.living_entity.entity; let world = &entity.world; + world - .broadcast_packet_all(&CBlockUpdate::new( - &location, - block_state_id.get_id_mojang_repr().into(), - )) - .await; - world - .broadcast_packet_all(&CBlockUpdate::new( - &WorldPosition(location.0 + face.to_offset()), - block_state_id.get_id_mojang_repr().into(), - )) + .set_block( + WorldPosition(location.0 + face.to_offset()), + BlockId { + data: block_state_id.get_id_mojang_repr() as u16, + }, + ) .await; } } From d7b162e68e3b7e44ff48d32dd81291947b1bbc85 Mon Sep 17 00:00:00 2001 From: Edvin Bryntesson Date: Sun, 20 Oct 2024 16:31:44 +0200 Subject: [PATCH 4/9] remove unnecessary cloning --- pumpkin-protocol/src/client/play/c_chunk_data.rs | 4 ++-- pumpkin-world/src/chunk/mod.rs | 2 -- pumpkin/src/world/mod.rs | 7 +++---- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/pumpkin-protocol/src/client/play/c_chunk_data.rs b/pumpkin-protocol/src/client/play/c_chunk_data.rs index 85c12effd..0a68eace2 100644 --- a/pumpkin-protocol/src/client/play/c_chunk_data.rs +++ b/pumpkin-protocol/src/client/play/c_chunk_data.rs @@ -6,9 +6,9 @@ use pumpkin_macros::packet; use pumpkin_world::{chunk::ChunkData, DIRECT_PALETTE_BITS}; #[packet(0x27)] -pub struct CChunkData(pub ChunkData); +pub struct CChunkData<'a>(pub &'a ChunkData); -impl ClientPacket for CChunkData { +impl<'a> ClientPacket for CChunkData<'a> { fn write(&self, buf: &mut crate::bytebuf::ByteBuffer) { // Chunk X buf.put_i32(self.0.position.x); diff --git a/pumpkin-world/src/chunk/mod.rs b/pumpkin-world/src/chunk/mod.rs index 9136de62b..f6c71604d 100644 --- a/pumpkin-world/src/chunk/mod.rs +++ b/pumpkin-world/src/chunk/mod.rs @@ -54,12 +54,10 @@ pub enum CompressionError { LZ4Error(std::io::Error), } -#[derive(Clone)] pub struct ChunkData { pub blocks: ChunkBlocks, pub position: Vector2, } -#[derive(Clone)] pub struct ChunkBlocks { // TODO make this a Vec that doesn't store the upper layers that only contain air diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 607244cdd..2d303deda 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -1,4 +1,3 @@ -use std::ops::Deref; use std::{collections::HashMap, sync::Arc}; pub mod player_chunker; @@ -310,12 +309,13 @@ impl World { tokio::spawn(async move { log::debug!("Spawned chunk sender for {}", client_id); while let Some(chunk_data) = chunk_receiver.recv().await { - let chunk_data = chunk_data.read().await.deref().clone(); + let chunk_data = chunk_data.read().await; + let packet = CChunkData(&chunk_data); #[cfg(debug_assertions)] if chunk_data.position == (0, 0).into() { use pumpkin_protocol::bytebuf::ByteBuffer; let mut test = ByteBuffer::empty(); - CChunkData(chunk_data.clone()).write(&mut test); + packet.write(&mut test); let len = test.buf().len(); log::debug!( "Chunk packet size: {}B {}KB {}MB", @@ -324,7 +324,6 @@ impl World { len / (1024 * 1024) ); } - let packet = CChunkData(chunk_data); // TODO: Queue player packs in a queue so we don't need to check if its closed before // sending From 4338bcff1641da9a02692e51f3329cf25c2aea65 Mon Sep 17 00:00:00 2001 From: Edvin Bryntesson Date: Mon, 21 Oct 2024 15:10:09 +0200 Subject: [PATCH 5/9] refactor things out into functions --- pumpkin-core/src/math/position.rs | 24 +++++++++++- pumpkin-world/src/coordinates.rs | 14 ++++++- pumpkin/src/world/mod.rs | 58 ++++++++++++----------------- pumpkin/src/world/player_chunker.rs | 7 +++- 4 files changed, 63 insertions(+), 40 deletions(-) diff --git a/pumpkin-core/src/math/position.rs b/pumpkin-core/src/math/position.rs index cd1f7ce26..7c51dff45 100644 --- a/pumpkin-core/src/math/position.rs +++ b/pumpkin-core/src/math/position.rs @@ -1,13 +1,33 @@ +use super::vector3::Vector3; use std::fmt; +use crate::math::vector2::Vector2; +use num_traits::Euclid; use serde::{Deserialize, Serialize}; -use super::vector3::Vector3; - #[derive(Clone, Copy)] /// Aka Block Position pub struct WorldPosition(pub Vector3); +impl WorldPosition { + pub fn chunk_and_chunk_relative_position(&self) -> (Vector2, Vector3) { + let (z_chunk, z_rem) = self.0.z.div_rem_euclid(&16); + let (x_chunk, x_rem) = self.0.x.div_rem_euclid(&16); + let chunk_coordinate = Vector2 { + x: x_chunk, + z: z_chunk, + }; + + // Since we divide by 16 remnant can never exceed u8 + let relative = Vector3 { + x: x_rem, + z: z_rem, + + y: self.0.y, + }; + (chunk_coordinate, relative) + } +} impl Serialize for WorldPosition { fn serialize(&self, serializer: S) -> Result where diff --git a/pumpkin-world/src/coordinates.rs b/pumpkin-world/src/coordinates.rs index fed4a49c4..f35c11453 100644 --- a/pumpkin-world/src/coordinates.rs +++ b/pumpkin-world/src/coordinates.rs @@ -1,12 +1,12 @@ use std::ops::Deref; +use crate::{WORLD_LOWEST_Y, WORLD_MAX_Y}; use derive_more::derive::{AsMut, AsRef, Display, Into}; use num_traits::{PrimInt, Signed, Unsigned}; use pumpkin_core::math::vector2::Vector2; +use pumpkin_core::math::vector3::Vector3; use serde::{Deserialize, Serialize}; -use crate::{WORLD_LOWEST_Y, WORLD_MAX_Y}; - #[derive( Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, AsRef, AsMut, Into, Display, )] @@ -130,3 +130,13 @@ impl ChunkRelativeXZBlockCoordinates { } } } + +impl From> for ChunkRelativeBlockCoordinates { + fn from(value: Vector3) -> Self { + Self { + x: (value.x as u8).into(), + z: (value.z as u8).into(), + y: value.y.into(), + } + } +} diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 2d303deda..8f5f74f62 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -6,7 +6,7 @@ use crate::{ client::Client, entity::{player::Player, Entity}, }; -use num_traits::{Euclid, ToPrimitive}; +use num_traits::ToPrimitive; use pumpkin_config::BasicConfiguration; use pumpkin_core::math::position::WorldPosition; use pumpkin_core::math::vector2::Vector2; @@ -20,11 +20,12 @@ use pumpkin_protocol::{ ClientPacket, VarInt, }; use pumpkin_world::block::BlockId; +use pumpkin_world::chunk::ChunkData; use pumpkin_world::coordinates::ChunkRelativeBlockCoordinates; use pumpkin_world::level::Level; use scoreboard::Scoreboard; -use tokio::sync::mpsc; use tokio::sync::Mutex; +use tokio::sync::{mpsc, RwLock}; pub mod scoreboard; @@ -285,7 +286,7 @@ impl World { level.mark_chunk_as_newly_watched(chunks).await; } - fn spawn_world_chunks(&self, client: Arc, chunks: Vec>, distance: i32) { + async fn spawn_world_chunks(&self, client: Arc, chunks: Vec>) { if client.closed.load(std::sync::atomic::Ordering::Relaxed) { log::info!( "The connection with {} has closed before world chunks were spawned", @@ -294,21 +295,10 @@ impl World { return; } let inst = std::time::Instant::now(); - let (sender, mut chunk_receiver) = mpsc::channel(distance as usize); - let client_id = client.id; - - { - let level = self.level.clone(); - tokio::spawn(async move { - log::debug!("Spawned chunk fetcher for {}", client_id); - let level = level.lock().await; - level.fetch_chunks(&chunks, sender); - }); - } + let chunks = self.get_chunks(chunks).await; tokio::spawn(async move { - log::debug!("Spawned chunk sender for {}", client_id); - while let Some(chunk_data) = chunk_receiver.recv().await { + for chunk_data in chunks { let chunk_data = chunk_data.read().await; let packet = CChunkData(&chunk_data); #[cfg(debug_assertions)] @@ -381,32 +371,32 @@ impl World { .await; } pub async fn set_block(&self, position: WorldPosition, block_id: BlockId) { - let (z_chunk, z_rem) = position.0.z.div_rem_euclid(&16); - let (x_chunk, x_rem) = position.0.x.div_rem_euclid(&16); - let chunk_coordinate = Vector2 { - x: x_chunk, - z: z_chunk, - }; + let (chunk_coordinate, relative_coordinates) = position.chunk_and_chunk_relative_position(); // Since we divide by 16 remnant can never exceed u8 - let relative = ChunkRelativeBlockCoordinates { - x: (x_rem as u8).into(), - z: (z_rem as u8).into(), - y: position.0.y.into(), - }; + let relative = ChunkRelativeBlockCoordinates::from(relative_coordinates); + + let chunk = self.get_chunks(vec![chunk_coordinate]).await[0].clone(); + chunk.write().await.blocks.set_block(relative, block_id); + self.broadcast_packet_all(&CBlockUpdate::new( + &position, + i32::from(block_id.data).into(), + )) + .await; + } + + pub async fn get_chunks(&self, chunks: Vec>) -> Vec>> { let (sender, mut receive) = mpsc::channel(1024); { let level = self.level.clone(); - tokio::spawn( - async move { level.lock().await.fetch_chunks(&[chunk_coordinate], sender) }, - ); + tokio::spawn(async move { level.lock().await.fetch_chunks(&chunks, sender) }); } - if let Some(data) = receive.recv().await { - data.write().await.blocks.set_block(relative, block_id); - self.broadcast_packet_all(&CBlockUpdate::new(&position, (block_id.data as i32).into())) - .await; + let mut received = vec![]; + while let Some(chunk) = receive.recv().await { + received.push(chunk); } + received } pub async fn break_block(&self, position: WorldPosition) { diff --git a/pumpkin/src/world/player_chunker.rs b/pumpkin/src/world/player_chunker.rs index 3965ba560..7842245f9 100644 --- a/pumpkin/src/world/player_chunker.rs +++ b/pumpkin/src/world/player_chunker.rs @@ -74,7 +74,9 @@ pub async fn player_join(world: &World, player: Arc) { if !loading_chunks.is_empty() { world.mark_chunks_as_watched(&loading_chunks).await; - world.spawn_world_chunks(player.client.clone(), loading_chunks, view_distance); + world + .spawn_world_chunks(player.client.clone(), loading_chunks) + .await; } if !unloading_chunks.is_empty() { @@ -134,7 +136,8 @@ pub async fn update_position(player: &Player) { entity.world.mark_chunks_as_watched(&loading_chunks).await; entity .world - .spawn_world_chunks(player.client.clone(), loading_chunks, view_distance); + .spawn_world_chunks(player.client.clone(), loading_chunks) + .await; } if !unloading_chunks.is_empty() { From f88ce46480a27a5f33865cd3fe30dd76bdee890d Mon Sep 17 00:00:00 2001 From: Edvin Bryntesson Date: Mon, 21 Oct 2024 15:56:08 +0200 Subject: [PATCH 6/9] add get_block function --- pumpkin/src/world/mod.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 8f5f74f62..9f6d559ad 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -405,4 +405,15 @@ impl World { self.broadcast_packet_all(&CWorldEvent::new(2001, &position, 11, false)) .await; } + + pub async fn get_block(&self, position: WorldPosition) -> BlockId { + let (chunk, relative) = position.chunk_and_chunk_relative_position(); + let relative = ChunkRelativeBlockCoordinates::from(relative); + self.get_chunks(vec![chunk]).await[0] + .clone() + .read() + .await + .blocks + .get_block(relative) + } } From e9e0bb7102b3dc7b5c26f599b7e3616851f57247 Mon Sep 17 00:00:00 2001 From: Edvin Bryntesson Date: Mon, 21 Oct 2024 15:59:18 +0200 Subject: [PATCH 7/9] make buffer to chunks length --- pumpkin/src/world/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 9f6d559ad..68e44613e 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -387,7 +387,7 @@ impl World { } pub async fn get_chunks(&self, chunks: Vec>) -> Vec>> { - let (sender, mut receive) = mpsc::channel(1024); + let (sender, mut receive) = mpsc::channel(chunks.len()); { let level = self.level.clone(); tokio::spawn(async move { level.lock().await.fetch_chunks(&chunks, sender) }); From b0009d1afe9a9409aff77bea00b69fc60aa2513d Mon Sep 17 00:00:00 2001 From: Edvin Bryntesson Date: Mon, 21 Oct 2024 22:25:28 +0200 Subject: [PATCH 8/9] make not blocking --- pumpkin/src/world/mod.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 68e44613e..f1d0e0023 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -392,11 +392,16 @@ impl World { let level = self.level.clone(); tokio::spawn(async move { level.lock().await.fetch_chunks(&chunks, sender) }); } - let mut received = vec![]; - while let Some(chunk) = receive.recv().await { - received.push(chunk); - } - received + tokio::spawn(async move { + let mut received = vec![]; + + while let Some(chunk) = receive.recv().await { + received.push(chunk); + } + received + }) + .await + .unwrap() } pub async fn break_block(&self, position: WorldPosition) { From 43116c54e9163b3f6aa2e3218dcc26a11e9c5bd1 Mon Sep 17 00:00:00 2001 From: kralverde <80051564+kralverde@users.noreply.github.com> Date: Tue, 22 Oct 2024 05:27:40 -0400 Subject: [PATCH 9/9] fix tick system (#169) * fix tick system * also async wait on stdin --- Cargo.toml | 1 + pumpkin/src/main.rs | 8 ++++++-- pumpkin/src/server/ticker.rs | 4 +++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d32f8e1a0..eacd58092 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ tokio = { version = "1.40", features = [ "net", "rt-multi-thread", "sync", + "io-std", ] } # Concurrency/Parallelism and Synchronization diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index 74c51e921..d6e5cde48 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -30,6 +30,7 @@ use log::LevelFilter; use client::Client; use server::{ticker::Ticker, Server}; use std::io::{self}; +use tokio::io::{AsyncBufReadExt, BufReader}; // Setup some tokens to allow us to identify which event is for which socket. @@ -140,11 +141,14 @@ async fn main() -> io::Result<()> { if use_console { let server = server.clone(); tokio::spawn(async move { - let stdin = std::io::stdin(); + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); loop { let mut out = String::new(); - stdin + + reader .read_line(&mut out) + .await .expect("Failed to read console line"); if !out.is_empty() { diff --git a/pumpkin/src/server/ticker.rs b/pumpkin/src/server/ticker.rs index a8395b6a6..ea7d32847 100644 --- a/pumpkin/src/server/ticker.rs +++ b/pumpkin/src/server/ticker.rs @@ -1,5 +1,7 @@ use std::time::{Duration, Instant}; +use tokio::time::sleep; + use super::Server; pub struct Ticker { @@ -28,7 +30,7 @@ impl Ticker { } else { // Wait for the remaining time until the next tick let sleep_time = self.tick_interval - elapsed; - std::thread::sleep(sleep_time); + sleep(sleep_time).await; } } }