diff --git a/zstor/src/actors/zstor.rs b/zstor/src/actors/zstor.rs index 2ee6819..59db0a2 100644 --- a/zstor/src/actors/zstor.rs +++ b/zstor/src/actors/zstor.rs @@ -8,7 +8,7 @@ use crate::{ config::Config, erasure::Shard, meta::{Checksum, MetaData, ShardInfo}, - zdb::{SequentialZdb, ZdbError, ZdbResult}, + zdb::{Key, SequentialZdb, ZdbConnectionInfo, ZdbError, ZdbResult}, ZstorError, ZstorResult, }; use actix::prelude::*; @@ -324,6 +324,7 @@ impl Handler for ZstorActor { }; let input = load_data(&old_metadata).await?; + let existing_data = input.clone(); let (mut metadata, shards) = pipeline .send(RebuildData { input, @@ -332,7 +333,29 @@ impl Handler for ZstorActor { }) .await??; - save_data(&mut cfg.deref().clone(), shards, &mut metadata).await?; + // build a list of the key and the backend used for the shards + let mut used_backends = Vec::new(); + for (i, data) in existing_data.iter().enumerate() { + let key = old_metadata.shards()[i].key().to_vec(); + if let Some(data) = data { + if data.as_slice() == shards[i].as_ref() { + used_backends.push((key, Some(old_metadata.shards()[i].zdb().clone()))); + } else { + used_backends.push((key, None)); + error!("Shard {} is DIFFERENT", i); + } + } else { + used_backends.push((key, None)); + } + } + + rebuild_data( + &mut cfg.deref().clone(), + shards, + &mut metadata, + used_backends, + ) + .await?; info!( "Rebuild file from {} to {}", @@ -471,63 +494,156 @@ async fn load_data(metadata: &MetaData) -> ZstorResult>>> { Ok(shards) } -async fn save_data( +async fn check_backend_space( + backend: ZdbConnectionInfo, + shard_len: usize, +) -> ZdbResult { + let db = SequentialZdb::new(backend.clone()).await?; + let ns_info = db.ns_info().await?; + match ns_info.free_space() { + insufficient if (insufficient as usize) < shard_len => Err(ZdbError::new_storage_size( + db.connection_info().clone(), + shard_len, + ns_info.free_space() as usize, + )), + _ => Ok(db), + } +} + +async fn find_valid_backends( + cfg: &mut Config, + shard_len: usize, + needed_backends: usize, + skip_backends: Vec<(Vec, Option)>, +) -> ZstorResult> { + loop { + debug!("Finding backend config"); + let backends = cfg.shard_stores()?; + let mut failed_shards = 0; + let mut valid_dbs = Vec::new(); + + let handles: Vec<_> = backends + .into_iter() + .filter(|backend| { + !skip_backends + .iter() + .any(|(_, b)| b.as_ref() == Some(backend)) + }) + .map(|backend| { + tokio::spawn(async move { check_backend_space(backend, shard_len).await }) + }) + .collect(); + + for result in join_all(handles).await { + match result? { + Ok(db) => valid_dbs.push(db), + Err(e) => { + debug!("Backend error: {}", e); + cfg.remove_shard(e.remote()); + failed_shards += 1; + } + } + } + + if valid_dbs.len() >= needed_backends && failed_shards == 0 { + return Ok(valid_dbs); + } + + debug!("Backend config failed, retrying..."); + } +} + +async fn rebuild_data( cfg: &mut Config, shards: Vec, metadata: &mut MetaData, + // used_backends specifies which backends are already used + // which also means we don't need to check it again and the shard is not missing + used_backends: Vec<(Vec, Option)>, ) -> ZstorResult<()> { let shard_len = if shards.is_empty() { 0 } else { shards[0].len() }; - - let dbs = loop { - debug!("Finding backend config"); - let backends = cfg.shard_stores()?; - - let mut failed_shards: usize = 0; - let mut handles: Vec>> = Vec::with_capacity(shards.len()); - - for backend in backends { - handles.push(tokio::spawn(async move { - let db = SequentialZdb::new(backend.clone()).await?; - // check space in backend - let ns_info = db.ns_info().await?; - match ns_info.free_space() { - insufficient if (insufficient as usize) < shard_len => { - Err(ZdbError::new_storage_size( - db.connection_info().clone(), - shard_len, - ns_info.free_space() as usize, - )) - } - _ => Ok(db), - } - })); + let mut existing_backends_num = 0; + for (_, ci) in used_backends.iter() { + if ci.is_some() { + existing_backends_num += 1; } + } - let mut dbs = Vec::new(); - for db in join_all(handles).await { - match db? { - Err(zdbe) => { - debug!("could not connect to 0-db: {}", zdbe); - cfg.remove_shard(zdbe.remote()); - failed_shards += 1; - } - Ok(db) => dbs.push(db), // no error so healthy db backend + let new_dbs = find_valid_backends( + cfg, + shard_len, + shards.len() - existing_backends_num, + used_backends.clone(), + ) + .await?; + + // create the key,connection_info, and db for the shard + // - if the backend is already used, we don't need to set the shard + // hence the None db + // - if the backend is not used, we need to set the shard + // hence the Some(db) which will be used the set the shard + let mut new_dbs = new_dbs.into_iter(); + let mut key_dbs = Vec::new(); + for (key, ci) in used_backends { + match ci { + Some(ci) => key_dbs.push((key, ci, None)), + None => { + // unwrap is safe here because we know we have enough backends from the find_valid_backends + let db = new_dbs.next().unwrap(); + key_dbs.push((key, db.connection_info().clone(), Some(db))); } } + } - // if we find one we are good - if failed_shards == 0 { - debug!("found valid backend configuration"); - break dbs; - } + let mut handles: Vec>> = Vec::with_capacity(shards.len()); + for ((existing_key, existing_ci, db), (shard_idx, shard)) in + key_dbs.into_iter().zip(shards.into_iter().enumerate()) + { + handles.push(tokio::spawn(async move { + if let Some(db) = db { + let keys = db.set(&shard).await?; + Ok(ShardInfo::new( + shard_idx, + shard.checksum(), + keys, + db.connection_info().clone(), + )) + } else { + // no need to db.set if it is an already used backend (shard is not missing) + Ok(ShardInfo::new( + shard_idx, + shard.checksum(), + existing_key.clone(), + existing_ci.clone(), + )) + } + })); + } + + for shard_info in try_join_all(handles).await? { + metadata.add_shard(shard_info?); + } - debug!("Backend config failed"); + Ok(()) +} + +async fn save_data( + cfg: &mut Config, + shards: Vec, + metadata: &mut MetaData, +) -> ZstorResult<()> { + let shard_len = if shards.is_empty() { + 0 + } else { + shards[0].len() }; + let dbs = find_valid_backends(cfg, shard_len, shards.len(), [].to_vec()).await?; + trace!("store shards in backends"); let mut handles: Vec>> = Vec::with_capacity(shards.len());