From bb050958a22e72970dcb3f114e896b0739b78157 Mon Sep 17 00:00:00 2001 From: Snowiiii Date: Mon, 21 Oct 2024 13:25:04 +0200 Subject: [PATCH] Add Tick system --- .gitignore | 5 +- pumpkin-config/src/lib.rs | 3 + .../src/client/play/c_keep_alive.rs | 8 ++- pumpkin/src/client/mod.rs | 22 ++----- pumpkin/src/client/player_packet.rs | 25 ++++++-- pumpkin/src/entity/player.rs | 48 +++++++++++--- pumpkin/src/main.rs | 63 ++++++------------- pumpkin/src/server/mod.rs | 8 +++ pumpkin/src/server/ticker.rs | 34 ++++++++++ pumpkin/src/world/mod.rs | 7 +++ 10 files changed, 142 insertions(+), 81 deletions(-) create mode 100644 pumpkin/src/server/ticker.rs diff --git a/.gitignore b/.gitignore index 874a1cc2f..b546dffbd 100644 --- a/.gitignore +++ b/.gitignore @@ -76,13 +76,10 @@ Cargo.lock #.idea/ # === PROJECT SPECIFIC === -# mc decompiled source -mc-source-code/ - plugins/* world/* -# project's binary configurations +# project's configurations configuration.toml features.toml diff --git a/pumpkin-config/src/lib.rs b/pumpkin-config/src/lib.rs index ebe9c99cf..b2fd20d05 100644 --- a/pumpkin-config/src/lib.rs +++ b/pumpkin-config/src/lib.rs @@ -91,6 +91,8 @@ pub struct BasicConfiguration { /// The server's description displayed on the status screen. #[serde_inline_default("A Blazing fast Pumpkin Server!".to_string())] pub motd: String, + #[serde_inline_default(20.0)] + pub tps: f32, /// The default game mode for players. #[serde_inline_default(GameMode::Survival)] pub default_gamemode: GameMode, @@ -123,6 +125,7 @@ impl Default for BasicConfiguration { online_mode: true, encryption: true, motd: "A Blazing fast Pumpkin Server!".to_string(), + tps: 20.0, default_gamemode: GameMode::Survival, scrub_ips: true, use_favicon: true, diff --git a/pumpkin-protocol/src/client/play/c_keep_alive.rs b/pumpkin-protocol/src/client/play/c_keep_alive.rs index c1d95f094..ded73ab7d 100644 --- a/pumpkin-protocol/src/client/play/c_keep_alive.rs +++ b/pumpkin-protocol/src/client/play/c_keep_alive.rs @@ -4,5 +4,11 @@ use serde::Serialize; #[packet(0x26)] #[derive(Serialize)] pub struct CKeepAlive { - pub keep_alive_id: i64, + keep_alive_id: i64, +} + +impl CKeepAlive { + pub fn new(keep_alive_id: i64) -> Self { + Self { keep_alive_id } + } } diff --git a/pumpkin/src/client/mod.rs b/pumpkin/src/client/mod.rs index 6d2476185..c7eb05ac0 100644 --- a/pumpkin/src/client/mod.rs +++ b/pumpkin/src/client/mod.rs @@ -105,7 +105,6 @@ pub struct Client { /// The underlying TCP connection to the client. pub connection_reader: Arc>, pub connection_writer: Arc>, - /// The client's IP address. pub address: Mutex, /// The packet encoder for outgoing packets. @@ -114,22 +113,12 @@ pub struct Client { dec: Arc>, /// A queue of raw packets received from the client, waiting to be processed. pub client_packets_queue: Arc>>, - /// Indicates whether the client should be converted into a player. pub make_player: AtomicBool, - /// Sends each keep alive packet that the server receives for a player to here, which gets picked up in a tokio task - pub keep_alive_sender: Arc>, - /// Stores the last time it was confirmed that the client is alive - pub last_alive_received: AtomicCell, } impl Client { - pub fn new( - id: usize, - connection: tokio::net::TcpStream, - address: SocketAddr, - keep_alive_sender: Arc>, - ) -> Self { + pub fn new(id: usize, connection: tokio::net::TcpStream, address: SocketAddr) -> Self { let (connection_reader, connection_writer) = connection.into_split(); Self { protocol_version: AtomicI32::new(0), @@ -148,8 +137,6 @@ impl Client { closed: AtomicBool::new(false), client_packets_queue: Arc::new(Mutex::new(VecDeque::new())), make_player: AtomicBool::new(false), - keep_alive_sender, - last_alive_received: AtomicCell::new(std::time::Instant::now()), } } @@ -382,12 +369,13 @@ impl Client { /// Reads the connection until our buffer of len 4096 is full, then decode /// Close connection when an error occurs or when the Client closed the connection - pub async fn poll(&self) { + /// Returns if connection is still open + pub async fn poll(&self) -> bool { loop { let mut dec = self.dec.lock().await; if let Ok(Some(packet)) = dec.decode() { self.add_packet(packet).await; - return; + return true; }; dec.reserve(4096); @@ -403,7 +391,7 @@ impl Client { == 0 { self.close(); - return; + return false; } // This should always be an O(1) unsplit because we reserved space earlier and diff --git a/pumpkin/src/client/player_packet.rs b/pumpkin/src/client/player_packet.rs index a1a88bd61..d51ff8a2a 100644 --- a/pumpkin/src/client/player_packet.rs +++ b/pumpkin/src/client/player_packet.rs @@ -15,6 +15,7 @@ use pumpkin_core::{ }; use pumpkin_entity::EntityId; use pumpkin_inventory::{InventoryError, WindowType}; +use pumpkin_protocol::server::play::{SCloseContainer, SKeepAlive, SSetPlayerGround, SUseItem}; use pumpkin_protocol::{ client::play::{ Animation, CAcknowledgeBlockChange, CBlockUpdate, CEntityAnimation, CEntityVelocity, @@ -22,10 +23,10 @@ use pumpkin_protocol::{ CUpdateEntityPosRot, CUpdateEntityRot, CWorldEvent, FilterType, }, server::play::{ - Action, ActionType, SChatCommand, SChatMessage, SClientInformationPlay, SCloseContainer, - SConfirmTeleport, SInteract, SPlayPingRequest, SPlayerAbilities, SPlayerAction, - SPlayerCommand, SPlayerPosition, SPlayerPositionRotation, SPlayerRotation, - SSetCreativeSlot, SSetHeldItem, SSetPlayerGround, SSwingArm, SUseItem, SUseItemOn, Status, + Action, ActionType, SChatCommand, SChatMessage, SClientInformationPlay, SConfirmTeleport, + SInteract, SPlayPingRequest, SPlayerAbilities, SPlayerAction, SPlayerCommand, + SPlayerPosition, SPlayerPositionRotation, SPlayerRotation, SSetCreativeSlot, SSetHeldItem, + SSwingArm, SUseItemOn, Status, }, }; use pumpkin_world::block::{BlockFace, BlockState}; @@ -539,6 +540,22 @@ impl Player { } } + pub async fn handle_keep_alive(&self, keep_alive: SKeepAlive) { + if self + .wait_for_keep_alive + .load(std::sync::atomic::Ordering::Relaxed) + && keep_alive.keep_alive_id + == self + .keep_alive_id + .load(std::sync::atomic::Ordering::Relaxed) + { + self.wait_for_keep_alive + .store(false, std::sync::atomic::Ordering::Relaxed); + } else { + self.kick(TextComponent::text("Timeout")).await + } + } + pub async fn handle_player_abilities(&self, player_abilities: SPlayerAbilities) { let mut abilities = self.abilities.lock().await; diff --git a/pumpkin/src/entity/player.rs b/pumpkin/src/entity/player.rs index 7ac249388..ad8834795 100644 --- a/pumpkin/src/entity/player.rs +++ b/pumpkin/src/entity/player.rs @@ -1,6 +1,9 @@ -use std::sync::{ - atomic::{AtomicI32, AtomicU8}, - Arc, +use std::{ + sync::{ + atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU8}, + Arc, + }, + time::{Duration, Instant}, }; use crossbeam::atomic::AtomicCell; @@ -16,7 +19,7 @@ use pumpkin_inventory::player::PlayerInventory; use pumpkin_protocol::{ bytebuf::packet_id::Packet, client::play::{ - CGameEvent, CPlayDisconnect, CPlayerAbilities, CPlayerInfoUpdate, CSetHealth, + CGameEvent, CKeepAlive, CPlayDisconnect, CPlayerAbilities, CPlayerInfoUpdate, CSetHealth, CSyncPlayerPosition, CSystemChatMessage, GameEvent, PlayerAction, }, server::play::{ @@ -85,9 +88,14 @@ pub struct Player { pub teleport_id_count: AtomicI32, /// The pending teleport information, including the teleport ID and target location. pub awaiting_teleport: Mutex)>>, - /// The coordinates of the chunk section the player is currently watching. pub watched_section: AtomicCell>, + /// Did we send a keep alive Packet and wait for the response? + pub wait_for_keep_alive: AtomicBool, + /// Whats the keep alive packet payload we send, The client should responde with the same id + pub keep_alive_id: AtomicI64, + /// Last time we send a keep alive + pub last_keep_alive_time: AtomicCell, } impl Player { @@ -133,6 +141,9 @@ impl Player { gamemode: AtomicCell::new(gamemode), watched_section: AtomicCell::new(Vector3::new(0, 0, 0)), last_position: AtomicCell::new(Vector3::new(0.0, 0.0, 0.0)), + wait_for_keep_alive: AtomicBool::new(false), + keep_alive_id: AtomicI64::new(0), + last_keep_alive_time: AtomicCell::new(std::time::Instant::now()), } } @@ -150,6 +161,27 @@ impl Player { .await; } + pub async fn tick(&self) { + let now = Instant::now(); + if now.duration_since(self.last_keep_alive_time.load()) >= Duration::from_secs(15) { + // We never got a response from our last keep alive we send + if self + .wait_for_keep_alive + .load(std::sync::atomic::Ordering::Relaxed) + { + self.kick(TextComponent::text("Timeout")).await; + return; + } + self.wait_for_keep_alive + .store(true, std::sync::atomic::Ordering::Relaxed); + self.last_keep_alive_time.store(now); + let id = now.elapsed().as_millis() as i64; + self.keep_alive_id + .store(id, std::sync::atomic::Ordering::Relaxed); + self.client.send_packet(&CKeepAlive::new(id)).await; + } + } + pub const fn entity_id(&self) -> EntityId { self.living_entity.entity.entity_id } @@ -418,11 +450,7 @@ impl Player { Ok(()) } SKeepAlive::PACKET_ID => { - self.client - .keep_alive_sender - .send(SKeepAlive::read(bytebuf)?.keep_alive_id) - .await - .unwrap(); + self.handle_keep_alive(SKeepAlive::read(bytebuf)?).await; Ok(()) } _ => { diff --git a/pumpkin/src/main.rs b/pumpkin/src/main.rs index 8eb3922e0..6d52e9197 100644 --- a/pumpkin/src/main.rs +++ b/pumpkin/src/main.rs @@ -18,11 +18,8 @@ compile_error!("Compiling for WASI targets is not supported!"); use log::LevelFilter; use client::Client; -use pumpkin_protocol::client::play::CKeepAlive; -use pumpkin_protocol::ConnectionState; -use server::Server; +use server::{ticker::Ticker, Server}; use std::io::{self}; -use std::time::Duration; // Setup some tokens to allow us to identify which event is for which socket. @@ -126,6 +123,8 @@ async fn main() -> io::Result<()> { let rcon = ADVANCED_CONFIG.rcon.clone(); let server = Arc::new(Server::new()); + let mut ticker = Ticker::new(BASIC_CONFIG.tps); + log::info!("Started Server took {}ms", time.elapsed().as_millis()); log::info!("You now can connect to the server, Listening on {}", addr); @@ -154,6 +153,12 @@ async fn main() -> io::Result<()> { RCONServer::new(&rcon, server).await.unwrap(); }); } + { + let server = server.clone(); + tokio::spawn(async move { + ticker.run(&server).await; + }); + } let mut unique_id = 0; loop { // Asynchronously wait for an inbound socket. @@ -172,53 +177,19 @@ async fn main() -> io::Result<()> { id ); - let keep_alive = tokio::sync::mpsc::channel(1024); - let client = Arc::new(Client::new(id, connection, addr, keep_alive.0.into())); - - { - let client = client.clone(); - let mut receiver = keep_alive.1; - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - loop { - interval.tick().await; - let now = std::time::Instant::now(); - if client.connection_state.load() == ConnectionState::Play { - if now.duration_since(client.last_alive_received.load()) - >= Duration::from_secs(15) - { - dbg!("no keep alive"); - client.kick("No keep alive received").await; - break; - } - let random = rand::random::(); - client - .send_packet(&CKeepAlive { - keep_alive_id: random, - }) - .await; - if let Some(id) = receiver.recv().await { - if id == random { - client.last_alive_received.store(now); - } - } - } else { - client.last_alive_received.store(now); - } - } - }); - } + let client = Arc::new(Client::new(id, connection, addr)); let server = server.clone(); tokio::spawn(async move { - let server = &server; // Reference to server while !client.closed.load(std::sync::atomic::Ordering::Relaxed) && !client .make_player .load(std::sync::atomic::Ordering::Relaxed) { - client.process_packets(server).await; - client.poll().await; + let open = client.poll().await; + if open { + client.process_packets(&server).await + }; } if client .make_player @@ -233,8 +204,10 @@ async fn main() -> io::Result<()> { .closed .load(std::sync::atomic::Ordering::Relaxed) { - player.process_packets(server).await; - player.client.poll().await; + let open = player.client.poll().await; + if open { + player.process_packets(&server).await + }; } player.remove().await; } diff --git a/pumpkin/src/server/mod.rs b/pumpkin/src/server/mod.rs index a720e4c15..7d92064e8 100644 --- a/pumpkin/src/server/mod.rs +++ b/pumpkin/src/server/mod.rs @@ -31,6 +31,8 @@ use crate::{ mod connection_cache; mod key_store; +pub mod ticker; + pub const CURRENT_MC_VERSION: &str = "1.21.1"; pub struct Server { @@ -168,4 +170,10 @@ impl Server { pub fn digest_secret(&self, secret: &[u8]) -> String { self.key_store.get_digest(secret) } + + async fn tick(&self) { + for world in &self.worlds { + world.tick().await; + } + } } diff --git a/pumpkin/src/server/ticker.rs b/pumpkin/src/server/ticker.rs new file mode 100644 index 000000000..291b77777 --- /dev/null +++ b/pumpkin/src/server/ticker.rs @@ -0,0 +1,34 @@ +use std::time::{Duration, Instant}; + +use super::Server; + +pub struct Ticker { + tick_interval: Duration, + last_tick: Instant, +} + +impl Ticker { + pub fn new(tps: f32) -> Self { + Self { + tick_interval: Duration::from_millis((1000.0 / tps) as u64), + last_tick: Instant::now(), + } + } + + /// IMPORTANT: Run this in a new thread/tokio task + pub async fn run(&mut self, server: &Server) { + loop { + let now = Instant::now(); + let elapsed = now - self.last_tick; + + if elapsed >= self.tick_interval { + server.tick().await; + self.last_tick = now; + } else { + // Wait for the remaining time until the next tick + let sleep_time = self.tick_interval - elapsed; + std::thread::sleep(sleep_time); + } + } + } +} diff --git a/pumpkin/src/world/mod.rs b/pumpkin/src/world/mod.rs index dd795496b..9a422b49c 100644 --- a/pumpkin/src/world/mod.rs +++ b/pumpkin/src/world/mod.rs @@ -81,6 +81,13 @@ impl World { } } + pub async fn tick(&self) { + let current_players = self.current_players.lock().await; + for player in current_players.values() { + player.tick().await; + } + } + pub async fn spawn_player(&self, base_config: &BasicConfiguration, player: Arc) { // This code follows the vanilla packet order let entity_id = player.entity_id();