Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove old events storage now that the migration is complete #47

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions rs/canister/api/can.did
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ type AnonymizationInitConfig = record {
};
type EventsArgs = record { start : nat64; length : nat64 };
type EventsResponse = record {
latest_event_index_v2 : opt nat64;
events : vec IndexedEvent;
latest_event_index : opt nat64;
is_v2 : bool;
};
type IdempotentEvent = record {
source : opt text;
Expand Down
2 changes: 0 additions & 2 deletions rs/canister/api/src/queries/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,4 @@ pub struct EventsArgs {
pub struct EventsResponse {
pub events: Vec<IndexedEvent>,
pub latest_event_index: Option<u64>,
pub latest_event_index_v2: Option<u64>,
pub is_v2: bool,
}
14 changes: 2 additions & 12 deletions rs/canister/impl/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ use ic_stable_structures::{
};

const UPGRADES: MemoryId = MemoryId::new(0);
const EVENTS_INDEX: MemoryId = MemoryId::new(1);
const EVENTS_DATA: MemoryId = MemoryId::new(2);
const EVENTS_V2_INDEX: MemoryId = MemoryId::new(3);
const EVENTS_V2_DATA: MemoryId = MemoryId::new(4);
const EVENTS_INDEX: MemoryId = MemoryId::new(3);
const EVENTS_DATA: MemoryId = MemoryId::new(4);
const STRING_TO_NUM_MAP: MemoryId = MemoryId::new(5);
const NUM_TO_STRING_INDEX: MemoryId = MemoryId::new(6);
const NUM_TO_STRING_DATA: MemoryId = MemoryId::new(7);
Expand All @@ -31,14 +29,6 @@ pub fn get_events_data_memory() -> Memory {
get_memory(EVENTS_DATA)
}

pub fn get_events_v2_index_memory() -> Memory {
get_memory(EVENTS_V2_INDEX)
}

pub fn get_events_v2_data_memory() -> Memory {
get_memory(EVENTS_V2_DATA)
}

pub fn get_string_to_num_map_memory() -> Memory {
get_memory(STRING_TO_NUM_MAP)
}
Expand Down
78 changes: 45 additions & 33 deletions rs/canister/impl/src/model/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::memory::{get_events_data_memory, get_events_index_memory, Memory};
use crate::model::string_to_num_map::StringToNumMap;
use candid::Deserialize;
use event_store_canister::{IdempotentEvent, IndexedEvent, TimestampMillis};
use ic_stable_structures::storable::Bound;
Expand All @@ -10,6 +11,7 @@ use std::borrow::Cow;
pub struct Events {
#[serde(skip, default = "init_events")]
events: StableLog<StorableEvent, Memory, Memory>,
string_to_num_map: StringToNumMap,
}

impl Events {
Expand All @@ -18,27 +20,63 @@ impl Events {
.iter()
.skip(start as usize)
.take(length as usize)
.map(|e| e.into())
.map(|e| self.hydrate(e))
.collect()
}

pub fn push(&mut self, event: IdempotentEvent) {
self.events
.append(&StorableEvent::new(event.clone(), self.events.len()))
.unwrap();
let storable = self.convert_to_storable(event, self.events.len());

self.events.append(&storable).unwrap();
}

pub fn stats(&self) -> EventsStats {
EventsStats {
latest_event_index: self.events.len().checked_sub(1),
}
}

pub fn len(&self) -> u64 {
self.events.len()
}

fn convert_to_storable(&mut self, event: IdempotentEvent, index: u64) -> StorableEvent {
StorableEvent {
index,
name: self.string_to_num_map.convert_to_num(event.name),
timestamp: event.timestamp,
user: event.user.map(|u| self.string_to_num_map.convert_to_num(u)),
source: event
.source
.map(|s| self.string_to_num_map.convert_to_num(s)),
payload: event.payload,
}
}

fn hydrate(&self, event: StorableEvent) -> IndexedEvent {
IndexedEvent {
index: event.index,
name: self
.string_to_num_map
.convert_to_string(event.name)
.unwrap_or("unknown".to_string()),
timestamp: event.timestamp,
user: event
.user
.and_then(|u| self.string_to_num_map.convert_to_string(u)),
source: event
.source
.and_then(|s| self.string_to_num_map.convert_to_string(s)),
payload: event.payload,
}
}
}

impl Default for Events {
fn default() -> Self {
Events {
events: init_events(),
string_to_num_map: StringToNumMap::default(),
}
}
}
Expand All @@ -56,13 +94,13 @@ struct StorableEvent {
#[serde(rename = "i")]
index: u64,
#[serde(rename = "n")]
name: String,
name: u32,
#[serde(rename = "t")]
timestamp: TimestampMillis,
#[serde(rename = "u", default, skip_serializing_if = "Option::is_none")]
user: Option<String>,
user: Option<u32>,
#[serde(rename = "s", default, skip_serializing_if = "Option::is_none")]
source: Option<String>,
source: Option<u32>,
#[serde(
rename = "p",
default,
Expand All @@ -72,32 +110,6 @@ struct StorableEvent {
payload: Vec<u8>,
}

impl StorableEvent {
fn new(event: IdempotentEvent, index: u64) -> StorableEvent {
StorableEvent {
index,
name: event.name,
timestamp: event.timestamp,
user: event.user,
source: event.source,
payload: event.payload,
}
}
}

impl From<StorableEvent> for IndexedEvent {
fn from(value: StorableEvent) -> Self {
IndexedEvent {
index: value.index,
name: value.name,
timestamp: value.timestamp,
user: value.user,
source: value.source,
payload: value.payload,
}
}
}

impl Storable for StorableEvent {
fn to_bytes(&self) -> Cow<[u8]> {
Cow::Owned(rmp_serde::to_vec_named(&self).unwrap())
Expand Down
128 changes: 0 additions & 128 deletions rs/canister/impl/src/model/events_v2.rs

This file was deleted.

1 change: 0 additions & 1 deletion rs/canister/impl/src/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod events;
pub mod events_v2;
pub mod salt;
mod string_to_num_map;
9 changes: 1 addition & 8 deletions rs/canister/impl/src/queries/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,11 @@ use ic_cdk::query;
fn events(args: EventsArgs) -> EventsResponse {
state::read(|s| {
let stats = s.events().stats();
let stats_v2 = s.events_v2().stats();
let (events, is_v2) = if stats.latest_event_index > stats_v2.latest_event_index {
(s.events().get(args.start, args.length), false)
} else {
(s.events_v2().get(args.start, args.length), true)
};
let events = s.events().get(args.start, args.length);

EventsResponse {
events,
latest_event_index: stats.latest_event_index,
latest_event_index_v2: stats_v2.latest_event_index,
is_v2,
}
})
}
20 changes: 4 additions & 16 deletions rs/canister/impl/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::env;
use crate::model::events::Events;
use crate::model::events_v2::EventsV2;
use crate::model::salt::Salt;
use candid::Principal;
use event_store_canister::{
Expand All @@ -21,9 +20,7 @@ thread_local! {
pub struct State {
push_events_whitelist: HashSet<Principal>,
read_events_whitelist: HashSet<Principal>,
events: Events,
#[serde(default)]
events_v2: EventsV2,
events_v2: Events,
event_deduper: EventDeduper,
salt: Salt,
anonymization_config: AnonymizationConfig,
Expand Down Expand Up @@ -63,8 +60,7 @@ impl State {
State {
push_events_whitelist,
read_events_whitelist,
events: Events::default(),
events_v2: EventsV2::default(),
events_v2: Events::default(),
event_deduper: EventDeduper::default(),
anonymization_config: anonymization_config.into(),
salt: Salt::default(),
Expand All @@ -89,10 +85,6 @@ impl State {
}

pub fn events(&self) -> &Events {
&self.events
}

pub fn events_v2(&self) -> &EventsV2 {
&self.events_v2
}

Expand Down Expand Up @@ -125,15 +117,11 @@ impl State {
}
}

if self.events_v2.stats().latest_event_index >= self.events.stats().latest_event_index {
self.events_v2.push(event);
} else {
self.events.push(event);
}
self.events_v2.push(event);
}

pub fn migrate_events(&mut self, count: u32) {
for event in self.events.get(self.events_v2.len(), count as u64) {
for event in self.events_v2.get(self.events_v2.len(), count as u64) {
self.events_v2.push(IdempotentEvent {
idempotency_key: 0,
name: event.name,
Expand Down
3 changes: 1 addition & 2 deletions rs/integration_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ fn push_then_read_events_succeeds() {
assert_eq!(read_response.events.len(), 5);
assert_eq!(read_response.events.first().unwrap().index, 0);
assert_eq!(read_response.events.last().unwrap().index, 4);
assert_eq!(read_response.latest_event_index, None);
assert_eq!(read_response.latest_event_index_v2, Some(9));
assert_eq!(read_response.latest_event_index, Some(9));
}

#[test_case(true, true)]
Expand Down
Loading