From 60f1bb619576bb434168749d5a9c2533ec68eda7 Mon Sep 17 00:00:00 2001 From: kralverde <80051564+kralverde@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:06:50 -0400 Subject: [PATCH] More debug messages for packets (#156) * rebase to master * rebase to master * rebase to master * rebase to master * add debug messages for chunk sender * add more debug messages for chunk sender * add more debug messages for chunk sender * fix dead lock * rebase to master * revert fixs leaving only debug messages * better logging --- pumpkin-world/src/level.rs | 2 +- pumpkin/src/client/client_packet.rs | 32 +++++---- pumpkin/src/client/mod.rs | 102 +++++++++++++++++++--------- pumpkin/src/entity/player.rs | 33 +++------ pumpkin/src/main.rs | 1 + pumpkin/src/world/mod.rs | 21 +++++- pumpkin/src/world/player_chunker.rs | 15 +++- 7 files changed, 136 insertions(+), 70 deletions(-) diff --git a/pumpkin-world/src/level.rs b/pumpkin-world/src/level.rs index 5cfae743d..8fff3a6cf 100644 --- a/pumpkin-world/src/level.rs +++ b/pumpkin-world/src/level.rs @@ -123,7 +123,7 @@ impl Level { let dropped_chunk_data = dropped_chunks .iter() .filter_map(|chunk| { - log::debug!("Unloading chunk {:?}", chunk); + //log::debug!("Unloading chunk {:?}", chunk); loaded_chunks.remove_entry(*chunk) }) .collect(); diff --git a/pumpkin/src/client/client_packet.rs b/pumpkin/src/client/client_packet.rs index fe43d2cbf..05e57d5c2 100644 --- a/pumpkin/src/client/client_packet.rs +++ b/pumpkin/src/client/client_packet.rs @@ -34,12 +34,16 @@ use super::{authentication::AuthError, Client, PlayerConfig}; /// NEVER TRUST THE CLIENT. HANDLE EVERY ERROR, UNWRAP/EXPECT impl Client { pub async fn handle_handshake(&self, handshake: SHandShake) { - log::debug!("handshake"); let version = handshake.protocol_version.0; self.protocol_version .store(version, std::sync::atomic::Ordering::Relaxed); *self.server_address.lock().await = handshake.server_address; + log::debug!( + "Handshake: id {} is now in state {:?}", + self.id, + &handshake.next_state + ); self.connection_state.store(handshake.next_state); if self.connection_state.load() != ConnectionState::Status { let protocol = version; @@ -56,11 +60,12 @@ impl Client { } pub async fn handle_status_request(&self, server: &Server, _status_request: SStatusRequest) { + log::debug!("Handling status request for id {}", self.id); self.send_packet(&server.get_status()).await; } pub async fn handle_ping_request(&self, ping_request: SStatusPingRequest) { - log::debug!("recieved ping request"); + log::debug!("Handling ping request for id {}", self.id); self.send_packet(&CPingResponse::new(ping_request.payload)) .await; self.close(); @@ -74,7 +79,11 @@ impl Client { } pub async fn handle_login_start(&self, server: &Server, login_start: SLoginStart) { - log::debug!("login start, State {:?}", self.connection_state); + log::debug!( + "login start for id {}, State {:?}", + self.id, + self.connection_state + ); if !Self::is_valid_player_name(&login_start.name) { self.kick("Invalid characters in username").await; @@ -127,13 +136,8 @@ impl Client { server: &Server, encryption_response: SEncryptionResponse, ) { - let shared_secret = match server.decrypt(&encryption_response.shared_secret) { - Ok(shared_secret) => shared_secret, - Err(error) => { - self.kick(&error.to_string()).await; - return; - } - }; + log::debug!("Handling encryption for id {}", self.id); + let shared_secret = server.decrypt(&encryption_response.shared_secret).unwrap(); if let Err(error) = self.set_encryption(Some(&shared_secret)).await { self.kick(&error.to_string()).await; @@ -223,6 +227,7 @@ impl Client { } pub async fn handle_plugin_response(&self, plugin_response: SLoginPluginResponse) { + log::debug!("Handling plugin for id {}", self.id); let velocity_config = &ADVANCED_CONFIG.proxy.velocity; if velocity_config.enabled { let mut address = self.address.lock().await; @@ -246,6 +251,7 @@ impl Client { server: &Server, _login_acknowledged: SLoginAcknowledged, ) { + log::debug!("Handling login acknowledged for id {}", self.id); self.connection_state.store(ConnectionState::Config); self.send_packet(&server.get_branding()).await; @@ -282,7 +288,7 @@ impl Client { &self, client_information: SClientInformationConfig, ) { - log::debug!("got client settings"); + log::debug!("Handling client settings for id {}", self.id); if let (Some(main_hand), Some(chat_mode)) = ( Hand::from_i32(client_information.main_hand.into()), ChatMode::from_i32(client_information.chat_mode.into()), @@ -303,6 +309,7 @@ impl Client { } pub async fn handle_plugin_message(&self, plugin_message: SPluginMessage) { + log::debug!("Handling plugin message for id {}", self.id); if plugin_message.channel.starts_with("minecraft:brand") || plugin_message.channel.starts_with("MC|Brand") { @@ -315,6 +322,7 @@ impl Client { } pub async fn handle_known_packs(&self, server: &Server, _config_acknowledged: SKnownPacks) { + log::debug!("Handling known packs for id {}", self.id); for registry in &server.cached_registry { self.send_packet(&CRegistryData::new( ®istry.registry_id, @@ -329,7 +337,7 @@ impl Client { } pub fn handle_config_acknowledged(&self, _config_acknowledged: &SAcknowledgeFinishConfig) { - log::debug!("config acknowledged"); + log::debug!("Handling config acknowledge for id {}", self.id); self.connection_state.store(ConnectionState::Play); self.make_player .store(true, std::sync::atomic::Ordering::Relaxed); diff --git a/pumpkin/src/client/mod.rs b/pumpkin/src/client/mod.rs index 5bbfc76be..4294bd7ef 100644 --- a/pumpkin/src/client/mod.rs +++ b/pumpkin/src/client/mod.rs @@ -175,44 +175,65 @@ impl Client { /// Send a Clientbound Packet to the Client pub async fn send_packet(&self, packet: &P) { + log::debug!("Sending packet with id {} to {}", P::PACKET_ID, self.id); // assert!(!self.closed); let mut enc = self.enc.lock().await; if let Err(error) = enc.append_packet(packet) { self.kick(&error.to_string()).await; return; } - if let Err(error) = self - .connection_writer - .lock() - .await + + let mut writer = self.connection_writer.lock().await; + if let Err(error) = writer .write_all(&enc.take()) .await .map_err(|_| PacketError::ConnectionWrite) { self.kick(&error.to_string()).await; + } else if let Err(error) = writer.flush().await { + log::warn!( + "Failed to flush writer for id {}: {}", + self.id, + error.to_string() + ); } } pub async fn try_send_packet(&self, packet: &P) -> Result<(), PacketError> { // assert!(!self.closed); + log::debug!( + "Trying to send packet with id {} to {}", + P::PACKET_ID, + self.id + ); let mut enc = self.enc.lock().await; enc.append_packet(packet)?; - self.connection_writer - .lock() - .await + + let mut writer = self.connection_writer.lock().await; + writer .write_all(&enc.take()) .await .map_err(|_| PacketError::ConnectionWrite)?; + + writer + .flush() + .await + .map_err(|_| PacketError::ConnectionWrite)?; Ok(()) } /// Processes all packets send by the client pub async fn process_packets(&self, server: &Arc) { - while let Some(mut packet) = self.client_packets_queue.lock().await.pop_front() { + let mut packet_queue = self.client_packets_queue.lock().await; + while let Some(mut packet) = packet_queue.pop_front() { if let Err(error) = self.handle_packet(server, &mut packet).await { let text = format!("Error while reading incoming packet {error}"); - log::error!("{text}"); + log::error!( + "Failed to read incoming packet with id {}: {}", + i32::from(packet.id), + error + ); self.kick(&text).await; }; } @@ -250,15 +271,19 @@ impl Client { &self, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling handshake group for id {}", self.id); let bytebuf = &mut packet.bytebuf; - if packet.id.0 == SHandShake::PACKET_ID { - self.handle_handshake(SHandShake::read(bytebuf)?).await; - } else { - log::error!( - "Failed to handle packet id {} while in Handshake state", - packet.id.0 - ); - } + match packet.id.0 { + SHandShake::PACKET_ID => { + self.handle_handshake(SHandShake::read(bytebuf)?).await; + } + _ => { + log::error!( + "Failed to handle packet id {} while in Handshake state", + packet.id.0 + ); + } + }; Ok(()) } @@ -267,6 +292,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling status group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SStatusRequest::PACKET_ID => { @@ -283,7 +309,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -292,6 +318,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling login group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SLoginStart::PACKET_ID => { @@ -316,7 +343,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -325,6 +352,7 @@ impl Client { server: &Arc, packet: &mut RawPacket, ) -> Result<(), DeserializerError> { + log::debug!("Handling config group for id {}", self.id); let bytebuf = &mut packet.bytebuf; match packet.id.0 { SClientInformationConfig::PACKET_ID => { @@ -348,7 +376,7 @@ impl Client { packet.id.0 ); } - } + }; Ok(()) } @@ -358,26 +386,38 @@ impl Client { pub async fn poll(&self) -> bool { loop { let mut dec = self.dec.lock().await; - if let Ok(Some(packet)) = dec.decode() { - self.add_packet(packet).await; - return true; - }; + + match dec.decode() { + Ok(Some(packet)) => { + self.add_packet(packet).await; + return true; + } + Ok(None) => log::debug!("Waiting for more data to complete packet..."), + Err(err) => log::warn!( + "Failed to decode packet for id {}: {}", + self.id, + err.to_string() + ), + } dec.reserve(4096); let mut buf = dec.take_capacity(); - match self.connection_reader.lock().await.read_buf(&mut buf).await { - Ok(0) => { - self.close(); - return false; + let bytes_read = self.connection_reader.lock().await.read_buf(&mut buf).await; + match bytes_read { + Ok(cnt) => { + log::debug!("Read {} bytes", cnt); + if cnt == 0 { + self.close(); + return false; + } } Err(error) => { log::error!("Error while reading incoming packet {}", error); self.close(); return false; } - _ => {} - } + }; // This should always be an O(1) unsplit because we reserved space earlier and // the call to `read_buf` shouldn't have grown the allocation. @@ -387,7 +427,7 @@ impl Client { /// Kicks the Client with a reason depending on the connection state pub async fn kick(&self, reason: &str) { - log::debug!("Kicking client with reason: {}", reason); + log::info!("Kicking for id {} for {}", self.id, reason); match self.connection_state.load() { ConnectionState::Login => { self.try_send_packet(&CLoginDisconnect::new( diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs index dd8c23b41..ab388f3cd 100644 --- a/pumpkin/src/entity/player.rs +++ b/pumpkin/src/entity/player.rs @@ -353,110 +353,97 @@ impl Player { server: &Arc, packet: &mut RawPacket, ) -> Result<(), Box> { + /* + log::debug!( + "Handling player packet with id {} for {}", + packet.id.0, + self.client.id + ); + */ + let bytebuf = &mut packet.bytebuf; match packet.id.0 { SConfirmTeleport::PACKET_ID => { self.handle_confirm_teleport(SConfirmTeleport::read(bytebuf)?) .await; - Ok(()) } SChatCommand::PACKET_ID => { self.handle_chat_command(server, SChatCommand::read(bytebuf)?) .await; - Ok(()) } SPlayerPosition::PACKET_ID => { self.handle_position(SPlayerPosition::read(bytebuf)?).await; - Ok(()) } SPlayerPositionRotation::PACKET_ID => { self.handle_position_rotation(SPlayerPositionRotation::read(bytebuf)?) .await; - Ok(()) } SPlayerRotation::PACKET_ID => { self.handle_rotation(SPlayerRotation::read(bytebuf)?).await; - Ok(()) } SSetPlayerGround::PACKET_ID => { self.handle_player_ground(&SSetPlayerGround::read(bytebuf)?); - Ok(()) } SPlayerCommand::PACKET_ID => { self.handle_player_command(SPlayerCommand::read(bytebuf)?) .await; - Ok(()) } SSwingArm::PACKET_ID => { self.handle_swing_arm(SSwingArm::read(bytebuf)?).await; - Ok(()) } SChatMessage::PACKET_ID => { self.handle_chat_message(SChatMessage::read(bytebuf)?).await; - Ok(()) } SClientInformationPlay::PACKET_ID => { self.handle_client_information_play(SClientInformationPlay::read(bytebuf)?) .await; - Ok(()) } SInteract::PACKET_ID => { self.handle_interact(server, SInteract::read(bytebuf)?) .await; - Ok(()) } SPlayerAction::PACKET_ID => { self.handle_player_action(SPlayerAction::read(bytebuf)?) .await; - Ok(()) } SPlayerAbilities::PACKET_ID => { self.handle_player_abilities(SPlayerAbilities::read(bytebuf)?) .await; - Ok(()) } SUseItemOn::PACKET_ID => { self.handle_use_item_on(SUseItemOn::read(bytebuf)?).await; - Ok(()) } SUseItem::PACKET_ID => { self.handle_use_item(&SUseItem::read(bytebuf)?); - Ok(()) } SSetHeldItem::PACKET_ID => { self.handle_set_held_item(SSetHeldItem::read(bytebuf)?) .await; - Ok(()) } SSetCreativeSlot::PACKET_ID => { self.handle_set_creative_slot(SSetCreativeSlot::read(bytebuf)?) .await?; - Ok(()) } SPlayPingRequest::PACKET_ID => { self.handle_play_ping_request(SPlayPingRequest::read(bytebuf)?) .await; - Ok(()) } SClickContainer::PACKET_ID => { self.handle_click_container(server, SClickContainer::read(bytebuf)?) .await?; - Ok(()) } SCloseContainer::PACKET_ID => { self.handle_close_container(server, SCloseContainer::read(bytebuf)?) .await; - Ok(()) } SKeepAlive::PACKET_ID => { self.handle_keep_alive(SKeepAlive::read(bytebuf)?).await; - Ok(()) } _ => { log::error!("Failed to handle player packet id {:#04x}", packet.id.0); - Ok(()) } - } + }; + Ok(()) } } diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index a25ed3a4d..74c51e921 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -205,6 +205,7 @@ async fn main() -> io::Result<()> { .load(std::sync::atomic::Ordering::Relaxed) { let id = client.id; + log::debug!("Creating player for id {}", id); let (player, world) = server.add_player(id, client).await; world.spawn_player(&BASIC_CONFIG, player.clone()).await; // poll Player diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index 5626862f0..7aec512b8 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -93,8 +93,13 @@ impl World { // This code follows the vanilla packet order let entity_id = player.entity_id(); let gamemode = player.gamemode.load(); - log::debug!("spawning player, entity id {}", entity_id); + log::debug!( + "spawning player {}, entity id {}", + player.client.id, + entity_id + ); + log::debug!("Sending login packet to {}", player.client.id); // login packet for our new player player .client @@ -120,9 +125,10 @@ impl World { false, )) .await; - log::debug!("sending abilities"); + // player abilities // TODO: this is for debug purpose, remove later + log::debug!("Sending player abilities to {}", player.client.id); player .client .send_packet(&CPlayerAbilities::new(0x02, 0.4, 0.1)) @@ -134,6 +140,8 @@ impl World { let z = 10.0; let yaw = 10.0; let pitch = 10.0; + + log::debug!("Sending player teleport to {}", player.client.id); player.teleport(x, y, z, yaw, pitch).await; let pos = player.living_entity.entity.pos.load(); @@ -142,6 +150,7 @@ impl World { let gameprofile = &player.gameprofile; // first send info update to our new player, So he can see his Skin // also send his info to everyone else + log::debug!("Broadcasting player info for {}", player.client.id); self.broadcast_packet_all(&CPlayerInfoUpdate::new( 0x01 | 0x08, &[pumpkin_protocol::client::play::Player { @@ -177,6 +186,7 @@ impl World { ], }); } + log::debug!("Sending player info to {}", player.client.id); player .client .send_packet(&CPlayerInfoUpdate::new(0x01 | 0x08, &entries)) @@ -185,6 +195,7 @@ impl World { let gameprofile = &player.gameprofile; + log::debug!("Broadcasting player spawn for {}", player.client.id); // spawn player for every client self.broadcast_packet_expect( &[player.client.id], @@ -218,6 +229,7 @@ impl World { let entity = &existing_player.living_entity.entity; let pos = entity.pos.load(); let gameprofile = &existing_player.gameprofile; + log::debug!("Sending player entities to {}", player.client.id); player .client .send_packet(&CSpawnEntity::new( @@ -244,10 +256,12 @@ impl World { entity_id.into(), Metadata::new(17, VarInt(0), config.skin_parts), ); + log::debug!("Broadcasting skin for {}", player.client.id); self.broadcast_packet_all(&packet).await; } // Start waiting for level chunks, Sets the "Loading Terrain" screen + log::debug!("Sending waiting chunks to {}", player.client.id); player .client .send_packet(&CGameEvent::new(GameEvent::StartWaitingChunks, 0.0)) @@ -277,15 +291,18 @@ impl World { } let inst = std::time::Instant::now(); let (sender, mut chunk_receiver) = mpsc::channel(distance as usize); + let client_id = client.id; let level = self.level.clone(); let chunks = Arc::new(chunks); tokio::spawn(async move { + log::debug!("Spawned chunk fetcher for {}", client_id); let level = level.lock().await; level.fetch_chunks(&chunks, sender); }); tokio::spawn(async move { + log::debug!("Spawned chunk sender for {}", client_id); while let Some(chunk_data) = chunk_receiver.recv().await { // dbg!(chunk_pos); #[cfg(debug_assertions)] diff --git a/pumpkin/src/world/player_chunker.rs b/pumpkin/src/world/player_chunker.rs index 5cfc89fa6..3965ba560 100644 --- a/pumpkin/src/world/player_chunker.rs +++ b/pumpkin/src/world/player_chunker.rs @@ -25,6 +25,8 @@ pub async fn player_join(world: &World, player: Arc) { player.watched_section.store(new_watched); let watched_section = new_watched; let chunk_pos = player.living_entity.entity.chunk_pos.load(); + + log::debug!("Sending center chunk to {}", player.client.id); player .client .send_packet(&CCenterChunk { @@ -34,10 +36,12 @@ pub async fn player_join(world: &World, player: Arc) { .await; let view_distance = i32::from(get_view_distance(&player).await); log::debug!( - "Player {} joined with view distance: {}", + "Player {} ({}) joined with view distance: {}", player.gameprofile.name, + player.client.id, view_distance ); + let old_cylindrical = Cylindrical::new( Vector2::new(watched_section.x, watched_section.z), view_distance, @@ -59,6 +63,15 @@ pub async fn player_join(world: &World, player: Arc) { }, true, ); + + log::debug!( + "{} added {} remove ({}) for {}", + loading_chunks.len(), + unloading_chunks.len(), + view_distance, + player.client.id + ); + if !loading_chunks.is_empty() { world.mark_chunks_as_watched(&loading_chunks).await; world.spawn_world_chunks(player.client.clone(), loading_chunks, view_distance);