Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: extract RPC definitions into here #5

Merged
merged 27 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
176715a
refactor: extract RPC definitions into here
dignifiedquire Oct 30, 2024
a761c37
add new method
dignifiedquire Oct 30, 2024
70c565f
minimal test working
dignifiedquire Oct 31, 2024
558ca8a
extract authors
dignifiedquire Oct 31, 2024
552a3d8
Merge branch 'main' into refactor-extract-rpc
rklaehn Nov 7, 2024
7044d0e
use latest quic-rpc
rklaehn Nov 7, 2024
979c33a
Add blob store to engine itself as well
rklaehn Nov 7, 2024
cad1d0b
reenable blobs related fns
rklaehn Nov 7, 2024
2e988b0
Add Unicode-3.0 license to deny.toml allow list
rklaehn Nov 7, 2024
02de27d
allow iroh-gossip git
rklaehn Nov 7, 2024
fc83d5c
Add complete iroh gossip and docs node so that tests pass.
rklaehn Nov 8, 2024
f745aa8
Reactivate all old tests and fix warnings
rklaehn Nov 11, 2024
db497b0
Only run the big tests when --all-features is given
rklaehn Nov 11, 2024
a2a8c64
more tests - will it ever end
rklaehn Nov 11, 2024
7fc22a0
Fix last remaining test
rklaehn Nov 12, 2024
dc9de49
clippy
rklaehn Nov 12, 2024
bf8338c
use TestResult
rklaehn Nov 12, 2024
e87f0a9
fix unused warning
rklaehn Nov 12, 2024
38e2e7f
WIP
rklaehn Nov 12, 2024
e9aa128
Accept unmaintained instant crate
rklaehn Nov 12, 2024
6d32cdc
Go back to authors and docs split
rklaehn Nov 12, 2024
ce59f9b
fix docs
rklaehn Nov 12, 2024
ac68d39
dedup flatten
rklaehn Nov 13, 2024
ac9400a
Remove redundant tests
rklaehn Nov 13, 2024
c94a761
Reexport LiveEvent instead of duplicating it
rklaehn Nov 13, 2024
4bc6531
fmt
rklaehn Nov 13, 2024
622d1c3
clippy
rklaehn Nov 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
504 changes: 434 additions & 70 deletions Cargo.lock

Large diffs are not rendered by default.

26 changes: 24 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,49 @@ redb = { version = "2.0.0" }
redb_v1 = { package = "redb", version = "1.5.1" }
self_cell = "1.0.3"
serde = { version = "1.0.164", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
strum = { version = "0.26", features = ["derive"] }
tempfile = { version = "3.4" }
thiserror = "1"
tokio = { version = "1", features = ["sync", "rt", "time", "macros"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
tokio-util = { version = "0.7.12", optional = true, features = ["codec", "io-util", "io", "rt"] }
tracing = "0.1"

# rpc
nested_enum_utils = { version = "0.1.0", optional = true }
quic-rpc = { version = "0.15", optional = true }
quic-rpc-derive = { version = "0.15", optional = true }
serde-error = { version = "0.1.3", optional = true }
portable-atomic = { version = "1.9.0", optional = true }

[dev-dependencies]
iroh-test = "0.28.0"
rand_chacha = "0.3.1"
tokio = { version = "1", features = ["sync", "macros"] }
proptest = "1.2.0"
tempfile = "3.4"
test-strategy = "0.3.1"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
parking_lot = "0.12.3"
testresult = "0.4.1"
nested_enum_utils = "0.1.0"
iroh-io = "0.6.1"
testdir = "0.9.1"

[features]
default = ["net", "metrics", "engine"]
default = ["net", "metrics", "engine", "rpc", "test-utils"]
net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"]
metrics = ["iroh-metrics/metrics"]
engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs", "dep:iroh-router"]
test-utils = ["iroh-net/test-utils"]
rpc = [
"engine",
"dep:nested_enum_utils",
"dep:quic-rpc",
"dep:quic-rpc-derive",
"dep:serde-error",
"dep:portable-atomic",
]

[package.metadata.docs.rs]
all-features = true
Expand Down
3 changes: 3 additions & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ allow = [
"Unicode-DFS-2016",
"Zlib",
"MPL-2.0", # https://fossa.com/blog/open-source-software-licenses-101-mozilla-public-license-2-0/
"Unicode-3.0"
]

[[licenses.clarify]]
Expand All @@ -33,10 +34,12 @@ license-files = [
[advisories]
ignore = [
"RUSTSEC-2024-0370", # unmaintained, no upgrade available
"RUSTSEC-2024-0384", # unmaintained, no upgrade available
]

[sources]
allow-git = [
"https://github.com/n0-computer/iroh.git",
"https://github.com/n0-computer/iroh-blobs.git",
"https://github.com/n0-computer/iroh-gossip.git",
]
28 changes: 22 additions & 6 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use std::{

use anyhow::{bail, Context, Result};
use futures_lite::{Stream, StreamExt};
use iroh_blobs::{downloader::Downloader, store::EntryStatus, Hash};
use iroh_blobs::{
downloader::Downloader, store::EntryStatus, util::local_pool::LocalPoolHandle, Hash,
};
use iroh_gossip::net::Gossip;
use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -40,7 +42,7 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256;
/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
/// peers and a gossip swarm for each syncing document.
#[derive(derive_more::Debug, Clone)]
pub struct Engine {
pub struct Engine<D> {
/// [`Endpoint`] used by the engine.
pub endpoint: Endpoint,
/// Handle to the actor thread.
Expand All @@ -52,20 +54,23 @@ pub struct Engine {
actor_handle: Arc<AbortOnDropHandle<()>>,
#[debug("ContentStatusCallback")]
content_status_cb: ContentStatusCallback,
local_pool_handle: LocalPoolHandle,
blob_store: D,
}

impl Engine {
impl<D: iroh_blobs::store::Store> Engine<D> {
/// Start the sync engine.
///
/// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
/// thread for the [`crate::actor::SyncHandle`].
pub async fn spawn<B: iroh_blobs::store::Store>(
pub async fn spawn(
endpoint: Endpoint,
gossip: Gossip,
replica_store: crate::store::Store,
bao_store: B,
bao_store: D,
downloader: Downloader,
default_author_storage: DefaultAuthorStorage,
local_pool_handle: LocalPoolHandle,
) -> anyhow::Result<Self> {
let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
let me = endpoint.node_id().fmt_short();
Expand All @@ -80,7 +85,7 @@ impl Engine {
sync.clone(),
endpoint.clone(),
gossip.clone(),
bao_store,
bao_store.clone(),
downloader,
to_live_actor_recv,
live_actor_tx.clone(),
Expand Down Expand Up @@ -111,9 +116,16 @@ impl Engine {
actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
content_status_cb,
default_author: Arc::new(default_author),
local_pool_handle,
blob_store: bao_store,
})
}

/// Get the blob store.
pub fn blob_store(&self) -> &D {
&self.blob_store
}

/// Start to sync a document.
///
/// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
Expand Down Expand Up @@ -205,6 +217,10 @@ impl Engine {
reply_rx.await?;
Ok(())
}

/// Returns the [`LocalPoolHandle`] stored on this engine.
pub(crate) fn local_pool_handle(&self) -> &LocalPoolHandle {
&self.local_pool_handle
}
}

/// Converts an [`EntryStatus`] into a ['ContentStatus'].
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ mod ticket;
#[cfg(feature = "engine")]
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "engine")))]
pub mod engine;
#[cfg(feature = "engine")]
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "engine")))]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be feature `rpc`

pub mod rpc;

pub mod actor;
pub mod store;
Expand Down
2 changes: 1 addition & 1 deletion src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use iroh_router::ProtocolHandler;

use crate::engine::Engine;

impl ProtocolHandler for Engine {
impl<D: iroh_blobs::store::Store> ProtocolHandler for Engine<D> {
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
Box::pin(async move { self.handle_connection(conn).await })
}
Expand Down
1 change: 0 additions & 1 deletion src/ranger.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Implementation of Set Reconciliation based on
//! "Range-Based Set Reconciliation" by Aljoscha Meyer.
//!

use std::{cmp::Ordering, fmt::Debug};

Expand Down
67 changes: 67 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Quic RPC implementation for docs.

use proto::RpcService;
use quic_rpc::server::{ChannelTypes, RpcChannel};

use crate::engine::Engine;

pub mod client;
pub mod proto;

mod docs_handle_request;

// Error type carried in RPC responses (a serializable `serde_error::Error`).
type RpcError = serde_error::Error;
// Result alias used by the RPC request handlers.
type RpcResult<T> = std::result::Result<T, RpcError>;

impl<D: iroh_blobs::store::Store> Engine<D> {
/// Handle a docs request from the RPC server.
///
/// Dispatches the incoming [`Request`](crate::rpc::proto::Request) variant to the
/// matching `Self::doc_*` / `Self::author_*` handler and drives the reply over `chan`.
/// Unary requests go through `chan.rpc`; requests with a stream of responses go
/// through `chan.server_streaming` (or `try_server_streaming` for `Subscribe`).
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
&self,
msg: crate::rpc::proto::Request,
chan: RpcChannel<RpcService, C>,
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
use crate::rpc::proto::Request::*;

// Clone the engine so the handler future owns its own copy of the state.
let this = self.clone();
match msg {
// Document requests.
Open(msg) => chan.rpc(msg, this, Self::doc_open).await,
Close(msg) => chan.rpc(msg, this, Self::doc_close).await,
Status(msg) => chan.rpc(msg, this, Self::doc_status).await,
List(msg) => chan.server_streaming(msg, this, Self::doc_list).await,
Create(msg) => chan.rpc(msg, this, Self::doc_create).await,
Drop(msg) => chan.rpc(msg, this, Self::doc_drop).await,
Import(msg) => chan.rpc(msg, this, Self::doc_import).await,
Set(msg) => chan.rpc(msg, this, Self::doc_set).await,
ImportFile(msg) => {
chan.server_streaming(msg, this, Self::doc_import_file)
.await
}
ExportFile(msg) => {
chan.server_streaming(msg, this, Self::doc_export_file)
.await
}
Del(msg) => chan.rpc(msg, this, Self::doc_del).await,
SetHash(msg) => chan.rpc(msg, this, Self::doc_set_hash).await,
Get(msg) => chan.server_streaming(msg, this, Self::doc_get_many).await,
GetExact(msg) => chan.rpc(msg, this, Self::doc_get_exact).await,
StartSync(msg) => chan.rpc(msg, this, Self::doc_start_sync).await,
Leave(msg) => chan.rpc(msg, this, Self::doc_leave).await,
Share(msg) => chan.rpc(msg, this, Self::doc_share).await,
Subscribe(msg) => {
chan.try_server_streaming(msg, this, Self::doc_subscribe)
.await
}
SetDownloadPolicy(msg) => chan.rpc(msg, this, Self::doc_set_download_policy).await,
GetDownloadPolicy(msg) => chan.rpc(msg, this, Self::doc_get_download_policy).await,
GetSyncPeers(msg) => chan.rpc(msg, this, Self::doc_get_sync_peers).await,

// Author requests.
AuthorList(msg) => chan.server_streaming(msg, this, Self::author_list).await,
AuthorCreate(msg) => chan.rpc(msg, this, Self::author_create).await,
AuthorImport(msg) => chan.rpc(msg, this, Self::author_import).await,
AuthorExport(msg) => chan.rpc(msg, this, Self::author_export).await,
AuthorDelete(msg) => chan.rpc(msg, this, Self::author_delete).await,
AuthorGetDefault(msg) => chan.rpc(msg, this, Self::author_default).await,
AuthorSetDefault(msg) => chan.rpc(msg, this, Self::author_set_default).await,
}
}
}
3 changes: 3 additions & 0 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! RPC Client for docs and authors
/// Client API for managing authors.
pub mod authors;
/// Client API for managing documents.
pub mod docs;
113 changes: 113 additions & 0 deletions src/rpc/client/authors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! API for author management.
//!
//! The main entry point is the [`Client`].

use anyhow::Result;
use futures_lite::{Stream, StreamExt};
use quic_rpc::{client::BoxedConnector, Connector};

#[doc(inline)]
pub use crate::engine::{Origin, SyncEvent, SyncReason};
use crate::{
rpc::proto::{
AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest,
AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, RpcService,
},
Author, AuthorId,
};

/// Iroh authors client.
#[derive(Debug, Clone)]
pub struct Client<C = BoxedConnector<RpcService>> {
// Underlying RPC client; shared with the docs client via `pub(super)`.
pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
}

impl<C: Connector<RpcService>> Client<C> {
    /// Wraps an existing RPC client into an authors client.
    pub fn new(rpc: quic_rpc::RpcClient<RpcService, C>) -> Self {
        Self { rpc }
    }

    /// Creates a new document author.
    ///
    /// Persist the returned [`AuthorId`] somewhere if you intend to use this
    /// author again later.
    ///
    /// When a single author is enough, prefer [`Self::default`].
    pub async fn create(&self) -> Result<AuthorId> {
        let response = self.rpc.rpc(AuthorCreateRequest).await??;
        Ok(response.author_id)
    }

    /// Returns the default document author of this node.
    ///
    /// On persistent nodes the default author is created on first start and its
    /// public key is stored in the data directory.
    ///
    /// Use [`Self::set_default`] to change it.
    pub async fn default(&self) -> Result<AuthorId> {
        let response = self.rpc.rpc(AuthorGetDefaultRequest).await??;
        Ok(response.author_id)
    }

    /// Sets the node-wide default author.
    ///
    /// Fails with an error when no author with the given id exists.
    ///
    /// On a persistent node the author id is written to a file in the data
    /// directory and restored after a restart.
    pub async fn set_default(&self, author_id: AuthorId) -> Result<()> {
        self.rpc.rpc(AuthorSetDefaultRequest { author_id }).await??;
        Ok(())
    }

    /// Lists document authors for which we have a secret key.
    ///
    /// Writes can only be created with authors whose secret key is stored locally.
    pub async fn list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
        let responses = self.rpc.server_streaming(AuthorListRequest {}).await?;
        let ids = flatten(responses).map(|item| item.map(|ok| ok.author_id));
        Ok(ids)
    }

    /// Exports the given author.
    ///
    /// Warning: the [`Author`] struct contains sensitive data.
    pub async fn export(&self, author: AuthorId) -> Result<Option<Author>> {
        let response = self.rpc.rpc(AuthorExportRequest { author }).await??;
        Ok(response.author)
    }

    /// Imports the given author.
    ///
    /// Warning: the [`Author`] struct contains sensitive data.
    pub async fn import(&self, author: Author) -> Result<()> {
        self.rpc.rpc(AuthorImportRequest { author }).await??;
        Ok(())
    }

    /// Deletes the given author by id.
    ///
    /// Warning: this permanently removes the author.
    ///
    /// Attempting to delete the default author returns an error.
    pub async fn delete(&self, author: AuthorId) -> Result<()> {
        self.rpc.rpc(AuthorDeleteRequest { author }).await??;
        Ok(())
    }
}

/// Collapses a stream of nested results into a stream of `anyhow` results,
/// converting either error layer via its `std::error::Error` impl.
fn flatten<T, E1, E2>(
    s: impl Stream<Item = Result<Result<T, E1>, E2>>,
) -> impl Stream<Item = Result<T>>
where
    E1: std::error::Error + Send + Sync + 'static,
    E2: std::error::Error + Send + Sync + 'static,
{
    // `?` converts the outer (transport) error first, then the inner one.
    s.map(|item| -> Result<T> { Ok(item??) })
}
Loading
Loading