diff --git a/src/behaviour.rs b/src/behaviour.rs index 07ff69f..c39940f 100644 --- a/src/behaviour.rs +++ b/src/behaviour.rs @@ -12,6 +12,7 @@ use crate::protocol::{ }; use crate::query::{QueryEvent, QueryId, QueryManager, Request, Response}; use crate::stats::*; +use async_trait::async_trait; use fnv::FnvHashMap; #[cfg(feature = "compat")] use fnv::FnvHashSet; @@ -55,17 +56,18 @@ pub enum BitswapEvent { } /// Trait implemented by a block store. +#[async_trait] pub trait BitswapStore: Send + Sync + 'static { /// The store params. type Params: StoreParams; /// A have query needs to know if the block store contains the block. - fn contains(&mut self, cid: &Cid) -> Result; + async fn contains(&mut self, cid: &Cid) -> Result; /// A block query needs to retrieve the block from the store. - fn get(&mut self, cid: &Cid) -> Result>>; + async fn get(&mut self, cid: &Cid) -> Result>>; /// A block response needs to insert the block into the store. - fn insert(&mut self, block: &Block) -> Result<()>; + async fn insert(&mut self, block: &Block) -> Result<()>; /// A sync query needs a list of missing blocks to make progress. - fn missing_blocks(&mut self, cid: &Cid) -> Result>; + async fn missing_blocks(&mut self, cid: &Cid) -> Result>; } /// Bitswap configuration. @@ -216,53 +218,56 @@ fn start_db_thread( ) { let (tx, requests) = mpsc::unbounded(); let (responses, rx) = mpsc::unbounded(); - std::thread::spawn(move || { - let mut requests: mpsc::UnboundedReceiver> = requests; - while let Some(request) = futures::executor::block_on(requests.next()) { - match request { - DbRequest::Bitswap(channel, request) => { - let response = match request.ty { - RequestType::Have => { - let have = store.contains(&request.cid).ok().unwrap_or_default(); - if have { - RESPONSES_TOTAL.with_label_values(&["have"]).inc(); - } else { - RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc(); + std::thread::spawn(|| { + futures::executor::block_on(async move { + let mut requests: mpsc::UnboundedReceiver> = requests; + while let Some(request) = requests.next().await { + match request { + DbRequest::Bitswap(channel, request) => { + let response = match request.ty { + RequestType::Have => { + let have = + store.contains(&request.cid).await.ok().unwrap_or_default(); + if have { + RESPONSES_TOTAL.with_label_values(&["have"]).inc(); + } else { + RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc(); + } + tracing::trace!("have {}", have); + BitswapResponse::Have(have) } - tracing::trace!("have {}", have); - BitswapResponse::Have(have) - } - RequestType::Block => { - let block = store.get(&request.cid).ok().unwrap_or_default(); - if let Some(data) = block { - RESPONSES_TOTAL.with_label_values(&["block"]).inc(); - SENT_BLOCK_BYTES.inc_by(data.len() as u64); - tracing::trace!("block {}", data.len()); - BitswapResponse::Block(data) - } else { - RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc(); - tracing::trace!("have false"); - BitswapResponse::Have(false) + RequestType::Block => { + let block = store.get(&request.cid).await.ok().unwrap_or_default(); + if let Some(data) = block { + RESPONSES_TOTAL.with_label_values(&["block"]).inc(); + SENT_BLOCK_BYTES.inc_by(data.len() as u64); + tracing::trace!("block {}", data.len()); + BitswapResponse::Block(data) + } else { + RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc(); + tracing::trace!("have false"); + BitswapResponse::Have(false) + } } + }; + responses + .unbounded_send(DbResponse::Bitswap(channel, response)) + .ok(); + } + DbRequest::Insert(block) => { + if let Err(err) = store.insert(&block).await { + tracing::error!("error inserting blocks {}", err); } - }; - responses - .unbounded_send(DbResponse::Bitswap(channel, response)) - .ok(); - } - DbRequest::Insert(block) => { - if let Err(err) = store.insert(&block) { - tracing::error!("error inserting blocks {}", err); } - } - DbRequest::MissingBlocks(id, cid) => { - let res = store.missing_blocks(&cid); - responses - .unbounded_send(DbResponse::MissingBlocks(id, res)) - .ok(); + DbRequest::MissingBlocks(id, cid) => { + let res = store.missing_blocks(&cid).await; + responses + .unbounded_send(DbResponse::MissingBlocks(id, res)) + .ok(); + } } } - } + }) }); (tx, rx) } @@ -731,26 +736,27 @@ mod tests { #[derive(Clone, Default)] struct Store(Arc>>>); + #[async_trait] impl BitswapStore for Store { type Params = DefaultParams; - fn contains(&mut self, cid: &Cid) -> Result { + async fn contains(&mut self, cid: &Cid) -> Result { Ok(self.0.lock().unwrap().contains_key(cid)) } - fn get(&mut self, cid: &Cid) -> Result>> { + async fn get(&mut self, cid: &Cid) -> Result>> { Ok(self.0.lock().unwrap().get(cid).cloned()) } - fn insert(&mut self, block: &Block) -> Result<()> { + async fn insert(&mut self, block: &Block) -> Result<()> { self.0 .lock() .unwrap() .insert(*block.cid(), block.data().to_vec()); Ok(()) } - fn missing_blocks(&mut self, cid: &Cid) -> Result> { + async fn missing_blocks(&mut self, cid: &Cid) -> Result> { let mut stack = vec![*cid]; let mut missing = vec![]; while let Some(cid) = stack.pop() { - if let Some(data) = self.get(&cid)? { + if let Some(data) = self.get(&cid).await? { let block = Block::::new_unchecked(cid, data); block.references(&mut stack)?; } else {