From 3258ea5c418f0fd8ee14b816bd00189a1881d6c0 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Fri, 8 Mar 2024 15:24:37 +0000 Subject: [PATCH] Add Rust `consumer` implementation --- Cargo.lock | 30 +++++++++++ Cargo.toml | 3 ++ rs/consumer/Cargo.toml | 10 ++++ rs/consumer/agent_runtime/Cargo.toml | 13 +++++ rs/consumer/agent_runtime/src/lib.rs | 45 +++++++++++++++++ rs/consumer/cdk_runtime/Cargo.toml | 13 +++++ rs/consumer/cdk_runtime/src/lib.rs | 31 ++++++++++++ rs/consumer/src/lib.rs | 75 ++++++++++++++++++++++++++++ 8 files changed, 220 insertions(+) create mode 100644 rs/consumer/Cargo.toml create mode 100644 rs/consumer/agent_runtime/Cargo.toml create mode 100644 rs/consumer/agent_runtime/src/lib.rs create mode 100644 rs/consumer/cdk_runtime/Cargo.toml create mode 100644 rs/consumer/cdk_runtime/src/lib.rs create mode 100644 rs/consumer/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 473a2a7..aa4e8d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -519,6 +519,36 @@ dependencies = [ "sha2 0.10.8", ] +[[package]] +name = "event_store_consumer" +version = "0.1.0" +dependencies = [ + "event_store_canister", + "ic_principal", +] + +[[package]] +name = "event_store_consumer_agent_runtime" +version = "0.1.0" +dependencies = [ + "candid", + "event_store_canister", + "event_store_consumer", + "ic-agent", + "ic_principal", +] + +[[package]] +name = "event_store_consumer_cdk_runtime" +version = "0.1.0" +dependencies = [ + "event_store_canister", + "event_store_consumer", + "ic-cdk", + "ic_principal", + "serde", +] + [[package]] name = "event_store_producer" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 8f8ede2..bee7181 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,9 @@ members = [ "rs/canister/api", "rs/canister/impl", + "rs/consumer", + "rs/consumer/agent_runtime", + "rs/consumer/cdk_runtime", "rs/producer", "rs/producer/agent_runtime", "rs/producer/cdk_runtime", diff --git a/rs/consumer/Cargo.toml b/rs/consumer/Cargo.toml new file mode 100644 index 0000000..f6b797c --- /dev/null +++ b/rs/consumer/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "event_store_consumer" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ic_principal.workspace = true +event_store_canister.path = "../canister/api" diff --git a/rs/consumer/agent_runtime/Cargo.toml b/rs/consumer/agent_runtime/Cargo.toml new file mode 100644 index 0000000..437fc58 --- /dev/null +++ b/rs/consumer/agent_runtime/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "event_store_consumer_agent_runtime" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +candid.workspace = true +event_store_canister.path = "../../canister/api" +event_store_consumer.path = ".." +ic-agent.workspace = true +ic_principal.workspace = true diff --git a/rs/consumer/agent_runtime/src/lib.rs b/rs/consumer/agent_runtime/src/lib.rs new file mode 100644 index 0000000..2cc4333 --- /dev/null +++ b/rs/consumer/agent_runtime/src/lib.rs @@ -0,0 +1,45 @@ +use event_store_canister::{EventsArgs, EventsResponse}; +use event_store_consumer::Runtime; +use ic_agent::{Agent, AgentError}; +use ic_principal::Principal; +use std::future::Future; + +pub struct AgentRuntime { + agent: Agent, +} + +impl AgentRuntime { + pub fn new(agent: Agent) -> Self { + Self { agent } + } + + async fn events_async( + &self, + canister_id: Principal, + args: EventsArgs, + ) -> Result { + match self + .agent + .query(&canister_id, "events") + .with_arg(candid::encode_one(args).unwrap()) + .call() + .await + { + Ok(response) => Ok(candid::decode_one(&response).unwrap()), + Err(AgentError::ReplicaError(error)) => { + Err((error.reject_code as i32, error.reject_message)) + } + Err(error) => Err((0, error.to_string())), + } + } +} + +impl Runtime for AgentRuntime { + fn events( + &self, + canister_id: Principal, + args: EventsArgs, + ) -> impl Future> + Send { + self.events_async(canister_id, args) + } +} diff --git a/rs/consumer/cdk_runtime/Cargo.toml b/rs/consumer/cdk_runtime/Cargo.toml new file mode 100644 index 0000000..3e9a196 --- /dev/null +++ b/rs/consumer/cdk_runtime/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "event_store_consumer_cdk_runtime" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +event_store_canister.path = "../../canister/api" +event_store_consumer.path = ".." +ic-cdk.workspace = true +ic_principal.workspace = true +serde.workspace = true diff --git a/rs/consumer/cdk_runtime/src/lib.rs b/rs/consumer/cdk_runtime/src/lib.rs new file mode 100644 index 0000000..12f7908 --- /dev/null +++ b/rs/consumer/cdk_runtime/src/lib.rs @@ -0,0 +1,31 @@ +use event_store_canister::{EventsArgs, EventsResponse}; +use event_store_consumer::Runtime; +use ic_principal::Principal; +use serde::{Deserialize, Serialize}; +use std::future::Future; + +#[derive(Serialize, Deserialize, Default)] +pub struct CdkRuntime; + +impl CdkRuntime { + async fn events_async( + &self, + canister_id: Principal, + args: EventsArgs, + ) -> Result { + match ic_cdk::call(canister_id, "events", (args,)).await { + Ok((response,)) => Ok(response), + Err((code, msg)) => Err((code as i32, msg)), + } + } +} + +impl Runtime for CdkRuntime { + fn events( + &self, + canister_id: Principal, + args: EventsArgs, + ) -> impl Future> + Send { + self.events_async(canister_id, args) + } +} diff --git a/rs/consumer/src/lib.rs b/rs/consumer/src/lib.rs new file mode 100644 index 0000000..e06c5d6 --- /dev/null +++ b/rs/consumer/src/lib.rs @@ -0,0 +1,75 @@ +use event_store_canister::{EventsArgs, EventsResponse}; +use ic_principal::Principal; + +pub struct EventStoreClient { + event_store_canister_id: Principal, + runtime: R, + synced_up_to: Option, + batch_size: u64, +} + +pub struct EventStoreClientBuilder { + event_store_canister_id: Principal, + runtime: R, + synced_up_to: Option, + batch_size: Option, +} + +impl EventStoreClient { + pub async fn next_batch(&mut self) -> Result { + let response = self + .runtime + .events( + self.event_store_canister_id, + EventsArgs { + start: self.synced_up_to.map_or(0, |i| i + 1), + length: self.batch_size, + }, + ) + .await?; + + if let Some(event) = response.events.last() { + self.synced_up_to = Some(event.index); + } + + Ok(response) + } +} + +impl EventStoreClientBuilder { + pub fn new(event_store_canister_id: Principal, runtime: R) -> Self { + Self { + event_store_canister_id, + runtime, + synced_up_to: None, + batch_size: None, + } + } + + pub fn set_synced_up_to(mut self, synced_up_to: u64) -> Self { + self.synced_up_to = Some(synced_up_to); + self + } + + pub fn with_batch_size(mut self, batch_size: u64) -> Self { + self.batch_size = Some(batch_size); + self + } + + pub fn build(self) -> EventStoreClient { + EventStoreClient { + event_store_canister_id: self.event_store_canister_id, + runtime: self.runtime, + synced_up_to: self.synced_up_to, + batch_size: self.batch_size.unwrap_or(1000), + } + } +} + +pub trait Runtime { + fn events( + &self, + canister_id: Principal, + args: EventsArgs, + ) -> impl std::future::Future> + Send; +}