diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cfd29d2..584730ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,11 +13,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Various optimizations for replication messages to use fewer bytes. +- Accept `Vec` instead of `Cursor>` for serialization. - `ConnectedClients` now store `ConnectedClient` instead of `ClientId` with more information about the client. - All `TestFnsEntityExt` now accept `FnsId`. - Move replication-related modules from `core` module under `core::replication`. - Move `Replicated` to the `replication` module. - Split the `ctx` module and move event-related contexts under `core::events_registry::ctx` and replication-related contexts under `core::replication_registry::ctx`. +- Rename `ServerPlugin::change_timeout` into `ServerPlugin::mutations_timeout`. +- Rename `ServerInitTick` into `ServerChangeTick`. +- Rename `ReplicatedClient::init_tick` into `ReplicatedClient::change_tick`. +- Rename `ReplicatedClient::get_change_tick` into `ReplicatedClient::mutation_tick`. +- Rename `ReplicationChannel::Init` into `ReplicationChannel::Changes`. +- Rename `ReplicationChannel::Update` into `ReplicationChannel::Mutations`. ### Removed diff --git a/Cargo.toml b/Cargo.toml index fba5e7c9..7ccaa2d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ bincode = "1.3" serde = "1.0" integer-encoding = "4.0" ordered-multimap = "0.7" +bitflags = "2.6" [dev-dependencies] bevy = { version = "0.14", default-features = false, features = [ @@ -66,7 +67,7 @@ name = "replication" harness = false [[test]] -name = "changes" +name = "mutations" required-features = ["client", "server"] [[test]] diff --git a/benches/replication.rs b/benches/replication.rs index a8a746b7..130be87c 100644 --- a/benches/replication.rs +++ b/benches/replication.rs @@ -49,7 +49,7 @@ fn replication(c: name = &name[MODULE_PREFIX_LEN..]; for clients in [1, 20] { - c.bench_function(&format!("{name}, init send, {clients} client(s)"), |b| { + c.bench_function(&format!("{name}, changes send, {clients} client(s)"), |b| { b.iter_custom(|iter| { let mut elapsed = Duration::ZERO; for _ in 0..iter { @@ -82,53 +82,56 @@ fn replication(c: }) }); - c.bench_function(&format!("{name}, update send, {clients} client(s)"), |b| { - b.iter_custom(|iter| { - let mut server_app = create_app::(); - let mut client_apps = Vec::new(); - for _ in 0..clients { - client_apps.push(create_app::()); - } - - for client_app in &mut client_apps { - server_app.connect_client(client_app); - } - - server_app - .world_mut() - .spawn_batch(vec![(Replicated, C::default()); ENTITIES as usize]); - let mut query = server_app.world_mut().query::<&mut C>(); - - server_app.update(); - for client_app in &mut client_apps { - server_app.exchange_with_client(client_app); - client_app.update(); - assert_eq!(client_app.world().entities().len(), ENTITIES); - } + c.bench_function( + &format!("{name}, mutations send, {clients} client(s)"), + |b| { + b.iter_custom(|iter| { + let mut server_app = create_app::(); + let mut client_apps = Vec::new(); + for _ in 0..clients { + client_apps.push(create_app::()); + } - let mut elapsed = Duration::ZERO; - for _ in 0..iter { - for mut component in query.iter_mut(server_app.world_mut()) { - component.set_changed(); + for client_app in &mut client_apps { + server_app.connect_client(client_app); } - let instant = Instant::now(); - server_app.update(); - elapsed += instant.elapsed(); + server_app + .world_mut() + .spawn_batch(vec![(Replicated, C::default()); ENTITIES as usize]); + let mut query = server_app.world_mut().query::<&mut C>(); + server_app.update(); for client_app in &mut client_apps { server_app.exchange_with_client(client_app); client_app.update(); assert_eq!(client_app.world().entities().len(), ENTITIES); } - } - elapsed - }) - }); + let mut elapsed = Duration::ZERO; + for _ in 0..iter { + for mut component in query.iter_mut(server_app.world_mut()) { + component.set_changed(); + } + + let instant = Instant::now(); + server_app.update(); + elapsed += instant.elapsed(); + + for client_app in &mut client_apps { + server_app.exchange_with_client(client_app); + client_app.update(); + assert_eq!(client_app.world().entities().len(), ENTITIES); + } + } + + elapsed + }) + }, + ); } - c.bench_function(&format!("{name}, init receive"), |b| { + c.bench_function(&format!("{name}, changes receive"), |b| { b.iter_custom(|iter| { let mut elapsed = Duration::ZERO; for _ in 0..iter { @@ -154,7 +157,7 @@ fn replication(c: }) }); - c.bench_function(&format!("{name}, update receive"), |b| { + c.bench_function(&format!("{name}, mutations receive"), |b| { b.iter_custom(|iter| { let mut server_app = create_app::(); let mut client_app = create_app::(); diff --git a/src/client.rs b/src/client.rs index c0153b60..b254c70e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,12 +8,13 @@ use std::{io::Cursor, mem}; use bevy::{ecs::world::CommandQueue, prelude::*}; use bincode::{DefaultOptions, Options}; use bytes::Bytes; -use integer_encoding::VarIntReader; +use integer_encoding::{FixedIntReader, VarIntReader}; use crate::core::{ channels::{ReplicationChannel, RepliconChannels}, common_conditions::{client_connected, client_just_connected, client_just_disconnected}, replication::{ + change_message_flags::ChangeMessageFlags, command_markers::{CommandMarkers, EntityMarkers}, deferred_entity::DeferredEntity, replication_registry::{ @@ -37,8 +38,8 @@ impl Plugin for ClientPlugin { fn build(&self, app: &mut App) { app.init_resource::() .init_resource::() - .init_resource::() - .init_resource::() + .init_resource::() + .init_resource::() .configure_sets( PreUpdate, ( @@ -75,18 +76,18 @@ impl ClientPlugin { /// Receives and applies replication messages from the server. /// - /// Tick init messages are sent over the [`ReplicationChannel::Init`] and are applied first to ensure valid state - /// for entity updates. + /// Change messages are sent over the [`ReplicationChannel::Changes`] and are applied first to ensure valid state + /// for component mutations. /// - /// Entity update messages are sent over [`ReplicationChannel::Update`], which means they may appear - /// ahead-of or behind init messages from the same server tick. An update will only be applied if its - /// change tick has already appeared in an init message, otherwise it will be buffered while waiting. - /// Since entity updates can arrive in any order, updates will only be applied if they correspond to a more + /// Mutate messages are sent over [`ReplicationChannel::Mutations`], which means they may appear + /// ahead-of or behind change messages from the same server tick. A mutation will only be applied if its + /// change tick has already appeared in an change message, otherwise it will be buffered while waiting. + /// Since component mutations can arrive in any order, they will only be applied if they correspond to a more /// recent server tick than the last acked server tick for each entity. /// - /// Buffered entity update messages are processed last. + /// Buffered mutate messages are processed last. /// - /// Acknowledgments for received entity update messages are sent back to the server. + /// Acknowledgments for received mutate messages are sent back to the server. /// /// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages). pub(super) fn receive_replication( @@ -96,7 +97,7 @@ impl ClientPlugin { ) -> bincode::Result<()> { world.resource_scope(|world, mut client: Mut| { world.resource_scope(|world, mut entity_map: Mut| { - world.resource_scope(|world, mut buffered_updates: Mut| { + world.resource_scope(|world, mut buffered_mutations: Mut| { world.resource_scope(|world, command_markers: Mut| { world.resource_scope(|world, registry: Mut| { let mut stats = world.remove_resource::(); @@ -113,7 +114,7 @@ impl ClientPlugin { world, &mut params, &mut client, - &mut buffered_updates, + &mut buffered_mutations, )?; if let Some(stats) = stats { @@ -129,50 +130,52 @@ impl ClientPlugin { } fn reset( - mut init_tick: ResMut, + mut change_tick: ResMut, mut entity_map: ResMut, - mut buffered_updates: ResMut, + mut buffered_mutations: ResMut, ) { - *init_tick = Default::default(); + *change_tick = Default::default(); entity_map.clear(); - buffered_updates.clear(); + buffered_mutations.clear(); } } /// Reads all received messages and applies them. /// -/// Sends acknowledgments for update messages back. +/// Sends acknowledgments for mutate messages back. fn apply_replication( world: &mut World, params: &mut ReceiveParams, client: &mut RepliconClient, - buffered_updates: &mut BufferedUpdates, + buffered_mutations: &mut BufferedMutations, ) -> bincode::Result<()> { - for message in client.receive(ReplicationChannel::Init) { - apply_init_message(world, params, &message)?; + for message in client.receive(ReplicationChannel::Changes) { + apply_change_message(world, params, &message)?; } - // Unlike init messages, we read all updates first, sort them by tick - // in descending order to ensure that the last update will be applied first. - // Since update messages manually split by packet size, we apply all messages, + // Unlike change messages, we read all mutate messages first, sort them by tick + // in descending order to ensure that the last mutation will be applied first. + // Since mutate messages manually split by packet size, we apply all messages, // but skip outdated data per-entity by checking last received tick for it // (unless user requested history via marker). - let init_tick = *world.resource::(); - let acks_size = mem::size_of::() * client.received_count(ReplicationChannel::Update); + let change_tick = *world.resource::(); + let acks_size = mem::size_of::() * client.received_count(ReplicationChannel::Mutations); if acks_size != 0 { let mut acks = Vec::with_capacity(acks_size); - for message in client.receive(ReplicationChannel::Update) { - let update_index = read_update_message(params, buffered_updates, message)?; - bincode::serialize_into(&mut acks, &update_index)?; + for message in client.receive(ReplicationChannel::Mutations) { + let mutate_index = buffer_mutate_message(params, buffered_mutations, message)?; + bincode::serialize_into(&mut acks, &mutate_index)?; } - client.send(ReplicationChannel::Init, acks); + client.send(ReplicationChannel::Changes, acks); } - apply_update_messages(world, params, buffered_updates, init_tick) + apply_mutate_messages(world, params, buffered_mutations, change_tick) } -/// Applies [`InitMessage`](crate::server::replication_messages::InitMessage). -fn apply_init_message( +/// Reads and applies a change message. +/// +/// For details see [`replication_messages`](crate::server::replication_messages). +fn apply_change_message( world: &mut World, params: &mut ReceiveParams, message: &[u8], @@ -184,49 +187,71 @@ fn apply_init_message( stats.bytes += end_pos; } - let message_tick = bincode::deserialize_from(&mut cursor)?; - trace!("applying init message for {message_tick:?}"); - world.resource_mut::().0 = message_tick; - debug_assert!(cursor.position() < end_pos, "init message can't be empty"); + let flags = ChangeMessageFlags::from_bits_retain(cursor.read_fixedint()?); + debug_assert!(!flags.is_empty(), "message can't be empty"); - apply_entity_mappings(world, params, &mut cursor)?; - if cursor.position() == end_pos { - return Ok(()); - } + let message_tick = bincode::deserialize_from(&mut cursor)?; + trace!("applying change message for {message_tick:?}"); + world.resource_mut::().0 = message_tick; - apply_despawns(world, params, &mut cursor, message_tick)?; - if cursor.position() == end_pos { - return Ok(()); - } + let last_flag = flags.last(); + for (_, flag) in flags.iter_names() { + let array_kind = if flag != last_flag { + ArrayKind::Sized + } else { + ArrayKind::Dynamic + }; - apply_init_components( - world, - params, - ComponentsKind::Removal, - &mut cursor, - message_tick, - )?; - if cursor.position() == end_pos { - return Ok(()); + match flag { + ChangeMessageFlags::MAPPINGS => { + debug_assert_eq!(array_kind, ArrayKind::Sized); + let len = apply_array(array_kind, &mut cursor, |cursor| { + apply_entity_mapping(world, params, cursor) + })?; + if let Some(stats) = &mut params.stats { + stats.mappings += len as u32; + } + } + ChangeMessageFlags::DESPAWNS => { + let len = apply_array(array_kind, &mut cursor, |cursor| { + apply_despawn(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.despawns += len as u32; + } + } + ChangeMessageFlags::REMOVALS => { + let len = apply_array(array_kind, &mut cursor, |cursor| { + apply_removals(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.entities_changed += len as u32; + } + } + ChangeMessageFlags::CHANGES => { + debug_assert_eq!(array_kind, ArrayKind::Dynamic); + let len = apply_array(array_kind, &mut cursor, |cursor| { + apply_changes(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.entities_changed += len as u32; + } + } + _ => unreachable!("iteration should yield only named flags"), + } } - apply_init_components( - world, - params, - ComponentsKind::Insert, - &mut cursor, - message_tick, - )?; - Ok(()) } -/// Reads and buffers [`UpdateMessage`](crate::server::replication_messages::UpdateMessage). +/// Reads and buffers mutate message. /// -/// Returns update index to be used for acknowledgment. -fn read_update_message( +/// For details see [`replication_messages`](crate::server::replication_messages). +/// +/// Returns mutate index to be used for acknowledgment. +fn buffer_mutate_message( params: &mut ReceiveParams, - buffered_updates: &mut BufferedUpdates, + buffered_mutations: &mut BufferedMutations, message: Bytes, ) -> bincode::Result { let end_pos: u64 = message.len().try_into().unwrap(); @@ -236,41 +261,49 @@ fn read_update_message( stats.bytes += end_pos; } - let (init_tick, message_tick, update_index) = bincode::deserialize_from(&mut cursor)?; - trace!("received update message for {message_tick:?}"); - buffered_updates.insert(BufferedUpdate { - init_tick, + let change_tick = bincode::deserialize_from(&mut cursor)?; + let message_tick = bincode::deserialize_from(&mut cursor)?; + let mutate_index = cursor.read_varint()?; + trace!("received mutate message for {message_tick:?}"); + buffered_mutations.insert(BufferedMutate { + change_tick, message_tick, message: message.slice(cursor.position() as usize..), }); - Ok(update_index) + Ok(mutate_index) } -/// Applies updates from [`BufferedUpdates`]. +/// Applies mutations from [`BufferedMutations`]. /// -/// If the update message can't be applied yet (because the init message with the +/// If the mutate message can't be applied yet (because the change message with the /// corresponding tick hasn't arrived), it will be kept in the buffer. -fn apply_update_messages( +fn apply_mutate_messages( world: &mut World, params: &mut ReceiveParams, - buffered_updates: &mut BufferedUpdates, - init_tick: ServerInitTick, + buffered_mutations: &mut BufferedMutations, + change_tick: ServerChangeTick, ) -> bincode::Result<()> { let mut result = Ok(()); - buffered_updates.0.retain(|update| { - if update.init_tick > *init_tick { + buffered_mutations.0.retain(|mutate| { + if mutate.change_tick > *change_tick { return true; } - trace!("applying update message for {:?}", update.message_tick); - if let Err(e) = apply_update_components( - world, - params, - &mut Cursor::new(&*update.message), - update.message_tick, - ) { - result = Err(e); + trace!("applying mutate message for {:?}", mutate.message_tick); + let len = apply_array( + ArrayKind::Dynamic, + &mut Cursor::new(&*mutate.message), + |cursor| apply_mutations(world, params, cursor, mutate.message_tick), + ); + + match len { + Ok(len) => { + if let Some(stats) = &mut params.stats { + stats.entities_changed += len as u32; + } + } + Err(e) => result = Err(e), } false @@ -279,231 +312,279 @@ fn apply_update_messages( result } -/// Applies received server mappings from client's pre-spawned entities. -fn apply_entity_mappings( +/// Deserializes and applies server mapping from client's pre-spawned entities. +fn apply_entity_mapping( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, ) -> bincode::Result<()> { - let mappings_len: u16 = bincode::deserialize_from(&mut *cursor)?; - if let Some(stats) = &mut params.stats { - stats.mappings += mappings_len as u32; + let server_entity = deserialize_entity(cursor)?; + let client_entity = deserialize_entity(cursor)?; + + if let Some(mut entity) = world.get_entity_mut(client_entity) { + debug!("received mapping from {server_entity:?} to {client_entity:?}"); + entity.insert(Replicated); + params.entity_map.insert(server_entity, client_entity); + } else { + // Entity could be despawned on client already. + debug!("received mapping from {server_entity:?} to {client_entity:?}, but the entity doesn't exists"); } - for _ in 0..mappings_len { - let server_entity = deserialize_entity(cursor)?; - let client_entity = deserialize_entity(cursor)?; - - if let Some(mut entity) = world.get_entity_mut(client_entity) { - debug!("received mapping from {server_entity:?} to {client_entity:?}"); - entity.insert(Replicated); - params.entity_map.insert(server_entity, client_entity); - } else { - // Entity could be despawned on client already. - debug!("received mapping from {server_entity:?} to {client_entity:?}, but the entity doesn't exists"); - } + + Ok(()) +} + +/// Deserializes and applies entity despawn from change message. +fn apply_despawn( + world: &mut World, + params: &mut ReceiveParams, + cursor: &mut Cursor<&[u8]>, + message_tick: RepliconTick, +) -> bincode::Result<()> { + // The entity might have already been despawned because of hierarchy or + // with the last replication message, but the server might not yet have received confirmation + // from the client and could include the deletion in the this message. + let server_entity = deserialize_entity(cursor)?; + if let Some(client_entity) = params + .entity_map + .remove_by_server(server_entity) + .and_then(|entity| world.get_entity_mut(entity)) + { + let ctx = DespawnCtx { message_tick }; + (params.registry.despawn)(&ctx, client_entity); } + Ok(()) } -/// Deserializes replicated components of `components_kind` and applies them to the `world`. -fn apply_init_components( +/// Deserializes and applies component removals for an entity. +fn apply_removals( world: &mut World, params: &mut ReceiveParams, - components_kind: ComponentsKind, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let entities_len: u16 = bincode::deserialize_from(&mut *cursor)?; - for _ in 0..entities_len { - let server_entity = deserialize_entity(cursor)?; - let data_size: u16 = bincode::deserialize_from(&mut *cursor)?; - - let client_entity = params - .entity_map - .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); - - let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); - params - .entity_markers - .read(params.command_markers, &*client_entity); - - if let Some(mut history) = client_entity.get_mut::() { - history.set_last_tick(message_tick); - } else { - commands - .entity(client_entity.id()) - .insert(ConfirmHistory::new(message_tick)); - } + let server_entity = deserialize_entity(cursor)?; - let end_pos = cursor.position() + data_size as u64; - let mut components_len = 0u32; - while cursor.position() < end_pos { - let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; - let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); - match components_kind { - ComponentsKind::Insert => { - let mut ctx = - WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); - - // SAFETY: `rule_fns` and `component_fns` were created for the same type. - unsafe { - component_fns.write( - &mut ctx, - rule_fns, - params.entity_markers, - &mut client_entity, - cursor, - )?; - } - } - ComponentsKind::Removal => { - let mut ctx = RemoveCtx { - commands: &mut commands, - message_tick, - component_id, - }; - component_fns.remove(&mut ctx, params.entity_markers, &mut client_entity); - } - } - components_len += 1; - } + let client_entity = params + .entity_map + .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); - if let Some(stats) = &mut params.stats { - stats.entities_changed += 1; - stats.components_changed += components_len; - } + let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + if let Some(mut history) = client_entity.get_mut::() { + history.set_last_tick(message_tick); + } else { + commands + .entity(client_entity.id()) + .insert(ConfirmHistory::new(message_tick)); + } + + let len = apply_array(ArrayKind::Sized, cursor, |cursor| { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, _) = params.registry.get(fns_id); + let mut ctx = RemoveCtx { + commands: &mut commands, + message_tick, + component_id, + }; + component_fns.remove(&mut ctx, params.entity_markers, &mut client_entity); + + Ok(()) + })?; - params.queue.apply(world); + if let Some(stats) = &mut params.stats { + stats.components_changed += len as u32; } + params.queue.apply(world); + Ok(()) } -/// Deserializes despawns and applies them to the `world`. -fn apply_despawns( +/// Deserializes and applies component insertions and/or mutations for an entity. +fn apply_changes( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let entities_len: u16 = bincode::deserialize_from(&mut *cursor)?; - if let Some(stats) = &mut params.stats { - stats.despawns += entities_len as u32; + let server_entity = deserialize_entity(cursor)?; + + let client_entity = params + .entity_map + .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); + + let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + if let Some(mut history) = client_entity.get_mut::() { + history.set_last_tick(message_tick); + } else { + commands + .entity(client_entity.id()) + .insert(ConfirmHistory::new(message_tick)); } - for _ in 0..entities_len { - // The entity might have already been despawned because of hierarchy or - // with the last replication message, but the server might not yet have received confirmation - // from the client and could include the deletion in the this message. - let server_entity = deserialize_entity(cursor)?; - if let Some(client_entity) = params - .entity_map - .remove_by_server(server_entity) - .and_then(|entity| world.get_entity_mut(entity)) - { - let ctx = DespawnCtx { message_tick }; - (params.registry.despawn)(&ctx, client_entity); + + let len = apply_array(ArrayKind::Sized, cursor, |cursor| { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); + let mut ctx = WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); + + // SAFETY: `rule_fns` and `component_fns` were created for the same type. + unsafe { + component_fns.write( + &mut ctx, + rule_fns, + params.entity_markers, + &mut client_entity, + cursor, + )?; } + + Ok(()) + })?; + + if let Some(stats) = &mut params.stats { + stats.components_changed += len as u32; } + params.queue.apply(world); + Ok(()) } -/// Deserializes replicated component updates and applies them to the `world`. +fn apply_array( + kind: ArrayKind, + cursor: &mut Cursor<&[u8]>, + mut f: impl FnMut(&mut Cursor<&[u8]>) -> bincode::Result<()>, +) -> bincode::Result { + match kind { + ArrayKind::Sized => { + let len = cursor.read_varint()?; + for _ in 0..len { + (f)(cursor)?; + } + + Ok(len) + } + ArrayKind::Dynamic => { + let mut len = 0; + let end = cursor.get_ref().len() as u64; + while cursor.position() < end { + (f)(cursor)?; + len += 1; + } + + Ok(len) + } + } +} + +/// Type of serialized array. +#[derive(PartialEq, Eq, Debug)] +enum ArrayKind { + /// Size is serialized before the array. + Sized, + /// Size is unknown, means that all bytes needs to be consumed. + Dynamic, +} + +/// Deserializes and applies component mutations for all entities. /// /// Consumes all remaining bytes in the cursor. -fn apply_update_components( +fn apply_mutations( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let message_end = cursor.get_ref().len() as u64; - while cursor.position() < message_end { - let server_entity = deserialize_entity(cursor)?; - let data_size: u16 = bincode::deserialize_from(&mut *cursor)?; - - let Some(client_entity) = params.entity_map.get_by_server(server_entity) else { - // Update could arrive after a despawn from init message. - debug!("ignoring update received for unknown server's {server_entity:?}"); - cursor.set_position(cursor.position() + data_size as u64); - continue; - }; + let server_entity = deserialize_entity(cursor)?; + let data_size: usize = cursor.read_varint()?; - let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); - params - .entity_markers - .read(params.command_markers, &*client_entity); - - let mut history = client_entity - .get_mut::() - .expect("all entities from update should have confirmed ticks"); - let new_entity = message_tick > history.last_tick(); - if new_entity { - history.set_last_tick(message_tick); - } else { - if !params.entity_markers.need_history() { - trace!( - "ignoring outdated update for client's {:?}", - client_entity.id() - ); - cursor.set_position(cursor.position() + data_size as u64); - continue; - } + let Some(client_entity) = params.entity_map.get_by_server(server_entity) else { + // Mutation could arrive after a despawn from change message. + debug!("ignoring mutations received for unknown server's {server_entity:?}"); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); + }; - let ago = history.last_tick().get().wrapping_sub(message_tick.get()); - if ago >= u64::BITS { - trace!( - "discarding update {ago} ticks old for client's {:?}", - client_entity.id() - ); - cursor.set_position(cursor.position() + data_size as u64); - continue; - } + let (mut client_entity, mut commands) = read_entity(world, params.queue, client_entity); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + let mut history = client_entity + .get_mut::() + .expect("all entities from mutate message should have confirmed ticks"); + let new_tick = message_tick > history.last_tick(); + if new_tick { + history.set_last_tick(message_tick); + } else { + if !params.entity_markers.need_history() { + trace!( + "ignoring outdated mutations for client's {:?}", + client_entity.id() + ); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); + } - history.set(ago); + let ago = history.last_tick().get().wrapping_sub(message_tick.get()); + if ago >= u64::BITS { + trace!( + "discarding {ago} ticks old mutations for client's {:?}", + client_entity.id() + ); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); } - let end_pos = cursor.position() + data_size as u64; - let mut components_count = 0u32; - while cursor.position() < end_pos { - let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; - let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); - let mut ctx = - WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); - - // SAFETY: `rule_fns` and `component_fns` were created for the same type. - unsafe { - if new_entity { - component_fns.write( - &mut ctx, - rule_fns, - params.entity_markers, - &mut client_entity, - cursor, - )?; - } else { - component_fns.consume_or_write( - &mut ctx, - rule_fns, - params.entity_markers, - params.command_markers, - &mut client_entity, - cursor, - )?; - } - } + history.set(ago); + } - components_count += 1; + let end_pos = cursor.position() + data_size as u64; + let mut components_count = 0; + while cursor.position() < end_pos { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); + let mut ctx = WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); + + // SAFETY: `rule_fns` and `component_fns` were created for the same type. + unsafe { + if new_tick { + component_fns.write( + &mut ctx, + rule_fns, + params.entity_markers, + &mut client_entity, + cursor, + )?; + } else { + component_fns.consume_or_write( + &mut ctx, + rule_fns, + params.entity_markers, + params.command_markers, + &mut client_entity, + cursor, + )?; + } } - if let Some(stats) = &mut params.stats { - stats.entities_changed += 1; - stats.components_changed += components_count; - } + components_count += 1; + } - params.queue.apply(world); + if let Some(stats) = &mut params.stats { + stats.components_changed += components_count; } + params.queue.apply(world); + Ok(()) } @@ -551,14 +632,6 @@ struct ReceiveParams<'a> { registry: &'a ReplicationRegistry, } -/// Type of components replication. -/// -/// Parameter for [`apply_components`]. -enum ComponentsKind { - Insert, - Removal, -} - /// Set with replication and event systems related to client. #[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum ClientSet { @@ -614,45 +687,44 @@ pub enum ClientSet { Reset, } -/// Last received tick for init message from server. +/// Last received tick for change messages from the server. /// -/// In other words, last [`RepliconTick`] with a removal, insertion, spawn or despawn. -/// When a component changes, this value is not updated. +/// In other words, the last [`RepliconTick`] with a removal, insertion, spawn or despawn. +/// This value is not updated when mutation messages are received from the server. #[derive(Clone, Copy, Debug, Default, Deref, Resource)] -pub struct ServerInitTick(RepliconTick); +pub struct ServerChangeTick(RepliconTick); -/// All cached buffered updates, used by the replicon client to align replication updates with initialization -/// messages. +/// Cached buffered mutate messages, used to synchronize mutations with change messages. /// /// If [`ClientSet::Reset`] is disabled, then this needs to be cleaned up manually with [`Self::clear`]. #[derive(Default, Resource)] -pub struct BufferedUpdates(Vec); +pub struct BufferedMutations(Vec); -impl BufferedUpdates { +impl BufferedMutations { pub fn clear(&mut self) { self.0.clear(); } - /// Inserts a new update, maintaining sorting by their message tick in descending order. - fn insert(&mut self, update: BufferedUpdate) { + /// Inserts a new buffered message, maintaining sorting by their message tick in descending order. + fn insert(&mut self, mutation: BufferedMutate) { let index = self .0 - .partition_point(|other_update| update.message_tick < other_update.message_tick); - self.0.insert(index, update); + .partition_point(|other_mutation| mutation.message_tick < other_mutation.message_tick); + self.0.insert(index, mutation); } } -/// Caches a partially-deserialized entity update message that is waiting for its tick to appear in an init message. +/// Partially-deserialized mutate message that is waiting for its tick to appear in an change message. /// -/// See also [`crate::server::replication_messages::UpdateMessage`]. -pub(super) struct BufferedUpdate { +/// See also [`crate::server::replication_messages`]. +pub(super) struct BufferedMutate { /// Required tick to wait for. - init_tick: RepliconTick, + change_tick: RepliconTick, - /// The tick this update corresponds to. + /// The tick this mutations corresponds to. message_tick: RepliconTick, - /// Update data. + /// Mutations data. message: Bytes, } diff --git a/src/client/events.rs b/src/client/events.rs index 359be3a0..ca98fcfc 100644 --- a/src/client/events.rs +++ b/src/client/events.rs @@ -1,4 +1,4 @@ -use super::{ClientPlugin, ClientSet, ServerInitTick}; +use super::{ClientPlugin, ClientSet, ServerChangeTick}; use crate::core::{ common_conditions::*, event_registry::{ @@ -85,7 +85,7 @@ impl ClientEventsPlugin { world.resource_scope(|world, registry: Mut| { world.resource_scope(|world, entity_map: Mut| { world.resource_scope(|world, event_registry: Mut| { - let init_tick = **world.resource::(); + let change_tick = **world.resource::(); let mut ctx = ClientReceiveCtx { registry: ®istry.read(), entity_map: &entity_map, @@ -112,7 +112,7 @@ impl ClientEventsPlugin { events.into_inner(), queue.into_inner(), &mut client, - init_tick, + change_tick, ) }; } diff --git a/src/core/channels.rs b/src/core/channels.rs index 96c47d74..7a93d72a 100644 --- a/src/core/channels.rs +++ b/src/core/channels.rs @@ -10,18 +10,18 @@ pub enum ReplicationChannel { /// For sending messages with entity mappings, inserts, removals and despawns. /// /// This is an ordered reliable channel. - Init, - /// For sending messages with component updates. + Changes, + /// For sending messages with component mutations. /// /// This is an unreliable channel. - Update, + Mutations, } impl From for RepliconChannel { fn from(value: ReplicationChannel) -> Self { match value { - ReplicationChannel::Init => ChannelKind::Ordered.into(), - ReplicationChannel::Update => ChannelKind::Unreliable.into(), + ReplicationChannel::Changes => ChannelKind::Ordered.into(), + ReplicationChannel::Mutations => ChannelKind::Unreliable.into(), } } } @@ -53,12 +53,12 @@ impl Default for RepliconChannels { fn default() -> Self { Self { server: vec![ - ReplicationChannel::Init.into(), - ReplicationChannel::Update.into(), + ReplicationChannel::Changes.into(), + ReplicationChannel::Mutations.into(), ], client: vec![ - ReplicationChannel::Init.into(), - ReplicationChannel::Update.into(), + ReplicationChannel::Changes.into(), + ReplicationChannel::Mutations.into(), ], default_max_bytes: 5 * 1024 * 1024, } diff --git a/src/core/event_registry/server_event.rs b/src/core/event_registry/server_event.rs index d95eb33d..d47774d8 100644 --- a/src/core/event_registry/server_event.rs +++ b/src/core/event_registry/server_event.rs @@ -340,9 +340,9 @@ impl ServerEvent { events: PtrMut, queue: PtrMut, client: &mut RepliconClient, - init_tick: RepliconTick, + change_tick: RepliconTick, ) { - (self.receive)(self, ctx, events, queue, client, init_tick); + (self.receive)(self, ctx, events, queue, client, change_tick); } /// Drains events [`ToClients`] and re-emits them as `E` if the server is in the list of the event recipients. @@ -494,12 +494,12 @@ unsafe fn receive( events: PtrMut, queue: PtrMut, client: &mut RepliconClient, - init_tick: RepliconTick, + change_tick: RepliconTick, ) { let events: &mut Events = events.deref_mut(); let queue: &mut ServerEventQueue = queue.deref_mut(); - while let Some((tick, message)) = queue.pop_if_le(init_tick) { + while let Some((tick, message)) = queue.pop_if_le(change_tick) { let mut cursor = Cursor::new(&*message); match event_data.deserialize(ctx, &mut cursor) { Ok(event) => { @@ -529,7 +529,7 @@ unsafe fn receive( continue; } }; - if tick > init_tick { + if tick > change_tick { trace!("queuing event `{}` with `{tick:?}`", any::type_name::()); queue.insert(tick, message.slice(cursor.position() as usize..)); continue; @@ -699,19 +699,19 @@ enum SerializedMessage { } impl SerializedMessage { - /// Optimized to avoid reallocations when clients have the same init tick as other clients receiving the + /// Optimized to avoid reallocations when clients have the same change tick as other clients receiving the /// same message. - fn get_bytes(&mut self, init_tick: RepliconTick) -> bincode::Result { + fn get_bytes(&mut self, change_tick: RepliconTick) -> bincode::Result { match self { // Resolve the raw value into a message with serialized tick. Self::Raw(raw) => { let mut bytes = std::mem::take(raw); - let tick_size = DefaultOptions::new().serialized_size(&init_tick)? as usize; + let tick_size = DefaultOptions::new().serialized_size(&change_tick)? as usize; let padding = RepliconTick::MAX_SERIALIZED_SIZE - tick_size; - DefaultOptions::new().serialize_into(&mut bytes[padding..], &init_tick)?; + DefaultOptions::new().serialize_into(&mut bytes[padding..], &change_tick)?; let bytes = Bytes::from(bytes).slice(padding..); *self = Self::Resolved { - tick: init_tick, + tick: change_tick, tick_size, bytes: bytes.clone(), }; @@ -723,13 +723,13 @@ impl SerializedMessage { tick_size, bytes, } => { - if *tick == init_tick { + if *tick == change_tick { return Ok(bytes.clone()); } - let new_tick_size = DefaultOptions::new().serialized_size(&init_tick)? as usize; + let new_tick_size = DefaultOptions::new().serialized_size(&change_tick)? as usize; let mut new_bytes = Vec::with_capacity(new_tick_size + bytes.len() - *tick_size); - DefaultOptions::new().serialize_into(&mut new_bytes, &init_tick)?; + DefaultOptions::new().serialize_into(&mut new_bytes, &change_tick)?; new_bytes.extend_from_slice(&bytes[*tick_size..]); Ok(new_bytes.into()) } @@ -749,7 +749,7 @@ impl BufferedServerEvent { server: &mut RepliconServer, client: &ReplicatedClient, ) -> bincode::Result<()> { - let message = self.message.get_bytes(client.init_tick())?; + let message = self.message.get_bytes(client.change_tick())?; server.send(client.id(), self.channel, message); Ok(()) } @@ -769,10 +769,10 @@ impl BufferedServerEventSet { } } -/// Caches synchronization-dependent server events until they can be sent with an accurate init tick. +/// Caches synchronization-dependent server events until they can be sent with an accurate change tick. /// /// This exists because replication does not scan the world every tick. If a server event is sent in the same -/// tick as a spawn and the event references that spawn, then the server event's init tick needs to be synchronized +/// tick as a spawn and the event references that spawn, then the server event's change tick needs to be synchronized /// with that spawn on the client. We buffer the event until the spawn can be detected. #[derive(Resource, Default)] pub(crate) struct BufferedServerEvents { @@ -890,9 +890,9 @@ struct ServerEventQueue { impl ServerEventQueue { /// Pops the next event that is at least as old as the specified replicon tick. - fn pop_if_le(&mut self, init_tick: RepliconTick) -> Option<(RepliconTick, Bytes)> { + fn pop_if_le(&mut self, change_tick: RepliconTick) -> Option<(RepliconTick, Bytes)> { let (tick, _) = self.list.front()?; - if *tick > init_tick { + if *tick > change_tick { return None; } self.list diff --git a/src/core/replication.rs b/src/core/replication.rs index 33b30885..a657f8dd 100644 --- a/src/core/replication.rs +++ b/src/core/replication.rs @@ -1,3 +1,4 @@ +pub mod change_message_flags; pub mod command_markers; pub mod deferred_entity; pub mod replicated_clients; diff --git a/src/core/replication/change_message_flags.rs b/src/core/replication/change_message_flags.rs new file mode 100644 index 00000000..834bda3a --- /dev/null +++ b/src/core/replication/change_message_flags.rs @@ -0,0 +1,48 @@ +use bitflags::bitflags; + +bitflags! { + /// Types of data included in the change message if the bit is set. + /// + /// Serialized at the beginning of the message. + #[derive(Default, Clone, Copy, PartialEq, Eq, Debug)] + pub(crate) struct ChangeMessageFlags: u8 { + const MAPPINGS = 0b00000001; + const DESPAWNS = 0b00000010; + const REMOVALS = 0b00000100; + const CHANGES = 0b00001000; + } +} + +impl ChangeMessageFlags { + /// Returns the last set flag in the message. + pub(crate) fn last(self) -> ChangeMessageFlags { + debug_assert!(!self.is_empty()); + let zeroes = u8::BITS - 1 - self.bits().leading_zeros(); + ChangeMessageFlags::from_bits_retain(1 << zeroes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn last() { + assert_eq!( + ChangeMessageFlags::CHANGES.last(), + ChangeMessageFlags::CHANGES + ); + assert_eq!( + ChangeMessageFlags::MAPPINGS.last(), + ChangeMessageFlags::MAPPINGS + ); + assert_eq!( + ChangeMessageFlags::all().last(), + ChangeMessageFlags::CHANGES + ); + assert_eq!( + (ChangeMessageFlags::DESPAWNS | ChangeMessageFlags::REMOVALS).last(), + ChangeMessageFlags::REMOVALS + ); + } +} diff --git a/src/core/replication/command_markers.rs b/src/core/replication/command_markers.rs index 0869dd7f..ca17361b 100644 --- a/src/core/replication/command_markers.rs +++ b/src/core/replication/command_markers.rs @@ -231,11 +231,11 @@ pub struct MarkerConfig { /// By default set to `0`. pub priority: usize, - /// Represents whether a marker needs to process old updates. + /// Represents whether a marker needs to process old mutations. /// - /// Since updates use [`ChannelKind::Unreliable`](crate::core::channels::ChannelKind), - /// a client may receive an older update for an entity. By default these updates are discarded, - /// but some markers may need them. If this field is set to `true`, old component updates will + /// Since mutations use [`ChannelKind::Unreliable`](crate::core::channels::ChannelKind), + /// a client may receive an older mutation for an entity component. By default these mutations are discarded, + /// but some markers may need them. If this field is set to `true`, old component mutations will /// be passed to the writing function for this marker. /// /// By default set to `false`. diff --git a/src/core/replication/replicated_clients.rs b/src/core/replication/replicated_clients.rs index f15d4c49..615d01c7 100644 --- a/src/core/replication/replicated_clients.rs +++ b/src/core/replication/replicated_clients.rs @@ -172,7 +172,7 @@ pub struct ReplicatedClient { id: ClientId, /// Lowest tick for use in change detection for each entity. - change_ticks: EntityHashMap, + mutation_ticks: EntityHashMap, /// Entity visibility settings. visibility: ClientVisibility, @@ -180,28 +180,28 @@ pub struct ReplicatedClient { /// The last tick in which a replicated entity had an insertion, removal, or gained/lost a component from the /// perspective of the client. /// - /// It should be included in update messages and server events to avoid needless waiting for the next init + /// It should be included in mutate messages and server events to avoid needless waiting for the next change /// message to arrive. - init_tick: RepliconTick, + change_tick: RepliconTick, - /// Update message indexes mapped to their info. - updates: HashMap, + /// Mutate message indices mapped to their info. + mutations: HashMap, - /// Index for the next update message to be sent to this client. + /// Index for the next mutate message to be sent to this client. /// - /// See also [`Self::register_update`]. - next_update_index: u16, + /// See also [`Self::register_mutate_message`]. + next_mutate_index: u16, } impl ReplicatedClient { fn new(id: ClientId, policy: VisibilityPolicy) -> Self { Self { id, - change_ticks: Default::default(), + mutation_ticks: Default::default(), visibility: ClientVisibility::new(policy), - init_tick: Default::default(), - updates: Default::default(), - next_update_index: Default::default(), + change_tick: Default::default(), + mutations: Default::default(), + next_mutate_index: Default::default(), } } @@ -220,24 +220,24 @@ impl ReplicatedClient { &mut self.visibility } - /// Sets the client's init tick. - pub(crate) fn set_init_tick(&mut self, tick: RepliconTick) { - self.init_tick = tick; + /// Sets the client's change tick. + pub(crate) fn set_change_tick(&mut self, tick: RepliconTick) { + self.change_tick = tick; } /// Returns the last tick in which a replicated entity had an insertion, removal, or gained/lost a component from the /// perspective of the client. - pub fn init_tick(&self) -> RepliconTick { - self.init_tick + pub fn change_tick(&self) -> RepliconTick { + self.change_tick } - /// Clears all entities for unacknowledged updates, returning them as an iterator. + /// Clears all entities for unacknowledged mutate messages, returning them as an iterator. /// /// Keeps the allocated memory for reuse. fn drain_entities(&mut self) -> impl Iterator> + '_ { - self.updates + self.mutations .drain() - .map(|(_, update_info)| update_info.entities) + .map(|(_, mutate_info)| mutate_info.entities) } /// Resets all data. @@ -246,98 +246,98 @@ impl ReplicatedClient { fn reset(&mut self, id: ClientId) { self.id = id; self.visibility.clear(); - self.change_ticks.clear(); - self.updates.clear(); - self.next_update_index = 0; + self.mutation_ticks.clear(); + self.mutations.clear(); + self.next_mutate_index = 0; } - /// Registers update at specified `tick` and `timestamp` and returns its index with entities to fill. + /// Registers mutate message at specified `tick` and `timestamp` and returns its index with entities to fill. /// /// Used later to acknowledge updated entities. #[must_use] - pub(crate) fn register_update( + pub(crate) fn register_mutate_message( &mut self, client_buffers: &mut ClientBuffers, tick: Tick, timestamp: Duration, ) -> (u16, &mut Vec) { - let update_index = self.next_update_index; - self.next_update_index = self.next_update_index.overflowing_add(1).0; + let mutate_index = self.next_mutate_index; + self.next_mutate_index = self.next_mutate_index.overflowing_add(1).0; let mut entities = client_buffers.entities.pop().unwrap_or_default(); entities.clear(); - let update_info = UpdateInfo { + let mutate_info = MutateInfo { tick, timestamp, entities, }; - let update_info = self - .updates - .entry(update_index) - .insert(update_info) + let mutate_info = self + .mutations + .entry(mutate_index) + .insert(mutate_info) .into_mut(); - (update_index, &mut update_info.entities) + (mutate_index, &mut mutate_info.entities) } /// Sets the change tick for an entity that is replicated to this client. /// /// The change tick is the reference point for determining if components on an entity have changed and /// need to be replicated. Component changes older than the change limit are assumed to be acked by the client. - pub(crate) fn set_change_tick(&mut self, entity: Entity, tick: Tick) { - self.change_ticks.insert(entity, tick); + pub(crate) fn set_mutation_tick(&mut self, entity: Entity, tick: Tick) { + self.mutation_ticks.insert(entity, tick); } /// Gets the change tick for an entity that is replicated to this client. - pub fn get_change_tick(&self, entity: Entity) -> Option { - self.change_ticks.get(&entity).copied() + pub fn mutation_tick(&self, entity: Entity) -> Option { + self.mutation_ticks.get(&entity).copied() } - /// Marks update with the specified index as acknowledged. + /// Marks mutate message as acknowledged by its index. /// - /// Change limits for all entities from this update will be set to the update's tick if it's higher. + /// Change tick for all entities from this mutate message will be set to the message tick if it's higher. /// /// Keeps allocated memory in the buffers for reuse. - pub(crate) fn acknowledge( + pub(crate) fn ack_mutate_message( &mut self, client_buffers: &mut ClientBuffers, tick: Tick, - update_index: u16, + mutate_index: u16, ) { - let Some(update_info) = self.updates.remove(&update_index) else { + let Some(mutate_info) = self.mutations.remove(&mutate_index) else { debug!( - "received unknown update index {update_index} from {:?}", + "received unknown mutate index {mutate_index} from {:?}", self.id ); return; }; - for entity in &update_info.entities { - let Some(last_tick) = self.change_ticks.get_mut(entity) else { + for entity in &mutate_info.entities { + let Some(last_tick) = self.mutation_ticks.get_mut(entity) else { // We ignore missing entities, since they were probably despawned. continue; }; // Received tick could be outdated because we bump it // if we detect any insertion on the entity in `collect_changes`. - if !last_tick.is_newer_than(update_info.tick, tick) { - *last_tick = update_info.tick; + if !last_tick.is_newer_than(mutate_info.tick, tick) { + *last_tick = mutate_info.tick; } } - client_buffers.entities.push(update_info.entities); + client_buffers.entities.push(mutate_info.entities); trace!( - "{:?} acknowledged an update with {:?}", + "{:?} acknowledged mutate message with {:?}", self.id, - update_info.tick, + mutate_info.tick, ); } /// Removes a despawned entity tracked by this client. pub fn remove_despawned(&mut self, entity: Entity) { - self.change_ticks.remove(&entity); + self.mutation_ticks.remove(&entity); self.visibility.remove_despawned(entity); - // We don't clean up `self.updates` for efficiency reasons. + // We don't clean up `self.mutations` for efficiency reasons. // `Self::acknowledge()` will properly ignore despawned entities. } @@ -346,23 +346,23 @@ impl ReplicatedClient { /// Internal cleanup happens lazily during the iteration. pub(crate) fn drain_lost_visibility(&mut self) -> impl Iterator + '_ { self.visibility.drain_lost_visibility().inspect(|entity| { - self.change_ticks.remove(entity); + self.mutation_ticks.remove(entity); }) } - /// Removes all updates older then `min_timestamp`. + /// Removes all mutate messages older then `min_timestamp`. /// /// Keeps allocated memory in the buffers for reuse. - pub(crate) fn remove_older_updates( + pub(crate) fn cleanup_older_mutations( &mut self, client_buffers: &mut ClientBuffers, min_timestamp: Duration, ) { - self.updates.retain(|_, update_info| { - if update_info.timestamp < min_timestamp { + self.mutations.retain(|_, mutate_info| { + if mutate_info.timestamp < min_timestamp { client_buffers .entities - .push(mem::take(&mut update_info.entities)); + .push(mem::take(&mut mutate_info.entities)); false } else { true @@ -379,13 +379,13 @@ pub(crate) struct ClientBuffers { /// Stored to reuse allocated memory. clients: Vec, - /// [`Vec`]'s from acknowledged update indexes from [`ReplicatedClient`]. + /// [`Vec`]'s from acknowledged [`MutateInfo`]'s. /// /// Stored to reuse allocated capacity. entities: Vec>, } -struct UpdateInfo { +struct MutateInfo { tick: Tick, timestamp: Duration, entities: Vec, diff --git a/src/core/replication/replicated_clients/client_visibility.rs b/src/core/replication/replicated_clients/client_visibility.rs index 7242e73e..f8632fc0 100644 --- a/src/core/replication/replicated_clients/client_visibility.rs +++ b/src/core/replication/replicated_clients/client_visibility.rs @@ -9,11 +9,6 @@ use super::VisibilityPolicy; /// Entity visibility settings for a client. pub struct ClientVisibility { filter: VisibilityFilter, - - /// Visibility for a specific entity that has been cached for re-referencing. - /// - /// Used as an optimization by server replication. - cached_visibility: Visibility, } impl ClientVisibility { @@ -36,10 +31,7 @@ impl ClientVisibility { /// Creates a new instance with a specific filter. fn with_filter(filter: VisibilityFilter) -> Self { - Self { - filter, - cached_visibility: Default::default(), - } + Self { filter } } /// Resets the filter state to as it was after [`Self::new`]. @@ -177,7 +169,7 @@ impl ClientVisibility { // For blacklisting an entity we don't remove the entity right away. // Instead we mark it as queued for removal and remove it // later in `Self::update`. This allows us to avoid accessing - // the blacklist's `removed` field in `Self::get_visibility_state`. + // the blacklist's `removed` field in `Self::visibility_state`. entry.insert(BlacklistInfo::QueuedForRemoval); removed.insert(entity); } else { @@ -200,7 +192,7 @@ impl ClientVisibility { // Instead we mark it as `WhitelistInfo::JustAdded` and then set it to // 'WhitelistInfo::Visible' in `Self::update`. // This allows us to avoid accessing the whitelist's `added` field in - // `Self::get_visibility_state`. + // `Self::visibility_state`. if *list.entry(entity).or_insert(WhitelistInfo::JustAdded) == WhitelistInfo::JustAdded { @@ -227,26 +219,14 @@ impl ClientVisibility { /// Checks if a specific entity is visible. pub fn is_visible(&self, entity: Entity) -> bool { - match self.get_visibility_state(entity) { + match self.visibility_state(entity) { Visibility::Hidden => false, Visibility::Gained | Visibility::Visible => true, } } - /// Caches visibility for a specific entity. - /// - /// Can be obtained later from [`Self::cached_visibility`]. - pub(crate) fn cache_visibility(&mut self, entity: Entity) { - self.cached_visibility = self.get_visibility_state(entity); - } - - /// Returns visibility cached by the last call of [`Self::cache_visibility`]. - pub(crate) fn cached_visibility(&self) -> Visibility { - self.cached_visibility - } - /// Returns visibility of a specific entity. - fn get_visibility_state(&self, entity: Entity) -> Visibility { + pub(crate) fn visibility_state(&self, entity: Entity) -> Visibility { match &self.filter { VisibilityFilter::All => Visibility::Visible, VisibilityFilter::Blacklist { list, .. } => match list.get(&entity) { diff --git a/src/core/replication/replication_registry/component_fns.rs b/src/core/replication/replication_registry/component_fns.rs index c22b6781..a43f67ee 100644 --- a/src/core/replication/replication_registry/component_fns.rs +++ b/src/core/replication/replication_registry/component_fns.rs @@ -88,9 +88,9 @@ impl ComponentFns { ctx: &SerializeCtx, rule_fns: &UntypedRuleFns, ptr: Ptr, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - (self.serialize)(ctx, rule_fns, ptr, cursor) + (self.serialize)(ctx, rule_fns, ptr, message) } /// Calls the assigned writing function based on entity markers. @@ -174,7 +174,7 @@ impl ComponentFns { /// Signature of component serialization functions that restore the original type. type UntypedSerializeFn = - unsafe fn(&SerializeCtx, &UntypedRuleFns, Ptr, &mut Cursor>) -> bincode::Result<()>; + unsafe fn(&SerializeCtx, &UntypedRuleFns, Ptr, &mut Vec) -> bincode::Result<()>; /// Signature of component writing functions that restore the original type. type UntypedWriteFn = unsafe fn( @@ -198,10 +198,10 @@ unsafe fn untyped_serialize( ctx: &SerializeCtx, rule_fns: &UntypedRuleFns, ptr: Ptr, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { let rule_fns = rule_fns.typed::(); - rule_fns.serialize(ctx, ptr.deref::(), cursor) + rule_fns.serialize(ctx, ptr.deref::(), message) } /// Resolves `rule_fns` to `C` and calls [`UntypedCommandFns::write`] for `C`. diff --git a/src/core/replication/replication_registry/rule_fns.rs b/src/core/replication/replication_registry/rule_fns.rs index 4ca144b0..92805199 100644 --- a/src/core/replication/replication_registry/rule_fns.rs +++ b/src/core/replication/replication_registry/rule_fns.rs @@ -125,9 +125,9 @@ impl RuleFns { &self, ctx: &SerializeCtx, component: &C, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - (self.serialize)(ctx, component, cursor) + (self.serialize)(ctx, component, message) } /// Deserializes a component from a cursor. @@ -187,7 +187,7 @@ impl Default for RuleFns { } /// Signature of component serialization functions. -pub type SerializeFn = fn(&SerializeCtx, &C, &mut Cursor>) -> bincode::Result<()>; +pub type SerializeFn = fn(&SerializeCtx, &C, &mut Vec) -> bincode::Result<()>; /// Signature of component deserialization functions. pub type DeserializeFn = fn(&mut WriteCtx, &mut Cursor<&[u8]>) -> bincode::Result; @@ -204,7 +204,7 @@ pub type ConsumeFn = pub fn default_serialize( _ctx: &SerializeCtx, component: &C, - cursor: &mut Cursor>, + cursor: &mut Vec, ) -> bincode::Result<()> { DefaultOptions::new().serialize_into(cursor, component) } diff --git a/src/core/replication/replication_registry/test_fns.rs b/src/core/replication/replication_registry/test_fns.rs index 5592b889..791984f6 100644 --- a/src/core/replication/replication_registry/test_fns.rs +++ b/src/core/replication/replication_registry/test_fns.rs @@ -92,7 +92,7 @@ impl TestFnsEntityExt for EntityWorldMut<'_> { fn serialize(&mut self, fns_id: FnsId, server_tick: RepliconTick) -> Vec { let registry = self.world().resource::(); let (component_id, component_fns, rule_fns) = registry.get(fns_id); - let mut cursor = Cursor::default(); + let mut message = Vec::new(); let ctx = SerializeCtx { server_tick, component_id, @@ -107,11 +107,11 @@ impl TestFnsEntityExt for EntityWorldMut<'_> { unsafe { component_fns - .serialize(&ctx, rule_fns, ptr, &mut cursor) + .serialize(&ctx, rule_fns, ptr, &mut message) .expect("serialization into memory should never fail"); } - cursor.into_inner() + message } fn apply_write(&mut self, data: &[u8], fns_id: FnsId, message_tick: RepliconTick) -> &mut Self { diff --git a/src/core/replication/replication_rules.rs b/src/core/replication/replication_rules.rs index 5689437f..0f5d2e74 100644 --- a/src/core/replication/replication_rules.rs +++ b/src/core/replication/replication_rules.rs @@ -98,9 +98,9 @@ pub trait AppRuleExt { fn serialize_translation( _ctx: &SerializeCtx, transform: &Transform, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - bincode::serialize_into(cursor, &transform.translation) + bincode::serialize_into(message, &transform.translation) } /// Deserializes `translation` and creates [`Transform`] from it. @@ -317,7 +317,7 @@ impl GroupReplication for PlayerBundle { } } -# fn serialize_translation(_: &SerializeCtx, _: &Transform, _: &mut Cursor>) -> bincode::Result<()> { unimplemented!() } +# fn serialize_translation(_: &SerializeCtx, _: &Transform, _: &mut Vec) -> bincode::Result<()> { unimplemented!() } # fn deserialize_translation(_: &mut WriteCtx, _: &mut Cursor<&[u8]>) -> bincode::Result { unimplemented!() } ``` **/ diff --git a/src/core/replicon_tick.rs b/src/core/replicon_tick.rs index 86a5524e..446e2612 100644 --- a/src/core/replicon_tick.rs +++ b/src/core/replicon_tick.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; /// /// All operations on it are wrapping. /// -/// See also [`ServerInitTick`](crate::client::ServerInitTick) and +/// See also [`ServerChangeTick`](crate::client::ServerChangeTick) and /// [`ServerTick`](crate::server::server_tick::ServerTick). #[derive(Clone, Copy, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)] pub struct RepliconTick(u32); diff --git a/src/lib.rs b/src/lib.rs index 1101dbb6..e2ee2bd3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -583,27 +583,18 @@ For a higher level API consider using [`bevy_replicon_attributes`](https://docs. All events, inserts, removals and despawns will be applied to clients in the same order as on the server. -Entity component updates are grouped by entity, and component groupings may be applied to clients in a different order than on the server. -For example, if two entities are spawned in tick 1 on the server and their components are updated in tick 2, -then the client is guaranteed to see the spawns at the same time, but the component updates may appear in different client ticks. +Entity component mutations are grouped by entity, and component groupings may be applied to clients in a different order than on the server. +For example, if two entities are spawned in tick 1 on the server and their components are mutated in tick 2, +then the client is guaranteed to see the spawns at the same time, but the component mutations may appear in different client ticks. -If a component is dependent on other data, updates to the component will only be applied to the client when that data has arrived. -So if your component references another entity, updates to that component will only be applied when the referenced entity has been spawned on the client. +If a component is dependent on other data, mutations to the component will only be applied to the client when that data has arrived. +So if your component references another entity, mutations to that component will only be applied when the referenced entity has been spawned on the client. -Updates for despawned entities will be discarded automatically, but events or components may reference despawned entities and should be handled with that in mind. +Mutations for despawned entities will be discarded automatically, but events or components may reference despawned entities and should be handled with that in mind. Clients should never assume their world state is the same as the server's on any given tick value-wise. World state on the client is only "eventually consistent" with the server's. -# Limits - -To reduce packet size there are the following limits per replication update: - -- Up to [`u16::MAX`] entities that have added components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that have changed components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that have removed components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that were despawned. - # Troubleshooting If you face any issue, try to enable logging to see what is going on. diff --git a/src/server.rs b/src/server.rs index 0495671f..b94b9997 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,7 +6,7 @@ pub(super) mod replicated_archetypes; pub(super) mod replication_messages; pub mod server_tick; -use std::{io::Cursor, mem, time::Duration}; +use std::{io::Cursor, mem, ops::Range, time::Duration}; use bevy::{ ecs::{ @@ -29,7 +29,10 @@ use crate::core::{ replicated_clients::{ client_visibility::Visibility, ClientBuffers, ReplicatedClients, VisibilityPolicy, }, - replication_registry::{ctx::SerializeCtx, ReplicationRegistry}, + replication_registry::{ + component_fns::ComponentFns, ctx::SerializeCtx, rule_fns::UntypedRuleFns, + ReplicationRegistry, + }, replication_rules::ReplicationRules, }, replicon_server::RepliconServer, @@ -39,8 +42,8 @@ use crate::core::{ use client_entity_map::ClientEntityMap; use despawn_buffer::{DespawnBuffer, DespawnBufferPlugin}; use removal_buffer::{RemovalBuffer, RemovalBufferPlugin}; -use replicated_archetypes::ReplicatedArchetypes; -use replication_messages::ReplicationMessages; +use replicated_archetypes::{ReplicatedArchetypes, ReplicatedComponent}; +use replication_messages::{serialized_data::SerializedData, ReplicationMessages}; use server_tick::ServerTick; pub struct ServerPlugin { @@ -50,10 +53,10 @@ pub struct ServerPlugin { /// Visibility configuration. pub visibility_policy: VisibilityPolicy, - /// The time after which updates will be considered lost if an acknowledgment is not received for them. + /// The time after which mutations will be considered lost if an acknowledgment is not received for them. /// - /// In practice updates will live at least `update_timeout`, and at most `2*update_timeout`. - pub update_timeout: Duration, + /// In practice mutations will live at least `mutations_timeout`, and at most `2*mutations_timeout`. + pub mutations_timeout: Duration, /// If enabled, replication will be started automatically after connection. /// @@ -70,7 +73,7 @@ impl Default for ServerPlugin { Self { tick_policy: TickPolicy::MaxTickRate(30), visibility_policy: Default::default(), - update_timeout: Duration::from_secs(10), + mutations_timeout: Duration::from_secs(10), replicate_after_connect: true, } } @@ -119,7 +122,8 @@ impl Plugin for ServerPlugin { Self::handle_connections, Self::enable_replication, Self::receive_acks, - Self::cleanup_acks(self.update_timeout).run_if(on_timer(self.update_timeout)), + Self::cleanup_acks(self.mutations_timeout) + .run_if(on_timer(self.mutations_timeout)), ) .chain() .in_set(ServerSet::Receive) @@ -212,14 +216,14 @@ impl ServerPlugin { } fn cleanup_acks( - update_timeout: Duration, + mutations_timeout: Duration, ) -> impl FnMut(ResMut, ResMut, Res