Skip to content

Commit

Permalink
Eliminate some scans + store jobs in BTree (#738)
Browse files Browse the repository at this point in the history
In addition to fixing #731, this also fixes #757 -- and pulls in #733.

Co-authored-by: James MacMahon <[email protected]>
  • Loading branch information
bcantrill and jmpesp authored May 30, 2023
1 parent fcec78e commit 8757b3f
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 354 deletions.
2 changes: 1 addition & 1 deletion crucible-client-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct CrucibleOpts {
pub id: Uuid,
pub target: Vec<SocketAddr>,
pub lossy: bool,
pub flush_timeout: Option<u32>,
pub flush_timeout: Option<f32>,
pub key: Option<String>,
pub cert_pem: Option<String>,
pub key_pem: Option<String>,
Expand Down
4 changes: 2 additions & 2 deletions crutest/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ pub async fn start_cli_client(attach: SocketAddr) -> Result<()> {

println!("cli connecting to {0}", attach);

let deadline = tokio::time::sleep_until(deadline_secs(100));
let deadline = tokio::time::sleep_until(deadline_secs(100.0));
tokio::pin!(deadline);
let tcp = sock.connect(attach);
tokio::pin!(tcp);
Expand All @@ -655,7 +655,7 @@ pub async fn start_cli_client(attach: SocketAddr) -> Result<()> {
Err(e) => {
println!("connect to {0} failure: {1:?}",
attach, e);
tokio::time::sleep_until(deadline_secs(10)).await;
tokio::time::sleep_until(deadline_secs(10.0)).await;
continue 'outer;
}
}
Expand Down
2 changes: 1 addition & 1 deletion crutest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ pub struct Opt {

/// How long to wait before the auto flush check fires
#[clap(long, global = true, action)]
flush_timeout: Option<u32>,
flush_timeout: Option<f32>,

/// IP:Port for the Oximeter register address, which is Nexus.
#[clap(long, global = true, default_value = "127.0.0.1:12221", action)]
Expand Down
49 changes: 31 additions & 18 deletions measure_iops/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ pub struct Opt {

#[clap(long, action)]
bw_limit_in_bytes: Option<usize>,

/// Submit all zeroes instead of random data
#[clap(long, action)]
all_zeroes: bool,

/// How long to wait before the auto flush check fires
#[clap(long, action)]
flush_timeout: Option<f32>,
}

pub fn opts() -> Result<Opt> {
Expand All @@ -74,7 +82,7 @@ async fn main() -> Result<()> {
id: Uuid::new_v4(),
target: opt.target,
lossy: false,
flush_timeout: None,
flush_timeout: opt.flush_timeout,
key: opt.key,
cert_pem: opt.cert_pem,
key_pem: opt.key_pem,
Expand Down Expand Up @@ -128,28 +136,33 @@ async fn main() -> Result<()> {
1
};

let write_buffers: Vec<Bytes> = (0..io_depth)
.map(|_| {
Bytes::from(
(0..io_size)
.map(|_| rng.sample(rand::distributions::Standard))
.collect::<Vec<u8>>(),
)
})
.collect();

let read_buffers: Vec<Buffer> =
(0..io_depth).map(|_| Buffer::new(io_size)).collect();

let mut io_operations_sent = 0;
let mut bw_consumed = 0;
let mut io_operation_time = Instant::now();
let mut measurement_time = Instant::now();
let mut total_io_time = Duration::ZERO;
let mut iops: Vec<f32> = vec![];
let mut bws: Vec<f32> = vec![];

'outer: loop {
let mut futures = Vec::with_capacity(io_depth);

let write_buffers: Vec<Bytes> = (0..io_depth)
.map(|_| {
Bytes::from(if opt.all_zeroes {
vec![0u8; io_size]
} else {
(0..io_size)
.map(|_| rng.sample(rand::distributions::Standard))
.collect::<Vec<u8>>()
})
})
.collect();

let io_operation_time = Instant::now();

for i in 0..io_depth {
let offset: u64 =
rng.gen::<u64>() % (total_blocks - io_size as u64 / bsz);
Expand All @@ -173,15 +186,14 @@ async fn main() -> Result<()> {

crucible::join_all(futures).await?;

total_io_time += io_operation_time.elapsed();
io_operations_sent +=
ceiling_div!(io_size * io_depth, 16 * 1024 * 1024);
bw_consumed += io_size * io_depth;

let diff = io_operation_time.elapsed();

if diff > Duration::from_secs(1) {
let fractional_seconds: f32 =
diff.as_secs() as f32 + (diff.subsec_nanos() as f32 / 1e9);
if measurement_time.elapsed() > Duration::from_secs(1) {
let fractional_seconds: f32 = total_io_time.as_secs() as f32
+ (total_io_time.subsec_nanos() as f32 / 1e9);

iops.push(io_operations_sent as f32 / fractional_seconds);
bws.push(bw_consumed as f32 / fractional_seconds);
Expand All @@ -192,7 +204,8 @@ async fn main() -> Result<()> {

io_operations_sent = 0;
bw_consumed = 0;
io_operation_time = Instant::now();
measurement_time = Instant::now();
total_io_time = Duration::ZERO;
}
}

Expand Down
5 changes: 2 additions & 3 deletions openapi/crucible-pantry.json
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,8 @@
},
"flush_timeout": {
"nullable": true,
"type": "integer",
"format": "uint32",
"minimum": 0
"type": "number",
"format": "float"
},
"id": {
"type": "string",
Expand Down
2 changes: 1 addition & 1 deletion upstairs/src/dummy_downstairs_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ pub(crate) mod protocol_test {
let crucible_opts = CrucibleOpts {
id: Uuid::new_v4(),
target: vec![ds1.local_addr, ds2.local_addr, ds3.local_addr],
flush_timeout: Some(600),
flush_timeout: Some(600.0),

..Default::default()
};
Expand Down
Loading

0 comments on commit 8757b3f

Please sign in to comment.