diff --git a/Cargo.lock b/Cargo.lock index dcb0f20fd016..308646559d45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1195,6 +1195,7 @@ dependencies = [ "notify", "prometheus", "rand", + "rand_distr", "serde", "serde_json", "shlex", diff --git a/datastores/gossip_kv/Cargo.toml b/datastores/gossip_kv/Cargo.toml index 384a168879e8..05ef5641a7a7 100644 --- a/datastores/gossip_kv/Cargo.toml +++ b/datastores/gossip_kv/Cargo.toml @@ -20,6 +20,7 @@ lazy_static = "1.5.0" notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] } prometheus = "0.13.4" rand = "0.8.5" +rand_distr = "0.4.3" serde = "1.0.203" serde_json = "1.0.117" shlex = "1.3.0" diff --git a/datastores/gossip_kv/load_test_server/server.rs b/datastores/gossip_kv/load_test_server/server.rs index 202aca7e19a3..53321a631d7f 100644 --- a/datastores/gossip_kv/load_test_server/server.rs +++ b/datastores/gossip_kv/load_test_server/server.rs @@ -10,6 +10,8 @@ use governor::{Quota, RateLimiter}; use lazy_static::lazy_static; use hydroflow::util::{unbounded_channel, unsync_channel}; use prometheus::{gather, register_int_counter, Encoder, IntCounter, TextEncoder}; +use rand::Rng; +use rand_distr::Zipf; use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio::sync::watch::Receiver; use tokio::task; @@ -106,10 +108,12 @@ fn run_server( let put_throughput = opts.max_set_throughput; local.spawn_local(async move { - let key_master : u64 = 100; + let mut rng = rand::thread_rng(); + let zipf: Zipf = rand_distr::Zipf::new(1_000_000, 1.07).unwrap(); + loop { let request = ClientRequest::Set { - key: key_master.clone(), + key: rng.sample(zipf) as u64, value: "FOOBAR".to_string(), }; client_input_tx.send((request, UNKNOWN_ADDRESS)).await.unwrap();