diff --git a/docs/troubleshooting/common_issues.md b/docs/troubleshooting/common_issues.md index 98786f7fa..13815323a 100644 --- a/docs/troubleshooting/common_issues.md +++ b/docs/troubleshooting/common_issues.md @@ -18,13 +18,7 @@ **Cause:** The server is currently not calculating hit boxes for blocks, we're working on that. -3. ### The Server is unresponsive - - **Issue:** You have to wait before reconnecting or can't do basic things while chunks are loading. - - **Cause:** The server has currently blocking issues, we're working on that. - -4. ### Failed to verify username +3. ### Failed to verify username **Issue:** Some players reported having issues logging into the Server, including a "Failed to verify username" error. diff --git a/pumpkin-core/src/text/mod.rs b/pumpkin-core/src/text/mod.rs index 706c034ea..d953e0e4b 100644 --- a/pumpkin-core/src/text/mod.rs +++ b/pumpkin-core/src/text/mod.rs @@ -40,6 +40,13 @@ impl<'a> TextComponent<'a> { } } + pub fn text_string(text: String) -> Self { + Self { + content: TextContent::Text { text: text.into() }, + style: Style::default(), + } + } + pub fn to_pretty_console(self) -> String { let style = self.style; let color = style.color; diff --git a/pumpkin-inventory/Cargo.toml b/pumpkin-inventory/Cargo.toml index abeb7cfd1..c4dc9c41b 100644 --- a/pumpkin-inventory/Cargo.toml +++ b/pumpkin-inventory/Cargo.toml @@ -11,5 +11,5 @@ num-traits = "0.2" num-derive = "0.4" thiserror = "1.0.63" itertools.workspace = true -parking_lot.workspace = true crossbeam.workspace = true +tokio.workspace = true diff --git a/pumpkin-inventory/src/drag_handler.rs b/pumpkin-inventory/src/drag_handler.rs index 8479c3635..1d7fffa8b 100644 --- a/pumpkin-inventory/src/drag_handler.rs +++ b/pumpkin-inventory/src/drag_handler.rs @@ -2,10 +2,10 @@ use crate::container_click::MouseDragType; use crate::{Container, InventoryError}; use itertools::Itertools; use num_traits::Euclid; -use parking_lot::{Mutex, RwLock}; use pumpkin_world::item::ItemStack; use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::{Mutex, RwLock}; #[derive(Debug, Default)] pub struct DragHandler(RwLock>>>); @@ -13,7 +13,7 @@ impl DragHandler { pub fn new() -> Self { Self(RwLock::new(HashMap::new())) } - pub fn new_drag( + pub async fn new_drag( &self, container_id: u64, player: i32, @@ -24,21 +24,21 @@ impl DragHandler { drag_type, slots: vec![], }; - let mut drags = self.0.write(); + let mut drags = self.0.write().await; drags.insert(container_id, Arc::new(Mutex::new(drag))); Ok(()) } - pub fn add_slot( + pub async fn add_slot( &self, container_id: u64, player: i32, slot: usize, ) -> Result<(), InventoryError> { - let drags = self.0.read(); + let drags = self.0.read().await; match drags.get(&container_id) { Some(drag) => { - let mut drag = drag.lock(); + let mut drag = drag.lock().await; if drag.player != player { Err(InventoryError::MultiplePlayersDragging)? } @@ -51,7 +51,7 @@ impl DragHandler { Ok(()) } - pub fn apply_drag( + pub async fn apply_drag( &self, maybe_carried_item: &mut Option, container: &mut T, @@ -63,11 +63,11 @@ impl DragHandler { return Ok(()); } - let mut drags = self.0.write(); + let mut drags = self.0.write().await; let Some((_, drag)) = drags.remove_entry(container_id) else { Err(InventoryError::OutOfOrderDragging)? }; - let drag = drag.lock(); + let drag = drag.lock().await; if player != drag.player { Err(InventoryError::MultiplePlayersDragging)? diff --git a/pumpkin-inventory/src/open_container.rs b/pumpkin-inventory/src/open_container.rs index 261ba3b74..882b04998 100644 --- a/pumpkin-inventory/src/open_container.rs +++ b/pumpkin-inventory/src/open_container.rs @@ -1,7 +1,7 @@ use crate::{Container, WindowType}; -use parking_lot::Mutex; use pumpkin_world::item::ItemStack; use std::sync::Arc; +use tokio::sync::Mutex; pub struct OpenContainer { players: Vec, diff --git a/pumpkin-protocol/src/lib.rs b/pumpkin-protocol/src/lib.rs index 1ed4cfdb8..f4ef93ce1 100644 --- a/pumpkin-protocol/src/lib.rs +++ b/pumpkin-protocol/src/lib.rs @@ -143,8 +143,8 @@ pub enum PacketError { FailedWrite(String), #[error("failed to flush decoder")] FailedFinish, - #[error("failed to write encoded packet to connection: {0}")] - ConnectionWrite(String), + #[error("failed to write encoded packet to connection")] + ConnectionWrite, #[error("packet exceeds maximum length")] TooLong, #[error("packet length is out of bounds")] diff --git a/pumpkin-world/src/cylindrical_chunk_iterator.rs b/pumpkin-world/src/cylindrical_chunk_iterator.rs index bb7ca9bf3..185505d28 100644 --- a/pumpkin-world/src/cylindrical_chunk_iterator.rs +++ b/pumpkin-world/src/cylindrical_chunk_iterator.rs @@ -19,7 +19,7 @@ impl Cylindrical { old_cylindrical: Cylindrical, new_cylindrical: Cylindrical, mut newly_included: impl FnMut(Vector2), - just_removed: impl FnMut(Vector2), + mut just_removed: impl FnMut(Vector2), ignore: bool, ) { let min_x = old_cylindrical.left().min(new_cylindrical.left()); @@ -29,26 +29,24 @@ impl Cylindrical { for x in min_x..=max_x { for z in min_z..=max_z { - // TODO - // let old_is_within = if ignore { - // false - // } else { - // old_cylindrical.is_within_distance(x, z) - // }; - // let new_is_within = if ignore { - // true - // } else { - // new_cylindrical.is_within_distance(x, z) - // }; + let old_is_within = if ignore { + false + } else { + old_cylindrical.is_within_distance(x, z) + }; + let new_is_within = if ignore { + true + } else { + new_cylindrical.is_within_distance(x, z) + }; - // if old_is_within != new_is_within { - // if new_is_within { - newly_included(Vector2::new(x, z)); - // } else { - // dbg!("aa"); - // just_removed(Vector2::new(x, z)); - // } - // } + if old_is_within != new_is_within { + if new_is_within { + newly_included(Vector2::new(x, z)); + } else { + just_removed(Vector2::new(x, z)); + } + } } } } diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index c210232ce..e7e5327cb 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -6,7 +6,9 @@ use rayon::prelude::*; use tokio::sync::mpsc; use crate::{ - chunk::{anvil::AnvilChunkReader, ChunkData, ChunkReader, ChunkReadingError}, + chunk::{ + anvil::AnvilChunkReader, ChunkData, ChunkParsingError, ChunkReader, ChunkReadingError, + }, world_gen::{get_world_gen, Seed, WorldGenerator}, }; @@ -96,7 +98,10 @@ impl Level { let data = match &self.save_file { Some(save_file) => { match self.chunk_reader.read_chunk(save_file, at) { - Err(ChunkReadingError::ChunkNotExist) => { + Err( + ChunkReadingError::ParsingError(ChunkParsingError::ChunkNotGenerated) + | ChunkReadingError::ChunkNotExist, + ) => { // This chunk was not generated yet. Ok(self.world_gen.generate_chunk(at)) } diff --git a/pumpkin/Cargo.toml b/pumpkin/Cargo.toml index 89091407b..66ad44558 100644 --- a/pumpkin/Cargo.toml +++ b/pumpkin/Cargo.toml @@ -59,10 +59,6 @@ png = "0.17.14" simple_logger = { version = "5.0.0", features = ["threads"] } log.workspace = true -# networking -mio = { version = "1.0.2", features = ["net", "os-poll"] } - -parking_lot.workspace = true crossbeam.workspace = true uuid.workspace = true tokio.workspace = true diff --git a/pumpkin/src/client/client_packet.rs b/pumpkin/src/client/client_packet.rs index aba49e761..a0daf76f9 100644 --- a/pumpkin/src/client/client_packet.rs +++ b/pumpkin/src/client/client_packet.rs @@ -34,35 +34,36 @@ use super::{authentication::AuthError, Client, PlayerConfig}; /// NEVER TRUST THE CLIENT. HANDLE EVERY ERROR, UNWRAP/EXPECT /// TODO: REMOVE ALL UNWRAPS impl Client { - pub fn handle_handshake(&self, handshake: SHandShake) { + pub async fn handle_handshake(&self, handshake: SHandShake) { dbg!("handshake"); let version = handshake.protocol_version.0; self.protocol_version .store(version, std::sync::atomic::Ordering::Relaxed); - *self.server_address.lock() = handshake.server_address; + *self.server_address.lock().await = handshake.server_address; self.connection_state.store(handshake.next_state); if self.connection_state.load() != ConnectionState::Status { let protocol = version; match protocol.cmp(&(CURRENT_MC_PROTOCOL as i32)) { std::cmp::Ordering::Less => { - self.kick(&format!("Client outdated ({protocol}), Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")); + self.kick(&format!("Client outdated ({protocol}), Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")).await; } std::cmp::Ordering::Equal => {} std::cmp::Ordering::Greater => { - self.kick(&format!("Server outdated, Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")); + self.kick(&format!("Server outdated, Server uses Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL}")).await; } } } } - pub fn handle_status_request(&self, server: &Server, _status_request: SStatusRequest) { - self.send_packet(&server.get_status()); + pub async fn handle_status_request(&self, server: &Server, _status_request: SStatusRequest) { + self.send_packet(&server.get_status()).await; } - pub fn handle_ping_request(&self, ping_request: SStatusPingRequest) { + pub async fn handle_ping_request(&self, ping_request: SStatusPingRequest) { dbg!("ping"); - self.send_packet(&CPingResponse::new(ping_request.payload)); + self.send_packet(&CPingResponse::new(ping_request.payload)) + .await; self.close(); } @@ -73,28 +74,28 @@ impl Client { .all(|c| c > 32_u8 as char && c < 127_u8 as char) } - pub fn handle_login_start(&self, server: &Server, login_start: SLoginStart) { + pub async fn handle_login_start(&self, server: &Server, login_start: SLoginStart) { log::debug!("login start, State {:?}", self.connection_state); if !Self::is_valid_player_name(&login_start.name) { - self.kick("Invalid characters in username"); + self.kick("Invalid characters in username").await; return; } // default game profile, when no online mode // TODO: make offline uuid - let mut gameprofile = self.gameprofile.lock(); + let mut gameprofile = self.gameprofile.lock().await; let proxy = &ADVANCED_CONFIG.proxy; if proxy.enabled { if proxy.velocity.enabled { - velocity_login(self); + velocity_login(self).await; } else if proxy.bungeecord.enabled { - match bungeecord::bungeecord_login(self, login_start.name) { + match bungeecord::bungeecord_login(self, login_start.name).await { Ok((_ip, profile)) => { // self.address.lock() = ip; - self.finish_login(&profile); + self.finish_login(&profile).await; *gameprofile = Some(profile); } - Err(error) => self.kick(&error.to_string()), + Err(error) => self.kick(&error.to_string()).await, } } } else { @@ -107,7 +108,8 @@ impl Client { // TODO: check config for encryption let verify_token: [u8; 4] = rand::random(); - self.send_packet(&server.encryption_request(&verify_token, BASIC_CONFIG.online_mode)); + self.send_packet(&server.encryption_request(&verify_token, BASIC_CONFIG.online_mode)) + .await; } } @@ -118,10 +120,11 @@ impl Client { ) { let shared_secret = server.decrypt(&encryption_response.shared_secret).unwrap(); - self.set_encryption(Some(&shared_secret)) - .unwrap_or_else(|e| self.kick(&e.to_string())); - - let mut gameprofile = self.gameprofile.lock(); + if let Err(error) = self.set_encryption(Some(&shared_secret)).await { + self.kick(&error.to_string()).await; + return; + } + let mut gameprofile = self.gameprofile.lock().await; if BASIC_CONFIG.online_mode { match self @@ -130,30 +133,31 @@ impl Client { { Ok(profile) => *gameprofile = Some(profile), Err(e) => { - self.kick(&e.to_string()); + self.kick(&e.to_string()).await; } } } if let Some(profile) = gameprofile.as_ref() { if ADVANCED_CONFIG.packet_compression.enabled { - self.enable_compression(); + self.enable_compression().await; } - self.finish_login(profile); + self.finish_login(profile).await; } else { - self.kick("No Game profile"); + self.kick("No Game profile").await; } } - fn enable_compression(&self) { + async fn enable_compression(&self) { let compression = ADVANCED_CONFIG.packet_compression.compression_info.clone(); - self.send_packet(&CSetCompression::new(compression.threshold.into())); - self.set_compression(Some(compression)); + self.send_packet(&CSetCompression::new(compression.threshold.into())) + .await; + self.set_compression(Some(compression)).await; } - fn finish_login(&self, profile: &GameProfile) { + async fn finish_login(&self, profile: &GameProfile) { let packet = CLoginSuccess::new(&profile.id, &profile.name, &profile.properties, false); - self.send_packet(&packet); + self.send_packet(&packet).await; } async fn autenticate( @@ -164,7 +168,7 @@ impl Client { ) -> Result { if let Some(auth_client) = &server.auth_client { let hash = server.digest_secret(shared_secret); - let ip = self.address.lock().ip(); + let ip = self.address.lock().await.ip(); let profile = authentication::authenticate(username, &hash, &ip, auth_client).await?; // Check if player should join @@ -199,32 +203,32 @@ impl Client { Err(AuthError::MissingAuthClient) } - pub fn handle_plugin_response(&self, plugin_response: SLoginPluginResponse) { + pub async fn handle_plugin_response(&self, plugin_response: SLoginPluginResponse) { let velocity_config = &ADVANCED_CONFIG.proxy.velocity; if velocity_config.enabled { - let mut address = self.address.lock(); + let mut address = self.address.lock().await; match velocity::receive_velocity_plugin_response( address.port(), velocity_config, plugin_response, ) { Ok((profile, new_address)) => { - self.finish_login(&profile); - *self.gameprofile.lock() = Some(profile); + self.finish_login(&profile).await; + *self.gameprofile.lock().await = Some(profile); *address = new_address } - Err(error) => self.kick(&error.to_string()), + Err(error) => self.kick(&error.to_string()).await, } } } - pub fn handle_login_acknowledged( + pub async fn handle_login_acknowledged( &self, server: &Server, _login_acknowledged: SLoginAcknowledged, ) { self.connection_state.store(ConnectionState::Config); - self.send_packet(&server.get_branding()); + self.send_packet(&server.get_branding()).await; let resource_config = &ADVANCED_CONFIG.resource_pack; if resource_config.enabled { @@ -243,7 +247,7 @@ impl Client { }, ); - self.send_packet(&resource_pack); + self.send_packet(&resource_pack).await; } // known data packs @@ -251,16 +255,20 @@ impl Client { namespace: "minecraft", id: "core", version: "1.21", - }])); + }])) + .await; dbg!("login acknowledged"); } - pub fn handle_client_information_config(&self, client_information: SClientInformationConfig) { + pub async fn handle_client_information_config( + &self, + client_information: SClientInformationConfig, + ) { dbg!("got client settings"); if let (Some(main_hand), Some(chat_mode)) = ( Hand::from_i32(client_information.main_hand.into()), ChatMode::from_i32(client_information.chat_mode.into()), ) { - *self.config.lock() = Some(PlayerConfig { + *self.config.lock().await = Some(PlayerConfig { locale: client_information.locale, view_distance: client_information.view_distance, chat_mode, @@ -271,33 +279,34 @@ impl Client { server_listing: client_information.server_listing, }); } else { - self.kick("Invalid hand or chat type") + self.kick("Invalid hand or chat type").await } } - pub fn handle_plugin_message(&self, plugin_message: SPluginMessage) { + pub async fn handle_plugin_message(&self, plugin_message: SPluginMessage) { if plugin_message.channel.starts_with("minecraft:brand") || plugin_message.channel.starts_with("MC|Brand") { dbg!("got a client brand"); match String::from_utf8(plugin_message.data) { - Ok(brand) => *self.brand.lock() = Some(brand), - Err(e) => self.kick(&e.to_string()), + Ok(brand) => *self.brand.lock().await = Some(brand), + Err(e) => self.kick(&e.to_string()).await, } } } - pub fn handle_known_packs(&self, server: &Server, _config_acknowledged: SKnownPacks) { + pub async fn handle_known_packs(&self, server: &Server, _config_acknowledged: SKnownPacks) { for registry in &server.cached_registry { self.send_packet(&CRegistryData::new( ®istry.registry_id, ®istry.registry_entries, - )); + )) + .await; } // We are done with configuring dbg!("finish config"); - self.send_packet(&CFinishConfig::new()); + self.send_packet(&CFinishConfig::new()).await; } pub async fn handle_config_acknowledged(&self, _config_acknowledged: SAcknowledgeFinishConfig) { diff --git a/pumpkin/src/client/container.rs b/pumpkin/src/client/container.rs index 3e91a383a..ac9b72470 100644 --- a/pumpkin/src/client/container.rs +++ b/pumpkin/src/client/container.rs @@ -1,7 +1,6 @@ use crate::entity::player::Player; use crate::server::Server; use itertools::Itertools; -use parking_lot::Mutex; use pumpkin_core::text::TextComponent; use pumpkin_core::GameMode; use pumpkin_inventory::container_click::{ @@ -19,15 +18,17 @@ use pumpkin_protocol::slot::Slot; use pumpkin_world::item::ItemStack; use std::sync::Arc; +#[expect(unused)] + impl Player { - pub fn open_container(&self, server: &Server, minecraft_menu_id: &str) { - let inventory = self.inventory.lock(); + pub async fn open_container(&self, server: &Server, minecraft_menu_id: &str) { + let inventory = self.inventory.lock().await; inventory .state_id .store(0, std::sync::atomic::Ordering::Relaxed); let total_opened_containers = inventory.total_opened_containers; let container = self.get_open_container(server); - let mut container = container.as_ref().map(|container| container.lock()); + let container = container.as_ref().map(|container| container.lock()); let menu_protocol_id = (*pumpkin_world::global_registry::REGISTRY .get("minecraft:menu") .unwrap() @@ -37,23 +38,25 @@ impl Player { .get("protocol_id") .unwrap()) .into(); - let window_title = container - .as_ref() - .map(|container| container.window_name()) - .unwrap_or_else(|| inventory.window_name()); + let window_title = match container { + Some(container) => container.await.window_name(), + None => inventory.window_name(), + }; let title = TextComponent::text(window_title); - self.client.send_packet(&COpenScreen::new( - total_opened_containers.into(), - menu_protocol_id, - title, - )); + self.client + .send_packet(&COpenScreen::new( + total_opened_containers.into(), + menu_protocol_id, + title, + )) + .await; drop(inventory); - self.set_container_content(container.as_deref_mut()); + // self.set_container_content(container.as_deref_mut()); } - pub fn set_container_content(&self, container: Option<&mut Box>) { - let mut inventory = self.inventory.lock(); + pub async fn set_container_content(&self, container: Option<&mut Box>) { + let mut inventory = self.inventory.lock().await; let total_opened_containers = inventory.total_opened_containers; let container = OptionallyCombinedContainer::new(&mut inventory, container); @@ -80,27 +83,30 @@ impl Player { &slots, &carried_item, ); - self.client.send_packet(&packet); + self.client.send_packet(&packet).await; } /// The official Minecraft client is weird, and will always just close *any* window that is opened when this gets sent - pub fn close_container(&self) { - let mut inventory = self.inventory.lock(); + pub async fn close_container(&self) { + let mut inventory = self.inventory.lock().await; inventory.total_opened_containers += 1; self.client .send_packet(&CCloseContainer::new(inventory.total_opened_containers)) + .await } - pub fn set_container_property( + pub async fn set_container_property( &mut self, window_property: WindowProperty, ) { let (id, value) = window_property.into_tuple(); - self.client.send_packet(&CSetContainerProperty::new( - self.inventory.lock().total_opened_containers, - id, - value, - )); + self.client + .send_packet(&CSetContainerProperty::new( + self.inventory.lock().await.total_opened_containers, + id, + value, + )) + .await; } pub async fn handle_click_container( @@ -109,22 +115,23 @@ impl Player { packet: SClickContainer, ) -> Result<(), InventoryError> { let opened_container = self.get_open_container(server); - let mut opened_container = opened_container.as_ref().map(|container| container.lock()); + let opened_container = opened_container.as_ref().map(|container| container.lock()); let drag_handler = &server.drag_handler; let state_id = self .inventory .lock() + .await .state_id .load(std::sync::atomic::Ordering::Relaxed); // This is just checking for regular desync, client hasn't done anything malicious if state_id != packet.state_id.0 as u32 { - self.set_container_content(opened_container.as_deref_mut()); + // self.set_container_content(opened_container.as_deref_mut()); return Ok(()); } if opened_container.is_some() { - if packet.window_id != self.inventory.lock().total_opened_containers { + if packet.window_id != self.inventory.lock().await.total_opened_containers { return Err(InventoryError::ClosedContainerInteract(self.entity_id())); } } else if packet.window_id != 0 { @@ -144,49 +151,56 @@ impl Player { match click.click_type { ClickType::MouseClick(mouse_click) => { - self.mouse_click(opened_container.as_deref_mut(), mouse_click, click.slot) + // self.mouse_click(opened_container.as_deref_mut(), mouse_click, click.slot).await + todo!() } ClickType::ShiftClick => { - self.shift_mouse_click(opened_container.as_deref_mut(), click.slot) + // self.shift_mouse_click(opened_container.as_deref_mut(), click.slot).await + todo!() + } + ClickType::KeyClick(key_click) => { + todo!() + // container_click::Slot::Normal(slot) => { + // self.number_button_pressed(opened_container.as_deref_mut(), key_click, slot).await + // } + // container_click::Slot::OutsideInventory => Err(InventoryError::InvalidPacket), } - ClickType::KeyClick(key_click) => match click.slot { - container_click::Slot::Normal(slot) => { - self.number_button_pressed(opened_container.as_deref_mut(), key_click, slot) - } - container_click::Slot::OutsideInventory => Err(InventoryError::InvalidPacket), - }, ClickType::CreativePickItem => { - if let container_click::Slot::Normal(slot) = click.slot { - self.creative_pick_item(opened_container.as_deref_mut(), slot) - } else { - Err(InventoryError::InvalidPacket) - } + // if let container_click::Slot::Normal(slot) = click.slot { + // self.creative_pick_item(opened_container.as_deref_mut(), slot).await + // } else { + // Err(InventoryError::InvalidPacket) + // } + todo!() } ClickType::DoubleClick => { update_whole_container = true; - if let container_click::Slot::Normal(slot) = click.slot { - self.double_click(opened_container.as_deref_mut(), slot) - } else { - Err(InventoryError::InvalidPacket) - } + // if let container_click::Slot::Normal(slot) = click.slot { + // self.double_click(opened_container.as_deref_mut(), slot) + // } else { + // Err(InventoryError::InvalidPacket) + // } + todo!() } ClickType::MouseDrag { drag_state } => { if drag_state == MouseDragState::End { update_whole_container = true; } - self.mouse_drag(drag_handler, opened_container.as_deref_mut(), drag_state) + todo!() + // self.mouse_drag(drag_handler, opened_container.as_deref_mut(), drag_state) } ClickType::DropType(_drop_type) => { dbg!("todo"); Ok(()) } }?; - if let Some(mut opened_container) = opened_container { + if let Some(opened_container) = opened_container { if update_whole_container { drop(opened_container); self.send_whole_container_change(server).await?; } else if let container_click::Slot::Normal(slot_index) = click.slot { - let mut inventory = self.inventory.lock(); + let mut inventory = self.inventory.lock().await; + let mut opened_container = opened_container.await; let combined_container = OptionallyCombinedContainer::new(&mut inventory, Some(&mut opened_container)); if let Some(slot) = combined_container.get_slot_excluding_inventory(slot_index) { @@ -200,13 +214,13 @@ impl Player { Ok(()) } - fn mouse_click( + async fn mouse_click( &self, opened_container: Option<&mut Box>, mouse_click: MouseClick, slot: container_click::Slot, ) -> Result<(), InventoryError> { - let mut inventory = self.inventory.lock(); + let mut inventory = self.inventory.lock().await; let mut container = OptionallyCombinedContainer::new(&mut inventory, opened_container); match slot { @@ -220,12 +234,12 @@ impl Player { } } - fn shift_mouse_click( + async fn shift_mouse_click( &self, opened_container: Option<&mut Box>, slot: container_click::Slot, ) -> Result<(), InventoryError> { - let mut inventory = self.inventory.lock(); + let mut inventory = self.inventory.lock().await; let mut container = OptionallyCombinedContainer::new(&mut inventory, opened_container); match slot { @@ -267,7 +281,7 @@ impl Player { Ok(()) } - fn number_button_pressed( + async fn number_button_pressed( &self, opened_container: Option<&mut Box>, key_click: KeyClick, @@ -277,7 +291,7 @@ impl Player { KeyClick::Slot(slot) => slot, KeyClick::Offhand => 45, }; - let mut inventory = self.inventory.lock(); + let mut inventory = self.inventory.lock().await; let mut changing_item_slot = inventory.get_slot(changing_slot as usize)?.to_owned(); let mut container = OptionallyCombinedContainer::new(&mut inventory, opened_container); @@ -286,7 +300,7 @@ impl Player { Ok(()) } - fn creative_pick_item( + async fn creative_pick_item( &self, opened_container: Option<&mut Box>, slot: usize, @@ -294,7 +308,7 @@ impl Player { if self.gamemode.load() != GameMode::Creative { return Err(InventoryError::PermissionError); } - let mut inventory = self.inventory.lock(); + let mut inventory = self.inventory.lock().await; let mut container = OptionallyCombinedContainer::new(&mut inventory, opened_container); if let Some(Some(item)) = container.all_slots().get_mut(slot) { self.carried_item.store(Some(item.to_owned())); @@ -302,12 +316,12 @@ impl Player { Ok(()) } - fn double_click( + async fn double_click( &self, opened_container: Option<&mut Box>, slot: usize, ) -> Result<(), InventoryError> { - let mut inventory = self.inventory.lock(); + let mut inventory = self.inventory.lock().await; let mut container = OptionallyCombinedContainer::new(&mut inventory, opened_container); let mut slots = container.all_slots(); @@ -340,7 +354,7 @@ impl Player { Ok(()) } - fn mouse_drag( + async fn mouse_drag( &self, drag_handler: &DragHandler, opened_container: Option<&mut Box>, @@ -357,20 +371,21 @@ impl Player { { Err(InventoryError::PermissionError)? } - drag_handler.new_drag(container_id, player_id, drag_type) + drag_handler + .new_drag(container_id, player_id, drag_type) + .await + } + MouseDragState::AddSlot(slot) => { + drag_handler.add_slot(container_id, player_id, slot).await } - MouseDragState::AddSlot(slot) => drag_handler.add_slot(container_id, player_id, slot), MouseDragState::End => { - let mut inventory = self.inventory.lock(); + let mut inventory = self.inventory.lock().await; let mut container = OptionallyCombinedContainer::new(&mut inventory, opened_container); let mut carried_item = self.carried_item.load(); - let res = drag_handler.apply_drag( - &mut carried_item, - &mut container, - &container_id, - player_id, - ); + let res = drag_handler + .apply_drag(&mut carried_item, &mut container, &container_id, player_id) + .await; self.carried_item.store(carried_item); res } @@ -379,7 +394,7 @@ impl Player { async fn get_current_players_in_container(&self, server: &Server) -> Vec> { let player_ids = { - let open_containers = server.open_containers.read(); + let open_containers = server.open_containers.read().await; open_containers .get(&self.open_container.load().unwrap()) .unwrap() @@ -399,6 +414,7 @@ impl Player { .world .current_players .lock() + .await .iter() .filter_map(|(token, player)| { if *token != player_token { @@ -423,7 +439,7 @@ impl Player { slot: Slot, ) -> Result<(), InventoryError> { for player in self.get_current_players_in_container(server).await { - let inventory = player.inventory.lock(); + let inventory = player.inventory.lock().await; let total_opened_containers = inventory.total_opened_containers; // Returns previous value @@ -436,7 +452,7 @@ impl Player { slot_index, &slot, ); - player.client.send_packet(&packet); + player.client.send_packet(&packet).await; } Ok(()) } @@ -446,15 +462,17 @@ impl Player { for player in players { let container = player.get_open_container(server); - let mut container = container.as_ref().map(|v| v.lock()); - player.set_container_content(container.as_deref_mut()); + let container = container.as_ref().map(|v| v.lock()); + // player.set_container_content(container.as_deref_mut()); } Ok(()) } - pub fn get_open_container(&self, server: &Server) -> Option>>> { - self.open_container - .load() - .map_or_else(|| None, |id| server.try_get_container(self.entity_id(), id)) + pub fn get_open_container( + &self, + server: &Server, + ) -> Option>>> { + // self.open_container .load().map_or_else(|| None, |id| server.try_get_container(self.entity_id(), id).await) + None } } diff --git a/pumpkin/src/client/mod.rs b/pumpkin/src/client/mod.rs index b2bb4f47b..6d2476185 100644 --- a/pumpkin/src/client/mod.rs +++ b/pumpkin/src/client/mod.rs @@ -1,6 +1,5 @@ use std::{ collections::VecDeque, - io::{self, Write}, net::SocketAddr, sync::{ atomic::{AtomicBool, AtomicI32}, @@ -15,8 +14,6 @@ use crate::{ use authentication::GameProfile; use crossbeam::atomic::AtomicCell; -use mio::{event::Event, net::TcpStream}; -use parking_lot::Mutex; use pumpkin_config::compression::CompressionInfo; use pumpkin_core::text::TextComponent; use pumpkin_protocol::{ @@ -32,8 +29,9 @@ use pumpkin_protocol::{ }, ClientPacket, ConnectionState, PacketError, RawPacket, ServerPacket, }; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::Mutex; -use std::io::Read; use thiserror::Error; pub mod authentication; @@ -105,7 +103,9 @@ pub struct Client { /// A unique id identifying the client. pub id: usize, /// The underlying TCP connection to the client. - pub connection: Arc>, + pub connection_reader: Arc>, + pub connection_writer: Arc>, + /// The client's IP address. pub address: Mutex, /// The packet encoder for outgoing packets. @@ -126,10 +126,11 @@ pub struct Client { impl Client { pub fn new( id: usize, - connection: TcpStream, + connection: tokio::net::TcpStream, address: SocketAddr, keep_alive_sender: Arc>, ) -> Self { + let (connection_reader, connection_writer) = connection.into_split(); Self { protocol_version: AtomicI32::new(0), gameprofile: Mutex::new(None), @@ -139,7 +140,8 @@ impl Client { id, address: Mutex::new(address), connection_state: AtomicCell::new(ConnectionState::HandShake), - connection: Arc::new(Mutex::new(connection)), + connection_reader: Arc::new(Mutex::new(connection_reader)), + connection_writer: Arc::new(Mutex::new(connection_writer)), enc: Arc::new(Mutex::new(PacketEncoder::default())), dec: Arc::new(Mutex::new(PacketDecoder::default())), encryption: AtomicBool::new(false), @@ -152,13 +154,13 @@ impl Client { } /// Adds a Incoming packet to the queue - pub fn add_packet(&self, packet: RawPacket) { - let mut client_packets_queue = self.client_packets_queue.lock(); + pub async fn add_packet(&self, packet: RawPacket) { + let mut client_packets_queue = self.client_packets_queue.lock().await; client_packets_queue.push_back(packet); } /// Sets the Packet encryption - pub fn set_encryption( + pub async fn set_encryption( &self, shared_secret: Option<&[u8]>, // decrypted ) -> Result<(), EncryptionError> { @@ -168,55 +170,64 @@ impl Client { let crypt_key: [u8; 16] = shared_secret .try_into() .map_err(|_| EncryptionError::SharedWrongLength)?; - self.dec.lock().set_encryption(Some(&crypt_key)); - self.enc.lock().set_encryption(Some(&crypt_key)); + self.dec.lock().await.set_encryption(Some(&crypt_key)); + self.enc.lock().await.set_encryption(Some(&crypt_key)); } else { - self.dec.lock().set_encryption(None); - self.enc.lock().set_encryption(None); + self.dec.lock().await.set_encryption(None); + self.enc.lock().await.set_encryption(None); } Ok(()) } /// Sets the Packet compression - pub fn set_compression(&self, compression: Option) { - self.dec.lock().set_compression(compression.is_some()); - self.enc.lock().set_compression(compression); + pub async fn set_compression(&self, compression: Option) { + self.dec.lock().await.set_compression(compression.is_some()); + self.enc.lock().await.set_compression(compression); } /// Send a Clientbound Packet to the Client - pub fn send_packet(&self, packet: &P) { + pub async fn send_packet(&self, packet: &P) { // assert!(!self.closed); - let mut enc = self.enc.lock(); - enc.append_packet(packet) - .unwrap_or_else(|e| self.kick(&e.to_string())); - self.connection + let mut enc = self.enc.lock().await; + if let Err(error) = enc.append_packet(packet) { + self.kick(&error.to_string()).await; + return; + } + if let Err(error) = self + .connection_writer .lock() + .await .write_all(&enc.take()) - .map_err(|e| PacketError::ConnectionWrite(e.to_string())) - .unwrap_or_else(|e| self.kick(&e.to_string())); + .await + .map_err(|_| PacketError::ConnectionWrite) + { + self.kick(&error.to_string()).await; + } } - pub fn try_send_packet(&self, packet: &P) -> Result<(), PacketError> { + pub async fn try_send_packet(&self, packet: &P) -> Result<(), PacketError> { // assert!(!self.closed); - let mut enc = self.enc.lock(); + let mut enc = self.enc.lock().await; enc.append_packet(packet)?; - self.connection + self.connection_writer .lock() + .await .write_all(&enc.take()) - .map_err(|e| PacketError::ConnectionWrite(e.to_string()))?; + .await + .map_err(|_| PacketError::ConnectionWrite)?; Ok(()) } /// Processes all packets send by the client pub async fn process_packets(&self, server: &Arc) { - while let Some(mut packet) = self.client_packets_queue.lock().pop_front() { - let _ = self.handle_packet(server, &mut packet).await.map_err(|e| { + while let Some(mut packet) = self.client_packets_queue.lock().await.pop_front() { + if let Err(error) = self.handle_packet(server, &mut packet).await { dbg!("{:?}", packet.id); - let text = format!("Error while reading incoming packet {}", e); + let text = format!("Error while reading incoming packet {}", error); log::error!("{}", text); - self.kick(&text) - }); + self.kick(&text).await + }; } } @@ -227,8 +238,12 @@ impl Client { packet: &mut RawPacket, ) -> Result<(), DeserializerError> { match self.connection_state.load() { - pumpkin_protocol::ConnectionState::HandShake => self.handle_handshake_packet(packet), - pumpkin_protocol::ConnectionState::Status => self.handle_status_packet(server, packet), + pumpkin_protocol::ConnectionState::HandShake => { + self.handle_handshake_packet(packet).await + } + pumpkin_protocol::ConnectionState::Status => { + self.handle_status_packet(server, packet).await + } // TODO: Check config if transfer is enabled pumpkin_protocol::ConnectionState::Login | pumpkin_protocol::ConnectionState::Transfer => { @@ -244,11 +259,14 @@ impl Client { } } - fn handle_handshake_packet(&self, packet: &mut RawPacket) -> Result<(), DeserializerError> { + async fn handle_handshake_packet( + &self, + packet: &mut RawPacket, + ) -> Result<(), DeserializerError> { let bytebuf = &mut packet.bytebuf; match packet.id.0 { SHandShake::PACKET_ID => { - self.handle_handshake(SHandShake::read(bytebuf)?); + self.handle_handshake(SHandShake::read(bytebuf)?).await; Ok(()) } _ => { @@ -261,7 +279,7 @@ impl Client { } } - fn handle_status_packet( + async fn handle_status_packet( &self, server: &Arc, packet: &mut RawPacket, @@ -269,11 +287,13 @@ impl Client { let bytebuf = &mut packet.bytebuf; match packet.id.0 { SStatusRequest::PACKET_ID => { - self.handle_status_request(server, SStatusRequest::read(bytebuf)?); + self.handle_status_request(server, SStatusRequest::read(bytebuf)?) + .await; Ok(()) } SStatusPingRequest::PACKET_ID => { - self.handle_ping_request(SStatusPingRequest::read(bytebuf)?); + self.handle_ping_request(SStatusPingRequest::read(bytebuf)?) + .await; Ok(()) } _ => { @@ -294,7 +314,8 @@ impl Client { let bytebuf = &mut packet.bytebuf; match packet.id.0 { SLoginStart::PACKET_ID => { - self.handle_login_start(server, SLoginStart::read(bytebuf)?); + self.handle_login_start(server, SLoginStart::read(bytebuf)?) + .await; Ok(()) } SEncryptionResponse::PACKET_ID => { @@ -303,11 +324,13 @@ impl Client { Ok(()) } SLoginPluginResponse::PACKET_ID => { - self.handle_plugin_response(SLoginPluginResponse::read(bytebuf)?); + self.handle_plugin_response(SLoginPluginResponse::read(bytebuf)?) + .await; Ok(()) } SLoginAcknowledged::PACKET_ID => { - self.handle_login_acknowledged(server, SLoginAcknowledged::read(bytebuf)?); + self.handle_login_acknowledged(server, SLoginAcknowledged::read(bytebuf)?) + .await; Ok(()) } _ => { @@ -328,11 +351,13 @@ impl Client { let bytebuf = &mut packet.bytebuf; match packet.id.0 { SClientInformationConfig::PACKET_ID => { - self.handle_client_information_config(SClientInformationConfig::read(bytebuf)?); + self.handle_client_information_config(SClientInformationConfig::read(bytebuf)?) + .await; Ok(()) } SPluginMessage::PACKET_ID => { - self.handle_plugin_message(SPluginMessage::read(bytebuf)?); + self.handle_plugin_message(SPluginMessage::read(bytebuf)?) + .await; Ok(()) } SAcknowledgeFinishConfig::PACKET_ID => { @@ -341,7 +366,8 @@ impl Client { Ok(()) } SKnownPacks::PACKET_ID => { - self.handle_known_packs(server, SKnownPacks::read(bytebuf)?); + self.handle_known_packs(server, SKnownPacks::read(bytebuf)?) + .await; Ok(()) } _ => { @@ -356,70 +382,56 @@ impl Client { /// Reads the connection until our buffer of len 4096 is full, then decode /// Close connection when an error occurs or when the Client closed the connection - pub async fn poll(&self, event: &Event) { - if event.is_readable() { - let mut received_data = vec![]; - let mut buf = [0; 4096]; - loop { - let connection = self.connection.clone(); - let mut connection = connection.lock(); - match connection.read(&mut buf) { - Ok(0) => { - // Reading 0 bytes means the other side has closed the - // connection or is done writing, then so are we. - self.close(); - break; - } - Ok(n) => received_data.extend(&buf[..n]), - // Would block "errors" are the OS's way of saying that the - // connection is not actually ready to perform this I/O operation. - Err(ref err) if would_block(err) => break, - Err(ref err) if interrupted(err) => continue, - // Other errors we'll consider fatal. - Err(_) => self.close(), - } - } + pub async fn poll(&self) { + loop { + let mut dec = self.dec.lock().await; + if let Ok(Some(packet)) = dec.decode() { + self.add_packet(packet).await; + return; + }; + + dec.reserve(4096); + let mut buf = dec.take_capacity(); - if !received_data.is_empty() { - let mut dec = self.dec.lock(); - dec.queue_slice(&received_data); - loop { - match dec.decode() { - Ok(packet) => { - if let Some(packet) = packet { - self.add_packet(packet); - } else { - break; - } - } - Err(err) => { - self.kick(&err.to_string()); - break; - } - } - } - dec.clear(); + if self + .connection_reader + .lock() + .await + .read_buf(&mut buf) + .await + .unwrap() + == 0 + { + self.close(); + return; } + + // This should always be an O(1) unsplit because we reserved space earlier and + // the call to `read_buf` shouldn't have grown the allocation. + dec.queue_bytes(buf); } } /// Kicks the Client with a reason depending on the connection state - pub fn kick(&self, reason: &str) { + pub async fn kick(&self, reason: &str) { dbg!(reason); match self.connection_state.load() { ConnectionState::Login => { self.try_send_packet(&CLoginDisconnect::new( &serde_json::to_string_pretty(&reason).unwrap_or_else(|_| "".into()), )) + .await .unwrap_or_else(|_| self.close()); } ConnectionState::Config => { self.try_send_packet(&CConfigDisconnect::new(reason)) + .await .unwrap_or_else(|_| self.close()); } // This way players get kicked when players using client functions (e.g. poll, send_packet) ConnectionState::Play => { self.try_send_packet(&CPlayDisconnect::new(&TextComponent::text(reason))) + .await .unwrap_or_else(|_| self.close()); } _ => { @@ -443,11 +455,3 @@ pub enum EncryptionError { #[error("shared secret has the wrong length")] SharedWrongLength, } - -fn would_block(err: &io::Error) -> bool { - err.kind() == io::ErrorKind::WouldBlock -} - -pub fn interrupted(err: &io::Error) -> bool { - err.kind() == io::ErrorKind::Interrupted -} diff --git a/pumpkin/src/client/player_packet.rs b/pumpkin/src/client/player_packet.rs index cfc9526ac..c7b9e5a92 100644 --- a/pumpkin/src/client/player_packet.rs +++ b/pumpkin/src/client/player_packet.rs @@ -41,8 +41,8 @@ fn modulus(a: f32, b: f32) -> f32 { /// Handles all Play Packets send by a real Player /// NEVER TRUST THE CLIENT. HANDLE EVERY ERROR, UNWRAP/EXPECT ARE FORBIDDEN impl Player { - pub fn handle_confirm_teleport(&self, confirm_teleport: SConfirmTeleport) { - let mut awaiting_teleport = self.awaiting_teleport.lock(); + pub async fn handle_confirm_teleport(&self, confirm_teleport: SConfirmTeleport) { + let mut awaiting_teleport = self.awaiting_teleport.lock().await; if let Some((id, position)) = awaiting_teleport.as_ref() { if id == &confirm_teleport.teleport_id { // we should set the pos now to that we requested in the teleport packet, Is may fixed issues when the client sended position packets while being teleported @@ -52,12 +52,13 @@ impl Player { *awaiting_teleport = None; } else { - self.kick(TextComponent::text("Wrong teleport id")) + self.kick(TextComponent::text("Wrong teleport id")).await } } else { self.kick(TextComponent::text( "Send Teleport confirm, but we did not teleport", )) + .await } } @@ -71,7 +72,7 @@ impl Player { pub async fn handle_position(&self, position: SPlayerPosition) { if position.x.is_nan() || position.feet_y.is_nan() || position.z.is_nan() { - self.kick(TextComponent::text("Invalid movement")); + self.kick(TextComponent::text("Invalid movement")).await; return; } let entity = &self.living_entity.entity; @@ -104,17 +105,19 @@ impl Player { // return; // } // send new position to all other players - world.broadcast_packet_expect( - &[self.client.id], - &CUpdateEntityPos::new( - entity_id.into(), - x.mul_add(4096.0, -(lastx * 4096.0)) as i16, - y.mul_add(4096.0, -(lasty * 4096.0)) as i16, - z.mul_add(4096.0, -(lastz * 4096.0)) as i16, - position.ground, - ), - ); - player_chunker::update_position(entity, self).await; + world + .broadcast_packet_expect( + &[self.client.id], + &CUpdateEntityPos::new( + entity_id.into(), + x.mul_add(4096.0, -(lastx * 4096.0)) as i16, + y.mul_add(4096.0, -(lasty * 4096.0)) as i16, + z.mul_add(4096.0, -(lastz * 4096.0)) as i16, + position.ground, + ), + ) + .await; + player_chunker::update_position(self).await; } pub async fn handle_position_rotation(&self, position_rotation: SPlayerPositionRotation) { @@ -122,11 +125,11 @@ impl Player { || position_rotation.feet_y.is_nan() || position_rotation.z.is_nan() { - self.kick(TextComponent::text("Invalid movement")); + self.kick(TextComponent::text("Invalid movement")).await; return; } if position_rotation.yaw.is_infinite() || position_rotation.pitch.is_infinite() { - self.kick(TextComponent::text("Invalid rotation")); + self.kick(TextComponent::text("Invalid rotation")).await; return; } let entity = &self.living_entity.entity; @@ -170,28 +173,32 @@ impl Player { // } // send new position to all other players - world.broadcast_packet_expect( - &[self.client.id], - &CUpdateEntityPosRot::new( - entity_id.into(), - x.mul_add(4096.0, -(lastx * 4096.0)) as i16, - y.mul_add(4096.0, -(lasty * 4096.0)) as i16, - z.mul_add(4096.0, -(lastz * 4096.0)) as i16, - yaw as u8, - pitch as u8, - position_rotation.ground, - ), - ); - world.broadcast_packet_expect( - &[self.client.id], - &CHeadRot::new(entity_id.into(), yaw as u8), - ); - player_chunker::update_position(entity, self).await; + world + .broadcast_packet_expect( + &[self.client.id], + &CUpdateEntityPosRot::new( + entity_id.into(), + x.mul_add(4096.0, -(lastx * 4096.0)) as i16, + y.mul_add(4096.0, -(lasty * 4096.0)) as i16, + z.mul_add(4096.0, -(lastz * 4096.0)) as i16, + yaw as u8, + pitch as u8, + position_rotation.ground, + ), + ) + .await; + world + .broadcast_packet_expect( + &[self.client.id], + &CHeadRot::new(entity_id.into(), yaw as u8), + ) + .await; + player_chunker::update_position(self).await; } pub async fn handle_rotation(&self, rotation: SPlayerRotation) { if !rotation.yaw.is_finite() || !rotation.pitch.is_finite() { - self.kick(TextComponent::text("Invalid rotation")); + self.kick(TextComponent::text("Invalid rotation")).await; return; } let entity = &self.living_entity.entity; @@ -211,18 +218,24 @@ impl Player { let world = &entity.world; let packet = CUpdateEntityRot::new(entity_id.into(), yaw as u8, pitch as u8, rotation.ground); - world.broadcast_packet_expect(&[self.client.id], &packet); + world + .broadcast_packet_expect(&[self.client.id], &packet) + .await; let packet = CHeadRot::new(entity_id.into(), yaw as u8); - world.broadcast_packet_expect(&[self.client.id], &packet); + world + .broadcast_packet_expect(&[self.client.id], &packet) + .await; } - pub fn handle_chat_command(self: &Arc, server: &Server, command: SChatCommand) { + pub async fn handle_chat_command(self: &Arc, server: &Server, command: SChatCommand) { let dispatcher = server.command_dispatcher.clone(); - dispatcher.handle_command( - &mut CommandSender::Player(self.clone()), - server, - &command.command, - ); + dispatcher + .handle_command( + &mut CommandSender::Player(self.clone()), + server, + &command.command, + ) + .await; if ADVANCED_CONFIG.commands.log_console { log::info!( "Player ({}): executed command /{}", @@ -284,6 +297,7 @@ impl Player { } } else { self.kick(TextComponent::text("Invalid player command")) + .await } } @@ -296,13 +310,15 @@ impl Player { }; let id = self.entity_id(); let world = &self.living_entity.entity.world; - world.broadcast_packet_expect( - &[self.client.id], - &CEntityAnimation::new(id.into(), animation as u8), - ) + world + .broadcast_packet_expect( + &[self.client.id], + &CEntityAnimation::new(id.into(), animation as u8), + ) + .await } None => { - self.kick(TextComponent::text("Invalid hand")); + self.kick(TextComponent::text("Invalid hand")).await; } }; } @@ -312,7 +328,7 @@ impl Player { let message = chat_message.message; if message.len() > 256 { - self.kick(TextComponent::text("Oversized message")); + self.kick(TextComponent::text("Oversized message")).await; return; } @@ -321,20 +337,22 @@ impl Player { let entity = &self.living_entity.entity; let world = &entity.world; - world.broadcast_packet_all(&CPlayerChatMessage::new( - gameprofile.id, - 1.into(), - chat_message.signature.as_deref(), - &message, - chat_message.timestamp, - chat_message.salt, - &[], - Some(TextComponent::text(&message)), - FilterType::PassThrough, - 1.into(), - TextComponent::text(&gameprofile.name), - None, - )) + world + .broadcast_packet_all(&CPlayerChatMessage::new( + gameprofile.id, + 1.into(), + chat_message.signature.as_deref(), + &message, + chat_message.timestamp, + chat_message.salt, + &[], + Some(TextComponent::text(&message)), + FilterType::PassThrough, + 1.into(), + TextComponent::text(&gameprofile.name), + None, + )) + .await /* server.broadcast_packet( self, @@ -347,12 +365,12 @@ impl Player { ) */ } - pub fn handle_client_information_play(&self, client_information: SClientInformationPlay) { + pub async fn handle_client_information_play(&self, client_information: SClientInformationPlay) { if let (Some(main_hand), Some(chat_mode)) = ( Hand::from_i32(client_information.main_hand.into()), ChatMode::from_i32(client_information.chat_mode.into()), ) { - *self.config.lock() = PlayerConfig { + *self.config.lock().await = PlayerConfig { locale: client_information.locale, view_distance: client_information.view_distance, chat_mode, @@ -364,6 +382,7 @@ impl Player { }; } else { self.kick(TextComponent::text("Invalid hand or chat type")) + .await } } @@ -381,7 +400,8 @@ impl Player { let config = &ADVANCED_CONFIG.pvp; if config.enabled { let world = &entity.world; - let attacked_player = world.get_player_by_entityid(entity_id.0 as EntityId); + let attacked_player = + world.get_player_by_entityid(entity_id.0 as EntityId).await; if let Some(player) = attacked_player { let victem_entity = &player.living_entity.entity; if config.protect_creative @@ -411,17 +431,20 @@ impl Player { .store(velocity.multiply(0.6, 1.0, 0.6)); victem_entity.velocity.store(saved_velo); - player.client.send_packet(packet); + player.client.send_packet(packet).await; } if config.hurt_animation { - world.broadcast_packet_all(&CHurtAnimation::new( - &entity_id, - entity.yaw.load(), - )) + world + .broadcast_packet_all(&CHurtAnimation::new( + &entity_id, + entity.yaw.load(), + )) + .await } if config.swing {} } else { self.kick(TextComponent::text("Interacted with invalid entity id")) + .await } } } @@ -432,7 +455,7 @@ impl Player { dbg!("todo"); } }, - None => self.kick(TextComponent::text("Invalid action type")), + None => self.kick(TextComponent::text("Invalid action type")).await, } } pub async fn handle_player_action(&self, player_action: SPlayerAction) { @@ -451,9 +474,13 @@ impl Player { // TODO: currently this is always dirt replace it let entity = &self.living_entity.entity; let world = &entity.world; - world.broadcast_packet_all(&CWorldEvent::new(2001, &location, 11, false)); + world + .broadcast_packet_all(&CWorldEvent::new(2001, &location, 11, false)) + .await; // AIR - world.broadcast_packet_all(&CBlockUpdate::new(&location, 0.into())); + world + .broadcast_packet_all(&CBlockUpdate::new(&location, 0.into())) + .await; } } Status::CancelledDigging => { @@ -475,12 +502,17 @@ impl Player { // TODO: currently this is always dirt replace it let entity = &self.living_entity.entity; let world = &entity.world; - world.broadcast_packet_all(&CWorldEvent::new(2001, &location, 11, false)); + world + .broadcast_packet_all(&CWorldEvent::new(2001, &location, 11, false)) + .await; // AIR - world.broadcast_packet_all(&CBlockUpdate::new(&location, 0.into())); + world + .broadcast_packet_all(&CBlockUpdate::new(&location, 0.into())) + .await; // TODO: Send this every tick self.client - .send_packet(&CAcknowledgeBlockChange::new(player_action.sequence)); + .send_packet(&CAcknowledgeBlockChange::new(player_action.sequence)) + .await; } Status::DropItemStack => { dbg!("todo"); @@ -495,13 +527,14 @@ impl Player { dbg!("todo"); } }, - None => self.kick(TextComponent::text("Invalid status")), + None => self.kick(TextComponent::text("Invalid status")).await, } } - pub fn handle_play_ping_request(&self, request: SPlayPingRequest) { + pub async fn handle_play_ping_request(&self, request: SPlayPingRequest) { self.client - .send_packet(&CPingResponse::new(request.payload)); + .send_packet(&CPingResponse::new(request.payload)) + .await; } pub async fn handle_use_item_on(&self, use_item_on: SUseItemOn) { @@ -513,7 +546,7 @@ impl Player { } if let Some(face) = BlockFace::from_i32(use_item_on.face.0) { - if let Some(item) = self.inventory.lock().held_item() { + if let Some(item) = self.inventory.lock().await.held_item() { let minecraft_id = global_registry::find_minecraft_id( global_registry::ITEM_REGISTRY, item.item_id, @@ -522,20 +555,25 @@ impl Player { if let Ok(block_state_id) = BlockState::new(minecraft_id, None) { let entity = &self.living_entity.entity; let world = &entity.world; - world.broadcast_packet_all(&CBlockUpdate::new( - &location, - block_state_id.get_id_mojang_repr().into(), - )); - world.broadcast_packet_all(&CBlockUpdate::new( - &WorldPosition(location.0 + face.to_offset()), - block_state_id.get_id_mojang_repr().into(), - )); + world + .broadcast_packet_all(&CBlockUpdate::new( + &location, + block_state_id.get_id_mojang_repr().into(), + )) + .await; + world + .broadcast_packet_all(&CBlockUpdate::new( + &WorldPosition(location.0 + face.to_offset()), + block_state_id.get_id_mojang_repr().into(), + )) + .await; } } self.client - .send_packet(&CAcknowledgeBlockChange::new(use_item_on.sequence)); + .send_packet(&CAcknowledgeBlockChange::new(use_item_on.sequence)) + .await; } else { - self.kick(TextComponent::text("Invalid block face")) + self.kick(TextComponent::text("Invalid block face")).await } } @@ -544,42 +582,49 @@ impl Player { log::error!("An item was used(SUseItem), but the packet is not implemented yet"); } - pub fn handle_set_held_item(&self, held: SSetHeldItem) { + pub async fn handle_set_held_item(&self, held: SSetHeldItem) { let slot = held.slot; if !(0..=8).contains(&slot) { - self.kick(TextComponent::text("Invalid held slot")) + self.kick(TextComponent::text("Invalid held slot")).await; + return; } - self.inventory.lock().set_selected(slot as usize); + self.inventory.lock().await.set_selected(slot as usize); } - pub fn handle_set_creative_slot(&self, packet: SSetCreativeSlot) -> Result<(), InventoryError> { + pub async fn handle_set_creative_slot( + &self, + packet: SSetCreativeSlot, + ) -> Result<(), InventoryError> { if self.gamemode.load() != GameMode::Creative { return Err(InventoryError::PermissionError); } - self.inventory - .lock() - .set_slot(packet.slot as usize, packet.clicked_item.to_item(), false) + self.inventory.lock().await.set_slot( + packet.slot as usize, + packet.clicked_item.to_item(), + false, + ) } // TODO: // This function will in the future be used to keep track of if the client is in a valid state. // But this is not possible yet - pub fn handle_close_container(&self, server: &Server, packet: SCloseContainer) { + pub async fn handle_close_container(&self, server: &Server, packet: SCloseContainer) { // window_id 0 represents both 9x1 Generic AND inventory here self.inventory .lock() + .await .state_id .store(0, std::sync::atomic::Ordering::Relaxed); let open_container = self.open_container.load(); if let Some(id) = open_container { - let mut open_containers = server.open_containers.write(); + let mut open_containers = server.open_containers.write().await; if let Some(container) = open_containers.get_mut(&id) { container.remove_player(self.entity_id()) } self.open_container.store(None); } let Some(_window_type) = WindowType::from_u8(packet.window_id) else { - self.kick(TextComponent::text("Invalid window ID")); + self.kick(TextComponent::text("Invalid window ID")).await; return; }; } diff --git a/pumpkin/src/commands/cmd_echest.rs b/pumpkin/src/commands/cmd_echest.rs index e793ccb8a..a279e908a 100644 --- a/pumpkin/src/commands/cmd_echest.rs +++ b/pumpkin/src/commands/cmd_echest.rs @@ -1,5 +1,3 @@ -use pumpkin_inventory::OpenContainer; - use crate::commands::tree::CommandTree; const NAMES: [&str; 2] = ["echest", "enderchest"]; @@ -7,24 +5,26 @@ const NAMES: [&str; 2] = ["echest", "enderchest"]; const DESCRIPTION: &str = "Show your personal enderchest (this command is used for testing container behaviour)"; +#[allow(unused_variables)] + pub fn init_command_tree<'a>() -> CommandTree<'a> { CommandTree::new(NAMES, DESCRIPTION).execute(&|sender, server, _| { if let Some(player) = sender.as_mut_player() { let entity_id = player.entity_id(); - player.open_container.store(Some(0)); - { - let mut open_containers = server.open_containers.write(); - match open_containers.get_mut(&0) { - Some(ender_chest) => { - ender_chest.add_player(entity_id); - } - None => { - let open_container = OpenContainer::empty(entity_id); - open_containers.insert(0, open_container); - } - } - } - player.open_container(server, "minecraft:generic_9x3"); + // player.open_container.store(Some(0)); + // { + // let mut open_containers = server.open_containers.write().await; + // match open_containers.get_mut(&0) { + // Some(ender_chest) => { + // ender_chest.add_player(entity_id); + // } + // None => { + // let open_container = OpenContainer::empty(entity_id); + // open_containers.insert(0, open_container); + // } + // } + // } + // player.open_container(server, "minecraft:generic_9x3"); } Ok(()) diff --git a/pumpkin/src/commands/cmd_gamemode.rs b/pumpkin/src/commands/cmd_gamemode.rs index f86bc9771..f81a01c47 100644 --- a/pumpkin/src/commands/cmd_gamemode.rs +++ b/pumpkin/src/commands/cmd_gamemode.rs @@ -1,7 +1,6 @@ use std::str::FromStr; use num_traits::FromPrimitive; -use pumpkin_core::text::TextComponent; use pumpkin_core::GameMode; use crate::commands::arg_player::{consume_arg_player, parse_arg_player}; @@ -72,24 +71,24 @@ pub fn init_command_tree<'a>() -> CommandTree<'a> { require(&|sender| sender.is_player()).execute(&|sender, _, args| { let gamemode = parse_arg_gamemode(args)?; - return if let Player(target) = sender { + if let Player(target) = sender { if target.gamemode.load() == gamemode { - target.send_system_message(&TextComponent::text(&format!( - "You already in {:?} gamemode", - gamemode - ))); + // target.send_system_message(&TextComponent::text(&format!( + // "You already in {:?} gamemode", + // gamemode + // ))); } else { // TODO - target.set_gamemode(gamemode); - target.send_system_message(&TextComponent::text(&format!( - "Game mode was set to {:?}", - gamemode - ))); + // target.set_gamemode(gamemode); + // target.send_system_message(&TextComponent::text(&format!( + // "Game mode was set to {:?}", + // gamemode + // ))); } Ok(()) } else { Err(InvalidRequirementError) - }; + } }), ) .with_child(argument(ARG_TARGET, consume_arg_player).execute( @@ -98,17 +97,17 @@ pub fn init_command_tree<'a>() -> CommandTree<'a> { let target = parse_arg_player(sender, server, ARG_TARGET, args)?; if target.gamemode.load() == gamemode { - sender.send_message(TextComponent::text(&format!( - "{} is already in {:?} gamemode", - target.gameprofile.name, gamemode - ))); + // sender.send_message(TextComponent::text(&format!( + // "{} is already in {:?} gamemode", + // target.gameprofile.name, gamemode + // ))); } else { // TODO - target.set_gamemode(gamemode); - sender.send_message(TextComponent::text(&format!( - "{}'s Game mode was set to {:?}", - target.gameprofile.name, gamemode - ))); + // target.set_gamemode(gamemode); + // sender.send_message(TextComponent::text(&format!( + // "{}'s Game mode was set to {:?}", + // target.gameprofile.name, gamemode + // ))); } Ok(()) diff --git a/pumpkin/src/commands/cmd_help.rs b/pumpkin/src/commands/cmd_help.rs index 18dddc6e7..9b6ed222c 100644 --- a/pumpkin/src/commands/cmd_help.rs +++ b/pumpkin/src/commands/cmd_help.rs @@ -4,7 +4,6 @@ use crate::commands::tree::{Command, CommandTree, ConsumedArgs, RawArgs}; use crate::commands::tree_builder::argument; use crate::commands::CommandSender; use crate::server::Server; -use pumpkin_core::text::TextComponent; const NAMES: [&str; 3] = ["help", "h", "?"]; @@ -37,18 +36,20 @@ fn parse_arg_command<'a>( .map_err(|_| InvalidConsumptionError(Some(command_name.into()))) } +#[allow(unused_variables)] + pub fn init_command_tree<'a>() -> CommandTree<'a> { CommandTree::new(NAMES, DESCRIPTION) .with_child( argument(ARG_COMMAND, consume_arg_command).execute(&|sender, server, args| { let tree = parse_arg_command(args, &server.command_dispatcher)?; - sender.send_message(TextComponent::text(&format!( - "{} - {} Usage: {}", - tree.names.join("/"), - tree.description, - tree - ))); + // sender.send_message(TextComponent::text(&format!( + // "{} - {} Usage: {}", + // tree.names.join("/"), + // tree.description, + // tree + // ))); Ok(()) }), @@ -62,12 +63,12 @@ pub fn init_command_tree<'a>() -> CommandTree<'a> { continue; }; - sender.send_message(TextComponent::text(&format!( - "{} - {} Usage: {}", - tree.names.join("/"), - tree.description, - tree - ))); + // sender.send_message(TextComponent::text(&format!( + // "{} - {} Usage: {}", + // tree.names.join("/"), + // tree.description, + // tree + // ))); } Ok(()) diff --git a/pumpkin/src/commands/cmd_kick.rs b/pumpkin/src/commands/cmd_kick.rs index 43a24a27a..783e4516f 100644 --- a/pumpkin/src/commands/cmd_kick.rs +++ b/pumpkin/src/commands/cmd_kick.rs @@ -1,7 +1,6 @@ use crate::commands::arg_player::parse_arg_player; use crate::commands::tree::CommandTree; use crate::commands::tree_builder::argument; -use pumpkin_core::text::{color::NamedColor, TextComponent}; use super::arg_player::consume_arg_player; @@ -10,16 +9,17 @@ const DESCRIPTION: &str = "Kicks the target player from the server."; const ARG_TARGET: &str = "target"; +#[expect(unused)] + pub fn init_command_tree<'a>() -> CommandTree<'a> { CommandTree::new(NAMES, DESCRIPTION).with_child( argument(ARG_TARGET, consume_arg_player).execute(&|sender, server, args| { - dbg!("aa"); let target = parse_arg_player(sender, server, ARG_TARGET, args)?; - target.kick(TextComponent::text("Kicked by an operator")); + // target.kick(TextComponent::text("Kicked by an operator")); - sender.send_message( - TextComponent::text("Player has been kicked.").color_named(NamedColor::Blue), - ); + // sender.send_message( + // TextComponent::text("Player has been kicked.").color_named(NamedColor::Blue), + // ); Ok(()) }), diff --git a/pumpkin/src/commands/cmd_kill.rs b/pumpkin/src/commands/cmd_kill.rs index cf4208652..f820a1f81 100644 --- a/pumpkin/src/commands/cmd_kill.rs +++ b/pumpkin/src/commands/cmd_kill.rs @@ -1,23 +1,24 @@ use crate::commands::arg_player::{consume_arg_player, parse_arg_player}; use crate::commands::tree::CommandTree; use crate::commands::tree_builder::argument; -use pumpkin_core::text::{color::NamedColor, TextComponent}; const NAMES: [&str; 1] = ["kill"]; const DESCRIPTION: &str = "Kills a target player."; const ARG_TARGET: &str = "target"; +#[expect(unused)] + pub fn init_command_tree<'a>() -> CommandTree<'a> { CommandTree::new(NAMES, DESCRIPTION).with_child( argument(ARG_TARGET, consume_arg_player).execute(&|sender, server, args| { // TODO parse entities not only players let target = parse_arg_player(sender, server, ARG_TARGET, args)?; - target.living_entity.kill(); + // target.living_entity.kill(); - sender.send_message( - TextComponent::text("Player has been killed.").color_named(NamedColor::Blue), - ); + // sender.send_message( + // TextComponent::text("Player has been killed.").color_named(NamedColor::Blue), + // ); Ok(()) }), diff --git a/pumpkin/src/commands/cmd_pumpkin.rs b/pumpkin/src/commands/cmd_pumpkin.rs index 0d8d7565f..29d4d73ce 100644 --- a/pumpkin/src/commands/cmd_pumpkin.rs +++ b/pumpkin/src/commands/cmd_pumpkin.rs @@ -1,21 +1,19 @@ -use crate::server::CURRENT_MC_VERSION; -use pumpkin_core::text::{color::NamedColor, TextComponent}; -use pumpkin_protocol::CURRENT_MC_PROTOCOL; - use crate::commands::tree::CommandTree; const NAMES: [&str; 1] = ["pumpkin"]; const DESCRIPTION: &str = "Display information about Pumpkin."; +#[expect(unused)] + pub fn init_command_tree<'a>() -> CommandTree<'a> { CommandTree::new(NAMES, DESCRIPTION).execute(&|sender, _, _| { let version = env!("CARGO_PKG_VERSION"); let description = env!("CARGO_PKG_DESCRIPTION"); - sender.send_message(TextComponent::text( - &format!("Pumpkin {version}, {description} (Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL})") - ).color_named(NamedColor::Green)); + // sender.send_message(TextComponent::text( + // &format!("Pumpkin {version}, {description} (Minecraft {CURRENT_MC_VERSION}, Protocol {CURRENT_MC_PROTOCOL})") + // ).color_named(NamedColor::Green)).await; Ok(()) }) diff --git a/pumpkin/src/commands/cmd_stop.rs b/pumpkin/src/commands/cmd_stop.rs index 2d90ae707..8de9c09fd 100644 --- a/pumpkin/src/commands/cmd_stop.rs +++ b/pumpkin/src/commands/cmd_stop.rs @@ -1,6 +1,3 @@ -use pumpkin_core::text::color::NamedColor; -use pumpkin_core::text::TextComponent; - use crate::commands::tree::CommandTree; use crate::commands::tree_builder::require; @@ -10,9 +7,9 @@ const DESCRIPTION: &str = "Stop the server."; pub fn init_command_tree<'a>() -> CommandTree<'a> { CommandTree::new(NAMES, DESCRIPTION).with_child( - require(&|sender| sender.permission_lvl() >= 4).execute(&|sender, _, _args| { - sender - .send_message(TextComponent::text("Stopping Server").color_named(NamedColor::Red)); + require(&|sender| sender.permission_lvl() >= 4).execute(&|_sender, _, _args| { + // sender + // .send_message(TextComponent::text("Stopping Server").color_named(NamedColor::Red)); std::process::exit(0) }), ) diff --git a/pumpkin/src/commands/dispatcher.rs b/pumpkin/src/commands/dispatcher.rs index 889bec72c..304ae50e8 100644 --- a/pumpkin/src/commands/dispatcher.rs +++ b/pumpkin/src/commands/dispatcher.rs @@ -26,11 +26,14 @@ pub struct CommandDispatcher<'a> { /// Stores registered [CommandTree]s and dispatches commands to them. impl<'a> CommandDispatcher<'a> { - pub fn handle_command(&self, sender: &mut CommandSender, server: &Server, cmd: &str) { + pub async fn handle_command(&self, sender: &mut CommandSender<'a>, server: &Server, cmd: &str) { if let Err(err) = self.dispatch(sender, server, cmd) { - sender.send_message( - TextComponent::text(&err).color_named(pumpkin_core::text::color::NamedColor::Red), - ) + sender + .send_message( + TextComponent::text_string(err) + .color_named(pumpkin_core::text::color::NamedColor::Red), + ) + .await; } } diff --git a/pumpkin/src/commands/mod.rs b/pumpkin/src/commands/mod.rs index 84ddfdd68..53f270ba3 100644 --- a/pumpkin/src/commands/mod.rs +++ b/pumpkin/src/commands/mod.rs @@ -27,11 +27,10 @@ pub enum CommandSender<'a> { } impl<'a> CommandSender<'a> { - pub fn send_message(&mut self, text: TextComponent) { + pub async fn send_message(&mut self, text: TextComponent<'a>) { match self { - // TODO: add color and stuff to console CommandSender::Console => log::info!("{}", text.to_pretty_console()), - CommandSender::Player(c) => c.send_system_message(&text), + CommandSender::Player(c) => c.send_system_message(&text).await, CommandSender::Rcon(s) => s.push(text.to_pretty_console()), } } diff --git a/pumpkin/src/commands/tree.rs b/pumpkin/src/commands/tree.rs index 65407dbcb..ffadee1c4 100644 --- a/pumpkin/src/commands/tree.rs +++ b/pumpkin/src/commands/tree.rs @@ -39,6 +39,7 @@ pub enum Command<'a> { Alias(&'a str), } +#[expect(unused)] pub struct CommandTree<'a> { pub(crate) nodes: Vec>, pub(crate) children: Vec, diff --git a/pumpkin/src/entity/living.rs b/pumpkin/src/entity/living.rs index a6ae33ede..74b3e8b44 100644 --- a/pumpkin/src/entity/living.rs +++ b/pumpkin/src/entity/living.rs @@ -18,7 +18,7 @@ impl LivingEntity { } } - pub fn set_health(&self, health: f32) { + pub async fn set_health(&self, health: f32) { self.health.store(health); // tell everyone entities health changed self.entity @@ -26,21 +26,24 @@ impl LivingEntity { .broadcast_packet_all(&CSetEntityMetadata::new( self.entity.entity_id.into(), Metadata::new(9, 3.into(), health), - )); + )) + .await; } /// Kills the Entity /// /// This is similar to `kill` but Spawn Particles, Animation and plays death sound - pub fn kill(&self) { + pub async fn kill(&self) { // Spawns death smoke particles self.entity .world - .broadcast_packet_all(&CEntityStatus::new(self.entity.entity_id, 60)); + .broadcast_packet_all(&CEntityStatus::new(self.entity.entity_id, 60)) + .await; // Plays the death sound and death animation self.entity .world - .broadcast_packet_all(&CEntityStatus::new(self.entity.entity_id, 3)); - self.entity.remove(); + .broadcast_packet_all(&CEntityStatus::new(self.entity.entity_id, 3)) + .await; + self.entity.remove().await; } } diff --git a/pumpkin/src/entity/mod.rs b/pumpkin/src/entity/mod.rs index 9ffcbb94a..d2d0447af 100644 --- a/pumpkin/src/entity/mod.rs +++ b/pumpkin/src/entity/mod.rs @@ -101,13 +101,14 @@ impl Entity { let block_pos = self.block_pos.load(); let block_pos_vec = block_pos.0; if i != block_pos_vec.x || j != block_pos_vec.y || k != block_pos_vec.z { - self.block_pos.store(WorldPosition(Vector3::new(i, j, k))); + let new_block_pos = Vector3::new(i, j, k); + self.block_pos.store(WorldPosition(new_block_pos)); let chunk_pos = self.chunk_pos.load(); if get_section_cord(i) != chunk_pos.x || get_section_cord(k) != chunk_pos.z { self.chunk_pos.store(Vector2::new( - get_section_cord(block_pos_vec.x), - get_section_cord(block_pos_vec.z), + get_section_cord(new_block_pos.x), + get_section_cord(new_block_pos.z), )); } } @@ -122,8 +123,8 @@ impl Entity { } /// Removes the Entity from their current World - pub fn remove(&self) { - self.world.remove_entity(self); + pub async fn remove(&self) { + self.world.remove_entity(self).await; } /// Applies knockback to the entity, following vanilla Minecraft's mechanics. @@ -190,7 +191,7 @@ impl Entity { b &= !(1 << index); } let packet = CSetEntityMetadata::new(self.entity_id.into(), Metadata::new(0, 0.into(), b)); - self.world.broadcast_packet_all(&packet); + self.world.broadcast_packet_all(&packet).await; } pub async fn set_pose(&self, pose: EntityPose) { @@ -200,7 +201,7 @@ impl Entity { self.entity_id.into(), Metadata::new(6, 20.into(), (pose).into()), ); - self.world.broadcast_packet_all(&packet) + self.world.broadcast_packet_all(&packet).await } } diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs index bb2b15ece..bdb3640d0 100644 --- a/pumpkin/src/entity/player.rs +++ b/pumpkin/src/entity/player.rs @@ -6,7 +6,6 @@ use std::sync::{ use crossbeam::atomic::AtomicCell; use num_derive::FromPrimitive; use num_traits::ToPrimitive; -use parking_lot::Mutex; use pumpkin_core::{ math::{boundingbox::BoundingBox, position::WorldPosition, vector3::Vector3}, text::TextComponent, @@ -28,6 +27,7 @@ use pumpkin_protocol::{ }, RawPacket, ServerPacket, VarInt, }; +use tokio::sync::Mutex; use pumpkin_protocol::server::play::{SCloseContainer, SKeepAlive}; use pumpkin_world::item::ItemStack; @@ -91,13 +91,13 @@ pub struct Player { } impl Player { - pub fn new( + pub async fn new( client: Arc, world: Arc, entity_id: EntityId, gamemode: GameMode, ) -> Self { - let gameprofile = client.gameprofile.lock().clone().map_or_else( + let gameprofile = client.gameprofile.lock().await.clone().map_or_else( || { log::error!("No gameprofile?. Impossible"); GameProfile { @@ -109,7 +109,7 @@ impl Player { }, |profile| profile, ); - let config = client.config.lock().clone().unwrap_or_default(); + let config = client.config.lock().await.clone().unwrap_or_default(); Self { living_entity: LivingEntity::new(Entity::new( entity_id, @@ -138,7 +138,7 @@ impl Player { /// Removes the Player out of the current World pub async fn remove(&self) { - self.living_entity.entity.world.remove_player(self); + self.living_entity.entity.world.remove_player(self).await; } pub const fn entity_id(&self) -> EntityId { @@ -146,7 +146,7 @@ impl Player { } /// Updates the current abilities the Player has - pub fn send_abilties_update(&mut self) { + pub async fn send_abilties_update(&mut self) { let mut b = 0i8; let abilities = &self.abilities; @@ -162,14 +162,16 @@ impl Player { if abilities.creative { b |= 8; } - self.client.send_packet(&CPlayerAbilities::new( - b, - abilities.fly_speed, - abilities.walk_speed_fov, - )); + self.client + .send_packet(&CPlayerAbilities::new( + b, + abilities.fly_speed, + abilities.walk_speed_fov, + )) + .await; } - pub fn teleport(&self, x: f64, y: f64, z: f64, yaw: f32, pitch: f32) { + pub async fn teleport(&self, x: f64, y: f64, z: f64, yaw: f32, pitch: f32) { // this is the ultra special magic code used to create the teleport id // This returns the old value let i = self @@ -183,16 +185,18 @@ impl Player { let entity = &self.living_entity.entity; entity.set_pos(x, y, z); entity.set_rotation(yaw, pitch); - *self.awaiting_teleport.lock() = Some((teleport_id.into(), Vector3::new(x, y, z))); - self.client.send_packet(&CSyncPlayerPosition::new( - x, - y, - z, - yaw, - pitch, - 0, - teleport_id.into(), - )); + *self.awaiting_teleport.lock().await = Some((teleport_id.into(), Vector3::new(x, y, z))); + self.client + .send_packet(&CSyncPlayerPosition::new( + x, + y, + z, + yaw, + pitch, + 0, + teleport_id.into(), + )) + .await; } pub fn block_interaction_range(&self) -> f64 { @@ -216,7 +220,7 @@ impl Player { } /// Kicks the Client with a reason depending on the connection state - pub fn kick(&self, reason: TextComponent) { + pub async fn kick<'a>(&self, reason: TextComponent<'a>) { assert!(!self .client .closed @@ -224,6 +228,7 @@ impl Player { self.client .try_send_packet(&CPlayDisconnect::new(&reason)) + .await .unwrap_or_else(|_| self.client.close()); log::info!( "Kicked {} for {}", @@ -233,15 +238,16 @@ impl Player { self.client.close() } - pub fn set_health(&self, health: f32, food: i32, food_saturation: f32) { - self.living_entity.set_health(health); + pub async fn set_health(&self, health: f32, food: i32, food_saturation: f32) { + self.living_entity.set_health(health).await; self.food.store(food, std::sync::atomic::Ordering::Relaxed); self.food_saturation.store(food_saturation); self.client - .send_packet(&CSetHealth::new(health, food.into(), food_saturation)); + .send_packet(&CSetHealth::new(health, food.into(), food_saturation)) + .await; } - pub fn set_gamemode(&self, gamemode: GameMode) { + pub async fn set_gamemode(&self, gamemode: GameMode) { // We could send the same gamemode without problems. But why waste bandwidth ? let current_gamemode = self.gamemode.load(); assert!( @@ -260,34 +266,39 @@ impl Player { uuid: self.gameprofile.id, actions: vec![PlayerAction::UpdateGameMode((gamemode as i32).into())], }], - )); - self.client.send_packet(&CGameEvent::new( - GameEvent::ChangeGameMode, - gamemode.to_f32().unwrap(), - )); + )) + .await; + self.client + .send_packet(&CGameEvent::new( + GameEvent::ChangeGameMode, + gamemode.to_f32().unwrap(), + )) + .await; } - pub fn send_system_message(&self, text: &TextComponent) { + pub async fn send_system_message<'a>(&self, text: &TextComponent<'a>) { self.client - .send_packet(&CSystemChatMessage::new(text, false)); + .send_packet(&CSystemChatMessage::new(text, false)) + .await; } } impl Player { pub async fn process_packets(self: &Arc, server: &Arc) { - let mut packets = self.client.client_packets_queue.lock(); + let mut packets = self.client.client_packets_queue.lock().await; while let Some(mut packet) = packets.pop_back() { match self.handle_play_packet(server, &mut packet).await { Ok(_) => {} Err(e) => { if e.is_kick() { if let Some(kick_reason) = e.client_kick_reason() { - self.kick(TextComponent::text(&kick_reason)) + self.kick(TextComponent::text(&kick_reason)).await } else { self.kick(TextComponent::text(&format!( "Error while reading incoming packet {}", e - ))); + ))) + .await; } } e.log(); @@ -304,11 +315,13 @@ impl Player { let bytebuf = &mut packet.bytebuf; match packet.id.0 { SConfirmTeleport::PACKET_ID => { - self.handle_confirm_teleport(SConfirmTeleport::read(bytebuf)?); + self.handle_confirm_teleport(SConfirmTeleport::read(bytebuf)?) + .await; Ok(()) } SChatCommand::PACKET_ID => { - self.handle_chat_command(server, SChatCommand::read(bytebuf)?); + self.handle_chat_command(server, SChatCommand::read(bytebuf)?) + .await; Ok(()) } SPlayerPosition::PACKET_ID => { @@ -342,7 +355,8 @@ impl Player { Ok(()) } SClientInformationPlay::PACKET_ID => { - self.handle_client_information_play(SClientInformationPlay::read(bytebuf)?); + self.handle_client_information_play(SClientInformationPlay::read(bytebuf)?) + .await; Ok(()) } SInteract::PACKET_ID => { @@ -364,15 +378,18 @@ impl Player { Ok(()) } SSetHeldItem::PACKET_ID => { - self.handle_set_held_item(SSetHeldItem::read(bytebuf)?); + self.handle_set_held_item(SSetHeldItem::read(bytebuf)?) + .await; Ok(()) } SSetCreativeSlot::PACKET_ID => { - self.handle_set_creative_slot(SSetCreativeSlot::read(bytebuf)?)?; + self.handle_set_creative_slot(SSetCreativeSlot::read(bytebuf)?) + .await?; Ok(()) } SPlayPingRequest::PACKET_ID => { - self.handle_play_ping_request(SPlayPingRequest::read(bytebuf)?); + self.handle_play_ping_request(SPlayPingRequest::read(bytebuf)?) + .await; Ok(()) } SClickContainer::PACKET_ID => { @@ -381,7 +398,8 @@ impl Player { Ok(()) } SCloseContainer::PACKET_ID => { - self.handle_close_container(server, SCloseContainer::read(bytebuf)?); + self.handle_close_container(server, SCloseContainer::read(bytebuf)?) + .await; Ok(()) } SKeepAlive::PACKET_ID => { diff --git a/pumpkin/src/error.rs b/pumpkin/src/error.rs index 85cf4a34f..74fb5c0e7 100644 --- a/pumpkin/src/error.rs +++ b/pumpkin/src/error.rs @@ -3,7 +3,7 @@ use pumpkin_inventory::InventoryError; use pumpkin_protocol::bytebuf::DeserializerError; use std::fmt::Display; -pub trait PumpkinError: std::error::Error + Display { +pub trait PumpkinError: Send + std::error::Error + Display { fn is_kick(&self) -> bool; fn log(&self) { diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index 7b7be6fae..7445161ec 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -11,21 +11,17 @@ #![expect(clippy::significant_drop_tightening)] #![expect(clippy::future_not_send)] #![expect(clippy::single_call_fn)] -#![expect(clippy::await_holding_lock)] #[cfg(target_os = "wasi")] compile_error!("Compiling for WASI targets is not supported!"); use log::LevelFilter; -use mio::net::TcpListener; -use mio::{Events, Interest, Poll, Token}; -use client::{interrupted, Client}; +use client::Client; use pumpkin_protocol::client::play::CKeepAlive; use pumpkin_protocol::ConnectionState; use server::Server; -use std::collections::HashMap; -use std::io::{self, Read}; +use std::io::{self}; use std::time::Duration; // Setup some tokens to allow us to identify which event is for which socket. @@ -82,20 +78,20 @@ const fn convert_logger_filter(level: pumpkin_config::logging::LevelFilter) -> L } } -fn main() -> io::Result<()> { +#[tokio::main] +async fn main() -> io::Result<()> { use std::sync::Arc; - use entity::player::Player; use pumpkin_config::{ADVANCED_CONFIG, BASIC_CONFIG}; use pumpkin_core::text::{color::NamedColor, TextComponent}; use rcon::RCONServer; init_logger(); - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); + // let rt = tokio::runtime::Builder::new_multi_thread() + // .enable_all() + // .build() + // .unwrap(); ctrlc::set_handler(|| { log::warn!( @@ -109,206 +105,138 @@ fn main() -> io::Result<()> { .unwrap(); // ensure rayon is built outside of tokio scope rayon::ThreadPoolBuilder::new().build_global().unwrap(); - rt.block_on(async { - let default_panic = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |info| { - default_panic(info); - // TODO: Gracefully exit? - std::process::exit(1); - })); - - const SERVER: Token = Token(0); - use std::time::Instant; - - let time = Instant::now(); - - // Create a poll instance. - let mut poll = Poll::new()?; - // Create storage for events. - let mut events = Events::with_capacity(128); - - // Setup the TCP server socket. - let addr = BASIC_CONFIG.server_address; - let mut listener = TcpListener::bind(addr)?; - - // Register the server with poll we can receive events for it. - poll.registry() - .register(&mut listener, SERVER, Interest::READABLE)?; - - // Unique token for each incoming connection. - let mut unique_id = SERVER.0 + 1; - - let use_console = ADVANCED_CONFIG.commands.use_console; - let rcon = ADVANCED_CONFIG.rcon.clone(); - - let mut clients: HashMap> = HashMap::new(); - let mut players: HashMap> = HashMap::new(); - - let server = Arc::new(Server::new()); - log::info!("Started Server took {}ms", time.elapsed().as_millis()); - log::info!("You now can connect to the server, Listening on {}", addr); - - if use_console { - let server = server.clone(); - tokio::spawn(async move { - let stdin = std::io::stdin(); - loop { - let mut out = String::new(); - stdin - .read_line(&mut out) - .expect("Failed to read console line"); - - if !out.is_empty() { - let dispatcher = server.command_dispatcher.clone(); - dispatcher.handle_command( - &mut commands::CommandSender::Console, - &server, - &out, - ); - } - } - }); - } - if rcon.enabled { - let server = server.clone(); - tokio::spawn(async move { - RCONServer::new(&rcon, &server).await.unwrap(); - }); - } - loop { - if let Err(err) = poll.poll(&mut events, None) { - if interrupted(&err) { - continue; + let default_panic = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + default_panic(info); + // TODO: Gracefully exit? + std::process::exit(1); + })); + + use std::time::Instant; + + let time = Instant::now(); + + // Setup the TCP server socket. + let addr = BASIC_CONFIG.server_address; + let listener = tokio::net::TcpListener::bind(addr) + .await + .expect("Failed to start TcpListener"); + + let use_console = ADVANCED_CONFIG.commands.use_console; + let rcon = ADVANCED_CONFIG.rcon.clone(); + + let server = Arc::new(Server::new()); + log::info!("Started Server took {}ms", time.elapsed().as_millis()); + log::info!("You now can connect to the server, Listening on {}", addr); + + if use_console { + let server = server.clone(); + tokio::spawn(async move { + let stdin = std::io::stdin(); + loop { + let mut out = String::new(); + stdin + .read_line(&mut out) + .expect("Failed to read console line"); + + if !out.is_empty() { + let dispatcher = server.command_dispatcher.clone(); + dispatcher + .handle_command(&mut commands::CommandSender::Console, &server, &out) + .await; } - return Err(err); } + }); + } + if rcon.enabled { + let server = server.clone(); + tokio::spawn(async move { + RCONServer::new(&rcon, server).await.unwrap(); + }); + } + let mut unique_id = 0; + loop { + // Asynchronously wait for an inbound socket. + let (connection, address) = listener.accept().await?; + + log::info!( + "Accepted connection from: {}", + scrub_address(&format!("{}", address)) + ); - for event in events.iter() { - match event.token() { - s if s == SERVER => loop { - // Received an event for the TCP server socket, which - // indicates we can accept an connection. - let (mut connection, address) = match listener.accept() { - Ok((connection, address)) => (connection, address), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - // If we get a `WouldBlock` error we know our - // listener has no more incoming connections queued, - // so we can return to polling and wait for some - // more. - break; - } - Err(e) => { - // If it was any other kind of error, something went - // wrong and we terminate with an error. - return Err(e); - } - }; - if let Err(e) = connection.set_nodelay(true) { - log::warn!("failed to set TCP_NODELAY {e}"); - } + if let Err(e) = connection.set_nodelay(true) { + log::warn!("failed to set TCP_NODELAY {e}"); + } - log::info!( - "Accepted connection from: {}", - scrub_address(&format!("{}", address)) - ); + unique_id += 1; + let id = unique_id; - unique_id += 1; - let id = unique_id; - poll.registry().register( - &mut connection, - Token(id), - Interest::READABLE.add(Interest::WRITABLE), - )?; - let keep_alive = tokio::sync::mpsc::channel(1024); - let client = - Arc::new(Client::new(id, connection, address, keep_alive.0.into())); + let keep_alive = tokio::sync::mpsc::channel(1024); + let client = Arc::new(Client::new(id, connection, addr, keep_alive.0.into())); + { + let client = client.clone(); + let mut receiver = keep_alive.1; + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { + interval.tick().await; + let now = std::time::Instant::now(); + if client.connection_state.load() == ConnectionState::Play { + if now.duration_since(client.last_alive_received.load()) + >= Duration::from_secs(15) { - let client = client.clone(); - let mut receiver = keep_alive.1; - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - loop { - interval.tick().await; - let now = std::time::Instant::now(); - if client.connection_state.load() == ConnectionState::Play { - if now.duration_since(client.last_alive_received.load()) - >= Duration::from_secs(15) - { - dbg!("no keep alive"); - client.kick("No keep alive received"); - break; - } - let random = rand::random::(); - client.send_packet(&CKeepAlive { - keep_alive_id: random, - }); - if let Some(id) = receiver.recv().await { - if id == random { - client.last_alive_received.store(now); - } - } - } else { - client.last_alive_received.store(now); - } - } - }); + dbg!("no keep alive"); + client.kick("No keep alive received").await; + break; } - clients.insert(id, client); - }, - // Maybe received an event for a TCP connection. - token => { - // poll Player - if let Some(player) = players.get_mut(&token.0) { - player.client.poll(event).await; - let closed = player - .client - .closed - .load(std::sync::atomic::Ordering::Relaxed); - if !closed { - player.process_packets(&server).await; - } - if closed { - if let Some(player) = players.remove(&token.0) { - player.remove().await; - let connection = &mut player.client.connection.lock(); - poll.registry().deregister(connection.by_ref())?; - } - } - }; - - // Poll current Clients (non players) - let (done, make_player) = if let Some(client) = clients.get_mut(&token.0) { - client.poll(event).await; - let closed = client.closed.load(std::sync::atomic::Ordering::Relaxed); - if !closed { - client.process_packets(&server).await; - } - ( - closed, - client - .make_player - .load(std::sync::atomic::Ordering::Relaxed), - ) - } else { - (false, false) - }; - if done || make_player { - if let Some(client) = clients.remove(&token.0) { - if done { - let connection = &mut client.connection.lock(); - poll.registry().deregister(connection.by_ref())?; - } else if make_player { - let id = client.id; - let (player, world) = server.add_player(id, client).await; - players.insert(id, player.clone()); - world.spawn_player(&BASIC_CONFIG, player).await; - } + let random = rand::random::(); + client + .send_packet(&CKeepAlive { + keep_alive_id: random, + }) + .await; + if let Some(id) = receiver.recv().await { + if id == random { + client.last_alive_received.store(now); } } + } else { + client.last_alive_received.store(now); } } - } + }); } - }) + + let server = server.clone(); + tokio::spawn(async move { + let server = &server; // Reference to server + while !client.closed.load(std::sync::atomic::Ordering::Relaxed) + && !client + .make_player + .load(std::sync::atomic::Ordering::Relaxed) + { + client.process_packets(server).await; + client.poll().await; + } + if client + .make_player + .load(std::sync::atomic::Ordering::Relaxed) + { + let id = client.id; + let (player, world) = server.add_player(id, client).await; + world.spawn_player(&BASIC_CONFIG, player.clone()).await; + // poll Player + while !player + .client + .closed + .load(std::sync::atomic::Ordering::Relaxed) + { + player.process_packets(server).await; + player.client.poll().await; + } + player.remove().await; + } + }); + } } diff --git a/pumpkin/src/proxy/bungeecord.rs b/pumpkin/src/proxy/bungeecord.rs index 76a2e130a..c8c9d1cea 100644 --- a/pumpkin/src/proxy/bungeecord.rs +++ b/pumpkin/src/proxy/bungeecord.rs @@ -20,11 +20,11 @@ pub enum BungeeCordError { FailedMakeOfflineUUID, } -pub fn bungeecord_login( +pub async fn bungeecord_login( client: &Client, username: String, ) -> Result<(IpAddr, GameProfile), BungeeCordError> { - let server_address = client.server_address.lock(); + let server_address = client.server_address.lock().await; let data = server_address.split('\0').take(4).collect::>(); // Ip of player, only given if ip_forward on bungee is true @@ -32,7 +32,7 @@ pub fn bungeecord_login( Some(ip) => ip .parse() .map_err(|_| BungeeCordError::FailedParseAddress)?, - None => client.address.lock().ip(), + None => client.address.lock().await.ip(), }; // Uuid of player, only given if ip_forward on bungee is true diff --git a/pumpkin/src/proxy/velocity.rs b/pumpkin/src/proxy/velocity.rs index f1b4fe494..37099bddc 100644 --- a/pumpkin/src/proxy/velocity.rs +++ b/pumpkin/src/proxy/velocity.rs @@ -44,17 +44,19 @@ pub enum VelocityError { FailedReadProfileProperties, } -pub fn velocity_login(client: &Client) { +pub async fn velocity_login(client: &Client) { // TODO: validate packet transaction id from plugin response with this let velocity_message_id: i32 = rand::thread_rng().gen(); let mut buf = BytesMut::new(); buf.put_u8(MAX_SUPPORTED_FORWARDING_VERSION); - client.send_packet(&CLoginPluginRequest::new( - velocity_message_id.into(), - PLAYER_INFO_CHANNEL, - &buf, - )); + client + .send_packet(&CLoginPluginRequest::new( + velocity_message_id.into(), + PLAYER_INFO_CHANNEL, + &buf, + )) + .await; } pub fn check_integrity(data: (&[u8], &[u8]), secret: &str) -> bool { diff --git a/pumpkin/src/rcon/mod.rs b/pumpkin/src/rcon/mod.rs index 0feff8fa5..ce8aa4d3f 100644 --- a/pumpkin/src/rcon/mod.rs +++ b/pumpkin/src/rcon/mod.rs @@ -1,16 +1,13 @@ use std::{ - collections::HashMap, - io::{self, Read, Write}, + io::{self}, net::SocketAddr, }; -use mio::{ - net::{TcpListener, TcpStream}, - Events, Interest, Poll, Token, -}; use packet::{ClientboundPacket, Packet, PacketError, ServerboundPacket}; use pumpkin_config::{RCONConfig, ADVANCED_CONFIG}; +use std::sync::Arc; use thiserror::Error; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::server::Server; @@ -26,96 +23,38 @@ pub enum RCONError { Io(io::Error), } -const SERVER: Token = Token(0); - pub struct RCONServer; impl RCONServer { - pub async fn new(config: &RCONConfig, server: &Server) -> Result { + pub async fn new(config: &RCONConfig, server: Arc) -> Result { assert!(config.enabled, "RCON is not enabled"); - let mut poll = Poll::new().unwrap(); - let mut listener = TcpListener::bind(config.address).unwrap(); - - poll.registry() - .register(&mut listener, SERVER, Interest::READABLE) - .unwrap(); - - let mut unique_id = SERVER.0 + 1; - - let mut events = Events::with_capacity(20); - - let mut connections: HashMap = HashMap::new(); + let listener = tokio::net::TcpListener::bind(config.address).await.unwrap(); - let password = config.password.clone(); + let password = Arc::new(config.password.clone()); + let mut connections = 0; loop { - poll.poll(&mut events, None).unwrap(); - - for event in events.iter() { - match event.token() { - SERVER => loop { - // Received an event for the TCP server socket, which - // indicates we can accept an connection. - let (mut connection, address) = match listener.accept() { - Ok((connection, address)) => (connection, address), - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - // If we get a `WouldBlock` error we know our - // listener has no more incoming connections queued, - // so we can return to polling and wait for some - // more. - break; - } - Err(e) => { - // If it was any other kind of error, something went - // wrong and we terminate with an error. - return Err(e); - } - }; - if config.max_connections != 0 - && connections.len() >= config.max_connections as usize - { - break; - } + // Asynchronously wait for an inbound socket. + let (connection, address) = listener.accept().await?; - unique_id += 1; - let token = unique_id; - poll.registry() - .register( - &mut connection, - Token(token), - Interest::READABLE.add(Interest::WRITABLE), - ) - .unwrap(); - connections.insert(token, RCONClient::new(connection, address)); - }, - - token => { - let done = if let Some(client) = connections.get_mut(&token.0) { - client.handle(server, &password).await - } else { - false - }; - if done { - if let Some(mut client) = connections.remove(&token.0) { - let config = &ADVANCED_CONFIG.rcon; - if config.logging.log_quit { - log::info!( - "RCON ({}): Client closed connection", - client.address - ); - } - poll.registry().deregister(&mut client.connection)?; - } - } - } - } + if config.max_connections != 0 && connections >= config.max_connections { + continue; } + + connections += 1; + let mut client = RCONClient::new(connection, address); + + let password = password.clone(); + let server = server.clone(); + tokio::spawn(async move { while !client.handle(&server, &password).await {} }); + dbg!("closed"); + connections -= 1; } } } pub struct RCONClient { - connection: TcpStream, + connection: tokio::net::TcpStream, address: SocketAddr, logged_in: bool, incoming: Vec, @@ -123,7 +62,7 @@ pub struct RCONClient { } impl RCONClient { - pub const fn new(connection: TcpStream, address: SocketAddr) -> Self { + pub const fn new(connection: tokio::net::TcpStream, address: SocketAddr) -> Self { Self { connection, address, @@ -133,20 +72,19 @@ impl RCONClient { } } + /// Returns if client is closed or not pub async fn handle(&mut self, server: &Server, password: &str) -> bool { if !self.closed { - loop { - match self.read_bytes() { - // Stream closed, so we can't reply, so we just close everything. - Ok(true) => return true, - Ok(false) => {} - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) => { - log::error!("could not read packet: {e}"); - return true; - } + match self.read_bytes().await { + // Stream closed, so we can't reply, so we just close everything. + Ok(true) => return true, + Ok(false) => {} + Err(e) => { + log::error!("could not read packet: {e}"); + return true; } } + dbg!("a"); // If we get a close here, we might have a reply, which we still want to write. let _ = self.poll(server, password).await.map_err(|e| { log::error!("RCON error: {e}"); @@ -157,57 +95,56 @@ impl RCONClient { } async fn poll(&mut self, server: &Server, password: &str) -> Result<(), PacketError> { - loop { - let packet = match self.receive_packet().await? { - Some(p) => p, - None => return Ok(()), - }; - - let config = &ADVANCED_CONFIG.rcon; - match packet.get_type() { - ServerboundPacket::Auth => { - let body = packet.get_body(); - if !body.is_empty() && packet.get_body() == password { - self.send(ClientboundPacket::AuthResponse, packet.get_id(), "".into()) - .await?; - if config.logging.log_logged_successfully { - log::info!("RCON ({}): Client logged in successfully", self.address); - } - self.logged_in = true; - } else { - if config.logging.log_wrong_password { - log::info!("RCON ({}): Client has tried wrong password", self.address); - } - self.send(ClientboundPacket::AuthResponse, -1, "".into()) - .await?; - self.closed = true; + let packet = match self.receive_packet().await? { + Some(p) => p, + None => return Ok(()), + }; + let config = &ADVANCED_CONFIG.rcon; + match packet.get_type() { + ServerboundPacket::Auth => { + if packet.get_body() == password { + self.send(ClientboundPacket::AuthResponse, packet.get_id(), "".into()) + .await?; + if config.logging.log_logged_successfully { + log::info!("RCON ({}): Client logged in successfully", self.address); + } + self.logged_in = true; + } else { + if config.logging.log_wrong_password { + log::info!("RCON ({}): Client has tried wrong password", self.address); } + self.send(ClientboundPacket::AuthResponse, -1, "".into()) + .await?; + self.closed = true; } - ServerboundPacket::ExecCommand => { - if self.logged_in { - let mut output = Vec::new(); - let dispatcher = server.command_dispatcher.clone(); - dispatcher.handle_command( + } + ServerboundPacket::ExecCommand => { + if self.logged_in { + let mut output = Vec::new(); + let dispatcher = server.command_dispatcher.clone(); + dispatcher + .handle_command( &mut crate::commands::CommandSender::Rcon(&mut output), server, packet.get_body(), - ); - for line in output { - if config.logging.log_commands { - log::info!("RCON ({}): {}", self.address, line); - } - self.send(ClientboundPacket::Output, packet.get_id(), line) - .await?; + ) + .await; + for line in output { + if config.logging.log_commands { + log::info!("RCON ({}): {}", self.address, line); } + self.send(ClientboundPacket::Output, packet.get_id(), line) + .await?; } } } } + Ok(()) } - fn read_bytes(&mut self) -> io::Result { + async fn read_bytes(&mut self) -> io::Result { let mut buf = [0; 1460]; - let n = self.connection.read(&mut buf)?; + let n = self.connection.read(&mut buf).await?; if n == 0 { return Ok(true); } @@ -224,6 +161,7 @@ impl RCONClient { let buf = packet.write_buf(id, body); self.connection .write(&buf) + .await .map_err(PacketError::FailedSend)?; Ok(()) } diff --git a/pumpkin/src/server/mod.rs b/pumpkin/src/server/mod.rs index 6deee69e2..a720e4c15 100644 --- a/pumpkin/src/server/mod.rs +++ b/pumpkin/src/server/mod.rs @@ -1,6 +1,5 @@ use connection_cache::{CachedBranding, CachedStatus}; use key_store::KeyStore; -use parking_lot::{Mutex, RwLock}; use pumpkin_config::BASIC_CONFIG; use pumpkin_core::GameMode; use pumpkin_entity::EntityId; @@ -19,6 +18,8 @@ use std::{ }, time::Duration, }; +use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::client::EncryptionError; use crate::{ @@ -100,17 +101,17 @@ impl Server { // TODO: select default from config let world = &self.worlds[0]; - let player = Arc::new(Player::new(client, world.clone(), entity_id, gamemode)); - world.add_player(id, player.clone()); + let player = Arc::new(Player::new(client, world.clone(), entity_id, gamemode).await); + world.add_player(id, player.clone()).await; (player, world.clone()) } - pub fn try_get_container( + pub async fn try_get_container( &self, player_id: EntityId, container_id: u64, ) -> Option>>> { - let open_containers = self.open_containers.read(); + let open_containers = self.open_containers.read().await; open_containers .get(&container_id)? .try_open(player_id) @@ -118,17 +119,17 @@ impl Server { } /// Sends a Packet to all Players in all worlds - pub fn broadcast_packet_all

(&self, packet: &P) + pub async fn broadcast_packet_all

(&self, packet: &P) where P: ClientPacket, { for world in &self.worlds { - world.broadcast_packet_all(packet) + world.broadcast_packet_all(packet).await } } /// Searches every world for a player by name - pub fn get_player_by_name(&self, name: &str) -> Option> { + pub async fn get_player_by_name(&self, name: &str) -> Option> { for world in self.worlds.iter() { if let Some(player) = world.get_player_by_name(name) { return Some(player); diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 78e9f1c31..e68d5bfd6 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -7,7 +7,6 @@ use crate::{ entity::{player::Player, Entity}, }; use num_traits::ToPrimitive; -use parking_lot::Mutex; use pumpkin_config::BasicConfiguration; use pumpkin_core::math::vector2::Vector2; use pumpkin_entity::{entity_type::EntityType, EntityId}; @@ -21,6 +20,7 @@ use pumpkin_protocol::{ use pumpkin_world::level::Level; use scoreboard::Scoreboard; use tokio::sync::mpsc; +use tokio::sync::Mutex; pub mod scoreboard; @@ -56,13 +56,13 @@ impl World { /// Sends the specified packet to every player currently logged in to the server. /// /// **Note:** This function acquires a lock on the `current_players` map, ensuring thread safety. - pub fn broadcast_packet_all

(&self, packet: &P) + pub async fn broadcast_packet_all

(&self, packet: &P) where P: ClientPacket, { - let current_players = self.current_players.lock(); + let current_players = self.current_players.lock().await; for player in current_players.values() { - player.client.send_packet(packet); + player.client.send_packet(packet).await; } } @@ -71,13 +71,13 @@ impl World { /// Sends the specified packet to every player currently logged in to the server, excluding the players listed in the `except` parameter. /// /// **Note:** This function acquires a lock on the `current_players` map, ensuring thread safety. - pub fn broadcast_packet_expect

(&self, except: &[usize], packet: &P) + pub async fn broadcast_packet_expect

(&self, except: &[usize], packet: &P) where P: ClientPacket, { - let current_players = self.current_players.lock(); + let current_players = self.current_players.lock().await; for (_, player) in current_players.iter().filter(|c| !except.contains(c.0)) { - player.client.send_packet(packet); + player.client.send_packet(packet).await; } } @@ -88,33 +88,37 @@ impl World { log::debug!("spawning player, entity id {}", entity_id); // login packet for our new player - player.client.send_packet(&CLogin::new( - entity_id, - base_config.hardcore, - &["minecraft:overworld"], - base_config.max_players.into(), - base_config.view_distance.into(), // TODO: view distance - base_config.simulation_distance.into(), // TODO: sim view dinstance - false, - false, - false, - 0.into(), - "minecraft:overworld", - 0, // seed - gamemode.to_u8().unwrap(), - base_config.default_gamemode.to_i8().unwrap(), - false, - false, - None, - 0.into(), - false, - )); + player + .client + .send_packet(&CLogin::new( + entity_id, + base_config.hardcore, + &["minecraft:overworld"], + base_config.max_players.into(), + base_config.view_distance.into(), // TODO: view distance + base_config.simulation_distance.into(), // TODO: sim view dinstance + false, + false, + false, + 0.into(), + "minecraft:overworld", + 0, // seed + gamemode.to_u8().unwrap(), + base_config.default_gamemode.to_i8().unwrap(), + false, + false, + None, + 0.into(), + false, + )) + .await; dbg!("sending abilities"); // player abilities // TODO: this is for debug purpose, remove later player .client - .send_packet(&CPlayerAbilities::new(0x02, 0.4, 0.1)); + .send_packet(&CPlayerAbilities::new(0x02, 0.4, 0.1)) + .await; // teleport let x = 10.0; @@ -122,7 +126,7 @@ impl World { let z = 10.0; let yaw = 10.0; let pitch = 10.0; - player.teleport(x, y, z, 10.0, 10.0); + player.teleport(x, y, z, 10.0, 10.0).await; let gameprofile = &player.gameprofile; // first send info update to our new player, So he can see his Skin // also send his info to everyone else @@ -138,12 +142,13 @@ impl World { PlayerAction::UpdateListed(true), ], }], - )); + )) + .await; // here we send all the infos of already joined players let mut entries = Vec::new(); { - let current_players = self.current_players.lock(); + let current_players = self.current_players.lock().await; for (_, playerr) in current_players .iter() .filter(|(c, _)| **c != player.client.id) @@ -162,7 +167,8 @@ impl World { } player .client - .send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)); + .send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)) + .await; } let gameprofile = &player.gameprofile; @@ -186,82 +192,104 @@ impl World { 0.0, 0.0, ), - ); + ) + .await; // spawn players for our client let token = player.client.id; - for (_, existing_player) in self.current_players.lock().iter().filter(|c| c.0 != &token) { + for (_, existing_player) in self + .current_players + .lock() + .await + .iter() + .filter(|c| c.0 != &token) + { let entity = &existing_player.living_entity.entity; let pos = entity.pos.load(); let gameprofile = &existing_player.gameprofile; - player.client.send_packet(&CSpawnEntity::new( - existing_player.entity_id().into(), - gameprofile.id, - (EntityType::Player as i32).into(), - pos.x, - pos.y, - pos.z, - entity.yaw.load(), - entity.pitch.load(), - entity.head_yaw.load(), - 0.into(), - 0.0, - 0.0, - 0.0, - )) + player + .client + .send_packet(&CSpawnEntity::new( + existing_player.entity_id().into(), + gameprofile.id, + (EntityType::Player as i32).into(), + pos.x, + pos.y, + pos.z, + entity.yaw.load(), + entity.pitch.load(), + entity.head_yaw.load(), + 0.into(), + 0.0, + 0.0, + 0.0, + )) + .await } // entity meta data // set skin parts - if let Some(config) = player.client.config.lock().as_ref() { + if let Some(config) = player.client.config.lock().await.as_ref() { let packet = CSetEntityMetadata::new( entity_id.into(), Metadata::new(17, VarInt(0), config.skin_parts), ); - self.broadcast_packet_all(&packet) + self.broadcast_packet_all(&packet).await } // Start waiting for level chunks, Sets the "Loading Terrain" screen player .client - .send_packet(&CGameEvent::new(GameEvent::StartWaitingChunks, 0.0)); + .send_packet(&CGameEvent::new(GameEvent::StartWaitingChunks, 0.0)) + .await; // Spawn in initial chunks player_chunker::player_join(self, player.clone()).await; } - async fn spawn_world_chunks(&self, client: &Client, chunks: Vec>, distance: i32) { + async fn spawn_world_chunks( + &self, + client: Arc, + chunks: Vec>, + distance: i32, + ) { let inst = std::time::Instant::now(); let (sender, mut chunk_receiver) = mpsc::channel(distance as usize); let level = self.level.clone(); let closed = client.closed.load(std::sync::atomic::Ordering::Relaxed); let chunks = Arc::new(chunks); - tokio::task::spawn_blocking(move || level.lock().fetch_chunks(&chunks, sender, closed)); + tokio::spawn(async move { + let level = level.lock().await; + level.fetch_chunks(&chunks, sender, closed) + }); - while let Some(chunk_data) = chunk_receiver.recv().await { - // dbg!(chunk_pos); - #[cfg(debug_assertions)] - if chunk_data.position == (0, 0).into() { - use pumpkin_protocol::bytebuf::ByteBuffer; - let mut test = ByteBuffer::empty(); - CChunkData(&chunk_data).write(&mut test); - let len = test.buf().len(); - log::debug!( - "Chunk packet size: {}B {}KB {}MB", - len, - len / 1024, - len / (1024 * 1024) - ); + let client = client; + tokio::spawn(async move { + while let Some(chunk_data) = chunk_receiver.recv().await { + // dbg!(chunk_pos); + #[cfg(debug_assertions)] + if chunk_data.position == (0, 0).into() { + use pumpkin_protocol::bytebuf::ByteBuffer; + let mut test = ByteBuffer::empty(); + CChunkData(&chunk_data).write(&mut test); + let len = test.buf().len(); + log::debug!( + "Chunk packet size: {}B {}KB {}MB", + len, + len / 1024, + len / (1024 * 1024) + ); + } + if !client.closed.load(std::sync::atomic::Ordering::Relaxed) { + client.send_packet(&CChunkData(&chunk_data)).await; + } } - if !client.closed.load(std::sync::atomic::Ordering::Relaxed) { - client.send_packet(&CChunkData(&chunk_data)); - } - } - dbg!("DONE CHUNKS", inst.elapsed()); + dbg!("DONE CHUNKS", inst.elapsed()); + }); } /// Gets a Player by entity id - pub fn get_player_by_entityid(&self, id: EntityId) -> Option> { - for player in self.current_players.lock().values() { + pub async fn get_player_by_entityid(&self, id: EntityId) -> Option> { + for player in self.current_players.lock().await.values() { if player.entity_id() == id { return Some(player.clone()); } @@ -271,7 +299,8 @@ impl World { /// Gets a Player by name pub fn get_player_by_name(&self, name: &str) -> Option> { - for player in self.current_players.lock().values() { + // not sure of blocking lock + for player in self.current_players.blocking_lock().values() { if player.gameprofile.name == name { return Some(player.clone()); } @@ -279,24 +308,27 @@ impl World { None } - pub fn add_player(&self, id: usize, player: Arc) { - self.current_players.lock().insert(id, player); + pub async fn add_player(&self, id: usize, player: Arc) { + self.current_players.lock().await.insert(id, player); } - pub fn remove_player(&self, player: &Player) { + pub async fn remove_player(&self, player: &Player) { self.current_players .lock() + .await .remove(&player.client.id) .unwrap(); let uuid = player.gameprofile.id; self.broadcast_packet_expect( &[player.client.id], &CRemovePlayerInfo::new(1.into(), &[uuid]), - ); - self.remove_entity(&player.living_entity.entity); + ) + .await; + self.remove_entity(&player.living_entity.entity).await; } - pub fn remove_entity(&self, entity: &Entity) { + pub async fn remove_entity(&self, entity: &Entity) { self.broadcast_packet_all(&CRemoveEntities::new(&[entity.entity_id.into()])) + .await } } diff --git a/pumpkin/src/world/player_chunker.rs b/pumpkin/src/world/player_chunker.rs index 8cb5ea257..e37d5a15e 100644 --- a/pumpkin/src/world/player_chunker.rs +++ b/pumpkin/src/world/player_chunker.rs @@ -4,17 +4,18 @@ use pumpkin_config::BASIC_CONFIG; use pumpkin_core::math::{ get_section_cord, position::WorldPosition, vector2::Vector2, vector3::Vector3, }; -use pumpkin_protocol::client::play::{CCenterChunk, CUnloadChunk}; +use pumpkin_protocol::client::play::CCenterChunk; use pumpkin_world::cylindrical_chunk_iterator::Cylindrical; -use crate::entity::{player::Player, Entity}; +use crate::entity::player::Player; use super::World; -fn get_view_distance(player: &Player) -> i8 { +async fn get_view_distance(player: &Player) -> i8 { player .config .lock() + .await .view_distance .clamp(2, BASIC_CONFIG.view_distance as i8) } @@ -24,11 +25,14 @@ pub async fn player_join(world: &World, player: Arc) { player.watched_section.store(new_watched); let watched_section = new_watched; let chunk_pos = player.living_entity.entity.chunk_pos.load(); - player.client.send_packet(&CCenterChunk { - chunk_x: chunk_pos.x.into(), - chunk_z: chunk_pos.z.into(), - }); - let view_distance = get_view_distance(&player) as i32; + player + .client + .send_packet(&CCenterChunk { + chunk_x: chunk_pos.x.into(), + chunk_z: chunk_pos.z.into(), + }) + .await; + let view_distance = get_view_distance(&player).await as i32; dbg!(view_distance); let old_cylindrical = Cylindrical::new( Vector2::new(watched_section.x, watched_section.z), @@ -42,38 +46,43 @@ pub async fn player_join(world: &World, player: Arc) { |chunk_pos| { loading_chunks.push(chunk_pos); }, - |chunk_pos| { - player - .client - .send_packet(&CUnloadChunk::new(chunk_pos.x, chunk_pos.z)); + |_| { + // player + // .client + // .send_packet(&CUnloadChunk::new(chunk_pos.x, chunk_pos.z)); }, true, ); if !loading_chunks.is_empty() { world - .spawn_world_chunks(&player.client, loading_chunks, view_distance) + .spawn_world_chunks(player.client.clone(), loading_chunks, view_distance) .await; } } -pub async fn update_position(entity: &Entity, player: &Player) { +pub async fn update_position(player: &Player) { + let entity = &player.living_entity.entity; let current_watched = player.watched_section.load(); let new_watched = chunk_section_from_pos(&entity.block_pos.load()); if current_watched != new_watched { let chunk_pos = entity.chunk_pos.load(); - player.client.send_packet(&CCenterChunk { - chunk_x: chunk_pos.x.into(), - chunk_z: chunk_pos.z.into(), - }); + player + .client + .send_packet(&CCenterChunk { + chunk_x: chunk_pos.x.into(), + chunk_z: chunk_pos.z.into(), + }) + .await; - let view_distance = get_view_distance(player) as i32; + let view_distance = get_view_distance(player).await as i32; let old_cylindrical = Cylindrical::new( Vector2::new(current_watched.x, current_watched.z), view_distance, ); - let new_cylindrical = - Cylindrical::new(Vector2::new(chunk_pos.x, chunk_pos.z), view_distance); + let new_cylindrical = Cylindrical::new(chunk_pos, view_distance); + player.watched_section.store(new_watched); + let mut loading_chunks = Vec::new(); Cylindrical::for_each_changed_chunk( old_cylindrical, @@ -81,17 +90,17 @@ pub async fn update_position(entity: &Entity, player: &Player) { |chunk_pos| { loading_chunks.push(chunk_pos); }, - |chunk_pos| { - player - .client - .send_packet(&CUnloadChunk::new(chunk_pos.x, chunk_pos.z)); + |_| { + // player + // .client + // .send_packet(&CUnloadChunk::new(chunk_pos.x, chunk_pos.z)); }, false, ); if !loading_chunks.is_empty() { entity .world - .spawn_world_chunks(&player.client, loading_chunks, view_distance) + .spawn_world_chunks(player.client.clone(), loading_chunks, view_distance) .await; } } diff --git a/pumpkin/src/world/scoreboard.rs b/pumpkin/src/world/scoreboard.rs index fa758d630..f82759548 100644 --- a/pumpkin/src/world/scoreboard.rs +++ b/pumpkin/src/world/scoreboard.rs @@ -26,7 +26,7 @@ impl Scoreboard { } } - pub fn add_objective(&mut self, world: &World, objective: ScoreboardObjective) { + pub async fn add_objective<'a>(&mut self, world: &World, objective: ScoreboardObjective<'a>) { if self.objectives.contains_key(objective.name) { // Maybe make this an error ? log::warn!( @@ -35,20 +35,24 @@ impl Scoreboard { ); return; } - world.broadcast_packet_all(&CUpdateObjectives::new( - objective.name, - pumpkin_protocol::client::play::Mode::Add, - objective.display_name, - objective.render_type, - objective.number_format, - )); - world.broadcast_packet_all(&CDisplayObjective::new( - pumpkin_protocol::client::play::DisplaySlot::Sidebar, - "test", - )); + world + .broadcast_packet_all(&CUpdateObjectives::new( + objective.name, + pumpkin_protocol::client::play::Mode::Add, + objective.display_name, + objective.render_type, + objective.number_format, + )) + .await; + world + .broadcast_packet_all(&CDisplayObjective::new( + pumpkin_protocol::client::play::DisplaySlot::Sidebar, + "test", + )) + .await; } - pub fn update_score(&self, world: &World, score: ScoreboardScore) { + pub async fn update_score<'a>(&self, world: &World, score: ScoreboardScore<'a>) { if self.objectives.contains_key(score.objective_name) { log::warn!( "Tried to place a score into a Objective which does not exist, {}", @@ -56,13 +60,15 @@ impl Scoreboard { ); return; } - world.broadcast_packet_all(&CUpdateScore::new( - score.entity_name, - score.objective_name, - score.value, - score.display_name, - score.number_format, - )); + world + .broadcast_packet_all(&CUpdateScore::new( + score.entity_name, + score.objective_name, + score.value, + score.display_name, + score.number_format, + )) + .await; } // pub fn add_team(&mut self, name: String) {