From b6945d8335c37a23938067bf14a7b8b68564b236 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Tue, 11 Apr 2023 23:31:19 -0700 Subject: [PATCH] Place a buffer over each sink of a demux to avoid serial message sending --- Cargo.lock | 271 ++++++++++++++++++++++++++- hydroflow_cli_integration/src/lib.rs | 27 ++- 2 files changed, 290 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc9706ac5626..0328ad657111 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,7 +392,7 @@ dependencies = [ "crates-io", "curl", "curl-sys", - "env_logger", + "env_logger 0.10.0", "filetime", "flate2", "fwdansi", @@ -488,6 +488,15 @@ dependencies = [ "jobserver", ] +[[package]] +name = "cfg-expr" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a35b255461940a32985c627ce82900867c61db1659764d3675ea81963f72a4c6" +dependencies = [ + "smallvec", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -752,6 +761,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crdts" +version = "7.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e30e1170960eddf5392c448ff5b190a49cfcb28d6c79f789f2b4e30b571f8f8" +dependencies = [ + "num", + "quickcheck", + "serde", + "tiny-keccak", +] + [[package]] name = "criterion" version = "0.4.0" @@ -790,6 +811,20 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.7" @@ -824,6 +859,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.15" @@ -833,6 +878,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -981,6 +1032,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "dircpy" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10b6622b9d0dc20c70e74ff24c56493278d7d9299ac8729deb923703616e5a7e" +dependencies = [ + "jwalk", + "log", + "walkdir", +] + [[package]] name = "dirs" version = "3.0.2" @@ -1019,6 +1081,16 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.10.0" @@ -1513,6 +1585,7 @@ dependencies = [ "clap 4.1.13", "colored", "core_affinity", + "crdts", "criterion", "futures", "getrandom 0.2.8", @@ -1526,16 +1599,19 @@ dependencies = [ "multiplatform_test", "pusherator", "rand 0.8.5", + "rand_distr", "ref-cast", "regex", "rustc-hash", "sealed", "serde", + "serde-big-array", "serde_json", "slotmap", "static_assertions", "textnonce", "time", + "tmq", "tokio", "tokio-stream", "tokio-util", @@ -1787,6 +1863,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jwalk" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dbcda57db8b6dc067e589628b7348639014e793d9e8137d8cf215e8b133a0bd" +dependencies = [ + "crossbeam", + "rayon", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -2038,6 +2124,42 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b7a8e9be5e039e2ff869df49155f1c06bd01ade2117ec783e56ab0932b67a8f" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6f7833f2cbf2360a6cfd58cd41a53aa7a90bd4c202f5b1c7dd2ed73c57b2c3" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", + "serde", +] + +[[package]] +name = "num-complex" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "747d632c0c558b87dbabbe6a82f3b4ae03720d0646ac5b7b4dae89394be5f2c5" +dependencies = [ + "num-traits", + "serde", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -2048,6 +2170,30 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12ac428b1cb17fce6f731001d307d351ec70a6d202fc2e60f7d4c5e42d8f4f07" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", + "serde", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -2491,6 +2637,18 @@ dependencies = [ "serde", ] +[[package]] +name = "quickcheck" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44883e74aa97ad63db83c4bf8ca490f02b2fc02f92575e720c8551e843c945f" +dependencies = [ + "env_logger 0.7.1", + "log", + "rand 0.7.3", + "rand_core 0.5.1", +] + [[package]] name = "quote" version = "1.0.26" @@ -2873,6 +3031,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-big-array" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11fc7cc2c76d73e0f27ee52abbd64eec84d46f370c88371120433196934e4b7f" +dependencies = [ + "serde", +] + [[package]] name = "serde-value" version = "0.7.0" @@ -2915,6 +3082,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0efd8caf556a6cebd3b285caf480045fcc1ac04f6bd786b09a6f11af30c4fcf4" +dependencies = [ + "serde", +] + [[package]] name = "sha1" version = "0.10.5" @@ -3097,6 +3273,19 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "system-deps" +version = "6.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "555fc8147af6256f3931a36bb83ad0023240ce9cf2b319dec8236fd1f220b05f" +dependencies = [ + "cfg-expr", + "heck 0.4.1", + "pkg-config", + "toml 0.7.3", + "version-compare", +] + [[package]] name = "tar" version = "0.4.38" @@ -3252,6 +3441,15 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9046af28827ac831479d245eb8afd9522599a3cbb22d6c42a82cb9e4ccdf858" +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tiny_http" version = "0.8.2" @@ -3290,6 +3488,19 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "tmq" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e83994e39464b6dd4b3d3e1596744d6c1d56b6687fa4aa099c4994c31835763" +dependencies = [ + "futures", + "log", + "thiserror", + "tokio", + "zmq", +] + [[package]] name = "tokio" version = "1.26.0" @@ -3330,6 +3541,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -3356,6 +3568,18 @@ dependencies = [ "serde", ] +[[package]] +name = "toml" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b403acf6f2bb0859c93c7f0d967cb4a75a7ac552100f9322faf64dc047669b21" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime 0.6.1", + "toml_edit 0.19.8", +] + [[package]] name = "toml_datetime" version = "0.5.1" @@ -3370,6 +3594,9 @@ name = "toml_datetime" version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ab8ed2edee10b50132aed5f331333428b011c99402b5a534154ed15746f9622" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -3392,6 +3619,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "239410c8609e8125456927e6707163a3b1fdb40561e4b803bc041f466ccfdc13" dependencies = [ "indexmap", + "serde", + "serde_spanned", "toml_datetime 0.6.1", "winnow", ] @@ -3471,7 +3700,7 @@ dependencies = [ "serde_json", "smallbitvec", "tiny_http", - "toml", + "toml 0.5.11", "tree-sitter", "tree-sitter-config", "tree-sitter-highlight", @@ -3641,6 +3870,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "version-compare" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579a42fc0b8e0c63b76519a339be31bed574929511fa53c1a3acae26eb258f29" + [[package]] name = "version_check" version = "0.9.4" @@ -3975,6 +4210,16 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "zeromq-src" +version = "0.2.5+4.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53aaa8119f753d047dc5a9dbcc720bf720b466fdf859aaaae8638f3afc1a3564" +dependencies = [ + "cc", + "dircpy", +] + [[package]] name = "zipf" version = "7.0.0" @@ -3983,3 +4228,25 @@ checksum = "835688a7a1b5d2dfaeb5b7e1b4cfb979e7095a70cd1c72fe083f4904ef3e995e" dependencies = [ "rand 0.8.5", ] + +[[package]] +name = "zmq" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd3091dd571fb84a9b3e5e5c6a807d186c411c812c8618786c3c30e5349234e7" +dependencies = [ + "bitflags", + "libc", + "zmq-sys", +] + +[[package]] +name = "zmq-sys" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8351dc72494b4d7f5652a681c33634063bbad58046c1689e75270908fdc864" +dependencies = [ + "libc", + "system-deps", + "zeromq-src", +] diff --git a/hydroflow_cli_integration/src/lib.rs b/hydroflow_cli_integration/src/lib.rs index 80523dca51ab..7c27b4a3cc8b 100644 --- a/hydroflow_cli_integration/src/lib.rs +++ b/hydroflow_cli_integration/src/lib.rs @@ -12,7 +12,7 @@ use std::{ use bytes::{Bytes, BytesMut}; use serde::{Deserialize, Serialize}; -use futures::{ready, stream, Sink, Stream}; +use futures::{ready, sink::Buffer, stream, Sink, SinkExt, Stream}; use async_recursion::async_recursion; use async_trait::async_trait; @@ -395,9 +395,14 @@ impl ConnectedSink for ConnectedBidi { } } -pub struct ConnectedDemux { +pub type BufferedDrain = DemuxDrain>; + +pub struct ConnectedDemux +where + ::Input: Sync, +{ pub keys: Vec, - sink: Option>, + sink: Option>, } #[pin_project] @@ -460,7 +465,12 @@ where for (id, pipe) in demux { connected_demux.insert( id, - Box::pin(T::from_defn(ServerOrBound::Server(pipe)).await.into_sink()), + Box::pin( + T::from_defn(ServerOrBound::Server(pipe)) + .await + .into_sink() + .buffer(1024), + ), ); } @@ -481,7 +491,12 @@ where for (id, bound) in demux { connected_demux.insert( id, - Box::pin(T::from_defn(ServerOrBound::Bound(bound)).await.into_sink()), + Box::pin( + T::from_defn(ServerOrBound::Bound(bound)) + .await + .into_sink() + .buffer(1024), + ), ); } @@ -505,7 +520,7 @@ where ::Input: 'static + Sync, { type Input = (u32, T::Input); - type Sink = DemuxDrain; + type Sink = BufferedDrain; fn into_sink(mut self) -> Self::Sink { self.sink.take().unwrap()