From b317a6dc2cc6a4b3b69e38fbba178f25c21d183e Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Wed, 13 Nov 2024 03:26:30 +0200 Subject: [PATCH 01/29] Optimize replication message packing See comments in the code for details. I also switched to plain `Vec` for serialization since we no longer need `Cursor` for it and `Writer::write_all` is much slower then `Vec::extend_from_slice`. --- CHANGELOG.md | 2 + Cargo.toml | 1 + src/client.rs | 73 +-- src/core/replication.rs | 15 + .../replicated_clients/client_visibility.rs | 30 +- .../replication_registry/component_fns.rs | 10 +- .../replication_registry/rule_fns.rs | 8 +- .../replication_registry/test_fns.rs | 6 +- src/lib.rs | 9 - src/server.rs | 281 +++++++--- src/server/replication_messages.rs | 68 +-- .../replication_messages/init_message.rs | 506 ++++++++---------- .../replication_messages/serialized_data.rs | 105 ++++ .../replication_messages/update_message.rs | 326 +++++------ tests/stats.rs | 2 +- 15 files changed, 740 insertions(+), 702 deletions(-) create mode 100644 src/server/replication_messages/serialized_data.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 52ad37bb..a5a98bee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Various optimizations for replication messages to use fewer bytes. +- Accept `Vec` instead of `Cursor>` for serialization. - All `TestFnsEntityExt` now accept `FnsId`. - Move replication-related modules from `core` module under `core::replication`. - Move `Replicated` to the `replication` module. diff --git a/Cargo.toml b/Cargo.toml index fba5e7c9..941c1fd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ bincode = "1.3" serde = "1.0" integer-encoding = "4.0" ordered-multimap = "0.7" +bitflags = "2.6" [dev-dependencies] bevy = { version = "0.14", default-features = false, features = [ diff --git a/src/client.rs b/src/client.rs index 40df014b..d7b44d5d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,7 +8,7 @@ use std::{io::Cursor, mem}; use bevy::{ecs::world::CommandQueue, prelude::*}; use bincode::{DefaultOptions, Options}; use bytes::Bytes; -use integer_encoding::VarIntReader; +use integer_encoding::{FixedIntReader, VarIntReader, VarIntWriter}; use crate::core::{ channels::{ReplicationChannel, RepliconChannels}, @@ -20,7 +20,7 @@ use crate::core::{ ctx::{DespawnCtx, RemoveCtx, WriteCtx}, ReplicationRegistry, }, - Replicated, + InitMessageHeader, Replicated, }, replicon_client::RepliconClient, replicon_tick::RepliconTick, @@ -163,7 +163,7 @@ fn apply_replication( let mut acks = Vec::with_capacity(acks_size); for message in client.receive(ReplicationChannel::Update) { let update_index = read_update_message(params, buffered_updates, message)?; - bincode::serialize_into(&mut acks, &update_index)?; + acks.write_varint(update_index)?; } client.send(ReplicationChannel::Init, acks); } @@ -184,39 +184,38 @@ fn apply_init_message( stats.bytes += end_pos; } - let message_tick = bincode::deserialize_from(&mut cursor)?; + let header = InitMessageHeader::from_bits_retain(cursor.read_fixedint()?); + let message_tick = DefaultOptions::new().deserialize_from(&mut cursor)?; trace!("applying init message for {message_tick:?}"); world.resource_mut::().0 = message_tick; - debug_assert!(cursor.position() < end_pos, "init message can't be empty"); - apply_entity_mappings(world, params, &mut cursor)?; - if cursor.position() == end_pos { - return Ok(()); + if header.contains(InitMessageHeader::MAPPINGS) { + apply_entity_mappings(world, params, &mut cursor)?; } - apply_despawns(world, params, &mut cursor, message_tick)?; - if cursor.position() == end_pos { - return Ok(()); + if header.contains(InitMessageHeader::DESPAWNS) { + apply_despawns(world, params, &mut cursor, message_tick)?; } - apply_init_components( - world, - params, - ComponentsKind::Removal, - &mut cursor, - message_tick, - )?; - if cursor.position() == end_pos { - return Ok(()); + if header.contains(InitMessageHeader::REMOVALS) { + apply_init_components( + world, + params, + ComponentsKind::Removal, + &mut cursor, + message_tick, + )?; } - apply_init_components( - world, - params, - ComponentsKind::Insert, - &mut cursor, - message_tick, - )?; + if header.contains(InitMessageHeader::CHANGES) { + apply_init_components( + world, + params, + ComponentsKind::Insert, + &mut cursor, + message_tick, + )?; + } Ok(()) } @@ -236,7 +235,9 @@ fn read_update_message( stats.bytes += end_pos; } - let (init_tick, message_tick, update_index) = bincode::deserialize_from(&mut cursor)?; + let init_tick = DefaultOptions::new().deserialize_from(&mut cursor)?; + let message_tick = DefaultOptions::new().deserialize_from(&mut cursor)?; + let update_index = cursor.read_varint()?; trace!("received update message for {message_tick:?}"); buffered_updates.insert(BufferedUpdate { init_tick, @@ -285,7 +286,7 @@ fn apply_entity_mappings( params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, ) -> bincode::Result<()> { - let mappings_len: u16 = bincode::deserialize_from(&mut *cursor)?; + let mappings_len: usize = cursor.read_varint()?; if let Some(stats) = &mut params.stats { stats.mappings += mappings_len as u32; } @@ -313,10 +314,10 @@ fn apply_init_components( cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let entities_len: u16 = bincode::deserialize_from(&mut *cursor)?; + let entities_len: usize = cursor.read_varint()?; for _ in 0..entities_len { let server_entity = deserialize_entity(cursor)?; - let data_size: u16 = bincode::deserialize_from(&mut *cursor)?; + let data_size: usize = cursor.read_varint()?; let client_entity = params .entity_map @@ -339,7 +340,7 @@ fn apply_init_components( } let end_pos = cursor.position() + data_size as u64; - let mut components_len = 0u32; + let mut components_len = 0; while cursor.position() < end_pos { let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); @@ -389,7 +390,7 @@ fn apply_despawns( cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let entities_len: u16 = bincode::deserialize_from(&mut *cursor)?; + let entities_len: usize = cursor.read_varint()?; if let Some(stats) = &mut params.stats { stats.despawns += entities_len as u32; } @@ -411,7 +412,7 @@ fn apply_despawns( Ok(()) } -/// Deserializes replicated component updates and applies them to the `world`. +/// Deserializes replicated component updates and applies them to the `world`. /// /// Consumes all remaining bytes in the cursor. fn apply_update_components( @@ -423,7 +424,7 @@ fn apply_update_components( let message_end = cursor.get_ref().len() as u64; while cursor.position() < message_end { let server_entity = deserialize_entity(cursor)?; - let data_size: u16 = bincode::deserialize_from(&mut *cursor)?; + let data_size: usize = cursor.read_varint()?; let Some(client_entity) = params.entity_map.get_by_server(server_entity) else { // Update could arrive after a despawn from init message. @@ -470,7 +471,7 @@ fn apply_update_components( } let end_pos = cursor.position() + data_size as u64; - let mut components_count = 0u32; + let mut components_count = 0; while cursor.position() < end_pos { let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); diff --git a/src/core/replication.rs b/src/core/replication.rs index 33b30885..fc6de032 100644 --- a/src/core/replication.rs +++ b/src/core/replication.rs @@ -13,3 +13,18 @@ pub type Replication = Replicated; #[derive(Component, Clone, Copy, Default, Reflect, Debug)] #[reflect(Component)] pub struct Replicated; + +use bitflags::bitflags; + +bitflags! { + /// Types of data that can be optionally included inside init message if the bit is set. + /// + /// Serialized at the beginning of the message. + #[derive(Default, Clone, Copy)] + pub(crate) struct InitMessageHeader: u8 { + const MAPPINGS = 0b00000001; + const DESPAWNS = 0b00000010; + const REMOVALS = 0b00000100; + const CHANGES = 0b00001000; + } +} diff --git a/src/core/replication/replicated_clients/client_visibility.rs b/src/core/replication/replicated_clients/client_visibility.rs index 7242e73e..f8632fc0 100644 --- a/src/core/replication/replicated_clients/client_visibility.rs +++ b/src/core/replication/replicated_clients/client_visibility.rs @@ -9,11 +9,6 @@ use super::VisibilityPolicy; /// Entity visibility settings for a client. pub struct ClientVisibility { filter: VisibilityFilter, - - /// Visibility for a specific entity that has been cached for re-referencing. - /// - /// Used as an optimization by server replication. - cached_visibility: Visibility, } impl ClientVisibility { @@ -36,10 +31,7 @@ impl ClientVisibility { /// Creates a new instance with a specific filter. fn with_filter(filter: VisibilityFilter) -> Self { - Self { - filter, - cached_visibility: Default::default(), - } + Self { filter } } /// Resets the filter state to as it was after [`Self::new`]. @@ -177,7 +169,7 @@ impl ClientVisibility { // For blacklisting an entity we don't remove the entity right away. // Instead we mark it as queued for removal and remove it // later in `Self::update`. This allows us to avoid accessing - // the blacklist's `removed` field in `Self::get_visibility_state`. + // the blacklist's `removed` field in `Self::visibility_state`. entry.insert(BlacklistInfo::QueuedForRemoval); removed.insert(entity); } else { @@ -200,7 +192,7 @@ impl ClientVisibility { // Instead we mark it as `WhitelistInfo::JustAdded` and then set it to // 'WhitelistInfo::Visible' in `Self::update`. // This allows us to avoid accessing the whitelist's `added` field in - // `Self::get_visibility_state`. + // `Self::visibility_state`. if *list.entry(entity).or_insert(WhitelistInfo::JustAdded) == WhitelistInfo::JustAdded { @@ -227,26 +219,14 @@ impl ClientVisibility { /// Checks if a specific entity is visible. pub fn is_visible(&self, entity: Entity) -> bool { - match self.get_visibility_state(entity) { + match self.visibility_state(entity) { Visibility::Hidden => false, Visibility::Gained | Visibility::Visible => true, } } - /// Caches visibility for a specific entity. - /// - /// Can be obtained later from [`Self::cached_visibility`]. - pub(crate) fn cache_visibility(&mut self, entity: Entity) { - self.cached_visibility = self.get_visibility_state(entity); - } - - /// Returns visibility cached by the last call of [`Self::cache_visibility`]. - pub(crate) fn cached_visibility(&self) -> Visibility { - self.cached_visibility - } - /// Returns visibility of a specific entity. - fn get_visibility_state(&self, entity: Entity) -> Visibility { + pub(crate) fn visibility_state(&self, entity: Entity) -> Visibility { match &self.filter { VisibilityFilter::All => Visibility::Visible, VisibilityFilter::Blacklist { list, .. } => match list.get(&entity) { diff --git a/src/core/replication/replication_registry/component_fns.rs b/src/core/replication/replication_registry/component_fns.rs index c22b6781..a43f67ee 100644 --- a/src/core/replication/replication_registry/component_fns.rs +++ b/src/core/replication/replication_registry/component_fns.rs @@ -88,9 +88,9 @@ impl ComponentFns { ctx: &SerializeCtx, rule_fns: &UntypedRuleFns, ptr: Ptr, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - (self.serialize)(ctx, rule_fns, ptr, cursor) + (self.serialize)(ctx, rule_fns, ptr, message) } /// Calls the assigned writing function based on entity markers. @@ -174,7 +174,7 @@ impl ComponentFns { /// Signature of component serialization functions that restore the original type. type UntypedSerializeFn = - unsafe fn(&SerializeCtx, &UntypedRuleFns, Ptr, &mut Cursor>) -> bincode::Result<()>; + unsafe fn(&SerializeCtx, &UntypedRuleFns, Ptr, &mut Vec) -> bincode::Result<()>; /// Signature of component writing functions that restore the original type. type UntypedWriteFn = unsafe fn( @@ -198,10 +198,10 @@ unsafe fn untyped_serialize( ctx: &SerializeCtx, rule_fns: &UntypedRuleFns, ptr: Ptr, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { let rule_fns = rule_fns.typed::(); - rule_fns.serialize(ctx, ptr.deref::(), cursor) + rule_fns.serialize(ctx, ptr.deref::(), message) } /// Resolves `rule_fns` to `C` and calls [`UntypedCommandFns::write`] for `C`. diff --git a/src/core/replication/replication_registry/rule_fns.rs b/src/core/replication/replication_registry/rule_fns.rs index 4ca144b0..92805199 100644 --- a/src/core/replication/replication_registry/rule_fns.rs +++ b/src/core/replication/replication_registry/rule_fns.rs @@ -125,9 +125,9 @@ impl RuleFns { &self, ctx: &SerializeCtx, component: &C, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - (self.serialize)(ctx, component, cursor) + (self.serialize)(ctx, component, message) } /// Deserializes a component from a cursor. @@ -187,7 +187,7 @@ impl Default for RuleFns { } /// Signature of component serialization functions. -pub type SerializeFn = fn(&SerializeCtx, &C, &mut Cursor>) -> bincode::Result<()>; +pub type SerializeFn = fn(&SerializeCtx, &C, &mut Vec) -> bincode::Result<()>; /// Signature of component deserialization functions. pub type DeserializeFn = fn(&mut WriteCtx, &mut Cursor<&[u8]>) -> bincode::Result; @@ -204,7 +204,7 @@ pub type ConsumeFn = pub fn default_serialize( _ctx: &SerializeCtx, component: &C, - cursor: &mut Cursor>, + cursor: &mut Vec, ) -> bincode::Result<()> { DefaultOptions::new().serialize_into(cursor, component) } diff --git a/src/core/replication/replication_registry/test_fns.rs b/src/core/replication/replication_registry/test_fns.rs index 5592b889..791984f6 100644 --- a/src/core/replication/replication_registry/test_fns.rs +++ b/src/core/replication/replication_registry/test_fns.rs @@ -92,7 +92,7 @@ impl TestFnsEntityExt for EntityWorldMut<'_> { fn serialize(&mut self, fns_id: FnsId, server_tick: RepliconTick) -> Vec { let registry = self.world().resource::(); let (component_id, component_fns, rule_fns) = registry.get(fns_id); - let mut cursor = Cursor::default(); + let mut message = Vec::new(); let ctx = SerializeCtx { server_tick, component_id, @@ -107,11 +107,11 @@ impl TestFnsEntityExt for EntityWorldMut<'_> { unsafe { component_fns - .serialize(&ctx, rule_fns, ptr, &mut cursor) + .serialize(&ctx, rule_fns, ptr, &mut message) .expect("serialization into memory should never fail"); } - cursor.into_inner() + message } fn apply_write(&mut self, data: &[u8], fns_id: FnsId, message_tick: RepliconTick) -> &mut Self { diff --git a/src/lib.rs b/src/lib.rs index 1101dbb6..97de36c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -595,15 +595,6 @@ Updates for despawned entities will be discarded automatically, but events or co Clients should never assume their world state is the same as the server's on any given tick value-wise. World state on the client is only "eventually consistent" with the server's. -# Limits - -To reduce packet size there are the following limits per replication update: - -- Up to [`u16::MAX`] entities that have added components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that have changed components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that have removed components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that were despawned. - # Troubleshooting If you face any issue, try to enable logging to see what is going on. diff --git a/src/server.rs b/src/server.rs index 0495671f..95caa69e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,7 +6,7 @@ pub(super) mod replicated_archetypes; pub(super) mod replication_messages; pub mod server_tick; -use std::{io::Cursor, mem, time::Duration}; +use std::{io::Cursor, mem, ops::Range, time::Duration}; use bevy::{ ecs::{ @@ -19,6 +19,7 @@ use bevy::{ ptr::Ptr, time::common_conditions::on_timer, }; +use integer_encoding::VarIntReader; use crate::core::{ channels::{ReplicationChannel, RepliconChannels}, @@ -29,7 +30,10 @@ use crate::core::{ replicated_clients::{ client_visibility::Visibility, ClientBuffers, ReplicatedClients, VisibilityPolicy, }, - replication_registry::{ctx::SerializeCtx, ReplicationRegistry}, + replication_registry::{ + component_fns::ComponentFns, ctx::SerializeCtx, rule_fns::UntypedRuleFns, + ReplicationRegistry, + }, replication_rules::ReplicationRules, }, replicon_server::RepliconServer, @@ -39,8 +43,8 @@ use crate::core::{ use client_entity_map::ClientEntityMap; use despawn_buffer::{DespawnBuffer, DespawnBufferPlugin}; use removal_buffer::{RemovalBuffer, RemovalBufferPlugin}; -use replicated_archetypes::ReplicatedArchetypes; -use replication_messages::ReplicationMessages; +use replicated_archetypes::{ReplicatedArchetypes, ReplicatedComponent}; +use replication_messages::{serialized_data::SerializedData, ReplicationMessages}; use server_tick::ServerTick; pub struct ServerPlugin { @@ -234,7 +238,7 @@ impl ServerPlugin { let mut cursor = Cursor::new(&*message); let message_end = message.len() as u64; while cursor.position() < message_end { - match bincode::deserialize_from(&mut cursor) { + match cursor.read_varint() { Ok(update_index) => { let client = replicated_clients.client_mut(client_id); client.acknowledge( @@ -251,6 +255,7 @@ impl ServerPlugin { /// Collects [`ReplicationMessages`] and sends them. pub(super) fn send_replication( + mut serialized: Local, mut messages: Local, mut replicated_archetypes: Local, change_tick: SystemChangeTick, @@ -277,11 +282,22 @@ impl ServerPlugin { messages.reset(replicated_clients.len()); - collect_mappings(&mut messages, &replicated_clients, &mut set.p4())?; - collect_despawns(&mut messages, &mut replicated_clients, &mut set.p5())?; - collect_removals(&mut messages, &removal_buffer)?; + collect_mappings( + &mut messages, + &mut serialized, + &replicated_clients, + &mut set.p4(), + )?; + collect_despawns( + &mut messages, + &mut serialized, + &mut replicated_clients, + &mut set.p5(), + )?; + collect_removals(&mut messages, &mut serialized, &removal_buffer)?; collect_changes( &mut messages, + &mut serialized, &mut replicated_clients, &replicated_archetypes, ®istry, @@ -292,23 +308,17 @@ impl ServerPlugin { )?; removal_buffer.clear(); - for ((init_message, update_message), client) in - messages.iter_mut().zip(replicated_clients.iter_mut()) - { - let server = &mut set.p6(); - - init_message.send(server, client, **server_tick)?; - update_message.send( - server, - client, - &mut client_buffers, - **server_tick, - change_tick.this_run(), - time.elapsed(), - )?; - - client.visibility_mut().update(); - } + send_messages( + &mut messages, + &mut replicated_clients, + &mut set.p6(), + **server_tick, + &mut serialized, + &mut client_buffers, + change_tick, + &time, + )?; + serialized.clear(); // Return borrowed data back. *set.p1() = replicated_clients; @@ -332,52 +342,91 @@ impl ServerPlugin { } } +fn send_messages( + messages: &mut ReplicationMessages, + replicated_clients: &mut ReplicatedClients, + server: &mut RepliconServer, + server_tick: RepliconTick, + serialized: &mut SerializedData, + client_buffers: &mut ClientBuffers, + change_tick: SystemChangeTick, + time: &Time, +) -> Result<(), Box> { + let mut server_tick_range = None; + for ((init_message, update_message), client) in + messages.iter_mut().zip(replicated_clients.iter_mut()) + { + if !init_message.is_empty() { + client.set_init_tick(server_tick); + let server_tick = write_tick_cached(&mut server_tick_range, serialized, server_tick)?; + + trace!("sending init message to {:?}", client.id()); + init_message.send(server, client, serialized, server_tick)?; + } else { + trace!("no init data to send for {:?}", client.id()); + } + + if !update_message.is_empty() { + let server_tick = write_tick_cached(&mut server_tick_range, serialized, server_tick)?; + + trace!("sending update message(s) to {:?}", client.id()); + update_message.send( + server, + client, + client_buffers, + serialized, + server_tick, + change_tick.this_run(), + time.elapsed(), + )?; + } else { + trace!("no updates to send for {:?}", client.id()); + } + + client.visibility_mut().update(); + } + + Ok(()) +} + /// Collects and writes any new entity mappings that happened in this tick. -/// -/// On deserialization mappings should be processed first, so all referenced entities after it will behave correctly. fn collect_mappings( messages: &mut ReplicationMessages, + serialized: &mut SerializedData, replicated_clients: &ReplicatedClients, entity_map: &mut ClientEntityMap, ) -> bincode::Result<()> { for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter()) { - message.start_array(); - if let Some(mappings) = entity_map.0.get_mut(&client.id()) { - for mapping in mappings.drain(..) { - message.write_client_mapping(&mapping)?; - } + let len = mappings.len(); + let mappings = serialized.write_mappings(mappings.drain(..))?; + message.set_mappings(mappings, len); } - - message.end_array()?; } + Ok(()) } /// Collect entity despawns from this tick into init messages. fn collect_despawns( messages: &mut ReplicationMessages, + serialized: &mut SerializedData, replicated_clients: &mut ReplicatedClients, despawn_buffer: &mut DespawnBuffer, ) -> bincode::Result<()> { - for (message, _) in messages.iter_mut() { - message.start_array(); - } - for entity in despawn_buffer.drain(..) { - let mut shared_bytes = None; + let entity_range = serialized.write_entity(entity)?; for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter_mut()) { client.remove_despawned(entity); - message.write_entity(&mut shared_bytes, entity)?; + message.add_despawn(entity_range.clone()); } } for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter_mut()) { for entity in client.drain_lost_visibility() { - message.write_entity(&mut None, entity)?; + let entity_range = serialized.write_entity(entity)?; + message.add_despawn(entity_range); } - - message.end_array()?; } Ok(()) @@ -386,26 +435,17 @@ fn collect_despawns( /// Collects component removals from this tick into init messages. fn collect_removals( messages: &mut ReplicationMessages, + serialized: &mut SerializedData, removal_buffer: &RemovalBuffer, ) -> bincode::Result<()> { - for (message, _) in messages.iter_mut() { - message.start_array(); - } - for (&entity, remove_ids) in removal_buffer.iter() { + let entity = serialized.write_entity(entity)?; + let fn_ids = serialized.write_fn_ids(remove_ids.iter().map(|&(_, fns_id)| fns_id))?; for (message, _) in messages.iter_mut() { - message.start_entity_data(entity); - for &(_, fns_id) in remove_ids { - message.write_fns_id(fns_id)?; - } - message.end_entity_data(false)?; + message.add_removals(entity.clone(), fn_ids.clone()); } } - for (message, _) in messages.iter_mut() { - message.end_array()?; - } - Ok(()) } @@ -413,6 +453,7 @@ fn collect_removals( /// since the last entity tick. fn collect_changes( messages: &mut ReplicationMessages, + serialized: &mut SerializedData, replicated_clients: &mut ReplicatedClients, replicated_archetypes: &ReplicatedArchetypes, registry: &ReplicationRegistry, @@ -421,10 +462,6 @@ fn collect_changes( change_tick: &SystemChangeTick, server_tick: RepliconTick, ) -> bincode::Result<()> { - for (init_message, _) in messages.iter_mut() { - init_message.start_array(); - } - for replicated_archetype in replicated_archetypes.iter() { // SAFETY: all IDs from replicated archetypes obtained from real archetypes. let archetype = unsafe { @@ -443,12 +480,13 @@ fn collect_changes( }; for entity in archetype.entities() { + let mut entity_range = None; for ((init_message, update_message), client) in - messages.iter_mut().zip(replicated_clients.iter_mut()) + messages.iter_mut().zip(replicated_clients.iter()) { - init_message.start_entity_data(entity.id()); - update_message.start_entity_data(entity.id()); - client.visibility_mut().cache_visibility(entity.id()); + let visibility = client.visibility().visibility_state(entity.id()); + init_message.start_entity_changes(visibility); + update_message.start_entity_changes(); } // SAFETY: all replicated archetypes have marker component with table storage. @@ -486,40 +524,56 @@ fn collect_changes( server_tick, component_id, }; - let mut shared_bytes = None; + let mut component_range = None; for ((init_message, update_message), client) in - messages.iter_mut().zip(replicated_clients.iter_mut()) + messages.iter_mut().zip(replicated_clients.iter()) { - let visibility = client.visibility().cached_visibility(); - if visibility == Visibility::Hidden { + if init_message.entity_visibility() == Visibility::Hidden { continue; } if let Some(tick) = client .get_change_tick(entity.id()) .filter(|_| !marker_added) - .filter(|_| visibility != Visibility::Gained) + .filter(|_| init_message.entity_visibility() != Visibility::Gained) .filter(|_| !ticks.is_added(change_tick.last_run(), change_tick.this_run())) { if ticks.is_changed(tick, change_tick.this_run()) { - update_message.write_component( - &mut shared_bytes, + if !update_message.changes_written() { + let entity_range = write_entity_cached( + &mut entity_range, + serialized, + entity.id(), + )?; + update_message.add_changed_entity(entity.id(), entity_range); + } + let component_range = write_component_cached( + &mut component_range, + serialized, rule_fns, component_fns, &ctx, - replicated_component.fns_id, + replicated_component, component, )?; + update_message.add_changed_component(component_range); } } else { - init_message.write_component( - &mut shared_bytes, + if !init_message.entity_written() { + let entity_range = + write_entity_cached(&mut entity_range, serialized, entity.id())?; + init_message.add_changed_entity(entity_range); + } + let component_range = write_component_cached( + &mut component_range, + serialized, rule_fns, component_fns, &ctx, - replicated_component.fns_id, + replicated_component, component, )?; + init_message.add_changed_component(component_range); } } } @@ -527,33 +581,32 @@ fn collect_changes( for ((init_message, update_message), client) in messages.iter_mut().zip(replicated_clients.iter_mut()) { - let visibility = client.visibility().cached_visibility(); + let visibility = init_message.entity_visibility(); if visibility == Visibility::Hidden { continue; } let new_entity = marker_added || visibility == Visibility::Gained; if new_entity - || init_message.entity_data_size() != 0 + || init_message.entity_written() || removal_buffer.contains_key(&entity.id()) { // If there is any insertion, removal, or we must initialize, include all updates into init message. // and bump the last acknowledged tick to keep entity updates atomic. - init_message.take_entity_data(update_message)?; + init_message.take_changes(update_message); client.set_change_tick(entity.id(), change_tick.this_run()); - } else { - update_message.end_entity_data()?; } - init_message.end_entity_data(new_entity)?; + if new_entity && !init_message.entity_written() { + // Force-write new entity even if it doesn't have any components. + let entity_range = + write_entity_cached(&mut entity_range, serialized, entity.id())?; + init_message.add_changed_entity(entity_range); + } } } } - for (init_message, _) in messages.iter_mut() { - init_message.end_array()?; - } - Ok(()) } @@ -587,6 +640,64 @@ unsafe fn get_component_unchecked<'w>( } } +/// Writes an entity or re-uses previosly written range if exists. +fn write_entity_cached( + entity_range: &mut Option>, + serialized: &mut SerializedData, + entity: Entity, +) -> bincode::Result> { + if let Some(range) = entity_range.clone() { + return Ok(range); + } + + let range = serialized.write_entity(entity)?; + *entity_range = Some(range.clone()); + + Ok(range) +} + +/// Writes a component or re-uses previosly written range if exists. +fn write_component_cached( + component_range: &mut Option>, + serialized: &mut SerializedData, + rule_fns: &UntypedRuleFns, + component_fns: &ComponentFns, + ctx: &SerializeCtx, + replicated_component: &ReplicatedComponent, + component: Ptr<'_>, +) -> bincode::Result> { + if let Some(component_range) = component_range.clone() { + return Ok(component_range); + } + + let range = serialized.write_component( + rule_fns, + component_fns, + ctx, + replicated_component.fns_id, + component, + )?; + *component_range = Some(range.clone()); + + Ok(range) +} + +/// Writes an entity or re-uses previosly written range if exists. +fn write_tick_cached( + tick_range: &mut Option>, + serialized: &mut SerializedData, + tick: RepliconTick, +) -> bincode::Result> { + if let Some(range) = tick_range.clone() { + return Ok(range); + } + + let range = serialized.write_tick(tick)?; + *tick_range = Some(range.clone()); + + Ok(range) +} + /// Set with replication and event systems related to server. #[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum ServerSet { diff --git a/src/server/replication_messages.rs b/src/server/replication_messages.rs index 9b1ffd46..1ead0a92 100644 --- a/src/server/replication_messages.rs +++ b/src/server/replication_messages.rs @@ -1,20 +1,15 @@ pub(super) mod init_message; +pub(super) mod serialized_data; pub(super) mod update_message; -use std::io::{Cursor, Write}; - -use bevy::prelude::*; -use integer_encoding::VarIntWriter; - use init_message::InitMessage; use update_message::UpdateMessage; /// Accumulates replication messages. /// -/// Messages are serialized and deserialized manually because using an intermediate structure -/// leads to allocations and according to our benchmarks it's much slower. -/// -/// Reuses allocated memory from older messages. +/// Messages are serialized manually into [`SerializedData`](serialized_data::SerializedData) +/// and store only ranges that points to data. This helps reduce allocations and share +/// serialized data across messages. #[derive(Default)] pub(crate) struct ReplicationMessages { messages: Vec<(InitMessage, UpdateMessage)>, @@ -36,8 +31,8 @@ impl ReplicationMessages { for index in 0..clients_count { if let Some((init_message, update_message)) = self.messages.get_mut(index) { - init_message.reset(); - update_message.reset(); + init_message.clear(); + update_message.clear(); } else { self.messages.push(Default::default()); } @@ -49,54 +44,3 @@ impl ReplicationMessages { self.messages.iter_mut().take(self.len) } } - -/// Writes new data into a cursor and returns the serialized size. -/// -/// Reuses previously shared bytes if they exist, or updates them. -/// Serialized size should be less then [`u16`]. -fn write_with<'a>( - shared_bytes: &mut Option<&'a [u8]>, - cursor: &'a mut Cursor>, - write_fn: impl FnOnce(&mut Cursor>) -> bincode::Result<()>, -) -> bincode::Result { - let bytes = if let Some(bytes) = shared_bytes { - cursor.write_all(bytes)?; - bytes - } else { - let previous_pos = cursor.position() as usize; - (write_fn(cursor))?; - let current_pos = cursor.position() as usize; - - let buffer = cursor.get_ref(); - let bytes = &buffer[previous_pos..current_pos]; - *shared_bytes = Some(bytes); - - bytes - }; - - let size = bytes - .len() - .try_into() - .map_err(|_| bincode::ErrorKind::SizeLimit)?; - - Ok(size) -} - -/// Serializes `entity` by writing its index and generation as separate varints. -/// -/// The index is first prepended with a bit flag to indicate if the generation -/// is serialized or not. It is not serialized if <= 1; note that generations are [`NonZeroU32`](std::num::NonZeroU32) -/// and a value of zero is used in [`Option`] to signify [`None`], so generation 1 is the first -/// generation. -fn serialize_entity(cursor: &mut Cursor>, entity: Entity) -> bincode::Result<()> { - let mut flagged_index = (entity.index() as u64) << 1; - let flag = entity.generation() > 1; - flagged_index |= flag as u64; - - cursor.write_varint(flagged_index)?; - if flag { - cursor.write_varint(entity.generation() - 1)?; - } - - Ok(()) -} diff --git a/src/server/replication_messages/init_message.rs b/src/server/replication_messages/init_message.rs index 0df8b98c..054508d8 100644 --- a/src/server/replication_messages/init_message.rs +++ b/src/server/replication_messages/init_message.rs @@ -1,344 +1,288 @@ -use std::{ - io::{Cursor, Write}, - mem, -}; +use std::ops::Range; + +use bevy::prelude::*; +use integer_encoding::{FixedIntWriter, VarInt, VarIntWriter}; -use bevy::{prelude::*, ptr::Ptr}; -use bincode::{DefaultOptions, Options}; -use bytes::Bytes; - -use super::update_message::UpdateMessage; -use crate::{ - core::{ - channels::ReplicationChannel, - replication::{ - replicated_clients::ReplicatedClient, - replication_registry::{ - component_fns::ComponentFns, ctx::SerializeCtx, rule_fns::UntypedRuleFns, FnsId, - }, - }, - replicon_server::RepliconServer, - replicon_tick::RepliconTick, +use super::{serialized_data::SerializedData, update_message::UpdateMessage}; +use crate::core::{ + channels::ReplicationChannel, + replication::{ + replicated_clients::{client_visibility::Visibility, ReplicatedClient}, + InitMessageHeader, }, - server::client_entity_map::ClientMapping, + replicon_server::RepliconServer, }; -/// A reusable message with replicated data. +/// A message with replicated data. /// -/// Contains tick and mappings, insertions, removals and despawns that +/// Contains tick, mappings, insertions, removals and despawns that /// happened on this tick. +/// +/// The data is stored in the form of ranges from [`SerializedData`]. +/// /// Sent over [`ReplicationChannel::Init`] channel. /// -/// See also [Limits](../index.html#limits) +/// All sizes are serialized as `usize`, but we use variable integer encoding, +/// so they are correctly deserialized even on a client with a different pointer size. +/// However, if the server sends a value larger than what a client can fit into `usize` +/// (which is very unlikely), the client will panic. This is expected, +/// as it means the client can't have an array of such a size anyway. +/// +/// Stored inside [`ReplicationMessages`](super::ReplicationMessages). +#[derive(Default)] pub(crate) struct InitMessage { - /// Serialized data. - cursor: Cursor>, - - /// Length of the array that updated automatically after writing data. - array_len: u16, - - /// Position of the array from last call of [`Self::start_array`]. - array_pos: u64, - - /// The number of empty arrays at the end. - trailing_empty_arrays: usize, - - /// Entity from last call of [`Self::start_entity_data`]. - data_entity: Entity, - - /// Size in bytes of the component data stored for the currently-being-written entity. - entity_data_size: u16, - - /// Position of entity from last call of [`Self::start_entity_data`]. - entity_data_pos: u64, - - /// Position of entity data length from last call of [`Self::write_data_entity`]. - entity_data_size_pos: u64, -} - -impl InitMessage { - /// Clears the message. + /// Mappings for client's pre-spawned entities. /// - /// Keeps allocated capacity for reuse. - pub(super) fn reset(&mut self) { - self.cursor.set_position(0); - self.trailing_empty_arrays = 0; - } - - /// Returns size in bytes of the current entity data. + /// Serialized as single continuous chunk of entity pairs. /// - /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. - pub(crate) fn entity_data_size(&self) -> u16 { - self.entity_data_size - } - - /// Starts writing array by remembering its position to write length after. + /// Mappings should be processed first, so all referenced entities after it will behave correctly. /// - /// Arrays can contain entity data or despawns inside. - /// See also [`Self::end_array`], [`Self::write_client_mapping`], [`Self::write_entity`] and [`Self::start_entity_data`]. - pub(crate) fn start_array(&mut self) { - debug_assert_eq!(self.array_len, 0); - - self.array_pos = self.cursor.position(); - self.cursor - .set_position(self.array_pos + mem::size_of_val(&self.array_len) as u64); - } + /// See aslo [`ClientEntityMap`](crate::server::client_entity_map::ClientEntityMap). + mappings: Range, - /// Ends writing array by writing its length into the last remembered position. - /// - /// See also [`Self::start_array`]. - pub(crate) fn end_array(&mut self) -> bincode::Result<()> { - if self.array_len != 0 { - let previous_pos = self.cursor.position(); - self.cursor.set_position(self.array_pos); - - bincode::serialize_into(&mut self.cursor, &self.array_len)?; - - self.cursor.set_position(previous_pos); - self.array_len = 0; - self.trailing_empty_arrays = 0; - } else { - self.trailing_empty_arrays += 1; - self.cursor.set_position(self.array_pos); - bincode::serialize_into(&mut self.cursor, &self.array_len)?; - } + /// Number pairs encoded in [`Self::mappings`]. + mappings_len: usize, - Ok(()) - } - - /// Serializes entity to entity mapping as an array element. + /// Despawn happened on this tick. /// - /// Should be called only inside an array and increases its length by 1. - /// See also [`Self::start_array`]. - pub(crate) fn write_client_mapping(&mut self, mapping: &ClientMapping) -> bincode::Result<()> { - super::serialize_entity(&mut self.cursor, mapping.server_entity)?; - super::serialize_entity(&mut self.cursor, mapping.client_entity)?; - self.array_len = self - .array_len - .checked_add(1) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) - } + /// Since clients may see different entities, it's serialized as multiple chunks of entities. + /// I.e. serialized despawns may have holes due to visibility differences. + despawns: Vec>, - /// Serializes entity as an array element. + /// Number of depspawned entities. /// - /// Reuses previously shared bytes if they exist, or updates them. - /// Should be called only inside an array and increases its length by 1. - /// See also [`Self::start_array`]. - pub(crate) fn write_entity<'a>( - &'a mut self, - shared_bytes: &mut Option<&'a [u8]>, - entity: Entity, - ) -> bincode::Result<()> { - super::write_with(shared_bytes, &mut self.cursor, |cursor| { - super::serialize_entity(cursor, entity) - })?; - - self.array_len = self - .array_len - .checked_add(1) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) - } + /// May not be equal to the length of [`Self::despawns`] since adjacent ranges are merged together. + despawns_len: usize, - /// Starts writing entity and its data as an array element. + /// Component removals happened on this tick. /// - /// Should be called only inside an array and increases its length by 1. - /// Data can contain components with their IDs or IDs only. - /// Entity will be written lazily after first data write. - /// See also [`Self::end_entity_data`] and [`Self::write_component`]. - pub(crate) fn start_entity_data(&mut self, entity: Entity) { - debug_assert_eq!(self.entity_data_size, 0); - - self.data_entity = entity; - self.entity_data_pos = self.cursor.position(); - } - - /// Writes entity for the current data and remembers the position after it to write length later. + /// Serialized as a list of pairs of entity chunk and a list of + /// [`FnsId`](crate::core::replication::replication_registry::FnsId) + /// serialized as a single chunk. /// - /// Should be called only after first data write. - fn write_data_entity(&mut self) -> bincode::Result<()> { - super::serialize_entity(&mut self.cursor, self.data_entity)?; - self.entity_data_size_pos = self.cursor.position(); - self.cursor.set_position( - self.entity_data_size_pos + mem::size_of_val(&self.entity_data_size) as u64, - ); + /// For entities, we serialize their count like other data, but for IDs, + /// we serialize their size in bytes. + removals: Vec<(Range, Range)>, - Ok(()) - } + /// Component insertions or changes happened on this tick. + /// + /// Serialized as a list of pairs of entity chunk and multiple chunks with changed components. + /// Components are stored in multiple chunks because newly connected clients may need to serialize all components, + /// while previously connected clients only need the components spawned during this tick. + /// + /// For entities, we serialize their count like other data, but for IDs, + /// we serialize their size in bytes. + /// + /// Usually changes stored in [`UpdateMessage`], but if an entity have any insertion or removal, + /// we serialize it as part of the init message to keep entity changes atomic. + changes: Vec<(Range, Vec>)>, - /// Ends writing entity data by writing its length into the last remembered position. + /// Visibility of the entity for which changes are being written. /// - /// If the entity data is empty, nothing will be written unless `save_empty` is set to true. - /// Should be called only inside an array and increases its length by 1. - /// See also [`Self::start_array`], [`Self::write_component`] and - /// [`Self::write_component_id`]. - pub(crate) fn end_entity_data(&mut self, save_empty: bool) -> bincode::Result<()> { - if self.entity_data_size == 0 && !save_empty { - self.cursor.set_position(self.entity_data_pos); - return Ok(()); - } + /// Updated after [`Self::start_entity_changes`]. + entity_visibility: Visibility, - if self.entity_data_size == 0 { - self.write_data_entity()?; - } + /// Indicates that an entity has been written since the + /// last call of [`Self::start_entity_changes`]. + entity_written: bool, - let previous_pos = self.cursor.position(); - self.cursor.set_position(self.entity_data_size_pos); + /// Intermediate buffer to reuse allocated memory from [`Self::changes`]. + buffer: Vec>>, +} - bincode::serialize_into(&mut self.cursor, &self.entity_data_size)?; +impl InitMessage { + pub(crate) fn set_mappings(&mut self, mappings: Range, len: usize) { + self.mappings = mappings; + self.mappings_len = len; + } - self.cursor.set_position(previous_pos); - self.entity_data_size = 0; - self.array_len = self - .array_len - .checked_add(1) - .ok_or(bincode::ErrorKind::SizeLimit)?; + pub(crate) fn add_despawn(&mut self, entity: Range) { + self.despawns_len += 1; + if let Some(last) = self.despawns.last_mut() { + // Append to previous range if possible. + if last.end == entity.start { + last.end = entity.end; + return; + } + } + self.despawns.push(entity); + } - Ok(()) + pub(crate) fn add_removals(&mut self, entity: Range, fn_ids: Range) { + self.removals.push((entity, fn_ids)); } - /// Serializes component and its replication functions ID as an element of entity data. + /// Updates internal state to start writing changes for an entity with the given visibility. /// - /// Reuses previously shared bytes if they exist, or updates them. - /// Should be called only inside an entity data and increases its size. - /// See also [`Self::start_entity_data`]. - pub(crate) fn write_component<'a>( - &'a mut self, - shared_bytes: &mut Option<&'a [u8]>, - rule_fns: &UntypedRuleFns, - component_fns: &ComponentFns, - ctx: &SerializeCtx, - fns_id: FnsId, - ptr: Ptr, - ) -> bincode::Result<()> { - if self.entity_data_size == 0 { - self.write_data_entity()?; - } + /// Entities and their data written lazily during the iteration. + /// See [`Self::add_changed_entity`] and [`Self::add_changed_component`]. + pub(crate) fn start_entity_changes(&mut self, visibility: Visibility) { + self.entity_visibility = visibility; + self.entity_written = false; + } - let size = super::write_with(shared_bytes, &mut self.cursor, |cursor| { - DefaultOptions::new().serialize_into(&mut *cursor, &fns_id)?; - // SAFETY: `component_fns`, `ptr` and `rule_fns` were created for the same component type. - unsafe { component_fns.serialize(ctx, rule_fns, ptr, cursor) } - })?; + /// Visibility from the last call of [`Self::start_entity_changes`]. + pub(crate) fn entity_visibility(&self) -> Visibility { + self.entity_visibility + } - self.entity_data_size = self - .entity_data_size - .checked_add(size) - .ok_or(bincode::ErrorKind::SizeLimit)?; + /// Returns `true` if [`Self::add_changed_entity`] were called since the last + /// call of [`Self::start_entity_changes`]. + pub(crate) fn entity_written(&mut self) -> bool { + self.entity_written + } - Ok(()) + /// Adds an entity chunk. + pub(crate) fn add_changed_entity(&mut self, entity: Range) { + let components = self.buffer.pop().unwrap_or_default(); + self.changes.push((entity, components)); + self.entity_written = true; } - /// Serializes replication functions ID as an element of entity data. - /// - /// Should be called only inside an entity data and increases its size. - /// See also [`Self::start_entity_data`]. - pub(crate) fn write_fns_id(&mut self, fns_id: FnsId) -> bincode::Result<()> { - if self.entity_data_size == 0 { - self.write_data_entity()?; + /// Adds a component chunk to the last added entity from [`Self::add_changed_entity`]. + pub(crate) fn add_changed_component(&mut self, component: Range) { + let (_, components) = self + .changes + .last_mut() + .expect("entity should be written before adding components"); + + if let Some(last) = components.last_mut() { + // Append to previous range if possible. + if last.end == component.start { + last.end = component.end; + return; + } } - let previous_pos = self.cursor.position(); - DefaultOptions::new().serialize_into(&mut self.cursor, &fns_id)?; - - let id_size = self.cursor.position() - previous_pos; - self.entity_data_size = self - .entity_data_size - .checked_add(id_size as u16) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) + components.push(component); } - /// Removes entity data elements from update message and copies it. + /// Takes last changed entity with its component chunks. /// - /// Ends entity data for the update message. - /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. - pub(crate) fn take_entity_data( - &mut self, - update_message: &mut UpdateMessage, - ) -> bincode::Result<()> { - if update_message.entity_data_size != 0 { - if self.entity_data_size == 0 { - self.write_data_entity()?; + /// Needs to be called if an entity have any removal or insertion to keep entity updates atomic. + pub(crate) fn take_changes(&mut self, update_message: &mut UpdateMessage) { + if update_message.changes_written() { + let (entity, components_iter) = update_message + .pop_changes() + .expect("entity should be written"); + + if !self.entity_written { + let mut components = self.buffer.pop().unwrap_or_default(); + components.extend(components_iter); + self.changes.push((entity, components)); + } else { + let (_, components) = self.changes.last_mut().unwrap(); + components.extend(components_iter); } - - let slice = update_message.as_slice(); - let offset = update_message.entity_data_size_pos as usize - + mem::size_of_val(&update_message.entity_data_size); - self.cursor.write_all(&slice[offset..]).unwrap(); - - self.entity_data_size = self - .entity_data_size - .checked_add(update_message.entity_data_size) - .ok_or(bincode::ErrorKind::SizeLimit)?; - update_message.entity_data_size = 0; } - - update_message - .cursor - .set_position(update_message.entity_data_pos); - - Ok(()) } - /// Returns the serialized data, excluding trailing empty arrays, as a byte array. - fn as_slice(&self) -> &[u8] { - let slice = self.cursor.get_ref(); - let position = self.cursor.position() as usize; - let extra_len = self.trailing_empty_arrays * mem::size_of_val(&self.array_len); - &slice[..position - extra_len] + pub(crate) fn is_empty(&self) -> bool { + self.mappings.is_empty() + && self.despawns.is_empty() + && self.removals.is_empty() + && self.changes.is_empty() } - /// Sends the message, excluding trailing empty arrays, to the specified client. - /// - /// Updates change tick for the client if there are data to send. - /// Does nothing if there is no data to send. pub(crate) fn send( &self, server: &mut RepliconServer, - client: &mut ReplicatedClient, - server_tick: RepliconTick, + client: &ReplicatedClient, + serialized: &SerializedData, + server_tick: Range, ) -> bincode::Result<()> { - debug_assert_eq!(self.array_len, 0); - debug_assert_eq!(self.entity_data_size, 0); + let mut header = InitMessageHeader::default(); + let mut message_size = size_of::() + server_tick.len(); - let slice = self.as_slice(); - if slice.is_empty() { - trace!("no init data to send for {:?}", client.id()); - return Ok(()); + if !self.mappings.is_empty() { + header |= InitMessageHeader::MAPPINGS; + message_size += self.mappings_len.required_space() + self.mappings.len(); } + if !self.despawns.is_empty() { + header |= InitMessageHeader::DESPAWNS; + message_size += self.despawns_len.required_space(); + message_size += self.despawns.iter().map(|range| range.len()).sum::(); + } + if !self.removals.is_empty() { + header |= InitMessageHeader::REMOVALS; + message_size += self.removals.len().required_space(); + message_size += self + .removals + .iter() + .map(|(entity, components)| { + entity.len() + components.len().required_space() + components.len() + }) + .sum::(); + } + if !self.changes.is_empty() { + header |= InitMessageHeader::CHANGES; + message_size += self.changes.len().required_space(); + message_size += self + .changes + .iter() + .map(|(entity, components)| { + let components_size = components.iter().map(|range| range.len()).sum::(); + entity.len() + components_size.required_space() + components_size + }) + .sum::(); + } + + let mut message = Vec::with_capacity(message_size); + + message.write_fixedint(header.bits())?; + message.extend_from_slice(&serialized[server_tick]); - client.set_init_tick(server_tick); + if !self.mappings.is_empty() { + message.write_varint(self.mappings_len)?; + message.extend_from_slice(&serialized[self.mappings.clone()]); + } + if !self.despawns.is_empty() { + message.write_varint(self.despawns_len)?; + for range in &self.despawns { + message.extend_from_slice(&serialized[range.clone()]); + } + } + if !self.removals.is_empty() { + message.write_varint(self.removals.len())?; + for (entity, components) in &self.removals { + message.extend_from_slice(&serialized[entity.clone()]); + message.write_varint(components.len())?; + message.extend_from_slice(&serialized[components.clone()]); + } + } + if !self.changes.is_empty() { + message.write_varint(self.changes.len())?; + for (entity, components) in &self.changes { + let components_size = components.iter().map(|range| range.len()).sum::(); + message.extend_from_slice(&serialized[entity.clone()]); + message.write_varint(components_size)?; + for component in components { + message.extend_from_slice(&serialized[component.clone()]); + } + } + } - let mut header = [0; mem::size_of::()]; - bincode::serialize_into(&mut header[..], &server_tick)?; + debug_assert_eq!(message.len(), message_size); trace!("sending init message to {:?}", client.id()); - server.send( - client.id(), - ReplicationChannel::Init, - Bytes::from([&header, slice].concat()), - ); + server.send(client.id(), ReplicationChannel::Init, message); Ok(()) } -} -impl Default for InitMessage { - fn default() -> Self { - Self { - cursor: Default::default(), - array_len: Default::default(), - array_pos: Default::default(), - trailing_empty_arrays: Default::default(), - entity_data_size: Default::default(), - entity_data_pos: Default::default(), - entity_data_size_pos: Default::default(), - data_entity: Entity::PLACEHOLDER, - } + /// Clears all chunks. + /// + /// Keeps allocated memory for reuse. + pub(super) fn clear(&mut self) { + self.mappings = Default::default(); + self.mappings_len = 0; + self.despawns.clear(); + self.despawns_len = 0; + self.removals.clear(); + self.buffer + .extend(self.changes.drain(..).map(|(_, mut range)| { + range.clear(); + range + })); } } diff --git a/src/server/replication_messages/serialized_data.rs b/src/server/replication_messages/serialized_data.rs new file mode 100644 index 00000000..3d61c6b9 --- /dev/null +++ b/src/server/replication_messages/serialized_data.rs @@ -0,0 +1,105 @@ +use std::ops::Range; + +use bevy::{prelude::*, ptr::Ptr}; +use bincode::{DefaultOptions, Options}; +use integer_encoding::VarIntWriter; + +use crate::{ + core::{ + replication::replication_registry::{ + component_fns::ComponentFns, ctx::SerializeCtx, rule_fns::UntypedRuleFns, FnsId, + }, + replicon_tick::RepliconTick, + }, + server::client_entity_map::ClientMapping, +}; + +/// Single continious buffer that stores serialized data for messages. +/// +/// See [`InitMessage`](super::init_message::InitMessage) and +/// [`UpdateMessage`](super::update_message::UpdateMessage). +#[derive(Default, Deref, DerefMut)] +pub(crate) struct SerializedData(Vec); + +impl SerializedData { + pub(crate) fn write_mappings( + &mut self, + mappings: impl Iterator, + ) -> bincode::Result> { + let start = self.len(); + + for mapping in mappings { + self.write_entity(mapping.server_entity)?; + self.write_entity(mapping.client_entity)?; + } + + let end = self.len(); + + Ok(start..end) + } + + pub(crate) fn write_fn_ids( + &mut self, + fn_ids: impl Iterator, + ) -> bincode::Result> { + let start = self.len(); + + for fns_id in fn_ids { + DefaultOptions::new().serialize_into(&mut self.0, &fns_id)?; + } + + let end = self.len(); + + Ok(start..end) + } + + pub(crate) fn write_component( + &mut self, + rule_fns: &UntypedRuleFns, + component_fns: &ComponentFns, + ctx: &SerializeCtx, + fns_id: FnsId, + ptr: Ptr, + ) -> bincode::Result> { + let start = self.len(); + + DefaultOptions::new().serialize_into(&mut self.0, &fns_id)?; + // SAFETY: `component_fns`, `ptr` and `rule_fns` were created for the same component type. + unsafe { component_fns.serialize(ctx, rule_fns, ptr, &mut self.0)? }; + + let end = self.len(); + + Ok(start..end) + } + + /// Serializes `entity` by writing its index and generation as separate varints. + /// + /// The index is first prepended with a bit flag to indicate if the generation + /// is serialized or not. It is not serialized if <= 1; note that generations are [`NonZeroU32`](std::num::NonZeroU32) + /// and a value of zero is used in [`Option`] to signify [`None`], so generation 1 is the first + /// generation. + pub(crate) fn write_entity(&mut self, entity: Entity) -> bincode::Result> { + let start = self.len(); + + let mut flagged_index = (entity.index() as u64) << 1; + let flag = entity.generation() > 1; + flagged_index |= flag as u64; + + self.0.write_varint(flagged_index)?; + if flag { + self.0.write_varint(entity.generation() - 1)?; + } + + let end = self.len(); + + Ok(start..end) + } + + pub(crate) fn write_tick(&mut self, tick: RepliconTick) -> bincode::Result> { + let start = self.len(); + DefaultOptions::new().serialize_into(&mut self.0, &tick)?; + let end = self.len(); + + Ok(start..end) + } +} diff --git a/src/server/replication_messages/update_message.rs b/src/server/replication_messages/update_message.rs index 0978f05b..02dfd0d9 100644 --- a/src/server/replication_messages/update_message.rs +++ b/src/server/replication_messages/update_message.rs @@ -1,255 +1,199 @@ -use std::{io::Cursor, mem, time::Duration}; +use std::{io::Cursor, mem, ops::Range, time::Duration}; -use bevy::{ecs::component::Tick, prelude::*, ptr::Ptr}; +use bevy::{ecs::component::Tick, prelude::*}; use bincode::{DefaultOptions, Options}; -use bytes::Bytes; +use integer_encoding::{VarInt, VarIntWriter}; +use super::serialized_data::SerializedData; use crate::core::{ channels::ReplicationChannel, - replication::{ - replicated_clients::{ClientBuffers, ReplicatedClient}, - replication_registry::{ - component_fns::ComponentFns, ctx::SerializeCtx, rule_fns::UntypedRuleFns, FnsId, - }, - }, + replication::replicated_clients::{ClientBuffers, ReplicatedClient}, replicon_server::RepliconServer, replicon_tick::RepliconTick, }; -/// A reusable message with replicated component updates. +/// A message with replicated component updates. +/// +/// Contains change tick, current tick, update index and component updates since +/// the last acknowledged tick for each entity. /// -/// Contains change tick, current tick and component updates since the last acknowledged tick for each entity. /// Cannot be applied on the client until the init message matching this update message's change tick /// has been applied to the client world. /// The message will be manually split into packets up to max size, and each packet will be applied /// independently on the client. /// Message splits only happen per-entity to avoid weird behavior from partial entity updates. -/// Sent over the [`ReplicationChannel::Update`] channel. /// -/// See also [Limits](../index.html#limits) +/// The data is stored in the form of ranges from [`SerializedData`]. +/// +/// Sent over the [`ReplicationChannel::Update`] channel. If the update gets lost, we try to resend it manually, +/// using the last up-to-date change to avoid re-sending old values. +/// +/// Stored inside [`ReplicationMessages`](super::ReplicationMessages). +#[derive(Default)] pub(crate) struct UpdateMessage { - /// Serialized data. - pub(super) cursor: Cursor>, - - /// Entities and their sizes in the message with data. - entities: Vec<(Entity, usize)>, + /// List of entity values for [`Self::changes`]. + /// + /// Used to associate entities with update indices that the client + /// needs to acknowledge to consider the changes as received. + entities: Vec, - /// Entity from last call of [`Self::start_entity_data`]. - data_entity: Entity, + /// Component changes happened on this tick. + /// + /// Serialized as a list of pairs of entity chunk and multiple chunks with changed components. + /// Components are stored in multiple chunks because some clients may acknowledge changes, + /// while others may not. + /// + /// Unlike with [`InitMessage`](super::init_message::InitMessage::changes), + /// we don't serialize the number of entities and on deserialization just consume all remaining bytes. + /// TODO: use the same optimization for init messages. + changes: Vec<(Range, Vec>)>, - /// Size in bytes of the component data stored for the currently-being-written entity. - pub(super) entity_data_size: u16, + /// Indicates that an entity has been written since the + /// last call of [`Self::start_entity_changes`]. + changes_written: bool, - /// Position of entity from last call of [`Self::start_entity_data`]. - pub(super) entity_data_pos: u64, + /// Intermediate buffer to reuse allocated memory from [`Self::changes`]. + buffer: Vec>>, - /// Position of entity data length from last call of [`Self::write_data_entity`]. - pub(super) entity_data_size_pos: u64, + /// Intermediate buffer with update index, message size and a range for [`Self::changes`]. + /// + /// We split messages first in order to know the number of updates in advance. + /// We plan to include it in the message in the future. + messages: Vec<(u16, usize, Range)>, } impl UpdateMessage { - /// Clears the message. + /// Updates internal state to start writing changes for an entity. /// - /// Keeps allocated capacity for reuse. - pub(super) fn reset(&mut self) { - self.cursor.set_position(0); - self.entities.clear(); + /// Entities and their data written lazily during the iteration. + /// See [`Self::add_changed_entity`] and [`Self::add_changed_component`]. + pub(crate) fn start_entity_changes(&mut self) { + self.changes_written = false; } - /// Starts writing entity and its data. - /// - /// Data can contain components with their IDs. - /// Entity will be written lazily after first data write. - /// See also [`Self::end_entity_data`] and [`Self::write_component`]. - pub(crate) fn start_entity_data(&mut self, entity: Entity) { - debug_assert_eq!(self.entity_data_size, 0); - - self.data_entity = entity; - self.entity_data_pos = self.cursor.position(); + /// Returns `true` if [`Self::add_changed_entity`] were called since the last + /// call of [`Self::start_entity_changes`]. + pub(crate) fn changes_written(&mut self) -> bool { + self.changes_written } - /// Writes entity for the current data and remembers the position after it to write length later. - /// - /// Should be called only after first data write. - fn write_data_entity(&mut self) -> bincode::Result<()> { - super::serialize_entity(&mut self.cursor, self.data_entity)?; - self.entity_data_size_pos = self.cursor.position(); - self.cursor.set_position( - self.entity_data_size_pos + mem::size_of_val(&self.entity_data_size) as u64, - ); - - Ok(()) + /// Adds an entity chunk. + pub(crate) fn add_changed_entity(&mut self, entity: Entity, entity_range: Range) { + let components = self.buffer.pop().unwrap_or_default(); + self.changes.push((entity_range, components)); + self.entities.push(entity); + self.changes_written = true; } - /// Ends writing entity data by writing its length into the last remembered position. - /// - /// If the entity data is empty, nothing will be written and the cursor will reset. - /// See also [`Self::start_array`] and [`Self::write_component`]. - pub(crate) fn end_entity_data(&mut self) -> bincode::Result<()> { - if self.entity_data_size == 0 { - self.cursor.set_position(self.entity_data_pos); - return Ok(()); + /// Adds a component chunk to the last added entity from [`Self::add_changed_entity`]. + pub(crate) fn add_changed_component(&mut self, component: Range) { + let (_, components) = self + .changes + .last_mut() + .expect("entity should be written before adding components"); + + if let Some(last) = components.last_mut() { + // Append to previous range if possible. + if last.end == component.start { + last.end = component.end; + return; + } } - let previous_pos = self.cursor.position(); - self.cursor.set_position(self.entity_data_size_pos); - - bincode::serialize_into(&mut self.cursor, &self.entity_data_size)?; - - self.cursor.set_position(previous_pos); - - let data_size = self.cursor.position() - self.entity_data_pos; - self.entities.push((self.data_entity, data_size as usize)); - - self.entity_data_size = 0; - - Ok(()) + components.push(component); } - /// Serializes component and its replication functions ID as an element of entity data. - /// - /// Reuses previously shared bytes if they exist, or updates them. - /// Should be called only inside an entity data and increases its size. - /// See also [`Self::start_entity_data`]. - pub(crate) fn write_component<'a>( - &'a mut self, - shared_bytes: &mut Option<&'a [u8]>, - rule_fns: &UntypedRuleFns, - component_fns: &ComponentFns, - ctx: &SerializeCtx, - fns_id: FnsId, - ptr: Ptr, - ) -> bincode::Result<()> { - if self.entity_data_size == 0 { - self.write_data_entity()?; - } - - let size = super::write_with(shared_bytes, &mut self.cursor, |cursor| { - DefaultOptions::new().serialize_into(&mut *cursor, &fns_id)?; - // SAFETY: `component_fns`, `ptr` and `rule_fns` were created for the same component type. - unsafe { component_fns.serialize(ctx, rule_fns, ptr, cursor) } - })?; - - self.entity_data_size = self - .entity_data_size - .checked_add(size) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) + /// Removes and returns last changed entity with its component chunks. + pub(super) fn pop_changes( + &mut self, + ) -> Option<(Range, impl Iterator> + '_)> { + let (entity, components) = self.changes.pop()?; + self.buffer.push(components); + let components = self.buffer.last_mut().unwrap(); + Some((entity, components.drain(..))) } - /// Returns the serialized data as a byte array. - pub(super) fn as_slice(&self) -> &[u8] { - let slice = self.cursor.get_ref(); - let position = self.cursor.position() as usize; - &slice[..position] + pub(crate) fn is_empty(&self) -> bool { + self.changes.is_empty() } - /// Splits message according to entities inside it and sends it to the specified client. - /// - /// Does nothing if there is no data to send. pub(crate) fn send( &mut self, server: &mut RepliconServer, client: &mut ReplicatedClient, client_buffers: &mut ClientBuffers, - server_tick: RepliconTick, + serialized: &SerializedData, + server_tick: Range, tick: Tick, timestamp: Duration, ) -> bincode::Result<()> { - debug_assert_eq!(self.entity_data_size, 0); - - let mut slice = self.as_slice(); - if slice.is_empty() { - trace!("no updates to send for {:?}", client.id()); - return Ok(()); - } - - trace!("sending update message(s) to {:?}", client.id()); - const TICKS_SIZE: usize = 2 * mem::size_of::(); - let mut header = [0; TICKS_SIZE + mem::size_of::()]; - bincode::serialize_into(&mut header[..], &(client.init_tick(), server_tick))?; + const MAX_TICK_SIZE: usize = mem::size_of::() + 1; + let mut init_tick = Cursor::new([0; MAX_TICK_SIZE]); + DefaultOptions::new().serialize_into(&mut init_tick, &client.init_tick())?; + let init_tick_size = init_tick.position() as usize; + let ticks_size = init_tick_size + server_tick.len(); - let mut message_size = 0; - let client_id = client.id(); let (mut update_index, mut entities) = client.register_update(client_buffers, tick, timestamp); - for &(entity, data_size) in &self.entities { - // Try to pack back first, then try to pack forward. - if message_size == 0 - || can_pack(header.len(), message_size, data_size) - || can_pack(header.len(), data_size, message_size) - { - entities.push(entity); + let mut message_size = ticks_size + update_index.required_space(); + let mut changes_range = Range::::default(); + for (entity, (entity_range, components)) in self.entities.iter().zip(&self.changes) { + const MAX_PACKET_SIZE: usize = 1200; // TODO: make it configurable by the messaging backend. + let components_size = components.iter().map(|range| range.len()).sum::(); + let data_size = entity_range.len() + components_size.required_space() + components_size; + + if message_size == 0 || message_size + data_size <= MAX_PACKET_SIZE { + entities.push(*entity); + changes_range.end += 1; message_size += data_size; } else { - let (message, remaining) = slice.split_at(message_size); - slice = remaining; - message_size = data_size; - - bincode::serialize_into(&mut header[TICKS_SIZE..], &update_index)?; - - server.send( - client_id, - ReplicationChannel::Update, - Bytes::from([&header, message].concat()), - ); + self.messages + .push((update_index, message_size, changes_range.clone())); + + changes_range.start = changes_range.end; + (update_index, entities) = client.register_update(client_buffers, tick, timestamp); + entities.push(*entity); + changes_range.end += 1; + message_size = ticks_size + update_index.required_space() + data_size; + } + } + if !changes_range.is_empty() { + self.messages + .push((update_index, message_size, changes_range.clone())); + } - if !slice.is_empty() { - (update_index, entities) = - client.register_update(client_buffers, tick, timestamp); + for (update_index, message_size, changes_range) in self.messages.drain(..) { + let mut message = Vec::with_capacity(message_size); + + message.extend_from_slice(&init_tick.get_ref()[..init_tick_size]); + message.extend_from_slice(&serialized[server_tick.clone()]); + message.write_varint(update_index)?; + for (entity, components) in &self.changes[changes_range.clone()] { + let components_size = components.iter().map(|range| range.len()).sum::(); + message.extend_from_slice(&serialized[entity.clone()]); + message.write_varint(components_size)?; + for component in components { + message.extend_from_slice(&serialized[component.clone()]); } } - } - if !slice.is_empty() { - bincode::serialize_into(&mut header[TICKS_SIZE..], &update_index)?; + debug_assert_eq!(message.len(), message_size); - server.send( - client_id, - ReplicationChannel::Update, - Bytes::from([&header, slice].concat()), - ); + server.send(client.id(), ReplicationChannel::Update, message); } Ok(()) } -} - -impl Default for UpdateMessage { - fn default() -> Self { - Self { - cursor: Default::default(), - entities: Default::default(), - entity_data_size: Default::default(), - entity_data_pos: Default::default(), - entity_data_size_pos: Default::default(), - data_entity: Entity::PLACEHOLDER, - } - } -} - -fn can_pack(header_size: usize, base: usize, add: usize) -> bool { - const MAX_PACKET_SIZE: usize = 1200; // TODO: make it configurable by the messaging backend. - - let dangling = (base + header_size) % MAX_PACKET_SIZE; - (dangling > 0) && ((dangling + add) <= MAX_PACKET_SIZE) -} -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn packing() { - assert!(can_pack(10, 0, 5)); - assert!(can_pack(10, 0, 1190)); - assert!(!can_pack(10, 0, 1191)); - assert!(!can_pack(10, 0, 3000)); - - assert!(can_pack(10, 1189, 1)); - assert!(!can_pack(10, 1190, 0)); - assert!(!can_pack(10, 1190, 1)); - assert!(!can_pack(10, 1190, 3000)); + /// Clears all chunks. + /// + /// Keeps allocated memory for reuse. + pub(super) fn clear(&mut self) { + self.entities.clear(); + self.buffer + .extend(self.changes.drain(..).map(|(_, mut range)| { + range.clear(); + range + })); } } diff --git a/tests/stats.rs b/tests/stats.rs index 66e5105b..e8b76518 100644 --- a/tests/stats.rs +++ b/tests/stats.rs @@ -60,7 +60,7 @@ fn client_stats() { assert_eq!(stats.mappings, 1); assert_eq!(stats.despawns, 1); assert_eq!(stats.messages, 2); - assert_eq!(stats.bytes, 33); + assert_eq!(stats.bytes, 17); } #[derive(Component, Deserialize, Serialize)] From 396d91b7bf0862548ed0e3b75aaa475bb692dc7e Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Wed, 13 Nov 2024 03:46:30 +0200 Subject: [PATCH 02/29] Fix doc tests --- src/core/replication/replication_rules.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/replication/replication_rules.rs b/src/core/replication/replication_rules.rs index 5689437f..0f5d2e74 100644 --- a/src/core/replication/replication_rules.rs +++ b/src/core/replication/replication_rules.rs @@ -98,9 +98,9 @@ pub trait AppRuleExt { fn serialize_translation( _ctx: &SerializeCtx, transform: &Transform, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - bincode::serialize_into(cursor, &transform.translation) + bincode::serialize_into(message, &transform.translation) } /// Deserializes `translation` and creates [`Transform`] from it. @@ -317,7 +317,7 @@ impl GroupReplication for PlayerBundle { } } -# fn serialize_translation(_: &SerializeCtx, _: &Transform, _: &mut Cursor>) -> bincode::Result<()> { unimplemented!() } +# fn serialize_translation(_: &SerializeCtx, _: &Transform, _: &mut Vec) -> bincode::Result<()> { unimplemented!() } # fn deserialize_translation(_: &mut WriteCtx, _: &mut Cursor<&[u8]>) -> bincode::Result { unimplemented!() } ``` **/ From c1bb570295836e91422b0ad077b70c0d80f43775 Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Fri, 15 Nov 2024 19:30:06 +0200 Subject: [PATCH 03/29] Add a test for many entities --- tests/changes.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/tests/changes.rs b/tests/changes.rs index b057c6c1..d907e232 100644 --- a/tests/changes.rs +++ b/tests/changes.rs @@ -531,6 +531,64 @@ fn many_entities() { } } +#[test] +fn many_clients() { + let mut server_app = App::new(); + let mut client_app1 = App::new(); + let mut client_app2 = App::new(); + for app in [&mut server_app, &mut client_app1, &mut client_app2] { + app.add_plugins(( + MinimalPlugins, + RepliconPlugins.set(ServerPlugin { + tick_policy: TickPolicy::EveryFrame, + ..Default::default() + }), + )) + .replicate::(); + } + + server_app.connect_client(&mut client_app1); + server_app.connect_client(&mut client_app2); + + let server_entity = server_app + .world_mut() + .spawn((Replicated, BoolComponent(false))) + .id(); + + server_app.update(); + server_app.exchange_with_client(&mut client_app1); + server_app.exchange_with_client(&mut client_app2); + client_app1.update(); + client_app2.update(); + server_app.exchange_with_client(&mut client_app1); + server_app.exchange_with_client(&mut client_app2); + + // Change value. + let mut component = server_app + .world_mut() + .get_mut::(server_entity) + .unwrap(); + component.0 = true; + + server_app.update(); + server_app.exchange_with_client(&mut client_app1); + server_app.exchange_with_client(&mut client_app2); + client_app1.update(); + client_app2.update(); + + let component1 = client_app1 + .world_mut() + .query::<&BoolComponent>() + .single(client_app1.world()); + assert!(component1.0); + + let component2 = client_app2 + .world_mut() + .query::<&BoolComponent>() + .single(client_app2.world()); + assert!(component2.0); +} + #[test] fn with_insertion() { let mut server_app = App::new(); From 2f727b7adf258621b0758da65f2f7f33852757d6 Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Fri, 15 Nov 2024 22:34:13 +0200 Subject: [PATCH 04/29] Replace many entities test with many components --- tests/changes.rs | 112 +++++++++++++++++++++++------------------------ 1 file changed, 54 insertions(+), 58 deletions(-) diff --git a/tests/changes.rs b/tests/changes.rs index d907e232..11786201 100644 --- a/tests/changes.rs +++ b/tests/changes.rs @@ -113,6 +113,60 @@ fn package_size_component() { assert_eq!(component.0, BIG_DATA); } +#[test] +fn many_components() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + RepliconPlugins.set(ServerPlugin { + tick_policy: TickPolicy::EveryFrame, + ..Default::default() + }), + )) + .replicate::() + .replicate::(); + } + + server_app.connect_client(&mut client_app); + + server_app.update(); + server_app.exchange_with_client(&mut client_app); + client_app.update(); + server_app.exchange_with_client(&mut client_app); + + let server_entity = server_app + .world_mut() + .spawn((Replicated, BoolComponent(false), VecComponent::default())) + .id(); + + server_app.update(); + server_app.exchange_with_client(&mut client_app); + client_app.update(); + server_app.exchange_with_client(&mut client_app); + + let mut server_entity = server_app.world_mut().entity_mut(server_entity); + + let mut bool_component = server_entity.get_mut::().unwrap(); + bool_component.0 = true; + + const VEC_VALUE: &[u8] = &[1; 10]; + let mut vec_component = server_entity.get_mut::().unwrap(); + vec_component.0 = VEC_VALUE.to_vec(); + + server_app.update(); + server_app.exchange_with_client(&mut client_app); + client_app.update(); + + let (bool_component, vec_component) = client_app + .world_mut() + .query::<(&BoolComponent, &VecComponent)>() + .single(client_app.world()); + assert!(bool_component.0); + assert_eq!(vec_component.0, VEC_VALUE); +} + #[test] fn command_fns() { let mut server_app = App::new(); @@ -531,64 +585,6 @@ fn many_entities() { } } -#[test] -fn many_clients() { - let mut server_app = App::new(); - let mut client_app1 = App::new(); - let mut client_app2 = App::new(); - for app in [&mut server_app, &mut client_app1, &mut client_app2] { - app.add_plugins(( - MinimalPlugins, - RepliconPlugins.set(ServerPlugin { - tick_policy: TickPolicy::EveryFrame, - ..Default::default() - }), - )) - .replicate::(); - } - - server_app.connect_client(&mut client_app1); - server_app.connect_client(&mut client_app2); - - let server_entity = server_app - .world_mut() - .spawn((Replicated, BoolComponent(false))) - .id(); - - server_app.update(); - server_app.exchange_with_client(&mut client_app1); - server_app.exchange_with_client(&mut client_app2); - client_app1.update(); - client_app2.update(); - server_app.exchange_with_client(&mut client_app1); - server_app.exchange_with_client(&mut client_app2); - - // Change value. - let mut component = server_app - .world_mut() - .get_mut::(server_entity) - .unwrap(); - component.0 = true; - - server_app.update(); - server_app.exchange_with_client(&mut client_app1); - server_app.exchange_with_client(&mut client_app2); - client_app1.update(); - client_app2.update(); - - let component1 = client_app1 - .world_mut() - .query::<&BoolComponent>() - .single(client_app1.world()); - assert!(component1.0); - - let component2 = client_app2 - .world_mut() - .query::<&BoolComponent>() - .single(client_app2.world()); - assert!(component2.0); -} - #[test] fn with_insertion() { let mut server_app = App::new(); From 4fe95b3d2c017a60013e9ce97f6cecf8b6311a06 Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Sat, 16 Nov 2024 14:12:00 +0200 Subject: [PATCH 05/29] Rename `InitMessageHeader` into `InitMessageArrays` "Arrays" describes it better since the tick is technically also part of the header. --- src/client.rs | 12 ++++++------ src/core/replication.rs | 4 ++-- src/server/replication_messages/init_message.rs | 16 ++++++++-------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/client.rs b/src/client.rs index 33fb7668..dcf766be 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,7 +20,7 @@ use crate::core::{ ctx::{DespawnCtx, RemoveCtx, WriteCtx}, ReplicationRegistry, }, - InitMessageHeader, Replicated, + InitMessageArrays, Replicated, }, replicon_client::RepliconClient, replicon_tick::RepliconTick, @@ -184,20 +184,20 @@ fn apply_init_message( stats.bytes += end_pos; } - let header = InitMessageHeader::from_bits_retain(cursor.read_fixedint()?); + let arrays = InitMessageArrays::from_bits_retain(cursor.read_fixedint()?); let message_tick = DefaultOptions::new().deserialize_from(&mut cursor)?; trace!("applying init message for {message_tick:?}"); world.resource_mut::().0 = message_tick; - if header.contains(InitMessageHeader::MAPPINGS) { + if arrays.contains(InitMessageArrays::MAPPINGS) { apply_entity_mappings(world, params, &mut cursor)?; } - if header.contains(InitMessageHeader::DESPAWNS) { + if arrays.contains(InitMessageArrays::DESPAWNS) { apply_despawns(world, params, &mut cursor, message_tick)?; } - if header.contains(InitMessageHeader::REMOVALS) { + if arrays.contains(InitMessageArrays::REMOVALS) { apply_init_components( world, params, @@ -207,7 +207,7 @@ fn apply_init_message( )?; } - if header.contains(InitMessageHeader::CHANGES) { + if arrays.contains(InitMessageArrays::CHANGES) { apply_init_components( world, params, diff --git a/src/core/replication.rs b/src/core/replication.rs index fc6de032..f9d03e8b 100644 --- a/src/core/replication.rs +++ b/src/core/replication.rs @@ -17,11 +17,11 @@ pub struct Replicated; use bitflags::bitflags; bitflags! { - /// Types of data that can be optionally included inside init message if the bit is set. + /// Types of arrays included in the init message if the bit is set. /// /// Serialized at the beginning of the message. #[derive(Default, Clone, Copy)] - pub(crate) struct InitMessageHeader: u8 { + pub(crate) struct InitMessageArrays: u8 { const MAPPINGS = 0b00000001; const DESPAWNS = 0b00000010; const REMOVALS = 0b00000100; diff --git a/src/server/replication_messages/init_message.rs b/src/server/replication_messages/init_message.rs index 054508d8..c96dfa5b 100644 --- a/src/server/replication_messages/init_message.rs +++ b/src/server/replication_messages/init_message.rs @@ -8,7 +8,7 @@ use crate::core::{ channels::ReplicationChannel, replication::{ replicated_clients::{client_visibility::Visibility, ReplicatedClient}, - InitMessageHeader, + InitMessageArrays, }, replicon_server::RepliconServer, }; @@ -191,20 +191,20 @@ impl InitMessage { serialized: &SerializedData, server_tick: Range, ) -> bincode::Result<()> { - let mut header = InitMessageHeader::default(); - let mut message_size = size_of::() + server_tick.len(); + let mut arrays = InitMessageArrays::default(); + let mut message_size = size_of::() + server_tick.len(); if !self.mappings.is_empty() { - header |= InitMessageHeader::MAPPINGS; + arrays |= InitMessageArrays::MAPPINGS; message_size += self.mappings_len.required_space() + self.mappings.len(); } if !self.despawns.is_empty() { - header |= InitMessageHeader::DESPAWNS; + arrays |= InitMessageArrays::DESPAWNS; message_size += self.despawns_len.required_space(); message_size += self.despawns.iter().map(|range| range.len()).sum::(); } if !self.removals.is_empty() { - header |= InitMessageHeader::REMOVALS; + arrays |= InitMessageArrays::REMOVALS; message_size += self.removals.len().required_space(); message_size += self .removals @@ -215,7 +215,7 @@ impl InitMessage { .sum::(); } if !self.changes.is_empty() { - header |= InitMessageHeader::CHANGES; + arrays |= InitMessageArrays::CHANGES; message_size += self.changes.len().required_space(); message_size += self .changes @@ -229,7 +229,7 @@ impl InitMessage { let mut message = Vec::with_capacity(message_size); - message.write_fixedint(header.bits())?; + message.write_fixedint(arrays.bits())?; message.extend_from_slice(&serialized[server_tick]); if !self.mappings.is_empty() { From 972ab21233e0bba92ee76bbac7f2c162b41742eb Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Sat, 16 Nov 2024 16:45:05 +0200 Subject: [PATCH 06/29] Swap order of `InitMessage::is_empty` checks Check most unlikely conditions first. --- src/server/replication_messages/init_message.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/replication_messages/init_message.rs b/src/server/replication_messages/init_message.rs index c96dfa5b..d81907e7 100644 --- a/src/server/replication_messages/init_message.rs +++ b/src/server/replication_messages/init_message.rs @@ -178,10 +178,10 @@ impl InitMessage { } pub(crate) fn is_empty(&self) -> bool { - self.mappings.is_empty() + self.changes.is_empty() && self.despawns.is_empty() && self.removals.is_empty() - && self.changes.is_empty() + && self.mappings.is_empty() } pub(crate) fn send( From 44270ccaee0a4b0bd27ac55f24cf3c549fda6f31 Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Sat, 16 Nov 2024 20:13:06 +0200 Subject: [PATCH 07/29] Use "mutation" instead of "change" for update message for clarity --- src/server.rs | 8 +- .../replication_messages/init_message.rs | 10 +-- .../replication_messages/update_message.rs | 80 +++++++++---------- 3 files changed, 49 insertions(+), 49 deletions(-) diff --git a/src/server.rs b/src/server.rs index 95caa69e..3b92ba05 100644 --- a/src/server.rs +++ b/src/server.rs @@ -486,7 +486,7 @@ fn collect_changes( { let visibility = client.visibility().visibility_state(entity.id()); init_message.start_entity_changes(visibility); - update_message.start_entity_changes(); + update_message.start_entity_mutations(); } // SAFETY: all replicated archetypes have marker component with table storage. @@ -539,13 +539,13 @@ fn collect_changes( .filter(|_| !ticks.is_added(change_tick.last_run(), change_tick.this_run())) { if ticks.is_changed(tick, change_tick.this_run()) { - if !update_message.changes_written() { + if !update_message.mutations_written() { let entity_range = write_entity_cached( &mut entity_range, serialized, entity.id(), )?; - update_message.add_changed_entity(entity.id(), entity_range); + update_message.add_mutated_entity(entity.id(), entity_range); } let component_range = write_component_cached( &mut component_range, @@ -556,7 +556,7 @@ fn collect_changes( replicated_component, component, )?; - update_message.add_changed_component(component_range); + update_message.add_mutated_component(component_range); } } else { if !init_message.entity_written() { diff --git a/src/server/replication_messages/init_message.rs b/src/server/replication_messages/init_message.rs index d81907e7..148ff33f 100644 --- a/src/server/replication_messages/init_message.rs +++ b/src/server/replication_messages/init_message.rs @@ -64,7 +64,7 @@ pub(crate) struct InitMessage { /// we serialize their size in bytes. removals: Vec<(Range, Range)>, - /// Component insertions or changes happened on this tick. + /// Component insertions or mutations happened on this tick. /// /// Serialized as a list of pairs of entity chunk and multiple chunks with changed components. /// Components are stored in multiple chunks because newly connected clients may need to serialize all components, @@ -73,7 +73,7 @@ pub(crate) struct InitMessage { /// For entities, we serialize their count like other data, but for IDs, /// we serialize their size in bytes. /// - /// Usually changes stored in [`UpdateMessage`], but if an entity have any insertion or removal, + /// Usually mutations stored in [`UpdateMessage`], but if an entity have any insertion or removal, /// we serialize it as part of the init message to keep entity changes atomic. changes: Vec<(Range, Vec>)>, @@ -112,7 +112,7 @@ impl InitMessage { self.removals.push((entity, fn_ids)); } - /// Updates internal state to start writing changes for an entity with the given visibility. + /// Updates internal state to start writing changed components for an entity with the given visibility. /// /// Entities and their data written lazily during the iteration. /// See [`Self::add_changed_entity`] and [`Self::add_changed_component`]. @@ -161,9 +161,9 @@ impl InitMessage { /// /// Needs to be called if an entity have any removal or insertion to keep entity updates atomic. pub(crate) fn take_changes(&mut self, update_message: &mut UpdateMessage) { - if update_message.changes_written() { + if update_message.mutations_written() { let (entity, components_iter) = update_message - .pop_changes() + .pop_mutations() .expect("entity should be written"); if !self.entity_written { diff --git a/src/server/replication_messages/update_message.rs b/src/server/replication_messages/update_message.rs index 02dfd0d9..3e923c78 100644 --- a/src/server/replication_messages/update_message.rs +++ b/src/server/replication_messages/update_message.rs @@ -26,36 +26,36 @@ use crate::core::{ /// The data is stored in the form of ranges from [`SerializedData`]. /// /// Sent over the [`ReplicationChannel::Update`] channel. If the update gets lost, we try to resend it manually, -/// using the last up-to-date change to avoid re-sending old values. +/// using the last up-to-date mutations to avoid re-sending old values. /// /// Stored inside [`ReplicationMessages`](super::ReplicationMessages). #[derive(Default)] pub(crate) struct UpdateMessage { - /// List of entity values for [`Self::changes`]. + /// List of entity values for [`Self::mutations`]. /// /// Used to associate entities with update indices that the client - /// needs to acknowledge to consider the changes as received. + /// needs to acknowledge to consider the mutations as received. entities: Vec, - /// Component changes happened on this tick. + /// Component mutations happened on this tick. /// - /// Serialized as a list of pairs of entity chunk and multiple chunks with changed components. - /// Components are stored in multiple chunks because some clients may acknowledge changes, + /// Serialized as a list of pairs of entity chunk and multiple chunks with mutated components. + /// Components are stored in multiple chunks because some clients may acknowledge mutations, /// while others may not. /// /// Unlike with [`InitMessage`](super::init_message::InitMessage::changes), /// we don't serialize the number of entities and on deserialization just consume all remaining bytes. /// TODO: use the same optimization for init messages. - changes: Vec<(Range, Vec>)>, + mutations: Vec<(Range, Vec>)>, /// Indicates that an entity has been written since the - /// last call of [`Self::start_entity_changes`]. - changes_written: bool, + /// last call of [`Self::start_entity_mutations`]. + mutations_written: bool, - /// Intermediate buffer to reuse allocated memory from [`Self::changes`]. + /// Intermediate buffer to reuse allocated memory from [`Self::mutations`]. buffer: Vec>>, - /// Intermediate buffer with update index, message size and a range for [`Self::changes`]. + /// Intermediate buffer with update index, message size and a range for [`Self::mutations`]. /// /// We split messages first in order to know the number of updates in advance. /// We plan to include it in the message in the future. @@ -63,32 +63,32 @@ pub(crate) struct UpdateMessage { } impl UpdateMessage { - /// Updates internal state to start writing changes for an entity. + /// Updates internal state to start writing mutated components for an entity. /// /// Entities and their data written lazily during the iteration. - /// See [`Self::add_changed_entity`] and [`Self::add_changed_component`]. - pub(crate) fn start_entity_changes(&mut self) { - self.changes_written = false; + /// See [`Self::add_mutated_entity`] and [`Self::add_mutated_component`]. + pub(crate) fn start_entity_mutations(&mut self) { + self.mutations_written = false; } - /// Returns `true` if [`Self::add_changed_entity`] were called since the last - /// call of [`Self::start_entity_changes`]. - pub(crate) fn changes_written(&mut self) -> bool { - self.changes_written + /// Returns `true` if [`Self::add_mutated_entity`] were called since the last + /// call of [`Self::start_entity_mutations`]. + pub(crate) fn mutations_written(&mut self) -> bool { + self.mutations_written } /// Adds an entity chunk. - pub(crate) fn add_changed_entity(&mut self, entity: Entity, entity_range: Range) { + pub(crate) fn add_mutated_entity(&mut self, entity: Entity, entity_range: Range) { let components = self.buffer.pop().unwrap_or_default(); - self.changes.push((entity_range, components)); + self.mutations.push((entity_range, components)); self.entities.push(entity); - self.changes_written = true; + self.mutations_written = true; } - /// Adds a component chunk to the last added entity from [`Self::add_changed_entity`]. - pub(crate) fn add_changed_component(&mut self, component: Range) { + /// Adds a component chunk to the last added entity from [`Self::add_mutated_entity`]. + pub(crate) fn add_mutated_component(&mut self, component: Range) { let (_, components) = self - .changes + .mutations .last_mut() .expect("entity should be written before adding components"); @@ -103,18 +103,18 @@ impl UpdateMessage { components.push(component); } - /// Removes and returns last changed entity with its component chunks. - pub(super) fn pop_changes( + /// Removes and returns last mutated entity with its component chunks. + pub(super) fn pop_mutations( &mut self, ) -> Option<(Range, impl Iterator> + '_)> { - let (entity, components) = self.changes.pop()?; + let (entity, components) = self.mutations.pop()?; self.buffer.push(components); let components = self.buffer.last_mut().unwrap(); Some((entity, components.drain(..))) } pub(crate) fn is_empty(&self) -> bool { - self.changes.is_empty() + self.mutations.is_empty() } pub(crate) fn send( @@ -136,39 +136,39 @@ impl UpdateMessage { let (mut update_index, mut entities) = client.register_update(client_buffers, tick, timestamp); let mut message_size = ticks_size + update_index.required_space(); - let mut changes_range = Range::::default(); - for (entity, (entity_range, components)) in self.entities.iter().zip(&self.changes) { + let mut mutations_range = Range::::default(); + for (entity, (entity_range, components)) in self.entities.iter().zip(&self.mutations) { const MAX_PACKET_SIZE: usize = 1200; // TODO: make it configurable by the messaging backend. let components_size = components.iter().map(|range| range.len()).sum::(); let data_size = entity_range.len() + components_size.required_space() + components_size; if message_size == 0 || message_size + data_size <= MAX_PACKET_SIZE { entities.push(*entity); - changes_range.end += 1; + mutations_range.end += 1; message_size += data_size; } else { self.messages - .push((update_index, message_size, changes_range.clone())); + .push((update_index, message_size, mutations_range.clone())); - changes_range.start = changes_range.end; + mutations_range.start = mutations_range.end; (update_index, entities) = client.register_update(client_buffers, tick, timestamp); entities.push(*entity); - changes_range.end += 1; + mutations_range.end += 1; message_size = ticks_size + update_index.required_space() + data_size; } } - if !changes_range.is_empty() { + if !mutations_range.is_empty() { self.messages - .push((update_index, message_size, changes_range.clone())); + .push((update_index, message_size, mutations_range.clone())); } - for (update_index, message_size, changes_range) in self.messages.drain(..) { + for (update_index, message_size, mutations_range) in self.messages.drain(..) { let mut message = Vec::with_capacity(message_size); message.extend_from_slice(&init_tick.get_ref()[..init_tick_size]); message.extend_from_slice(&serialized[server_tick.clone()]); message.write_varint(update_index)?; - for (entity, components) in &self.changes[changes_range.clone()] { + for (entity, components) in &self.mutations[mutations_range.clone()] { let components_size = components.iter().map(|range| range.len()).sum::(); message.extend_from_slice(&serialized[entity.clone()]); message.write_varint(components_size)?; @@ -191,7 +191,7 @@ impl UpdateMessage { pub(super) fn clear(&mut self) { self.entities.clear(); self.buffer - .extend(self.changes.drain(..).map(|(_, mut range)| { + .extend(self.mutations.drain(..).map(|(_, mut range)| { range.clear(); range })); From b40cab4cac8f1c458d09df668588eae5c24c0ac9 Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Sat, 16 Nov 2024 20:48:32 +0200 Subject: [PATCH 08/29] Don't serialize size for last array inside init message Just read until the end of the cursor. Saves us a single byte. --- src/client.rs | 497 ++++++++++-------- src/core/replication.rs | 16 +- src/core/replication/init_message_arrays.rs | 45 ++ .../replication_messages/init_message.rs | 166 +++--- .../replication_messages/update_message.rs | 4 - tests/stats.rs | 2 +- 6 files changed, 430 insertions(+), 300 deletions(-) create mode 100644 src/core/replication/init_message_arrays.rs diff --git a/src/client.rs b/src/client.rs index dcf766be..f8a6702b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -16,11 +16,12 @@ use crate::core::{ replication::{ command_markers::{CommandMarkers, EntityMarkers}, deferred_entity::DeferredEntity, + init_message_arrays::InitMessageArrays, replication_registry::{ ctx::{DespawnCtx, RemoveCtx, WriteCtx}, ReplicationRegistry, }, - InitMessageArrays, Replicated, + Replicated, }, replicon_client::RepliconClient, replicon_tick::RepliconTick, @@ -171,7 +172,9 @@ fn apply_replication( apply_update_messages(world, params, buffered_updates, init_tick) } -/// Applies [`InitMessage`](crate::server::replication_messages::InitMessage). +/// Reads and applies init message. +/// +/// For details see [`replication_messages`](crate::server::replication_messages). fn apply_init_message( world: &mut World, params: &mut ReceiveParams, @@ -189,38 +192,51 @@ fn apply_init_message( trace!("applying init message for {message_tick:?}"); world.resource_mut::().0 = message_tick; - if arrays.contains(InitMessageArrays::MAPPINGS) { - apply_entity_mappings(world, params, &mut cursor)?; - } - - if arrays.contains(InitMessageArrays::DESPAWNS) { - apply_despawns(world, params, &mut cursor, message_tick)?; - } - - if arrays.contains(InitMessageArrays::REMOVALS) { - apply_init_components( - world, - params, - ComponentsKind::Removal, - &mut cursor, - message_tick, - )?; - } - - if arrays.contains(InitMessageArrays::CHANGES) { - apply_init_components( - world, - params, - ComponentsKind::Insert, - &mut cursor, - message_tick, - )?; + let last_array = arrays.last(); + for (_, array) in arrays.iter_names() { + match array { + InitMessageArrays::MAPPINGS => { + let len = apply_array(&mut cursor, array != last_array, |cursor| { + apply_entity_mapping(world, params, cursor) + })?; + if let Some(stats) = &mut params.stats { + stats.mappings += len as u32; + } + } + InitMessageArrays::DESPAWNS => { + let len = apply_array(&mut cursor, array != last_array, |cursor| { + apply_despawn(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.despawns += len as u32; + } + } + InitMessageArrays::REMOVALS => { + let len = apply_array(&mut cursor, array != last_array, |cursor| { + apply_removals(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.entities_changed += len as u32; + } + } + InitMessageArrays::CHANGES => { + let len = apply_array(&mut cursor, false, |cursor| { + apply_init_changes(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.entities_changed += len as u32; + } + } + _ => unreachable!("iteration should yield only named flags"), + } } Ok(()) } -/// Reads and buffers [`UpdateMessage`](crate::server::replication_messages::UpdateMessage). +/// Reads and buffers update message. +/// +/// For details see [`replication_messages`](crate::server::replication_messages). /// /// Returns update index to be used for acknowledgment. fn read_update_message( @@ -250,6 +266,8 @@ fn read_update_message( /// Applies updates from [`BufferedUpdates`]. /// +/// For details see [`replication_messages`](crate::server::replication_messages). +/// /// If the update message can't be applied yet (because the init message with the /// corresponding tick hasn't arrived), it will be kept in the buffer. fn apply_update_messages( @@ -265,13 +283,17 @@ fn apply_update_messages( } trace!("applying update message for {:?}", update.message_tick); - if let Err(e) = apply_update_components( - world, - params, - &mut Cursor::new(&*update.message), - update.message_tick, - ) { - result = Err(e); + let len = apply_array(&mut Cursor::new(&*update.message), false, |cursor| { + apply_update_mutations(world, params, cursor, update.message_tick) + }); + + match len { + Ok(len) => { + if let Some(stats) = &mut params.stats { + stats.entities_changed += len as u32; + } + } + Err(e) => result = Err(e), } false @@ -280,231 +302,278 @@ fn apply_update_messages( result } -/// Applies received server mappings from client's pre-spawned entities. -fn apply_entity_mappings( +/// Reads received array from the message and applies `f` to its memebers. +/// +/// If the array is serialized with the length, calls `f` the specified number of times. +/// Otherwise, calls `f` until the end of the cursor. +/// +/// See [`InitMessageArrays`] for details. +fn apply_array( + cursor: &mut Cursor<&[u8]>, + with_len: bool, + mut f: impl FnMut(&mut Cursor<&[u8]>) -> bincode::Result<()>, +) -> bincode::Result { + if with_len { + let len = cursor.read_varint()?; + for _ in 0..len { + (f)(cursor)?; + } + + Ok(len) + } else { + let mut len = 0; + let end = cursor.get_ref().len() as u64; + while cursor.position() < end { + (f)(cursor)?; + len += 1; + } + + Ok(len) + } +} + +/// Deserializes and applies server mapping from client's pre-spawned entities from init message. +fn apply_entity_mapping( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, ) -> bincode::Result<()> { - let mappings_len: usize = cursor.read_varint()?; - if let Some(stats) = &mut params.stats { - stats.mappings += mappings_len as u32; + let server_entity = deserialize_entity(cursor)?; + let client_entity = deserialize_entity(cursor)?; + + if let Some(mut entity) = world.get_entity_mut(client_entity) { + debug!("received mapping from {server_entity:?} to {client_entity:?}"); + entity.insert(Replicated); + params.entity_map.insert(server_entity, client_entity); + } else { + // Entity could be despawned on client already. + debug!("received mapping from {server_entity:?} to {client_entity:?}, but the entity doesn't exists"); } - for _ in 0..mappings_len { - let server_entity = deserialize_entity(cursor)?; - let client_entity = deserialize_entity(cursor)?; - - if let Some(mut entity) = world.get_entity_mut(client_entity) { - debug!("received mapping from {server_entity:?} to {client_entity:?}"); - entity.insert(Replicated); - params.entity_map.insert(server_entity, client_entity); - } else { - // Entity could be despawned on client already. - debug!("received mapping from {server_entity:?} to {client_entity:?}, but the entity doesn't exists"); - } + + Ok(()) +} + +/// Deserializes and applies entity despawn from init message. +fn apply_despawn( + world: &mut World, + params: &mut ReceiveParams, + cursor: &mut Cursor<&[u8]>, + message_tick: RepliconTick, +) -> bincode::Result<()> { + // The entity might have already been despawned because of hierarchy or + // with the last replication message, but the server might not yet have received confirmation + // from the client and could include the deletion in the this message. + let server_entity = deserialize_entity(cursor)?; + if let Some(client_entity) = params + .entity_map + .remove_by_server(server_entity) + .and_then(|entity| world.get_entity_mut(entity)) + { + let ctx = DespawnCtx { message_tick }; + (params.registry.despawn)(&ctx, client_entity); } + Ok(()) } -/// Deserializes replicated components of `components_kind` and applies them to the `world`. -fn apply_init_components( +/// Deserializes and applies component removals for an entity from init message. +fn apply_removals( world: &mut World, params: &mut ReceiveParams, - components_kind: ComponentsKind, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let entities_len: usize = cursor.read_varint()?; - for _ in 0..entities_len { - let server_entity = deserialize_entity(cursor)?; - let data_size: usize = cursor.read_varint()?; - - let client_entity = params - .entity_map - .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); - - let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); - params - .entity_markers - .read(params.command_markers, &*client_entity); - - if let Some(mut history) = client_entity.get_mut::() { - history.set_last_tick(message_tick); - } else { - commands - .entity(client_entity.id()) - .insert(ConfirmHistory::new(message_tick)); - } + let server_entity = deserialize_entity(cursor)?; + let data_size: usize = cursor.read_varint()?; - let end_pos = cursor.position() + data_size as u64; - let mut components_len = 0; - while cursor.position() < end_pos { - let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; - let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); - match components_kind { - ComponentsKind::Insert => { - let mut ctx = - WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); - - // SAFETY: `rule_fns` and `component_fns` were created for the same type. - unsafe { - component_fns.write( - &mut ctx, - rule_fns, - params.entity_markers, - &mut client_entity, - cursor, - )?; - } - } - ComponentsKind::Removal => { - let mut ctx = RemoveCtx { - commands: &mut commands, - message_tick, - component_id, - }; - component_fns.remove(&mut ctx, params.entity_markers, &mut client_entity); - } - } - components_len += 1; - } + let client_entity = params + .entity_map + .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); - if let Some(stats) = &mut params.stats { - stats.entities_changed += 1; - stats.components_changed += components_len; - } + let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + if let Some(mut history) = client_entity.get_mut::() { + history.set_last_tick(message_tick); + } else { + commands + .entity(client_entity.id()) + .insert(ConfirmHistory::new(message_tick)); + } + + let end_pos = cursor.position() + data_size as u64; + let mut components_len = 0; + while cursor.position() < end_pos { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, _) = params.registry.get(fns_id); + let mut ctx = RemoveCtx { + commands: &mut commands, + message_tick, + component_id, + }; + component_fns.remove(&mut ctx, params.entity_markers, &mut client_entity); - params.queue.apply(world); + components_len += 1; } + if let Some(stats) = &mut params.stats { + stats.components_changed += components_len; + } + + params.queue.apply(world); + Ok(()) } -/// Deserializes despawns and applies them to the `world`. -fn apply_despawns( +/// Deserializes and applies component insertions or mutations for an entity from init message. +fn apply_init_changes( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let entities_len: usize = cursor.read_varint()?; - if let Some(stats) = &mut params.stats { - stats.despawns += entities_len as u32; + let server_entity = deserialize_entity(cursor)?; + let data_size: usize = cursor.read_varint()?; + + let client_entity = params + .entity_map + .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); + + let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + if let Some(mut history) = client_entity.get_mut::() { + history.set_last_tick(message_tick); + } else { + commands + .entity(client_entity.id()) + .insert(ConfirmHistory::new(message_tick)); } - for _ in 0..entities_len { - // The entity might have already been despawned because of hierarchy or - // with the last replication message, but the server might not yet have received confirmation - // from the client and could include the deletion in the this message. - let server_entity = deserialize_entity(cursor)?; - if let Some(client_entity) = params - .entity_map - .remove_by_server(server_entity) - .and_then(|entity| world.get_entity_mut(entity)) - { - let ctx = DespawnCtx { message_tick }; - (params.registry.despawn)(&ctx, client_entity); + + let end_pos = cursor.position() + data_size as u64; + let mut components_len = 0; + while cursor.position() < end_pos { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); + let mut ctx = WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); + + // SAFETY: `rule_fns` and `component_fns` were created for the same type. + unsafe { + component_fns.write( + &mut ctx, + rule_fns, + params.entity_markers, + &mut client_entity, + cursor, + )?; } + components_len += 1; } + if let Some(stats) = &mut params.stats { + stats.components_changed += components_len; + } + + params.queue.apply(world); + Ok(()) } -/// Deserializes replicated component updates and applies them to the `world`. +/// Deserializes and applies component mutations for all entities from update message. /// /// Consumes all remaining bytes in the cursor. -fn apply_update_components( +fn apply_update_mutations( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let message_end = cursor.get_ref().len() as u64; - while cursor.position() < message_end { - let server_entity = deserialize_entity(cursor)?; - let data_size: usize = cursor.read_varint()?; - - let Some(client_entity) = params.entity_map.get_by_server(server_entity) else { - // Update could arrive after a despawn from init message. - debug!("ignoring update received for unknown server's {server_entity:?}"); - cursor.set_position(cursor.position() + data_size as u64); - continue; - }; - - let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); - params - .entity_markers - .read(params.command_markers, &*client_entity); - - let mut history = client_entity - .get_mut::() - .expect("all entities from update should have confirmed ticks"); - let new_entity = message_tick > history.last_tick(); - if new_entity { - history.set_last_tick(message_tick); - } else { - if !params.entity_markers.need_history() { - trace!( - "ignoring outdated update for client's {:?}", - client_entity.id() - ); - cursor.set_position(cursor.position() + data_size as u64); - continue; - } + let server_entity = deserialize_entity(cursor)?; + let data_size: usize = cursor.read_varint()?; + + let Some(client_entity) = params.entity_map.get_by_server(server_entity) else { + // Update could arrive after a despawn from init message. + debug!("ignoring update received for unknown server's {server_entity:?}"); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); + }; - let ago = history.last_tick().get().wrapping_sub(message_tick.get()); - if ago >= u64::BITS { - trace!( - "discarding update {ago} ticks old for client's {:?}", - client_entity.id() - ); - cursor.set_position(cursor.position() + data_size as u64); - continue; - } + let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + let mut history = client_entity + .get_mut::() + .expect("all entities from update should have confirmed ticks"); + let new_entity = message_tick > history.last_tick(); + if new_entity { + history.set_last_tick(message_tick); + } else { + if !params.entity_markers.need_history() { + trace!( + "ignoring outdated update for client's {:?}", + client_entity.id() + ); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); + } - history.set(ago); + let ago = history.last_tick().get().wrapping_sub(message_tick.get()); + if ago >= u64::BITS { + trace!( + "discarding update {ago} ticks old for client's {:?}", + client_entity.id() + ); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); } - let end_pos = cursor.position() + data_size as u64; - let mut components_count = 0; - while cursor.position() < end_pos { - let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; - let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); - let mut ctx = - WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); - - // SAFETY: `rule_fns` and `component_fns` were created for the same type. - unsafe { - if new_entity { - component_fns.write( - &mut ctx, - rule_fns, - params.entity_markers, - &mut client_entity, - cursor, - )?; - } else { - component_fns.consume_or_write( - &mut ctx, - rule_fns, - params.entity_markers, - params.command_markers, - &mut client_entity, - cursor, - )?; - } - } + history.set(ago); + } - components_count += 1; + let end_pos = cursor.position() + data_size as u64; + let mut components_count = 0; + while cursor.position() < end_pos { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); + let mut ctx = WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); + + // SAFETY: `rule_fns` and `component_fns` were created for the same type. + unsafe { + if new_entity { + component_fns.write( + &mut ctx, + rule_fns, + params.entity_markers, + &mut client_entity, + cursor, + )?; + } else { + component_fns.consume_or_write( + &mut ctx, + rule_fns, + params.entity_markers, + params.command_markers, + &mut client_entity, + cursor, + )?; + } } - if let Some(stats) = &mut params.stats { - stats.entities_changed += 1; - stats.components_changed += components_count; - } + components_count += 1; + } - params.queue.apply(world); + if let Some(stats) = &mut params.stats { + stats.components_changed += components_count; } + params.queue.apply(world); + Ok(()) } @@ -552,14 +621,6 @@ struct ReceiveParams<'a> { registry: &'a ReplicationRegistry, } -/// Type of components replication. -/// -/// Parameter for [`apply_components`]. -enum ComponentsKind { - Insert, - Removal, -} - /// Set with replication and event systems related to client. #[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum ClientSet { diff --git a/src/core/replication.rs b/src/core/replication.rs index f9d03e8b..8f2c152c 100644 --- a/src/core/replication.rs +++ b/src/core/replication.rs @@ -1,5 +1,6 @@ pub mod command_markers; pub mod deferred_entity; +pub mod init_message_arrays; pub mod replicated_clients; pub mod replication_registry; pub mod replication_rules; @@ -13,18 +14,3 @@ pub type Replication = Replicated; #[derive(Component, Clone, Copy, Default, Reflect, Debug)] #[reflect(Component)] pub struct Replicated; - -use bitflags::bitflags; - -bitflags! { - /// Types of arrays included in the init message if the bit is set. - /// - /// Serialized at the beginning of the message. - #[derive(Default, Clone, Copy)] - pub(crate) struct InitMessageArrays: u8 { - const MAPPINGS = 0b00000001; - const DESPAWNS = 0b00000010; - const REMOVALS = 0b00000100; - const CHANGES = 0b00001000; - } -} diff --git a/src/core/replication/init_message_arrays.rs b/src/core/replication/init_message_arrays.rs new file mode 100644 index 00000000..158e50b7 --- /dev/null +++ b/src/core/replication/init_message_arrays.rs @@ -0,0 +1,45 @@ +use bitflags::bitflags; + +bitflags! { + /// Types of arrays included in the init message if the bit is set. + /// + /// Serialized at the beginning of the message. + #[derive(Default, Clone, Copy, PartialEq, Eq, Debug)] + pub(crate) struct InitMessageArrays: u8 { + const MAPPINGS = 0b00000001; + const DESPAWNS = 0b00000010; + const REMOVALS = 0b00000100; + const CHANGES = 0b00001000; + } +} + +impl InitMessageArrays { + /// Returns the last array in the message for which the bit is set. + pub(crate) fn last(self) -> InitMessageArrays { + debug_assert!(!self.is_empty()); + let zeroes = u8::BITS - 1 - self.bits().leading_zeros(); + InitMessageArrays::from_bits_retain(1 << zeroes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn last() { + assert_eq!( + InitMessageArrays::CHANGES.last(), + InitMessageArrays::CHANGES + ); + assert_eq!( + InitMessageArrays::MAPPINGS.last(), + InitMessageArrays::MAPPINGS + ); + assert_eq!(InitMessageArrays::all().last(), InitMessageArrays::CHANGES); + assert_eq!( + (InitMessageArrays::DESPAWNS | InitMessageArrays::REMOVALS).last(), + InitMessageArrays::REMOVALS + ); + } +} diff --git a/src/server/replication_messages/init_message.rs b/src/server/replication_messages/init_message.rs index 148ff33f..cef1285a 100644 --- a/src/server/replication_messages/init_message.rs +++ b/src/server/replication_messages/init_message.rs @@ -7,8 +7,8 @@ use super::{serialized_data::SerializedData, update_message::UpdateMessage}; use crate::core::{ channels::ReplicationChannel, replication::{ + init_message_arrays::InitMessageArrays, replicated_clients::{client_visibility::Visibility, ReplicatedClient}, - InitMessageArrays, }, replicon_server::RepliconServer, }; @@ -28,6 +28,9 @@ use crate::core::{ /// (which is very unlikely), the client will panic. This is expected, /// as it means the client can't have an array of such a size anyway. /// +/// Additionally, we don't serialize the size for the last [`InitMessageArrays`] and +/// on deserialization just consume all remaining bytes. +/// /// Stored inside [`ReplicationMessages`](super::ReplicationMessages). #[derive(Default)] pub(crate) struct InitMessage { @@ -191,74 +194,94 @@ impl InitMessage { serialized: &SerializedData, server_tick: Range, ) -> bincode::Result<()> { - let mut arrays = InitMessageArrays::default(); - let mut message_size = size_of::() + server_tick.len(); + let arrays = self.arrays(); + let last_array = arrays.last(); - if !self.mappings.is_empty() { - arrays |= InitMessageArrays::MAPPINGS; - message_size += self.mappings_len.required_space() + self.mappings.len(); - } - if !self.despawns.is_empty() { - arrays |= InitMessageArrays::DESPAWNS; - message_size += self.despawns_len.required_space(); - message_size += self.despawns.iter().map(|range| range.len()).sum::(); - } - if !self.removals.is_empty() { - arrays |= InitMessageArrays::REMOVALS; - message_size += self.removals.len().required_space(); - message_size += self - .removals - .iter() - .map(|(entity, components)| { - entity.len() + components.len().required_space() + components.len() - }) - .sum::(); - } - if !self.changes.is_empty() { - arrays |= InitMessageArrays::CHANGES; - message_size += self.changes.len().required_space(); - message_size += self - .changes - .iter() - .map(|(entity, components)| { - let components_size = components.iter().map(|range| range.len()).sum::(); - entity.len() + components_size.required_space() + components_size - }) - .sum::(); + // Precalcualte size first to avoid extra allocations. + let mut message_size = size_of::() + server_tick.len(); + for (_, array) in arrays.iter_names() { + match array { + InitMessageArrays::MAPPINGS => { + if array != last_array { + message_size += self.mappings_len.required_space(); + } + message_size += self.mappings.len(); + } + InitMessageArrays::DESPAWNS => { + if array != last_array { + message_size += self.despawns_len.required_space(); + } + message_size += self.despawns.iter().map(|range| range.len()).sum::(); + } + InitMessageArrays::REMOVALS => { + if array != last_array { + message_size += self.removals.len().required_space(); + } + message_size += self + .removals + .iter() + .map(|(entity, components)| { + entity.len() + components.len().required_space() + components.len() + }) + .sum::(); + } + InitMessageArrays::CHANGES => { + message_size += self + .changes + .iter() + .map(|(entity, components)| { + let components_size = + components.iter().map(|range| range.len()).sum::(); + entity.len() + components_size.required_space() + components_size + }) + .sum::(); + } + _ => unreachable!("iteration should yield only named flags"), + } } let mut message = Vec::with_capacity(message_size); - message.write_fixedint(arrays.bits())?; message.extend_from_slice(&serialized[server_tick]); - - if !self.mappings.is_empty() { - message.write_varint(self.mappings_len)?; - message.extend_from_slice(&serialized[self.mappings.clone()]); - } - if !self.despawns.is_empty() { - message.write_varint(self.despawns_len)?; - for range in &self.despawns { - message.extend_from_slice(&serialized[range.clone()]); - } - } - if !self.removals.is_empty() { - message.write_varint(self.removals.len())?; - for (entity, components) in &self.removals { - message.extend_from_slice(&serialized[entity.clone()]); - message.write_varint(components.len())?; - message.extend_from_slice(&serialized[components.clone()]); - } - } - if !self.changes.is_empty() { - message.write_varint(self.changes.len())?; - for (entity, components) in &self.changes { - let components_size = components.iter().map(|range| range.len()).sum::(); - message.extend_from_slice(&serialized[entity.clone()]); - message.write_varint(components_size)?; - for component in components { - message.extend_from_slice(&serialized[component.clone()]); + for (_, array) in arrays.iter_names() { + match array { + InitMessageArrays::MAPPINGS => { + if array != last_array { + message.write_varint(self.mappings_len)?; + } + message.extend_from_slice(&serialized[self.mappings.clone()]); } + InitMessageArrays::DESPAWNS => { + if array != last_array { + message.write_varint(self.despawns_len)?; + } + for range in &self.despawns { + message.extend_from_slice(&serialized[range.clone()]); + } + } + InitMessageArrays::REMOVALS => { + if array != last_array { + message.write_varint(self.removals.len())?; + } + for (entity, components) in &self.removals { + message.extend_from_slice(&serialized[entity.clone()]); + message.write_varint(components.len())?; + message.extend_from_slice(&serialized[components.clone()]); + } + } + InitMessageArrays::CHANGES => { + // Changes are always the last array, don't write len for it. + for (entity, components) in &self.changes { + let components_size = + components.iter().map(|range| range.len()).sum::(); + message.extend_from_slice(&serialized[entity.clone()]); + message.write_varint(components_size)?; + for component in components { + message.extend_from_slice(&serialized[component.clone()]); + } + } + } + _ => unreachable!("iteration should yield only named flags"), } } @@ -270,6 +293,25 @@ impl InitMessage { Ok(()) } + fn arrays(&self) -> InitMessageArrays { + let mut header = InitMessageArrays::default(); + + if !self.mappings.is_empty() { + header |= InitMessageArrays::MAPPINGS; + } + if !self.despawns.is_empty() { + header |= InitMessageArrays::DESPAWNS; + } + if !self.removals.is_empty() { + header |= InitMessageArrays::REMOVALS; + } + if !self.changes.is_empty() { + header |= InitMessageArrays::CHANGES; + } + + header + } + /// Clears all chunks. /// /// Keeps allocated memory for reuse. diff --git a/src/server/replication_messages/update_message.rs b/src/server/replication_messages/update_message.rs index 3e923c78..610ab8b1 100644 --- a/src/server/replication_messages/update_message.rs +++ b/src/server/replication_messages/update_message.rs @@ -42,10 +42,6 @@ pub(crate) struct UpdateMessage { /// Serialized as a list of pairs of entity chunk and multiple chunks with mutated components. /// Components are stored in multiple chunks because some clients may acknowledge mutations, /// while others may not. - /// - /// Unlike with [`InitMessage`](super::init_message::InitMessage::changes), - /// we don't serialize the number of entities and on deserialization just consume all remaining bytes. - /// TODO: use the same optimization for init messages. mutations: Vec<(Range, Vec>)>, /// Indicates that an entity has been written since the diff --git a/tests/stats.rs b/tests/stats.rs index e8b76518..a66d831f 100644 --- a/tests/stats.rs +++ b/tests/stats.rs @@ -60,7 +60,7 @@ fn client_stats() { assert_eq!(stats.mappings, 1); assert_eq!(stats.despawns, 1); assert_eq!(stats.messages, 2); - assert_eq!(stats.bytes, 17); + assert_eq!(stats.bytes, 16); } #[derive(Component, Deserialize, Serialize)] From a148ab1051f3d1d724c1038f3d314e38ed8f0c42 Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Sat, 16 Nov 2024 22:09:45 +0200 Subject: [PATCH 09/29] Rename `UpdateMessage` into `MutateMessage` --- CHANGELOG.md | 2 + Cargo.toml | 2 +- benches/replication.rs | 4 +- src/client.rs | 118 +++++++++--------- src/core/channels.rs | 10 +- src/core/replication/command_markers.rs | 8 +- src/core/replication/replicated_clients.rs | 88 ++++++------- src/lib.rs | 12 +- src/server.rs | 59 +++++---- src/server/replication_messages.rs | 12 +- .../replication_messages/init_message.rs | 8 +- .../{update_message.rs => mutate_message.rs} | 43 +++---- .../replication_messages/serialized_data.rs | 2 +- tests/{changes.rs => mutations.rs} | 62 ++++----- 14 files changed, 215 insertions(+), 215 deletions(-) rename src/server/replication_messages/{update_message.rs => mutate_message.rs} (83%) rename tests/{changes.rs => mutations.rs} (93%) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5a98bee..fb3ea1b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Move replication-related modules from `core` module under `core::replication`. - Move `Replicated` to the `replication` module. - Split the `ctx` module and move event-related contexts under `core::events_registry::ctx` and replication-related contexts under `core::replication_registry::ctx`. +- Rename `ServerPlugin::change_timeout` into `ServerPlugin::mutate_timeout`. +- Rename `ReplicationChannel::Update` into `ReplicationChannel::Mutations`. ### Removed diff --git a/Cargo.toml b/Cargo.toml index 941c1fd2..7ccaa2d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,7 @@ name = "replication" harness = false [[test]] -name = "changes" +name = "mutations" required-features = ["client", "server"] [[test]] diff --git a/benches/replication.rs b/benches/replication.rs index a8a746b7..dc4616ef 100644 --- a/benches/replication.rs +++ b/benches/replication.rs @@ -82,7 +82,7 @@ fn replication(c: }) }); - c.bench_function(&format!("{name}, update send, {clients} client(s)"), |b| { + c.bench_function(&format!("{name}, mutate send, {clients} client(s)"), |b| { b.iter_custom(|iter| { let mut server_app = create_app::(); let mut client_apps = Vec::new(); @@ -154,7 +154,7 @@ fn replication(c: }) }); - c.bench_function(&format!("{name}, update receive"), |b| { + c.bench_function(&format!("{name}, mutate receive"), |b| { b.iter_custom(|iter| { let mut server_app = create_app::(); let mut client_app = create_app::(); diff --git a/src/client.rs b/src/client.rs index f8a6702b..869c0191 100644 --- a/src/client.rs +++ b/src/client.rs @@ -39,7 +39,7 @@ impl Plugin for ClientPlugin { app.init_resource::() .init_resource::() .init_resource::() - .init_resource::() + .init_resource::() .configure_sets( PreUpdate, ( @@ -77,17 +77,17 @@ impl ClientPlugin { /// Receives and applies replication messages from the server. /// /// Tick init messages are sent over the [`ReplicationChannel::Init`] and are applied first to ensure valid state - /// for entity updates. + /// for component mutations. /// - /// Entity update messages are sent over [`ReplicationChannel::Update`], which means they may appear - /// ahead-of or behind init messages from the same server tick. An update will only be applied if its + /// Mutate messages are sent over [`ReplicationChannel::Mutations`], which means they may appear + /// ahead-of or behind init messages from the same server tick. A mutation will only be applied if its /// change tick has already appeared in an init message, otherwise it will be buffered while waiting. - /// Since entity updates can arrive in any order, updates will only be applied if they correspond to a more + /// Since component mutations can arrive in any order, they will only be applied if correspond to a more /// recent server tick than the last acked server tick for each entity. /// - /// Buffered entity update messages are processed last. + /// Buffered mutate messages are processed last. /// - /// Acknowledgments for received entity update messages are sent back to the server. + /// Acknowledgments for received mutate messages are sent back to the server. /// /// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages). pub(super) fn receive_replication( @@ -97,7 +97,7 @@ impl ClientPlugin { ) -> bincode::Result<()> { world.resource_scope(|world, mut client: Mut| { world.resource_scope(|world, mut entity_map: Mut| { - world.resource_scope(|world, mut buffered_updates: Mut| { + world.resource_scope(|world, mut buffered_mutations: Mut| { world.resource_scope(|world, command_markers: Mut| { world.resource_scope(|world, registry: Mut| { let mut stats = world.remove_resource::(); @@ -114,7 +114,7 @@ impl ClientPlugin { world, &mut params, &mut client, - &mut buffered_updates, + &mut buffered_mutations, )?; if let Some(stats) = stats { @@ -132,44 +132,44 @@ impl ClientPlugin { fn reset( mut init_tick: ResMut, mut entity_map: ResMut, - mut buffered_updates: ResMut, + mut buffered_mutations: ResMut, ) { *init_tick = Default::default(); entity_map.clear(); - buffered_updates.clear(); + buffered_mutations.clear(); } } /// Reads all received messages and applies them. /// -/// Sends acknowledgments for update messages back. +/// Sends acknowledgments for mutate messages back. fn apply_replication( world: &mut World, params: &mut ReceiveParams, client: &mut RepliconClient, - buffered_updates: &mut BufferedUpdates, + buffered_mutations: &mut BufferedMutations, ) -> bincode::Result<()> { for message in client.receive(ReplicationChannel::Init) { apply_init_message(world, params, &message)?; } - // Unlike init messages, we read all updates first, sort them by tick - // in descending order to ensure that the last update will be applied first. - // Since update messages manually split by packet size, we apply all messages, + // Unlike init messages, we read all mutate messages first, sort them by tick + // in descending order to ensure that the last mutation will be applied first. + // Since mutate messages manually split by packet size, we apply all messages, // but skip outdated data per-entity by checking last received tick for it // (unless user requested history via marker). let init_tick = *world.resource::(); - let acks_size = mem::size_of::() * client.received_count(ReplicationChannel::Update); + let acks_size = mem::size_of::() * client.received_count(ReplicationChannel::Mutations); if acks_size != 0 { let mut acks = Vec::with_capacity(acks_size); - for message in client.receive(ReplicationChannel::Update) { - let update_index = read_update_message(params, buffered_updates, message)?; - acks.write_varint(update_index)?; + for message in client.receive(ReplicationChannel::Mutations) { + let mutate_index = buffer_mutate_message(params, buffered_mutations, message)?; + acks.write_varint(mutate_index)?; } client.send(ReplicationChannel::Init, acks); } - apply_update_messages(world, params, buffered_updates, init_tick) + apply_mutate_messages(world, params, buffered_mutations, init_tick) } /// Reads and applies init message. @@ -234,14 +234,14 @@ fn apply_init_message( Ok(()) } -/// Reads and buffers update message. +/// Reads and buffers mutate message. /// /// For details see [`replication_messages`](crate::server::replication_messages). /// -/// Returns update index to be used for acknowledgment. -fn read_update_message( +/// Returns mutate index to be used for acknowledgment. +fn buffer_mutate_message( params: &mut ReceiveParams, - buffered_updates: &mut BufferedUpdates, + buffered_mutations: &mut BufferedMutations, message: Bytes, ) -> bincode::Result { let end_pos: u64 = message.len().try_into().unwrap(); @@ -253,38 +253,36 @@ fn read_update_message( let init_tick = DefaultOptions::new().deserialize_from(&mut cursor)?; let message_tick = DefaultOptions::new().deserialize_from(&mut cursor)?; - let update_index = cursor.read_varint()?; - trace!("received update message for {message_tick:?}"); - buffered_updates.insert(BufferedUpdate { + let mutate_index = cursor.read_varint()?; + trace!("received mutate message for {message_tick:?}"); + buffered_mutations.insert(BufferedMutate { init_tick, message_tick, message: message.slice(cursor.position() as usize..), }); - Ok(update_index) + Ok(mutate_index) } -/// Applies updates from [`BufferedUpdates`]. +/// Applies mutations from [`BufferedMutations`]. /// -/// For details see [`replication_messages`](crate::server::replication_messages). -/// -/// If the update message can't be applied yet (because the init message with the +/// If the mutate message can't be applied yet (because the init message with the /// corresponding tick hasn't arrived), it will be kept in the buffer. -fn apply_update_messages( +fn apply_mutate_messages( world: &mut World, params: &mut ReceiveParams, - buffered_updates: &mut BufferedUpdates, + buffered_mutations: &mut BufferedMutations, init_tick: ServerInitTick, ) -> bincode::Result<()> { let mut result = Ok(()); - buffered_updates.0.retain(|update| { - if update.init_tick > *init_tick { + buffered_mutations.0.retain(|mutate| { + if mutate.init_tick > *init_tick { return true; } - trace!("applying update message for {:?}", update.message_tick); - let len = apply_array(&mut Cursor::new(&*update.message), false, |cursor| { - apply_update_mutations(world, params, cursor, update.message_tick) + trace!("applying mutate message for {:?}", mutate.message_tick); + let len = apply_array(&mut Cursor::new(&*mutate.message), false, |cursor| { + apply_mutations(world, params, cursor, mutate.message_tick) }); match len { @@ -483,10 +481,10 @@ fn apply_init_changes( Ok(()) } -/// Deserializes and applies component mutations for all entities from update message. +/// Deserializes and applies component mutations for all entities from mutate message. /// /// Consumes all remaining bytes in the cursor. -fn apply_update_mutations( +fn apply_mutations( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, @@ -496,8 +494,8 @@ fn apply_update_mutations( let data_size: usize = cursor.read_varint()?; let Some(client_entity) = params.entity_map.get_by_server(server_entity) else { - // Update could arrive after a despawn from init message. - debug!("ignoring update received for unknown server's {server_entity:?}"); + // Mutation could arrive after a despawn from init message. + debug!("ignoring mutations received for unknown server's {server_entity:?}"); cursor.set_position(cursor.position() + data_size as u64); return Ok(()); }; @@ -509,14 +507,14 @@ fn apply_update_mutations( let mut history = client_entity .get_mut::() - .expect("all entities from update should have confirmed ticks"); + .expect("all entities from mutate message should have confirmed ticks"); let new_entity = message_tick > history.last_tick(); if new_entity { history.set_last_tick(message_tick); } else { if !params.entity_markers.need_history() { trace!( - "ignoring outdated update for client's {:?}", + "ignoring outdated mutations for client's {:?}", client_entity.id() ); cursor.set_position(cursor.position() + data_size as u64); @@ -526,7 +524,7 @@ fn apply_update_mutations( let ago = history.last_tick().get().wrapping_sub(message_tick.get()); if ago >= u64::BITS { trace!( - "discarding update {ago} ticks old for client's {:?}", + "discarding {ago} ticks old mutations for client's {:?}", client_entity.id() ); cursor.set_position(cursor.position() + data_size as u64); @@ -679,42 +677,42 @@ pub enum ClientSet { /// Last received tick for init message from server. /// /// In other words, last [`RepliconTick`] with a removal, insertion, spawn or despawn. -/// When a component changes, this value is not updated. +/// When a component mutates, this value is not updated. #[derive(Clone, Copy, Debug, Default, Deref, Resource)] pub struct ServerInitTick(RepliconTick); -/// All cached buffered updates, used by the replicon client to align replication updates with initialization +/// Cached buffered mutate messages, used by the replicon client to align them with initialization /// messages. /// /// If [`ClientSet::Reset`] is disabled, then this needs to be cleaned up manually with [`Self::clear`]. #[derive(Default, Resource)] -pub struct BufferedUpdates(Vec); +pub struct BufferedMutations(Vec); -impl BufferedUpdates { +impl BufferedMutations { pub fn clear(&mut self) { self.0.clear(); } - /// Inserts a new update, maintaining sorting by their message tick in descending order. - fn insert(&mut self, update: BufferedUpdate) { + /// Inserts a new buffered message, maintaining sorting by their message tick in descending order. + fn insert(&mut self, mutation: BufferedMutate) { let index = self .0 - .partition_point(|other_update| update.message_tick < other_update.message_tick); - self.0.insert(index, update); + .partition_point(|other_mutation| mutation.message_tick < other_mutation.message_tick); + self.0.insert(index, mutation); } } -/// Caches a partially-deserialized entity update message that is waiting for its tick to appear in an init message. +/// Partially-deserialized mutate message that is waiting for its tick to appear in an init message. /// -/// See also [`crate::server::replication_messages::UpdateMessage`]. -pub(super) struct BufferedUpdate { +/// See also [`crate::server::replication_messages`]. +pub(super) struct BufferedMutate { /// Required tick to wait for. init_tick: RepliconTick, - /// The tick this update corresponds to. + /// The tick this mutations corresponds to. message_tick: RepliconTick, - /// Update data. + /// Mutations data. message: Bytes, } diff --git a/src/core/channels.rs b/src/core/channels.rs index 96c47d74..8a1caaa4 100644 --- a/src/core/channels.rs +++ b/src/core/channels.rs @@ -11,17 +11,17 @@ pub enum ReplicationChannel { /// /// This is an ordered reliable channel. Init, - /// For sending messages with component updates. + /// For sending messages with component mutations. /// /// This is an unreliable channel. - Update, + Mutations, } impl From for RepliconChannel { fn from(value: ReplicationChannel) -> Self { match value { ReplicationChannel::Init => ChannelKind::Ordered.into(), - ReplicationChannel::Update => ChannelKind::Unreliable.into(), + ReplicationChannel::Mutations => ChannelKind::Unreliable.into(), } } } @@ -54,11 +54,11 @@ impl Default for RepliconChannels { Self { server: vec![ ReplicationChannel::Init.into(), - ReplicationChannel::Update.into(), + ReplicationChannel::Mutations.into(), ], client: vec![ ReplicationChannel::Init.into(), - ReplicationChannel::Update.into(), + ReplicationChannel::Mutations.into(), ], default_max_bytes: 5 * 1024 * 1024, } diff --git a/src/core/replication/command_markers.rs b/src/core/replication/command_markers.rs index 0869dd7f..ca17361b 100644 --- a/src/core/replication/command_markers.rs +++ b/src/core/replication/command_markers.rs @@ -231,11 +231,11 @@ pub struct MarkerConfig { /// By default set to `0`. pub priority: usize, - /// Represents whether a marker needs to process old updates. + /// Represents whether a marker needs to process old mutations. /// - /// Since updates use [`ChannelKind::Unreliable`](crate::core::channels::ChannelKind), - /// a client may receive an older update for an entity. By default these updates are discarded, - /// but some markers may need them. If this field is set to `true`, old component updates will + /// Since mutations use [`ChannelKind::Unreliable`](crate::core::channels::ChannelKind), + /// a client may receive an older mutation for an entity component. By default these mutations are discarded, + /// but some markers may need them. If this field is set to `true`, old component mutations will /// be passed to the writing function for this marker. /// /// By default set to `false`. diff --git a/src/core/replication/replicated_clients.rs b/src/core/replication/replicated_clients.rs index f15d4c49..6b2f7526 100644 --- a/src/core/replication/replicated_clients.rs +++ b/src/core/replication/replicated_clients.rs @@ -180,17 +180,17 @@ pub struct ReplicatedClient { /// The last tick in which a replicated entity had an insertion, removal, or gained/lost a component from the /// perspective of the client. /// - /// It should be included in update messages and server events to avoid needless waiting for the next init + /// It should be included in mutate messages and server events to avoid needless waiting for the next init /// message to arrive. init_tick: RepliconTick, - /// Update message indexes mapped to their info. - updates: HashMap, + /// Mutate message indices mapped to their info. + mutations: HashMap, - /// Index for the next update message to be sent to this client. + /// Index for the next mutate message to be sent to this client. /// - /// See also [`Self::register_update`]. - next_update_index: u16, + /// See also [`Self::register_mutate_message`]. + next_mutate_index: u16, } impl ReplicatedClient { @@ -200,8 +200,8 @@ impl ReplicatedClient { change_ticks: Default::default(), visibility: ClientVisibility::new(policy), init_tick: Default::default(), - updates: Default::default(), - next_update_index: Default::default(), + mutations: Default::default(), + next_mutate_index: Default::default(), } } @@ -231,13 +231,13 @@ impl ReplicatedClient { self.init_tick } - /// Clears all entities for unacknowledged updates, returning them as an iterator. + /// Clears all entities for unacknowledged mutate messages, returning them as an iterator. /// /// Keeps the allocated memory for reuse. fn drain_entities(&mut self) -> impl Iterator> + '_ { - self.updates + self.mutations .drain() - .map(|(_, update_info)| update_info.entities) + .map(|(_, mutate_info)| mutate_info.entities) } /// Resets all data. @@ -247,37 +247,37 @@ impl ReplicatedClient { self.id = id; self.visibility.clear(); self.change_ticks.clear(); - self.updates.clear(); - self.next_update_index = 0; + self.mutations.clear(); + self.next_mutate_index = 0; } - /// Registers update at specified `tick` and `timestamp` and returns its index with entities to fill. + /// Registers mutate message at specified `tick` and `timestamp` and returns its index with entities to fill. /// - /// Used later to acknowledge updated entities. + /// Used later to acknowledge the message. #[must_use] - pub(crate) fn register_update( + pub(crate) fn register_mutate_message( &mut self, client_buffers: &mut ClientBuffers, tick: Tick, timestamp: Duration, ) -> (u16, &mut Vec) { - let update_index = self.next_update_index; - self.next_update_index = self.next_update_index.overflowing_add(1).0; + let mutate_index = self.next_mutate_index; + self.next_mutate_index = self.next_mutate_index.overflowing_add(1).0; let mut entities = client_buffers.entities.pop().unwrap_or_default(); entities.clear(); - let update_info = UpdateInfo { + let mutate_info = MutateInfo { tick, timestamp, entities, }; - let update_info = self - .updates - .entry(update_index) - .insert(update_info) + let mutate_info = self + .mutations + .entry(mutate_index) + .insert(mutate_info) .into_mut(); - (update_index, &mut update_info.entities) + (mutate_index, &mut mutate_info.entities) } /// Sets the change tick for an entity that is replicated to this client. @@ -293,26 +293,26 @@ impl ReplicatedClient { self.change_ticks.get(&entity).copied() } - /// Marks update with the specified index as acknowledged. + /// Marks mutate message as acknowledged by its index. /// - /// Change limits for all entities from this update will be set to the update's tick if it's higher. + /// Change tick for all entities from this mutate message will be set to the message tick if it's higher. /// /// Keeps allocated memory in the buffers for reuse. - pub(crate) fn acknowledge( + pub(crate) fn ack_mutate_message( &mut self, client_buffers: &mut ClientBuffers, tick: Tick, - update_index: u16, + mutate_index: u16, ) { - let Some(update_info) = self.updates.remove(&update_index) else { + let Some(mutate_info) = self.mutations.remove(&mutate_index) else { debug!( - "received unknown update index {update_index} from {:?}", + "received unknown mutate index {mutate_index} from {:?}", self.id ); return; }; - for entity in &update_info.entities { + for entity in &mutate_info.entities { let Some(last_tick) = self.change_ticks.get_mut(entity) else { // We ignore missing entities, since they were probably despawned. continue; @@ -320,16 +320,16 @@ impl ReplicatedClient { // Received tick could be outdated because we bump it // if we detect any insertion on the entity in `collect_changes`. - if !last_tick.is_newer_than(update_info.tick, tick) { - *last_tick = update_info.tick; + if !last_tick.is_newer_than(mutate_info.tick, tick) { + *last_tick = mutate_info.tick; } } - client_buffers.entities.push(update_info.entities); + client_buffers.entities.push(mutate_info.entities); trace!( - "{:?} acknowledged an update with {:?}", + "{:?} acknowledged mutate message with {:?}", self.id, - update_info.tick, + mutate_info.tick, ); } @@ -337,7 +337,7 @@ impl ReplicatedClient { pub fn remove_despawned(&mut self, entity: Entity) { self.change_ticks.remove(&entity); self.visibility.remove_despawned(entity); - // We don't clean up `self.updates` for efficiency reasons. + // We don't clean up `self.mutations` for efficiency reasons. // `Self::acknowledge()` will properly ignore despawned entities. } @@ -350,19 +350,19 @@ impl ReplicatedClient { }) } - /// Removes all updates older then `min_timestamp`. + /// Removes all mutate messages older then `min_timestamp`. /// /// Keeps allocated memory in the buffers for reuse. - pub(crate) fn remove_older_updates( + pub(crate) fn cleanup_older_mutations( &mut self, client_buffers: &mut ClientBuffers, min_timestamp: Duration, ) { - self.updates.retain(|_, update_info| { - if update_info.timestamp < min_timestamp { + self.mutations.retain(|_, mutate_info| { + if mutate_info.timestamp < min_timestamp { client_buffers .entities - .push(mem::take(&mut update_info.entities)); + .push(mem::take(&mut mutate_info.entities)); false } else { true @@ -379,13 +379,13 @@ pub(crate) struct ClientBuffers { /// Stored to reuse allocated memory. clients: Vec, - /// [`Vec`]'s from acknowledged update indexes from [`ReplicatedClient`]. + /// [`Vec`]'s from acknowledged [`MutateInfo`]'s. /// /// Stored to reuse allocated capacity. entities: Vec>, } -struct UpdateInfo { +struct MutateInfo { tick: Tick, timestamp: Duration, entities: Vec, diff --git a/src/lib.rs b/src/lib.rs index 97de36c1..e2ee2bd3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -583,14 +583,14 @@ For a higher level API consider using [`bevy_replicon_attributes`](https://docs. All events, inserts, removals and despawns will be applied to clients in the same order as on the server. -Entity component updates are grouped by entity, and component groupings may be applied to clients in a different order than on the server. -For example, if two entities are spawned in tick 1 on the server and their components are updated in tick 2, -then the client is guaranteed to see the spawns at the same time, but the component updates may appear in different client ticks. +Entity component mutations are grouped by entity, and component groupings may be applied to clients in a different order than on the server. +For example, if two entities are spawned in tick 1 on the server and their components are mutated in tick 2, +then the client is guaranteed to see the spawns at the same time, but the component mutations may appear in different client ticks. -If a component is dependent on other data, updates to the component will only be applied to the client when that data has arrived. -So if your component references another entity, updates to that component will only be applied when the referenced entity has been spawned on the client. +If a component is dependent on other data, mutations to the component will only be applied to the client when that data has arrived. +So if your component references another entity, mutations to that component will only be applied when the referenced entity has been spawned on the client. -Updates for despawned entities will be discarded automatically, but events or components may reference despawned entities and should be handled with that in mind. +Mutations for despawned entities will be discarded automatically, but events or components may reference despawned entities and should be handled with that in mind. Clients should never assume their world state is the same as the server's on any given tick value-wise. World state on the client is only "eventually consistent" with the server's. diff --git a/src/server.rs b/src/server.rs index 3b92ba05..1bf527ac 100644 --- a/src/server.rs +++ b/src/server.rs @@ -54,10 +54,10 @@ pub struct ServerPlugin { /// Visibility configuration. pub visibility_policy: VisibilityPolicy, - /// The time after which updates will be considered lost if an acknowledgment is not received for them. + /// The time after which mutations will be considered lost if an acknowledgment is not received for them. /// - /// In practice updates will live at least `update_timeout`, and at most `2*update_timeout`. - pub update_timeout: Duration, + /// In practice mutations will live at least `mutate_timeout`, and at most `2*mutate_timeout`. + pub mutate_timeout: Duration, /// If enabled, replication will be started automatically after connection. /// @@ -74,7 +74,7 @@ impl Default for ServerPlugin { Self { tick_policy: TickPolicy::MaxTickRate(30), visibility_policy: Default::default(), - update_timeout: Duration::from_secs(10), + mutate_timeout: Duration::from_secs(10), replicate_after_connect: true, } } @@ -123,7 +123,7 @@ impl Plugin for ServerPlugin { Self::handle_connections, Self::enable_replication, Self::receive_acks, - Self::cleanup_acks(self.update_timeout).run_if(on_timer(self.update_timeout)), + Self::cleanup_acks(self.mutate_timeout).run_if(on_timer(self.mutate_timeout)), ) .chain() .in_set(ServerSet::Receive) @@ -216,14 +216,14 @@ impl ServerPlugin { } fn cleanup_acks( - update_timeout: Duration, + mutate_timeout: Duration, ) -> impl FnMut(ResMut, ResMut, Res