diff --git a/Cargo.toml b/Cargo.toml index a1dc7459..e31cdf61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ tokio = { version = "1.40", features = [ "net", "rt-multi-thread", "sync", + "io-std", ] } thiserror = "1.0" diff --git a/pumpkin-core/src/math/position.rs b/pumpkin-core/src/math/position.rs index cd1f7ce2..7c51dff4 100644 --- a/pumpkin-core/src/math/position.rs +++ b/pumpkin-core/src/math/position.rs @@ -1,13 +1,33 @@ +use super::vector3::Vector3; use std::fmt; +use crate::math::vector2::Vector2; +use num_traits::Euclid; use serde::{Deserialize, Serialize}; -use super::vector3::Vector3; - #[derive(Clone, Copy)] /// Aka Block Position pub struct WorldPosition(pub Vector3); +impl WorldPosition { + pub fn chunk_and_chunk_relative_position(&self) -> (Vector2, Vector3) { + let (z_chunk, z_rem) = self.0.z.div_rem_euclid(&16); + let (x_chunk, x_rem) = self.0.x.div_rem_euclid(&16); + let chunk_coordinate = Vector2 { + x: x_chunk, + z: z_chunk, + }; + + // Since we divide by 16 remnant can never exceed u8 + let relative = Vector3 { + x: x_rem, + z: z_rem, + + y: self.0.y, + }; + (chunk_coordinate, relative) + } +} impl Serialize for WorldPosition { fn serialize(&self, serializer: S) -> Result where diff --git a/pumpkin-world/src/chunk/anvil.rs b/pumpkin-world/src/chunk/anvil.rs index 983b3fea..06705fb0 100644 --- a/pumpkin-world/src/chunk/anvil.rs +++ b/pumpkin-world/src/chunk/anvil.rs @@ -10,6 +10,7 @@ use crate::level::SaveFile; use super::{ChunkData, ChunkReader, ChunkReadingError, CompressionError}; +#[derive(Clone)] pub struct AnvilChunkReader {} impl Default for AnvilChunkReader { diff --git a/pumpkin-world/src/chunk/mod.rs b/pumpkin-world/src/chunk/mod.rs index c75de68a..f6c71604 100644 --- a/pumpkin-world/src/chunk/mod.rs +++ b/pumpkin-world/src/chunk/mod.rs @@ -58,7 +58,6 @@ pub struct ChunkData { pub blocks: ChunkBlocks, pub position: Vector2, } - pub struct ChunkBlocks { // TODO make this a Vec that doesn't store the upper layers that only contain air diff --git a/pumpkin-world/src/coordinates.rs b/pumpkin-world/src/coordinates.rs index fed4a49c..f35c1145 100644 --- a/pumpkin-world/src/coordinates.rs +++ b/pumpkin-world/src/coordinates.rs @@ -1,12 +1,12 @@ use std::ops::Deref; +use crate::{WORLD_LOWEST_Y, WORLD_MAX_Y}; use derive_more::derive::{AsMut, AsRef, Display, Into}; use num_traits::{PrimInt, Signed, Unsigned}; use pumpkin_core::math::vector2::Vector2; +use pumpkin_core::math::vector3::Vector3; use serde::{Deserialize, Serialize}; -use crate::{WORLD_LOWEST_Y, WORLD_MAX_Y}; - #[derive( Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, AsRef, AsMut, Into, Display, )] @@ -130,3 +130,13 @@ impl ChunkRelativeXZBlockCoordinates { } } } + +impl From> for ChunkRelativeBlockCoordinates { + fn from(value: Vector3) -> Self { + Self { + x: (value.x as u8).into(), + z: (value.z as u8).into(), + y: value.y.into(), + } + } +} diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index 5cfae743..b4207583 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -1,16 +1,16 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc}; -use parking_lot::{Mutex, RwLock}; -use pumpkin_core::math::vector2::Vector2; -use rayon::prelude::*; -use tokio::sync::mpsc; - use crate::{ chunk::{ anvil::AnvilChunkReader, ChunkData, ChunkParsingError, ChunkReader, ChunkReadingError, }, world_gen::{get_world_gen, Seed, WorldGenerator}, }; +use pumpkin_core::math::vector2::Vector2; +use tokio::sync::mpsc; +use tokio::sync::{Mutex, RwLock}; + +type RAMChunkStorage = Arc, Arc>>>>; /// The `Level` module provides functionality for working with chunks within or outside a Minecraft world. /// @@ -23,12 +23,12 @@ use crate::{ /// For more details on world generation, refer to the `WorldGenerator` module. pub struct Level { save_file: Option, - loaded_chunks: Arc, Arc>>>, + loaded_chunks: RAMChunkStorage, chunk_watchers: Arc, usize>>>, - chunk_reader: Box, - world_gen: Box, + chunk_reader: Arc>, + world_gen: Arc>, } - +#[derive(Clone)] pub struct SaveFile { #[expect(dead_code)] root_folder: PathBuf, @@ -37,7 +37,7 @@ pub struct SaveFile { impl Level { pub fn from_root_folder(root_folder: PathBuf) -> Self { - let world_gen = get_world_gen(Seed(0)); // TODO Read Seed from config. + let world_gen = get_world_gen(Seed(0)).into(); // TODO Read Seed from config. if root_folder.exists() { let region_folder = root_folder.join("region"); @@ -52,7 +52,7 @@ impl Level { root_folder, region_folder, }), - chunk_reader: Box::new(AnvilChunkReader::new()), + chunk_reader: Arc::new(Box::new(AnvilChunkReader::new())), loaded_chunks: Arc::new(RwLock::new(HashMap::new())), chunk_watchers: Arc::new(Mutex::new(HashMap::new())), } @@ -64,7 +64,7 @@ impl Level { Self { world_gen, save_file: None, - chunk_reader: Box::new(AnvilChunkReader::new()), + chunk_reader: Arc::new(Box::new(AnvilChunkReader::new())), loaded_chunks: Arc::new(RwLock::new(HashMap::new())), chunk_watchers: Arc::new(Mutex::new(HashMap::new())), } @@ -76,8 +76,8 @@ impl Level { /// Marks chunks as "watched" by a unique player. When no players are watching a chunk, /// it is removed from memory. Should only be called on chunks the player was not watching /// before - pub fn mark_chunk_as_newly_watched(&self, chunks: &[Vector2]) { - let mut watchers = self.chunk_watchers.lock(); + pub async fn mark_chunk_as_newly_watched(&self, chunks: &[Vector2]) { + let mut watchers = self.chunk_watchers.lock().await; for chunk in chunks { match watchers.entry(*chunk) { std::collections::hash_map::Entry::Occupied(mut occupied) => { @@ -93,9 +93,9 @@ impl Level { /// Marks chunks no longer "watched" by a unique player. When no players are watching a chunk, /// it is removed from memory. Should only be called on chunks the player was watching before - pub fn mark_chunk_as_not_watched_and_clean(&self, chunks: &[Vector2]) { + pub async fn mark_chunk_as_not_watched_and_clean(&self, chunks: &[Vector2]) { let dropped_chunks = { - let mut watchers = self.chunk_watchers.lock(); + let mut watchers = self.chunk_watchers.lock().await; chunks .iter() .filter(|chunk| match watchers.entry(**chunk) { @@ -119,18 +119,18 @@ impl Level { }) .collect::>() }; - let mut loaded_chunks = self.loaded_chunks.write(); + let mut loaded_chunks = self.loaded_chunks.write().await; let dropped_chunk_data = dropped_chunks .iter() .filter_map(|chunk| { - log::debug!("Unloading chunk {:?}", chunk); + //log::debug!("Unloading chunk {:?}", chunk); loaded_chunks.remove_entry(*chunk) }) .collect(); self.write_chunks(dropped_chunk_data); } - pub fn write_chunks(&self, _chunks_to_write: Vec<(Vector2, Arc)>) { + pub fn write_chunks(&self, _chunks_to_write: Vec<(Vector2, Arc>)>) { //TODO } @@ -138,58 +138,76 @@ impl Level { /// MUST be called from a tokio runtime thread /// /// Note: The order of the output chunks will almost never be in the same order as the order of input chunks + pub fn fetch_chunks( + &self, + chunks: &[Vector2], + channel: mpsc::Sender>>, + ) { + for chunk in chunks { + { + let chunk_location = *chunk; + let channel = channel.clone(); + let loaded_chunks = self.loaded_chunks.clone(); + let chunk_reader = self.chunk_reader.clone(); + let save_file = self.save_file.clone(); + let world_gen = self.world_gen.clone(); + tokio::spawn(async move { + let loaded_chunks_read = loaded_chunks.read().await; + let possibly_loaded_chunk = loaded_chunks_read.get(&chunk_location).cloned(); + drop(loaded_chunks_read); + match possibly_loaded_chunk { + Some(chunk) => { + let chunk = chunk.clone(); + channel.send(chunk).await.unwrap(); + } + None => { + let chunk_data = match save_file { + Some(save_file) => { + match chunk_reader.read_chunk(&save_file, &chunk_location) { + Ok(data) => Ok(Arc::new(RwLock::new(data))), + Err( + ChunkReadingError::ChunkNotExist + | ChunkReadingError::ParsingError( + ChunkParsingError::ChunkNotGenerated, + ), + ) => { + // This chunk was not generated yet. + let chunk = Arc::new(RwLock::new( + world_gen.generate_chunk(chunk_location), + )); + let mut loaded_chunks = loaded_chunks.write().await; + loaded_chunks.insert(chunk_location, chunk.clone()); + drop(loaded_chunks); + Ok(chunk) + } + Err(err) => Err(err), + } + } + None => { + // There is no savefile yet -> generate the chunks + let chunk = Arc::new(RwLock::new( + world_gen.generate_chunk(chunk_location), + )); - pub fn fetch_chunks(&self, chunks: &[Vector2], channel: mpsc::Sender>) { - chunks.into_par_iter().for_each(|at| { - let channel = channel.clone(); - - let maybe_chunk = { - let loaded_chunks = self.loaded_chunks.read(); - loaded_chunks.get(at).cloned() - } - .or_else(|| { - let chunk_data = match &self.save_file { - Some(save_file) => { - match self.chunk_reader.read_chunk(save_file, at) { - Ok(data) => Ok(Arc::new(data)), - Err( - ChunkReadingError::ChunkNotExist - | ChunkReadingError::ParsingError( - ChunkParsingError::ChunkNotGenerated, - ), - ) => { - // This chunk was not generated yet. - let chunk = Arc::new(self.world_gen.generate_chunk(*at)); - Ok(chunk) + let mut loaded_chunks = loaded_chunks.write().await; + loaded_chunks.insert(chunk_location, chunk.clone()); + Ok(chunk) + } + }; + match chunk_data { + Ok(data) => channel.send(data).await.unwrap(), + Err(err) => { + log::warn!( + "Failed to read chunk {:?}: {:?}", + chunk_location, + err + ); + } } - Err(err) => Err(err), } } - None => { - // There is no savefile yet -> generate the chunks - let chunk = Arc::new(self.world_gen.generate_chunk(*at)); - Ok(chunk) - } - }; - match chunk_data { - Ok(data) => Some(data), - Err(err) => { - // TODO: Panic here? - log::warn!("Failed to read chunk {:?}: {:?}", at, err); - None - } - } - }); - match maybe_chunk { - Some(chunk) => { - channel - .blocking_send(chunk.clone()) - .expect("Failed sending ChunkData."); - } - None => { - log::error!("Unable to send chunk {:?}!", at); - } - }; - }) + }); + } + } } } diff --git a/pumpkin/src/client/client_packet.rs b/pumpkin/src/client/client_packet.rs index fe43d2cb..05e57d5c 100644 --- a/pumpkin/src/client/client_packet.rs +++ b/pumpkin/src/client/client_packet.rs @@ -34,12 +34,16 @@ use super::{authentication::AuthError, Client, PlayerConfig}; /// NEVER TRUST THE CLIENT. HANDLE EVERY ERROR, UNWRAP/EXPECT impl Client { pub async fn handle_handshake(&self, handshake: SHandShake) { - log::debug!("handshake"); let version = handshake.protocol_version.0; self.protocol_version .store(version, std::sync::atomic::Ordering::Relaxed); *self.server_address.lock().await = handshake.server_address; + log::debug!( + "Handshake: id {} is now in state {:?}", + self.id, + &handshake.next_state + ); self.connection_state.store(handshake.next_state); if self.connection_state.load() != ConnectionState::Status { let protocol = version; @@ -56,11 +60,12 @@ impl Client { } pub async fn handle_status_request(&self, server: &Server, _status_request: SStatusRequest) { + log::debug!("Handling status request for id {}", self.id); self.send_packet(&server.get_status()).await; } pub async fn handle_ping_request(&self, ping_request: SStatusPingRequest) { - log::debug!("recieved ping request"); + log::debug!("Handling ping request for id {}", self.id); self.send_packet(&CPingResponse::new(ping_request.payload)) .await; self.close(); @@ -74,7 +79,11 @@ impl Client { } pub async fn handle_login_start(&self, server: &Server, login_start: SLoginStart) { - log::debug!("login start, State {:?}", self.connection_state); + log::debug!( + "login start for id {}, State {:?}", + self.id, + self.connection_state + ); if !Self::is_valid_player_name(&login_start.name) { self.kick("Invalid characters in username").await; @@ -127,13 +136,8 @@ impl Client { server: &Server, encryption_response: SEncryptionResponse, ) { - let shared_secret = match server.decrypt(&encryption_response.shared_secret) { - Ok(shared_secret) => shared_secret, - Err(error) => { - self.kick(&error.to_string()).await; - return; - } - }; + log::debug!("Handling encryption for id {}", self.id); + let shared_secret = server.decrypt(&encryption_response.shared_secret).unwrap(); if let Err(error) = self.set_encryption(Some(&shared_secret)).await { self.kick(&error.to_string()).await; @@ -223,6 +227,7 @@ impl Client { } pub async fn handle_plugin_response(&self, plugin_response: SLoginPluginResponse) { + log::debug!("Handling plugin for id {}", self.id); let velocity_config = &ADVANCED_CONFIG.proxy.velocity; if velocity_config.enabled { let mut address = self.address.lock().await; @@ -246,6 +251,7 @@ impl Client { server: &Server, _login_acknowledged: SLoginAcknowledged, ) { + log::debug!("Handling login acknowledged for id {}", self.id); self.connection_state.store(ConnectionState::Config); self.send_packet(&server.get_branding()).await; @@ -282,7 +288,7 @@ impl Client { &self, client_information: SClientInformationConfig, ) { - log::debug!("got client settings"); + log::debug!("Handling client settings for id {}", self.id); if let (Some(main_hand), Some(chat_mode)) = ( Hand::from_i32(client_information.main_hand.into()), ChatMode::from_i32(client_information.chat_mode.into()), @@ -303,6 +309,7 @@ impl Client { } pub async fn handle_plugin_message(&self, plugin_message: SPluginMessage) { + log::debug!("Handling plugin message for id {}", self.id); if plugin_message.channel.starts_with("minecraft:brand") || plugin_message.channel.starts_with("MC|Brand") { @@ -315,6 +322,7 @@ impl Client { } pub async fn handle_known_packs(&self, server: &Server, _config_acknowledged: SKnownPacks) { + log::debug!("Handling known packs for id {}", self.id); for registry in &server.cached_registry { self.send_packet(&CRegistryData::new( ®istry.registry_id, @@ -329,7 +337,7 @@ impl Client { } pub fn handle_config_acknowledged(&self, _config_acknowledged: &SAcknowledgeFinishConfig) { - log::debug!("config acknowledged"); + log::debug!("Handling config acknowledge for id {}", self.id); self.connection_state.store(ConnectionState::Play); self.make_player .store(true, std::sync::atomic::Ordering::Relaxed); diff --git a/pumpkin/src/client/mod.rs b/pumpkin/src/client/mod.rs index 5bbfc76b..4294bd7e 100644 --- a/pumpkin/src/client/mod.rs +++ b/pumpkin/src/client/mod.rs @@ -175,44 +175,65 @@ impl Client { /// Send a Clientbound Packet to the Client pub async fn send_packet(&self, packet: &P) { + log::debug!("Sending packet with id {} to {}", P::PACKET_ID, self.id); // assert!(!self.closed); let mut enc = self.enc.lock().await; if let Err(error) = enc.append_packet(packet) { self.kick(&error.to_string()).await; return; } - if let Err(error) = self - .connection_writer - .lock() - .await + + let mut writer = self.connection_writer.lock().await; + if let Err(error) = writer .write_all(&enc.take()) .await .map_err(|_| PacketError::ConnectionWrite) { self.kick(&error.to_string()).await; + } else if let Err(error) = writer.flush().await { + log::warn!( + "Failed to flush writer for id {}: {}", + self.id, + error.to_string() + ); } } pub async fn try_send_packet(&self, packet: &P) -> Result<(), PacketError> { // assert!(!self.closed); + log::debug!( + "Trying to send packet with id {} to {}", + P::PACKET_ID, + self.id + ); let mut enc = self.enc.lock().await; enc.append_packet(packet)?; - self.connection_writer - .lock() - .await + + let mut writer = self.connection_writer.lock().await; + writer .write_all(&enc.take()) .await .map_err(|_| PacketError::ConnectionWrite)?; + + writer + .flush() + .await + .map_err(|_| PacketError::ConnectionWrite)?; Ok(()) } /// Processes all packets send by the client pub async fn process_packets(&self, server: &Arc) { - while let Some(mut packet) = self.client_packets_queue.lock().await.pop_front() { + let mut packet_queue = self.client_packets_queue.lock().await; + while let Some(mut packet) = packet_queue.pop_front() { if let Err(error) = self.handle_packet(server, &mut packet).await { let text = format!("Error while reading incoming packet {error}"); - log::error!("{text}"); + log::error!( + "Failed to read incoming packet with id {}: {}", + i32::from(packet.id), + error + ); self.kick(&text).await; }; } @@ -250,15 +271,19 @@ impl Client { &self, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling handshake group for id {}", self.id); let bytebuf = &mut packet.bytebuf; - if packet.id.0 == SHandShake::PACKET_ID { - self.handle_handshake(SHandShake::read(bytebuf)?).await; - } else { - log::error!( - "Failed to handle packet id {} while in Handshake state", - packet.id.0 - ); - } + match packet.id.0 { + SHandShake::PACKET_ID => { + self.handle_handshake(SHandShake::read(bytebuf)?).await; + } + _ => { + log::error!( + "Failed to handle packet id {} while in Handshake state", + packet.id.0 + ); + } + }; Ok(()) } @@ -267,6 +292,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling status group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SStatusRequest::PACKET_ID => { @@ -283,7 +309,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -292,6 +318,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling login group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SLoginStart::PACKET_ID => { @@ -316,7 +343,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -325,6 +352,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling config group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SClientInformationConfig::PACKET_ID => { @@ -348,7 +376,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -358,26 +386,38 @@ impl Client { pub async fn poll(&self) -> bool { loop { let mut dec = self.dec.lock().await; - if let Ok(Some(packet)) = dec.decode() { - self.add_packet(packet).await; - return true; - }; + + match dec.decode() { + Ok(Some(packet)) => { + self.add_packet(packet).await; + return true; + } + Ok(None) => log::debug!("Waiting for more data to complete packet..."), + Err(err) => log::warn!( + "Failed to decode packet for id {}: {}", + self.id, + err.to_string() + ), + } dec.reserve(4096); let mut buf = dec.take_capacity(); - match self.connection_reader.lock().await.read_buf(&mut buf).await { - Ok(0) => { - self.close(); - return false; + let bytes_read = self.connection_reader.lock().await.read_buf(&mut buf).await; + match bytes_read { + Ok(cnt) => { + log::debug!("Read {} bytes", cnt); + if cnt == 0 { + self.close(); + return false; + } } Err(error) => { log::error!("Error while reading incoming packet {}", error); self.close(); return false; } - _ => {} - } + }; // This should always be an O(1) unsplit because we reserved space earlier and // the call to `read_buf` shouldn't have grown the allocation. @@ -387,7 +427,7 @@ impl Client { /// Kicks the Client with a reason depending on the connection state pub async fn kick(&self, reason: &str) { - log::debug!("Kicking client with reason: {}", reason); + log::info!("Kicking for id {} for {}", self.id, reason); match self.connection_state.load() { ConnectionState::Login => { self.try_send_packet(&CLoginDisconnect::new( diff --git a/pumpkin/src/client/player_packet.rs b/pumpkin/src/client/player_packet.rs index feec47f2..3d893018 100644 --- a/pumpkin/src/client/player_packet.rs +++ b/pumpkin/src/client/player_packet.rs @@ -8,8 +8,9 @@ use crate::{ }; use num_traits::FromPrimitive; use pumpkin_config::ADVANCED_CONFIG; +use pumpkin_core::math::position::WorldPosition; use pumpkin_core::{ - math::{position::WorldPosition, vector3::Vector3, wrap_degrees}, + math::{vector3::Vector3, wrap_degrees}, text::TextComponent, GameMode, }; @@ -18,9 +19,9 @@ use pumpkin_inventory::{InventoryError, WindowType}; use pumpkin_protocol::server::play::{SCloseContainer, SKeepAlive, SSetPlayerGround, SUseItem}; use pumpkin_protocol::{ client::play::{ - Animation, CAcknowledgeBlockChange, CBlockUpdate, CEntityAnimation, CEntityVelocity, - CHeadRot, CHurtAnimation, CPingResponse, CPlayerChatMessage, CUpdateEntityPos, - CUpdateEntityPosRot, CUpdateEntityRot, CWorldEvent, FilterType, + Animation, CAcknowledgeBlockChange, CEntityAnimation, CEntityVelocity, CHeadRot, + CHurtAnimation, CPingResponse, CPlayerChatMessage, CUpdateEntityPos, CUpdateEntityPosRot, + CUpdateEntityRot, FilterType, }, server::play::{ Action, ActionType, SChatCommand, SChatMessage, SClientInformationPlay, SConfirmTeleport, @@ -29,7 +30,7 @@ use pumpkin_protocol::{ SSwingArm, SUseItemOn, Status, }, }; -use pumpkin_world::block::{BlockFace, BlockState}; +use pumpkin_world::block::{BlockFace, BlockId, BlockState}; use pumpkin_world::global_registry; use super::PlayerConfig; @@ -486,13 +487,7 @@ impl Player { // TODO: currently this is always dirt replace it let entity = &self.living_entity.entity; let world = &entity.world; - world - .broadcast_packet_all(&CWorldEvent::new(2001, &location, 11, false)) - .await; - // AIR - world - .broadcast_packet_all(&CBlockUpdate::new(&location, 0.into())) - .await; + world.break_block(location).await; } } Status::CancelledDigging => { @@ -522,13 +517,7 @@ impl Player { // TODO: currently this is always dirt replace it let entity = &self.living_entity.entity; let world = &entity.world; - world - .broadcast_packet_all(&CWorldEvent::new(2001, &location, 11, false)) - .await; - // AIR - world - .broadcast_packet_all(&CBlockUpdate::new(&location, 0.into())) - .await; + world.break_block(location).await; // TODO: Send this every tick self.client .send_packet(&CAcknowledgeBlockChange::new(player_action.sequence)) @@ -592,17 +581,14 @@ impl Player { if let Ok(block_state_id) = BlockState::new(minecraft_id, None) { let entity = &self.living_entity.entity; let world = &entity.world; + world - .broadcast_packet_all(&CBlockUpdate::new( - &location, - block_state_id.get_id_mojang_repr().into(), - )) - .await; - world - .broadcast_packet_all(&CBlockUpdate::new( - &WorldPosition(location.0 + face.to_offset()), - block_state_id.get_id_mojang_repr().into(), - )) + .set_block( + WorldPosition(location.0 + face.to_offset()), + BlockId { + data: block_state_id.get_id_mojang_repr() as u16, + }, + ) .await; } } diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs index dd8c23b4..ab388f3c 100644 --- a/pumpkin/src/entity/player.rs +++ b/pumpkin/src/entity/player.rs @@ -353,110 +353,97 @@ impl Player { server: &Arc, packet: &mut RawPacket, ) -> Result<(), Box> { + /* + log::debug!( + "Handling player packet with id {} for {}", + packet.id.0, + self.client.id + ); + */ + let bytebuf = &mut packet.bytebuf; match packet.id.0 { SConfirmTeleport::PACKET_ID => { self.handle_confirm_teleport(SConfirmTeleport::read(bytebuf)?) .await; - Ok(()) } SChatCommand::PACKET_ID => { self.handle_chat_command(server, SChatCommand::read(bytebuf)?) .await; - Ok(()) } SPlayerPosition::PACKET_ID => { self.handle_position(SPlayerPosition::read(bytebuf)?).await; - Ok(()) } SPlayerPositionRotation::PACKET_ID => { self.handle_position_rotation(SPlayerPositionRotation::read(bytebuf)?) .await; - Ok(()) } SPlayerRotation::PACKET_ID => { self.handle_rotation(SPlayerRotation::read(bytebuf)?).await; - Ok(()) } SSetPlayerGround::PACKET_ID => { self.handle_player_ground(&SSetPlayerGround::read(bytebuf)?); - Ok(()) } SPlayerCommand::PACKET_ID => { self.handle_player_command(SPlayerCommand::read(bytebuf)?) .await; - Ok(()) } SSwingArm::PACKET_ID => { self.handle_swing_arm(SSwingArm::read(bytebuf)?).await; - Ok(()) } SChatMessage::PACKET_ID => { self.handle_chat_message(SChatMessage::read(bytebuf)?).await; - Ok(()) } SClientInformationPlay::PACKET_ID => { self.handle_client_information_play(SClientInformationPlay::read(bytebuf)?) .await; - Ok(()) } SInteract::PACKET_ID => { self.handle_interact(server, SInteract::read(bytebuf)?) .await; - Ok(()) } SPlayerAction::PACKET_ID => { self.handle_player_action(SPlayerAction::read(bytebuf)?) .await; - Ok(()) } SPlayerAbilities::PACKET_ID => { self.handle_player_abilities(SPlayerAbilities::read(bytebuf)?) .await; - Ok(()) } SUseItemOn::PACKET_ID => { self.handle_use_item_on(SUseItemOn::read(bytebuf)?).await; - Ok(()) } SUseItem::PACKET_ID => { self.handle_use_item(&SUseItem::read(bytebuf)?); - Ok(()) } SSetHeldItem::PACKET_ID => { self.handle_set_held_item(SSetHeldItem::read(bytebuf)?) .await; - Ok(()) } SSetCreativeSlot::PACKET_ID => { self.handle_set_creative_slot(SSetCreativeSlot::read(bytebuf)?) .await?; - Ok(()) } SPlayPingRequest::PACKET_ID => { self.handle_play_ping_request(SPlayPingRequest::read(bytebuf)?) .await; - Ok(()) } SClickContainer::PACKET_ID => { self.handle_click_container(server, SClickContainer::read(bytebuf)?) .await?; - Ok(()) } SCloseContainer::PACKET_ID => { self.handle_close_container(server, SCloseContainer::read(bytebuf)?) .await; - Ok(()) } SKeepAlive::PACKET_ID => { self.handle_keep_alive(SKeepAlive::read(bytebuf)?).await; - Ok(()) } _ => { log::error!("Failed to handle player packet id {:#04x}", packet.id.0); - Ok(()) } - } + }; + Ok(()) } } diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index a25ed3a4..d6e5cde4 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -30,6 +30,7 @@ use log::LevelFilter; use client::Client; use server::{ticker::Ticker, Server}; use std::io::{self}; +use tokio::io::{AsyncBufReadExt, BufReader}; // Setup some tokens to allow us to identify which event is for which socket. @@ -140,11 +141,14 @@ async fn main() -> io::Result<()> { if use_console { let server = server.clone(); tokio::spawn(async move { - let stdin = std::io::stdin(); + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); loop { let mut out = String::new(); - stdin + + reader .read_line(&mut out) + .await .expect("Failed to read console line"); if !out.is_empty() { @@ -205,6 +209,7 @@ async fn main() -> io::Result<()> { .load(std::sync::atomic::Ordering::Relaxed) { let id = client.id; + log::debug!("Creating player for id {}", id); let (player, world) = server.add_player(id, client).await; world.spawn_player(&BASIC_CONFIG, player.clone()).await; // poll Player diff --git a/pumpkin/src/server/ticker.rs b/pumpkin/src/server/ticker.rs index a8395b6a..ea7d3284 100644 --- a/pumpkin/src/server/ticker.rs +++ b/pumpkin/src/server/ticker.rs @@ -1,5 +1,7 @@ use std::time::{Duration, Instant}; +use tokio::time::sleep; + use super::Server; pub struct Ticker { @@ -28,7 +30,7 @@ impl Ticker { } else { // Wait for the remaining time until the next tick let sleep_time = self.tick_interval - elapsed; - std::thread::sleep(sleep_time); + sleep(sleep_time).await; } } } diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 5626862f..f1d0e002 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -8,8 +8,10 @@ use crate::{ }; use num_traits::ToPrimitive; use pumpkin_config::BasicConfiguration; +use pumpkin_core::math::position::WorldPosition; use pumpkin_core::math::vector2::Vector2; use pumpkin_entity::{entity_type::EntityType, EntityId}; +use pumpkin_protocol::client::play::{CBlockUpdate, CWorldEvent}; use pumpkin_protocol::{ client::play::{ CChunkData, CGameEvent, CLogin, CPlayerAbilities, CPlayerInfoUpdate, CRemoveEntities, @@ -17,10 +19,13 @@ use pumpkin_protocol::{ }, ClientPacket, VarInt, }; +use pumpkin_world::block::BlockId; +use pumpkin_world::chunk::ChunkData; +use pumpkin_world::coordinates::ChunkRelativeBlockCoordinates; use pumpkin_world::level::Level; use scoreboard::Scoreboard; -use tokio::sync::mpsc; use tokio::sync::Mutex; +use tokio::sync::{mpsc, RwLock}; pub mod scoreboard; @@ -93,8 +98,13 @@ impl World { // This code follows the vanilla packet order let entity_id = player.entity_id(); let gamemode = player.gamemode.load(); - log::debug!("spawning player, entity id {}", entity_id); + log::debug!( + "spawning player {}, entity id {}", + player.client.id, + entity_id + ); + log::debug!("Sending login packet to {}", player.client.id); // login packet for our new player player .client @@ -120,9 +130,10 @@ impl World { false, )) .await; - log::debug!("sending abilities"); + // player abilities // TODO: this is for debug purpose, remove later + log::debug!("Sending player abilities to {}", player.client.id); player .client .send_packet(&CPlayerAbilities::new(0x02, 0.4, 0.1)) @@ -134,6 +145,8 @@ impl World { let z = 10.0; let yaw = 10.0; let pitch = 10.0; + + log::debug!("Sending player teleport to {}", player.client.id); player.teleport(x, y, z, yaw, pitch).await; let pos = player.living_entity.entity.pos.load(); @@ -142,6 +155,7 @@ impl World { let gameprofile = &player.gameprofile; // first send info update to our new player, So he can see his Skin // also send his info to everyone else + log::debug!("Broadcasting player info for {}", player.client.id); self.broadcast_packet_all(&CPlayerInfoUpdate::new( 0x01 | 0x08, &[pumpkin_protocol::client::play::Player { @@ -177,6 +191,7 @@ impl World { ], }); } + log::debug!("Sending player info to {}", player.client.id); player .client .send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)) @@ -185,6 +200,7 @@ impl World { let gameprofile = &player.gameprofile; + log::debug!("Broadcasting player spawn for {}", player.client.id); // spawn player for every client self.broadcast_packet_expect( &[player.client.id], @@ -218,6 +234,7 @@ impl World { let entity = &existing_player.living_entity.entity; let pos = entity.pos.load(); let gameprofile = &existing_player.gameprofile; + log::debug!("Sending player entities to {}", player.client.id); player .client .send_packet(&CSpawnEntity::new( @@ -244,10 +261,12 @@ impl World { entity_id.into(), Metadata::new(17, VarInt(0), config.skin_parts), ); + log::debug!("Broadcasting skin for {}", player.client.id); self.broadcast_packet_all(&packet).await; } // Start waiting for level chunks, Sets the "Loading Terrain" screen + log::debug!("Sending waiting chunks to {}", player.client.id); player .client .send_packet(&CGameEvent::new(GameEvent::StartWaitingChunks, 0.0)) @@ -259,15 +278,15 @@ impl World { pub async fn mark_chunks_as_not_watched(&self, chunks: &[Vector2]) { let level = self.level.lock().await; - level.mark_chunk_as_not_watched_and_clean(chunks); + level.mark_chunk_as_not_watched_and_clean(chunks).await; } pub async fn mark_chunks_as_watched(&self, chunks: &[Vector2]) { let level = self.level.lock().await; - level.mark_chunk_as_newly_watched(chunks); + level.mark_chunk_as_newly_watched(chunks).await; } - fn spawn_world_chunks(&self, client: Arc, chunks: Vec>, distance: i32) { + async fn spawn_world_chunks(&self, client: Arc, chunks: Vec>) { if client.closed.load(std::sync::atomic::Ordering::Relaxed) { log::info!( "The connection with {} has closed before world chunks were spawned", @@ -276,23 +295,17 @@ impl World { return; } let inst = std::time::Instant::now(); - let (sender, mut chunk_receiver) = mpsc::channel(distance as usize); + let chunks = self.get_chunks(chunks).await; - let level = self.level.clone(); - let chunks = Arc::new(chunks); tokio::spawn(async move { - let level = level.lock().await; - level.fetch_chunks(&chunks, sender); - }); - - tokio::spawn(async move { - while let Some(chunk_data) = chunk_receiver.recv().await { - // dbg!(chunk_pos); + for chunk_data in chunks { + let chunk_data = chunk_data.read().await; + let packet = CChunkData(&chunk_data); #[cfg(debug_assertions)] if chunk_data.position == (0, 0).into() { use pumpkin_protocol::bytebuf::ByteBuffer; let mut test = ByteBuffer::empty(); - CChunkData(&chunk_data).write(&mut test); + packet.write(&mut test); let len = test.buf().len(); log::debug!( "Chunk packet size: {}B {}KB {}MB", @@ -305,7 +318,7 @@ impl World { // TODO: Queue player packs in a queue so we don't need to check if its closed before // sending if !client.closed.load(std::sync::atomic::Ordering::Relaxed) { - client.send_packet(&CChunkData(&chunk_data)).await; + client.send_packet(&packet).await; } } @@ -357,4 +370,55 @@ impl World { self.broadcast_packet_all(&CRemoveEntities::new(&[entity.entity_id.into()])) .await; } + pub async fn set_block(&self, position: WorldPosition, block_id: BlockId) { + let (chunk_coordinate, relative_coordinates) = position.chunk_and_chunk_relative_position(); + + // Since we divide by 16 remnant can never exceed u8 + let relative = ChunkRelativeBlockCoordinates::from(relative_coordinates); + + let chunk = self.get_chunks(vec![chunk_coordinate]).await[0].clone(); + chunk.write().await.blocks.set_block(relative, block_id); + + self.broadcast_packet_all(&CBlockUpdate::new( + &position, + i32::from(block_id.data).into(), + )) + .await; + } + + pub async fn get_chunks(&self, chunks: Vec>) -> Vec>> { + let (sender, mut receive) = mpsc::channel(chunks.len()); + { + let level = self.level.clone(); + tokio::spawn(async move { level.lock().await.fetch_chunks(&chunks, sender) }); + } + tokio::spawn(async move { + let mut received = vec![]; + + while let Some(chunk) = receive.recv().await { + received.push(chunk); + } + received + }) + .await + .unwrap() + } + + pub async fn break_block(&self, position: WorldPosition) { + self.set_block(position, BlockId { data: 0 }).await; + + self.broadcast_packet_all(&CWorldEvent::new(2001, &position, 11, false)) + .await; + } + + pub async fn get_block(&self, position: WorldPosition) -> BlockId { + let (chunk, relative) = position.chunk_and_chunk_relative_position(); + let relative = ChunkRelativeBlockCoordinates::from(relative); + self.get_chunks(vec![chunk]).await[0] + .clone() + .read() + .await + .blocks + .get_block(relative) + } } diff --git a/pumpkin/src/world/player_chunker.rs b/pumpkin/src/world/player_chunker.rs index 5cfc89fa..7842245f 100644 --- a/pumpkin/src/world/player_chunker.rs +++ b/pumpkin/src/world/player_chunker.rs @@ -25,6 +25,8 @@ pub async fn player_join(world: &World, player: Arc) { player.watched_section.store(new_watched); let watched_section = new_watched; let chunk_pos = player.living_entity.entity.chunk_pos.load(); + + log::debug!("Sending center chunk to {}", player.client.id); player .client .send_packet(&CCenterChunk { @@ -34,10 +36,12 @@ pub async fn player_join(world: &World, player: Arc) { .await; let view_distance = i32::from(get_view_distance(&player).await); log::debug!( - "Player {} joined with view distance: {}", + "Player {} ({}) joined with view distance: {}", player.gameprofile.name, + player.client.id, view_distance ); + let old_cylindrical = Cylindrical::new( Vector2::new(watched_section.x, watched_section.z), view_distance, @@ -59,9 +63,20 @@ pub async fn player_join(world: &World, player: Arc) { }, true, ); + + log::debug!( + "{} added {} remove ({}) for {}", + loading_chunks.len(), + unloading_chunks.len(), + view_distance, + player.client.id + ); + if !loading_chunks.is_empty() { world.mark_chunks_as_watched(&loading_chunks).await; - world.spawn_world_chunks(player.client.clone(), loading_chunks, view_distance); + world + .spawn_world_chunks(player.client.clone(), loading_chunks) + .await; } if !unloading_chunks.is_empty() { @@ -121,7 +136,8 @@ pub async fn update_position(player: &Player) { entity.world.mark_chunks_as_watched(&loading_chunks).await; entity .world - .spawn_world_chunks(player.client.clone(), loading_chunks, view_distance); + .spawn_world_chunks(player.client.clone(), loading_chunks) + .await; } if !unloading_chunks.is_empty() {