From 3530b6625f553c90d4a9684feeeb378d8aec5c8e Mon Sep 17 00:00:00 2001 From: mdecimus Date: Fri, 27 Dec 2024 15:49:30 +0100 Subject: [PATCH] Sharded in-memory store --- crates/store/src/backend/composite/mod.rs | 1 + .../src/backend/composite/sharded_blob.rs | 2 +- .../src/backend/composite/sharded_lookup.rs | 176 ++++++++++++++++++ crates/store/src/config.rs | 71 +++++-- crates/store/src/dispatch/lookup.rs | 33 +++- crates/store/src/lib.rs | 2 + 6 files changed, 261 insertions(+), 24 deletions(-) create mode 100644 crates/store/src/backend/composite/sharded_lookup.rs diff --git a/crates/store/src/backend/composite/mod.rs b/crates/store/src/backend/composite/mod.rs index 382432c7..9c7297ab 100644 --- a/crates/store/src/backend/composite/mod.rs +++ b/crates/store/src/backend/composite/mod.rs @@ -11,3 +11,4 @@ #[cfg(any(feature = "postgres", feature = "mysql"))] pub mod read_replica; pub mod sharded_blob; +pub mod sharded_lookup; diff --git a/crates/store/src/backend/composite/sharded_blob.rs b/crates/store/src/backend/composite/sharded_blob.rs index 9b4a11ff..99088f4b 100644 --- a/crates/store/src/backend/composite/sharded_blob.rs +++ b/crates/store/src/backend/composite/sharded_blob.rs @@ -50,7 +50,7 @@ impl ShardedBlob { #[inline(always)] fn get_store(&self, key: &[u8]) -> &BlobBackend { - &self.stores[key.first().copied().unwrap_or_default() as usize % self.stores.len()] + &self.stores[xxhash_rust::xxh3::xxh3_64(key) as usize % self.stores.len()] } pub async fn get_blob( diff --git a/crates/store/src/backend/composite/sharded_lookup.rs b/crates/store/src/backend/composite/sharded_lookup.rs new file mode 100644 index 00000000..832e611d --- /dev/null +++ b/crates/store/src/backend/composite/sharded_lookup.rs @@ -0,0 +1,176 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: LicenseRef-SEL + * + * This file is subject to the Stalwart Enterprise License Agreement (SEL) and + * is NOT open source software. + * + */ + +use utils::config::{utils::AsKey, Config}; + +use crate::{ + dispatch::lookup::{KeyValue, LookupKey}, + Deserialize, InMemoryStore, Stores, Value, +}; + +#[derive(Debug)] +pub struct ShardedInMemory { + pub stores: Vec, +} + +impl ShardedInMemory { + pub fn open(config: &mut Config, prefix: impl AsKey, stores: &Stores) -> Option { + let prefix = prefix.as_key(); + let store_ids = config + .values((&prefix, "stores")) + .map(|(_, v)| v.to_string()) + .collect::>(); + + let mut in_memory_stores = Vec::with_capacity(store_ids.len()); + for store_id in store_ids { + if let Some(store) = stores + .in_memory_stores + .get(&store_id) + .filter(|store| store.is_redis()) + { + in_memory_stores.push(store.clone()); + } else { + config.new_build_error( + (&prefix, "stores"), + format!("In-memory store {store_id} not found"), + ); + return None; + } + } + if !in_memory_stores.is_empty() { + Some(Self { + stores: in_memory_stores, + }) + } else { + config.new_build_error((&prefix, "stores"), "No in-memory stores specified"); + None + } + } + + #[inline(always)] + fn get_store(&self, key: &[u8]) -> &InMemoryStore { + &self.stores[xxhash_rust::xxh3::xxh3_64(key) as usize % self.stores.len()] + } + + pub async fn key_set(&self, kv: KeyValue>) -> trc::Result<()> { + Box::pin(async move { + match self.get_store(&kv.key) { + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store.key_set(&kv.key, &kv.value, kv.expires).await, + InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + _ => Err(trc::StoreEvent::NotSupported.into_err()), + } + }) + .await + } + + pub async fn counter_incr(&self, kv: KeyValue) -> trc::Result { + Box::pin(async move { + match self.get_store(&kv.key) { + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store.key_incr(&kv.key, kv.value, kv.expires).await, + InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + _ => Err(trc::StoreEvent::NotSupported.into_err()), + } + }) + .await + } + + pub async fn key_delete(&self, key: impl Into>) -> trc::Result<()> { + let key_ = key.into(); + let key = key_.as_bytes(); + Box::pin(async move { + match self.get_store(key) { + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store.key_delete(key).await, + InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + _ => Err(trc::StoreEvent::NotSupported.into_err()), + } + }) + .await + } + + pub async fn counter_delete(&self, key: impl Into>) -> trc::Result<()> { + let key_ = key.into(); + let key = key_.as_bytes(); + Box::pin(async move { + match self.get_store(key) { + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store.key_delete(key).await, + InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + _ => Err(trc::StoreEvent::NotSupported.into_err()), + } + }) + .await + } + + pub async fn key_delete_prefix(&self, prefix: &[u8]) -> trc::Result<()> { + Box::pin(async move { + for store in &self.stores { + match store { + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store.key_delete_prefix(prefix).await?, + InMemoryStore::Static(_) => { + return Err(trc::StoreEvent::NotSupported.into_err()) + } + _ => return Err(trc::StoreEvent::NotSupported.into_err()), + } + } + + Ok(()) + }) + .await + } + + pub async fn key_get> + std::fmt::Debug + 'static>( + &self, + key: impl Into>, + ) -> trc::Result> { + let key_ = key.into(); + let key = key_.as_bytes(); + Box::pin(async move { + match self.get_store(key) { + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store.key_get(key).await, + InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + _ => Err(trc::StoreEvent::NotSupported.into_err()), + } + }) + .await + } + + pub async fn counter_get(&self, key: impl Into>) -> trc::Result { + let key_ = key.into(); + let key = key_.as_bytes(); + Box::pin(async move { + match self.get_store(key) { + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store.counter_get(key).await, + InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + _ => Err(trc::StoreEvent::NotSupported.into_err()), + } + }) + .await + } + + pub async fn key_exists(&self, key: impl Into>) -> trc::Result { + let key_ = key.into(); + let key = key_.as_bytes(); + Box::pin(async move { + match self.get_store(key) { + #[cfg(feature = "redis")] + InMemoryStore::Redis(store) => store.key_exists(key).await, + InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()), + _ => Err(trc::StoreEvent::NotSupported.into_err()), + } + }) + .await + } +} diff --git a/crates/store/src/config.rs b/crates/store/src/config.rs index 7be6fc8a..86e744a8 100644 --- a/crates/store/src/config.rs +++ b/crates/store/src/config.rs @@ -38,6 +38,13 @@ use crate::backend::redis::RedisStore; #[cfg(feature = "azure")] use crate::backend::azure::AzureStore; +#[cfg(feature = "enterprise")] +enum CompositeStore { + SQLReadReplica(String), + ShardedBlob(String), + ShardedInMemory(String), +} + impl Stores { pub async fn parse_all(config: &mut Config) -> Self { let mut stores = Self::parse(config).await; @@ -60,8 +67,8 @@ impl Stores { .map(|id| id.to_string()) .collect::>(); - for id in store_ids { - let id = id.as_str(); + for store_id in store_ids { + let id = store_id.as_str(); // Parse store #[cfg(feature = "test_mode")] { @@ -78,7 +85,6 @@ impl Stores { continue; }; let prefix = ("store", id); - let store_id = id.to_string(); let compression_algo = config .property_or_default::(("store", id, "compression"), "none") .unwrap_or(CompressionAlgo::None); @@ -213,8 +219,16 @@ impl Stores { } } #[cfg(feature = "enterprise")] - "sql-read-replica" | "distributed-blob" => { - composite_stores.push((store_id, protocol)); + "sql-read-replica" => { + composite_stores.push(CompositeStore::SQLReadReplica(store_id)); + } + #[cfg(feature = "enterprise")] + "distributed-blob" | "sharded-blob" => { + composite_stores.push(CompositeStore::ShardedBlob(store_id)); + } + #[cfg(feature = "enterprise")] + "sharded-in-memory" => { + composite_stores.push(CompositeStore::ShardedInMemory(store_id)); } #[cfg(feature = "azure")] "azure" => { @@ -233,17 +247,11 @@ impl Stores { } #[cfg(feature = "enterprise")] - for (id, protocol) in composite_stores { - let prefix = ("store", id.as_str()); - let compression = config - .property_or_default::( - ("store", id.as_str(), "compression"), - "none", - ) - .unwrap_or(CompressionAlgo::None); - match protocol.as_str() { + for composite_store in composite_stores { + match composite_store { #[cfg(any(feature = "postgres", feature = "mysql"))] - "sql-read-replica" => { + CompositeStore::SQLReadReplica(id) => { + let prefix = ("store", id.as_str()); if let Some(db) = crate::backend::composite::read_replica::SQLReadReplica::open( config, prefix, @@ -257,23 +265,46 @@ impl Stores { self.fts_stores.insert(id.to_string(), db.clone().into()); self.blob_stores.insert( id.to_string(), - BlobStore::from(db.clone()).with_compression(compression), + BlobStore::from(db.clone()).with_compression( + config + .property_or_default::( + ("store", id.as_str(), "compression"), + "none", + ) + .unwrap_or(CompressionAlgo::None), + ), ); - self.in_memory_stores.insert(id.to_string(), db.into()); + self.in_memory_stores.insert(id, db.into()); } } - "sharded-blob" | "distributed-blob" => { + CompositeStore::ShardedBlob(id) => { + let prefix = ("store", id.as_str()); if let Some(db) = crate::backend::composite::sharded_blob::ShardedBlob::open( config, prefix, self, ) { let store = BlobStore { backend: crate::BlobBackend::Sharded(db.into()), - compression, + compression: config + .property_or_default::( + ("store", id.as_str(), "compression"), + "none", + ) + .unwrap_or(CompressionAlgo::None), }; self.blob_stores.insert(id, store); } } - _ => (), + CompositeStore::ShardedInMemory(id) => { + let prefix = ("store", id.as_str()); + if let Some(db) = + crate::backend::composite::sharded_lookup::ShardedInMemory::open( + config, prefix, self, + ) + { + self.in_memory_stores + .insert(id, InMemoryStore::Sharded(db.into())); + } + } } } } diff --git a/crates/store/src/dispatch/lookup.rs b/crates/store/src/dispatch/lookup.rs index c0dc89e2..937eaa31 100644 --- a/crates/store/src/dispatch/lookup.rs +++ b/crates/store/src/dispatch/lookup.rs @@ -20,9 +20,9 @@ use crate::{ }; pub struct KeyValue { - key: Vec, - value: T, - expires: Option, + pub key: Vec, + pub value: T, + pub expires: Option, } impl InMemoryStore { @@ -44,6 +44,8 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_set(&kv.key, &kv.value, kv.expires).await, + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store.key_set(kv).await, InMemoryStore::Static(_) | InMemoryStore::Http(_) => { Err(trc::StoreEvent::NotSupported.into_err()) } @@ -81,6 +83,8 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_incr(&kv.key, kv.value, kv.expires).await, + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store.counter_incr(kv).await, InMemoryStore::Static(_) | InMemoryStore::Http(_) => { Err(trc::StoreEvent::NotSupported.into_err()) } @@ -100,6 +104,8 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_delete(key.into().as_bytes()).await, + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store.key_delete(key).await, InMemoryStore::Static(_) | InMemoryStore::Http(_) => { Err(trc::StoreEvent::NotSupported.into_err()) } @@ -119,6 +125,8 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_delete(key.into().as_bytes()).await, + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store.counter_delete(key).await, InMemoryStore::Static(_) | InMemoryStore::Http(_) => { Err(trc::StoreEvent::NotSupported.into_err()) } @@ -156,6 +164,8 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_delete_prefix(prefix).await, + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store.key_delete_prefix(prefix).await, InMemoryStore::Static(_) | InMemoryStore::Http(_) => { Err(trc::StoreEvent::NotSupported.into_err()) } @@ -176,6 +186,8 @@ impl InMemoryStore { .map(|value| value.and_then(|v| v.into())), #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_get(key.into().as_bytes()).await, + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store.key_get(key).await, InMemoryStore::Static(store) => Ok(store .get(key.into().as_str()) .map(|value| T::from(value.clone()))), @@ -197,6 +209,8 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.counter_get(key.into().as_bytes()).await, + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store.counter_get(key).await, InMemoryStore::Static(_) | InMemoryStore::Http(_) => { Err(trc::StoreEvent::NotSupported.into_err()) } @@ -214,6 +228,8 @@ impl InMemoryStore { .map(|value| matches!(value, Some(LookupValue::Value(())))), #[cfg(feature = "redis")] InMemoryStore::Redis(store) => store.key_exists(key.into().as_bytes()).await, + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(store) => store.key_exists(key).await, InMemoryStore::Static(store) => Ok(store.get(key.into().as_str()).is_some()), InMemoryStore::Http(store) => Ok(store.contains(key.into().as_str())), } @@ -345,6 +361,8 @@ impl InMemoryStore { } #[cfg(feature = "redis")] InMemoryStore::Redis(_) => {} + #[cfg(feature = "enterprise")] + InMemoryStore::Sharded(_) => {} InMemoryStore::Static(_) | InMemoryStore::Http(_) => {} } @@ -357,6 +375,15 @@ impl InMemoryStore { _ => false, } } + + pub fn is_redis(&self) -> bool { + match self { + #[cfg(feature = "redis")] + InMemoryStore::Redis(_) => true, + InMemoryStore::Static(_) => false, + _ => false, + } + } } pub enum LookupKey<'x> { diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 41c8a41b..0b3ee6bb 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -232,6 +232,8 @@ pub enum InMemoryStore { Redis(Arc), Http(Arc), Static(Arc), + #[cfg(feature = "enterprise")] + Sharded(Arc), } #[cfg(feature = "sqlite")]