diff --git a/Cargo.lock b/Cargo.lock index c8cdf6109c..a2c9149d8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2698,7 +2698,7 @@ dependencies = [ [[package]] name = "iroh-docs" version = "0.27.0" -source = "git+https://github.com/n0-computer/iroh-docs?branch=main#16bc7fe4c7dee1b1b88f54390856c3fafa3d7656" +source = "git+https://github.com/n0-computer/iroh-docs?branch=refactor-extract-rpc#a761c374de391da3181f27398f2c97da30c1469f" dependencies = [ "anyhow", "async-channel", @@ -2717,15 +2717,20 @@ dependencies = [ "iroh-net", "iroh-router", "lru", + "nested_enum_utils", "num_enum", + "portable-atomic", "postcard", + "quic-rpc 0.13.0", + "quic-rpc-derive", "rand", "rand_core", "redb 1.5.1", "redb 2.1.1", "self_cell", "serde", - "strum 0.25.0", + "serde-error", + "strum 0.26.3", "tempfile", "thiserror", "tokio", @@ -4003,9 +4008,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.7.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" [[package]] name = "portmapper" diff --git a/Cargo.toml b/Cargo.toml index c069d76c1a..7d3764b9ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,4 +56,6 @@ iroh-router = { path = "./iroh-router" } iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" } iroh-gossip = { git = "https://github.com/n0-computer/iroh-gossip", branch = "main" } -iroh-docs = { git = "https://github.com/n0-computer/iroh-docs", branch = "main" } +iroh-docs = { git = "https://github.com/n0-computer/iroh-docs", branch = "refactor-extract-rpc" } + +# iroh-docs = { path = "../iroh-crates/iroh-docs" } diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 922d99d08a..9dfe900e08 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -35,7 +35,7 @@ iroh-router = { version = "0.27.0" } nested_enum_utils = "0.1.0" num_cpus = { version = "1.15.0" } portable-atomic = "1" -iroh-docs = { version = "0.27.0" } +iroh-docs = { version = "0.27.0", features = ["rpc"] } iroh-gossip = "0.27.0" parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 54505682a0..e2cae72d14 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -15,12 +15,13 @@ pub use crate::rpc_protocol::RpcService; mod quic; +pub use iroh_docs::rpc::client::Doc; + +pub use self::net::NodeStatus; pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; -pub use self::{docs::Doc, net::NodeStatus}; pub mod authors; pub mod blobs; -pub mod docs; pub mod net; pub mod tags; @@ -65,8 +66,11 @@ impl Iroh { } /// Returns the docs client. - pub fn docs(&self) -> &docs::Client { - docs::Client::ref_cast(&self.rpc) + pub fn docs(&self) -> iroh_docs::rpc::client::Client { + let rpc = self.rpc.clone(); + let channel: quic_rpc::RpcClient = + rpc.map::(); + iroh_docs::rpc::client::Client::new(channel) } /// Returns the authors client. diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index beca58c211..7cedcf6ad5 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -82,6 +82,7 @@ async fn spawn_docs( endpoint: Endpoint, gossip: Gossip, downloader: Downloader, + local_pool_handle: LocalPoolHandle, ) -> anyhow::Result> { let docs_store = match storage { DocsStorage::Disabled => return Ok(None), @@ -95,6 +96,7 @@ async fn spawn_docs( blobs_store, downloader, default_author_storage, + local_pool_handle, ) .await?; Ok(Some(engine)) @@ -687,6 +689,7 @@ where endpoint.clone(), gossip.clone(), downloader.clone(), + lp.handle().clone(), ) .await?; diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index a6dc9f3a2b..df50654961 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -56,10 +56,6 @@ use crate::{ DeleteRequest, DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse, ValidateRequest, }, - docs::{ - ExportFileRequest, ExportFileResponse, ImportFileRequest, ImportFileResponse, - Request as DocsRequest, SetHashRequest, - }, net::{ self, AddAddrRequest, AddrRequest, IdRequest, NodeWatchRequest, RelayRequest, RemoteInfoRequest, RemoteInfoResponse, RemoteInfosIterRequest, RemoteInfosIterResponse, @@ -274,41 +270,19 @@ impl Handler { async fn handle_docs_request( self, - msg: DocsRequest, + msg: iroh_docs::rpc::proto::Request, chan: RpcChannel, ) -> Result<(), RpcServerError> { - use DocsRequest::*; - match msg { - Open(msg) => chan.rpc(msg, self, Self::doc_open).await, - Close(msg) => chan.rpc(msg, self, Self::doc_close).await, - Status(msg) => chan.rpc(msg, self, Self::doc_status).await, - List(msg) => chan.server_streaming(msg, self, Self::doc_list).await, - Create(msg) => chan.rpc(msg, self, Self::doc_create).await, - Drop(msg) => chan.rpc(msg, self, Self::doc_drop).await, - Import(msg) => chan.rpc(msg, self, Self::doc_import).await, - Set(msg) => chan.rpc(msg, self, Self::doc_set).await, - ImportFile(msg) => { - chan.server_streaming(msg, self, Self::doc_import_file) - .await - } - ExportFile(msg) => { - chan.server_streaming(msg, self, Self::doc_export_file) - .await - } - Del(msg) => chan.rpc(msg, self, Self::doc_del).await, - SetHash(msg) => chan.rpc(msg, self, Self::doc_set_hash).await, - Get(msg) => chan.server_streaming(msg, self, Self::doc_get_many).await, - GetExact(msg) => chan.rpc(msg, self, Self::doc_get_exact).await, - StartSync(msg) => chan.rpc(msg, self, Self::doc_start_sync).await, - Leave(msg) => chan.rpc(msg, self, Self::doc_leave).await, - Share(msg) => chan.rpc(msg, self, Self::doc_share).await, - Subscribe(msg) => { - chan.try_server_streaming(msg, self, Self::doc_subscribe) - .await - } - SetDownloadPolicy(msg) => chan.rpc(msg, self, Self::doc_set_download_policy).await, - GetDownloadPolicy(msg) => chan.rpc(msg, self, Self::doc_get_download_policy).await, - GetSyncPeers(msg) => chan.rpc(msg, self, Self::doc_get_sync_peers).await, + if let Some(docs) = self + .router + .get_protocol::(DOCS_ALPN) + { + let chan = chan.map::(); + docs.handle_rpc_request(msg, chan).await + } else { + Err(RpcServerError::SendError(anyhow::anyhow!( + "Docs is not enabled" + ))) } } @@ -509,137 +483,6 @@ impl Handler { rx.map(AddPathResponse) } - fn doc_import_file(self, msg: ImportFileRequest) -> impl Stream { - // provide a little buffer so that we don't slow down the sender - let (tx, rx) = async_channel::bounded(32); - let tx2 = tx.clone(); - self.local_pool_handle().spawn_detached(|| async move { - if let Err(e) = self.doc_import_file0(msg, tx).await { - tx2.send(crate::client::docs::ImportProgress::Abort(RpcError::new( - &*e, - ))) - .await - .ok(); - } - }); - rx.map(ImportFileResponse) - } - - async fn doc_import_file0( - self, - msg: ImportFileRequest, - progress: async_channel::Sender, - ) -> anyhow::Result<()> { - use std::collections::BTreeMap; - - use iroh_blobs::store::ImportMode; - - use crate::client::docs::ImportProgress as DocImportProgress; - - let progress = AsyncChannelProgressSender::new(progress); - let names = Arc::new(Mutex::new(BTreeMap::new())); - // convert import progress to provide progress - let import_progress = progress.clone().with_filter_map(move |x| match x { - ImportProgress::Found { id, name } => { - names.lock().unwrap().insert(id, name); - None - } - ImportProgress::Size { id, size } => { - let name = names.lock().unwrap().remove(&id)?; - Some(DocImportProgress::Found { id, name, size }) - } - ImportProgress::OutboardProgress { id, offset } => { - Some(DocImportProgress::Progress { id, offset }) - } - ImportProgress::OutboardDone { hash, id } => { - Some(DocImportProgress::IngestDone { hash, id }) - } - _ => None, - }); - let ImportFileRequest { - doc_id, - author_id, - key, - path: root, - in_place, - } = msg; - // Check that the path is absolute and exists. - anyhow::ensure!(root.is_absolute(), "path must be absolute"); - anyhow::ensure!( - root.exists(), - "trying to add missing path: {}", - root.display() - ); - - let import_mode = match in_place { - true => ImportMode::TryReference, - false => ImportMode::Copy, - }; - - let blobs = self.blobs(); - let (temp_tag, size) = blobs - .store() - .import_file(root, import_mode, BlobFormat::Raw, import_progress) - .await?; - - let hash_and_format = temp_tag.inner(); - let HashAndFormat { hash, .. } = *hash_and_format; - self.doc_set_hash(SetHashRequest { - doc_id, - author_id, - key: key.clone(), - hash, - size, - }) - .await?; - drop(temp_tag); - progress.send(DocImportProgress::AllDone { key }).await?; - Ok(()) - } - - fn doc_export_file(self, msg: ExportFileRequest) -> impl Stream { - let (tx, rx) = async_channel::bounded(1024); - let tx2 = tx.clone(); - self.local_pool_handle().spawn_detached(|| async move { - if let Err(e) = self.doc_export_file0(msg, tx).await { - tx2.send(ExportProgress::Abort(RpcError::new(&*e))) - .await - .ok(); - } - }); - rx.map(ExportFileResponse) - } - - async fn doc_export_file0( - self, - msg: ExportFileRequest, - progress: async_channel::Sender, - ) -> anyhow::Result<()> { - let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; - let progress = AsyncChannelProgressSender::new(progress); - let ExportFileRequest { entry, path, mode } = msg; - let key = bytes::Bytes::from(entry.key().to_vec()); - let export_progress = progress.clone().with_map(move |mut x| { - // assign the doc key to the `meta` field of the initial progress event - if let ExportProgress::Found { meta, .. } = &mut x { - *meta = Some(key.clone()) - } - x - }); - let blobs = self.blobs(); - iroh_blobs::export::export( - blobs.store(), - entry.content_hash(), - path, - ExportFormat::Blob, - mode, - export_progress, - ) - .await?; - progress.send(ExportProgress::AllDone).await?; - Ok(()) - } - fn blob_download(self, msg: BlobDownloadRequest) -> impl Stream { let (sender, receiver) = async_channel::bounded(1024); let endpoint = self.inner.endpoint.clone(); diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs index e8411aeb17..ee74318084 100644 --- a/iroh/src/node/rpc/docs.rs +++ b/iroh/src/node/rpc/docs.rs @@ -6,28 +6,11 @@ use iroh_blobs::{store::Store as BaoStore, BlobFormat}; use iroh_docs::{Author, DocTicket, NamespaceSecret}; use super::{Handler, RpcError, RpcResult}; -use crate::{ - client::docs::ShareMode, - rpc_protocol::{ - authors::{ - CreateRequest, CreateResponse, DeleteRequest, DeleteResponse, ExportRequest, - ExportResponse, GetDefaultRequest, GetDefaultResponse, ImportRequest, ImportResponse, - ListRequest as AuthorListRequest, ListResponse as AuthorListResponse, - SetDefaultRequest, SetDefaultResponse, - }, - docs::{ - CloseRequest, CloseResponse, CreateRequest as DocCreateRequest, - CreateResponse as DocCreateResponse, DelRequest, DelResponse, DocListRequest, - DocSubscribeRequest, DocSubscribeResponse, DropRequest, DropResponse, - GetDownloadPolicyRequest, GetDownloadPolicyResponse, GetExactRequest, GetExactResponse, - GetManyRequest, GetManyResponse, GetSyncPeersRequest, GetSyncPeersResponse, - ImportRequest as DocImportRequest, ImportResponse as DocImportResponse, LeaveRequest, - LeaveResponse, ListResponse as DocListResponse, OpenRequest, OpenResponse, - SetDownloadPolicyRequest, SetDownloadPolicyResponse, SetHashRequest, SetHashResponse, - SetRequest, SetResponse, ShareRequest, ShareResponse, StartSyncRequest, - StartSyncResponse, StatusRequest, StatusResponse, - }, - }, +use crate::rpc_protocol::authors::{ + CreateRequest, CreateResponse, DeleteRequest, DeleteResponse, ExportRequest, ExportResponse, + GetDefaultRequest, GetDefaultResponse, ImportRequest, ImportResponse, + ListRequest as AuthorListRequest, ListResponse as AuthorListResponse, SetDefaultRequest, + SetDefaultResponse, }; /// Capacity for the flume channels to forward sync store iterators to async RPC streams. @@ -136,338 +119,4 @@ impl Handler { }) .await } - - pub(super) async fn doc_create(self, _req: DocCreateRequest) -> RpcResult { - self.with_docs(|docs| async move { - let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {}); - let id = namespace.id(); - docs.sync - .import_namespace(namespace.into()) - .await - .map_err(|e| RpcError::new(&*e))?; - docs.sync - .open(id, Default::default()) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DocCreateResponse { id }) - }) - .await - } - - pub(super) async fn doc_drop(self, req: DropRequest) -> RpcResult { - self.with_docs(|docs| async move { - let DropRequest { doc_id } = req; - docs.leave(doc_id, true) - .await - .map_err(|e| RpcError::new(&*e))?; - docs.sync - .drop_replica(doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DropResponse {}) - }) - .await - } - - pub(super) fn doc_list( - self, - _req: DocListRequest, - ) -> impl Stream> + Unpin { - self.with_docs_stream(|docs| { - let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP); - let sync = docs.sync.clone(); - // we need to spawn a task to send our request to the sync handle, because the method - // itself must be sync. - tokio::task::spawn(async move { - let tx2 = tx.clone(); - if let Err(err) = sync.list_replicas(tx).await { - tx2.send(Err(err)).await.ok(); - } - }); - rx.boxed().map(|r| { - r.map(|(id, capability)| DocListResponse { id, capability }) - .map_err(|e| RpcError::new(&*e)) - }) - }) - } - - pub(super) async fn doc_open(self, req: OpenRequest) -> RpcResult { - self.with_docs(|docs| async move { - docs.sync - .open(req.doc_id, Default::default()) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(OpenResponse {}) - }) - .await - } - - pub(super) async fn doc_close(self, req: CloseRequest) -> RpcResult { - self.with_docs(|docs| async move { - docs.sync - .close(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(CloseResponse {}) - }) - .await - } - - pub(super) async fn doc_status(self, req: StatusRequest) -> RpcResult { - self.with_docs(|docs| async move { - let status = docs - .sync - .get_state(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(StatusResponse { status }) - }) - .await - } - - pub(super) async fn doc_share(self, req: ShareRequest) -> RpcResult { - self.with_docs(|docs| async move { - let ShareRequest { - doc_id, - mode, - addr_options, - } = req; - let mut me = docs - .endpoint - .node_addr() - .await - .map_err(|e| RpcError::new(&*e))?; - me.apply_options(addr_options); - - let capability = match mode { - ShareMode::Read => iroh_docs::Capability::Read(doc_id), - ShareMode::Write => { - let secret = docs - .sync - .export_secret_key(doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - iroh_docs::Capability::Write(secret) - } - }; - docs.start_sync(doc_id, vec![]) - .await - .map_err(|e| RpcError::new(&*e))?; - - Ok(ShareResponse(DocTicket { - capability, - nodes: vec![me], - })) - }) - .await - } - - pub(super) async fn doc_subscribe( - self, - req: DocSubscribeRequest, - ) -> RpcResult>> { - self.with_docs(|docs| async move { - let stream = docs - .subscribe(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - - Ok(stream.map(|el| { - el.map(|event| DocSubscribeResponse { event }) - .map_err(|e| RpcError::new(&*e)) - })) - }) - .await - } - - pub(super) async fn doc_import(self, req: DocImportRequest) -> RpcResult { - self.with_docs(|docs| async move { - let DocImportRequest { capability } = req; - let doc_id = docs - .sync - .import_namespace(capability) - .await - .map_err(|e| RpcError::new(&*e))?; - docs.sync - .open(doc_id, Default::default()) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DocImportResponse { doc_id }) - }) - .await - } - - pub(super) async fn doc_start_sync( - self, - req: StartSyncRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - let StartSyncRequest { doc_id, peers } = req; - docs.start_sync(doc_id, peers) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(StartSyncResponse {}) - }) - .await - } - - pub(super) async fn doc_leave(self, req: LeaveRequest) -> RpcResult { - self.with_docs(|docs| async move { - let LeaveRequest { doc_id } = req; - docs.leave(doc_id, false) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(LeaveResponse {}) - }) - .await - } - - pub(super) async fn doc_set(self, req: SetRequest) -> RpcResult { - let blobs_store = self.blobs_store(); - self.with_docs(|docs| async move { - let SetRequest { - doc_id, - author_id, - key, - value, - } = req; - let len = value.len(); - let tag = blobs_store - .import_bytes(value, BlobFormat::Raw) - .await - .map_err(|e| RpcError::new(&e))?; - docs.sync - .insert_local(doc_id, author_id, key.clone(), *tag.hash(), len as u64) - .await - .map_err(|e| RpcError::new(&*e))?; - let entry = docs - .sync - .get_exact(doc_id, author_id, key, false) - .await - .map_err(|e| RpcError::new(&*e))? - .ok_or_else(|| RpcError::new(&*anyhow!("failed to get entry after insertion")))?; - Ok(SetResponse { entry }) - }) - .await - } - - pub(super) async fn doc_del(self, req: DelRequest) -> RpcResult { - self.with_docs(|docs| async move { - let DelRequest { - doc_id, - author_id, - prefix, - } = req; - let removed = docs - .sync - .delete_prefix(doc_id, author_id, prefix) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(DelResponse { removed }) - }) - .await - } - - pub(super) async fn doc_set_hash(self, req: SetHashRequest) -> RpcResult { - self.with_docs(|docs| async move { - let SetHashRequest { - doc_id, - author_id, - key, - hash, - size, - } = req; - docs.sync - .insert_local(doc_id, author_id, key.clone(), hash, size) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(SetHashResponse {}) - }) - .await - } - - pub(super) fn doc_get_many( - self, - req: GetManyRequest, - ) -> impl Stream> + Unpin { - let GetManyRequest { doc_id, query } = req; - self.with_docs_stream(move |docs| { - let (tx, rx) = async_channel::bounded(ITER_CHANNEL_CAP); - let sync = docs.sync.clone(); - // we need to spawn a task to send our request to the sync handle, because the method - // itself must be sync. - tokio::task::spawn(async move { - let tx2 = tx.clone(); - if let Err(err) = sync.get_many(doc_id, query, tx).await { - tx2.send(Err(err)).await.ok(); - } - }); - rx.boxed().map(|r| { - r.map(|entry| GetManyResponse { entry }) - .map_err(|e| RpcError::new(&*e)) - }) - }) - } - - pub(super) async fn doc_get_exact(self, req: GetExactRequest) -> RpcResult { - self.with_docs(|docs| async move { - let GetExactRequest { - doc_id, - author, - key, - include_empty, - } = req; - let entry = docs - .sync - .get_exact(doc_id, author, key, include_empty) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(GetExactResponse { entry }) - }) - .await - } - - pub(super) async fn doc_set_download_policy( - self, - req: SetDownloadPolicyRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - docs.sync - .set_download_policy(req.doc_id, req.policy) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(SetDownloadPolicyResponse {}) - }) - .await - } - - pub(super) async fn doc_get_download_policy( - self, - req: GetDownloadPolicyRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - let policy = docs - .sync - .get_download_policy(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(GetDownloadPolicyResponse { policy }) - }) - .await - } - - pub(super) async fn doc_get_sync_peers( - self, - req: GetSyncPeersRequest, - ) -> RpcResult { - self.with_docs(|docs| async move { - let peers = docs - .sync - .get_sync_peers(req.doc_id) - .await - .map_err(|e| RpcError::new(&*e))?; - Ok(GetSyncPeersResponse { peers }) - }) - .await - } } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index d062c6c444..f81182646a 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -19,7 +19,6 @@ use serde::{Deserialize, Serialize}; pub mod authors; pub mod blobs; -pub mod docs; pub mod net; pub mod node; pub mod tags; @@ -36,7 +35,7 @@ pub enum Request { Node(node::Request), Net(net::Request), Blobs(blobs::Request), - Docs(docs::Request), + Docs(iroh_docs::rpc::proto::Request), Tags(tags::Request), Authors(authors::Request), Gossip(iroh_gossip::RpcRequest), @@ -51,7 +50,7 @@ pub enum Response { Net(net::Response), Blobs(blobs::Response), Tags(tags::Response), - Docs(docs::Response), + Docs(iroh_docs::rpc::proto::Response), Authors(authors::Response), Gossip(iroh_gossip::RpcResponse), }