diff --git a/Cargo.lock b/Cargo.lock index a168e31ee2ee..41c5b75b94a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -865,9 +865,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.0.1" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", @@ -1114,9 +1114,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1124,9 +1124,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" @@ -1141,9 +1141,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -1160,9 +1160,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1171,21 +1171,27 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -1249,6 +1255,7 @@ version = "0.1.0" dependencies = [ "clap", "config", + "governor", "hostname", "hydroflow", "lattices", @@ -1266,6 +1273,27 @@ dependencies = [ "warp", ] +[[package]] +name = "governor" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0746aa765db78b521451ef74221663b57ba595bf83f75d0ce23cc09447c8139f" +dependencies = [ + "cfg-if", + "dashmap", + "futures-sink", + "futures-timer", + "futures-util", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.3", + "portable-atomic", + "quanta", + "rand", + "smallvec", + "spinning_top", +] + [[package]] name = "h2" version = "0.3.26" @@ -2213,6 +2241,12 @@ dependencies = [ "libc", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "2.2.1" @@ -2229,6 +2263,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "notify" version = "6.1.1" @@ -2785,6 +2825,21 @@ dependencies = [ "serde", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.26.0" @@ -2843,6 +2898,15 @@ dependencies = [ "rand", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -3335,6 +3399,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "ssh2" version = "0.9.4" diff --git a/datastores/gossip_kv/Cargo.toml b/datastores/gossip_kv/Cargo.toml index 54fc239ad67a..b59765578698 100644 --- a/datastores/gossip_kv/Cargo.toml +++ b/datastores/gossip_kv/Cargo.toml @@ -9,6 +9,7 @@ publish = false [dependencies] clap = { version = "4.5.4", features = ["derive"] } config = "0.14.0" +governor = "0.7.0" hostname = "0.4.0" hydroflow = { path="../../hydroflow" } lattices = { path = '../../lattices'} @@ -31,10 +32,14 @@ warp = "0.3.7" name = "gossip_server" path = "server/main.rs" +[[bin]] +name = "load_test_server" +path = "load_test_server/server.rs" + [[bin]] name = "gossip_cli" path = "cli/main.rs" [lib] -name = "gossip_protocol" -path = "protocol/lib.rs" \ No newline at end of file +name = "gossip_kv" +path = "kv/lib.rs" \ No newline at end of file diff --git a/datastores/gossip_kv/Makefile b/datastores/gossip_kv/Makefile index 03690c1d3cf5..dbfe144f0d78 100644 --- a/datastores/gossip_kv/Makefile +++ b/datastores/gossip_kv/Makefile @@ -8,26 +8,15 @@ MINIKUBE_MEMORY:=32768 BASE_IMAGE_VERSION:=latest SERVER_IMAGE_VERSION:=latest CLI_IMAGE_VERSION:=latest +LOAD_TEST_IMAGE_VERSION:=latest # Docker Image Tags BASE_IMAGE_TAG:=hydroflow-gossip-kv-base-image:$(BASE_IMAGE_VERSION) SERVER_IMAGE_TAG:=hydroflow-gossip-kv-server:$(SERVER_IMAGE_VERSION) CLI_IMAGE_TAG:=hydroflow-gossip-kv-cli:$(CLI_IMAGE_VERSION) +LOAD_TEST_IMAGE_TAG:=hydroflow-gossip-kv-load-test:$(LOAD_TEST_IMAGE_VERSION) -AWS_TERRAFORM_PATH=../../datastores/gossip_kv/server/deployment/aws/terraform - -# Define color variables -COLOR_RESET = \033[0m -COLOR_GREEN = \033[1;32m -COLOR_BLUE = \033[1;34m -COLOR_RED = \033[1;31m - -# Define custom echo functions for different colors -greentext = @echo "$(COLOR_GREEN)$(1)$(COLOR_RESET)" -bluetext = @echo "$(COLOR_BLUE)$(1)$(COLOR_RESET)" -redtext = @echo "$(COLOR_RED)$(1)$(COLOR_RESET)" - -# Default target when you run 'make' +AWS_TERRAFORM_PATH=../../datastores/gossip_kv/deployment/aws/terraform # Target to start Minikube with specific options start_minikube: @@ -35,7 +24,7 @@ start_minikube: @echo "Please run 'eval \$$(minikube docker-env)' to use the Minikube Docker daemon" # Target to build the Docker images -build_docker_images: build_base_image build_server_image build_cli_image +build_docker_images: build_base_image build_server_image build_cli_image build_load_test_image build_base_image: docker build -t "$(BASE_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/baseimage.Dockerfile ../.. @@ -46,6 +35,9 @@ build_server_image: build_cli_image: docker build -t "$(CLI_IMAGE_TAG)" -f ../../datastores/gossip_kv/cli/Dockerfile ../.. +build_load_test_image: + docker build -t "$(LOAD_TEST_IMAGE_TAG)" -f ../../datastores/gossip_kv/load_test_server/Dockerfile ../.. + # Target to clean up the Minikube cluster clean_local: minikube delete @@ -70,13 +62,17 @@ aws_setup_kubectl: aws_upload_docker_images: build_docker_images $(eval SERVER_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_server"]')) $(eval CLI_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_cli"]')) + $(eval LOAD_TEST_REPO_URL := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -json repository_urls | jq -r '.["gossip_kv_load_test"]')) $(eval REGION := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw region)) docker tag $(SERVER_IMAGE_TAG) $(SERVER_REPO_URL):$(SERVER_IMAGE_VERSION) docker tag $(CLI_IMAGE_TAG) $(CLI_REPO_URL):$(CLI_IMAGE_VERSION) + docker tag $(LOAD_TEST_IMAGE_TAG) $(LOAD_TEST_REPO_URL):$(LOAD_TEST_IMAGE_VERSION) aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(SERVER_REPO_URL) docker push $(SERVER_REPO_URL):$(SERVER_IMAGE_VERSION) - aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(SERVER_REPO_URL) + aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(CLI_REPO_URL) docker push $(CLI_REPO_URL):$(CLI_IMAGE_VERSION) + aws ecr get-login-password --region $(REGION) | docker login --username AWS --password-stdin $(LOAD_TEST_REPO_URL) + docker push $(LOAD_TEST_REPO_URL):$(LOAD_TEST_IMAGE_VERSION) aws_tunnel_grafana: $(eval GRAFANA_PORT := $(shell terraform -chdir=$(AWS_TERRAFORM_PATH) output -raw grafana_port)) diff --git a/datastores/gossip_kv/cli/main.rs b/datastores/gossip_kv/cli/main.rs index 93264bc72e64..8834da9ce459 100644 --- a/datastores/gossip_kv/cli/main.rs +++ b/datastores/gossip_kv/cli/main.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use clap::{CommandFactory, Parser, Subcommand}; -use gossip_protocol::{ClientRequest, ClientResponse, Key}; +use gossip_kv::{ClientRequest, ClientResponse, Key}; use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; use hydroflow::{hydroflow_syntax, tokio, DemuxEnum}; use tracing::error; diff --git a/datastores/gossip_kv/server/deployment/aws/terraform/.gitignore b/datastores/gossip_kv/deployment/aws/terraform/.gitignore similarity index 100% rename from datastores/gossip_kv/server/deployment/aws/terraform/.gitignore rename to datastores/gossip_kv/deployment/aws/terraform/.gitignore diff --git a/datastores/gossip_kv/server/deployment/aws/terraform/main.tf b/datastores/gossip_kv/deployment/aws/terraform/main.tf similarity index 98% rename from datastores/gossip_kv/server/deployment/aws/terraform/main.tf rename to datastores/gossip_kv/deployment/aws/terraform/main.tf index ac65aa6ba571..47799f259c45 100644 --- a/datastores/gossip_kv/server/deployment/aws/terraform/main.tf +++ b/datastores/gossip_kv/deployment/aws/terraform/main.tf @@ -97,7 +97,7 @@ module "irsa-ebs-csi" { variable "ecr_repositories" { description = "List of ECR repository names" type = list(string) - default = ["gossip_kv_server", "gossip_kv_cli"] + default = ["gossip_kv_server", "gossip_kv_cli", "gossip_kv_load_test"] } module "ecr" { @@ -158,7 +158,7 @@ resource "kubernetes_stateful_set" "gossip_kv_seed_nodes" { spec { service_name = "gossip-kv-seed-nodes" - replicas = 3 + replicas = 1 selector { match_labels = { @@ -182,7 +182,7 @@ resource "kubernetes_stateful_set" "gossip_kv_seed_nodes" { container { name = "gossip-kv-server" - image = "${module.ecr.gossip_kv_server.repository_url}:latest" + image = "${module.ecr.gossip_kv_load_test.repository_url}:latest" image_pull_policy = "Always" env { diff --git a/datastores/gossip_kv/server/deployment/aws/terraform/outputs.tf b/datastores/gossip_kv/deployment/aws/terraform/outputs.tf similarity index 100% rename from datastores/gossip_kv/server/deployment/aws/terraform/outputs.tf rename to datastores/gossip_kv/deployment/aws/terraform/outputs.tf diff --git a/datastores/gossip_kv/server/deployment/aws/terraform/terraform.tf b/datastores/gossip_kv/deployment/aws/terraform/terraform.tf similarity index 100% rename from datastores/gossip_kv/server/deployment/aws/terraform/terraform.tf rename to datastores/gossip_kv/deployment/aws/terraform/terraform.tf diff --git a/datastores/gossip_kv/server/deployment/aws/terraform/variables.tf b/datastores/gossip_kv/deployment/aws/terraform/variables.tf similarity index 100% rename from datastores/gossip_kv/server/deployment/aws/terraform/variables.tf rename to datastores/gossip_kv/deployment/aws/terraform/variables.tf diff --git a/datastores/gossip_kv/server/deployment/local/objects.yaml b/datastores/gossip_kv/deployment/local/objects.yaml similarity index 100% rename from datastores/gossip_kv/server/deployment/local/objects.yaml rename to datastores/gossip_kv/deployment/local/objects.yaml diff --git a/datastores/gossip_kv/server/deployment/local/updated_seed_node_config.yaml b/datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml similarity index 100% rename from datastores/gossip_kv/server/deployment/local/updated_seed_node_config.yaml rename to datastores/gossip_kv/deployment/local/updated_seed_node_config.yaml diff --git a/datastores/gossip_kv/server/lattices/mod.rs b/datastores/gossip_kv/kv/lattices/mod.rs similarity index 100% rename from datastores/gossip_kv/server/lattices/mod.rs rename to datastores/gossip_kv/kv/lattices/mod.rs diff --git a/datastores/gossip_kv/protocol/lib.rs b/datastores/gossip_kv/kv/lib.rs similarity index 99% rename from datastores/gossip_kv/protocol/lib.rs rename to datastores/gossip_kv/kv/lib.rs index be725878a24d..15badc703be3 100644 --- a/datastores/gossip_kv/protocol/lib.rs +++ b/datastores/gossip_kv/kv/lib.rs @@ -1,6 +1,12 @@ pub mod membership; pub mod model; +pub mod server; + +pub mod lattices; + +pub mod util; + use std::collections::HashSet; use std::fmt::Display; use std::str::FromStr; diff --git a/datastores/gossip_kv/protocol/membership.rs b/datastores/gossip_kv/kv/membership.rs similarity index 100% rename from datastores/gossip_kv/protocol/membership.rs rename to datastores/gossip_kv/kv/membership.rs diff --git a/datastores/gossip_kv/protocol/model.rs b/datastores/gossip_kv/kv/model.rs similarity index 100% rename from datastores/gossip_kv/protocol/model.rs rename to datastores/gossip_kv/kv/model.rs diff --git a/datastores/gossip_kv/server/server.rs b/datastores/gossip_kv/kv/server.rs similarity index 96% rename from datastores/gossip_kv/server/server.rs rename to datastores/gossip_kv/kv/server.rs index 176f1c387fd4..a36d89013c40 100644 --- a/datastores/gossip_kv/server/server.rs +++ b/datastores/gossip_kv/kv/server.rs @@ -1,14 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::hash::Hash; -use std::time::Duration; -use gossip_protocol::membership::{MemberData, MemberId}; -use gossip_protocol::model::{ - delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName, -}; -use gossip_protocol::GossipMessage::{Ack, Nack}; -use gossip_protocol::{ClientRequest, ClientResponse, GossipMessage, Key, Namespace}; use hydroflow::futures::{Sink, Stream}; use hydroflow::hydroflow_syntax; use hydroflow::itertools::Itertools; @@ -27,7 +20,13 @@ use serde::{Deserialize, Serialize}; use tracing::{info, trace}; use crate::lattices::BoundedSetLattice; +use crate::membership::{MemberData, MemberId}; +use crate::model::{ + delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName, +}; use crate::util::{ClientRequestWithAddress, GossipRequestWithAddress}; +use crate::GossipMessage::{Ack, Nack}; +use crate::{ClientRequest, ClientResponse, GossipMessage, Key, Namespace}; /// A trait that represents an abstract network address. In production, this will typically be /// SocketAddr. @@ -52,8 +51,8 @@ pub struct InfectingWrite { pub type MessageId = String; lazy_static! { - pub static ref EXAMPLE_COUNTER: IntCounter = - register_int_counter!("example_counter", "Counts the number of examples.").unwrap(); + pub static ref SETS_COUNTER: IntCounter = + register_int_counter!("sets", "Counts the number of SET requests processed.").unwrap(); } /// Creates a L0 key-value store server using Hydroflow. @@ -67,12 +66,13 @@ lazy_static! { pub fn server< ClientInput, ClientOutput, + ClientOutputError, GossipInput, GossipOutput, + GossipOutputError, GossipTrigger, SeedNodeStream, Addr, - E, >( client_inputs: ClientInput, client_outputs: ClientOutput, @@ -85,13 +85,14 @@ pub fn server< ) -> Hydroflow<'static> where ClientInput: Stream + Unpin + 'static, - ClientOutput: Sink<(ClientResponse, Addr), Error = E> + Unpin + 'static, + ClientOutput: Sink<(ClientResponse, Addr), Error = ClientOutputError> + Unpin + 'static, GossipInput: Stream + Unpin + 'static, - GossipOutput: Sink<(GossipMessage, Addr), Error = E> + Unpin + 'static, + GossipOutput: Sink<(GossipMessage, Addr), Error = GossipOutputError> + Unpin + 'static, GossipTrigger: Stream + Unpin + 'static, SeedNodeStream: Stream>> + Unpin + 'static, Addr: Address + DeserializeOwned + 'static, - E: Debug + 'static, + ClientOutputError: Debug + 'static, + GossipOutputError: Debug + 'static, { let my_member_id = member_info.id.clone(); // TODO: This is ugly, but the only way this works at the moment. @@ -99,13 +100,12 @@ where let member_id_3 = my_member_id.clone(); let member_id_4 = my_member_id.clone(); let member_id_5 = my_member_id.clone(); + let member_id_6 = my_member_id.clone(); hydroflow_syntax! { on_start = initialize() -> tee(); - on_start -> for_each(|_| info!("{:?}: Transducer started.", context.current_tick())); - - source_interval(Duration::from_secs(10)) -> for_each(|_| EXAMPLE_COUNTER.inc()); + on_start -> for_each(|_| info!("{:?}: Transducer {} started.", context.current_tick(), member_id_6)); seed_nodes = source_stream(seed_node_stream) -> fold::<'static>(|| Box::new(seed_nodes), |last_seed_nodes, new_seed_nodes: Vec>| { @@ -142,6 +142,9 @@ where client_in[Set] -> inspect(|request| trace!("{:?}: Received Set request: {:?}.", context.current_tick(), request)) -> map(|(key, value, _addr) : (Key, String, Addr)| upsert_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key, value)) + -> inspect(|_| { + SETS_COUNTER.inc(); // Bump SET metrics + }) -> writes; client_in[Delete] @@ -286,7 +289,9 @@ where gossip_messages = gossip_trigger -> flat_map( |_| { - #infecting_writes.as_reveal_ref().clone() + let infecting_writes = #infecting_writes.as_reveal_ref().clone(); + trace!("{:?}: Currently gossipping {} infecting writes.", context.current_tick(), infecting_writes.iter().filter(|(_, write)| !write.members.is_top()).count()); + infecting_writes } ) -> filter(|(_id, infecting_write)| !infecting_write.members.is_top()) @@ -346,11 +351,11 @@ where mod tests { use std::collections::HashSet; - use gossip_protocol::membership::{MemberDataBuilder, Protocol}; use hydroflow::tokio_stream::empty; use hydroflow::util::simulation::{Address, Fleet, Hostname}; use super::*; + use crate::membership::{MemberDataBuilder, Protocol}; #[hydroflow::test] async fn test_member_init() { diff --git a/datastores/gossip_kv/server/util.rs b/datastores/gossip_kv/kv/util.rs similarity index 96% rename from datastores/gossip_kv/server/util.rs rename to datastores/gossip_kv/kv/util.rs index 593970272547..4cc391da6e7a 100644 --- a/datastores/gossip_kv/server/util.rs +++ b/datastores/gossip_kv/kv/util.rs @@ -1,7 +1,8 @@ -use gossip_protocol::model::{Clock, Namespaces}; -use gossip_protocol::{ClientRequest, GossipMessage, Key}; use hydroflow::DemuxEnum; +use crate::model::{Clock, Namespaces}; +use crate::{ClientRequest, GossipMessage, Key}; + /// Convenience enum to represent a client request with the address of the client. Makes it /// possible to use `demux_enum` in the surface syntax. #[derive(Debug, DemuxEnum)] diff --git a/datastores/gossip_kv/load_test_server/Dockerfile b/datastores/gossip_kv/load_test_server/Dockerfile new file mode 100644 index 000000000000..91f27fdd22d5 --- /dev/null +++ b/datastores/gossip_kv/load_test_server/Dockerfile @@ -0,0 +1,11 @@ +FROM "hydroflow-gossip-kv-base-image:latest" AS builder +WORKDIR /usr/src/gossip-kv-server +COPY . . +RUN find . +RUN cargo build --release --workspace -p gossip_kv + +FROM rustlang/rust:nightly-slim +COPY --from=builder /usr/src/gossip-kv-server/target/release/load_test_server /usr/local/bin/load_test_server + +# Don't skip the trailing slash in the destination directory +CMD ["load_test_server"] diff --git a/datastores/gossip_kv/load_test_server/server.rs b/datastores/gossip_kv/load_test_server/server.rs new file mode 100644 index 000000000000..0690de2c978c --- /dev/null +++ b/datastores/gossip_kv/load_test_server/server.rs @@ -0,0 +1,228 @@ +use std::convert::Infallible; +use std::num::{NonZeroU32, ParseFloatError}; +use std::thread::sleep; +use std::time::Duration; + +use clap::Parser; +use gossip_kv::membership::{MemberDataBuilder, Protocol}; +use gossip_kv::{ClientRequest, GossipMessage}; +use governor::{Quota, RateLimiter}; +use prometheus::{gather, Encoder, TextEncoder}; +use tokio::runtime::Runtime; +use hydroflow::util::{unbounded_channel, unsync_channel}; +use tokio::sync::mpsc::UnboundedSender; +use tokio::task; +use tracing::{error, info, trace}; +use warp::Filter; + +type LoadTestAddress = u64; + +use gossip_kv::server::{server, SeedNode}; +use hydroflow::futures::sink::drain; +use hydroflow::futures::stream; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; +use hydroflow::tokio_stream::StreamExt; +use lattices::cc_traits::Iter; + +const UNKNOWN_ADDRESS: LoadTestAddress = 9999999999; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Parser)] +struct Opts { + /// Number of threads to run. Each thread will run an instance of the gossip-kv server transducer. + #[clap(short, long, default_value = "5")] + thread_count: usize, + + /// Frequency (in seconds) at which to send gossip messages. + #[clap(short, long, default_value = "10", value_parser = clap_duration_from_secs)] + gossip_frequency: Duration, + + /// Maximum number of SET requests to send per second. + #[clap(short, long, default_value = "1")] + max_set_throughput: u32, +} + +/// Parse duration from float string for clap args. +fn clap_duration_from_secs(arg: &str) -> Result { + arg.parse().map(Duration::from_secs_f32) +} + +fn run_server( + server_name: String, + gossip_address: LoadTestAddress, + gossip_input_rx: UnboundedReceiverStream<(GossipMessage, LoadTestAddress)>, + switchboard: Switchboard, + seed_nodes: Vec>, + opts: Opts, +) { + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let (gossip_output_tx, mut gossip_output_rx) = unsync_channel(None); + + let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel(); + + let member_data = MemberDataBuilder::new(server_name.clone()) + .add_protocol(Protocol::new("gossip".into(), gossip_address)) + .build(); + + rt.block_on(async { + let local = task::LocalSet::new(); + + let (client_input_tx, client_input_rx) = unbounded_channel(); + + let put_throughput = opts.max_set_throughput; + local.spawn_local(async move { + let rate_limiter = RateLimiter::direct(Quota::per_second( + NonZeroU32::new(put_throughput).unwrap(), + )); + loop { + rate_limiter.until_ready().await; + let key = "/usr/table/key".parse().unwrap(); + let request = ClientRequest::Set { + key, + value: "FOOBAR".to_string(), + }; + client_input_tx.send((request, UNKNOWN_ADDRESS)).unwrap(); + } + }); + + let gossip_frequency = opts.gossip_frequency; + local.spawn_local(async move { + loop { + tokio::time::sleep(gossip_frequency).await; + gossip_trigger_tx.send(()).unwrap(); + } + }); + + // Networking + local.spawn_local(async move { + while let Some((msg, addr)) = gossip_output_rx.next().await { + trace!("Sending gossip message: {:?} to {}", msg, addr); + let outbox = switchboard.gossip_outboxes.get(addr as usize).unwrap(); + if let Err(e) = outbox.send((msg, gossip_address)) { + error!("Failed to send gossip message: {:?}", e); + } + } + }); + + local.spawn_local(async { + let mut server = server( + client_input_rx, + drain(), // Ignoring client responses for now. + gossip_input_rx, + gossip_output_tx, + gossip_trigger_rx, + member_data, + seed_nodes, + stream::empty(), + ); + + server.run_async().await + }); + + local.await + }); + }); +} + +struct Switchboard { + gossip_outboxes: Vec>, +} + +impl Clone for Switchboard { + fn clone(&self) -> Self { + Self { + gossip_outboxes: self.gossip_outboxes.clone(), + } + } +} + +impl Switchboard { + fn new() -> Self { + Self { + gossip_outboxes: Vec::new(), + } + } + fn new_outbox( + &mut self, + ) -> ( + LoadTestAddress, + UnboundedReceiverStream<(GossipMessage, LoadTestAddress)>, + ) { + let addr: LoadTestAddress = self.gossip_outboxes.len() as LoadTestAddress; + let (tx, rx) = unbounded_channel(); + self.gossip_outboxes.push(tx); + (addr, rx) + } +} + +async fn metrics_handler() -> Result { + let encoder = TextEncoder::new(); + let metric_families = gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Ok(warp::reply::with_header( + buffer, + "Content-Type", + encoder.format_type(), + )) +} + +fn main() { + tracing_subscriber::fmt::init(); + + let opts: Opts = Opts::parse(); + + std::thread::spawn(move || { + let metrics_route = warp::path("metrics").and_then(metrics_handler); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async move { + info!("Starting metrics server on port 4003"); + warp::serve(metrics_route).run(([0, 0, 0, 0], 4003)).await; + }); + + }); + + info!("Starting load test with with {} threads", opts.thread_count); + + let mut switchboard = Switchboard::new(); + + let outboxes: Vec<_> = (0..opts.thread_count) + .map(|_| { + let (addr, rx) = switchboard.new_outbox(); + (format!("SERVER-{}", addr), addr, rx) + }) + .collect(); + + let seed_nodes: Vec<_> = outboxes + .iter() + .map(|(name, addr, _)| SeedNode { + id: name.clone(), + address: *addr, + }) + .collect(); + + outboxes.into_iter().for_each(|(name, addr, outbox)| { + run_server( + name, + addr, + outbox, + switchboard.clone(), + seed_nodes.clone(), + opts, + ); + }); + + loop { + sleep(Duration::from_secs(1)); + } +} diff --git a/datastores/gossip_kv/server/main.rs b/datastores/gossip_kv/server/main.rs index f8e25793912e..cb7dac2c8b39 100644 --- a/datastores/gossip_kv/server/main.rs +++ b/datastores/gossip_kv/server/main.rs @@ -6,8 +6,9 @@ use std::io::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use clap::Parser; -use gossip_protocol::membership::{MemberDataBuilder, Protocol}; -use gossip_protocol::{ClientRequest, GossipMessage}; +use gossip_kv::membership::{MemberDataBuilder, Protocol}; +use gossip_kv::server::{server, SeedNode}; +use gossip_kv::{ClientRequest, GossipMessage}; use hydroflow::futures::{SinkExt, StreamExt}; use hydroflow::tokio_stream::wrappers::IntervalStream; use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; @@ -18,13 +19,10 @@ use warp::Filter; use crate::config::{setup_settings_watch, SeedNodeSettings}; use crate::membership::member_name; -use crate::server::{server, SeedNode}; mod config; -mod lattices; + mod membership; -mod server; -mod util; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Parser)] struct Opts { diff --git a/datastores/gossip_kv/server/membership.rs b/datastores/gossip_kv/server/membership.rs index 14a741ef1705..2e20ab534bf0 100644 --- a/datastores/gossip_kv/server/membership.rs +++ b/datastores/gossip_kv/server/membership.rs @@ -1,6 +1,6 @@ use std::sync::OnceLock; -use gossip_protocol::membership::MemberId; +use gossip_kv::membership::MemberId; // use rand::distributions::Distribution; // use rand::{Rng};