diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
index 1f07bcdb65ff..ccfe5dcc570d 100644
--- a/ARCHITECTURE.md
+++ b/ARCHITECTURE.md
@@ -188,6 +188,7 @@ Update instructions:
 | re_data_loader | Handles loading of Rerun data from file using data loader plugins |
 | re_data_source | Handles loading of Rerun data from different sources |
 | re_grpc_client | Communicate with the Rerun Data Platform over gRPC |
+| re_grpc_server | Host an in-memory Storage Node |
 | re_sdk_comms | TCP communication between Rerun SDK and Rerun Server |
 | re_web_viewer_server | Serves the Rerun web viewer (Wasm and HTML) over HTTP |
 | re_ws_comms | WebSocket communication library (encoding, decoding, client, server) between a Rerun server and Viewer |
diff --git a/Cargo.lock b/Cargo.lock
index 386a805aaea9..51d350994063 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5616,6 +5616,7 @@ dependencies = [
 name = "re_build_info"
 version = "0.22.0-alpha.1+dev"
 dependencies = [
+ "re_byte_size",
  "serde",
 ]
 
@@ -5639,7 +5640,6 @@ dependencies = [
  "arrow",
  "half",
  "re_arrow2",
- "re_tuid",
  "smallvec",
 ]
 
@@ -6004,6 +6004,27 @@ dependencies = [
  "wasm-bindgen-futures",
 ]
 
+[[package]]
+name = "re_grpc_server"
+version = "0.22.0-alpha.1+dev"
+dependencies = [
+ "re_build_info",
+ "re_byte_size",
+ "re_chunk",
+ "re_format",
+ "re_log",
+ "re_log_encoding",
+ "re_log_types",
+ "re_memory",
+ "re_protos",
+ "re_tracing",
+ "re_types",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tonic",
+]
+
 [[package]]
 name = "re_int_histogram"
 version = "0.22.0-alpha.1+dev"
@@ -6147,6 +6168,7 @@ name = "re_protos"
 version = "0.22.0-alpha.1+dev"
 dependencies = [
  "prost",
+ "re_byte_size",
  "thiserror 1.0.65",
  "tonic",
  "tonic-web-wasm-client",
@@ -6444,6 +6466,7 @@ dependencies = [
  "document-features",
  "getrandom",
  "once_cell",
+ "re_byte_size",
  "re_protos",
  "serde",
  "web-time",
@@ -8821,6 +8844,7 @@ dependencies = [
  "futures-core",
  "pin-project-lite",
  "tokio",
+ "tokio-util",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 5e6e52eaa238..28e8da64fc1f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -280,6 +280,7 @@ tinyvec = { version = "1.6", features = ["alloc", "rustc_1_55"] }
 tobj = "4.0"
 tokio = { version = "1.40.0", default-features = false }
 tokio-stream = "0.1.16"
+tokio-util = { version = "0.7.12", default-features = false }
 toml = { version = "0.8.10", default-features = false }
 tonic = { version = "0.12.3", default-features = false }
 tonic-build = { version = "0.12.3", default-features = false }
diff --git a/crates/build/re_build_info/Cargo.toml b/crates/build/re_build_info/Cargo.toml
index a37d63ef9c54..bbd38124d7f0 100644
--- a/crates/build/re_build_info/Cargo.toml
+++ b/crates/build/re_build_info/Cargo.toml
@@ -27,6 +27,7 @@ serde = ["dep:serde"]
 
 [dependencies]
+re_byte_size.workspace = true
 
 # Optional dependencies:
 serde = { workspace = true, optional = true, features = ["derive", "rc"] }
diff --git a/crates/build/re_build_info/src/crate_version.rs b/crates/build/re_build_info/src/crate_version.rs
index a886c4c828b1..e31080e8b4f9 100644
--- a/crates/build/re_build_info/src/crate_version.rs
+++ b/crates/build/re_build_info/src/crate_version.rs
@@ -470,6 +470,13 @@ impl std::fmt::Display for CrateVersion {
     }
 }
 
+impl re_byte_size::SizeBytes for CrateVersion {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        0
+    }
+}
+
 #[test]
 fn test_parse_version() {
     macro_rules! assert_parse_ok {
diff --git a/crates/build/re_protos_builder/src/bin/build_re_remote_store_types.rs b/crates/build/re_protos_builder/src/bin/build_re_remote_store_types.rs
index 9ed06af18e62..d31d0e04c532 100644
--- a/crates/build/re_protos_builder/src/bin/build_re_remote_store_types.rs
+++ b/crates/build/re_protos_builder/src/bin/build_re_remote_store_types.rs
@@ -8,7 +8,7 @@ use camino::Utf8Path;
 const PROTOS_DIR: &str = "crates/store/re_protos/proto";
 
-const INPUT_V0: &[&str] = &["rerun/v0/remote_store.proto", "rerun/v0/log_msg.proto"];
+const INPUT_V0_DIR: &str = "rerun/v0";
 
 const OUTPUT_V0_RUST_DIR: &str = "crates/store/re_protos/src/v0";
 
 fn main() {
@@ -28,16 +28,27 @@ fn main() {
     let definitions_dir_path = workspace_dir.join(PROTOS_DIR);
     let rust_generated_output_dir_path = workspace_dir.join(OUTPUT_V0_RUST_DIR);
 
+    let proto_paths = std::fs::read_dir(definitions_dir_path.join(INPUT_V0_DIR))
+        .unwrap()
+        .map(|v| {
+            v.unwrap()
+                .path()
+                .strip_prefix(&definitions_dir_path)
+                .unwrap()
+                .to_owned()
+        })
+        .collect::<Vec<_>>();
+
     re_log::info!(
         definitions=?definitions_dir_path,
         output=?rust_generated_output_dir_path,
+        protos=?proto_paths,
         "Running codegen for storage node types",
     );
 
     re_protos_builder::generate_rust_code(
         definitions_dir_path,
-        INPUT_V0,
+        &proto_paths,
         rust_generated_output_dir_path,
     );
 }
diff --git a/crates/store/re_grpc_client/Cargo.toml b/crates/store/re_grpc_client/Cargo.toml
index efb330fc306e..7fcc26102500 100644
--- a/crates/store/re_grpc_client/Cargo.toml
+++ b/crates/store/re_grpc_client/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "re_grpc_client"
 authors.workspace = true
-description = "gRCP client for the Rerun Data Platform gRPC prototocl"
+description = "gRPC client for the Rerun Data Platform gRPC protocol"
 edition.workspace = true
 homepage.workspace = true
 include.workspace = true
diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs
index 731f8cbcedbc..c1514ece5c5c 100644
--- a/crates/store/re_grpc_client/src/lib.rs
+++ b/crates/store/re_grpc_client/src/lib.rs
@@ -91,7 +91,7 @@ const CATALOG_BP_STORE_ID: &str = "catalog_blueprint";
 const CATALOG_REC_STORE_ID: &str = "catalog";
 const CATALOG_APPLICATION_ID: &str = "redap_catalog";
 
-/// Stream an rrd file or metadsasta catalog over gRPC from a Rerun Data Platform server.
+/// Stream an rrd file or metadata catalog over gRPC from a Rerun Data Platform server.
 ///
 /// `on_msg` can be used to wake up the UI thread on Wasm.
 pub fn stream_from_redap(
diff --git a/crates/store/re_grpc_server/Cargo.toml b/crates/store/re_grpc_server/Cargo.toml
new file mode 100644
index 000000000000..ab949bf17fe8
--- /dev/null
+++ b/crates/store/re_grpc_server/Cargo.toml
@@ -0,0 +1,39 @@
+[package]
+name = "re_grpc_server"
+authors.workspace = true
+description = "gRPC server for the Rerun Data Platform gRPC protocol"
+edition.workspace = true
+homepage.workspace = true
+include.workspace = true
+license.workspace = true
+publish = true
+readme = "README.md"
+repository.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+[lints]
+workspace = true
+
+[package.metadata.docs.rs]
+all-features = true
+
+
+[dependencies]
+re_build_info.workspace = true
+re_byte_size.workspace = true
+re_chunk.workspace = true
+re_format.workspace = true
+re_log.workspace = true
+re_log_encoding = { workspace = true, features = ["encoder", "decoder"] }
+re_log_types.workspace = true
+re_memory.workspace = true
+re_protos.workspace = true
+re_tracing.workspace = true
+re_types.workspace = true
+
+# External
+tokio.workspace = true
+tokio-stream = { workspace = true, features = ["sync"] }
+tokio-util.workspace = true
+tonic = { workspace = true, default-features = false, features = ["transport"] }
diff --git a/crates/store/re_grpc_server/README.md b/crates/store/re_grpc_server/README.md
new file mode 100644
index 000000000000..3e60ed9287d0
--- /dev/null
+++ b/crates/store/re_grpc_server/README.md
@@ -0,0 +1,10 @@
+# re_grpc_server
+
+Part of the [`rerun`](https://github.com/rerun-io/rerun) family of crates.
+
+[![Latest version](https://img.shields.io/crates/v/re_grpc_server.svg?speculative-link)](https://crates.io/crates/re_grpc_server?speculative-link)
+[![Documentation](https://docs.rs/re_grpc_server/badge.svg?speculative-link)](https://docs.rs/re_grpc_server?speculative-link)
+![MIT](https://img.shields.io/badge/license-MIT-blue.svg)
+![Apache](https://img.shields.io/badge/license-Apache-blue.svg)
+
+Server implementation of an in-memory Storage Node.
diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs
new file mode 100644
index 000000000000..020ba2a4559e
--- /dev/null
+++ b/crates/store/re_grpc_server/src/lib.rs
@@ -0,0 +1,620 @@
+//! Server implementation of an in-memory Storage Node.
+
+use std::collections::VecDeque;
+use std::pin::Pin;
+
+use re_byte_size::SizeBytes;
+use re_memory::MemoryLimit;
+use re_protos::{
+    log_msg::v0::LogMsg as LogMsgProto,
+    sdk_comms::v0::{message_proxy_server, Empty},
+};
+use tokio::sync::broadcast;
+use tokio::sync::mpsc;
+use tokio::sync::oneshot;
+use tokio_stream::wrappers::BroadcastStream;
+use tokio_stream::Stream;
+use tokio_stream::StreamExt as _;
+
+enum Event {
+    /// New client connected, requesting full history and subscribing to new messages.
+    NewClient(oneshot::Sender<(Vec<LogMsgProto>, broadcast::Receiver<LogMsgProto>)>),
+
+    /// A client sent a message.
+    Message(LogMsgProto),
+}
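+
+// Orientation (comment only): `WriteMessages` feeds `Event::Message` into the
+// `EventLoop` below, which stores the message and broadcasts it to subscribers;
+// `ReadMessages` sends `Event::NewClient` and gets back the stored history plus
+// a live broadcast receiver.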
+
+/// Main event loop for the server, which runs in its own task.
+///
+/// Handles message history, and broadcasts messages to clients.
+struct EventLoop {
+    server_memory_limit: MemoryLimit,
+
+    /// New messages are broadcast to all clients.
+    broadcast_tx: broadcast::Sender<LogMsgProto>,
+
+    /// Channel for incoming events.
+    event_rx: mpsc::Receiver<Event>,
+
+    /// Messages stored in order of arrival, and garbage collected if the server hits the memory limit.
+    ordered_message_queue: VecDeque<LogMsgProto>,
+
+    /// Total size of `ordered_message_queue` in bytes.
+    ordered_message_bytes: u64,
+
+    /// Messages potentially out of order with the rest of the message stream. These are never garbage collected.
+    persistent_message_queue: VecDeque<LogMsgProto>,
+}
+
+impl EventLoop {
+    fn new(server_memory_limit: MemoryLimit, event_rx: mpsc::Receiver<Event>) -> Self {
+        Self {
+            server_memory_limit,
+            // Channel capacity is completely arbitrary.
+            // We just want enough capacity to handle bursts of messages.
+            broadcast_tx: broadcast::channel(1024).0,
+            event_rx,
+            ordered_message_queue: Default::default(),
+            ordered_message_bytes: 0,
+            persistent_message_queue: Default::default(),
+        }
+    }
+
+    async fn run_in_place(mut self) {
+        loop {
+            let Some(event) = self.event_rx.recv().await else {
+                break;
+            };
+
+            match event {
+                Event::NewClient(channel) => self.handle_new_client(channel),
+                Event::Message(msg) => self.handle_msg(msg),
+            }
+        }
+    }
+
+    fn handle_new_client(
+        &self,
+        channel: oneshot::Sender<(Vec<LogMsgProto>, broadcast::Receiver<LogMsgProto>)>,
+    ) {
+        channel
+            .send((
+                // static messages come first
+                self.persistent_message_queue
+                    .iter()
+                    .cloned()
+                    .chain(self.ordered_message_queue.iter().cloned())
+                    .collect(),
+                self.broadcast_tx.subscribe(),
+            ))
+            .ok();
+    }
+
+    fn handle_msg(&mut self, msg: LogMsgProto) {
+        self.broadcast_tx.send(msg.clone()).ok();
+
+        self.gc_if_using_too_much_ram();
+
+        let Some(inner) = &msg.msg else {
+            re_log::error!(
+                "{}",
+                re_protos::missing_field!(re_protos::log_msg::v0::LogMsg, "msg")
+            );
+            return;
+        };
+
+        use re_protos::log_msg::v0::log_msg::Msg;
+        match inner {
+            // We consider `BlueprintActivationCommand` a temporal message,
+            // because it is sensitive to order, and it is safe to garbage collect
+            // if all the messages that came before it were also garbage collected,
+            // as it's the last message sent by the SDK when submitting a blueprint.
+            Msg::ArrowMsg(..) | Msg::BlueprintActivationCommand(..) => {
+                let approx_size_bytes = message_size(&msg);
+                self.ordered_message_bytes += approx_size_bytes;
+                self.ordered_message_queue.push_back(msg);
+            }
+            Msg::SetStoreInfo(..) => {
+                self.persistent_message_queue.push_back(msg);
+            }
+        }
+    }
+
+    fn gc_if_using_too_much_ram(&mut self) {
+        re_tracing::profile_function!();
+
+        let Some(max_bytes) = self.server_memory_limit.max_bytes else {
+            // Unlimited memory!
+            return;
+        };
+
+        let max_bytes = max_bytes as u64;
+        if max_bytes >= self.ordered_message_bytes {
+            // We're not using too much memory.
+            return;
+        };
+
+        {
+            re_tracing::profile_scope!("Drop messages");
+            re_log::info_once!(
+                "Memory limit ({}) exceeded. Dropping old log messages from the server. Clients connecting after this will not see the full history.",
+                re_format::format_bytes(max_bytes as _)
+            );
+
+            let bytes_to_free = self.ordered_message_bytes - max_bytes;
+
+            let mut bytes_dropped = 0;
+            let mut messages_dropped = 0;
+
+            while bytes_dropped < bytes_to_free {
+                // only drop messages from temporal queue
+                if let Some(msg) = self.ordered_message_queue.pop_front() {
+                    bytes_dropped += message_size(&msg);
+                    messages_dropped += 1;
+                } else {
+                    break;
+                }
+            }
+
+            // Keep the byte-count bookkeeping in sync with what we just dropped,
+            // so the next GC pass sees the correct queue size.
+            self.ordered_message_bytes -= bytes_dropped;
+
+            re_log::trace!(
+                "Dropped {} bytes in {messages_dropped} message(s)",
+                re_format::format_bytes(bytes_dropped as _)
+            );
+        }
+    }
+}
+
+fn message_size(msg: &LogMsgProto) -> u64 {
+    msg.total_size_bytes()
+}
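+
+/// A minimal sketch of how this service might be hosted with tonic.
+/// The address and memory limit below are made up for illustration;
+/// see the tests at the bottom of this file for a complete, working setup:
+///
+/// ```ignore
+/// use re_protos::sdk_comms::v0::message_proxy_server::MessageProxyServer;
+///
+/// let proxy = MessageProxy::new(re_memory::MemoryLimit::from_bytes(64 * 1024 * 1024));
+/// tonic::transport::Server::builder()
+///     .add_service(MessageProxyServer::new(proxy))
+///     .serve("127.0.0.1:9434".parse()?)
+///     .await?;
+/// ```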
+pub struct MessageProxy {
+    _queue_task_handle: tokio::task::JoinHandle<()>,
+    event_tx: mpsc::Sender<Event>,
+}
+
+impl MessageProxy {
+    pub fn new(server_memory_limit: MemoryLimit) -> Self {
+        // Channel capacity is completely arbitrary.
+        // We just want something large enough to handle bursts of messages.
+        let (event_tx, event_rx) = mpsc::channel(1024);
+
+        let task_handle = tokio::spawn(async move {
+            EventLoop::new(server_memory_limit, event_rx)
+                .run_in_place()
+                .await;
+        });
+
+        Self {
+            _queue_task_handle: task_handle,
+            event_tx,
+        }
+    }
+
+    async fn push(&self, msg: LogMsgProto) {
+        self.event_tx.send(Event::Message(msg)).await.ok();
+    }
+
+    async fn new_client_stream(&self) -> LogMsgStream {
+        let (sender, receiver) = oneshot::channel();
+        if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
+            re_log::error!("Error initializing new client: {err}");
+            return Box::pin(tokio_stream::empty());
+        };
+        let (history, channel) = match receiver.await {
+            Ok(v) => v,
+            Err(err) => {
+                re_log::error!("Error initializing new client: {err}");
+                return Box::pin(tokio_stream::empty());
+            }
+        };
+
+        let history = tokio_stream::iter(history.into_iter().map(Ok));
+        let channel = BroadcastStream::new(channel).map(|result| {
+            result.map_err(|err| {
+                re_log::error!("Error reading message from broadcast channel: {err}");
+                tonic::Status::internal("internal channel error")
+            })
+        });
+
+        Box::pin(history.merge(channel))
+    }
+}
+
+type LogMsgStream = Pin<Box<dyn Stream<Item = tonic::Result<LogMsgProto>> + Send>>;
+
+#[tonic::async_trait]
+impl message_proxy_server::MessageProxy for MessageProxy {
+    async fn write_messages(
+        &self,
+        request: tonic::Request<tonic::Streaming<LogMsgProto>>,
+    ) -> tonic::Result<tonic::Response<Empty>> {
+        let mut stream = request.into_inner();
+        loop {
+            match stream.message().await {
+                Ok(Some(msg)) => {
+                    self.push(msg).await;
+                }
+                Ok(None) => {
+                    // Connection was closed
+                    break;
+                }
+                Err(err) => {
+                    re_log::error!("Error while receiving messages: {err}");
+                    break;
+                }
+            }
+        }
+
+        Ok(tonic::Response::new(Empty {}))
+    }
+
+    type ReadMessagesStream = LogMsgStream;
+
+    async fn read_messages(
+        &self,
+        _: tonic::Request<Empty>,
+    ) -> tonic::Result<tonic::Response<Self::ReadMessagesStream>> {
+        Ok(tonic::Response::new(self.new_client_stream().await))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use re_build_info::CrateVersion;
+    use re_chunk::RowId;
+    use re_log_encoding::protobuf_conversions::{log_msg_from_proto, log_msg_to_proto};
+    use re_log_encoding::Compression;
+    use re_log_types::{
+        ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
+    };
+    use re_protos::sdk_comms::v0::{
+        message_proxy_client::MessageProxyClient, message_proxy_server::MessageProxyServer,
+    };
+    use std::net::SocketAddr;
+    use std::sync::Arc;
+    use std::time::Duration;
+    use tokio::net::TcpListener;
+    use tokio_util::sync::CancellationToken;
+    use tonic::transport::server::TcpIncoming;
+    use tonic::transport::Channel;
+    use tonic::transport::Endpoint;
+
+    #[derive(Clone)]
+    struct Completion(Arc<CancellationToken>);
+
+    impl Drop for Completion {
+        fn drop(&mut self) {
+            self.finish();
+        }
+    }
+
+    impl Completion {
+        fn new() -> Self {
+            Self(Arc::new(CancellationToken::new()))
+        }
+
+        fn finish(&self) {
+            self.0.cancel();
+        }
+
+        async fn wait(&self) {
+            self.0.cancelled().await;
+        }
+    }
+
+    /// Generates `n` log messages wrapped in a `SetStoreInfo` at the start and `BlueprintActivationCommand` at the end,
+    /// to exercise message ordering.
+    fn fake_log_stream(n: usize) -> Vec<LogMsg> {
+        let store_id = StoreId::random(StoreKind::Blueprint);
+
+        let mut messages = Vec::new();
+        messages.push(LogMsg::SetStoreInfo(SetStoreInfo {
+            row_id: *RowId::new(),
+            info: StoreInfo {
+                application_id: ApplicationId("test".to_owned()),
+                store_id: store_id.clone(),
+                cloned_from: None,
+                is_official_example: true,
+                started: Time::now(),
+                store_source: StoreSource::RustSdk {
+                    rustc_version: String::new(),
+                    llvm_version: String::new(),
+                },
+                store_version: Some(CrateVersion::LOCAL),
+            },
+        }));
+        for _ in 0..n {
+            messages.push(LogMsg::ArrowMsg(
+                store_id.clone(),
+                re_chunk::Chunk::builder("test_entity".into())
+                    .with_archetype(
+                        re_chunk::RowId::new(),
+                        re_log_types::TimePoint::default().with(
+                            re_log_types::Timeline::new_sequence("blueprint"),
+                            re_log_types::TimeInt::from_milliseconds(re_log_types::NonMinI64::MIN),
+                        ),
+                        &re_types::blueprint::archetypes::Background::new(
+                            re_types::blueprint::components::BackgroundKind::SolidColor,
+                        )
+                        .with_color([255, 0, 0]),
+                    )
+                    .build()
+                    .unwrap()
+                    .to_arrow_msg()
+                    .unwrap(),
+            ));
+        }
+        messages.push(LogMsg::BlueprintActivationCommand(
+            re_log_types::BlueprintActivationCommand {
+                blueprint_id: store_id,
+                make_active: true,
+                make_default: true,
+            },
+        ));
+
+        messages
+    }
+
+    async fn setup() -> (Completion, SocketAddr) {
+        setup_with_memory_limit(MemoryLimit::UNLIMITED).await
+    }
+
+    async fn setup_with_memory_limit(memory_limit: MemoryLimit) -> (Completion, SocketAddr) {
+        let completion = Completion::new();
+
+        let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = tcp_listener.local_addr().unwrap();
+
+        tokio::spawn({
+            let completion = completion.clone();
+            async move {
+                tonic::transport::Server::builder()
+                    .add_service(MessageProxyServer::new(super::MessageProxy::new(
+                        memory_limit,
+                    )))
+                    .serve_with_incoming_shutdown(
+                        TcpIncoming::from_listener(tcp_listener, true, None).unwrap(),
+                        completion.wait(),
+                    )
+                    .await
+                    .unwrap();
+            }
+        });
+
+        (completion, addr)
+    }
+
+    async fn make_client(addr: SocketAddr) -> MessageProxyClient<Channel> {
+        MessageProxyClient::new(
+            Endpoint::from_shared(format!("http://{addr}"))
+                .unwrap()
+                .connect()
+                .await
+                .unwrap(),
+        )
+    }
+
+    async fn read_log_stream(
+        log_stream: &mut tonic::Response<tonic::Streaming<LogMsgProto>>,
+        n: usize,
+    ) -> Vec<LogMsg> {
+        let mut stream_ref = log_stream
+            .get_mut()
+            .map(|result| log_msg_from_proto(result.unwrap()).unwrap());
+
+        let mut messages = Vec::new();
+        for _ in 0..n {
+            messages.push(stream_ref.next().await.unwrap());
+        }
+        messages
+    }
+
+    #[tokio::test]
+    async fn pubsub_basic() {
+        let (completion, addr) = setup().await;
+        let mut client = make_client(addr).await; // We use the same client for both producing and consuming
+        let messages = fake_log_stream(3);
+
+        // start reading
+        let mut log_stream = client.read_messages(Empty {}).await.unwrap();
+
+        // write a few messages
+        client
+            .write_messages(tokio_stream::iter(
+                messages
+                    .clone()
+                    .into_iter()
+                    .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()),
+            ))
+            .await
+            .unwrap();
+
+        // the messages should be echoed to us
+        let actual = read_log_stream(&mut log_stream, messages.len()).await;
+
+        assert_eq!(messages, actual);
+
+        // While `SetStoreInfo` is sent first in `fake_log_stream`,
+        // we can observe that it's also received first,
+        // even though it is actually stored out of order in `persistent_message_queue`.
+        assert!(matches!(messages[0], LogMsg::SetStoreInfo(..)));
+        assert!(matches!(actual[0], LogMsg::SetStoreInfo(..)));
+
+        completion.finish();
+    }
+
+    #[tokio::test]
+    async fn pubsub_history() {
+        let (completion, addr) = setup().await;
+        let mut client = make_client(addr).await; // We use the same client for both producing and consuming
+        let messages = fake_log_stream(3);
+
+        // don't read anything yet - these messages should be sent to us as part of history when we call `read_messages` later
+
+        // Write a few messages:
+        client
+            .write_messages(tokio_stream::iter(
+                messages
+                    .clone()
+                    .into_iter()
+                    .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()),
+            ))
+            .await
+            .unwrap();
+
+        // Start reading now - we should receive full history at this point:
+        let mut log_stream = client.read_messages(Empty {}).await.unwrap();
+
+        let actual = read_log_stream(&mut log_stream, messages.len()).await;
+        assert_eq!(messages, actual);
+
+        completion.finish();
+    }
+
+    #[tokio::test]
+    async fn one_producer_many_consumers() {
+        let (completion, addr) = setup().await;
+        let mut producer = make_client(addr).await; // We use separate clients for producing and consuming
+        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
+        let messages = fake_log_stream(3);
+
+        // Initialize multiple read streams:
+        let mut log_streams = vec![];
+        for consumer in &mut consumers {
+            log_streams.push(consumer.read_messages(Empty {}).await.unwrap());
+        }
+
+        // Write a few messages using our single producer:
+        producer
+            .write_messages(tokio_stream::iter(
+                messages
+                    .clone()
+                    .into_iter()
+                    .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()),
+            ))
+            .await
+            .unwrap();
+
+        // Each consumer should've received them:
+        for log_stream in &mut log_streams {
+            let actual = read_log_stream(log_stream, messages.len()).await;
+            assert_eq!(messages, actual);
+        }
+
+        completion.finish();
+    }
+
+    #[tokio::test]
+    async fn many_producers_many_consumers() {
+        let (completion, addr) = setup().await;
+        let mut producers = vec![make_client(addr).await, make_client(addr).await];
+        let mut consumers = vec![make_client(addr).await, make_client(addr).await];
+        let messages = fake_log_stream(3);
+
+        // Initialize multiple read streams:
+        let mut log_streams = vec![];
+        for consumer in &mut consumers {
+            log_streams.push(consumer.read_messages(Empty {}).await.unwrap());
+        }
+
+        // Write a few messages using each producer:
+        for producer in &mut producers {
+            producer
+                .write_messages(tokio_stream::iter(
+                    messages
+                        .clone()
+                        .into_iter()
+                        .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()),
+                ))
+                .await
+                .unwrap();
+        }
+
+        let expected = [messages.clone(), messages.clone()].concat();
+
+        // Each consumer should've received one set of messages from each producer.
+        // Note that in this test we also guarantee the order of messages across producers,
+        // due to the `write_messages` calls being sequential.
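+        // (A single `EventLoop` task drains one event channel, so all incoming
+        // messages are serialized into one global order before being broadcast.)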
+
+        for log_stream in &mut log_streams {
+            let actual = read_log_stream(log_stream, expected.len()).await;
+            assert_eq!(expected, actual);
+        }
+
+        completion.finish();
+    }
+
+    #[tokio::test]
+    async fn memory_limit_drops_messages() {
+        // Use an absurdly low memory limit to force all messages to be dropped immediately from history
+        let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
+        let mut client = make_client(addr).await;
+        let messages = fake_log_stream(3);
+
+        // Write some messages
+        client
+            .write_messages(tokio_stream::iter(
+                messages
+                    .clone()
+                    .into_iter()
+                    .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()),
+            ))
+            .await
+            .unwrap();
+
+        // Start reading
+        let mut log_stream = client.read_messages(Empty {}).await.unwrap();
+        let mut actual = vec![];
+        loop {
+            let timeout_stream = log_stream.get_mut().timeout(Duration::from_millis(100));
+            tokio::pin!(timeout_stream);
+            let timeout_result = timeout_stream.try_next().await;
+            match timeout_result {
+                Ok(Some(value)) => {
+                    actual.push(log_msg_from_proto(value.unwrap()).unwrap());
+                }
+
+                // Stream closed | Timed out
+                Ok(None) | Err(_) => break,
+            }
+        }
+
+        // The GC runs _before_ a message is stored, so we should see the static message, and the last message sent.
+        assert_eq!(actual.len(), 2);
+        assert_eq!(&actual[0], &messages[0]);
+        assert_eq!(&actual[1], messages.last().unwrap());
+
+        completion.finish();
+    }
+
+    #[tokio::test]
+    async fn memory_limit_does_not_interrupt_stream() {
+        // Use an absurdly low memory limit to force all messages to be dropped immediately from history
+        let (completion, addr) = setup_with_memory_limit(MemoryLimit::from_bytes(1)).await;
+        let mut client = make_client(addr).await; // We use the same client for both producing and consuming
+        let messages = fake_log_stream(3);
+
+        // Start reading
+        let mut log_stream = client.read_messages(Empty {}).await.unwrap();
+
+        // Write a few messages
+        client
+            .write_messages(tokio_stream::iter(
+                messages
+                    .clone()
+                    .into_iter()
+                    .map(|msg| log_msg_to_proto(msg, Compression::Off).unwrap()),
+            ))
+            .await
+            .unwrap();
+
+        // The messages should be echoed to us, even though none of them will be stored in history
+        let actual = read_log_stream(&mut log_stream, messages.len()).await;
+        assert_eq!(messages, actual);
+
+        completion.finish();
+    }
+}
diff --git a/crates/store/re_log_encoding/src/codec/mod.rs b/crates/store/re_log_encoding/src/codec/mod.rs
index 7c51b4b690e0..647c2fc156e0 100644
--- a/crates/store/re_log_encoding/src/codec/mod.rs
+++ b/crates/store/re_log_encoding/src/codec/mod.rs
@@ -1,4 +1,4 @@
-mod arrow;
+pub(crate) mod arrow;
 
 pub mod file;
 pub mod wire;
diff --git a/crates/store/re_log_encoding/src/lib.rs b/crates/store/re_log_encoding/src/lib.rs
index d35b8e331342..709b96fcdb4f 100644
--- a/crates/store/re_log_encoding/src/lib.rs
+++ b/crates/store/re_log_encoding/src/lib.rs
@@ -8,7 +8,7 @@ pub mod encoder;
 
 pub mod codec;
 
-mod protobuf_conversions;
+pub mod protobuf_conversions;
 
 #[cfg(feature = "encoder")]
 #[cfg(not(target_arch = "wasm32"))]
diff --git a/crates/store/re_log_encoding/src/protobuf_conversions.rs b/crates/store/re_log_encoding/src/protobuf_conversions.rs
index 9b613a1e3937..4ad6580d2950 100644
--- a/crates/store/re_log_encoding/src/protobuf_conversions.rs
+++ b/crates/store/re_log_encoding/src/protobuf_conversions.rs
@@ -1,3 +1,7 @@
+use re_protos::missing_field;
+
+use crate::codec::CodecError;
+
 impl From<re_protos::log_msg::v0::Compression> for crate::Compression {
     fn from(value: re_protos::log_msg::v0::Compression) -> Self {
         match value {
@@ -15,3 +19,101 @@ impl From<crate::Compression> for re_protos::log_msg::v0::Compression {
         }
     }
 }
+
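+/// Round-trip sketch for `log_msg_to_proto` / `log_msg_from_proto` below
+/// (illustrative only; assumes both the `encoder` and `decoder` features are
+/// enabled, mirroring how the `re_grpc_server` tests use these functions):
+///
+/// ```ignore
+/// let proto = log_msg_to_proto(msg.clone(), crate::Compression::Off)?;
+/// let decoded = log_msg_from_proto(proto)?;
+/// assert_eq!(msg, decoded);
+/// ```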
+#[cfg(feature = "decoder")]
+pub fn log_msg_from_proto(
+    message: re_protos::log_msg::v0::LogMsg,
+) -> Result<re_log_types::LogMsg, crate::decoder::DecodeError> {
+    use crate::codec::arrow::decode_arrow;
+    use crate::decoder::DecodeError;
+    use re_protos::log_msg::v0::{log_msg::Msg, Encoding};
+
+    match message.msg {
+        Some(Msg::SetStoreInfo(set_store_info)) => Ok(re_log_types::LogMsg::SetStoreInfo(
+            set_store_info.try_into()?,
+        )),
+        Some(Msg::ArrowMsg(arrow_msg)) => {
+            if arrow_msg.encoding() != Encoding::ArrowIpc {
+                return Err(DecodeError::Codec(CodecError::UnsupportedEncoding));
+            }
+
+            let (schema, chunk) = decode_arrow(
+                &arrow_msg.payload,
+                arrow_msg.uncompressed_size as usize,
+                arrow_msg.compression().into(),
+            )?;
+
+            let store_id: re_log_types::StoreId = arrow_msg
+                .store_id
+                .ok_or_else(|| missing_field!(re_protos::log_msg::v0::ArrowMsg, "store_id"))?
+                .into();
+
+            let chunk = re_chunk::Chunk::from_transport(&re_chunk::TransportChunk {
+                schema,
+                data: chunk,
+            })?;
+
+            Ok(re_log_types::LogMsg::ArrowMsg(
+                store_id,
+                chunk.to_arrow_msg()?,
+            ))
+        }
+        Some(Msg::BlueprintActivationCommand(blueprint_activation_command)) => {
+            Ok(re_log_types::LogMsg::BlueprintActivationCommand(
+                blueprint_activation_command.try_into()?,
+            ))
+        }
+        None => Err(missing_field!(re_protos::log_msg::v0::LogMsg, "msg").into()),
+    }
+}
+
+#[cfg(feature = "encoder")]
+pub fn log_msg_to_proto(
+    message: re_log_types::LogMsg,
+    compression: crate::Compression,
+) -> Result<re_protos::log_msg::v0::LogMsg, crate::encoder::EncodeError> {
+    use crate::codec::arrow::encode_arrow;
+    use re_protos::log_msg::v0::{
+        ArrowMsg, BlueprintActivationCommand, LogMsg as ProtoLogMsg, SetStoreInfo,
+    };
+
+    let proto_msg = match message {
+        re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
+            let set_store_info: SetStoreInfo = set_store_info.into();
+            ProtoLogMsg {
+                msg: Some(re_protos::log_msg::v0::log_msg::Msg::SetStoreInfo(
+                    set_store_info,
+                )),
+            }
+        }
+        re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
+            let payload = encode_arrow(&arrow_msg.schema, &arrow_msg.chunk, compression)?;
+            let arrow_msg = ArrowMsg {
+                store_id: Some(store_id.into()),
+                compression: match compression {
+                    crate::Compression::Off => re_protos::log_msg::v0::Compression::None as i32,
+                    crate::Compression::LZ4 => re_protos::log_msg::v0::Compression::Lz4 as i32,
+                },
+                uncompressed_size: payload.uncompressed_size as i32,
+                encoding: re_protos::log_msg::v0::Encoding::ArrowIpc as i32,
+                payload: payload.data,
+            };
+            ProtoLogMsg {
+                msg: Some(re_protos::log_msg::v0::log_msg::Msg::ArrowMsg(arrow_msg)),
+            }
+        }
+        re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
+            let blueprint_activation_command: BlueprintActivationCommand =
+                blueprint_activation_command.into();
+            ProtoLogMsg {
+                msg: Some(
+                    re_protos::log_msg::v0::log_msg::Msg::BlueprintActivationCommand(
+                        blueprint_activation_command,
+                    ),
+                ),
+            }
+        }
+    };
+
+    Ok(proto_msg)
+}
diff --git a/crates/store/re_log_types/src/lib.rs b/crates/store/re_log_types/src/lib.rs
index 3174228b7750..7cc2768c42c7 100644
--- a/crates/store/re_log_types/src/lib.rs
+++ b/crates/store/re_log_types/src/lib.rs
@@ -37,6 +37,7 @@ mod protobuf_conversions;
 use std::sync::Arc;
 
 use re_build_info::CrateVersion;
+use re_byte_size::SizeBytes;
 
 pub use self::arrow_msg::{ArrowChunkReleaseCallback, ArrowMsg};
 pub use self::instance::Instance;
@@ -260,6 +261,7 @@ impl BlueprintActivationCommand {
 #[derive(Clone, Debug, PartialEq)] // `PartialEq` used for tests in another crate
 #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
 #[allow(clippy::large_enum_variant)]
+// TODO(#8631): Remove `LogMsg`
 pub enum LogMsg {
     /// A new recording has begun.
     ///
@@ -635,6 +637,142 @@ pub fn build_frame_nr(frame_nr: impl TryInto<TimeInt>) -> (Timeline, TimeInt) {
     )
 }
 
+impl SizeBytes for ApplicationId {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        self.0.heap_size_bytes()
+    }
+}
+
+impl SizeBytes for StoreId {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        self.id.heap_size_bytes()
+    }
+}
+
+impl SizeBytes for PythonVersion {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let Self {
+            major: _,
+            minor: _,
+            patch: _,
+            suffix,
+        } = self;
+
+        suffix.heap_size_bytes()
+    }
+}
+
+impl SizeBytes for FileSource {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        match self {
+            Self::Uri | Self::Sdk | Self::Cli => 0,
+            Self::DragAndDrop {
+                recommended_application_id,
+                recommended_recording_id,
+                force_store_info,
+            }
+            | Self::FileDialog {
+                recommended_application_id,
+                recommended_recording_id,
+                force_store_info,
+            } => {
+                recommended_application_id.heap_size_bytes()
+                    + recommended_recording_id.heap_size_bytes()
+                    + force_store_info.heap_size_bytes()
+            }
+        }
+    }
+}
+
+impl SizeBytes for StoreSource {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        match self {
+            Self::Unknown | Self::CSdk | Self::Viewer => 0,
+            Self::PythonSdk(python_version) => python_version.heap_size_bytes(),
+            Self::RustSdk {
+                rustc_version,
+                llvm_version,
+            } => rustc_version.heap_size_bytes() + llvm_version.heap_size_bytes(),
+            Self::File { file_source } => file_source.heap_size_bytes(),
+            Self::Other(description) => description.heap_size_bytes(),
+        }
+    }
+}
+
+impl SizeBytes for StoreInfo {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let Self {
+            application_id,
+            store_id,
+            cloned_from: _,
+            is_official_example: _,
+            started: _,
+            store_source,
+            store_version,
+        } = self;
+
+        application_id.heap_size_bytes()
+            + store_id.heap_size_bytes()
+            + store_source.heap_size_bytes()
+            + store_version.heap_size_bytes()
+    }
+}
+
+impl SizeBytes for SetStoreInfo {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let Self { row_id, info } = self;
+
+        row_id.heap_size_bytes() + info.heap_size_bytes()
+    }
+}
+
+impl SizeBytes for BlueprintActivationCommand {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        0
+    }
+}
+
+impl SizeBytes for ArrowMsg {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let Self {
+            chunk_id,
+            timepoint_max,
+            schema,
+            chunk,
+            on_release: _,
+        } = self;
+
+        chunk_id.heap_size_bytes()
+            + timepoint_max.heap_size_bytes()
+            + schema.heap_size_bytes()
+            + chunk.heap_size_bytes()
+    }
+}
+
+impl SizeBytes for LogMsg {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        match self {
+            Self::SetStoreInfo(set_store_info) => set_store_info.heap_size_bytes(),
+            Self::ArrowMsg(store_id, arrow_msg) => {
+                store_id.heap_size_bytes() + arrow_msg.heap_size_bytes()
+            }
+            Self::BlueprintActivationCommand(blueprint_activation_command) => {
+                blueprint_activation_command.heap_size_bytes()
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/store/re_protos/Cargo.toml b/crates/store/re_protos/Cargo.toml
index 74a75d7759c4..85b6e3b78164 100644
--- a/crates/store/re_protos/Cargo.toml
+++ b/crates/store/re_protos/Cargo.toml
@@ -13,6 +13,8 @@ rust-version.workspace = true
 version.workspace = true
 
 [dependencies]
+re_byte_size.workspace = true
+
 # External
 prost.workspace = true
 thiserror.workspace = true
diff --git a/crates/store/re_protos/proto/rerun/v0/log_msg.proto b/crates/store/re_protos/proto/rerun/v0/log_msg.proto
index b35abec50a58..fef64bcc3dcb 100644
--- a/crates/store/re_protos/proto/rerun/v0/log_msg.proto
+++ b/crates/store/re_protos/proto/rerun/v0/log_msg.proto
@@ -4,6 +4,20 @@ package rerun.log_msg.v0;
 
 import "rerun/v0/common.proto";
 
+// TODO(#8631): Remove `LogMsg`
+message LogMsg {
+  oneof msg {
+    // A message that contains a new store info.
+    SetStoreInfo set_store_info = 1;
+
+    // A message that contains an Arrow-IPC encoded message.
+    ArrowMsg arrow_msg = 2;
+
+    // A message that contains a blueprint activation command.
+    BlueprintActivationCommand blueprint_activation_command = 3;
+  }
+}
+
 // Corresponds to `LogMsg::SetStoreInfo`. Used to identify a recording.
 message SetStoreInfo {
   // A time-based UID that is used to determine how a `StoreInfo` fits in the global ordering of events.
diff --git a/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto b/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto
new file mode 100644
index 000000000000..27fc9c596673
--- /dev/null
+++ b/crates/store/re_protos/proto/rerun/v0/sdk_comms.proto
@@ -0,0 +1,24 @@
+syntax = "proto3";
+
+package rerun.sdk_comms.v0;
+
+import "rerun/v0/log_msg.proto";
+import "rerun/v0/common.proto";
+
+// Simple buffer for messages between SDKs and viewers.
+//
+// - SDKs produce messages by calling `WriteMessages`
+// - Viewers consume messages by calling `ReadMessages`
+//
+// The buffer is bounded by a memory limit, and will drop the oldest messages when the limit is reached.
+//
+// Whenever `ReadMessages` is called, all buffered messages are sent in the order they were received.
+// The stream will then also yield any new messages passed to `WriteMessages` from any client.
+service MessageProxy {
+  // TODO(jan): Would it be more efficient to send a "message batch" instead of individual messages?
+  // It may allow us to amortize the overhead of the gRPC protocol.
+  rpc WriteMessages(stream rerun.log_msg.v0.LogMsg) returns (Empty) {}
+  rpc ReadMessages(Empty) returns (stream rerun.log_msg.v0.LogMsg) {}
+}
+
+message Empty {}
diff --git a/crates/store/re_protos/src/lib.rs b/crates/store/re_protos/src/lib.rs
index 545198db999b..e41036697752 100644
--- a/crates/store/re_protos/src/lib.rs
+++ b/crates/store/re_protos/src/lib.rs
@@ -29,6 +29,9 @@ mod v0 {
 
     #[path = "./rerun.remote_store.v0.rs"]
     pub mod rerun_remote_store_v0;
+
+    #[path = "./rerun.sdk_comms.v0.rs"]
+    pub mod rerun_sdk_comms_v0;
 }
 
 pub mod common {
@@ -50,6 +53,12 @@ pub mod remote_store {
     }
 }
 
+pub mod sdk_comms {
+    pub mod v0 {
+        pub use crate::v0::rerun_sdk_comms_v0::*;
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum TypeConversionError {
     #[error("missing required field: {package_name}.{type_name}.{field_name}")]
@@ -112,3 +121,159 @@ macro_rules! invalid_field {
         $crate::TypeConversionError::invalid_field::<$type>($field, &$reason)
     };
 }
+
+mod sizes {
+    use re_byte_size::SizeBytes;
+
+    impl SizeBytes for crate::log_msg::v0::LogMsg {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { msg } = self;
+
+            match msg {
+                Some(msg) => msg.heap_size_bytes(),
+                None => 0,
+            }
+        }
+    }
+
+    impl SizeBytes for crate::log_msg::v0::log_msg::Msg {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            match self {
+                Self::SetStoreInfo(set_store_info) => set_store_info.heap_size_bytes(),
+                Self::ArrowMsg(arrow_msg) => arrow_msg.heap_size_bytes(),
+                Self::BlueprintActivationCommand(blueprint_activation_command) => {
+                    blueprint_activation_command.heap_size_bytes()
+                }
+            }
+        }
+    }
+
+    impl SizeBytes for crate::log_msg::v0::SetStoreInfo {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { row_id, info } = self;
+
+            row_id.heap_size_bytes() + info.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::common::v0::Tuid {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { inc, time_ns } = self;
+
+            inc.heap_size_bytes() + time_ns.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::log_msg::v0::StoreInfo {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self {
+                application_id,
+                store_id,
+                is_official_example,
+                started,
+                store_source,
+                store_version,
+            } = self;
+
+            application_id.heap_size_bytes()
+                + store_id.heap_size_bytes()
+                + is_official_example.heap_size_bytes()
+                + started.heap_size_bytes()
+                + store_source.heap_size_bytes()
+                + store_version.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::common::v0::ApplicationId {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { id } = self;
+
+            id.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::common::v0::StoreId {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { kind, id } = self;
+
+            kind.heap_size_bytes() + id.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::common::v0::Time {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { nanos_since_epoch } = self;
+
+            nanos_since_epoch.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::log_msg::v0::StoreSource {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { kind, extra } = self;
+
+            kind.heap_size_bytes() + extra.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::log_msg::v0::StoreSourceExtra {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { payload } = self;
+
+            payload.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::log_msg::v0::StoreVersion {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self { crate_version_bits } = self;
+
+            crate_version_bits.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::log_msg::v0::ArrowMsg {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self {
+                store_id,
+                compression,
+                uncompressed_size,
+                encoding,
+                payload,
+            } = self;
+
+            store_id.heap_size_bytes()
+                + compression.heap_size_bytes()
+                + uncompressed_size.heap_size_bytes()
+                + encoding.heap_size_bytes()
+                + payload.heap_size_bytes()
+        }
+    }
+
+    impl SizeBytes for crate::log_msg::v0::BlueprintActivationCommand {
+        #[inline]
+        fn heap_size_bytes(&self) -> u64 {
+            let Self {
+                blueprint_id,
+                make_active,
+                make_default,
+            } = self;
+
+            blueprint_id.heap_size_bytes()
+                + make_active.heap_size_bytes()
+                + make_default.heap_size_bytes()
+        }
+    }
+}
diff --git a/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs b/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs
index 3b415876fbc0..88dcae5534a3 100644
--- a/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs
+++ b/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs
@@ -1,4 +1,35 @@
 // This file is @generated by prost-build.
+/// TODO(#8631): Remove `LogMsg`
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct LogMsg {
+    #[prost(oneof = "log_msg::Msg", tags = "1, 2, 3")]
+    pub msg: ::core::option::Option<log_msg::Msg>,
+}
+/// Nested message and enum types in `LogMsg`.
+pub mod log_msg {
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum Msg {
+        /// A message that contains a new store info.
+        #[prost(message, tag = "1")]
+        SetStoreInfo(super::SetStoreInfo),
+        /// A message that contains an Arrow-IPC encoded message.
+        #[prost(message, tag = "2")]
+        ArrowMsg(super::ArrowMsg),
+        /// A message that contains a blueprint activation command.
+        #[prost(message, tag = "3")]
+        BlueprintActivationCommand(super::BlueprintActivationCommand),
+    }
+}
+impl ::prost::Name for LogMsg {
+    const NAME: &'static str = "LogMsg";
+    const PACKAGE: &'static str = "rerun.log_msg.v0";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.log_msg.v0.LogMsg".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.log_msg.v0.LogMsg".into()
+    }
+}
 /// Corresponds to `LogMsg::SetStoreInfo`. Used to identify a recording.
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct SetStoreInfo {
diff --git a/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs b/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs
new file mode 100644
index 000000000000..ac4c4e6082f4
--- /dev/null
+++ b/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs
@@ -0,0 +1,374 @@
+// This file is @generated by prost-build.
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct Empty {}
+impl ::prost::Name for Empty {
+    const NAME: &'static str = "Empty";
+    const PACKAGE: &'static str = "rerun.sdk_comms.v0";
+    fn full_name() -> ::prost::alloc::string::String {
+        "rerun.sdk_comms.v0.Empty".into()
+    }
+    fn type_url() -> ::prost::alloc::string::String {
+        "/rerun.sdk_comms.v0.Empty".into()
+    }
+}
+/// Generated client implementations.
+pub mod message_proxy_client {
+    #![allow(
+        unused_variables,
+        dead_code,
+        missing_docs,
+        clippy::wildcard_imports,
+        clippy::let_unit_value
+    )]
+    use tonic::codegen::http::Uri;
+    use tonic::codegen::*;
+    /// Simple buffer for messages between SDKs and viewers.
+    ///
+    /// - SDKs produce messages by calling `WriteMessages`
+    /// - Viewers consume messages by calling `ReadMessages`
+    ///
+    /// The buffer is bounded by a memory limit, and will drop the oldest messages when the limit is reached.
+    ///
+    /// Whenever `ReadMessages` is called, all buffered messages are sent in the order they were received.
+    /// The stream will then also yield any new messages passed to `WriteMessages` from any client.
+    #[derive(Debug, Clone)]
+    pub struct MessageProxyClient<T> {
+        inner: tonic::client::Grpc<T>,
+    }
+    impl<T> MessageProxyClient<T>
+    where
+        T: tonic::client::GrpcService<tonic::body::BoxBody>,
+        T::Error: Into<StdError>,
+        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
+        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
+    {
+        pub fn new(inner: T) -> Self {
+            let inner = tonic::client::Grpc::new(inner);
+            Self { inner }
+        }
+        pub fn with_origin(inner: T, origin: Uri) -> Self {
+            let inner = tonic::client::Grpc::with_origin(inner, origin);
+            Self { inner }
+        }
+        pub fn with_interceptor<F>(
+            inner: T,
+            interceptor: F,
+        ) -> MessageProxyClient<InterceptedService<T, F>>
+        where
+            F: tonic::service::Interceptor,
+            T::ResponseBody: Default,
+            T: tonic::codegen::Service<
+                http::Request<tonic::body::BoxBody>,
+                Response = http::Response<
+                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
+                >,
+            >,
+            <T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error:
+                Into<StdError> + std::marker::Send + std::marker::Sync,
+        {
+            MessageProxyClient::new(InterceptedService::new(inner, interceptor))
+        }
+        /// Compress requests with the given encoding.
+        ///
+        /// This requires the server to support it otherwise it might respond with an
+        /// error.
+        #[must_use]
+        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+            self.inner = self.inner.send_compressed(encoding);
+            self
+        }
+        /// Enable decompressing responses.
+        #[must_use]
+        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+            self.inner = self.inner.accept_compressed(encoding);
+            self
+        }
+        /// Limits the maximum size of a decoded message.
+        ///
+        /// Default: `4MB`
+        #[must_use]
+        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+            self.inner = self.inner.max_decoding_message_size(limit);
+            self
+        }
+        /// Limits the maximum size of an encoded message.
+        ///
+        /// Default: `usize::MAX`
+        #[must_use]
+        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+            self.inner = self.inner.max_encoding_message_size(limit);
+            self
+        }
+        /// TODO(jan): Would it be more efficient to send a "message batch" instead of individual messages?
+        /// It may allow us to amortize the overhead of the gRPC protocol.
+        pub async fn write_messages(
+            &mut self,
+            request: impl tonic::IntoStreamingRequest<
+                Message = super::super::super::log_msg::v0::LogMsg,
+            >,
+        ) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
+            self.inner.ready().await.map_err(|e| {
+                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
+            })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/rerun.sdk_comms.v0.MessageProxy/WriteMessages",
+            );
+            let mut req = request.into_streaming_request();
+            req.extensions_mut().insert(GrpcMethod::new(
+                "rerun.sdk_comms.v0.MessageProxy",
+                "WriteMessages",
+            ));
+            self.inner.client_streaming(req, path, codec).await
+        }
+        pub async fn read_messages(
+            &mut self,
+            request: impl tonic::IntoRequest<super::Empty>,
+        ) -> std::result::Result<
+            tonic::Response<tonic::codec::Streaming<super::super::super::log_msg::v0::LogMsg>>,
+            tonic::Status,
+        > {
+            self.inner.ready().await.map_err(|e| {
+                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
+            })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/rerun.sdk_comms.v0.MessageProxy/ReadMessages",
+            );
+            let mut req = request.into_request();
+            req.extensions_mut().insert(GrpcMethod::new(
+                "rerun.sdk_comms.v0.MessageProxy",
+                "ReadMessages",
+            ));
+            self.inner.server_streaming(req, path, codec).await
+        }
+    }
+}
+/// Generated server implementations.
+pub mod message_proxy_server {
+    #![allow(
+        unused_variables,
+        dead_code,
+        missing_docs,
+        clippy::wildcard_imports,
+        clippy::let_unit_value
+    )]
+    use tonic::codegen::*;
+    /// Generated trait containing gRPC methods that should be implemented for use with MessageProxyServer.
+    #[async_trait]
+    pub trait MessageProxy: std::marker::Send + std::marker::Sync + 'static {
+        /// TODO(jan): Would it be more efficient to send a "message batch" instead of individual messages?
+        /// It may allow us to amortize the overhead of the gRPC protocol.
+        async fn write_messages(
+            &self,
+            request: tonic::Request<tonic::Streaming<super::super::super::log_msg::v0::LogMsg>>,
+        ) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status>;
+        /// Server streaming response type for the ReadMessages method.
+        type ReadMessagesStream: tonic::codegen::tokio_stream::Stream<
+                Item = std::result::Result<super::super::super::log_msg::v0::LogMsg, tonic::Status>,
+            > + std::marker::Send
+            + 'static;
+        async fn read_messages(
+            &self,
+            request: tonic::Request<super::Empty>,
+        ) -> std::result::Result<tonic::Response<Self::ReadMessagesStream>, tonic::Status>;
+    }
+    /// Simple buffer for messages between SDKs and viewers.
+    ///
+    /// - SDKs produce messages by calling `WriteMessages`
+    /// - Viewers consume messages by calling `ReadMessages`
+    ///
+    /// The buffer is bounded by a memory limit, and will drop the oldest messages when the limit is reached.
+    ///
+    /// Whenever `ReadMessages` is called, all buffered messages are sent in the order they were received.
+    /// The stream will then also yield any new messages passed to `WriteMessages` from any client.
+    #[derive(Debug)]
+    pub struct MessageProxyServer<T> {
+        inner: Arc<T>,
+        accept_compression_encodings: EnabledCompressionEncodings,
+        send_compression_encodings: EnabledCompressionEncodings,
+        max_decoding_message_size: Option<usize>,
+        max_encoding_message_size: Option<usize>,
+    }
+    impl<T> MessageProxyServer<T> {
+        pub fn new(inner: T) -> Self {
+            Self::from_arc(Arc::new(inner))
+        }
+        pub fn from_arc(inner: Arc<T>) -> Self {
+            Self {
+                inner,
+                accept_compression_encodings: Default::default(),
+                send_compression_encodings: Default::default(),
+                max_decoding_message_size: None,
+                max_encoding_message_size: None,
+            }
+        }
+        pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
+        where
+            F: tonic::service::Interceptor,
+        {
+            InterceptedService::new(Self::new(inner), interceptor)
+        }
+        /// Enable decompressing requests with the given encoding.
+        #[must_use]
+        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
+            self.accept_compression_encodings.enable(encoding);
+            self
+        }
+        /// Compress responses with the given encoding, if the client supports it.
+        #[must_use]
+        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
+            self.send_compression_encodings.enable(encoding);
+            self
+        }
+        /// Limits the maximum size of a decoded message.
+        ///
+        /// Default: `4MB`
+        #[must_use]
+        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
+            self.max_decoding_message_size = Some(limit);
+            self
+        }
+        /// Limits the maximum size of an encoded message.
+        ///
+        /// Default: `usize::MAX`
+        #[must_use]
+        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
+            self.max_encoding_message_size = Some(limit);
+            self
+        }
+    }
+    impl<T, B> tonic::codegen::Service<http::Request<B>> for MessageProxyServer<T>
+    where
+        T: MessageProxy,
+        B: Body + std::marker::Send + 'static,
+        B::Error: Into<StdError> + std::marker::Send + 'static,
+    {
+        type Response = http::Response<tonic::body::BoxBody>;
+        type Error = std::convert::Infallible;
+        type Future = BoxFuture<Self::Response, Self::Error>;
+        fn poll_ready(
+            &mut self,
+            _cx: &mut Context<'_>,
+        ) -> Poll<std::result::Result<(), Self::Error>> {
+            Poll::Ready(Ok(()))
+        }
+        fn call(&mut self, req: http::Request<B>) -> Self::Future {
+            match req.uri().path() {
+                "/rerun.sdk_comms.v0.MessageProxy/WriteMessages" => {
+                    #[allow(non_camel_case_types)]
+                    struct WriteMessagesSvc<T: MessageProxy>(pub Arc<T>);
+                    impl<T: MessageProxy>
+                        tonic::server::ClientStreamingService<
+                            super::super::super::log_msg::v0::LogMsg,
+                        > for WriteMessagesSvc<T>
+                    {
+                        type Response = super::Empty;
+                        type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
+                        fn call(
+                            &mut self,
+                            request: tonic::Request<
+                                tonic::Streaming<super::super::super::log_msg::v0::LogMsg>,
+                            >,
+                        ) -> Self::Future {
+                            let inner = Arc::clone(&self.0);
+                            let fut = async move {
+                                <T as MessageProxy>::write_messages(&inner, request).await
+                            };
+                            Box::pin(fut)
+                        }
+                    }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
+                    let max_decoding_message_size = self.max_decoding_message_size;
+                    let max_encoding_message_size = self.max_encoding_message_size;
+                    let inner = self.inner.clone();
+                    let fut = async move {
+                        let method = WriteMessagesSvc(inner);
+                        let codec = tonic::codec::ProstCodec::default();
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
+                            );
+                        let res = grpc.client_streaming(method, req).await;
+                        Ok(res)
+                    };
+                    Box::pin(fut)
+                }
+                "/rerun.sdk_comms.v0.MessageProxy/ReadMessages" => {
+                    #[allow(non_camel_case_types)]
+                    struct ReadMessagesSvc<T: MessageProxy>(pub Arc<T>);
+                    impl<T: MessageProxy> tonic::server::ServerStreamingService<super::Empty>
+                        for ReadMessagesSvc<T>
+                    {
+                        type Response = super::super::super::log_msg::v0::LogMsg;
+                        type ResponseStream = T::ReadMessagesStream;
+                        type Future =
+                            BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
+                        fn call(&mut self, request: tonic::Request<super::Empty>) -> Self::Future {
+                            let inner = Arc::clone(&self.0);
+                            let fut = async move {
+                                <T as MessageProxy>::read_messages(&inner, request).await
+                            };
+                            Box::pin(fut)
+                        }
+                    }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
+                    let max_decoding_message_size = self.max_decoding_message_size;
+                    let max_encoding_message_size = self.max_encoding_message_size;
+                    let inner = self.inner.clone();
+                    let fut = async move {
+                        let method = ReadMessagesSvc(inner);
+                        let codec = tonic::codec::ProstCodec::default();
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
+                            );
+                        let res = grpc.server_streaming(method, req).await;
+                        Ok(res)
+                    };
+                    Box::pin(fut)
+                }
+                _ => Box::pin(async move {
+                    let mut response = http::Response::new(empty_body());
+                    let headers = response.headers_mut();
+                    headers.insert(
+                        tonic::Status::GRPC_STATUS,
+                        (tonic::Code::Unimplemented as i32).into(),
+                    );
+                    headers.insert(
+                        http::header::CONTENT_TYPE,
+                        tonic::metadata::GRPC_CONTENT_TYPE,
+                    );
+                    Ok(response)
+                }),
+            }
+        }
+    }
+    impl<T: MessageProxy> Clone for MessageProxyServer<T> {
+        fn clone(&self) -> Self {
+            let inner = self.inner.clone();
+            Self {
+                inner,
+                accept_compression_encodings: self.accept_compression_encodings,
+                send_compression_encodings: self.send_compression_encodings,
+                max_decoding_message_size: self.max_decoding_message_size,
+                max_encoding_message_size: self.max_encoding_message_size,
+            }
+        }
+    }
+    /// Generated gRPC service name
+    pub const SERVICE_NAME: &str = "rerun.sdk_comms.v0.MessageProxy";
+    impl<T: MessageProxy> tonic::server::NamedService for MessageProxyServer<T> {
+        const NAME: &'static str = SERVICE_NAME;
+    }
+}
diff --git a/crates/utils/re_byte_size/Cargo.toml b/crates/utils/re_byte_size/Cargo.toml
index 6cf0e1858b4b..21955ff633e5 100644
--- a/crates/utils/re_byte_size/Cargo.toml
+++ b/crates/utils/re_byte_size/Cargo.toml
@@ -28,4 +28,3 @@ arrow.workspace = true
 arrow2.workspace = true
 half.workspace = true
 smallvec.workspace = true
-re_tuid.workspace = true # TODO(emilk): maybe re_tuid should depend on re_byte_size instead?
diff --git a/crates/utils/re_byte_size/src/arrow2_sizes.rs b/crates/utils/re_byte_size/src/arrow2_sizes.rs
index ddac7760dc41..732c60506025 100644
--- a/crates/utils/re_byte_size/src/arrow2_sizes.rs
+++ b/crates/utils/re_byte_size/src/arrow2_sizes.rs
@@ -12,7 +12,8 @@ use arrow2::{
         Utf8Array,
     },
     bitmap::Bitmap,
-    datatypes::{DataType, Field, PhysicalType},
+    chunk::Chunk,
+    datatypes::{DataType, Field, PhysicalType, Schema},
     types::{NativeType, Offset},
 };
 
@@ -552,3 +553,22 @@ impl SizeBytes for StructArray {
         estimated_bytes_size(self) as _
     }
 }
+
+impl SizeBytes for Schema {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let Self { fields, metadata } = self;
+
+        fields.heap_size_bytes() + metadata.heap_size_bytes()
+    }
+}
+
+impl SizeBytes for Chunk<Box<dyn Array>> {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        self.arrays()
+            .iter()
+            .map(SizeBytes::total_size_bytes)
+            .sum::<u64>()
+    }
+}
diff --git a/crates/utils/re_byte_size/src/lib.rs b/crates/utils/re_byte_size/src/lib.rs
index 3459d5d21b5b..7d692d79ce49 100644
--- a/crates/utils/re_byte_size/src/lib.rs
+++ b/crates/utils/re_byte_size/src/lib.rs
@@ -12,6 +12,7 @@ mod tuple_sizes;
 
 /// Approximations of stack and heap size for both internal and external types.
 ///
 /// Mostly used for statistics and triggering events such as garbage collection.
+// TODO(#8630): Derive macro for this trait.
 pub trait SizeBytes {
     /// Returns the total size of `self` in bytes, accounting for both stack and heap space.
     #[inline]
@@ -38,10 +39,3 @@ pub trait SizeBytes {
         false
     }
 }
-
-impl SizeBytes for re_tuid::Tuid {
-    #[inline]
-    fn heap_size_bytes(&self) -> u64 {
-        0
-    }
-}
diff --git a/crates/utils/re_tuid/Cargo.toml b/crates/utils/re_tuid/Cargo.toml
index 3591c5e5c8ac..e60a198f6f25 100644
--- a/crates/utils/re_tuid/Cargo.toml
+++ b/crates/utils/re_tuid/Cargo.toml
@@ -27,6 +27,7 @@ serde = ["dep:serde"]
 
 [dependencies]
+re_byte_size.workspace = true
 re_protos.workspace = true
 
 document-features.workspace = true
diff --git a/crates/utils/re_tuid/src/lib.rs b/crates/utils/re_tuid/src/lib.rs
index 5a19ef51df71..bcfdaa86aeee 100644
--- a/crates/utils/re_tuid/src/lib.rs
+++ b/crates/utils/re_tuid/src/lib.rs
@@ -214,6 +214,13 @@ fn random_u64() -> u64 {
     u64::from_le_bytes(bytes)
 }
 
+impl re_byte_size::SizeBytes for Tuid {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        0
+    }
+}
+
 #[test]
 fn test_tuid() {
     use std::collections::{BTreeSet, HashSet};