Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async BitswapStore trait #28

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 56 additions & 50 deletions src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::protocol::{
};
use crate::query::{QueryEvent, QueryId, QueryManager, Request, Response};
use crate::stats::*;
use async_trait::async_trait;
use fnv::FnvHashMap;
#[cfg(feature = "compat")]
use fnv::FnvHashSet;
Expand Down Expand Up @@ -55,17 +56,18 @@ pub enum BitswapEvent {
}

/// Trait implemented by a block store.
#[async_trait]
pub trait BitswapStore: Send + Sync + 'static {
/// The store params.
type Params: StoreParams;
/// A have query needs to know if the block store contains the block.
fn contains(&mut self, cid: &Cid) -> Result<bool>;
async fn contains(&mut self, cid: &Cid) -> Result<bool>;
/// A block query needs to retrieve the block from the store.
fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>>;
async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>>;
/// A block response needs to insert the block into the store.
fn insert(&mut self, block: &Block<Self::Params>) -> Result<()>;
async fn insert(&mut self, block: &Block<Self::Params>) -> Result<()>;
/// A sync query needs a list of missing blocks to make progress.
fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>>;
async fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>>;
}

/// Bitswap configuration.
Expand Down Expand Up @@ -216,53 +218,56 @@ fn start_db_thread<S: BitswapStore>(
) {
let (tx, requests) = mpsc::unbounded();
let (responses, rx) = mpsc::unbounded();
std::thread::spawn(move || {
let mut requests: mpsc::UnboundedReceiver<DbRequest<S::Params>> = requests;
while let Some(request) = futures::executor::block_on(requests.next()) {
match request {
DbRequest::Bitswap(channel, request) => {
let response = match request.ty {
RequestType::Have => {
let have = store.contains(&request.cid).ok().unwrap_or_default();
if have {
RESPONSES_TOTAL.with_label_values(&["have"]).inc();
} else {
RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
std::thread::spawn(|| {
futures::executor::block_on(async move {
let mut requests: mpsc::UnboundedReceiver<DbRequest<S::Params>> = requests;
while let Some(request) = requests.next().await {
match request {
DbRequest::Bitswap(channel, request) => {
let response = match request.ty {
RequestType::Have => {
let have =
store.contains(&request.cid).await.ok().unwrap_or_default();
if have {
RESPONSES_TOTAL.with_label_values(&["have"]).inc();
} else {
RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
}
tracing::trace!("have {}", have);
BitswapResponse::Have(have)
}
tracing::trace!("have {}", have);
BitswapResponse::Have(have)
}
RequestType::Block => {
let block = store.get(&request.cid).ok().unwrap_or_default();
if let Some(data) = block {
RESPONSES_TOTAL.with_label_values(&["block"]).inc();
SENT_BLOCK_BYTES.inc_by(data.len() as u64);
tracing::trace!("block {}", data.len());
BitswapResponse::Block(data)
} else {
RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
tracing::trace!("have false");
BitswapResponse::Have(false)
RequestType::Block => {
let block = store.get(&request.cid).await.ok().unwrap_or_default();
if let Some(data) = block {
RESPONSES_TOTAL.with_label_values(&["block"]).inc();
SENT_BLOCK_BYTES.inc_by(data.len() as u64);
tracing::trace!("block {}", data.len());
BitswapResponse::Block(data)
} else {
RESPONSES_TOTAL.with_label_values(&["dont_have"]).inc();
tracing::trace!("have false");
BitswapResponse::Have(false)
}
}
};
responses
.unbounded_send(DbResponse::Bitswap(channel, response))
.ok();
}
DbRequest::Insert(block) => {
if let Err(err) = store.insert(&block).await {
tracing::error!("error inserting blocks {}", err);
}
};
responses
.unbounded_send(DbResponse::Bitswap(channel, response))
.ok();
}
DbRequest::Insert(block) => {
if let Err(err) = store.insert(&block) {
tracing::error!("error inserting blocks {}", err);
}
}
DbRequest::MissingBlocks(id, cid) => {
let res = store.missing_blocks(&cid);
responses
.unbounded_send(DbResponse::MissingBlocks(id, res))
.ok();
DbRequest::MissingBlocks(id, cid) => {
let res = store.missing_blocks(&cid).await;
responses
.unbounded_send(DbResponse::MissingBlocks(id, res))
.ok();
}
}
}
}
})
});
(tx, rx)
}
Expand Down Expand Up @@ -731,26 +736,27 @@ mod tests {
#[derive(Clone, Default)]
struct Store(Arc<Mutex<FnvHashMap<Cid, Vec<u8>>>>);

#[async_trait]
impl BitswapStore for Store {
type Params = DefaultParams;
fn contains(&mut self, cid: &Cid) -> Result<bool> {
async fn contains(&mut self, cid: &Cid) -> Result<bool> {
Ok(self.0.lock().unwrap().contains_key(cid))
}
fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
async fn get(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
Ok(self.0.lock().unwrap().get(cid).cloned())
}
fn insert(&mut self, block: &Block<Self::Params>) -> Result<()> {
async fn insert(&mut self, block: &Block<Self::Params>) -> Result<()> {
self.0
.lock()
.unwrap()
.insert(*block.cid(), block.data().to_vec());
Ok(())
}
fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
async fn missing_blocks(&mut self, cid: &Cid) -> Result<Vec<Cid>> {
let mut stack = vec![*cid];
let mut missing = vec![];
while let Some(cid) = stack.pop() {
if let Some(data) = self.get(&cid)? {
if let Some(data) = self.get(&cid).await? {
let block = Block::<Self::Params>::new_unchecked(cid, data);
block.references(&mut stack)?;
} else {
Expand Down