From 8757b3fb55a1763382ad7111144bc60ec72af23d Mon Sep 17 00:00:00 2001
From: Bryan Cantrill
Date: Tue, 30 May 2023 15:02:06 -0700
Subject: [PATCH] Eliminate some scans + store jobs in BTree (#738)

In addition to fixing #731, this also fixes #757 -- and pulls in #733.

Co-authored-by: James MacMahon
---
 crucible-client-types/src/lib.rs       |   2 +-
 crutest/src/cli.rs                     |   4 +-
 crutest/src/main.rs                    |   2 +-
 measure_iops/src/main.rs               |  49 ++--
 openapi/crucible-pantry.json           |   5 +-
 upstairs/src/dummy_downstairs_tests.rs |   2 +-
 upstairs/src/lib.rs                    | 387 +++++++++++--------------
 upstairs/src/live_repair.rs            | 150 +++-------
 8 files changed, 247 insertions(+), 354 deletions(-)

diff --git a/crucible-client-types/src/lib.rs b/crucible-client-types/src/lib.rs
index 7e5818d12..0ca03eb2f 100644
--- a/crucible-client-types/src/lib.rs
+++ b/crucible-client-types/src/lib.rs
@@ -44,7 +44,7 @@ pub struct CrucibleOpts {
     pub id: Uuid,
     pub target: Vec<SocketAddr>,
     pub lossy: bool,
-    pub flush_timeout: Option<u32>,
+    pub flush_timeout: Option<f32>,
     pub key: Option<String>,
     pub cert_pem: Option<String>,
     pub key_pem: Option<String>,
diff --git a/crutest/src/cli.rs b/crutest/src/cli.rs
index 7a72935ce..5f32515f8 100644
--- a/crutest/src/cli.rs
+++ b/crutest/src/cli.rs
@@ -635,7 +635,7 @@ pub async fn start_cli_client(attach: SocketAddr) -> Result<()> {
 
         println!("cli connecting to {0}", attach);
 
-        let deadline = tokio::time::sleep_until(deadline_secs(100));
+        let deadline = tokio::time::sleep_until(deadline_secs(100.0));
         tokio::pin!(deadline);
         let tcp = sock.connect(attach);
         tokio::pin!(tcp);
@@ -655,7 +655,7 @@ pub async fn start_cli_client(attach: SocketAddr) -> Result<()> {
             Err(e) => {
                 println!("connect to {0} failure: {1:?}", attach, e);
 
-                tokio::time::sleep_until(deadline_secs(10)).await;
+                tokio::time::sleep_until(deadline_secs(10.0)).await;
                 continue 'outer;
             }
         }
diff --git a/crutest/src/main.rs b/crutest/src/main.rs
index c6ceca2ac..2cba5ab9c 100644
--- a/crutest/src/main.rs
+++ b/crutest/src/main.rs
@@ -181,7 +181,7 @@ pub struct Opt {
 
     /// How long to wait before the auto flush check fires
    #[clap(long, global = true, action)]
-    flush_timeout: Option<u32>,
+    flush_timeout: Option<f32>,
 
     /// IP:Port for the Oximeter register address, which is Nexus.
     #[clap(long, global = true, default_value = "127.0.0.1:12221", action)]
diff --git a/measure_iops/src/main.rs b/measure_iops/src/main.rs
index cf41ca55b..f15103786 100644
--- a/measure_iops/src/main.rs
+++ b/measure_iops/src/main.rs
@@ -48,6 +48,14 @@ pub struct Opt {
 
     #[clap(long, action)]
     bw_limit_in_bytes: Option<usize>,
+
+    /// Submit all zeroes instead of random data
+    #[clap(long, action)]
+    all_zeroes: bool,
+
+    /// How long to wait before the auto flush check fires
+    #[clap(long, action)]
+    flush_timeout: Option<f32>,
 }
 
 pub fn opts() -> Result<Opt> {
@@ -74,7 +82,7 @@ async fn main() -> Result<()> {
         id: Uuid::new_v4(),
         target: opt.target,
         lossy: false,
-        flush_timeout: None,
+        flush_timeout: opt.flush_timeout,
         key: opt.key,
         cert_pem: opt.cert_pem,
         key_pem: opt.key_pem,
@@ -128,28 +136,33 @@ async fn main() -> Result<()> {
         1
     };
 
-    let write_buffers: Vec<Bytes> = (0..io_depth)
-        .map(|_| {
-            Bytes::from(
-                (0..io_size)
-                    .map(|_| rng.sample(rand::distributions::Standard))
-                    .collect::<Vec<u8>>(),
-            )
-        })
-        .collect();
-
     let read_buffers: Vec<Buffer> =
         (0..io_depth).map(|_| Buffer::new(io_size)).collect();
 
     let mut io_operations_sent = 0;
     let mut bw_consumed = 0;
-    let mut io_operation_time = Instant::now();
+    let mut measurement_time = Instant::now();
+    let mut total_io_time = Duration::ZERO;
     let mut iops: Vec<f32> = vec![];
     let mut bws: Vec<f32> = vec![];
 
     'outer: loop {
         let mut futures = Vec::with_capacity(io_depth);
 
+        let write_buffers: Vec<Bytes> = (0..io_depth)
+            .map(|_| {
+                Bytes::from(if opt.all_zeroes {
+                    vec![0u8; io_size]
+                } else {
+                    (0..io_size)
+                        .map(|_| rng.sample(rand::distributions::Standard))
+                        .collect::<Vec<u8>>()
+                })
+            })
+            .collect();
+
+        let io_operation_time = Instant::now();
+
         for i in 0..io_depth {
             let offset: u64 =
                 rng.gen::<u64>() % (total_blocks - io_size as u64 / bsz);
@@ -173,15 +186,14 @@ async fn main() -> Result<()> {
 
         crucible::join_all(futures).await?;
 
+        total_io_time += io_operation_time.elapsed();
         io_operations_sent +=
             ceiling_div!(io_size * io_depth, 16 * 1024 * 1024);
         bw_consumed += io_size * io_depth;
 
-        let diff = io_operation_time.elapsed();
-
-        if diff > Duration::from_secs(1) {
-            let fractional_seconds: f32 =
-                diff.as_secs() as f32 + (diff.subsec_nanos() as f32 / 1e9);
+        if measurement_time.elapsed() > Duration::from_secs(1) {
+            let fractional_seconds: f32 = total_io_time.as_secs() as f32
+                + (total_io_time.subsec_nanos() as f32 / 1e9);
 
             iops.push(io_operations_sent as f32 / fractional_seconds);
             bws.push(bw_consumed as f32 / fractional_seconds);
@@ -192,7 +204,8 @@ async fn main() -> Result<()> {
 
             io_operations_sent = 0;
             bw_consumed = 0;
-            io_operation_time = Instant::now();
+            measurement_time = Instant::now();
+            total_io_time = Duration::ZERO;
         }
     }
 
diff --git a/openapi/crucible-pantry.json b/openapi/crucible-pantry.json
index f71528f5a..a52424f0f 100644
--- a/openapi/crucible-pantry.json
+++ b/openapi/crucible-pantry.json
@@ -481,9 +481,8 @@
         },
         "flush_timeout": {
           "nullable": true,
-          "type": "integer",
-          "format": "uint32",
-          "minimum": 0
+          "type": "number",
+          "format": "float"
         },
         "id": {
           "type": "string",
diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs
index 51dbffcd1..ac9679564 100644
--- a/upstairs/src/dummy_downstairs_tests.rs
+++ b/upstairs/src/dummy_downstairs_tests.rs
@@ -364,7 +364,7 @@ pub(crate) mod protocol_test {
         let crucible_opts = CrucibleOpts {
             id: Uuid::new_v4(),
             target: vec![ds1.local_addr, ds2.local_addr, ds3.local_addr],
-            flush_timeout: Some(600),
+            flush_timeout: Some(600.0),
             ..Default::default()
         };
 
diff --git a/upstairs/src/lib.rs
b/upstairs/src/lib.rs index ca4a9b7a0..99df635c7 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -4,7 +4,7 @@ #![allow(clippy::mutex_atomic)] use std::clone::Clone; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::convert::TryFrom; use std::fmt; use std::fmt::{Debug, Formatter}; @@ -21,7 +21,6 @@ pub use crucible_protocol::*; use anyhow::{anyhow, bail, Result}; pub use bytes::{Bytes, BytesMut}; use futures::{SinkExt, StreamExt}; -use itertools::Itertools; use oximeter::types::ProducerRegistry; use rand::prelude::*; use ringbuffer::{AllocRingBuffer, RingBufferExt, RingBufferWrite}; @@ -133,9 +132,9 @@ pub trait BlockIO: Sync { /// implemented for those. async fn replace_downstairs( &self, - id: Uuid, - old: SocketAddr, - new: SocketAddr, + _id: Uuid, + _old: SocketAddr, + _new: SocketAddr, ) -> Result { panic!("should never hit here!"); } @@ -292,6 +291,7 @@ mod cdt { fn gw__read__start(_: u64) {} fn gw__write__start(_: u64) {} fn gw__write__unwritten__start(_: u64) {} + fn gw__write__deps(_: u64, _: u64) {} fn gw__flush__start(_: u64) {} fn gw__close__start(_: u64, _: u32) {} fn gw__repair__start(_: u64, _: u32) {} @@ -338,9 +338,9 @@ mod cdt { fn volume__flush__done(_: u32, _: Uuid) {} } -pub fn deadline_secs(secs: u64) -> Instant { +pub fn deadline_secs(secs: f32) -> Instant { Instant::now() - .checked_add(Duration::from_secs(secs)) + .checked_add(Duration::from_secs_f32(secs)) .unwrap() } @@ -600,16 +600,28 @@ where new_work.sort_unstable(); let mut active_count = u.downstairs.lock().await.submitted_work(client_id); - for new_id in new_work.iter() { + for ndx in 0..new_work.len() { if active_count >= 100 { - // Flow control enacted, stop sending work + // Flow control enacted, stop sending work -- and requeue all of + // our remaining work to assure it isn't dropped + u.downstairs + .lock() + .await + .requeue_work(client_id, &new_work[ndx..]); return Ok(true); } + + let new_id = new_work[ndx]; + /* * Walk the list of work to do, update its status as in progress * and send the details to our downstairs. */ if u.lossy && random() && random() { + /* + * Requeue this work so it isn't completely lost. + */ + u.downstairs.lock().await.requeue_work(client_id, &[new_id]); continue; } @@ -617,7 +629,7 @@ where * If in_progress returns None, it means that this job on this * client should be skipped. 
*/ - let job = u.downstairs.lock().await.in_progress(*new_id, client_id); + let job = u.downstairs.lock().await.in_progress(new_id, client_id); if job.is_none() { continue; } @@ -632,14 +644,14 @@ where .live_repair_dep_check( client_id, dependencies.clone(), - *new_id, + new_id, ) .await; - cdt::ds__write__io__start!(|| (*new_id, client_id as u64)); + cdt::ds__write__io__start!(|| (new_id, client_id as u64)); fw.send(Message::Write { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), writes: writes.clone(), }) @@ -653,17 +665,17 @@ where .live_repair_dep_check( client_id, dependencies.clone(), - *new_id, + new_id, ) .await; cdt::ds__write__unwritten__io__start!(|| ( - *new_id, + new_id, client_id as u64 )); fw.send(Message::WriteUnwritten { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), writes: writes.clone(), }) @@ -680,7 +692,7 @@ where .live_repair_dep_check( client_id, dependencies.clone(), - *new_id, + new_id, ) .await; // If our downstairs is under repair, then include any extent @@ -693,11 +705,11 @@ where } else { None }; - cdt::ds__flush__io__start!(|| (*new_id, client_id as u64)); + cdt::ds__flush__io__start!(|| (new_id, client_id as u64)); fw.send(Message::Flush { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), flush_number, gen_number, @@ -714,14 +726,14 @@ where .live_repair_dep_check( client_id, dependencies.clone(), - *new_id, + new_id, ) .await; - cdt::ds__read__io__start!(|| (*new_id, client_id as u64)); + cdt::ds__read__io__start!(|| (new_id, client_id as u64)); fw.send(Message::ReadRequest { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), requests, }) @@ -748,16 +760,16 @@ where .live_repair_dep_check( client_id, dependencies.clone(), - *new_id, + new_id, ) .await; - cdt::ds__close__start!(|| (*new_id, client_id as u64, extent)); + cdt::ds__close__start!(|| (new_id, client_id as u64, extent)); if repair_downstairs.contains(&client_id) { // We are the downstairs being repaired, so just close. fw.send(Message::ExtentLiveClose { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), extent_id: extent, }) @@ -766,7 +778,7 @@ where fw.send(Message::ExtentLiveFlushClose { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), extent_id: extent, flush_number, @@ -786,15 +798,15 @@ where .live_repair_dep_check( client_id, dependencies.clone(), - *new_id, + new_id, ) .await; - cdt::ds__repair__start!(|| (*new_id, client_id as u64, extent)); + cdt::ds__repair__start!(|| (new_id, client_id as u64, extent)); if repair_downstairs.contains(&client_id) { fw.send(Message::ExtentLiveRepair { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), extent_id: extent, source_client_id: source_downstairs, @@ -805,7 +817,7 @@ where fw.send(Message::ExtentLiveNoOp { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), }) .await? 
@@ -819,14 +831,14 @@ where .live_repair_dep_check( client_id, dependencies.clone(), - *new_id, + new_id, ) .await; - cdt::ds__reopen__start!(|| (*new_id, client_id as u64, extent)); + cdt::ds__reopen__start!(|| (new_id, client_id as u64, extent)); fw.send(Message::ExtentLiveReopen { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), extent_id: extent, }) @@ -837,14 +849,14 @@ where .live_repair_dep_check( client_id, dependencies.clone(), - *new_id, + new_id, ) .await; - cdt::ds__noop__start!(|| (*new_id, client_id as u64)); + cdt::ds__noop__start!(|| (new_id, client_id as u64)); fw.send(Message::ExtentLiveNoOp { upstairs_id: u.uuid, session_id: u.session_id, - job_id: *new_id, + job_id: new_id, dependencies: deps.clone(), }) .await? @@ -974,8 +986,8 @@ where let mut negotiated = 0; // XXX figure out what deadlines make sense here - let mut ping_interval = deadline_secs(5); - let mut timeout_deadline = deadline_secs(50); + let mut ping_interval = deadline_secs(5.0); + let mut timeout_deadline = deadline_secs(50.0); /* * Either we get all the way through the negotiation, or we hit the @@ -1070,7 +1082,7 @@ where } _ = sleep_until(ping_interval) => { fw.send(Message::Ruok).await?; - ping_interval = deadline_secs(5); + ping_interval = deadline_secs(5.0); } r = up_coms.ds_active_rx.changed(), if negotiated == 1 && !self_promotion => @@ -1111,8 +1123,8 @@ where } f = fr.next() => { // When the downstairs responds, push the deadlines - timeout_deadline = deadline_secs(50); - ping_interval = deadline_secs(5); + timeout_deadline = deadline_secs(50.0); + ping_interval = deadline_secs(5.0); match f.transpose()? { None => { @@ -1753,9 +1765,9 @@ where * * XXX figure out what deadlines make sense here */ - let mut more_work_interval = deadline_secs(1); - let mut ping_interval = deadline_secs(10); - let mut timeout_deadline = deadline_secs(50); + let mut more_work_interval = deadline_secs(1.0); + let mut ping_interval = deadline_secs(10.0); + let mut timeout_deadline = deadline_secs(50.0); let mut ping_count = 0; /* @@ -1837,8 +1849,8 @@ where } f = fr.next() => { // When the downstairs responds, push the deadlines - timeout_deadline = deadline_secs(50); - ping_interval = deadline_secs(10); + timeout_deadline = deadline_secs(50.0); + ping_interval = deadline_secs(10.0); match f.transpose()? { None => { @@ -1965,7 +1977,7 @@ where ); more_work = true; - more_work_interval = deadline_secs(1); + more_work_interval = deadline_secs(1.0); } } _ = sleep_until(more_work_interval), if more_work => { @@ -1974,7 +1986,7 @@ where warn!(up.log, "[{}] flow control end ", up_coms.client_id); } - more_work_interval = deadline_secs(1); + more_work_interval = deadline_secs(1.0); } /* * Don't wait more than 50 seconds to hear from the other side. @@ -2018,7 +2030,7 @@ where bail!("[{}] exits ping deactivation", up_coms.client_id); } - ping_interval = deadline_secs(10); + ping_interval = deadline_secs(10.0); } } } @@ -2061,14 +2073,14 @@ where * reconciliation work notification to arrive from the upstairs task * responsible for making all downstairs the same. */ - let mut ping_interval = deadline_secs(5); - let mut timeout_deadline = deadline_secs(40); + let mut ping_interval = deadline_secs(5.0); + let mut timeout_deadline = deadline_secs(40.0); loop { tokio::select! 
{ f = fr.next() => { // When the downstairs responds, push the deadlines - timeout_deadline = deadline_secs(40); - ping_interval = deadline_secs(5); + timeout_deadline = deadline_secs(40.0); + ping_interval = deadline_secs(5.0); match f.transpose()? { None => { @@ -2374,7 +2386,7 @@ where bail!("[{}] exits ping deactivation", up_coms.client_id); } - ping_interval = deadline_secs(10); + ping_interval = deadline_secs(10.0); } } } @@ -2483,7 +2495,7 @@ async fn looper( info!(log, "[{1}] connecting to {0}", target, up_coms.client_id); } notify = (notify + 1) % 10; - let deadline = tokio::time::sleep_until(deadline_secs(10)); + let deadline = tokio::time::sleep_until(deadline_secs(10.0)); tokio::pin!(deadline); let tcp = sock.connect(target); tokio::pin!(tcp); @@ -2661,7 +2673,12 @@ struct Downstairs { /** * The active list of IO for the downstairs. */ - ds_active: HashMap, + ds_active: BTreeMap, + + /** + * Cache of new jobs, indexed by client ID. + */ + ds_new: Vec>, /** * Jobs that have been skipped, indexed by client ID. @@ -2800,7 +2817,8 @@ impl Downstairs { ds_state: vec![DsState::New; 3], ds_last_flush: vec![0; 3], downstairs_errors: HashMap::new(), - ds_active: HashMap::new(), + ds_active: BTreeMap::new(), + ds_new: vec![Vec::new(); 3], ds_skipped_jobs: [HashSet::new(), HashSet::new(), HashSet::new()], completed: AllocRingBuffer::with_capacity(2048), completed_jobs: AllocRingBuffer::with_capacity(8), @@ -3138,19 +3156,14 @@ impl Downstairs { * when there is work in the queue. */ fn ds_deactivate_offline(&mut self, client_id: u8) { - let mut kvec: Vec = - self.ds_active.keys().cloned().collect::>(); - kvec.sort_unstable(); - info!( self.log, "[{}] client skip all {} jobs for deactivate", client_id, - kvec.len(), + self.ds_active.len(), ); - for ds_id in kvec.iter() { - let job = self.ds_active.get_mut(ds_id).unwrap(); + for (ds_id, job) in self.ds_active.iter_mut() { let state = job.state.get(&client_id).unwrap(); if *state == IOState::InProgress || *state == IOState::New { @@ -3162,6 +3175,10 @@ impl Downstairs { self.ds_skipped_jobs[client_id as usize].insert(*ds_id); } } + + // All of IOState::New jobs are now IOState::Skipped, so clear our + // cache of new jobs for this downstairs. + self.ds_new[client_id as usize].clear(); } /** @@ -3182,23 +3199,17 @@ impl Downstairs { */ fn re_new(&mut self, client_id: u8) { let lf = self.ds_last_flush[client_id as usize]; - let mut kvec: Vec = - self.ds_active.keys().cloned().collect::>(); - kvec.sort_unstable(); info!( self.log, - "[{}] client re-new {} jobs since flush {}", - client_id, - kvec.len(), - lf + "[{client_id}] client re-new {} jobs since flush {lf}", + self.ds_active.len(), ); - for ds_id in kvec.iter() { - let is_read = self.is_read(*ds_id).unwrap(); - let wc = self.state_count(*ds_id).unwrap(); - let jobs_completed_ok = wc.completed_ok(); - let job = self.ds_active.get_mut(ds_id).unwrap(); + for (ds_id, job) in self.ds_active.iter_mut() { + let is_read = job.work.is_read(); + let wc = job.state_count(); + let jobs_completed_ok = wc.completed_ok(); // We don't need to send anything before our last good flush if *ds_id <= lf { @@ -3251,6 +3262,7 @@ impl Downstairs { if old_state != IOState::New { self.io_state_count.decr(&old_state, client_id); self.io_state_count.incr(&IOState::New, client_id); + self.ds_new[client_id as usize].push(*ds_id); } } } @@ -3263,20 +3275,15 @@ impl Downstairs { // notify the correct upstairs task that all downstairs related work // for a skipped job has completed. 
fn ds_set_faulted(&mut self, client_id: u8) -> bool { - let mut kvec: Vec = - self.ds_active.keys().cloned().collect::>(); - kvec.sort_unstable(); - info!( self.log, - "[{}] client skip {} in process jobs because fault", - client_id, - kvec.len(), + "[{client_id}] client skip {} in process jobs because fault", + self.ds_active.len(), ); let mut notify_guest = false; - for ds_id in kvec.iter() { - let job = self.ds_active.get_mut(ds_id).unwrap(); + let mut retire_check = vec![]; + for (ds_id, job) in self.ds_active.iter_mut() { let state = job.state.get(&client_id).unwrap(); if *state == IOState::InProgress || *state == IOState::New { @@ -3293,7 +3300,9 @@ impl Downstairs { // Check to see if this being skipped means we can ACK // the job back to the guest. if job.ack_status == AckStatus::Acked { - self.retire_check(*ds_id); + // Push this onto a queue to do the retire check when + // we aren't doing a mutable iteration. + retire_check.push(*ds_id); } else if job.ack_status == AckStatus::NotAcked { let wc = job.state_count(); if (wc.error + wc.skipped + wc.done) == 3 { @@ -3315,6 +3324,15 @@ impl Downstairs { } } } + + for ds_id in retire_check { + self.retire_check(ds_id); + } + + // We have eliminated all of our jobs in IOState::New above; flush + // our cache to reflect that. + self.ds_new[client_id as usize].clear(); + // As this downstairs is now faulted, we clear the extent_limit. self.extent_limit[client_id as usize] = None; notify_guest @@ -3324,17 +3342,16 @@ impl Downstairs { * Return a list of downstairs request IDs that represent unissued * requests for this client. */ - fn new_work(&self, client_id: u8) -> Vec { - self.ds_active - .values() - .filter_map(|job| { - if let Some(IOState::New) = job.state.get(&client_id) { - Some(job.ds_id) - } else { - None - } - }) - .collect() + fn new_work(&mut self, client_id: u8) -> Vec { + self.ds_new[client_id as usize].drain(..).collect() + } + + /** + * Called to requeue work that was previously found by calling + * [`new_work`], presumably due to flow control. + */ + fn requeue_work(&mut self, client_id: u8, work: &[u64]) { + self.ds_new[client_id as usize].extend_from_slice(work); } /** @@ -3342,12 +3359,7 @@ impl Downstairs { * for this client, but don't yet have a response. */ fn submitted_work(&self, client_id: u8) -> usize { - self.ds_active - .values() - .filter(|job| { - Some(&IOState::InProgress) == job.state.get(&client_id) - }) - .count() + self.io_state_count.in_progress[client_id as usize] as usize } /** @@ -3355,14 +3367,11 @@ impl Downstairs { * work we have for a downstairs. */ fn total_live_work(&self, client_id: u8) -> usize { - self.ds_active - .values() - .filter(|job| { - Some(&IOState::InProgress) == job.state.get(&client_id) - || Some(&IOState::New) == job.state.get(&client_id) - }) - .count() + (self.io_state_count.new[client_id as usize] + + self.io_state_count.in_progress[client_id as usize]) + as usize } + /** * Build a list of jobs that are ready to be acked. */ @@ -3409,6 +3418,7 @@ impl Downstairs { if io.work.send_io_live_repair(my_limit) { // Leave this IO as New, the downstairs will receive it. self.io_state_count.incr(&IOState::New, cid); + self.ds_new[cid as usize].push(io.ds_id); } else { // Move this IO to skipped, we are not ready for // the downstairs to receive it. 
@@ -3420,6 +3430,7 @@ impl Downstairs { } _ => { self.io_state_count.incr(&IOState::New, cid); + self.ds_new[cid as usize].push(io.ds_id); } } } @@ -3463,6 +3474,7 @@ impl Downstairs { } _ => { self.io_state_count.incr(&IOState::New, cid); + self.ds_new[cid as usize].push(io.ds_id); } } } @@ -4475,26 +4487,23 @@ impl Downstairs { assert!(!self.completed.contains(&ds_id)); assert_eq!(wc.active, 0); - // Sort the job list, and retire all the jobs that happened before - // and including this flush. - - let mut kvec: Vec = self - .ds_active - .keys() - .cloned() - .filter(|&x| x <= ds_id) - .collect::>(); + // Retire all the jobs that happened before and including this + // flush. + let mut retired = Vec::new(); - kvec.sort_unstable(); + loop { + let id = match self.ds_active.keys().next() { + Some(id) if *id > ds_id => break, + None => break, + Some(id) => *id, + }; - let mut retired = Vec::new(); - for id in kvec.iter() { // Remove everything before this flush (because flushes depend // on everything, and everything depends on flushes). - assert!(*id <= ds_id); + assert!(id <= ds_id); // Assert the job is actually done, then complete it - let wc = self.state_count(*id).unwrap(); + let wc = self.state_count(id).unwrap(); // While we don't expect any jobs to still be in progress, // there is nothing to prevent a flush ACK from getting @@ -4508,10 +4517,9 @@ impl Downstairs { continue; } assert_eq!(wc.error + wc.skipped + wc.done, 3); + assert!(!self.completed.contains(&id)); - assert!(!self.completed.contains(id)); - - let oj = self.ds_active.get(id).unwrap(); + let oj = self.ds_active.get(&id).unwrap(); if oj.ack_status != AckStatus::Acked { warn!( self.log, @@ -4521,11 +4529,11 @@ impl Downstairs { ); continue; } - let oj = self.ds_active.remove(id).unwrap(); + let oj = self.ds_active.remove(&id).unwrap(); assert_eq!(oj.ack_status, AckStatus::Acked); - assert_eq!(oj.ds_id, *id); + assert_eq!(oj.ds_id, id); retired.push(oj.ds_id); - self.completed.push(*id); + self.completed.push(id); let summary = oj.io_summarize(); self.completed_jobs.push(summary); for cid in 0..3 { @@ -4563,24 +4571,6 @@ impl Downstairs { } } - /** - * Check if an active job is a read or not. - */ - fn is_read(&self, ds_id: u64) -> Result { - let job = self - .ds_active - .get(&ds_id) - .ok_or_else(|| anyhow!("reqid {} is not active", ds_id))?; - - match &job.work { - IOop::Read { - dependencies: _dependencies, - requests: _, - } => Ok(true), - _ => Ok(false), - } - } - fn client_error( &self, ds_id: u64, @@ -5316,9 +5306,7 @@ impl Upstairs { } let mut ds = self.downstairs.lock().await; - let mut kvec: Vec = - ds.ds_active.keys().cloned().collect::>(); - if kvec.is_empty() { + if ds.ds_active.is_empty() { info!(self.log, "[{}] deactivate, no work so YES", client_id); self.ds_transition_with_lock( &mut ds, @@ -5328,13 +5316,13 @@ impl Upstairs { ); return true; } else { - kvec.sort_unstable(); + let last_id = ds.ds_active.keys().next_back().unwrap(); + /* * The last job must be a flush. It's possible to get * here right after deactivating is set, but before the final * flush happens. */ - let last_id = kvec.last().unwrap(); if !ds.is_flush(*last_id).unwrap() { info!( self.log, @@ -5348,8 +5336,7 @@ impl Upstairs { * Now count our jobs. Any job not done or skipped means * we are not ready to deactivate. 
*/ - for id in kvec.iter() { - let job = ds.ds_active.get(id).unwrap(); + for (id, job) in &ds.ds_active { let state = job.state.get(&client_id).unwrap(); if state == &IOState::New || state == &IOState::InProgress { info!( @@ -5547,21 +5534,12 @@ impl Upstairs { * all jobs. It's currently important that flushes depend on everything, * and everything depends on flushes. */ - let num_jobs = downstairs.ds_active.keys().len(); + let num_jobs = downstairs.ds_active.len(); let mut dep: Vec = Vec::with_capacity(num_jobs); - for job_id in downstairs - .ds_active - .keys() - .sorted() - .collect::>() - .iter() - .rev() - { - let job = &downstairs.ds_active[job_id]; - + for (id, job) in downstairs.ds_active.iter().rev() { // Flushes must depend on everything - dep.push(**job_id); + dep.push(*id); // Depend on the last flush, but then bail out if job.work.is_flush() { @@ -5736,33 +5714,26 @@ impl Upstairs { * with an existing job, it would be nice if those were removed from * this job's dependencies. */ - let num_jobs = downstairs.ds_active.keys().len(); + let num_jobs = downstairs.ds_active.len(); let mut dep: Vec = Vec::with_capacity(num_jobs); // Search backwards in the list of active jobs - for job_id in downstairs - .ds_active - .keys() - .sorted() - .collect::>() - .iter() - .rev() - { - let job = &downstairs.ds_active[job_id]; - + for (id, job) in downstairs.ds_active.iter().rev() { // Depend on the last flush - flushes are a barrier for // all writes. if job.work.is_flush() { - dep.push(**job_id); + dep.push(*id); } // If this job impacts the same blocks as something already active, // create a dependency. if impacted_blocks.conflicts(&job.impacted_blocks) { - dep.push(**job_id); + dep.push(*id); } } + cdt::gw__write__deps!(|| (num_jobs as u64, dep.len() as u64)); + let mut writes: Vec = Vec::with_capacity(impacted_blocks.len(&ddef)); @@ -6011,25 +5982,16 @@ impl Upstairs { let mut dep: Vec = Vec::with_capacity(num_jobs); // Search backwards in the list of active jobs - for job_id in downstairs - .ds_active - .keys() - .sorted() - .collect::>() - .iter() - .rev() - { - let job = &downstairs.ds_active[job_id]; - + for (id, job) in downstairs.ds_active.iter().rev() { if job.work.is_flush() { - dep.push(**job_id); + dep.push(*id); break; } else if (job.work.is_write() | job.work.is_repair()) && impacted_blocks.conflicts(&job.impacted_blocks) { // If this is a write or repair and it impacts the same blocks // as something already active, create a dependency. - dep.push(**job_id); + dep.push(*id); } } @@ -6659,7 +6621,7 @@ impl Upstairs { send_reconcile_work(dst, *lastcast); *lastcast += 1; info!(self.log, "Sent repair work, now wait for resp"); - let mut progress_check = deadline_secs(5); + let mut progress_check = deadline_secs(5.0); /* * What to do if a downstairs goes away and never @@ -6698,7 +6660,7 @@ impl Upstairs { * an ACK from that downstairs. 
*/ info!(self.log, "progress_check"); - progress_check = deadline_secs(5); + progress_check = deadline_secs(5.0); self.ds_state_show().await; let mut ds = self.downstairs.lock().await; if let Err(e) = ds.repair_or_abort() { @@ -7309,9 +7271,9 @@ impl Upstairs { async fn replace_downstairs( &self, - id: Uuid, - old: SocketAddr, - new: SocketAddr, + _id: Uuid, + _old: SocketAddr, + _new: SocketAddr, ) -> Result { crucible_bail!(GenericError, "write more code!"); } @@ -9706,11 +9668,11 @@ async fn up_listen( dst: Vec, mut ds_status_rx: mpsc::Receiver, mut ds_reconcile_done_rx: mpsc::Receiver, - timeout: Option, + timeout: Option, ) { info!(up.log, "up_listen starts"; "task" => "up_listen"); info!(up.log, "Wait for all three downstairs to come online"); - let flush_timeout = timeout.unwrap_or(5); + let flush_timeout = timeout.unwrap_or(0.5); info!(up.log, "Flush timeout: {}", flush_timeout); let mut lastcast = 1; @@ -9725,9 +9687,9 @@ async fn up_listen( let mut leak_deadline = Instant::now().checked_add(leak_tick).unwrap(); up.stat_update("start").await; - let mut flush_check = deadline_secs(flush_timeout.into()); - let mut show_work_interval = deadline_secs(5); - let mut repair_check_interval = deadline_secs(60); + let mut flush_check = deadline_secs(flush_timeout); + let mut show_work_interval = deadline_secs(5.0); + let mut repair_check_interval = deadline_secs(60.0); let mut repair_check = false; loop { /* @@ -9763,7 +9725,7 @@ async fn up_listen( // Set check for repair here. info!(up.log, "Set check for repair"); repair_check = true; - repair_check_interval = deadline_secs(1); + repair_check_interval = deadline_secs(1.0); } } else { info!( @@ -9801,7 +9763,7 @@ async fn up_listen( }, RepairCheck::RepairInProgress => { repair_check = true; - repair_check_interval = deadline_secs(60); + repair_check_interval = deadline_secs(60.0); info!(up.log, "Live Repair in progress, try again"); }, RepairCheck::InvalidState => { @@ -9847,11 +9809,11 @@ async fn up_listen( */ up.stat_update("loop").await; - flush_check = deadline_secs(flush_timeout.into()); + flush_check = deadline_secs(flush_timeout); } _ = sleep_until(show_work_interval) => { // show_all_work(up).await; - show_work_interval = deadline_secs(5); + show_work_interval = deadline_secs(5.0); } } } @@ -10149,7 +10111,8 @@ async fn show_all_work(up: &Arc) -> WQCounts { let up_count = up.guest.guest_work.lock().await.active.len(); let mut ds = up.downstairs.lock().await; - let mut kvec: Vec = ds.ds_active.keys().cloned().collect::>(); + let ds_count = ds.ds_active.len(); + println!( "----------------------------------------------------------------" ); @@ -10159,9 +10122,9 @@ async fn show_all_work(up: &Arc) -> WQCounts { up.get_generation().await, gior, up_count, - kvec.len(), + ds_count, ); - if kvec.is_empty() { + if ds.ds_active.is_empty() { if up_count != 0 { show_guest_work(&up.guest).await; } @@ -10179,9 +10142,7 @@ async fn show_all_work(up: &Arc) -> WQCounts { "REPLAY", ); - kvec.sort_unstable(); - for id in kvec.iter() { - let job = ds.ds_active.get(id).unwrap(); + for (id, job) in &ds.ds_active { let ack = job.ack_status; let (job_type, num_blocks): (String, usize) = match &job.work { @@ -10335,7 +10296,7 @@ async fn show_all_work(up: &Arc) -> WQCounts { WQCounts { up_count, - ds_count: kvec.len(), + ds_count, active_count, } } diff --git a/upstairs/src/live_repair.rs b/upstairs/src/live_repair.rs index 39d383fce..523a522a3 100644 --- a/upstairs/src/live_repair.rs +++ b/upstairs/src/live_repair.rs @@ -626,16 +626,7 @@ fn 
deps_for_live_repair( // Search backwards in the list of active jobs, stop at the // last flush - for job_id in ds - .ds_active - .keys() - .sorted() - .collect::>() - .iter() - .rev() - { - let job = &ds.ds_active[job_id]; - + for (id, job) in ds.ds_active.iter().rev() { // We are finding dependencies based on impacted blocks. // We may have reserved future job IDs for repair, and it's possible // that this job we are finding dependencies for now is actually a @@ -647,13 +638,13 @@ fn deps_for_live_repair( // If this operation impacts the same blocks as something // already active, create a dependency. if impacted_blocks.conflicts(&job.impacted_blocks) { - deps.push(**job_id); + deps.push(*id); } // A flush job won't show impacted blocks. We can stop looking // for dependencies beyond the last flush. if job.work.is_flush() { - deps.push(**job_id); + deps.push(*id); break; } } @@ -3064,9 +3055,7 @@ pub mod repair_test { // Verify that the future repair jobs were added to our IOs // dependency list. - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs[0].work.deps(), &[1000, 1001, 1002, 1003]); assert_eq!(jobs[1].work.deps(), &[1004, 1000, 1001, 1002, 1003]); @@ -3133,9 +3122,7 @@ pub mod repair_test { // // These future jobs are not actually created yet, so they // won't show up in the work queue. - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs[0].work.deps(), &[1000, 1001, 1002, 1003]); } @@ -3199,9 +3186,7 @@ pub mod repair_test { // Verify that the future repair jobs were added to our IOs // dependency list. - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs[0].work.deps(), &[1000, 1001, 1002, 1003]); } @@ -3273,9 +3258,7 @@ pub mod repair_test { // Verify that the future repair jobs were added to our IOs // dependency list. 
- let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!( jobs[0].work.deps(), @@ -4219,9 +4202,7 @@ pub mod repair_test { create_and_enqueue_close_op(&up, eid).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); // 4 Jobs were created assert_eq!(jobs.len(), 4); @@ -4274,9 +4255,7 @@ pub mod repair_test { create_and_enqueue_close_op(&up, eid).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); // 4 Jobs were created assert_eq!(jobs.len(), 4); @@ -4340,9 +4319,7 @@ pub mod repair_test { create_and_enqueue_close_op(&up, eid).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); // 4 Jobs were created assert_eq!(jobs.len(), 4); @@ -4388,9 +4365,7 @@ pub mod repair_test { .unwrap(); let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); // 2 Jobs were created assert_eq!(jobs.len(), 2); @@ -4430,9 +4405,7 @@ pub mod repair_test { let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -4466,9 +4439,7 @@ pub mod repair_test { let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -4511,9 +4482,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 1).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 3); @@ -4557,9 +4526,7 @@ pub mod repair_test { .unwrap(); let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 3); @@ -4592,9 +4559,7 @@ pub mod repair_test { .unwrap(); let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); 
assert_eq!(jobs.len(), 3); @@ -4624,9 +4589,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 1).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 3); @@ -4655,9 +4618,7 @@ pub mod repair_test { let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 3); @@ -4692,9 +4653,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 1).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -4728,9 +4687,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 0).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -4764,10 +4721,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 1).await; let ds = up.downstairs.lock().await; - - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -4801,9 +4755,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 0).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -4842,10 +4794,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 1).await; let ds = up.downstairs.lock().await; - - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 3); @@ -4879,9 +4828,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 1).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -4948,10 +4895,7 @@ pub mod repair_test { .unwrap(); let ds = up.downstairs.lock().await; - - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); assert!(jobs[0].work.deps().is_empty()); @@ -4993,10 +4937,7 @@ pub mod repair_test { .unwrap(); let ds = up.downstairs.lock().await; - - let keys: Vec<&u64> = 
ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -5041,9 +4982,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 1).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 3); @@ -5120,9 +5059,7 @@ pub mod repair_test { create_and_enqueue_repair_op(&up, 2).await; let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 7); @@ -5179,10 +5116,7 @@ pub mod repair_test { .unwrap(); let ds = up.downstairs.lock().await; - - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 3); @@ -5242,10 +5176,7 @@ pub mod repair_test { .unwrap(); let ds = up.downstairs.lock().await; - - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 1); @@ -5302,10 +5233,7 @@ pub mod repair_test { // empty job slots. create_and_enqueue_repair_op(&up, 1).await; let ds = up.downstairs.lock().await; - - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 2); @@ -5349,9 +5277,7 @@ pub mod repair_test { .unwrap(); let ds = up.downstairs.lock().await; - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 1); assert!(jobs[0].work.deps().is_empty()); @@ -5456,9 +5382,7 @@ pub mod repair_test { assert_eq!(ds.ds_state[0], DsState::Active); // Check all three IOs again, downstairs 1 will be skipped.. - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); for job in jobs.iter().take(3) { assert_eq!(job.state[&0], IOState::New); @@ -5495,9 +5419,7 @@ pub mod repair_test { up.abort_repair_extent(&mut gw, &mut ds, eid as u32).await; // Check all three IOs again, downstairs 1 will be skipped.. - let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); assert_eq!(jobs.len(), 7); for job in jobs.iter().take(7) { @@ -5546,9 +5468,7 @@ pub mod repair_test { up.abort_repair_extent(&mut gw, &mut ds, eid as u32).await; // Check all three IOs again, all downstairs will be skipped.. 
- let keys: Vec<&u64> = ds.ds_active.keys().sorted().collect(); - let jobs: Vec<&DownstairsIO> = - keys.iter().map(|k| ds.ds_active.get(k).unwrap()).collect(); + let jobs: Vec<&DownstairsIO> = ds.ds_active.values().collect(); for job in jobs.iter().take(3) { assert_eq!(job.state[&0], IOState::Skipped);
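
The data-structure change at the heart of this patch is easiest to see outside the full Crucible types: the active job map moves from a HashMap that had to be collected and sorted on every walk to a BTreeMap keyed by job ID, and a per-client cache of not-yet-issued job IDs replaces the per-client scan in new_work(). The sketch below is a minimal, self-contained illustration of that pattern, assuming simplified stand-ins (Job, JobQueue, retire_up_to) rather than the real Downstairs/DownstairsIO types; it shows the approach, not the actual implementation.

use std::collections::BTreeMap;

// Simplified stand-ins for the real job types (illustrative only).
struct Job {
    ds_id: u64,
    done: bool,
}

struct JobQueue {
    // Active jobs stay ordered by ID; ordered walks need no collect-and-sort.
    active: BTreeMap<u64, Job>,
    // Per-client cache of job IDs still in the "new" state.
    new_jobs: Vec<Vec<u64>>,
}

impl JobQueue {
    fn new(clients: usize) -> Self {
        JobQueue {
            active: BTreeMap::new(),
            new_jobs: vec![Vec::new(); clients],
        }
    }

    // Enqueue a job: every client owes this work, so cache its ID for each.
    fn enqueue(&mut self, job: Job) {
        for cache in self.new_jobs.iter_mut() {
            cache.push(job.ds_id);
        }
        self.active.insert(job.ds_id, job);
    }

    // Hand out new work by draining the cache instead of scanning all jobs.
    fn new_work(&mut self, client_id: usize) -> Vec<u64> {
        self.new_jobs[client_id].drain(..).collect()
    }

    // Give back IDs that were not sent (e.g. flow control kicked in).
    fn requeue_work(&mut self, client_id: usize, work: &[u64]) {
        self.new_jobs[client_id].extend_from_slice(work);
    }

    // Retire completed jobs up to and including a flush, walking IDs in order.
    fn retire_up_to(&mut self, flush_id: u64) -> Vec<u64> {
        let mut retired = Vec::new();
        loop {
            let id = match self.active.keys().next() {
                Some(&id) if id <= flush_id => id,
                _ => break,
            };
            if self.active[&id].done {
                self.active.remove(&id);
                retired.push(id);
            } else {
                // Simplification: stop at the first job that cannot retire yet.
                break;
            }
        }
        retired
    }
}

fn main() {
    let mut q = JobQueue::new(3);
    for id in 1000..1003 {
        q.enqueue(Job { ds_id: id, done: true });
    }

    let work = q.new_work(0);
    assert_eq!(work, vec![1000, 1001, 1002]);

    // Pretend flow control stopped us after sending the first job.
    q.requeue_work(0, &work[1..]);
    assert_eq!(q.new_work(0), vec![1001, 1002]);

    assert_eq!(q.retire_up_to(1002), vec![1000, 1001, 1002]);
}

Draining the cache makes handing out new work proportional to the number of new jobs rather than to every active job, and the ordered map lets retirement walk forward and stop at the first job newer than the retiring flush.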