Skip to content

Commit

Permalink
Merge pull request #7 from n0-computer/mem-rpc-client
Browse files Browse the repository at this point in the history
feat: add mem rpc client
  • Loading branch information
rklaehn authored Nov 15, 2024
2 parents 5489d1e + 3cf208a commit 6f1d8eb
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ tracing = "0.1"

# rpc
nested_enum_utils = { version = "0.1.0", optional = true }
quic-rpc = { version = "0.15", optional = true }
quic-rpc = { version = "0.15.1", optional = true }
quic-rpc-derive = { version = "0.15", optional = true }
serde-error = { version = "0.1.3", optional = true }
portable-atomic = { version = "1.9.0", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct Engine<D> {
content_status_cb: ContentStatusCallback,
local_pool_handle: LocalPoolHandle,
blob_store: D,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

impl<D: iroh_blobs::store::Store> Engine<D> {
Expand Down Expand Up @@ -118,6 +120,8 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
default_author: Arc::new(default_author),
local_pool_handle,
blob_store: bao_store,
#[cfg(feature = "rpc")]
rpc_handler: Default::default(),
})
}

Expand Down
45 changes: 38 additions & 7 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! Quic RPC implementation for docs.

use proto::RpcService;
use quic_rpc::server::{ChannelTypes, RpcChannel};
use proto::{Request, RpcService};
use quic_rpc::{
server::{ChannelTypes, RpcChannel},
RpcClient, RpcServer,
};
use tokio_util::task::AbortOnDropHandle;

use crate::engine::Engine;

Expand All @@ -14,15 +18,22 @@ type RpcError = serde_error::Error;
type RpcResult<T> = std::result::Result<T, RpcError>;

impl<D: iroh_blobs::store::Store> Engine<D> {
/// Get an in memory client to interact with the docs engine.
pub fn client(&self) -> &client::docs::MemClient {
&self
.rpc_handler
.get_or_init(|| RpcHandler::new(self))
.client
}

/// Handle a docs request from the RPC server.
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
&self,
msg: crate::rpc::proto::Request,
self,
msg: Request,
chan: RpcChannel<RpcService, C>,
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
use crate::rpc::proto::Request::*;

let this = self.clone();
use Request::*;
let this = self;
match msg {
Open(msg) => chan.rpc(msg, this, Self::doc_open).await,
Close(msg) => chan.rpc(msg, this, Self::doc_close).await,
Expand Down Expand Up @@ -65,3 +76,23 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
}
}
}

#[derive(Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
client: client::docs::MemClient,
/// Handler task
_handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
let engine = engine.clone();
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = client::docs::MemClient::new(RpcClient::new(connector));
let _handler = listener
.spawn_accept_loop(move |req, chan| engine.clone().handle_rpc_request(req, chan));
Self { client, _handler }
}
}
1 change: 1 addition & 0 deletions src/rpc/client/authors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{

/// Iroh docs client.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
}
Expand Down
16 changes: 14 additions & 2 deletions src/rpc/client/docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use iroh_base::node_addr::AddrInfoOptions;
use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash};
use iroh_net::NodeAddr;
use portable_atomic::{AtomicBool, Ordering};
use quic_rpc::{client::BoxedConnector, message::RpcMsg, Connector};
use quic_rpc::{
client::BoxedConnector, message::RpcMsg, transport::flume::FlumeConnector, Connector,
};
use serde::{Deserialize, Serialize};

use super::flatten;
use super::{authors, flatten};
use crate::{
actor::OpenState,
rpc::proto::{
Expand All @@ -38,8 +40,13 @@ pub use crate::{
Entry,
};

/// Type alias for a memory-backed client.
pub type MemClient =
Client<FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>>;

/// Iroh docs client.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
}
Expand All @@ -50,6 +57,11 @@ impl<C: Connector<RpcService>> Client<C> {
Self { rpc }
}

/// Returns an authors client.
pub fn authors(&self) -> authors::Client<C> {
authors::Client::new(self.rpc.clone())
}

/// Creates a client.
pub async fn create(&self) -> Result<Doc<C>> {
let res = self.rpc.rpc(CreateRequest {}).await??;
Expand Down

0 comments on commit 6f1d8eb

Please sign in to comment.