Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Consolidate the blobs API #42

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ futures-util = "0.3.30"
testdir = "0.9.1"

[features]
default = ["fs-store", "net_protocol"]
default = ["fs-store", "net_protocol", "formats-collection"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
net_protocol = ["downloader", "dep:futures-util"]
fs-store = ["dep:reflink-copy", "redb", "dep:tempfile"]
Expand All @@ -112,6 +112,8 @@ rpc = [
"dep:walkdir",
"downloader",
]
formats = []
formats-collection = ["formats"]

example-iroh = [
"dep:clap",
Expand All @@ -127,6 +129,7 @@ rustdoc-args = ["--cfg", "iroh_docsrs"]

[[example]]
name = "provide-bytes"
required-features = ["formats-collection"]

[[example]]
name = "fetch-fsm"
Expand Down
5 changes: 4 additions & 1 deletion examples/local-swarm-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ mod progress {
ProgressStyle,
};
use iroh_blobs::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
get::{
progress::{BlobProgress, DownloadProgress},
Stats,
},
Hash,
};

Expand Down
9 changes: 6 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ use iroh::{NodeAddr, PublicKey, RelayUrl};
use tokio::io::AsyncWriteExt;

use crate::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
net_protocol::DownloadMode,
get::{
progress::{BlobProgress, DownloadProgress},
Stats,
},
provider::AddProgress,
rpc::client::blobs::{
self, BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption,
self, BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions,
IncompleteBlobInfo, WrapOption,
},
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
ticket::BlobTicket,
Expand Down
2 changes: 1 addition & 1 deletion src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
use tracing::{debug, error, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
get::{progress::DownloadProgress, Stats},
metrics::Metrics,
store::Store,
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
Expand Down
34 changes: 16 additions & 18 deletions src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use iroh::endpoint;

use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
use crate::{
get::{db::get_to_db_in_steps, error::GetError},
store::Store,
get::Error,
store::{get_to_db_in_steps, FetchState, FetchStateNeedsConn, Store},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so there is get and fetch now? this is confusing 😅

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everything that needs a store is in the store module. I might even make it a fn on the store traits (see the OO or not OO question)

One reason for this is that I think it would be neat if you could hide the entire store behind a ff, but to do that you must make sure that store stuff is not all over the place, otherwise you will go crazy.

};

impl From<GetError> for FailureAction {
fn from(e: GetError) -> Self {
impl From<Error> for FailureAction {
fn from(e: Error) -> Self {
match e {
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
e @ Error::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ Error::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ Error::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ Error::Io(_) => FailureAction::RetryLater(e.into()),
e @ Error::BadRequest(_) => FailureAction::AbortRequest(e.into()),
// TODO: what do we want to do on local failures?
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
e @ Error::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
}
}
}
Expand All @@ -34,7 +34,7 @@ pub(crate) struct IoGetter<S: Store> {

impl<S: Store> Getter for IoGetter<S> {
type Connection = endpoint::Connection;
type NeedsConn = crate::get::db::GetStateNeedsConn;
type NeedsConn = FetchStateNeedsConn;

fn get(
&mut self,
Expand All @@ -45,10 +45,8 @@ impl<S: Store> Getter for IoGetter<S> {
async move {
match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
Err(err) => Err(err.into()),
Ok(crate::get::db::GetState::Complete(stats)) => {
Ok(super::GetOutput::Complete(stats))
}
Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
Ok(FetchState::Complete(stats)) => Ok(super::GetOutput::Complete(stats)),
Ok(FetchState::NeedsConn(needs_conn)) => {
Ok(super::GetOutput::NeedsConn(needs_conn))
}
}
Expand All @@ -57,7 +55,7 @@ impl<S: Store> Getter for IoGetter<S> {
}
}

impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
impl super::NeedsConn<endpoint::Connection> for FetchStateNeedsConn {
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
async move {
let res = self.proceed(conn).await;
Expand All @@ -73,7 +71,7 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
}

#[cfg(feature = "metrics")]
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
fn track_metrics(res: &Result<crate::get::Stats, Error>) {
use iroh_metrics::{inc, inc_by};

use crate::metrics::Metrics;
Expand All @@ -90,7 +88,7 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
}
Err(e) => match &e {
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
Error::NotFound(_) => inc!(Metrics, downloads_notfound),
_ => inc!(Metrics, downloads_error),
},
}
Expand Down
4 changes: 2 additions & 2 deletions src/downloader/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use parking_lot::Mutex;

use super::DownloadKind;
use crate::{
get::{db::DownloadProgress, progress::TransferState},
get::progress::{DownloadProgress, TransferState},
util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender},
};

Expand All @@ -21,7 +21,7 @@ pub type ProgressSubscriber = AsyncChannelProgressSender<DownloadProgress>;
/// Track the progress of downloads.
///
/// This struct allows to create [`ProgressSender`] structs to be passed to
/// [`crate::get::db::get_to_db`]. Each progress sender can be subscribed to by any number of
/// [`crate::store::get_to_db`]. Each progress sender can be subscribed to by any number of
/// [`ProgressSubscriber`] channel senders, which will receive each progress update (if they have
/// capacity). Additionally, the [`ProgressTracker`] maintains a [`TransferState`] for each
/// transfer, applying each progress update to update this state. When subscribing to an already
Expand Down
5 changes: 1 addition & 4 deletions src/downloader/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use iroh::SecretKey;

use super::*;
use crate::{
get::{
db::BlobId,
progress::{BlobProgress, TransferState},
},
get::progress::{BlobId, BlobProgress, TransferState},
util::{
local_pool::LocalPool,
progress::{AsyncChannelProgressSender, IdGenerator},
Expand Down
10 changes: 5 additions & 5 deletions src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ impl Getter for TestingGetter {
// since for testing we don't need a real connection, just keep track of what peer is the
// request being sent to
type Connection = NodeId;
type NeedsConn = GetStateNeedsConn;
type NeedsConn = FetchStateNeedsConn;

fn get(
&mut self,
kind: DownloadKind,
progress_sender: BroadcastProgressSender,
) -> GetStartFut<Self::NeedsConn> {
std::future::ready(Ok(downloader::GetOutput::NeedsConn(GetStateNeedsConn(
std::future::ready(Ok(downloader::GetOutput::NeedsConn(FetchStateNeedsConn(
self.clone(),
kind,
progress_sender,
Expand All @@ -53,11 +53,11 @@ impl Getter for TestingGetter {
}

#[derive(Debug)]
pub(super) struct GetStateNeedsConn(TestingGetter, DownloadKind, BroadcastProgressSender);
pub(super) struct FetchStateNeedsConn(TestingGetter, DownloadKind, BroadcastProgressSender);

impl downloader::NeedsConn<NodeId> for GetStateNeedsConn {
impl downloader::NeedsConn<NodeId> for FetchStateNeedsConn {
fn proceed(self, peer: NodeId) -> super::GetProceedFut {
let GetStateNeedsConn(getter, kind, progress_sender) = self;
let FetchStateNeedsConn(getter, kind, progress_sender) = self;
let mut inner = getter.0.write();
inner.request_history.push((kind, peer));
let request_duration = inner.request_duration;
Expand Down
1 change: 1 addition & 0 deletions src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
//! n-1 items, where n is the number of blobs in the HashSeq.
//!
//! [postcard]: https://docs.rs/postcard/latest/postcard/
#[cfg(feature = "formats-collection")]
pub mod collection;
71 changes: 5 additions & 66 deletions src/get.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The client side API
//!
//! To get data, create a connection using [iroh-net] or use any quinn
//! To get data, create a connection using [iroh] or use any [iroh-quinn]
//! connection that was obtained in another way.
//!
//! Create a request describing the data you want to get.
Expand All @@ -11,9 +11,9 @@
//! For some states you have to provide additional arguments when calling next,
//! or you can choose to finish early.
//!
//! [iroh-net]: https://docs.rs/iroh-net
//! [iroh-net]: https://docs.rs/iroh
//! [iroh-quinn]: https://docs.rs/iroh-quinn
use std::{
error::Error,
fmt::{self, Debug},
time::{Duration, Instant},
};
Expand All @@ -30,8 +30,8 @@ use crate::{
Hash, IROH_BLOCK_SIZE,
};

pub mod db;
pub mod error;
mod error;
pub use error::Error;
pub mod progress;
pub mod request;

Expand Down Expand Up @@ -882,64 +882,3 @@ pub mod fsm {
ranges_iter: RangesIter,
}
}

/// Error when processing a response
#[derive(thiserror::Error, Debug)]
pub enum GetResponseError {
/// Error when opening a stream
#[error("connection: {0}")]
Connection(#[from] endpoint::ConnectionError),
/// Error when writing the handshake or request to the stream
#[error("write: {0}")]
Write(#[from] endpoint::WriteError),
/// Error when reading from the stream
#[error("read: {0}")]
Read(#[from] endpoint::ReadError),
/// Error when decoding, e.g. hash mismatch
#[error("decode: {0}")]
Decode(bao_tree::io::DecodeError),
/// A generic error
#[error("generic: {0}")]
Generic(anyhow::Error),
}

impl From<postcard::Error> for GetResponseError {
fn from(cause: postcard::Error) -> Self {
Self::Generic(cause.into())
}
}

impl From<bao_tree::io::DecodeError> for GetResponseError {
fn from(cause: bao_tree::io::DecodeError) -> Self {
match cause {
bao_tree::io::DecodeError::Io(cause) => {
// try to downcast to specific quinn errors
if let Some(source) = cause.source() {
if let Some(error) = source.downcast_ref::<endpoint::ConnectionError>() {
return Self::Connection(error.clone());
}
if let Some(error) = source.downcast_ref::<endpoint::ReadError>() {
return Self::Read(error.clone());
}
if let Some(error) = source.downcast_ref::<endpoint::WriteError>() {
return Self::Write(error.clone());
}
}
Self::Generic(cause.into())
}
_ => Self::Decode(cause),
}
}
}

impl From<anyhow::Error> for GetResponseError {
fn from(cause: anyhow::Error) -> Self {
Self::Generic(cause)
}
}

impl From<GetResponseError> for std::io::Error {
fn from(cause: GetResponseError) -> Self {
Self::new(std::io::ErrorKind::Other, cause)
}
}
Loading
Loading