Skip to content

Commit

Permalink
Migrate events to stable memory (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Feb 26, 2024
1 parent b043200 commit 7ce70d0
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/canister/api/can.did
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
type EventsArgs = record { start : nat64; length : nat64 };
type EventsResponse = record {
latest_event_index_in_stable_memory : opt nat64;
events : vec IndexedEvent;
latest_event_index : opt nat64;
};
Expand Down
1 change: 1 addition & 0 deletions rs/canister/api/src/queries/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ pub struct EventsArgs {
pub struct EventsResponse {
pub events: Vec<IndexedEvent>,
pub latest_event_index: Option<u64>,
pub latest_event_index_in_stable_memory: Option<u64>,
}
1 change: 1 addition & 0 deletions rs/canister/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ ic-cdk.workspace = true
ic-stable-structures.workspace = true
rmp-serde.workspace = true
serde.workspace = true
serde_bytes.workspace = true
7 changes: 7 additions & 0 deletions rs/canister/impl/src/lifecycle/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use crate::state;
use ic_cdk::heartbeat;

#[heartbeat]
fn heartbeat() {
state::mutate(|s| s.events.migrate(1000));
}
1 change: 1 addition & 0 deletions rs/canister/impl/src/lifecycle/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod heartbeat;
mod init;
mod post_upgrade;
mod pre_upgrade;
Expand Down
4 changes: 1 addition & 3 deletions rs/canister/impl/src/lifecycle/post_upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::lifecycle::READER_WRITER_BUFFER_SIZE;
use crate::memory::{get_upgrades_memory, reset_memory_manager};
use crate::memory::get_upgrades_memory;
use crate::state;
use crate::state::State;
use ic_cdk::post_upgrade;
Expand All @@ -13,6 +13,4 @@ fn post_upgrade() {
let mut deserializer = rmp_serde::Deserializer::new(reader);

state::init(State::deserialize(&mut deserializer).unwrap());

reset_memory_manager();
}
28 changes: 13 additions & 15 deletions rs/canister/impl/src/memory.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
use ic_cdk::api::stable::StableWriter;
use ic_stable_structures::{
memory_manager::{MemoryId, MemoryManager, VirtualMemory},
DefaultMemoryImpl,
};
use std::cell::RefCell;

const UPGRADES: MemoryId = MemoryId::new(0);
const EVENTS_INDEX: MemoryId = MemoryId::new(1);
const EVENTS_DATA: MemoryId = MemoryId::new(2);

pub type Memory = VirtualMemory<DefaultMemoryImpl>;

thread_local! {
static MEMORY_MANAGER: RefCell<MemoryManager<DefaultMemoryImpl>>
= RefCell::new(MemoryManager::init_with_bucket_size(DefaultMemoryImpl::default(), 128));
}

// This forces the buckets to be the specified size rather than preserving the previous bucket size
pub fn reset_memory_manager() {
let mut writer = StableWriter::default();
writer.write(&[0, 0, 0]).unwrap();
MEMORY_MANAGER.replace(MemoryManager::init_with_bucket_size(
DefaultMemoryImpl::default(),
128,
));
static MEMORY_MANAGER: MemoryManager<DefaultMemoryImpl>
= MemoryManager::init_with_bucket_size(DefaultMemoryImpl::default(), 128);
}

pub fn get_upgrades_memory() -> Memory {
get_memory(UPGRADES)
}

pub fn get_events_index_memory() -> Memory {
get_memory(EVENTS_INDEX)
}

pub fn get_events_data_memory() -> Memory {
get_memory(EVENTS_DATA)
}

fn get_memory(id: MemoryId) -> Memory {
MEMORY_MANAGER.with_borrow(|m| m.get(id))
MEMORY_MANAGER.with(|m| m.get(id))
}
111 changes: 109 additions & 2 deletions rs/canister/impl/src/model/events.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use crate::memory::{get_events_data_memory, get_events_index_memory, Memory};
use candid::Deserialize;
use event_sink_canister::{IdempotentEvent, IndexedEvent};
use event_sink_canister::{IdempotentEvent, IndexedEvent, TimestampMillis};
use ic_stable_structures::storable::Bound;
use ic_stable_structures::{StableLog, Storable};
use serde::Serialize;
use std::borrow::Cow;
use std::collections::VecDeque;

#[derive(Serialize, Deserialize, Default)]
#[derive(Serialize, Deserialize)]
pub struct Events {
events: VecDeque<IndexedEvent>,
#[serde(skip, default = "init_events")]
events_v2: StableLog<StorableEvent, Memory, Memory>,
latest_event_index: Option<u64>,
}

Expand All @@ -23,8 +29,30 @@ impl Events {
}
}

pub fn migrate(&mut self, count: u64) {
for event in self.get(self.events_v2.len(), count) {
self.events_v2
.append(&StorableEvent {
index: event.index,
name: event.name,
timestamp: event.timestamp,
user: event.user,
source: event.source,
payload: event.payload,
})
.unwrap();
}
}

pub fn push(&mut self, event: IdempotentEvent) {
let index = self.latest_event_index.map_or(0, |i| i + 1);

if self.events_v2.len() == index {
self.events_v2
.append(&StorableEvent::new(event.clone(), index))
.unwrap();
}

self.events.push_back(IndexedEvent {
index,
name: event.name,
Expand All @@ -39,10 +67,89 @@ impl Events {
pub fn stats(&self) -> EventsStats {
EventsStats {
latest_event_index: self.latest_event_index,
latest_event_index_in_stable_memory: self.events_v2.iter().last().map(|e| e.index),
}
}
}

impl Default for Events {
fn default() -> Self {
Events {
events: VecDeque::new(),
events_v2: init_events(),
latest_event_index: None,
}
}
}

fn init_events() -> StableLog<StorableEvent, Memory, Memory> {
StableLog::init(get_events_index_memory(), get_events_data_memory()).unwrap()
}

pub struct EventsStats {
pub latest_event_index: Option<u64>,
pub latest_event_index_in_stable_memory: Option<u64>,
}

#[derive(Serialize, Deserialize)]
struct StorableEvent {
#[serde(rename = "i")]
index: u64,
#[serde(rename = "n")]
name: String,
#[serde(rename = "t")]
timestamp: TimestampMillis,
#[serde(rename = "u", default, skip_serializing_if = "Option::is_none")]
user: Option<String>,
#[serde(rename = "s", default, skip_serializing_if = "Option::is_none")]
source: Option<String>,
#[serde(
rename = "p",
default,
skip_serializing_if = "is_empty_slice",
with = "serde_bytes"
)]
payload: Vec<u8>,
}

impl StorableEvent {
fn new(event: IdempotentEvent, index: u64) -> StorableEvent {
StorableEvent {
index,
name: event.name,
timestamp: event.timestamp,
user: event.user,
source: event.source,
payload: event.payload,
}
}
}

impl From<StorableEvent> for IndexedEvent {
fn from(value: StorableEvent) -> Self {
IndexedEvent {
index: value.index,
name: value.name,
timestamp: value.timestamp,
user: value.user,
source: value.source,
payload: value.payload,
}
}
}

impl Storable for StorableEvent {
fn to_bytes(&self) -> Cow<[u8]> {
Cow::Owned(rmp_serde::to_vec_named(&self).unwrap())
}

fn from_bytes(bytes: Cow<[u8]>) -> Self {
rmp_serde::from_slice(bytes.as_ref()).unwrap()
}

const BOUND: Bound = Bound::Unbounded;
}

fn is_empty_slice<T>(vec: &[T]) -> bool {
vec.is_empty()
}
1 change: 1 addition & 0 deletions rs/canister/impl/src/queries/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ fn events(args: EventsArgs) -> EventsResponse {
EventsResponse {
events,
latest_event_index: stats.latest_event_index,
latest_event_index_in_stable_memory: stats.latest_event_index_in_stable_memory,
}
})
}

0 comments on commit 7ce70d0

Please sign in to comment.