From cac452323c06b1cdaa2caebc4aad1d118ce3f74b Mon Sep 17 00:00:00 2001 From: 0xterminator Date: Sat, 18 Jan 2025 04:59:58 +0200 Subject: [PATCH] feat(repo): Added load-tester and benchmarks (#379) * feat(repo): Added load-tester and benchmarks * ci(repo): adjust sqlx-cli installation * refactor(core): small adjustment --------- Co-authored-by: Pedro Nauck --- .github/actions/setup-rust/action.yaml | 4 + .github/workflows/ci.yaml | 1 + CONTRIBUTING.md | 2 + Cargo.lock | 280 ++++++++++++++++++ Cargo.toml | 3 +- Makefile | 21 +- README.md | 6 +- benches/data-parser/Cargo.toml | 41 +++ benches/data-parser/README.md | 7 + benches/data-parser/benches/deserialize.rs | 74 +++++ .../benches/deserialize_decompress.rs | 73 +++++ benches/data-parser/benches/serialize.rs | 50 ++++ .../data-parser/benches/serialize_compress.rs | 50 ++++ benches/data-parser/src/lib.rs | 8 + benches/load-tester/Cargo.toml | 22 ++ benches/load-tester/README.md | 9 + benches/load-tester/src/lib.rs | 1 + benches/load-tester/src/main.rs | 17 ++ benches/load-tester/src/runners/cli.rs | 42 +++ benches/load-tester/src/runners/mod.rs | 4 + benches/load-tester/src/runners/results.rs | 118 ++++++++ benches/load-tester/src/runners/runner_all.rs | 184 ++++++++++++ .../src/runners/runner_streamable.rs | 63 ++++ .../src/stream/stream_impl.rs | 5 +- crates/fuel-streams/README.md | 6 +- 25 files changed, 1077 insertions(+), 14 deletions(-) create mode 100644 benches/data-parser/Cargo.toml create mode 100644 benches/data-parser/README.md create mode 100644 benches/data-parser/benches/deserialize.rs create mode 100644 benches/data-parser/benches/deserialize_decompress.rs create mode 100644 benches/data-parser/benches/serialize.rs create mode 100644 benches/data-parser/benches/serialize_compress.rs create mode 100644 benches/data-parser/src/lib.rs create mode 100644 benches/load-tester/Cargo.toml create mode 100644 benches/load-tester/README.md create mode 100644 benches/load-tester/src/lib.rs create mode 100644 benches/load-tester/src/main.rs create mode 100644 benches/load-tester/src/runners/cli.rs create mode 100644 benches/load-tester/src/runners/mod.rs create mode 100644 benches/load-tester/src/runners/results.rs create mode 100644 benches/load-tester/src/runners/runner_all.rs create mode 100644 benches/load-tester/src/runners/runner_streamable.rs diff --git a/.github/actions/setup-rust/action.yaml b/.github/actions/setup-rust/action.yaml index bbf588ae..f03edf26 100644 --- a/.github/actions/setup-rust/action.yaml +++ b/.github/actions/setup-rust/action.yaml @@ -51,6 +51,10 @@ runs: with: tool: cargo-edit@0.12.3 + - uses: taiki-e/cache-cargo-install-action@v2 + with: + tool: sqlx-cli@0.8.3 + - uses: Swatinem/rust-cache@v2 if: inputs.cache == 'true' with: diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3cd80c49..5be62df2 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -38,6 +38,7 @@ jobs: refactor test scopes: | + benches repo deps release diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7f6b217c..8bde43c2 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -57,6 +57,7 @@ You can check the [./scripts/setup.sh](./scripts/setup.sh) file to see what is b Here's an overview of the project's directory structure: - `crates/`: Contains the main Rust crates for the project +- `benches/`: Benchmarking code - `tests/`: Integration and end-to-end tests - `examples/`: Example code and usage demonstrations - `cluster/`: Kubernetes cluster configuration and deployment files @@ -90,6 +91,7 @@ This is a general rule used for commits. When you are creating a PR, ensure that - `core`: Changes that affect the core package. - `publisher`: Changes that affect the publisher package. - `fuel-streams`: Changes that affect the fuel-streams package. +- `benches`: Changes related to benchmarks. - `deps`: Changes related to dependencies. - `macros`: Changes that affect the macros package. diff --git a/Cargo.lock b/Cargo.lock index abfe353b..09db1524 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -349,6 +349,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.18" @@ -404,6 +410,15 @@ version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +[[package]] +name = "approx" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6" +dependencies = [ + "num-traits", +] + [[package]] name = "arbitrary" version = "1.4.1" @@ -1162,6 +1177,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytemuck" +version = "1.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" + [[package]] name = "byteorder" version = "1.5.0" @@ -1239,6 +1260,12 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.2.9" @@ -1310,6 +1337,33 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1868,6 +1922,44 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap 4.5.26", + "criterion-plot", + "futures", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "critical-section" version = "1.2.0" @@ -2146,6 +2238,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "data-parser" +version = "0.0.16" +dependencies = [ + "criterion", + "fuel-data-parser", + "fuel-streams-domains", + "rand", + "strum 0.26.3", + "tokio", +] + [[package]] name = "debugid" version = "0.8.0" @@ -4258,6 +4362,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hash32" version = "0.2.1" @@ -5118,6 +5232,17 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "is-terminal" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" +dependencies = [ + "hermit-abi 0.4.0", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -5865,6 +5990,21 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" +[[package]] +name = "load-tester" +version = "0.0.16" +dependencies = [ + "anyhow", + "chrono", + "clap 4.5.26", + "fuel-streams", + "fuel-streams-core", + "fuel-streams-store", + "futures", + "statrs", + "tokio", +] + [[package]] name = "local-channel" version = "0.1.5" @@ -5974,6 +6114,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matrixmultiply" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9380b911e3e96d10c1f415da0876389aaf1b56759054eeb0de7df940c456ba1a" +dependencies = [ + "autocfg", + "rawpointer", +] + [[package]] name = "md-5" version = "0.10.6" @@ -6171,6 +6321,23 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e94e1e6445d314f972ff7395df2de295fe51b71821694f0b0e1e79c4f12c8577" +[[package]] +name = "nalgebra" +version = "0.33.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26aecdf64b707efd1310e3544d709c5c0ac61c13756046aaaba41be5c4f66a3b" +dependencies = [ + "approx", + "matrixmultiply", + "num-complex", + "num-rational", + "num-traits", + "rand", + "rand_distr", + "simba", + "typenum", +] + [[package]] name = "names" version = "0.14.0" @@ -6366,6 +6533,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -6486,6 +6662,12 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "opaque-debug" version = "0.3.1" @@ -6823,6 +7005,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "polling" version = "3.7.4" @@ -7425,6 +7635,16 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rand_distr" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" +dependencies = [ + "num-traits", + "rand", +] + [[package]] name = "rand_xorshift" version = "0.3.0" @@ -7434,6 +7654,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "rawpointer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" + [[package]] name = "rayon" version = "1.10.0" @@ -8033,6 +8259,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "safe_arch" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b02de82ddbe1b636e6170c21be622223aea188ef2e139be0a5b219ec215323" +dependencies = [ + "bytemuck", +] + [[package]] name = "same-file" version = "1.0.6" @@ -8448,6 +8683,19 @@ dependencies = [ "rand_core", ] +[[package]] +name = "simba" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3a386a501cd104797982c15ae17aafe8b9261315b5d07e3ec803f2ea26be0fa" +dependencies = [ + "approx", + "num-complex", + "num-traits", + "paste", + "wide", +] + [[package]] name = "simdutf8" version = "0.1.5" @@ -8765,6 +9013,18 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7beae5182595e9a8b683fa98c4317f956c9a2dec3b9716990d20023cc60c766" +[[package]] +name = "statrs" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a3fe7c28c6512e766b0874335db33c94ad7b8f9054228ae1c2abd47ce7d335e" +dependencies = [ + "approx", + "nalgebra", + "num-traits", + "rand", +] + [[package]] name = "stringprep" version = "0.1.5" @@ -9454,6 +9714,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.8.1" @@ -10522,6 +10792,16 @@ dependencies = [ "wasite", ] +[[package]] +name = "wide" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41b5576b9a81633f3e8df296ce0063042a73507636cbe956c61133dd7034ab22" +dependencies = [ + "bytemuck", + "safe_arch", +] + [[package]] name = "widestring" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index eb40e9c3..fd46ea1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,11 +2,11 @@ default-members = ["crates/fuel-streams"] resolver = "2" members = [ + "benches/*", "crates/*", "crates/fuel-streams-macros/subject-derive", "examples", "scripts/subjects-schema", - "scripts/subjects-schema", "tests", ] @@ -87,6 +87,7 @@ fuel-streams-domains = { version = "0.0.16", path = "crates/fuel-streams-domains fuel-streams-macros = { version = "0.0.16", path = "crates/fuel-streams-macros" } fuel-streams-store = { version = "0.0.16", path = "crates/fuel-streams-store" } fuel-streams-types = { version = "0.0.16", path = "crates/fuel-streams-types" } +fuel-streams-test = { version = "0.0.16", path = "crates/fuel-streams-test" } fuel-web-utils = { version = "0.0.16", path = "crates/fuel-web-utils" } subject-derive = { version = "0.0.16", path = "crates/fuel-streams-macros/subject-derive" } sv-consumer = { version = "0.0.16", path = "crates/sv-consumer" } diff --git a/Makefile b/Makefile index eba290ac..b892bd7c 100644 --- a/Makefile +++ b/Makefile @@ -193,12 +193,31 @@ audit-fix-test: audit-fix: cargo audit fix +# ------------------------------------------------------------ +# Build & Documentation +# ------------------------------------------------------------ + +build: + cargo build --release + +docs: doc + @echo "Generating additional documentation..." + @cargo doc --no-deps --document-private-items + @cargo doc --workspace --no-deps + +docs-serve: docs + @echo "Serving documentation on http://localhost:8000" + @python3 -m http.server 8000 --directory target/doc + # ------------------------------------------------------------ # Load Testing & Benchmarking # ------------------------------------------------------------ load-test: - cargo run -p load-tester -- --network testnet --max-subscriptions 10 --step-size 1 + cargo run -p load-tester -- --network staging --ws-url "wss://stream-staging.fuel.network" --api-key "your_api_key" --max-subscriptions 10 --step-size 1 + +bench: + cargo bench -p data-parser # ------------------------------------------------------------ # Publisher Run Commands diff --git a/README.md b/README.md index 0fb10b48..e8703dfb 100644 --- a/README.md +++ b/README.md @@ -25,16 +25,14 @@ ## 📝 About The Project -> [!WARNING] -> This project is currently under development and is not yet ready for production use. - -Fuel Data Systems is a comprehensive suite of libraries and tools designed to enable real-time data streaming and processing from the Fuel Network. This repository houses the official data streaming ecosystem, offering developers a powerful and flexible API to interact with Fuel Network data in real-time. +Fuel Data Systems is a comprehensive suite of libraries and tools designed to enable real-time and historical data streaming and processing from the Fuel Network. This repository houses the official data streaming ecosystem, offering developers a powerful and flexible API to interact with Fuel Network data in real-time. With Fuel Data Systems, developers can build sophisticated applications that leverage the full potential of the Fuel Network's data, from simple block explorers to complex analytics engines and trading systems. ## 🚀 Features - Real-time streaming of Fuel blockchain data +- Historical streaming of Fuel blockchain data - Support for various Fuel-specific data types - Customizable filters for targeted data retrieval - Flexible delivery policies for historical and real-time data diff --git a/benches/data-parser/Cargo.toml b/benches/data-parser/Cargo.toml new file mode 100644 index 00000000..63923ee5 --- /dev/null +++ b/benches/data-parser/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "data-parser" +authors = { workspace = true } +keywords = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +rust-version = { workspace = true } +publish = false + +[[bench]] +name = "serialize" +harness = false # do not use the default harness test +path = "benches/serialize.rs" + +[[bench]] +name = "deserialize" +harness = false # do not use the default harness test +path = "benches/deserialize.rs" + +[[bench]] +name = "serialize_compress" +harness = false # do not use the default harness test +path = "benches/serialize_compress.rs" + +[[bench]] +name = "deserialize_decompress" +harness = false # do not use the default harness test +path = "benches/deserialize_decompress.rs" + +[dependencies] +fuel-data-parser = { workspace = true, features = ["test-helpers", "bench-helpers", "all"] } +fuel-streams-domains = { workspace = true, features = ["test-helpers"] } +rand = { workspace = true } +strum = { workspace = true } +tokio = { workspace = true } + +[dev-dependencies] +criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } diff --git a/benches/data-parser/README.md b/benches/data-parser/README.md new file mode 100644 index 00000000..c6607b41 --- /dev/null +++ b/benches/data-parser/README.md @@ -0,0 +1,7 @@ +# Running + +You can run all benchmarks with cargo: + +```sh +cargo bench +``` diff --git a/benches/data-parser/benches/deserialize.rs b/benches/data-parser/benches/deserialize.rs new file mode 100644 index 00000000..e0fd4109 --- /dev/null +++ b/benches/data-parser/benches/deserialize.rs @@ -0,0 +1,74 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use data_parser::generate_test_block; +use fuel_data_parser::{ + DataParser, + SerializationType, + DEFAULT_COMPRESSION_STRATEGY, +}; +use fuel_streams_domains::blocks::Block; +use strum::IntoEnumIterator; + +fn bench_deserialize(c: &mut Criterion) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // Benchmarks for different serialization methods + let parametric_matrix = SerializationType::iter() + .map(|serialization_type| { + (serialization_type, DEFAULT_COMPRESSION_STRATEGY.clone()) + }) + .collect::>(); + + // Pre-serialize data for each serialization type + let serialized_data: Vec<_> = parametric_matrix + .iter() + .map(|(serialization_type, compression_strategy)| { + let test_block = generate_test_block(); + let data_parser = DataParser::default() + .with_compression_strategy(compression_strategy) + .with_serialization_type(serialization_type.clone()); + + // Perform serialization asynchronously and collect the results + let serialized = runtime.block_on(async { + data_parser + .serialize(&test_block) + .await + .expect("serialization failed") + }); + + (serialization_type.clone(), compression_strategy, serialized) + }) + .collect(); + + let mut group = c.benchmark_group("deserialize"); + + // benchmark each combination + for (serialization_type, compression_strategy, serialized) in + serialized_data + { + let bench_name = format!("[{}]", serialization_type); + group.bench_function(&bench_name, |b| { + let data_parser = DataParser::default() + .with_compression_strategy(compression_strategy) + .with_serialization_type(serialization_type.clone()); + + b.iter(|| { + // Perform deserialization + let result = runtime.block_on(async { + data_parser + .deserialize::(&serialized) + .expect("deserialization failed") + }); + // Use black_box to make sure 'result' is considered used by the compiler + black_box(result); + }); + }); + } + + group.finish(); +} + +criterion_group!(benches, bench_deserialize); +criterion_main!(benches); diff --git a/benches/data-parser/benches/deserialize_decompress.rs b/benches/data-parser/benches/deserialize_decompress.rs new file mode 100644 index 00000000..45e54963 --- /dev/null +++ b/benches/data-parser/benches/deserialize_decompress.rs @@ -0,0 +1,73 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use data_parser::generate_test_block; +use fuel_data_parser::{ + DataParser, + SerializationType, + ALL_COMPRESSION_STRATEGIES, +}; +use fuel_streams_domains::blocks::Block; +use strum::IntoEnumIterator; + +fn bench_decompress_deserialize(c: &mut Criterion) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // Pre-serialize data for each combination type + let mut parametric_matrix = vec![]; + for serialization_type in SerializationType::iter() { + for compression_strategy in ALL_COMPRESSION_STRATEGIES.iter() { + let data_parser = DataParser::default() + .with_serialization_type(serialization_type.clone()) + .with_compression_strategy(compression_strategy); + + let serialized_and_compressed = runtime.block_on(async { + data_parser + .encode(&generate_test_block()) + .await + .expect("serialization failed") + }); + + parametric_matrix.push(( + serialization_type.clone(), + compression_strategy, + serialized_and_compressed, + )); + } + } + + let mut group = c.benchmark_group("decompress_deserialize"); + + // benchmark each combination + for (serialization_type, compression_strategy, serialized_and_compressed) in + parametric_matrix.iter() + { + let bench_name = format!( + "[{:?}][{:?}]", + serialization_type, + compression_strategy.name(), + ); + + group.bench_function(&bench_name, |b| { + let data_parser = DataParser::default() + .with_compression_strategy(compression_strategy) + .with_serialization_type(serialization_type.clone()); + + b.to_async(&runtime).iter(|| async { + let deserialized_and_decompressed = data_parser + .decode::(serialized_and_compressed) + .await + .expect("decompresison and deserialization"); + + // Use black_box to make sure 'result' is considered used by the compiler + black_box(deserialized_and_decompressed); + }); + }); + } + + group.finish(); +} + +criterion_group!(benches, bench_decompress_deserialize); +criterion_main!(benches); diff --git a/benches/data-parser/benches/serialize.rs b/benches/data-parser/benches/serialize.rs new file mode 100644 index 00000000..82778b16 --- /dev/null +++ b/benches/data-parser/benches/serialize.rs @@ -0,0 +1,50 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use data_parser::generate_test_block; +use fuel_data_parser::{ + DataParser, + SerializationType, + DEFAULT_COMPRESSION_STRATEGY, +}; +use strum::IntoEnumIterator; + +fn bench_serialize(c: &mut Criterion) { + let mut group = c.benchmark_group("serialize"); + + let test_block = generate_test_block(); + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // Benchmarks for different serialization methods + let parametric_matrix = SerializationType::iter() + .map(|serialization_type| { + (serialization_type, DEFAULT_COMPRESSION_STRATEGY.clone()) + }) + .collect::>(); + + for (serialization_type, compression_strategy) in parametric_matrix { + let bench_name = format!("[{}]", serialization_type); + + group.bench_function(bench_name, |b| { + let data_parser = DataParser::default() + .with_compression_strategy(&compression_strategy) + .with_serialization_type(serialization_type.clone()); + + b.to_async(&runtime).iter(|| async { + let result = data_parser + .serialize(&test_block) + .await + .expect("serialization"); + // Use black_box to make sure 'result' is considered used by the compiler + black_box(result.len()); // record size of the data + }); + }); + } + + group.finish(); +} + +criterion_group!(benches, bench_serialize); +criterion_main!(benches); diff --git a/benches/data-parser/benches/serialize_compress.rs b/benches/data-parser/benches/serialize_compress.rs new file mode 100644 index 00000000..9194bdce --- /dev/null +++ b/benches/data-parser/benches/serialize_compress.rs @@ -0,0 +1,50 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use data_parser::generate_test_block; +use fuel_data_parser::{ + DataParser, + SerializationType, + ALL_COMPRESSION_STRATEGIES, +}; +use strum::IntoEnumIterator; + +fn bench_serialize_compress(c: &mut Criterion) { + let mut group = c.benchmark_group("serialize_compress"); + + let test_block = generate_test_block(); + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // build test matrix + for serialization_type in SerializationType::iter() { + for compression_strategy in ALL_COMPRESSION_STRATEGIES.iter() { + let bench_name = format!( + "[{:?}][{:?}]", + serialization_type.to_string(), + compression_strategy.name(), + ); + + group.bench_function(&bench_name, |b| { + let data_parser = DataParser::default() + .with_serialization_type(serialization_type.clone()) + .with_compression_strategy(compression_strategy); + + b.to_async(&runtime).iter(|| async { + let result = data_parser + .encode(&test_block) + .await + .expect("serialization and compression error"); + // Use black_box to make sure 'result' is considered used by the compiler + black_box(result.len()); // record size of the data + }); + }); + } + } + + group.finish(); +} + +criterion_group!(benches, bench_serialize_compress); +criterion_main!(benches); diff --git a/benches/data-parser/src/lib.rs b/benches/data-parser/src/lib.rs new file mode 100644 index 00000000..8dcb0efd --- /dev/null +++ b/benches/data-parser/src/lib.rs @@ -0,0 +1,8 @@ +use fuel_streams_domains::blocks::{Block, MockBlock}; +use rand::Rng; + +pub fn generate_test_block() -> Block { + let mut rng = rand::thread_rng(); + let block_height: u32 = rng.gen_range(1..100000); + MockBlock::build(block_height) +} diff --git a/benches/load-tester/Cargo.toml b/benches/load-tester/Cargo.toml new file mode 100644 index 00000000..972dd7b6 --- /dev/null +++ b/benches/load-tester/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "load-tester" +authors = { workspace = true } +keywords = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +rust-version = { workspace = true } +publish = false + +[dependencies] +anyhow = { workspace = true } +chrono = { workspace = true } +clap = { workspace = true } +fuel-streams = { workspace = true } +fuel-streams-core = { workspace = true, features = ["test-helpers"] } +fuel-streams-store = { workspace = true } +futures = { workspace = true } +statrs = "0.18.0" +tokio = { workspace = true } diff --git a/benches/load-tester/README.md b/benches/load-tester/README.md new file mode 100644 index 00000000..38dae63a --- /dev/null +++ b/benches/load-tester/README.md @@ -0,0 +1,9 @@ +# Running + +To run the load-test suite: + + ```sh + cargo run -- --network staging --ws-url "wss://stream-staging.fuel.network" --api-key "your_api_key" --max-subscriptions 10 --step-size 1 + ``` + +Adjustments are to be applied based on the max-subscriptions and step-size. diff --git a/benches/load-tester/src/lib.rs b/benches/load-tester/src/lib.rs new file mode 100644 index 00000000..0edfafa9 --- /dev/null +++ b/benches/load-tester/src/lib.rs @@ -0,0 +1 @@ +pub mod runners; diff --git a/benches/load-tester/src/main.rs b/benches/load-tester/src/main.rs new file mode 100644 index 00000000..48f2eae2 --- /dev/null +++ b/benches/load-tester/src/main.rs @@ -0,0 +1,17 @@ +use clap::Parser; +use load_tester::runners::{cli::Cli, runner_all::LoadTesterEngine}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + println!("Running load test ..."); + let load_tester = LoadTesterEngine::new( + cli.network, + cli.api_key, + cli.max_subscriptions, + cli.step_size, + ); + load_tester.run().await?; + println!("Finished load testing!"); + Ok(()) +} diff --git a/benches/load-tester/src/runners/cli.rs b/benches/load-tester/src/runners/cli.rs new file mode 100644 index 00000000..7b9ab20a --- /dev/null +++ b/benches/load-tester/src/runners/cli.rs @@ -0,0 +1,42 @@ +use clap::Parser; +use fuel_streams::FuelNetwork; + +#[derive(Clone, Parser)] +pub struct Cli { + /// Fuel Network to connect to. + #[arg( + long, + value_name = "NETWORK", + env = "NETWORK", + default_value = "Local", + value_parser = clap::value_parser!(FuelNetwork) + )] + pub network: FuelNetwork, + /// Api Key for the websocket server. + #[arg( + long, + value_name = "API_KEY", + env = "API_KEY", + default_value = "", + help = "Api Key for the ws server." + )] + pub api_key: String, + /// Maximum subscriptions for load testing + #[arg( + long, + value_name = "MAXS", + env = "MAX_SUBS", + default_value = "10", + help = "Maximum subscriptions for load testing." + )] + pub max_subscriptions: u16, + /// Maximum step size for load testing + #[arg( + long, + value_name = "SSIZE", + env = "STEP_SIZE", + default_value = "1", + help = "Maximum step size for load testing." + )] + pub step_size: u16, +} diff --git a/benches/load-tester/src/runners/mod.rs b/benches/load-tester/src/runners/mod.rs new file mode 100644 index 00000000..cc84e6f4 --- /dev/null +++ b/benches/load-tester/src/runners/mod.rs @@ -0,0 +1,4 @@ +pub mod cli; +pub mod results; +pub mod runner_all; +pub mod runner_streamable; diff --git a/benches/load-tester/src/runners/results.rs b/benches/load-tester/src/runners/results.rs new file mode 100644 index 00000000..fe1b56b8 --- /dev/null +++ b/benches/load-tester/src/runners/results.rs @@ -0,0 +1,118 @@ +use core::fmt; +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + RwLock, + }, + time::{Duration, Instant}, +}; + +use chrono::{DateTime, Utc}; +use statrs::statistics::{Data, Distribution}; + +#[derive(Debug)] +pub struct LoadTestTracker { + pub name: String, + pub message_count: AtomicUsize, + pub error_count: AtomicUsize, + start_time: Instant, + pub elapsed_time: RwLock>, + pub messages_per_second: RwLock>, + pub publish_times: RwLock>, + pub mean_publish_time: RwLock>, +} + +impl fmt::Display for LoadTestTracker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "\n{}\nLoadTest Results: {}\n{}\nTotal Messages: {}\nTotal Errors: {}\nElapsed Time: {:?}\nMessages per Second: {:.2}\nMean Publish Time: {:?}\n{}", + "=".repeat(50), + self.name, + "=".repeat(50), + self.message_count.load(Ordering::Relaxed), + self.error_count.load(Ordering::Relaxed), + self.elapsed_time.read().unwrap().unwrap_or_default(), + self.messages_per_second.read().unwrap().unwrap_or_default(), + self.mean_publish_time.read().unwrap().unwrap_or_default(), + "=".repeat(50) + ) + } +} + +impl LoadTestTracker { + pub fn new(name: String) -> Self { + Self { + name, + message_count: AtomicUsize::new(0), + error_count: AtomicUsize::new(0), + start_time: Instant::now(), + elapsed_time: RwLock::new(None), + messages_per_second: RwLock::new(None), + publish_times: RwLock::new(vec![]), + mean_publish_time: RwLock::new(None), + } + } + + pub fn increment_message_count(&self) { + self.message_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn increment_error_count(&self) { + self.error_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn refresh(&self) -> &Self { + self.calculate_mean_publish_time(); + + let elapsed = self.start_time.elapsed(); + let message_count = self.message_count.load(Ordering::Relaxed); + + if let Ok(mut elapsed_time) = self.elapsed_time.write() { + *elapsed_time = Some(elapsed); + } + + if let Ok(mut messages_per_second) = self.messages_per_second.write() { + *messages_per_second = + Some(message_count as f64 / elapsed.as_secs_f64()); + } + + self + } + + pub fn add_publish_time(&self, timestamp: u128) -> &Self { + let current_time = Utc::now(); + let publish_time = + DateTime::::from_timestamp_millis(timestamp as i64) + .expect("Invalid timestamp"); + let duration = current_time + .signed_duration_since(publish_time) + .to_std() + .expect("Duration calculation failed"); + + if let Ok(mut times) = self.publish_times.write() { + times.push(duration); + } + self + } + + pub fn calculate_mean_publish_time(&self) { + // Lock the mutex to access publish_times + let times = self.publish_times.read().unwrap(); + + if times.is_empty() { + return; + } + + let times_ns: Vec = + times.iter().map(|d| d.as_nanos() as f64).collect(); + drop(times); + + let data = Data::new(times_ns); + let mean_ns = data.mean().unwrap(); + + if let Ok(mut mean_publish_time) = self.mean_publish_time.write() { + *mean_publish_time = Some(Duration::from_nanos(mean_ns as u64)); + } + } +} diff --git a/benches/load-tester/src/runners/runner_all.rs b/benches/load-tester/src/runners/runner_all.rs new file mode 100644 index 00000000..81908d24 --- /dev/null +++ b/benches/load-tester/src/runners/runner_all.rs @@ -0,0 +1,184 @@ +use std::{sync::Arc, time::Duration}; + +use anyhow::Result; +use fuel_streams::prelude::*; +use fuel_streams_core::{ + blocks::BlocksSubject, + inputs::InputsCoinSubject, + outputs::OutputsCoinSubject, + subjects::{ReceiptsLogSubject, SubjectBuildable, TransactionsSubject}, + types::{Block, Input, Output, Receipt, Transaction, Utxo}, + utxos::UtxosSubject, +}; +use tokio::task::JoinHandle; + +use super::{ + results::LoadTestTracker, + runner_streamable::spawn_streamable_consumer, +}; + +pub struct LoadTesterEngine { + max_subscriptions: u16, + step_size: u16, + api_key: String, + network: FuelNetwork, +} + +impl LoadTesterEngine { + pub fn new( + network: FuelNetwork, + api_key: String, + max_subscriptions: u16, + step_size: u16, + ) -> Self { + Self { + network, + api_key, + max_subscriptions, + step_size, + } + } +} + +impl LoadTesterEngine { + pub async fn run(&self) -> Result<(), anyhow::Error> { + let mut handles: Vec> = vec![]; + // blocks + let blocks_test_tracker = + Arc::new(LoadTestTracker::new("Blocks Consumer".into())); + let blocks_test_tracker_printer = Arc::clone(&blocks_test_tracker); + + // inputs + let inputs_test_tracker = + Arc::new(LoadTestTracker::new("Inputs Consumer".into())); + let inputs_test_tracker_printer = Arc::clone(&inputs_test_tracker); + + // txs + let txs_test_tracker = + Arc::new(LoadTestTracker::new("Txs Consumer".into())); + let txs_test_tracker_printer = Arc::clone(&txs_test_tracker); + + // receipts + let receipts_test_tracker = + Arc::new(LoadTestTracker::new("Receipts Consumer".into())); + let receipts_test_tracker_printer = Arc::clone(&receipts_test_tracker); + + // utxos + let utxos_test_tracker = + Arc::new(LoadTestTracker::new("Utxos Consumer".into())); + let utxos_test_tracker_printer = Arc::clone(&utxos_test_tracker); + + // outputs + let outputs_test_tracker = + Arc::new(LoadTestTracker::new("Outputs Consumer".into())); + let outputs_test_tracker_printer = Arc::clone(&outputs_test_tracker); + + // print regularly the tracked metrics + handles.push(tokio::spawn(async move { + loop { + // blocks + blocks_test_tracker_printer.refresh(); + println!("{}", blocks_test_tracker_printer); + + // inputs + inputs_test_tracker_printer.refresh(); + println!("{}", inputs_test_tracker_printer); + + // txs + txs_test_tracker_printer.refresh(); + println!("{}", txs_test_tracker_printer); + + // utxos + utxos_test_tracker_printer.refresh(); + println!("{}", utxos_test_tracker_printer); + + // receipts + receipts_test_tracker_printer.refresh(); + println!("{}", receipts_test_tracker_printer); + + // outputs + outputs_test_tracker_printer.refresh(); + println!("{}", outputs_test_tracker_printer); + + // do a short pause + tokio::time::sleep(Duration::from_secs(5)).await; + } + })); + + // Incrementally increase subscriptions + for current_subs in + (1..=self.max_subscriptions).step_by(self.step_size as usize) + { + let blocks_test_tracker = Arc::clone(&blocks_test_tracker); + for _ in 0..current_subs { + // blocks + handles.push( + spawn_streamable_consumer::( + self.network, + self.api_key.clone(), + BlocksSubject::new().with_height(None), + Arc::clone(&blocks_test_tracker), + ) + .await?, + ); + + // inputs + handles.push( + spawn_streamable_consumer::( + self.network, + self.api_key.clone(), + InputsCoinSubject::new(), + Arc::clone(&inputs_test_tracker), + ) + .await?, + ); + + // txs + handles.push(spawn_streamable_consumer::(self.network, self.api_key.clone(), TransactionsSubject::new(), Arc::clone(&txs_test_tracker)).await?); + + // outputs + handles.push( + spawn_streamable_consumer::( + self.network, + self.api_key.clone(), + OutputsCoinSubject::new(), + Arc::clone(&outputs_test_tracker), + ) + .await?, + ); + + // utxos + handles.push( + spawn_streamable_consumer::( + self.network, + self.api_key.clone(), + UtxosSubject::new(), + Arc::clone(&utxos_test_tracker), + ) + .await?, + ); + + // receipts + handles.push( + spawn_streamable_consumer::( + self.network, + self.api_key.clone(), + ReceiptsLogSubject::new(), + Arc::clone(&receipts_test_tracker), + ) + .await?, + ); + } + + // Small pause between test iterations + tokio::time::sleep(Duration::from_secs(5)).await; + } + + // cleanup + for handle in handles.iter() { + handle.abort(); + } + + Ok(()) + } +} diff --git a/benches/load-tester/src/runners/runner_streamable.rs b/benches/load-tester/src/runners/runner_streamable.rs new file mode 100644 index 00000000..063771d1 --- /dev/null +++ b/benches/load-tester/src/runners/runner_streamable.rs @@ -0,0 +1,63 @@ +use std::sync::Arc; + +use anyhow::Result; +use chrono::Utc; +use fuel_streams::{subjects::FromJsonString, Client, FuelNetwork}; +use fuel_streams_core::{server::DeliverPolicy, subjects::IntoSubject}; +use fuel_streams_store::record::Record; +use futures::StreamExt; +use tokio::task::JoinHandle; + +use super::results::LoadTestTracker; + +pub async fn run_streamable_consumer< + S: IntoSubject + FromJsonString, + T: Record, +>( + network: FuelNetwork, + api_key: String, + subject: S, + load_test_tracker: Arc, +) -> Result<()> { + let mut client = Client::new(network).with_api_key(api_key); + let mut connection = client.connect().await?; + let mut stream = connection + .subscribe::(subject, DeliverPolicy::New) + .await?; + + while let Some(msg) = stream.next().await { + println!("Received entity: {:?}", msg.data); + load_test_tracker.increment_message_count(); + load_test_tracker + .add_publish_time(Utc::now().timestamp_millis() as u128); + } + + Ok(()) +} + +pub async fn spawn_streamable_consumer< + S: IntoSubject + FromJsonString, + T: Record, +>( + network: FuelNetwork, + api_key: String, + subject: S, + load_test_tracker: Arc, +) -> Result> { + Ok(tokio::spawn(async move { + if let Err(e) = run_streamable_consumer::( + network, + api_key, + subject.clone(), + load_test_tracker, + ) + .await + { + eprintln!( + "Error in {:?} subscriptions - {:?}", + subject.wildcard(), + e + ); + } + })) +} diff --git a/crates/fuel-streams-core/src/stream/stream_impl.rs b/crates/fuel-streams-core/src/stream/stream_impl.rs index fe2aaf40..b31ac308 100644 --- a/crates/fuel-streams-core/src/stream/stream_impl.rs +++ b/crates/fuel-streams-core/src/stream/stream_impl.rs @@ -93,10 +93,7 @@ impl Stream { } let mut live = broker.subscribe_to_events(&subject.parse()).await?; while let Some(msg) = live.next().await { - let msg = msg?; - let subject = msg.0; - let value = msg.1; - yield (subject, value); + yield msg?; let throttle_time = *config::STREAM_THROTTLE_LIVE; sleep(Duration::from_millis(throttle_time as u64)).await; } diff --git a/crates/fuel-streams/README.md b/crates/fuel-streams/README.md index 854a4bb5..da3b5e62 100644 --- a/crates/fuel-streams/README.md +++ b/crates/fuel-streams/README.md @@ -34,14 +34,12 @@ ## 📝 About The Project -> [!WARNING] -> This project is currently under development and is not yet ready for production use. - -Fuel Streams is a Rust library designed for working with streams of Fuel blockchain data. It provides an efficient and user-friendly interface for developers to interact with real-time blockchain data, offering support for Fuel-specific data types and leveraging NATS for scalable streaming. +Fuel Streams is a Rust library designed for working with streams of Fuel blockchain data. It provides an efficient and user-friendly interface for developers to interact with real-time and historical blockchain data, offering support for Fuel-specific data types and leveraging NATS for scalable streaming. ## 🚀 Features - Real-time streaming of Fuel blockchain data +- Historical streaming of Fuel blockchain data - Support for Fuel-specific data types - Efficient data handling using NATS - Easy-to-use API for subscribing to and processing blockchain events