Skip to content

Commit

Permalink
refactor(iroh): extract docs RPC into iroh-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Oct 31, 2024
1 parent f0590be commit 4d30302
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 537 deletions.
13 changes: 9 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ iroh-router = { path = "./iroh-router" }

iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" }
iroh-gossip = { git = "https://github.com/n0-computer/iroh-gossip", branch = "main" }
iroh-docs = { git = "https://github.com/n0-computer/iroh-docs", branch = "main" }
iroh-docs = { git = "https://github.com/n0-computer/iroh-docs", branch = "refactor-extract-rpc" }

# iroh-docs = { path = "../iroh-crates/iroh-docs" }
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ iroh-router = { version = "0.27.0" }
nested_enum_utils = "0.1.0"
num_cpus = { version = "1.15.0" }
portable-atomic = "1"
iroh-docs = { version = "0.27.0" }
iroh-docs = { version = "0.27.0", features = ["rpc"] }
iroh-gossip = "0.27.0"
parking_lot = "0.12.1"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
Expand Down
12 changes: 8 additions & 4 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ pub use crate::rpc_protocol::RpcService;

mod quic;

pub use iroh_docs::rpc::client::Doc;

pub use self::net::NodeStatus;
pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN};
pub use self::{docs::Doc, net::NodeStatus};

pub mod authors;
pub mod blobs;
pub mod docs;
pub mod net;
pub mod tags;

Expand Down Expand Up @@ -65,8 +66,11 @@ impl Iroh {
}

/// Returns the docs client.
pub fn docs(&self) -> &docs::Client {
docs::Client::ref_cast(&self.rpc)
pub fn docs(&self) -> iroh_docs::rpc::client::Client {
let rpc = self.rpc.clone();
let channel: quic_rpc::RpcClient<iroh_docs::rpc::proto::RpcService> =
rpc.map::<iroh_docs::rpc::proto::RpcService>();
iroh_docs::rpc::client::Client::new(channel)
}

/// Returns the authors client.
Expand Down
3 changes: 3 additions & 0 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ async fn spawn_docs<S: iroh_blobs::store::Store>(
endpoint: Endpoint,
gossip: Gossip,
downloader: Downloader,
local_pool_handle: LocalPoolHandle,
) -> anyhow::Result<Option<Engine>> {
let docs_store = match storage {
DocsStorage::Disabled => return Ok(None),
Expand All @@ -95,6 +96,7 @@ async fn spawn_docs<S: iroh_blobs::store::Store>(
blobs_store,
downloader,
default_author_storage,
local_pool_handle,
)
.await?;
Ok(Some(engine))
Expand Down Expand Up @@ -687,6 +689,7 @@ where
endpoint.clone(),
gossip.clone(),
downloader.clone(),
lp.handle().clone(),
)
.await?;

Expand Down
179 changes: 11 additions & 168 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ use crate::{
DeleteRequest, DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest,
ListRequest, ReadAtRequest, ReadAtResponse, ValidateRequest,
},
docs::{
ExportFileRequest, ExportFileResponse, ImportFileRequest, ImportFileResponse,
Request as DocsRequest, SetHashRequest,
},
net::{
self, AddAddrRequest, AddrRequest, IdRequest, NodeWatchRequest, RelayRequest,
RemoteInfoRequest, RemoteInfoResponse, RemoteInfosIterRequest, RemoteInfosIterResponse,
Expand Down Expand Up @@ -274,41 +270,19 @@ impl<D: BaoStore> Handler<D> {

async fn handle_docs_request(
self,
msg: DocsRequest,
msg: iroh_docs::rpc::proto::Request,
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use DocsRequest::*;
match msg {
Open(msg) => chan.rpc(msg, self, Self::doc_open).await,
Close(msg) => chan.rpc(msg, self, Self::doc_close).await,
Status(msg) => chan.rpc(msg, self, Self::doc_status).await,
List(msg) => chan.server_streaming(msg, self, Self::doc_list).await,
Create(msg) => chan.rpc(msg, self, Self::doc_create).await,
Drop(msg) => chan.rpc(msg, self, Self::doc_drop).await,
Import(msg) => chan.rpc(msg, self, Self::doc_import).await,
Set(msg) => chan.rpc(msg, self, Self::doc_set).await,
ImportFile(msg) => {
chan.server_streaming(msg, self, Self::doc_import_file)
.await
}
ExportFile(msg) => {
chan.server_streaming(msg, self, Self::doc_export_file)
.await
}
Del(msg) => chan.rpc(msg, self, Self::doc_del).await,
SetHash(msg) => chan.rpc(msg, self, Self::doc_set_hash).await,
Get(msg) => chan.server_streaming(msg, self, Self::doc_get_many).await,
GetExact(msg) => chan.rpc(msg, self, Self::doc_get_exact).await,
StartSync(msg) => chan.rpc(msg, self, Self::doc_start_sync).await,
Leave(msg) => chan.rpc(msg, self, Self::doc_leave).await,
Share(msg) => chan.rpc(msg, self, Self::doc_share).await,
Subscribe(msg) => {
chan.try_server_streaming(msg, self, Self::doc_subscribe)
.await
}
SetDownloadPolicy(msg) => chan.rpc(msg, self, Self::doc_set_download_policy).await,
GetDownloadPolicy(msg) => chan.rpc(msg, self, Self::doc_get_download_policy).await,
GetSyncPeers(msg) => chan.rpc(msg, self, Self::doc_get_sync_peers).await,
if let Some(docs) = self
.router
.get_protocol::<iroh_docs::engine::Engine>(DOCS_ALPN)
{
let chan = chan.map::<iroh_docs::rpc::proto::RpcService>();
docs.handle_rpc_request(msg, chan).await
} else {
Err(RpcServerError::SendError(anyhow::anyhow!(
"Docs is not enabled"
)))
}
}

Expand Down Expand Up @@ -509,137 +483,6 @@ impl<D: BaoStore> Handler<D> {
rx.map(AddPathResponse)
}

fn doc_import_file(self, msg: ImportFileRequest) -> impl Stream<Item = ImportFileResponse> {
// provide a little buffer so that we don't slow down the sender
let (tx, rx) = async_channel::bounded(32);
let tx2 = tx.clone();
self.local_pool_handle().spawn_detached(|| async move {
if let Err(e) = self.doc_import_file0(msg, tx).await {
tx2.send(crate::client::docs::ImportProgress::Abort(RpcError::new(
&*e,
)))
.await
.ok();
}
});
rx.map(ImportFileResponse)
}

async fn doc_import_file0(
self,
msg: ImportFileRequest,
progress: async_channel::Sender<crate::client::docs::ImportProgress>,
) -> anyhow::Result<()> {
use std::collections::BTreeMap;

use iroh_blobs::store::ImportMode;

use crate::client::docs::ImportProgress as DocImportProgress;

let progress = AsyncChannelProgressSender::new(progress);
let names = Arc::new(Mutex::new(BTreeMap::new()));
// convert import progress to provide progress
let import_progress = progress.clone().with_filter_map(move |x| match x {
ImportProgress::Found { id, name } => {
names.lock().unwrap().insert(id, name);
None
}
ImportProgress::Size { id, size } => {
let name = names.lock().unwrap().remove(&id)?;
Some(DocImportProgress::Found { id, name, size })
}
ImportProgress::OutboardProgress { id, offset } => {
Some(DocImportProgress::Progress { id, offset })
}
ImportProgress::OutboardDone { hash, id } => {
Some(DocImportProgress::IngestDone { hash, id })
}
_ => None,
});
let ImportFileRequest {
doc_id,
author_id,
key,
path: root,
in_place,
} = msg;
// Check that the path is absolute and exists.
anyhow::ensure!(root.is_absolute(), "path must be absolute");
anyhow::ensure!(
root.exists(),
"trying to add missing path: {}",
root.display()
);

let import_mode = match in_place {
true => ImportMode::TryReference,
false => ImportMode::Copy,
};

let blobs = self.blobs();
let (temp_tag, size) = blobs
.store()
.import_file(root, import_mode, BlobFormat::Raw, import_progress)
.await?;

let hash_and_format = temp_tag.inner();
let HashAndFormat { hash, .. } = *hash_and_format;
self.doc_set_hash(SetHashRequest {
doc_id,
author_id,
key: key.clone(),
hash,
size,
})
.await?;
drop(temp_tag);
progress.send(DocImportProgress::AllDone { key }).await?;
Ok(())
}

fn doc_export_file(self, msg: ExportFileRequest) -> impl Stream<Item = ExportFileResponse> {
let (tx, rx) = async_channel::bounded(1024);
let tx2 = tx.clone();
self.local_pool_handle().spawn_detached(|| async move {
if let Err(e) = self.doc_export_file0(msg, tx).await {
tx2.send(ExportProgress::Abort(RpcError::new(&*e)))
.await
.ok();
}
});
rx.map(ExportFileResponse)
}

async fn doc_export_file0(
self,
msg: ExportFileRequest,
progress: async_channel::Sender<ExportProgress>,
) -> anyhow::Result<()> {
let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?;
let progress = AsyncChannelProgressSender::new(progress);
let ExportFileRequest { entry, path, mode } = msg;
let key = bytes::Bytes::from(entry.key().to_vec());
let export_progress = progress.clone().with_map(move |mut x| {
// assign the doc key to the `meta` field of the initial progress event
if let ExportProgress::Found { meta, .. } = &mut x {
*meta = Some(key.clone())
}
x
});
let blobs = self.blobs();
iroh_blobs::export::export(
blobs.store(),
entry.content_hash(),
path,
ExportFormat::Blob,
mode,
export_progress,
)
.await?;
progress.send(ExportProgress::AllDone).await?;
Ok(())
}

fn blob_download(self, msg: BlobDownloadRequest) -> impl Stream<Item = DownloadResponse> {
let (sender, receiver) = async_channel::bounded(1024);
let endpoint = self.inner.endpoint.clone();
Expand Down
Loading

0 comments on commit 4d30302

Please sign in to comment.