Skip to content

Commit

Permalink
Add Rust consumer implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles committed Mar 8, 2024
1 parent 27624b9 commit 3258ea5
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 0 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
members = [
"rs/canister/api",
"rs/canister/impl",
"rs/consumer",
"rs/consumer/agent_runtime",
"rs/consumer/cdk_runtime",
"rs/producer",
"rs/producer/agent_runtime",
"rs/producer/cdk_runtime",
Expand Down
10 changes: 10 additions & 0 deletions rs/consumer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "event_store_consumer"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ic_principal.workspace = true
event_store_canister.path = "../canister/api"
13 changes: 13 additions & 0 deletions rs/consumer/agent_runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "event_store_consumer_agent_runtime"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
candid.workspace = true
event_store_canister.path = "../../canister/api"
event_store_consumer.path = ".."
ic-agent.workspace = true
ic_principal.workspace = true
45 changes: 45 additions & 0 deletions rs/consumer/agent_runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use event_store_canister::{EventsArgs, EventsResponse};
use event_store_consumer::Runtime;
use ic_agent::{Agent, AgentError};
use ic_principal::Principal;
use std::future::Future;

pub struct AgentRuntime {
agent: Agent,
}

impl AgentRuntime {
pub fn new(agent: Agent) -> Self {
Self { agent }
}

async fn events_async(
&self,
canister_id: Principal,
args: EventsArgs,
) -> Result<EventsResponse, (i32, String)> {
match self
.agent
.query(&canister_id, "events")
.with_arg(candid::encode_one(args).unwrap())
.call()
.await
{
Ok(response) => Ok(candid::decode_one(&response).unwrap()),
Err(AgentError::ReplicaError(error)) => {
Err((error.reject_code as i32, error.reject_message))
}
Err(error) => Err((0, error.to_string())),
}
}
}

impl Runtime for AgentRuntime {
fn events(
&self,
canister_id: Principal,
args: EventsArgs,
) -> impl Future<Output = Result<EventsResponse, (i32, String)>> + Send {
self.events_async(canister_id, args)
}
}
13 changes: 13 additions & 0 deletions rs/consumer/cdk_runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "event_store_consumer_cdk_runtime"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
event_store_canister.path = "../../canister/api"
event_store_consumer.path = ".."
ic-cdk.workspace = true
ic_principal.workspace = true
serde.workspace = true
31 changes: 31 additions & 0 deletions rs/consumer/cdk_runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use event_store_canister::{EventsArgs, EventsResponse};
use event_store_consumer::Runtime;
use ic_principal::Principal;
use serde::{Deserialize, Serialize};
use std::future::Future;

#[derive(Serialize, Deserialize, Default)]
pub struct CdkRuntime;

impl CdkRuntime {
async fn events_async(
&self,
canister_id: Principal,
args: EventsArgs,
) -> Result<EventsResponse, (i32, String)> {
match ic_cdk::call(canister_id, "events", (args,)).await {
Ok((response,)) => Ok(response),
Err((code, msg)) => Err((code as i32, msg)),
}
}
}

impl Runtime for CdkRuntime {
fn events(
&self,
canister_id: Principal,
args: EventsArgs,
) -> impl Future<Output = Result<EventsResponse, (i32, String)>> + Send {
self.events_async(canister_id, args)
}
}
75 changes: 75 additions & 0 deletions rs/consumer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use event_store_canister::{EventsArgs, EventsResponse};
use ic_principal::Principal;

pub struct EventStoreClient<R> {
event_store_canister_id: Principal,
runtime: R,
synced_up_to: Option<u64>,
batch_size: u64,
}

pub struct EventStoreClientBuilder<R> {
event_store_canister_id: Principal,
runtime: R,
synced_up_to: Option<u64>,
batch_size: Option<u64>,
}

impl<R: Runtime> EventStoreClient<R> {
pub async fn next_batch(&mut self) -> Result<EventsResponse, (i32, String)> {
let response = self
.runtime
.events(
self.event_store_canister_id,
EventsArgs {
start: self.synced_up_to.map_or(0, |i| i + 1),
length: self.batch_size,
},
)
.await?;

if let Some(event) = response.events.last() {
self.synced_up_to = Some(event.index);
}

Ok(response)
}
}

impl<R> EventStoreClientBuilder<R> {
pub fn new(event_store_canister_id: Principal, runtime: R) -> Self {
Self {
event_store_canister_id,
runtime,
synced_up_to: None,
batch_size: None,
}
}

pub fn set_synced_up_to(mut self, synced_up_to: u64) -> Self {
self.synced_up_to = Some(synced_up_to);
self
}

pub fn with_batch_size(mut self, batch_size: u64) -> Self {
self.batch_size = Some(batch_size);
self
}

pub fn build(self) -> EventStoreClient<R> {
EventStoreClient {
event_store_canister_id: self.event_store_canister_id,
runtime: self.runtime,
synced_up_to: self.synced_up_to,
batch_size: self.batch_size.unwrap_or(1000),
}
}
}

pub trait Runtime {
fn events(
&self,
canister_id: Principal,
args: EventsArgs,
) -> impl std::future::Future<Output = Result<EventsResponse, (i32, String)>> + Send;
}

0 comments on commit 3258ea5

Please sign in to comment.