From 7816d6fcd191b9fa9a2c6735ea7d3b9f86faf59b Mon Sep 17 00:00:00 2001 From: jprochazk Date: Wed, 8 Jan 2025 21:01:59 +0100 Subject: [PATCH 01/25] Add `LogMsg` proto definition --- .../re_protos/proto/rerun/v0/log_msg.proto | 13 ++++++++ .../re_protos/src/v0/rerun.log_msg.v0.rs | 30 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/crates/store/re_protos/proto/rerun/v0/log_msg.proto b/crates/store/re_protos/proto/rerun/v0/log_msg.proto index b35abec50a58..cc72205668b1 100644 --- a/crates/store/re_protos/proto/rerun/v0/log_msg.proto +++ b/crates/store/re_protos/proto/rerun/v0/log_msg.proto @@ -4,6 +4,19 @@ package rerun.log_msg.v0; import "rerun/v0/common.proto"; +message LogMsg { + oneof msg { + // A message that contains a new store info. + SetStoreInfo set_store_info = 1; + + // A message that contains an Arrow-IPC encoded message. + ArrowMsg arrow_msg = 2; + + // A message that contains a blueprint activation command. + BlueprintActivationCommand blueprint_activation_command = 3; + } +} + // Corresponds to `LogMsg::SetStoreInfo`. Used to identify a recording. message SetStoreInfo { // A time-based UID that is used to determine how a `StoreInfo` fits in the global ordering of events. diff --git a/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs b/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs index 3b415876fbc0..19744af04354 100644 --- a/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs @@ -1,4 +1,34 @@ // This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LogMsg { + #[prost(oneof = "log_msg::Msg", tags = "1, 2, 3")] + pub msg: ::core::option::Option, +} +/// Nested message and enum types in `LogMsg`. +pub mod log_msg { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Msg { + /// A message that contains a new store info. + #[prost(message, tag = "1")] + SetStoreInfo(super::SetStoreInfo), + /// A message that contains an Arrow-IPC encoded message. + #[prost(message, tag = "2")] + ArrowMsg(super::ArrowMsg), + /// A message that contains a blueprint activation command. + #[prost(message, tag = "3")] + BlueprintActivationCommand(super::BlueprintActivationCommand), + } +} +impl ::prost::Name for LogMsg { + const NAME: &'static str = "LogMsg"; + const PACKAGE: &'static str = "rerun.log_msg.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.log_msg.v0.LogMsg".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.log_msg.v0.LogMsg".into() + } +} /// Corresponds to `LogMsg::SetStoreInfo`. Used to identify a recording. #[derive(Clone, PartialEq, ::prost::Message)] pub struct SetStoreInfo { From 791dfb4ccd27e46b672220d8cf21589066c67b54 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Wed, 8 Jan 2025 21:02:47 +0100 Subject: [PATCH 02/25] Add `MessageProxy` service definition --- .../src/bin/build_re_remote_store_types.rs | 15 +- .../re_protos/proto/rerun/v0/sdk_comms.proto | 22 ++ crates/store/re_protos/src/lib.rs | 9 + .../re_protos/src/v0/rerun.sdk_comms.v0.rs | 370 ++++++++++++++++++ 4 files changed, 414 insertions(+), 2 deletions(-) create mode 100644 crates/store/re_protos/proto/rerun/v0/sdk_comms.proto create mode 100644 crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs diff --git a/crates/build/re_protos_builder/src/bin/build_re_remote_store_types.rs b/crates/build/re_protos_builder/src/bin/build_re_remote_store_types.rs index 9ed06af18e62..d31d0e04c532 100644 --- a/crates/build/re_protos_builder/src/bin/build_re_remote_store_types.rs +++ b/crates/build/re_protos_builder/src/bin/build_re_remote_store_types.rs @@ -8,7 +8,7 @@ use camino::Utf8Path; const PROTOS_DIR: &str = "crates/store/re_protos/proto"; -const INPUT_V0: &[&str] = &["rerun/v0/remote_store.proto", "rerun/v0/log_msg.proto"]; +const INPUT_V0_DIR: &str = "rerun/v0"; const OUTPUT_V0_RUST_DIR: &str = "crates/store/re_protos/src/v0"; fn main() { @@ -28,16 +28,27 @@ fn main() { let definitions_dir_path = workspace_dir.join(PROTOS_DIR); let rust_generated_output_dir_path = workspace_dir.join(OUTPUT_V0_RUST_DIR); + let proto_paths = std::fs::read_dir(definitions_dir_path.join(INPUT_V0_DIR)) + .unwrap() + .map(|v| { + v.unwrap() + .path() + .strip_prefix(&definitions_dir_path) + .unwrap() + .to_owned() + }) + .collect::>(); re_log::info!( definitions=?definitions_dir_path, output=?rust_generated_output_dir_path, + protos=?proto_paths, "Running codegen for storage node types", ); re_protos_builder::generate_rust_code( definitions_dir_path, - INPUT_V0, + &proto_paths, rust_generated_output_dir_path, ); } diff --git a/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto b/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto new file mode 100644 index 000000000000..6edc3148dec9 --- /dev/null +++ b/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package rerun.sdk_comms.v0; + +import "rerun/v0/log_msg.proto"; +import "rerun/v0/common.proto"; + +// Simple buffer for messages between SDKs and viewers. +// +// - SDKs produce messages by calling `WriteMessages` +// - Viewers consume messages by calling `ReadMessages` +// +// The buffer is bounded by a memory limit, and will drop the oldest messages when the limit is reached. +// +// Whenever `ReadMessages` is called, all buffered messages are sent in the order they were received. +// The stream will then also yield any new messages passed to `WriteMessages` from any client. +service MessageProxy { + rpc WriteMessages(stream rerun.log_msg.v0.LogMsg) returns (Empty) {} + rpc ReadMessages(Empty) returns (stream rerun.log_msg.v0.LogMsg) {} +} + +message Empty {} diff --git a/crates/store/re_protos/src/lib.rs b/crates/store/re_protos/src/lib.rs index 545198db999b..ed9daa35efbf 100644 --- a/crates/store/re_protos/src/lib.rs +++ b/crates/store/re_protos/src/lib.rs @@ -29,6 +29,9 @@ mod v0 { #[path = "./rerun.remote_store.v0.rs"] pub mod rerun_remote_store_v0; + + #[path = "./rerun.sdk_comms.v0.rs"] + pub mod rerun_sdk_comms_v0; } pub mod common { @@ -50,6 +53,12 @@ pub mod remote_store { } } +pub mod sdk_comms { + pub mod v0 { + pub use crate::v0::rerun_sdk_comms_v0::*; + } +} + #[derive(Debug, thiserror::Error)] pub enum TypeConversionError { #[error("missing required field: {package_name}.{type_name}.{field_name}")] diff --git a/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs b/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs new file mode 100644 index 000000000000..aa77fa3666e9 --- /dev/null +++ b/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs @@ -0,0 +1,370 @@ +// This file is @generated by prost-build. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Empty {} +impl ::prost::Name for Empty { + const NAME: &'static str = "Empty"; + const PACKAGE: &'static str = "rerun.sdk_comms.v0"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.sdk_comms.v0.Empty".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.sdk_comms.v0.Empty".into() + } +} +/// Generated client implementations. +pub mod message_proxy_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value + )] + use tonic::codegen::http::Uri; + use tonic::codegen::*; + /// Simple buffer for messages between SDKs and viewers. + /// + /// - SDKs produce messages by calling `WriteMessages` + /// - Viewers consume messages by calling `ReadMessages` + /// + /// The buffer is bounded by a memory limit, and will drop the oldest messages when the limit is reached. + /// + /// Whenever `ReadMessages` is called, all buffered messages are sent in the order they were received. + /// The stream will then also yield any new messages passed to `WriteMessages` from any client. + #[derive(Debug, Clone)] + pub struct MessageProxyClient { + inner: tonic::client::Grpc, + } + impl MessageProxyClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> MessageProxyClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + >>::Error: + Into + std::marker::Send + std::marker::Sync, + { + MessageProxyClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn write_messages( + &mut self, + request: impl tonic::IntoStreamingRequest< + Message = super::super::super::log_msg::v0::LogMsg, + >, + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.sdk_comms.v0.MessageProxy/WriteMessages", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.sdk_comms.v0.MessageProxy", + "WriteMessages", + )); + self.inner.client_streaming(req, path, codec).await + } + pub async fn read_messages( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/rerun.sdk_comms.v0.MessageProxy/ReadMessages", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "rerun.sdk_comms.v0.MessageProxy", + "ReadMessages", + )); + self.inner.server_streaming(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod message_proxy_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with MessageProxyServer. + #[async_trait] + pub trait MessageProxy: std::marker::Send + std::marker::Sync + 'static { + async fn write_messages( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the ReadMessages method. + type ReadMessagesStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + std::marker::Send + + 'static; + async fn read_messages( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + /// Simple buffer for messages between SDKs and viewers. + /// + /// - SDKs produce messages by calling `WriteMessages` + /// - Viewers consume messages by calling `ReadMessages` + /// + /// The buffer is bounded by a memory limit, and will drop the oldest messages when the limit is reached. + /// + /// Whenever `ReadMessages` is called, all buffered messages are sent in the order they were received. + /// The stream will then also yield any new messages passed to `WriteMessages` from any client. + #[derive(Debug)] + pub struct MessageProxyServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl MessageProxyServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for MessageProxyServer + where + T: MessageProxy, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/rerun.sdk_comms.v0.MessageProxy/WriteMessages" => { + #[allow(non_camel_case_types)] + struct WriteMessagesSvc(pub Arc); + impl + tonic::server::ClientStreamingService< + super::super::super::log_msg::v0::LogMsg, + > for WriteMessagesSvc + { + type Response = super::Empty; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::write_messages(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = WriteMessagesSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.client_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/rerun.sdk_comms.v0.MessageProxy/ReadMessages" => { + #[allow(non_camel_case_types)] + struct ReadMessagesSvc(pub Arc); + impl tonic::server::ServerStreamingService for ReadMessagesSvc { + type Response = super::super::super::log_msg::v0::LogMsg; + type ResponseStream = T::ReadMessagesStream; + type Future = + BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::read_messages(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadMessagesSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers.insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers.insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }), + } + } + } + impl Clone for MessageProxyServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "rerun.sdk_comms.v0.MessageProxy"; + impl tonic::server::NamedService for MessageProxyServer { + const NAME: &'static str = SERVICE_NAME; + } +} From ea2646eaaac9fde9d3e95708d330f44a8fa74677 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Wed, 8 Jan 2025 21:03:35 +0100 Subject: [PATCH 03/25] Expose protubuf conversions in re_log_encoding --- crates/store/re_log_encoding/src/codec/mod.rs | 2 +- crates/store/re_log_encoding/src/lib.rs | 2 +- .../src/protobuf_conversions.rs | 103 ++++++++++++++++++ 3 files changed, 105 insertions(+), 2 deletions(-) diff --git a/crates/store/re_log_encoding/src/codec/mod.rs b/crates/store/re_log_encoding/src/codec/mod.rs index 7c51b4b690e0..647c2fc156e0 100644 --- a/crates/store/re_log_encoding/src/codec/mod.rs +++ b/crates/store/re_log_encoding/src/codec/mod.rs @@ -1,4 +1,4 @@ -mod arrow; +pub(crate) mod arrow; pub mod file; pub mod wire; diff --git a/crates/store/re_log_encoding/src/lib.rs b/crates/store/re_log_encoding/src/lib.rs index d35b8e331342..709b96fcdb4f 100644 --- a/crates/store/re_log_encoding/src/lib.rs +++ b/crates/store/re_log_encoding/src/lib.rs @@ -8,7 +8,7 @@ pub mod encoder; pub mod codec; -mod protobuf_conversions; +pub mod protobuf_conversions; #[cfg(feature = "encoder")] #[cfg(not(target_arch = "wasm32"))] diff --git a/crates/store/re_log_encoding/src/protobuf_conversions.rs b/crates/store/re_log_encoding/src/protobuf_conversions.rs index 9b613a1e3937..86562f3adcdc 100644 --- a/crates/store/re_log_encoding/src/protobuf_conversions.rs +++ b/crates/store/re_log_encoding/src/protobuf_conversions.rs @@ -1,3 +1,8 @@ +use re_protos::missing_field; + +use crate::codec::CodecError; +use crate::Compression; + impl From for crate::Compression { fn from(value: re_protos::log_msg::v0::Compression) -> Self { match value { @@ -15,3 +20,101 @@ impl From for re_protos::log_msg::v0::Compression { } } } + +#[cfg(feature = "decoder")] +pub fn log_msg_from_proto( + message: re_protos::log_msg::v0::LogMsg, +) -> Result { + use crate::codec::arrow::decode_arrow; + use crate::decoder::DecodeError; + use re_protos::log_msg::v0::{log_msg::Msg, Encoding}; + + match message.msg { + Some(Msg::SetStoreInfo(set_store_info)) => Ok(re_log_types::LogMsg::SetStoreInfo( + set_store_info.try_into()?, + )), + Some(Msg::ArrowMsg(arrow_msg)) => { + if arrow_msg.encoding() != Encoding::ArrowIpc { + return Err(DecodeError::Codec(CodecError::UnsupportedEncoding)); + } + + let (schema, chunk) = decode_arrow( + &arrow_msg.payload, + arrow_msg.uncompressed_size as usize, + arrow_msg.compression().into(), + )?; + + let store_id: re_log_types::StoreId = arrow_msg + .store_id + .ok_or_else(|| missing_field!(re_protos::log_msg::v0::ArrowMsg, "store_id"))? + .into(); + + let chunk = re_chunk::Chunk::from_transport(&re_chunk::TransportChunk { + schema, + data: chunk, + })?; + + Ok(re_log_types::LogMsg::ArrowMsg( + store_id, + chunk.to_arrow_msg()?, + )) + } + Some(Msg::BlueprintActivationCommand(blueprint_activation_command)) => { + Ok(re_log_types::LogMsg::BlueprintActivationCommand( + blueprint_activation_command.try_into()?, + )) + } + None => Err(missing_field!(re_protos::log_msg::v0::LogMsg, "msg").into()), + } +} + +#[cfg(feature = "encoder")] +pub fn log_msg_to_proto( + message: re_log_types::LogMsg, + compression: Compression, +) -> Result { + use crate::codec::arrow::encode_arrow; + use re_protos::log_msg::v0::{ + ArrowMsg, BlueprintActivationCommand, LogMsg as ProtoLogMsg, SetStoreInfo, + }; + + let proto_msg = match message { + re_log_types::LogMsg::SetStoreInfo(set_store_info) => { + let set_store_info: SetStoreInfo = set_store_info.into(); + ProtoLogMsg { + msg: Some(re_protos::log_msg::v0::log_msg::Msg::SetStoreInfo( + set_store_info, + )), + } + } + re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => { + let payload = encode_arrow(&arrow_msg.schema, &arrow_msg.chunk, compression)?; + let arrow_msg = ArrowMsg { + store_id: Some(store_id.into()), + compression: match compression { + Compression::Off => re_protos::log_msg::v0::Compression::None as i32, + Compression::LZ4 => re_protos::log_msg::v0::Compression::Lz4 as i32, + }, + uncompressed_size: payload.uncompressed_size as i32, + encoding: re_protos::log_msg::v0::Encoding::ArrowIpc as i32, + payload: payload.data, + }; + ProtoLogMsg { + msg: Some(re_protos::log_msg::v0::log_msg::Msg::ArrowMsg(arrow_msg)), + } + } + re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => { + let blueprint_activation_command: BlueprintActivationCommand = + blueprint_activation_command.into(); + ProtoLogMsg { + msg: Some( + re_protos::log_msg::v0::log_msg::Msg::BlueprintActivationCommand( + blueprint_activation_command, + ), + ), + } + } + }; + + Ok(proto_msg) +} From 7fdcac592f5d2b5a695fe20ffcfa744e0917f725 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Wed, 8 Jan 2025 21:04:29 +0100 Subject: [PATCH 04/25] Add `SizeBytes` impls for some types --- crates/build/re_build_info/Cargo.toml | 1 + .../build/re_build_info/src/crate_version.rs | 6 + crates/store/re_log_types/src/lib.rs | 127 ++++++++++++++++++ crates/utils/re_byte_size/src/arrow2_sizes.rs | 20 ++- 4 files changed, 153 insertions(+), 1 deletion(-) diff --git a/crates/build/re_build_info/Cargo.toml b/crates/build/re_build_info/Cargo.toml index a37d63ef9c54..bbd38124d7f0 100644 --- a/crates/build/re_build_info/Cargo.toml +++ b/crates/build/re_build_info/Cargo.toml @@ -27,6 +27,7 @@ serde = ["dep:serde"] [dependencies] +re_byte_size.workspace = true # Optional dependencies: serde = { workspace = true, optional = true, features = ["derive", "rc"] } diff --git a/crates/build/re_build_info/src/crate_version.rs b/crates/build/re_build_info/src/crate_version.rs index a886c4c828b1..fc67288c1e44 100644 --- a/crates/build/re_build_info/src/crate_version.rs +++ b/crates/build/re_build_info/src/crate_version.rs @@ -470,6 +470,12 @@ impl std::fmt::Display for CrateVersion { } } +impl re_byte_size::SizeBytes for CrateVersion { + fn heap_size_bytes(&self) -> u64 { + 0 + } +} + #[test] fn test_parse_version() { macro_rules! assert_parse_ok { diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs index 3174228b7750..9c6965402f7f 100644 --- a/crates/store/re_log_types/src/lib.rs +++ b/crates/store/re_log_types/src/lib.rs @@ -37,6 +37,7 @@ mod protobuf_conversions; use std::sync::Arc; use re_build_info::CrateVersion; +use re_byte_size::SizeBytes; pub use self::arrow_msg::{ArrowChunkReleaseCallback, ArrowMsg}; pub use self::instance::Instance; @@ -635,6 +636,132 @@ pub fn build_frame_nr(frame_nr: impl TryInto) -> (Timeline, TimeInt) { ) } +impl SizeBytes for ApplicationId { + fn heap_size_bytes(&self) -> u64 { + self.0.heap_size_bytes() + } +} + +impl SizeBytes for StoreId { + fn heap_size_bytes(&self) -> u64 { + self.id.heap_size_bytes() + } +} + +impl SizeBytes for PythonVersion { + fn heap_size_bytes(&self) -> u64 { + let Self { + major: _, + minor: _, + patch: _, + suffix, + } = self; + + suffix.heap_size_bytes() + } +} + +impl SizeBytes for FileSource { + fn heap_size_bytes(&self) -> u64 { + match self { + Self::Uri | Self::Sdk | Self::Cli => 0, + Self::DragAndDrop { + recommended_application_id, + recommended_recording_id, + force_store_info, + } + | Self::FileDialog { + recommended_application_id, + recommended_recording_id, + force_store_info, + } => { + recommended_application_id.heap_size_bytes() + + recommended_recording_id.heap_size_bytes() + + force_store_info.heap_size_bytes() + } + } + } +} + +impl SizeBytes for StoreSource { + fn heap_size_bytes(&self) -> u64 { + match self { + Self::Unknown | Self::CSdk | Self::Viewer => 0, + Self::PythonSdk(python_version) => python_version.heap_size_bytes(), + Self::RustSdk { + rustc_version, + llvm_version, + } => rustc_version.heap_size_bytes() + llvm_version.heap_size_bytes(), + Self::File { file_source } => file_source.heap_size_bytes(), + Self::Other(description) => description.heap_size_bytes(), + } + } +} + +impl SizeBytes for StoreInfo { + fn heap_size_bytes(&self) -> u64 { + let Self { + application_id, + store_id, + cloned_from: _, + is_official_example: _, + started: _, + store_source, + store_version, + } = self; + + application_id.heap_size_bytes() + + store_id.heap_size_bytes() + + store_source.heap_size_bytes() + + store_version.heap_size_bytes() + } +} + +impl SizeBytes for SetStoreInfo { + fn heap_size_bytes(&self) -> u64 { + let Self { row_id, info } = self; + + row_id.heap_size_bytes() + info.heap_size_bytes() + } +} + +impl SizeBytes for BlueprintActivationCommand { + fn heap_size_bytes(&self) -> u64 { + 0 + } +} + +impl SizeBytes for ArrowMsg { + fn heap_size_bytes(&self) -> u64 { + let Self { + chunk_id, + timepoint_max, + schema, + chunk, + on_release: _, + } = self; + + chunk_id.heap_size_bytes() + + timepoint_max.heap_size_bytes() + + schema.heap_size_bytes() + + chunk.heap_size_bytes() + } +} + +impl SizeBytes for LogMsg { + fn heap_size_bytes(&self) -> u64 { + match self { + Self::SetStoreInfo(set_store_info) => set_store_info.heap_size_bytes(), + Self::ArrowMsg(store_id, arrow_msg) => { + store_id.heap_size_bytes() + arrow_msg.heap_size_bytes() + } + Self::BlueprintActivationCommand(blueprint_activation_command) => { + blueprint_activation_command.heap_size_bytes() + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/utils/re_byte_size/src/arrow2_sizes.rs b/crates/utils/re_byte_size/src/arrow2_sizes.rs index ddac7760dc41..21255a12b1ff 100644 --- a/crates/utils/re_byte_size/src/arrow2_sizes.rs +++ b/crates/utils/re_byte_size/src/arrow2_sizes.rs @@ -12,7 +12,8 @@ use arrow2::{ Utf8Array, }, bitmap::Bitmap, - datatypes::{DataType, Field, PhysicalType}, + chunk::Chunk, + datatypes::{DataType, Field, PhysicalType, Schema}, types::{NativeType, Offset}, }; @@ -552,3 +553,20 @@ impl SizeBytes for StructArray { estimated_bytes_size(self) as _ } } + +impl SizeBytes for Schema { + fn heap_size_bytes(&self) -> u64 { + let Self { fields, metadata } = self; + + fields.heap_size_bytes() + metadata.heap_size_bytes() + } +} + +impl SizeBytes for Chunk> { + fn heap_size_bytes(&self) -> u64 { + self.arrays() + .iter() + .map(SizeBytes::total_size_bytes) + .sum::() + } +} From 799ed2f23f8139c7c8fe5e4aa8fdd96176424015 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Wed, 8 Jan 2025 21:04:56 +0100 Subject: [PATCH 05/25] Fix typos --- crates/store/re_grpc_client/Cargo.toml | 2 +- crates/store/re_grpc_client/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/store/re_grpc_client/Cargo.toml b/crates/store/re_grpc_client/Cargo.toml index efb330fc306e..7fcc26102500 100644 --- a/crates/store/re_grpc_client/Cargo.toml +++ b/crates/store/re_grpc_client/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "re_grpc_client" authors.workspace = true -description = "gRCP client for the Rerun Data Platform gRPC prototocl" +description = "gRCP client for the Rerun Data Platform gRPC protocol" edition.workspace = true homepage.workspace = true include.workspace = true diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 2ce557c5890b..11279e16d127 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -90,7 +90,7 @@ const CATALOG_BP_STORE_ID: &str = "catalog_blueprint"; const CATALOG_REC_STORE_ID: &str = "catalog"; const CATALOG_APPLICATION_ID: &str = "redap_catalog"; -/// Stream an rrd file or metadsasta catalog over gRPC from a Rerun Data Platform server. +/// Stream an rrd file or metadata catalog over gRPC from a Rerun Data Platform server. /// /// `on_msg` can be used to wake up the UI thread on Wasm. pub fn stream_from_redap( From 796f736c54ef737295122077595ad5b600624ed0 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Wed, 8 Jan 2025 21:05:03 +0100 Subject: [PATCH 06/25] Implement `re_grpc_server` --- ARCHITECTURE.md | 1 + Cargo.lock | 28 ++ Cargo.toml | 1 + crates/store/re_grpc_server/Cargo.toml | 44 ++ crates/store/re_grpc_server/README.md | 10 + crates/store/re_grpc_server/src/lib.rs | 537 +++++++++++++++++++++++++ 6 files changed, 621 insertions(+) create mode 100644 crates/store/re_grpc_server/Cargo.toml create mode 100644 crates/store/re_grpc_server/README.md create mode 100644 crates/store/re_grpc_server/src/lib.rs diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 1f07bcdb65ff..ccfe5dcc570d 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -188,6 +188,7 @@ Update instructions: | re_data_loader | Handles loading of Rerun data from file using data loader plugins | | re_data_source | Handles loading of Rerun data from different sources | | re_grpc_client | Communicate with the Rerun Data Platform over gRPC | +| re_grpc_server | Host an in-memory Storage Node | | re_sdk_comms | TCP communication between Rerun SDK and Rerun Server | | re_web_viewer_server | Serves the Rerun web viewer (Wasm and HTML) over HTTP | | re_ws_comms | WebSocket communication library (encoding, decoding, client, server) between a Rerun server and Viewer | diff --git a/Cargo.lock b/Cargo.lock index d02046f7e1d7..c00302a32dea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5617,6 +5617,7 @@ dependencies = [ name = "re_build_info" version = "0.22.0-alpha.1+dev" dependencies = [ + "re_byte_size", "serde", ] @@ -6005,6 +6006,32 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "re_grpc_server" +version = "0.22.0-alpha.1+dev" +dependencies = [ + "prost", + "re_build_info", + "re_byte_size", + "re_chunk", + "re_error", + "re_format", + "re_log", + "re_log_encoding", + "re_log_types", + "re_memory", + "re_protos", + "re_smart_channel", + "re_tracing", + "re_types", + "thiserror 1.0.65", + "tokio", + "tokio-stream", + "tokio-util", + "tonic", + "url", +] + [[package]] name = "re_int_histogram" version = "0.22.0-alpha.1+dev" @@ -8820,6 +8847,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 155e5fb499cd..9de72086907f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -280,6 +280,7 @@ tinyvec = { version = "1.6", features = ["alloc", "rustc_1_55"] } tobj = "4.0" tokio = { version = "1.40.0", default-features = false } tokio-stream = "0.1.16" +tokio-util = { version = "0.7.12", default-features = false } toml = { version = "0.8.10", default-features = false } tonic = { version = "0.12.3", default-features = false } tonic-build = { version = "0.12.3", default-features = false } diff --git a/crates/store/re_grpc_server/Cargo.toml b/crates/store/re_grpc_server/Cargo.toml new file mode 100644 index 000000000000..443f27b457e6 --- /dev/null +++ b/crates/store/re_grpc_server/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "re_grpc_server" +authors.workspace = true +description = "gRCP server for the Rerun Data Platform gRPC protocol" +edition.workspace = true +homepage.workspace = true +include.workspace = true +license.workspace = true +publish = true +readme = "README.md" +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[lints] +workspace = true + +[package.metadata.docs.rs] +all-features = true + + +[dependencies] +re_build_info.workspace = true +re_byte_size.workspace = true +re_chunk.workspace = true +re_error.workspace = true +re_format.workspace = true +re_log.workspace = true +re_log_encoding = { workspace = true, features = ["encoder", "decoder"] } +re_log_types.workspace = true +re_memory.workspace = true +re_protos.workspace = true +re_smart_channel.workspace = true +re_tracing.workspace = true +re_types.workspace = true + +# External +prost.workspace = true +thiserror.workspace = true +tokio.workspace = true +tokio-stream = { workspace = true, features = ["sync"] } +tokio-util.workspace = true +tonic = { workspace = true, default-features = false, features = ["transport"] } +url.workspace = true diff --git a/crates/store/re_grpc_server/README.md b/crates/store/re_grpc_server/README.md new file mode 100644 index 000000000000..137ee18013cd --- /dev/null +++ b/crates/store/re_grpc_server/README.md @@ -0,0 +1,10 @@ +# re_grpc_server + +Part of the [`rerun`](https://github.com/rerun-io/rerun) family of crates. + +[![Latest version](https://img.shields.io/crates/v/re_grpc_server.svg)](https://crates.io/crates/re_grpc_server) +[![Documentation](https://docs.rs/re_grpc_server/badge.svg)](https://docs.rs/re_grpc_server) +![MIT](https://img.shields.io/badge/license-MIT-blue.svg) +![Apache](https://img.shields.io/badge/license-Apache-blue.svg) + +Server implementation of an in-memory Storage Node. diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs new file mode 100644 index 000000000000..5172ef31c304 --- /dev/null +++ b/crates/store/re_grpc_server/src/lib.rs @@ -0,0 +1,537 @@ +use std::collections::VecDeque; +use std::pin::Pin; + +use prost::Message as _; +use re_memory::MemoryLimit; +use re_protos::{ + log_msg::v0::LogMsg as LogMsgProto, + sdk_comms::v0::{message_proxy_server, Empty}, +}; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::Stream; +use tokio_stream::StreamExt as _; + +enum Event { + /// New client connected, requesting full history and subscribing to new messages. + NewClient(oneshot::Sender<(Vec, broadcast::Receiver)>), + + /// A client sent a message. + Message(LogMsgProto), +} + +struct QueueState { + server_memory_limit: MemoryLimit, + + /// New messages are broadcast to all clients. + broadcast_tx: broadcast::Sender, + + /// Channel for incoming events. + event_rx: mpsc::Receiver, + + /// Messages stored in order of arrival, and garbage collected if the server hits the memory limit. + temporal_message_queue: VecDeque, + + /// Total size of buffered messages in bytes. + /// + /// Excludes size of `commands`. + message_bytes: u64, + + /// Messages potentially out of order with the rest of the message stream. These are never garbage collected. + static_message_queue: VecDeque, +} + +impl QueueState { + fn new(server_memory_limit: MemoryLimit, event_rx: mpsc::Receiver) -> Self { + Self { + server_memory_limit, + broadcast_tx: broadcast::channel(1024).0, + event_rx, + temporal_message_queue: Default::default(), + message_bytes: 0, + static_message_queue: Default::default(), + } + } + + async fn run_in_place(mut self) { + loop { + let Some(event) = self.event_rx.recv().await else { + break; + }; + + match event { + Event::NewClient(channel) => self.handle_new_client(channel), + Event::Message(msg) => self.handle_msg(msg), + } + } + } + + fn handle_new_client( + &self, + channel: oneshot::Sender<(Vec, broadcast::Receiver)>, + ) { + channel + .send(( + // static messages come first + self.static_message_queue + .iter() + .cloned() + .chain(self.temporal_message_queue.iter().cloned()) + .collect(), + self.broadcast_tx.subscribe(), + )) + .ok(); + } + + fn handle_msg(&mut self, msg: LogMsgProto) { + self.broadcast_tx.send(msg.clone()).ok(); + + self.gc_if_using_too_much_ram(); + + let Some(inner) = &msg.msg else { + re_log::error!( + "{}", + re_protos::missing_field!(re_protos::log_msg::v0::LogMsg, "msg") + ); + return; + }; + + use re_protos::log_msg::v0::log_msg::Msg; + match inner { + // We consider `BlueprintActivationCommand` a temporal message, + // because it is sensitive to order, and it is safe to garbage collect + // if all the messages that came before it were also garbage collected, + // as it's the last message sent by the SDK when submitting a blueprint. + Msg::ArrowMsg(..) | Msg::BlueprintActivationCommand(..) => { + let approx_size_bytes = message_size(&msg); + self.message_bytes += approx_size_bytes; + self.temporal_message_queue.push_back(msg); + } + Msg::SetStoreInfo(..) => { + self.static_message_queue.push_back(msg); + } + } + } + + fn gc_if_using_too_much_ram(&mut self) { + re_tracing::profile_function!(); + + if let Some(max_bytes) = self.server_memory_limit.max_bytes { + let max_bytes = max_bytes as u64; + if max_bytes < self.message_bytes { + re_tracing::profile_scope!("Drop messages"); + re_log::info_once!( + "Memory limit ({}) exceeded. Dropping old log messages from the server. Clients connecting after this will not see the full history.", + re_format::format_bytes(max_bytes as _) + ); + + let bytes_to_free = self.message_bytes - max_bytes; + + let mut bytes_dropped = 0; + let mut messages_dropped = 0; + + while bytes_dropped < bytes_to_free { + // only drop messages from temporal queue + if let Some(msg) = self.temporal_message_queue.pop_front() { + bytes_dropped += message_size(&msg); + messages_dropped += 1; + } else { + break; + } + } + + re_log::trace!( + "Dropped {} bytes in {messages_dropped} message(s)", + re_format::format_bytes(bytes_dropped as _) + ); + } + } + } +} + +fn message_size(msg: &LogMsgProto) -> u64 { + // TODO(jan): don't use encoded len + msg.encoded_len() as u64 +} + +struct Queue { + _task_handle: tokio::task::JoinHandle<()>, + event_tx: mpsc::Sender, +} + +impl Queue { + fn spawn(server_memory_limit: MemoryLimit) -> Self { + let (event_tx, event_rx) = mpsc::channel(1024); + + let task_handle = tokio::spawn(async move { + QueueState::new(server_memory_limit, event_rx) + .run_in_place() + .await; + }); + + Self { + _task_handle: task_handle, + event_tx, + } + } + + async fn push(&self, msg: LogMsgProto) { + self.event_tx.send(Event::Message(msg)).await.ok(); + } + + async fn new_client_stream(&self) -> MessageStream { + let (sender, receiver) = oneshot::channel(); + if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await { + re_log::error!("Error initializing new client: {err}"); + return Box::pin(tokio_stream::empty()); + }; + let (history, channel) = match receiver.await { + Ok(v) => v, + Err(err) => { + re_log::error!("Error initializing new client: {err}"); + return Box::pin(tokio_stream::empty()); + } + }; + + let history = tokio_stream::iter(history.into_iter().map(Ok)); + let channel = BroadcastStream::new(channel).map(|result| { + result.map_err(|err| { + re_log::error!("Error reading message from broadcast channel: {err}"); + tonic::Status::internal("internal channel error") + }) + }); + + Box::pin(history.merge(channel)) + } +} + +pub struct MessageProxy { + queue: Queue, +} + +impl MessageProxy { + pub fn new(server_memory_limit: MemoryLimit) -> Self { + Self { + queue: Queue::spawn(server_memory_limit), + } + } +} + +type MessageStream = Pin> + Send>>; + +#[tonic::async_trait] +impl message_proxy_server::MessageProxy for MessageProxy { + async fn write_messages( + &self, + request: tonic::Request>, + ) -> tonic::Result> { + let mut stream = request.into_inner(); + loop { + match stream.message().await { + Ok(Some(msg)) => { + self.queue.push(msg).await; + } + Ok(None) => { + // Connection was closed + break; + } + Err(err) => { + re_log::error!("Error while receiving messages: {err}"); + break; + } + } + } + + Ok(tonic::Response::new(Empty {})) + } + + type ReadMessagesStream = MessageStream; + + async fn read_messages( + &self, + _: tonic::Request, + ) -> tonic::Result> { + Ok(tonic::Response::new(self.queue.new_client_stream().await)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use message_proxy_server::MessageProxy; + use re_build_info::CrateVersion; + use re_chunk::RowId; + use re_log_encoding::protobuf_conversions::{log_msg_from_proto, log_msg_to_proto}; + use re_log_encoding::Compression; + use re_log_types::{ + ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time, + }; + use re_protos::log_msg; + use re_protos::sdk_comms::v0::{ + message_proxy_client::MessageProxyClient, message_proxy_server::MessageProxyServer, + }; + use std::net::SocketAddr; + use std::sync::Arc; + use tokio::net::TcpListener; + use tokio_util::sync::CancellationToken; + use tonic::transport::server::TcpIncoming; + use tonic::transport::Channel; + use tonic::transport::Endpoint; + + #[derive(Clone)] + struct Completion(Arc); + + impl Drop for Completion { + fn drop(&mut self) { + self.finish(); + } + } + + impl Completion { + fn new() -> Self { + Self(Arc::new(CancellationToken::new())) + } + + fn finish(&self) { + self.0.cancel(); + } + + async fn wait(&self) { + self.0.cancelled().await; + } + } + + /// Generates `n` log messages wrapped in a `SetStoreInfo` at the start and `BlueprintActivationCommand` at the end, + /// to exercise message ordering. + fn fake_log_stream(n: usize) -> Vec { + let store_id = StoreId::random(StoreKind::Blueprint); + + let mut messages = Vec::new(); + messages.push(LogMsg::SetStoreInfo(SetStoreInfo { + row_id: *RowId::new(), + info: StoreInfo { + application_id: ApplicationId("test".to_owned()), + store_id: store_id.clone(), + cloned_from: None, + is_official_example: true, + started: Time::now(), + store_source: StoreSource::RustSdk { + rustc_version: String::new(), + llvm_version: String::new(), + }, + store_version: Some(CrateVersion::LOCAL), + }, + })); + for _ in 0..n { + messages.push(LogMsg::ArrowMsg( + store_id.clone(), + re_chunk::Chunk::builder("test_entity".into()) + .with_archetype( + re_chunk::RowId::new(), + re_log_types::TimePoint::default().with( + re_log_types::Timeline::new_sequence("blueprint"), + re_log_types::TimeInt::from_milliseconds(re_log_types::NonMinI64::MIN), + ), + &re_types::blueprint::archetypes::Background::new( + re_types::blueprint::components::BackgroundKind::SolidColor, + ) + .with_color([255, 0, 0]), + ) + .build() + .unwrap() + .to_arrow_msg() + .unwrap(), + )); + } + messages.push(LogMsg::BlueprintActivationCommand( + re_log_types::BlueprintActivationCommand { + blueprint_id: store_id, + make_active: true, + make_default: true, + }, + )); + + messages + } + + async fn setup() -> (Completion, SocketAddr) { + let completion = Completion::new(); + + let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = tcp_listener.local_addr().unwrap(); + + tokio::spawn({ + let completion = completion.clone(); + async move { + tonic::transport::Server::builder() + .add_service(MessageProxyServer::new(super::MessageProxy::new( + MemoryLimit::UNLIMITED, + ))) + .serve_with_incoming_shutdown( + TcpIncoming::from_listener(tcp_listener, true, None).unwrap(), + completion.wait(), + ) + .await + .unwrap(); + } + }); + + (completion, addr) + } + + async fn make_client(addr: SocketAddr) -> MessageProxyClient { + MessageProxyClient::new( + Endpoint::from_shared(format!("http://{}", addr)) + .unwrap() + .connect() + .await + .unwrap(), + ) + } + + async fn read_log_stream( + log_stream: &mut tonic::Response>, + n: usize, + ) -> Vec { + let mut stream_ref = log_stream + .get_mut() + .map(|result| log_msg_from_proto(result.unwrap()).unwrap()); + + let mut messages = Vec::new(); + for _ in 0..n { + messages.push(stream_ref.next().await.unwrap()); + } + messages + } + + #[tokio::test] + async fn pubsub_basic() { + let (completion, addr) = setup().await; + let mut client = make_client(addr).await; // We use the same client for both producing and consuming + let messages = fake_log_stream(3); + + // start reading + let mut log_stream = client.read_messages(Empty {}).await.unwrap(); + + // write a few messages + client + .write_messages(tokio_stream::iter( + messages + .clone() + .into_iter() + .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()), + )) + .await + .unwrap(); + + // the messages should be echoed to us + let actual = read_log_stream(&mut log_stream, messages.len()).await; + assert_eq!(messages, actual); + + completion.finish(); + } + + #[tokio::test] + async fn pubsub_history() { + let (completion, addr) = setup().await; + let mut client = make_client(addr).await; // We use the same client for both producing and consuming + let messages = fake_log_stream(3); + + // don't read anything yet - these messages should be sent to us as part of history when we call `read_messages` later + + // Write a few messages: + client + .write_messages(tokio_stream::iter( + messages + .clone() + .into_iter() + .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()), + )) + .await + .unwrap(); + + // Start reading now - we should receive full history at this point: + let mut log_stream = client.read_messages(Empty {}).await.unwrap(); + + let actual = read_log_stream(&mut log_stream, messages.len()).await; + assert_eq!(messages, actual); + + completion.finish(); + } + + #[tokio::test] + async fn one_producer_many_consumers() { + let (completion, addr) = setup().await; + let mut producer = make_client(addr).await; // We use separate clients for producing and consuming + let mut consumers = vec![make_client(addr).await, make_client(addr).await]; + let messages = fake_log_stream(3); + + // Initialize multiple read streams: + let mut log_streams = vec![]; + for consumer in &mut consumers { + log_streams.push(consumer.read_messages(Empty {}).await.unwrap()); + } + + // Write a few messages using our single producer: + producer + .write_messages(tokio_stream::iter( + messages + .clone() + .into_iter() + .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()), + )) + .await + .unwrap(); + + // Each consumer should've received them: + for log_stream in &mut log_streams { + let actual = read_log_stream(log_stream, messages.len()).await; + assert_eq!(messages, actual); + } + + completion.finish(); + } + + #[tokio::test] + async fn many_producers_many_consumers() { + let (completion, addr) = setup().await; + let mut producers = vec![make_client(addr).await, make_client(addr).await]; + let mut consumers = vec![make_client(addr).await, make_client(addr).await]; + let messages = fake_log_stream(3); + + // Initialize multiple read streams: + let mut log_streams = vec![]; + for consumer in &mut consumers { + log_streams.push(consumer.read_messages(Empty {}).await.unwrap()); + } + + // Write a few messages using each producer: + for producer in &mut producers { + producer + .write_messages(tokio_stream::iter( + messages + .clone() + .into_iter() + .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()), + )) + .await + .unwrap(); + } + + let expected = [messages.clone(), messages.clone()].concat(); + + // Each consumer should've received one set of messages from each producer. + // Note that in this test we also guarantee the order of messages across producers, + // due to the `write_messages` calls being sequential. + + for log_stream in &mut log_streams { + let actual = read_log_stream(log_stream, expected.len()).await; + assert_eq!(expected, actual); + } + + completion.finish(); + } +} From f59825a64855132eb9defee48ca8d13a8725c839 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Wed, 8 Jan 2025 21:23:06 +0100 Subject: [PATCH 07/25] Fix lints --- Cargo.lock | 5 ----- crates/store/re_grpc_server/Cargo.toml | 5 ----- crates/store/re_grpc_server/README.md | 4 ++-- crates/store/re_grpc_server/src/lib.rs | 8 +++++--- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c00302a32dea..70d8121efe31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6012,24 +6012,19 @@ version = "0.22.0-alpha.1+dev" dependencies = [ "prost", "re_build_info", - "re_byte_size", "re_chunk", - "re_error", "re_format", "re_log", "re_log_encoding", "re_log_types", "re_memory", "re_protos", - "re_smart_channel", "re_tracing", "re_types", - "thiserror 1.0.65", "tokio", "tokio-stream", "tokio-util", "tonic", - "url", ] [[package]] diff --git a/crates/store/re_grpc_server/Cargo.toml b/crates/store/re_grpc_server/Cargo.toml index 443f27b457e6..525ffcb5d3d4 100644 --- a/crates/store/re_grpc_server/Cargo.toml +++ b/crates/store/re_grpc_server/Cargo.toml @@ -21,24 +21,19 @@ all-features = true [dependencies] re_build_info.workspace = true -re_byte_size.workspace = true re_chunk.workspace = true -re_error.workspace = true re_format.workspace = true re_log.workspace = true re_log_encoding = { workspace = true, features = ["encoder", "decoder"] } re_log_types.workspace = true re_memory.workspace = true re_protos.workspace = true -re_smart_channel.workspace = true re_tracing.workspace = true re_types.workspace = true # External prost.workspace = true -thiserror.workspace = true tokio.workspace = true tokio-stream = { workspace = true, features = ["sync"] } tokio-util.workspace = true tonic = { workspace = true, default-features = false, features = ["transport"] } -url.workspace = true diff --git a/crates/store/re_grpc_server/README.md b/crates/store/re_grpc_server/README.md index 137ee18013cd..3e60ed9287d0 100644 --- a/crates/store/re_grpc_server/README.md +++ b/crates/store/re_grpc_server/README.md @@ -2,8 +2,8 @@ Part of the [`rerun`](https://github.com/rerun-io/rerun) family of crates. -[![Latest version](https://img.shields.io/crates/v/re_grpc_server.svg)](https://crates.io/crates/re_grpc_server) -[![Documentation](https://docs.rs/re_grpc_server/badge.svg)](https://docs.rs/re_grpc_server) +[![Latest version](https://img.shields.io/crates/v/re_grpc_server.svg?speculative-link)](https://crates.io/crates/re_grpc_server?speculative-link) +[![Documentation](https://docs.rs/re_grpc_server/badge.svg?speculative-link)](https://docs.rs/re_grpc_server?speculative-link) ![MIT](https://img.shields.io/badge/license-MIT-blue.svg) ![Apache](https://img.shields.io/badge/license-Apache-blue.svg) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 5172ef31c304..01f1ef94c23d 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -261,7 +261,6 @@ impl message_proxy_server::MessageProxy for MessageProxy { mod tests { use super::*; - use message_proxy_server::MessageProxy; use re_build_info::CrateVersion; use re_chunk::RowId; use re_log_encoding::protobuf_conversions::{log_msg_from_proto, log_msg_to_proto}; @@ -269,7 +268,6 @@ mod tests { use re_log_types::{ ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time, }; - use re_protos::log_msg; use re_protos::sdk_comms::v0::{ message_proxy_client::MessageProxyClient, message_proxy_server::MessageProxyServer, }; @@ -358,6 +356,10 @@ mod tests { } async fn setup() -> (Completion, SocketAddr) { + setup_with_memory_limit(MemoryLimit::UNLIMITED).await + } + + async fn setup_with_memory_limit(memory_limit: MemoryLimit) -> (Completion, SocketAddr) { let completion = Completion::new(); let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -368,7 +370,7 @@ mod tests { async move { tonic::transport::Server::builder() .add_service(MessageProxyServer::new(super::MessageProxy::new( - MemoryLimit::UNLIMITED, + memory_limit, ))) .serve_with_incoming_shutdown( TcpIncoming::from_listener(tcp_listener, true, None).unwrap(), From f04124cce9ee6fc65577417cb1b57cc695157362 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 13:51:10 +0100 Subject: [PATCH 08/25] Implement `SizeBytes` for `LogMsg` in `re_protos` --- Cargo.lock | 3 +- crates/store/re_protos/Cargo.toml | 2 + crates/store/re_protos/src/lib.rs | 156 +++++++++++++++++++++++++++ crates/utils/re_byte_size/Cargo.toml | 1 - crates/utils/re_byte_size/src/lib.rs | 7 -- crates/utils/re_tuid/Cargo.toml | 1 + crates/utils/re_tuid/src/lib.rs | 7 ++ 7 files changed, 168 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 70d8121efe31..c927e120ea60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5641,7 +5641,6 @@ dependencies = [ "arrow", "half", "re_arrow2", - "re_tuid", "smallvec", ] @@ -6170,6 +6169,7 @@ name = "re_protos" version = "0.22.0-alpha.1+dev" dependencies = [ "prost", + "re_byte_size", "thiserror 1.0.65", "tonic", "tonic-web-wasm-client", @@ -6467,6 +6467,7 @@ dependencies = [ "document-features", "getrandom", "once_cell", + "re_byte_size", "re_protos", "serde", "web-time", diff --git a/crates/store/re_protos/Cargo.toml b/crates/store/re_protos/Cargo.toml index 74a75d7759c4..85b6e3b78164 100644 --- a/crates/store/re_protos/Cargo.toml +++ b/crates/store/re_protos/Cargo.toml @@ -13,6 +13,8 @@ rust-version.workspace = true version.workspace = true [dependencies] +re_byte_size.workspace = true + # External prost.workspace = true thiserror.workspace = true diff --git a/crates/store/re_protos/src/lib.rs b/crates/store/re_protos/src/lib.rs index ed9daa35efbf..e41036697752 100644 --- a/crates/store/re_protos/src/lib.rs +++ b/crates/store/re_protos/src/lib.rs @@ -121,3 +121,159 @@ macro_rules! invalid_field { $crate::TypeConversionError::invalid_field::<$type>($field, &$reason) }; } + +mod sizes { + use re_byte_size::SizeBytes; + + impl SizeBytes for crate::log_msg::v0::LogMsg { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { msg } = self; + + match msg { + Some(msg) => msg.heap_size_bytes(), + None => 0, + } + } + } + + impl SizeBytes for crate::log_msg::v0::log_msg::Msg { + #[inline] + fn heap_size_bytes(&self) -> u64 { + match self { + Self::SetStoreInfo(set_store_info) => set_store_info.heap_size_bytes(), + Self::ArrowMsg(arrow_msg) => arrow_msg.heap_size_bytes(), + Self::BlueprintActivationCommand(blueprint_activation_command) => { + blueprint_activation_command.heap_size_bytes() + } + } + } + } + + impl SizeBytes for crate::log_msg::v0::SetStoreInfo { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { row_id, info } = self; + + row_id.heap_size_bytes() + info.heap_size_bytes() + } + } + + impl SizeBytes for crate::common::v0::Tuid { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { inc, time_ns } = self; + + inc.heap_size_bytes() + time_ns.heap_size_bytes() + } + } + + impl SizeBytes for crate::log_msg::v0::StoreInfo { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { + application_id, + store_id, + is_official_example, + started, + store_source, + store_version, + } = self; + + application_id.heap_size_bytes() + + store_id.heap_size_bytes() + + is_official_example.heap_size_bytes() + + started.heap_size_bytes() + + store_source.heap_size_bytes() + + store_version.heap_size_bytes() + } + } + + impl SizeBytes for crate::common::v0::ApplicationId { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { id } = self; + + id.heap_size_bytes() + } + } + + impl SizeBytes for crate::common::v0::StoreId { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { kind, id } = self; + + kind.heap_size_bytes() + id.heap_size_bytes() + } + } + + impl SizeBytes for crate::common::v0::Time { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { nanos_since_epoch } = self; + + nanos_since_epoch.heap_size_bytes() + } + } + + impl SizeBytes for crate::log_msg::v0::StoreSource { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { kind, extra } = self; + + kind.heap_size_bytes() + extra.heap_size_bytes() + } + } + + impl SizeBytes for crate::log_msg::v0::StoreSourceExtra { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { payload } = self; + + payload.heap_size_bytes() + } + } + + impl SizeBytes for crate::log_msg::v0::StoreVersion { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { crate_version_bits } = self; + + crate_version_bits.heap_size_bytes() + } + } + + impl SizeBytes for crate::log_msg::v0::ArrowMsg { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { + store_id, + compression, + uncompressed_size, + encoding, + payload, + } = self; + + store_id.heap_size_bytes() + + compression.heap_size_bytes() + + uncompressed_size.heap_size_bytes() + + encoding.heap_size_bytes() + + payload.heap_size_bytes() + } + } + + impl SizeBytes for crate::log_msg::v0::BlueprintActivationCommand { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { + blueprint_id, + make_active, + make_default, + } = self; + + blueprint_id.heap_size_bytes() + + make_active.heap_size_bytes() + + make_default.heap_size_bytes() + } + } +} diff --git a/crates/utils/re_byte_size/Cargo.toml b/crates/utils/re_byte_size/Cargo.toml index 6cf0e1858b4b..21955ff633e5 100644 --- a/crates/utils/re_byte_size/Cargo.toml +++ b/crates/utils/re_byte_size/Cargo.toml @@ -28,4 +28,3 @@ arrow.workspace = true arrow2.workspace = true half.workspace = true smallvec.workspace = true -re_tuid.workspace = true # TODO(emilk): maybe re_tuid should depend on re_byte_size instead? diff --git a/crates/utils/re_byte_size/src/lib.rs b/crates/utils/re_byte_size/src/lib.rs index 3459d5d21b5b..c276fb70d7c3 100644 --- a/crates/utils/re_byte_size/src/lib.rs +++ b/crates/utils/re_byte_size/src/lib.rs @@ -38,10 +38,3 @@ pub trait SizeBytes { false } } - -impl SizeBytes for re_tuid::Tuid { - #[inline] - fn heap_size_bytes(&self) -> u64 { - 0 - } -} diff --git a/crates/utils/re_tuid/Cargo.toml b/crates/utils/re_tuid/Cargo.toml index 3591c5e5c8ac..e60a198f6f25 100644 --- a/crates/utils/re_tuid/Cargo.toml +++ b/crates/utils/re_tuid/Cargo.toml @@ -27,6 +27,7 @@ serde = ["dep:serde"] [dependencies] +re_byte_size.workspace = true re_protos.workspace = true document-features.workspace = true diff --git a/crates/utils/re_tuid/src/lib.rs b/crates/utils/re_tuid/src/lib.rs index 5a19ef51df71..bcfdaa86aeee 100644 --- a/crates/utils/re_tuid/src/lib.rs +++ b/crates/utils/re_tuid/src/lib.rs @@ -214,6 +214,13 @@ fn random_u64() -> u64 { u64::from_le_bytes(bytes) } +impl re_byte_size::SizeBytes for Tuid { + #[inline] + fn heap_size_bytes(&self) -> u64 { + 0 + } +} + #[test] fn test_tuid() { use std::collections::{BTreeSet, HashSet}; From b8cd115ef69abc711eda45d581887db43735cf61 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 13:51:59 +0100 Subject: [PATCH 09/25] Use `SizeBytes` instead of `encoded_len` + add memory limit tests --- Cargo.lock | 1 + crates/store/re_grpc_server/Cargo.toml | 1 + crates/store/re_grpc_server/src/lib.rs | 77 +++++++++++++++++++++++++- 3 files changed, 77 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c927e120ea60..0b05e3fe4b75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6011,6 +6011,7 @@ version = "0.22.0-alpha.1+dev" dependencies = [ "prost", "re_build_info", + "re_byte_size", "re_chunk", "re_format", "re_log", diff --git a/crates/store/re_grpc_server/Cargo.toml b/crates/store/re_grpc_server/Cargo.toml index 525ffcb5d3d4..2c9593cb6bc5 100644 --- a/crates/store/re_grpc_server/Cargo.toml +++ b/crates/store/re_grpc_server/Cargo.toml @@ -21,6 +21,7 @@ all-features = true [dependencies] re_build_info.workspace = true +re_byte_size.workspace = true re_chunk.workspace = true re_format.workspace = true re_log.workspace = true diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 01f1ef94c23d..8c8c68368f99 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use std::pin::Pin; -use prost::Message as _; +use re_byte_size::SizeBytes; use re_memory::MemoryLimit; use re_protos::{ log_msg::v0::LogMsg as LogMsgProto, @@ -153,7 +153,7 @@ impl QueueState { fn message_size(msg: &LogMsgProto) -> u64 { // TODO(jan): don't use encoded len - msg.encoded_len() as u64 + msg.total_size_bytes() } struct Queue { @@ -273,6 +273,7 @@ mod tests { }; use std::net::SocketAddr; use std::sync::Arc; + use std::time::Duration; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; use tonic::transport::server::TcpIncoming; @@ -536,4 +537,76 @@ mod tests { completion.finish(); } + + #[tokio::test] + async fn memory_limit_drops_messages() { + // Use an absurdly low memory limit to force all messages to be dropped immediately from history + let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await; + let mut client = make_client(addr).await; + let messages = fake_log_stream(3); + + // Write some messages + client + .write_messages(tokio_stream::iter( + messages + .clone() + .into_iter() + .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()), + )) + .await + .unwrap(); + + // Start reading + let mut log_stream = client.read_messages(Empty {}).await.unwrap(); + let mut actual = vec![]; + loop { + let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100)); + tokio::pin!(timeout_stream); + let timeout_result = timeout_stream.try_next().await; + match timeout_result { + Ok(Some(value)) => { + actual.push(log_msg_from_proto(value.unwrap()).unwrap()); + } + + // Stream closed + Ok(None) => break, + + // Timed out + Err(_) => break, + } + } + + // The GC runs _before_ a message is stored, so we should see the static message, and the last message sent. + assert_eq!(actual.len(), 2); + assert_eq!(&actual[0], &messages[0]); + assert_eq!(&actual[1], messages.last().unwrap()); + } + + #[tokio::test] + async fn memory_limit_does_not_interrupt_stream() { + // Use an absurdly low memory limit to force all messages to be dropped immediately from history + let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await; + let mut client = make_client(addr).await; // We use the same client for both producing and consuming + let messages = fake_log_stream(3); + + // Start reading + let mut log_stream = client.read_messages(Empty {}).await.unwrap(); + + // Write a few messages + client + .write_messages(tokio_stream::iter( + messages + .clone() + .into_iter() + .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()), + )) + .await + .unwrap(); + + // The messages should be echoed to us, even though none of them will be stored in history + let actual = read_log_stream(&mut log_stream, messages.len()).await; + assert_eq!(messages, actual); + + completion.finish(); + } } From 306877b1b41a9bdfd528246c01029e2af73a08de Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 13:53:00 +0100 Subject: [PATCH 10/25] Fix lint --- crates/store/re_grpc_server/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 8c8c68368f99..eb00e4c66729 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -387,7 +387,7 @@ mod tests { async fn make_client(addr: SocketAddr) -> MessageProxyClient { MessageProxyClient::new( - Endpoint::from_shared(format!("http://{}", addr)) + Endpoint::from_shared(format!("http://{addr}")) .unwrap() .connect() .await From cc4b71267353fede6ad16a5800c99e36f6ef599f Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 14:08:32 +0100 Subject: [PATCH 11/25] Fix lints --- crates/store/re_grpc_server/src/lib.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index eb00e4c66729..b0fcfb3753cc 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -568,11 +568,8 @@ mod tests { actual.push(log_msg_from_proto(value.unwrap()).unwrap()); } - // Stream closed - Ok(None) => break, - - // Timed out - Err(_) => break, + // Stream closed | Timed out + Ok(None) | Err(_) => break, } } @@ -580,6 +577,8 @@ mod tests { assert_eq!(actual.len(), 2); assert_eq!(&actual[0], &messages[0]); assert_eq!(&actual[1], messages.last().unwrap()); + + completion.finish(); } #[tokio::test] From 2fb22b989bd5c242cb88ae1cb09c13d216649d85 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 14:22:24 +0100 Subject: [PATCH 12/25] Fix lints --- Cargo.lock | 1 - crates/store/re_grpc_server/Cargo.toml | 1 - crates/store/re_log_encoding/src/protobuf_conversions.rs | 7 +++---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc652c8a4f51..b01f47f3bdd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6008,7 +6008,6 @@ dependencies = [ name = "re_grpc_server" version = "0.22.0-alpha.1+dev" dependencies = [ - "prost", "re_build_info", "re_byte_size", "re_chunk", diff --git a/crates/store/re_grpc_server/Cargo.toml b/crates/store/re_grpc_server/Cargo.toml index 2c9593cb6bc5..ab949bf17fe8 100644 --- a/crates/store/re_grpc_server/Cargo.toml +++ b/crates/store/re_grpc_server/Cargo.toml @@ -33,7 +33,6 @@ re_tracing.workspace = true re_types.workspace = true # External -prost.workspace = true tokio.workspace = true tokio-stream = { workspace = true, features = ["sync"] } tokio-util.workspace = true diff --git a/crates/store/re_log_encoding/src/protobuf_conversions.rs b/crates/store/re_log_encoding/src/protobuf_conversions.rs index 86562f3adcdc..4ad6580d2950 100644 --- a/crates/store/re_log_encoding/src/protobuf_conversions.rs +++ b/crates/store/re_log_encoding/src/protobuf_conversions.rs @@ -1,7 +1,6 @@ use re_protos::missing_field; use crate::codec::CodecError; -use crate::Compression; impl From for crate::Compression { fn from(value: re_protos::log_msg::v0::Compression) -> Self { @@ -71,7 +70,7 @@ pub fn log_msg_from_proto( #[cfg(feature = "encoder")] pub fn log_msg_to_proto( message: re_log_types::LogMsg, - compression: Compression, + compression: crate::Compression, ) -> Result { use crate::codec::arrow::encode_arrow; use re_protos::log_msg::v0::{ @@ -92,8 +91,8 @@ pub fn log_msg_to_proto( let arrow_msg = ArrowMsg { store_id: Some(store_id.into()), compression: match compression { - Compression::Off => re_protos::log_msg::v0::Compression::None as i32, - Compression::LZ4 => re_protos::log_msg::v0::Compression::Lz4 as i32, + crate::Compression::Off => re_protos::log_msg::v0::Compression::None as i32, + crate::Compression::LZ4 => re_protos::log_msg::v0::Compression::Lz4 as i32, }, uncompressed_size: payload.uncompressed_size as i32, encoding: re_protos::log_msg::v0::Encoding::ArrowIpc as i32, From 4cd889c12e6e52b28f2a150b94c7a2b07fdd97a0 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 14:47:14 +0100 Subject: [PATCH 13/25] Add top-level doc --- crates/store/re_grpc_server/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index b0fcfb3753cc..3f50d45f9ed9 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -1,3 +1,5 @@ +//! Server implementation of an in-memory Storage Node. + use std::collections::VecDeque; use std::pin::Pin; From 0667da6f67b492360bf90607fdf3f2a67e81a965 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 15:25:52 +0100 Subject: [PATCH 14/25] Mark `heap_size_bytes` as `inline` --- crates/build/re_build_info/src/crate_version.rs | 1 + crates/store/re_log_types/src/lib.rs | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/crates/build/re_build_info/src/crate_version.rs b/crates/build/re_build_info/src/crate_version.rs index fc67288c1e44..e31080e8b4f9 100644 --- a/crates/build/re_build_info/src/crate_version.rs +++ b/crates/build/re_build_info/src/crate_version.rs @@ -471,6 +471,7 @@ impl std::fmt::Display for CrateVersion { } impl re_byte_size::SizeBytes for CrateVersion { + #[inline] fn heap_size_bytes(&self) -> u64 { 0 } diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs index 9c6965402f7f..7a084fa33af2 100644 --- a/crates/store/re_log_types/src/lib.rs +++ b/crates/store/re_log_types/src/lib.rs @@ -637,18 +637,21 @@ pub fn build_frame_nr(frame_nr: impl TryInto) -> (Timeline, TimeInt) { } impl SizeBytes for ApplicationId { + #[inline] fn heap_size_bytes(&self) -> u64 { self.0.heap_size_bytes() } } impl SizeBytes for StoreId { + #[inline] fn heap_size_bytes(&self) -> u64 { self.id.heap_size_bytes() } } impl SizeBytes for PythonVersion { + #[inline] fn heap_size_bytes(&self) -> u64 { let Self { major: _, @@ -662,6 +665,7 @@ impl SizeBytes for PythonVersion { } impl SizeBytes for FileSource { + #[inline] fn heap_size_bytes(&self) -> u64 { match self { Self::Uri | Self::Sdk | Self::Cli => 0, @@ -684,6 +688,7 @@ impl SizeBytes for FileSource { } impl SizeBytes for StoreSource { + #[inline] fn heap_size_bytes(&self) -> u64 { match self { Self::Unknown | Self::CSdk | Self::Viewer => 0, @@ -699,6 +704,7 @@ impl SizeBytes for StoreSource { } impl SizeBytes for StoreInfo { + #[inline] fn heap_size_bytes(&self) -> u64 { let Self { application_id, @@ -718,6 +724,7 @@ impl SizeBytes for StoreInfo { } impl SizeBytes for SetStoreInfo { + #[inline] fn heap_size_bytes(&self) -> u64 { let Self { row_id, info } = self; @@ -726,12 +733,14 @@ impl SizeBytes for SetStoreInfo { } impl SizeBytes for BlueprintActivationCommand { + #[inline] fn heap_size_bytes(&self) -> u64 { 0 } } impl SizeBytes for ArrowMsg { + #[inline] fn heap_size_bytes(&self) -> u64 { let Self { chunk_id, @@ -749,6 +758,7 @@ impl SizeBytes for ArrowMsg { } impl SizeBytes for LogMsg { + #[inline] fn heap_size_bytes(&self) -> u64 { match self { Self::SetStoreInfo(set_store_info) => set_store_info.heap_size_bytes(), From 792cfdafd7ef8338e95029b97e52ac30b33e2079 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 15:27:47 +0100 Subject: [PATCH 15/25] More inlines --- crates/utils/re_byte_size/src/arrow2_sizes.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/utils/re_byte_size/src/arrow2_sizes.rs b/crates/utils/re_byte_size/src/arrow2_sizes.rs index 21255a12b1ff..732c60506025 100644 --- a/crates/utils/re_byte_size/src/arrow2_sizes.rs +++ b/crates/utils/re_byte_size/src/arrow2_sizes.rs @@ -555,6 +555,7 @@ impl SizeBytes for StructArray { } impl SizeBytes for Schema { + #[inline] fn heap_size_bytes(&self) -> u64 { let Self { fields, metadata } = self; @@ -563,6 +564,7 @@ impl SizeBytes for Schema { } impl SizeBytes for Chunk> { + #[inline] fn heap_size_bytes(&self) -> u64 { self.arrays() .iter() From c2a850d71e4f1d6b760eee38118d7862b9775956 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 15:30:55 +0100 Subject: [PATCH 16/25] Update comment --- crates/store/re_grpc_server/src/lib.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 3f50d45f9ed9..b9bb9cd1a8fc 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -36,10 +36,8 @@ struct QueueState { /// Messages stored in order of arrival, and garbage collected if the server hits the memory limit. temporal_message_queue: VecDeque, - /// Total size of buffered messages in bytes. - /// - /// Excludes size of `commands`. - message_bytes: u64, + /// Total size of `temporal_message_queue` in bytes. + temporal_message_bytes: u64, /// Messages potentially out of order with the rest of the message stream. These are never garbage collected. static_message_queue: VecDeque, @@ -52,7 +50,7 @@ impl QueueState { broadcast_tx: broadcast::channel(1024).0, event_rx, temporal_message_queue: Default::default(), - message_bytes: 0, + temporal_message_bytes: 0, static_message_queue: Default::default(), } } @@ -108,7 +106,7 @@ impl QueueState { // as it's the last message sent by the SDK when submitting a blueprint. Msg::ArrowMsg(..) | Msg::BlueprintActivationCommand(..) => { let approx_size_bytes = message_size(&msg); - self.message_bytes += approx_size_bytes; + self.temporal_message_bytes += approx_size_bytes; self.temporal_message_queue.push_back(msg); } Msg::SetStoreInfo(..) => { @@ -122,14 +120,14 @@ impl QueueState { if let Some(max_bytes) = self.server_memory_limit.max_bytes { let max_bytes = max_bytes as u64; - if max_bytes < self.message_bytes { + if max_bytes < self.temporal_message_bytes { re_tracing::profile_scope!("Drop messages"); re_log::info_once!( "Memory limit ({}) exceeded. Dropping old log messages from the server. Clients connecting after this will not see the full history.", re_format::format_bytes(max_bytes as _) ); - let bytes_to_free = self.message_bytes - max_bytes; + let bytes_to_free = self.temporal_message_bytes - max_bytes; let mut bytes_dropped = 0; let mut messages_dropped = 0; From 79f1a7e69db388c93bfb9d38beaa10e346c4de59 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 15:32:43 +0100 Subject: [PATCH 17/25] Remove outdated comment --- crates/store/re_grpc_server/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index b9bb9cd1a8fc..a8795671aae6 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -152,7 +152,6 @@ impl QueueState { } fn message_size(msg: &LogMsgProto) -> u64 { - // TODO(jan): don't use encoded len msg.total_size_bytes() } From 31915c5aa2890763bf52532ef06da96160c044ea Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 15:59:41 +0100 Subject: [PATCH 18/25] Add comment about channel capacity --- crates/store/re_grpc_server/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index a8795671aae6..2bdb91779a22 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -47,6 +47,8 @@ impl QueueState { fn new(server_memory_limit: MemoryLimit, event_rx: mpsc::Receiver) -> Self { Self { server_memory_limit, + // Channel capacity is completely arbitrary. + // We just want enough capacity to handle bursts of messages. broadcast_tx: broadcast::channel(1024).0, event_rx, temporal_message_queue: Default::default(), @@ -162,6 +164,8 @@ struct Queue { impl Queue { fn spawn(server_memory_limit: MemoryLimit) -> Self { + // Channel capacity is completely arbitrary. + // We just want something large enough to handle bursts of messages. let (event_tx, event_rx) = mpsc::channel(1024); let task_handle = tokio::spawn(async move { From 9f5cf252b681d9bb7e33f481175cdad7fed3b57f Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 15:59:57 +0100 Subject: [PATCH 19/25] Add todo --- crates/store/re_protos/proto/rerun/v0/sdk_comms.proto | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto b/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto index 6edc3148dec9..27fc9c596673 100644 --- a/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto +++ b/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto @@ -15,6 +15,8 @@ import "rerun/v0/common.proto"; // Whenever `ReadMessages` is called, all buffered messages are sent in the order they were received. // The stream will then also yield any new messages passed to `WriteMessages` from any client. service MessageProxy { + // TODO(jan): Would it be more efficient to send a "message batch" instead of individual messages? + // It may allow us to amortize the overhead of the gRPC protocol. rpc WriteMessages(stream rerun.log_msg.v0.LogMsg) returns (Empty) {} rpc ReadMessages(Empty) returns (stream rerun.log_msg.v0.LogMsg) {} } From 1e8b00ebee3df30d0aacb7e4458e196671616250 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 16:00:07 +0100 Subject: [PATCH 20/25] Some refactoring --- crates/store/re_grpc_server/src/lib.rs | 78 ++++++++++++++------------ 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 2bdb91779a22..2d508b19c96e 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -34,13 +34,13 @@ struct QueueState { event_rx: mpsc::Receiver, /// Messages stored in order of arrival, and garbage collected if the server hits the memory limit. - temporal_message_queue: VecDeque, + ordered_message_queue: VecDeque, /// Total size of `temporal_message_queue` in bytes. temporal_message_bytes: u64, /// Messages potentially out of order with the rest of the message stream. These are never garbage collected. - static_message_queue: VecDeque, + persistent_message_queue: VecDeque, } impl QueueState { @@ -51,9 +51,9 @@ impl QueueState { // We just want enough capacity to handle bursts of messages. broadcast_tx: broadcast::channel(1024).0, event_rx, - temporal_message_queue: Default::default(), + ordered_message_queue: Default::default(), temporal_message_bytes: 0, - static_message_queue: Default::default(), + persistent_message_queue: Default::default(), } } @@ -77,10 +77,10 @@ impl QueueState { channel .send(( // static messages come first - self.static_message_queue + self.persistent_message_queue .iter() .cloned() - .chain(self.temporal_message_queue.iter().cloned()) + .chain(self.ordered_message_queue.iter().cloned()) .collect(), self.broadcast_tx.subscribe(), )) @@ -109,10 +109,10 @@ impl QueueState { Msg::ArrowMsg(..) | Msg::BlueprintActivationCommand(..) => { let approx_size_bytes = message_size(&msg); self.temporal_message_bytes += approx_size_bytes; - self.temporal_message_queue.push_back(msg); + self.ordered_message_queue.push_back(msg); } Msg::SetStoreInfo(..) => { - self.static_message_queue.push_back(msg); + self.persistent_message_queue.push_back(msg); } } } @@ -120,35 +120,43 @@ impl QueueState { fn gc_if_using_too_much_ram(&mut self) { re_tracing::profile_function!(); - if let Some(max_bytes) = self.server_memory_limit.max_bytes { - let max_bytes = max_bytes as u64; - if max_bytes < self.temporal_message_bytes { - re_tracing::profile_scope!("Drop messages"); - re_log::info_once!( - "Memory limit ({}) exceeded. Dropping old log messages from the server. Clients connecting after this will not see the full history.", - re_format::format_bytes(max_bytes as _) - ); - - let bytes_to_free = self.temporal_message_bytes - max_bytes; - - let mut bytes_dropped = 0; - let mut messages_dropped = 0; - - while bytes_dropped < bytes_to_free { - // only drop messages from temporal queue - if let Some(msg) = self.temporal_message_queue.pop_front() { - bytes_dropped += message_size(&msg); - messages_dropped += 1; - } else { - break; - } - } + let Some(max_bytes) = self.server_memory_limit.max_bytes else { + // Unlimited memory! + return; + }; + + let max_bytes = max_bytes as u64; + if max_bytes >= self.temporal_message_bytes { + // We're not using too much memory. + return; + }; + + { + re_tracing::profile_scope!("Drop messages"); + re_log::info_once!( + "Memory limit ({}) exceeded. Dropping old log messages from the server. Clients connecting after this will not see the full history.", + re_format::format_bytes(max_bytes as _) + ); + + let bytes_to_free = self.temporal_message_bytes - max_bytes; - re_log::trace!( - "Dropped {} bytes in {messages_dropped} message(s)", - re_format::format_bytes(bytes_dropped as _) - ); + let mut bytes_dropped = 0; + let mut messages_dropped = 0; + + while bytes_dropped < bytes_to_free { + // only drop messages from temporal queue + if let Some(msg) = self.ordered_message_queue.pop_front() { + bytes_dropped += message_size(&msg); + messages_dropped += 1; + } else { + break; + } } + + re_log::trace!( + "Dropped {} bytes in {messages_dropped} message(s)", + re_format::format_bytes(bytes_dropped as _) + ); } } } From 7f5708e942976278d8964a58b9f6adfad64d4ddb Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 16:05:22 +0100 Subject: [PATCH 21/25] Add todo --- crates/utils/re_byte_size/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/utils/re_byte_size/src/lib.rs b/crates/utils/re_byte_size/src/lib.rs index c276fb70d7c3..7d692d79ce49 100644 --- a/crates/utils/re_byte_size/src/lib.rs +++ b/crates/utils/re_byte_size/src/lib.rs @@ -12,6 +12,7 @@ mod tuple_sizes; /// Approximations of stack and heap size for both internal and external types. /// /// Motly used for statistics and triggering events such as garbage collection. +// TODO(#8630): Derive macro for this trait. pub trait SizeBytes { /// Returns the total size of `self` in bytes, accounting for both stack and heap space. #[inline] From 3dbdcfdadcaa12259f4d3e1515aea7ea3ca5aa79 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 16:19:22 +0100 Subject: [PATCH 22/25] Add todo for removing `LogMsg` --- crates/store/re_log_types/src/lib.rs | 1 + crates/store/re_protos/proto/rerun/v0/log_msg.proto | 1 + 2 files changed, 2 insertions(+) diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs index 7a084fa33af2..7cc2768c42c7 100644 --- a/crates/store/re_log_types/src/lib.rs +++ b/crates/store/re_log_types/src/lib.rs @@ -261,6 +261,7 @@ impl BlueprintActivationCommand { #[derive(Clone, Debug, PartialEq)] // `PartialEq` used for tests in another crate #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] #[allow(clippy::large_enum_variant)] +// TODO(#8631): Remove `LogMsg` pub enum LogMsg { /// A new recording has begun. /// diff --git a/crates/store/re_protos/proto/rerun/v0/log_msg.proto b/crates/store/re_protos/proto/rerun/v0/log_msg.proto index cc72205668b1..fef64bcc3dcb 100644 --- a/crates/store/re_protos/proto/rerun/v0/log_msg.proto +++ b/crates/store/re_protos/proto/rerun/v0/log_msg.proto @@ -4,6 +4,7 @@ package rerun.log_msg.v0; import "rerun/v0/common.proto"; +// TODO(#8631): Remove `LogMsg` message LogMsg { oneof msg { // A message that contains a new store info. From b5642d2ac898f08d52a3b79f2cd8c3d263935f00 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 16:25:36 +0100 Subject: [PATCH 23/25] add comment about `SetStoreInfo` order --- crates/store/re_grpc_server/src/lib.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 2d508b19c96e..6a11e86e196e 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -443,8 +443,15 @@ mod tests { // the messages should be echoed to us let actual = read_log_stream(&mut log_stream, messages.len()).await; + assert_eq!(messages, actual); + // While `SetStoreInfo` is sent first in `fake_log_stream`, + // we can observe that it's also received first, + // even though it is actually stored out of order in `persistent_message_queue`. + assert!(matches!(messages[0], LogMsg::SetStoreInfo(..))); + assert!(matches!(actual[0], LogMsg::SetStoreInfo(..))); + completion.finish(); } From 8c1a78e31cf825923df3757694da83b2db51ebd5 Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 16:27:10 +0100 Subject: [PATCH 24/25] Rename --- crates/store/re_grpc_server/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 6a11e86e196e..17011ddc4b11 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -37,7 +37,7 @@ struct QueueState { ordered_message_queue: VecDeque, /// Total size of `temporal_message_queue` in bytes. - temporal_message_bytes: u64, + ordered_message_bytes: u64, /// Messages potentially out of order with the rest of the message stream. These are never garbage collected. persistent_message_queue: VecDeque, @@ -52,7 +52,7 @@ impl QueueState { broadcast_tx: broadcast::channel(1024).0, event_rx, ordered_message_queue: Default::default(), - temporal_message_bytes: 0, + ordered_message_bytes: 0, persistent_message_queue: Default::default(), } } @@ -108,7 +108,7 @@ impl QueueState { // as it's the last message sent by the SDK when submitting a blueprint. Msg::ArrowMsg(..) | Msg::BlueprintActivationCommand(..) => { let approx_size_bytes = message_size(&msg); - self.temporal_message_bytes += approx_size_bytes; + self.ordered_message_bytes += approx_size_bytes; self.ordered_message_queue.push_back(msg); } Msg::SetStoreInfo(..) => { @@ -126,7 +126,7 @@ impl QueueState { }; let max_bytes = max_bytes as u64; - if max_bytes >= self.temporal_message_bytes { + if max_bytes >= self.ordered_message_bytes { // We're not using too much memory. return; }; @@ -138,7 +138,7 @@ impl QueueState { re_format::format_bytes(max_bytes as _) ); - let bytes_to_free = self.temporal_message_bytes - max_bytes; + let bytes_to_free = self.ordered_message_bytes - max_bytes; let mut bytes_dropped = 0; let mut messages_dropped = 0; From 6aaa1033de5d160d6fe92354b68a9f0115ab26fb Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 18:03:04 +0100 Subject: [PATCH 25/25] Remove `Queue`, rename `QueueState` to `EventLoop`, add some comments --- crates/store/re_grpc_server/src/lib.rs | 41 ++++++++----------- .../re_protos/src/v0/rerun.log_msg.v0.rs | 1 + .../re_protos/src/v0/rerun.sdk_comms.v0.rs | 4 ++ 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 17011ddc4b11..020ba2a4559e 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -24,7 +24,10 @@ enum Event { Message(LogMsgProto), } -struct QueueState { +/// Main event loop for the server, which runs in its own task. +/// +/// Handles message history, and broadcasts messages to clients. +struct EventLoop { server_memory_limit: MemoryLimit, /// New messages are broadcast to all clients. @@ -43,7 +46,7 @@ struct QueueState { persistent_message_queue: VecDeque, } -impl QueueState { +impl EventLoop { fn new(server_memory_limit: MemoryLimit, event_rx: mpsc::Receiver) -> Self { Self { server_memory_limit, @@ -165,25 +168,25 @@ fn message_size(msg: &LogMsgProto) -> u64 { msg.total_size_bytes() } -struct Queue { - _task_handle: tokio::task::JoinHandle<()>, +pub struct MessageProxy { + _queue_task_handle: tokio::task::JoinHandle<()>, event_tx: mpsc::Sender, } -impl Queue { - fn spawn(server_memory_limit: MemoryLimit) -> Self { +impl MessageProxy { + pub fn new(server_memory_limit: MemoryLimit) -> Self { // Channel capacity is completely arbitrary. // We just want something large enough to handle bursts of messages. let (event_tx, event_rx) = mpsc::channel(1024); let task_handle = tokio::spawn(async move { - QueueState::new(server_memory_limit, event_rx) + EventLoop::new(server_memory_limit, event_rx) .run_in_place() .await; }); Self { - _task_handle: task_handle, + _queue_task_handle: task_handle, event_tx, } } @@ -192,7 +195,7 @@ impl Queue { self.event_tx.send(Event::Message(msg)).await.ok(); } - async fn new_client_stream(&self) -> MessageStream { + async fn new_client_stream(&self) -> LogMsgStream { let (sender, receiver) = oneshot::channel(); if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await { re_log::error!("Error initializing new client: {err}"); @@ -218,19 +221,7 @@ impl Queue { } } -pub struct MessageProxy { - queue: Queue, -} - -impl MessageProxy { - pub fn new(server_memory_limit: MemoryLimit) -> Self { - Self { - queue: Queue::spawn(server_memory_limit), - } - } -} - -type MessageStream = Pin> + Send>>; +type LogMsgStream = Pin> + Send>>; #[tonic::async_trait] impl message_proxy_server::MessageProxy for MessageProxy { @@ -242,7 +233,7 @@ impl message_proxy_server::MessageProxy for MessageProxy { loop { match stream.message().await { Ok(Some(msg)) => { - self.queue.push(msg).await; + self.push(msg).await; } Ok(None) => { // Connection was closed @@ -258,13 +249,13 @@ impl message_proxy_server::MessageProxy for MessageProxy { Ok(tonic::Response::new(Empty {})) } - type ReadMessagesStream = MessageStream; + type ReadMessagesStream = LogMsgStream; async fn read_messages( &self, _: tonic::Request, ) -> tonic::Result> { - Ok(tonic::Response::new(self.queue.new_client_stream().await)) + Ok(tonic::Response::new(self.new_client_stream().await)) } } diff --git a/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs b/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs index 19744af04354..88dcae5534a3 100644 --- a/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs @@ -1,4 +1,5 @@ // This file is @generated by prost-build. +/// TODO(#8631): Remove `LogMsg` #[derive(Clone, PartialEq, ::prost::Message)] pub struct LogMsg { #[prost(oneof = "log_msg::Msg", tags = "1, 2, 3")] diff --git a/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs b/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs index aa77fa3666e9..ac4c4e6082f4 100644 --- a/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs @@ -99,6 +99,8 @@ pub mod message_proxy_client { self.inner = self.inner.max_encoding_message_size(limit); self } + /// TODO(jan): Would it be more efficient to send a "message batch" instead of individual messages? + /// It may allow us to amortize the overhead of the gRPC protocol. pub async fn write_messages( &mut self, request: impl tonic::IntoStreamingRequest< @@ -155,6 +157,8 @@ pub mod message_proxy_server { /// Generated trait containing gRPC methods that should be implemented for use with MessageProxyServer. #[async_trait] pub trait MessageProxy: std::marker::Send + std::marker::Sync + 'static { + /// TODO(jan): Would it be more efficient to send a "message batch" instead of individual messages? + /// It may allow us to amortize the overhead of the gRPC protocol. async fn write_messages( &self, request: tonic::Request>,