Skip to content

Commit

Permalink
It may be the infecting writes are not being drained.
Browse files Browse the repository at this point in the history
Bounded Queues
Fixed Node Type:m4.16xlarge
Attempting CPU affinity.
  • Loading branch information
rohitkulshreshtha committed Dec 3, 2024
1 parent 07e074c commit 7cf695c
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 27 deletions.
27 changes: 25 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datastores/gossip_kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "Apache-2.0"
publish = false

[dependencies]
affinity = "0.1.2"
clap = { version = "4.5.4", features = ["derive", "env"] }
config = "0.14.0"
governor = "0.7.0"
Expand Down
70 changes: 45 additions & 25 deletions datastores/gossip_kv/load_test_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use std::convert::Infallible;
use std::num::{NonZeroU32, ParseFloatError};
use std::thread::sleep;
use std::time::Duration;

use affinity::set_thread_affinity;
use clap::Parser;
use gossip_kv::membership::{MemberDataBuilder, Protocol};
use gossip_kv::{ClientRequest, GossipMessage};
use gossip_kv::{ClientRequest, GossipMessage, Key};
use governor::{Quota, RateLimiter};
use lazy_static::lazy_static;
use hydroflow::util::{unbounded_channel, unsync_channel};
use hydroflow::util::{bounded_channel, unbounded_channel, unsync_channel};
use prometheus::{gather, register_int_counter, Encoder, IntCounter, TextEncoder};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio::sync::watch::Receiver;
use tokio::task;
use tracing::{error, info, trace};
use warp::Filter;
Expand All @@ -20,7 +21,7 @@ type LoadTestAddress = u64;
use gossip_kv::server::{server, SeedNode};
use hydroflow::futures::sink::drain;
use hydroflow::futures::stream;
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow::tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use hydroflow::tokio_stream::StreamExt;
use hydroflow::util::unsync::mpsc::bounded;
use lattices::cc_traits::Iter;
Expand Down Expand Up @@ -61,15 +62,19 @@ fn run_server(
seed_nodes: Vec<SeedNode<LoadTestAddress>>,
opts: Opts,
) {
let (client_input_tx, client_input_rx) = bounded_channel(1000);

std::thread::spawn(move || {
set_thread_affinity(0).unwrap();

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let (gossip_output_tx, mut gossip_output_rx) = unsync_channel(None);

let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel();
let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel();

let member_data = MemberDataBuilder::new(server_name.clone())
.add_protocol(Protocol::new("gossip".into(), gossip_address))
Expand All @@ -78,25 +83,6 @@ fn run_server(
rt.block_on(async {
let local = task::LocalSet::new();

let (client_input_tx, client_input_rx) = bounded(1000);

let put_throughput = opts.max_set_throughput;
local.spawn_local(async move {
let rate_limiter = RateLimiter::direct(Quota::per_second(
NonZeroU32::new(put_throughput).unwrap(),
));
loop {
rate_limiter.until_ready().await;
let key = "/usr/table/key".parse().unwrap();
let request = ClientRequest::Set {
key,
value: "FOOBAR".to_string(),
};
client_input_tx.send((request, UNKNOWN_ADDRESS)).await.unwrap();
SETS_SENT.inc();
}
});

let gossip_frequency = opts.gossip_frequency;
local.spawn_local(async move {
loop {
Expand Down Expand Up @@ -134,6 +120,40 @@ fn run_server(
local.await
});
});

std::thread::spawn(move || {
set_thread_affinity(2).unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();



let put_throughput = opts.max_set_throughput;

rt.block_on(async {
let local = task::LocalSet::new();

local.spawn_local(async move {
let rate_limiter = RateLimiter::direct(Quota::per_second(
NonZeroU32::new(put_throughput).unwrap(),
));
let key_master : Key = "/usr/table/key".parse().unwrap();
loop {
rate_limiter.until_ready().await;
let request = ClientRequest::Set {
key: key_master.clone(),
value: "FOOBAR".to_string(),
};
client_input_tx.send((request, UNKNOWN_ADDRESS)).await.unwrap();
SETS_SENT.inc();
}
});
});

});

}

struct Switchboard {
Expand Down
6 changes: 6 additions & 0 deletions hydroflow/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ pub fn unbounded_channel<T>() -> (
(send, recv)
}

pub fn bounded_channel<T>(buffer: usize) -> (tokio::sync::mpsc::Sender<T>, tokio_stream::wrappers::ReceiverStream<T>) {
let (send, recv) = tokio::sync::mpsc::channel(buffer);
let recv = tokio_stream::wrappers::ReceiverStream::new(recv);
(send, recv)
}

/// Returns an unsync channel as a (1) sender and (2) receiver `Stream` for use in Hydroflow.
pub fn unsync_channel<T>(
capacity: Option<NonZeroUsize>,
Expand Down

0 comments on commit 7cf695c

Please sign in to comment.