Skip to content

Commit

Permalink
More debug messages for packets (#156)
Browse files Browse the repository at this point in the history
* rebase to master

* rebase to master

* rebase to master

* rebase to master

* add debug messages for chunk sender

* add more debug messages for chunk sender

* add more debug messages for chunk sender

* fix dead lock

* rebase to master

* revert fixs leaving only debug messages

* better logging
  • Loading branch information
kralverde authored Oct 21, 2024
1 parent 1121102 commit 60f1bb6
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pumpkin-world/src/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl Level {
let dropped_chunk_data = dropped_chunks
.iter()
.filter_map(|chunk| {
log::debug!("Unloading chunk {:?}", chunk);
//log::debug!("Unloading chunk {:?}", chunk);
loaded_chunks.remove_entry(*chunk)
})
.collect();
Expand Down
32 changes: 20 additions & 12 deletions pumpkin/src/client/client_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ use super::{authentication::AuthError, Client, PlayerConfig};
/// NEVER TRUST THE CLIENT. HANDLE EVERY ERROR, UNWRAP/EXPECT
impl Client {
pub async fn handle_handshake(&self, handshake: SHandShake) {
log::debug!("handshake");
let version = handshake.protocol_version.0;
self.protocol_version
.store(version, std::sync::atomic::Ordering::Relaxed);
*self.server_address.lock().await = handshake.server_address;

log::debug!(
"Handshake: id {} is now in state {:?}",
self.id,
&handshake.next_state
);
self.connection_state.store(handshake.next_state);
if self.connection_state.load() != ConnectionState::Status {
let protocol = version;
Expand All @@ -56,11 +60,12 @@ impl Client {
}

pub async fn handle_status_request(&self, server: &Server, _status_request: SStatusRequest) {
log::debug!("Handling status request for id {}", self.id);
self.send_packet(&server.get_status()).await;
}

pub async fn handle_ping_request(&self, ping_request: SStatusPingRequest) {
log::debug!("recieved ping request");
log::debug!("Handling ping request for id {}", self.id);
self.send_packet(&CPingResponse::new(ping_request.payload))
.await;
self.close();
Expand All @@ -74,7 +79,11 @@ impl Client {
}

pub async fn handle_login_start(&self, server: &Server, login_start: SLoginStart) {
log::debug!("login start, State {:?}", self.connection_state);
log::debug!(
"login start for id {}, State {:?}",
self.id,
self.connection_state
);

if !Self::is_valid_player_name(&login_start.name) {
self.kick("Invalid characters in username").await;
Expand Down Expand Up @@ -127,13 +136,8 @@ impl Client {
server: &Server,
encryption_response: SEncryptionResponse,
) {
let shared_secret = match server.decrypt(&encryption_response.shared_secret) {
Ok(shared_secret) => shared_secret,
Err(error) => {
self.kick(&error.to_string()).await;
return;
}
};
log::debug!("Handling encryption for id {}", self.id);
let shared_secret = server.decrypt(&encryption_response.shared_secret).unwrap();

if let Err(error) = self.set_encryption(Some(&shared_secret)).await {
self.kick(&error.to_string()).await;
Expand Down Expand Up @@ -223,6 +227,7 @@ impl Client {
}

pub async fn handle_plugin_response(&self, plugin_response: SLoginPluginResponse) {
log::debug!("Handling plugin for id {}", self.id);
let velocity_config = &ADVANCED_CONFIG.proxy.velocity;
if velocity_config.enabled {
let mut address = self.address.lock().await;
Expand All @@ -246,6 +251,7 @@ impl Client {
server: &Server,
_login_acknowledged: SLoginAcknowledged,
) {
log::debug!("Handling login acknowledged for id {}", self.id);
self.connection_state.store(ConnectionState::Config);
self.send_packet(&server.get_branding()).await;

Expand Down Expand Up @@ -282,7 +288,7 @@ impl Client {
&self,
client_information: SClientInformationConfig,
) {
log::debug!("got client settings");
log::debug!("Handling client settings for id {}", self.id);
if let (Some(main_hand), Some(chat_mode)) = (
Hand::from_i32(client_information.main_hand.into()),
ChatMode::from_i32(client_information.chat_mode.into()),
Expand All @@ -303,6 +309,7 @@ impl Client {
}

pub async fn handle_plugin_message(&self, plugin_message: SPluginMessage) {
log::debug!("Handling plugin message for id {}", self.id);
if plugin_message.channel.starts_with("minecraft:brand")
|| plugin_message.channel.starts_with("MC|Brand")
{
Expand All @@ -315,6 +322,7 @@ impl Client {
}

pub async fn handle_known_packs(&self, server: &Server, _config_acknowledged: SKnownPacks) {
log::debug!("Handling known packs for id {}", self.id);
for registry in &server.cached_registry {
self.send_packet(&CRegistryData::new(
&registry.registry_id,
Expand All @@ -329,7 +337,7 @@ impl Client {
}

pub fn handle_config_acknowledged(&self, _config_acknowledged: &SAcknowledgeFinishConfig) {
log::debug!("config acknowledged");
log::debug!("Handling config acknowledge for id {}", self.id);
self.connection_state.store(ConnectionState::Play);
self.make_player
.store(true, std::sync::atomic::Ordering::Relaxed);
Expand Down
102 changes: 71 additions & 31 deletions pumpkin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,44 +175,65 @@ impl Client {

/// Send a Clientbound Packet to the Client
pub async fn send_packet<P: ClientPacket>(&self, packet: &P) {
log::debug!("Sending packet with id {} to {}", P::PACKET_ID, self.id);
// assert!(!self.closed);
let mut enc = self.enc.lock().await;
if let Err(error) = enc.append_packet(packet) {
self.kick(&error.to_string()).await;
return;
}
if let Err(error) = self
.connection_writer
.lock()
.await

let mut writer = self.connection_writer.lock().await;
if let Err(error) = writer
.write_all(&enc.take())
.await
.map_err(|_| PacketError::ConnectionWrite)
{
self.kick(&error.to_string()).await;
} else if let Err(error) = writer.flush().await {
log::warn!(
"Failed to flush writer for id {}: {}",
self.id,
error.to_string()
);
}
}

pub async fn try_send_packet<P: ClientPacket>(&self, packet: &P) -> Result<(), PacketError> {
// assert!(!self.closed);
log::debug!(
"Trying to send packet with id {} to {}",
P::PACKET_ID,
self.id
);

let mut enc = self.enc.lock().await;
enc.append_packet(packet)?;
self.connection_writer
.lock()
.await

let mut writer = self.connection_writer.lock().await;
writer
.write_all(&enc.take())
.await
.map_err(|_| PacketError::ConnectionWrite)?;

writer
.flush()
.await
.map_err(|_| PacketError::ConnectionWrite)?;
Ok(())
}

/// Processes all packets send by the client
pub async fn process_packets(&self, server: &Arc<Server>) {
while let Some(mut packet) = self.client_packets_queue.lock().await.pop_front() {
let mut packet_queue = self.client_packets_queue.lock().await;
while let Some(mut packet) = packet_queue.pop_front() {
if let Err(error) = self.handle_packet(server, &mut packet).await {
let text = format!("Error while reading incoming packet {error}");
log::error!("{text}");
log::error!(
"Failed to read incoming packet with id {}: {}",
i32::from(packet.id),
error
);
self.kick(&text).await;
};
}
Expand Down Expand Up @@ -250,15 +271,19 @@ impl Client {
&self,
packet: &mut RawPacket,
) -> Result<(), DeserializerError> {
log::debug!("Handling handshake group for id {}", self.id);
let bytebuf = &mut packet.bytebuf;
if packet.id.0 == SHandShake::PACKET_ID {
self.handle_handshake(SHandShake::read(bytebuf)?).await;
} else {
log::error!(
"Failed to handle packet id {} while in Handshake state",
packet.id.0
);
}
match packet.id.0 {
SHandShake::PACKET_ID => {
self.handle_handshake(SHandShake::read(bytebuf)?).await;
}
_ => {
log::error!(
"Failed to handle packet id {} while in Handshake state",
packet.id.0
);
}
};
Ok(())
}

Expand All @@ -267,6 +292,7 @@ impl Client {
server: &Arc<Server>,
packet: &mut RawPacket,
) -> Result<(), DeserializerError> {
log::debug!("Handling status group for id {}", self.id);
let bytebuf = &mut packet.bytebuf;
match packet.id.0 {
SStatusRequest::PACKET_ID => {
Expand All @@ -283,7 +309,7 @@ impl Client {
packet.id.0
);
}
}
};
Ok(())
}

Expand All @@ -292,6 +318,7 @@ impl Client {
server: &Arc<Server>,
packet: &mut RawPacket,
) -> Result<(), DeserializerError> {
log::debug!("Handling login group for id {}", self.id);
let bytebuf = &mut packet.bytebuf;
match packet.id.0 {
SLoginStart::PACKET_ID => {
Expand All @@ -316,7 +343,7 @@ impl Client {
packet.id.0
);
}
}
};
Ok(())
}

Expand All @@ -325,6 +352,7 @@ impl Client {
server: &Arc<Server>,
packet: &mut RawPacket,
) -> Result<(), DeserializerError> {
log::debug!("Handling config group for id {}", self.id);
let bytebuf = &mut packet.bytebuf;
match packet.id.0 {
SClientInformationConfig::PACKET_ID => {
Expand All @@ -348,7 +376,7 @@ impl Client {
packet.id.0
);
}
}
};
Ok(())
}

Expand All @@ -358,26 +386,38 @@ impl Client {
pub async fn poll(&self) -> bool {
loop {
let mut dec = self.dec.lock().await;
if let Ok(Some(packet)) = dec.decode() {
self.add_packet(packet).await;
return true;
};

match dec.decode() {
Ok(Some(packet)) => {
self.add_packet(packet).await;
return true;
}
Ok(None) => log::debug!("Waiting for more data to complete packet..."),
Err(err) => log::warn!(
"Failed to decode packet for id {}: {}",
self.id,
err.to_string()
),
}

dec.reserve(4096);
let mut buf = dec.take_capacity();

match self.connection_reader.lock().await.read_buf(&mut buf).await {
Ok(0) => {
self.close();
return false;
let bytes_read = self.connection_reader.lock().await.read_buf(&mut buf).await;
match bytes_read {
Ok(cnt) => {
log::debug!("Read {} bytes", cnt);
if cnt == 0 {
self.close();
return false;
}
}
Err(error) => {
log::error!("Error while reading incoming packet {}", error);
self.close();
return false;
}
_ => {}
}
};

// This should always be an O(1) unsplit because we reserved space earlier and
// the call to `read_buf` shouldn't have grown the allocation.
Expand All @@ -387,7 +427,7 @@ impl Client {

/// Kicks the Client with a reason depending on the connection state
pub async fn kick(&self, reason: &str) {
log::debug!("Kicking client with reason: {}", reason);
log::info!("Kicking for id {} for {}", self.id, reason);
match self.connection_state.load() {
ConnectionState::Login => {
self.try_send_packet(&CLoginDisconnect::new(
Expand Down
Loading

0 comments on commit 60f1bb6

Please sign in to comment.