Skip to content

Commit

Permalink
Add Tick system
Browse files Browse the repository at this point in the history
  • Loading branch information
Snowiiii committed Oct 21, 2024
1 parent 8381324 commit bb05095
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 81 deletions.
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,10 @@ Cargo.lock
#.idea/

# === PROJECT SPECIFIC ===
# mc decompiled source
mc-source-code/

plugins/*
world/*

# project's binary configurations
# project's configurations
configuration.toml
features.toml

Expand Down
3 changes: 3 additions & 0 deletions pumpkin-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ pub struct BasicConfiguration {
/// The server's description displayed on the status screen.
#[serde_inline_default("A Blazing fast Pumpkin Server!".to_string())]
pub motd: String,
#[serde_inline_default(20.0)]
pub tps: f32,
/// The default game mode for players.
#[serde_inline_default(GameMode::Survival)]
pub default_gamemode: GameMode,
Expand Down Expand Up @@ -123,6 +125,7 @@ impl Default for BasicConfiguration {
online_mode: true,
encryption: true,
motd: "A Blazing fast Pumpkin Server!".to_string(),
tps: 20.0,
default_gamemode: GameMode::Survival,
scrub_ips: true,
use_favicon: true,
Expand Down
8 changes: 7 additions & 1 deletion pumpkin-protocol/src/client/play/c_keep_alive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,11 @@ use serde::Serialize;
#[packet(0x26)]
#[derive(Serialize)]
pub struct CKeepAlive {
pub keep_alive_id: i64,
keep_alive_id: i64,
}

impl CKeepAlive {
pub fn new(keep_alive_id: i64) -> Self {
Self { keep_alive_id }
}
}
22 changes: 5 additions & 17 deletions pumpkin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub struct Client {
/// The underlying TCP connection to the client.
pub connection_reader: Arc<Mutex<tokio::net::tcp::OwnedReadHalf>>,
pub connection_writer: Arc<Mutex<tokio::net::tcp::OwnedWriteHalf>>,

/// The client's IP address.
pub address: Mutex<SocketAddr>,
/// The packet encoder for outgoing packets.
Expand All @@ -114,22 +113,12 @@ pub struct Client {
dec: Arc<Mutex<PacketDecoder>>,
/// A queue of raw packets received from the client, waiting to be processed.
pub client_packets_queue: Arc<Mutex<VecDeque<RawPacket>>>,

/// Indicates whether the client should be converted into a player.
pub make_player: AtomicBool,
/// Sends each keep alive packet that the server receives for a player to here, which gets picked up in a tokio task
pub keep_alive_sender: Arc<tokio::sync::mpsc::Sender<i64>>,
/// Stores the last time it was confirmed that the client is alive
pub last_alive_received: AtomicCell<std::time::Instant>,
}

impl Client {
pub fn new(
id: usize,
connection: tokio::net::TcpStream,
address: SocketAddr,
keep_alive_sender: Arc<tokio::sync::mpsc::Sender<i64>>,
) -> Self {
pub fn new(id: usize, connection: tokio::net::TcpStream, address: SocketAddr) -> Self {
let (connection_reader, connection_writer) = connection.into_split();
Self {
protocol_version: AtomicI32::new(0),
Expand All @@ -148,8 +137,6 @@ impl Client {
closed: AtomicBool::new(false),
client_packets_queue: Arc::new(Mutex::new(VecDeque::new())),
make_player: AtomicBool::new(false),
keep_alive_sender,
last_alive_received: AtomicCell::new(std::time::Instant::now()),
}
}

Expand Down Expand Up @@ -382,12 +369,13 @@ impl Client {

/// Reads the connection until our buffer of len 4096 is full, then decode
/// Close connection when an error occurs or when the Client closed the connection
pub async fn poll(&self) {
/// Returns if connection is still open
pub async fn poll(&self) -> bool {
loop {
let mut dec = self.dec.lock().await;
if let Ok(Some(packet)) = dec.decode() {
self.add_packet(packet).await;
return;
return true;
};

dec.reserve(4096);
Expand All @@ -403,7 +391,7 @@ impl Client {
== 0
{
self.close();
return;
return false;
}

// This should always be an O(1) unsplit because we reserved space earlier and
Expand Down
25 changes: 21 additions & 4 deletions pumpkin/src/client/player_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ use pumpkin_core::{
};
use pumpkin_entity::EntityId;
use pumpkin_inventory::{InventoryError, WindowType};
use pumpkin_protocol::server::play::{SCloseContainer, SKeepAlive, SSetPlayerGround, SUseItem};
use pumpkin_protocol::{
client::play::{
Animation, CAcknowledgeBlockChange, CBlockUpdate, CEntityAnimation, CEntityVelocity,
CHeadRot, CHurtAnimation, CPingResponse, CPlayerChatMessage, CUpdateEntityPos,
CUpdateEntityPosRot, CUpdateEntityRot, CWorldEvent, FilterType,
},
server::play::{
Action, ActionType, SChatCommand, SChatMessage, SClientInformationPlay, SCloseContainer,
SConfirmTeleport, SInteract, SPlayPingRequest, SPlayerAbilities, SPlayerAction,
SPlayerCommand, SPlayerPosition, SPlayerPositionRotation, SPlayerRotation,
SSetCreativeSlot, SSetHeldItem, SSetPlayerGround, SSwingArm, SUseItem, SUseItemOn, Status,
Action, ActionType, SChatCommand, SChatMessage, SClientInformationPlay, SConfirmTeleport,
SInteract, SPlayPingRequest, SPlayerAbilities, SPlayerAction, SPlayerCommand,
SPlayerPosition, SPlayerPositionRotation, SPlayerRotation, SSetCreativeSlot, SSetHeldItem,
SSwingArm, SUseItemOn, Status,
},
};
use pumpkin_world::block::{BlockFace, BlockState};
Expand Down Expand Up @@ -539,6 +540,22 @@ impl Player {
}
}

pub async fn handle_keep_alive(&self, keep_alive: SKeepAlive) {
if self
.wait_for_keep_alive
.load(std::sync::atomic::Ordering::Relaxed)
&& keep_alive.keep_alive_id
== self
.keep_alive_id
.load(std::sync::atomic::Ordering::Relaxed)
{
self.wait_for_keep_alive
.store(false, std::sync::atomic::Ordering::Relaxed);
} else {
self.kick(TextComponent::text("Timeout")).await
}
}

pub async fn handle_player_abilities(&self, player_abilities: SPlayerAbilities) {
let mut abilities = self.abilities.lock().await;

Expand Down
48 changes: 38 additions & 10 deletions pumpkin/src/entity/player.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::{
atomic::{AtomicI32, AtomicU8},
Arc,
use std::{
sync::{
atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU8},
Arc,
},
time::{Duration, Instant},
};

use crossbeam::atomic::AtomicCell;
Expand All @@ -16,7 +19,7 @@ use pumpkin_inventory::player::PlayerInventory;
use pumpkin_protocol::{
bytebuf::packet_id::Packet,
client::play::{
CGameEvent, CPlayDisconnect, CPlayerAbilities, CPlayerInfoUpdate, CSetHealth,
CGameEvent, CKeepAlive, CPlayDisconnect, CPlayerAbilities, CPlayerInfoUpdate, CSetHealth,
CSyncPlayerPosition, CSystemChatMessage, GameEvent, PlayerAction,
},
server::play::{
Expand Down Expand Up @@ -85,9 +88,14 @@ pub struct Player {
pub teleport_id_count: AtomicI32,
/// The pending teleport information, including the teleport ID and target location.
pub awaiting_teleport: Mutex<Option<(VarInt, Vector3<f64>)>>,

/// The coordinates of the chunk section the player is currently watching.
pub watched_section: AtomicCell<Vector3<i32>>,
/// Did we send a keep alive Packet and wait for the response?
pub wait_for_keep_alive: AtomicBool,
/// Whats the keep alive packet payload we send, The client should responde with the same id
pub keep_alive_id: AtomicI64,
/// Last time we send a keep alive
pub last_keep_alive_time: AtomicCell<Instant>,
}

impl Player {
Expand Down Expand Up @@ -133,6 +141,9 @@ impl Player {
gamemode: AtomicCell::new(gamemode),
watched_section: AtomicCell::new(Vector3::new(0, 0, 0)),
last_position: AtomicCell::new(Vector3::new(0.0, 0.0, 0.0)),
wait_for_keep_alive: AtomicBool::new(false),
keep_alive_id: AtomicI64::new(0),
last_keep_alive_time: AtomicCell::new(std::time::Instant::now()),
}
}

Expand All @@ -150,6 +161,27 @@ impl Player {
.await;
}

pub async fn tick(&self) {
let now = Instant::now();
if now.duration_since(self.last_keep_alive_time.load()) >= Duration::from_secs(15) {
// We never got a response from our last keep alive we send
if self
.wait_for_keep_alive
.load(std::sync::atomic::Ordering::Relaxed)
{
self.kick(TextComponent::text("Timeout")).await;
return;
}
self.wait_for_keep_alive
.store(true, std::sync::atomic::Ordering::Relaxed);
self.last_keep_alive_time.store(now);
let id = now.elapsed().as_millis() as i64;
self.keep_alive_id
.store(id, std::sync::atomic::Ordering::Relaxed);
self.client.send_packet(&CKeepAlive::new(id)).await;
}
}

pub const fn entity_id(&self) -> EntityId {
self.living_entity.entity.entity_id
}
Expand Down Expand Up @@ -418,11 +450,7 @@ impl Player {
Ok(())
}
SKeepAlive::PACKET_ID => {
self.client
.keep_alive_sender
.send(SKeepAlive::read(bytebuf)?.keep_alive_id)
.await
.unwrap();
self.handle_keep_alive(SKeepAlive::read(bytebuf)?).await;
Ok(())
}
_ => {
Expand Down
63 changes: 18 additions & 45 deletions pumpkin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ compile_error!("Compiling for WASI targets is not supported!");
use log::LevelFilter;

use client::Client;
use pumpkin_protocol::client::play::CKeepAlive;
use pumpkin_protocol::ConnectionState;
use server::Server;
use server::{ticker::Ticker, Server};
use std::io::{self};
use std::time::Duration;

// Setup some tokens to allow us to identify which event is for which socket.

Expand Down Expand Up @@ -126,6 +123,8 @@ async fn main() -> io::Result<()> {
let rcon = ADVANCED_CONFIG.rcon.clone();

let server = Arc::new(Server::new());
let mut ticker = Ticker::new(BASIC_CONFIG.tps);

log::info!("Started Server took {}ms", time.elapsed().as_millis());
log::info!("You now can connect to the server, Listening on {}", addr);

Expand Down Expand Up @@ -154,6 +153,12 @@ async fn main() -> io::Result<()> {
RCONServer::new(&rcon, server).await.unwrap();
});
}
{
let server = server.clone();
tokio::spawn(async move {
ticker.run(&server).await;
});
}
let mut unique_id = 0;
loop {
// Asynchronously wait for an inbound socket.
Expand All @@ -172,53 +177,19 @@ async fn main() -> io::Result<()> {
id
);

let keep_alive = tokio::sync::mpsc::channel(1024);
let client = Arc::new(Client::new(id, connection, addr, keep_alive.0.into()));

{
let client = client.clone();
let mut receiver = keep_alive.1;
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
let now = std::time::Instant::now();
if client.connection_state.load() == ConnectionState::Play {
if now.duration_since(client.last_alive_received.load())
>= Duration::from_secs(15)
{
dbg!("no keep alive");
client.kick("No keep alive received").await;
break;
}
let random = rand::random::<i64>();
client
.send_packet(&CKeepAlive {
keep_alive_id: random,
})
.await;
if let Some(id) = receiver.recv().await {
if id == random {
client.last_alive_received.store(now);
}
}
} else {
client.last_alive_received.store(now);
}
}
});
}
let client = Arc::new(Client::new(id, connection, addr));

let server = server.clone();
tokio::spawn(async move {
let server = &server; // Reference to server
while !client.closed.load(std::sync::atomic::Ordering::Relaxed)
&& !client
.make_player
.load(std::sync::atomic::Ordering::Relaxed)
{
client.process_packets(server).await;
client.poll().await;
let open = client.poll().await;
if open {
client.process_packets(&server).await
};
}
if client
.make_player
Expand All @@ -233,8 +204,10 @@ async fn main() -> io::Result<()> {
.closed
.load(std::sync::atomic::Ordering::Relaxed)
{
player.process_packets(server).await;
player.client.poll().await;
let open = player.client.poll().await;
if open {
player.process_packets(&server).await
};
}
player.remove().await;
}
Expand Down
8 changes: 8 additions & 0 deletions pumpkin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use crate::{

mod connection_cache;
mod key_store;
pub mod ticker;

pub const CURRENT_MC_VERSION: &str = "1.21.1";

pub struct Server {
Expand Down Expand Up @@ -168,4 +170,10 @@ impl Server {
pub fn digest_secret(&self, secret: &[u8]) -> String {
self.key_store.get_digest(secret)
}

async fn tick(&self) {
for world in &self.worlds {
world.tick().await;
}
}
}
Loading

0 comments on commit bb05095

Please sign in to comment.