Skip to content

Commit

Permalink
Sharded in-memory store
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Dec 27, 2024
1 parent c7499ab commit 3530b66
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 24 deletions.
1 change: 1 addition & 0 deletions crates/store/src/backend/composite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
#[cfg(any(feature = "postgres", feature = "mysql"))]
pub mod read_replica;
pub mod sharded_blob;
pub mod sharded_lookup;
2 changes: 1 addition & 1 deletion crates/store/src/backend/composite/sharded_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl ShardedBlob {

#[inline(always)]
fn get_store(&self, key: &[u8]) -> &BlobBackend {
&self.stores[key.first().copied().unwrap_or_default() as usize % self.stores.len()]
&self.stores[xxhash_rust::xxh3::xxh3_64(key) as usize % self.stores.len()]
}

pub async fn get_blob(
Expand Down
176 changes: 176 additions & 0 deletions crates/store/src/backend/composite/sharded_lookup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <[email protected]>
*
* SPDX-License-Identifier: LicenseRef-SEL
*
* This file is subject to the Stalwart Enterprise License Agreement (SEL) and
* is NOT open source software.
*
*/

use utils::config::{utils::AsKey, Config};

use crate::{
dispatch::lookup::{KeyValue, LookupKey},
Deserialize, InMemoryStore, Stores, Value,
};

#[derive(Debug)]
pub struct ShardedInMemory {
pub stores: Vec<InMemoryStore>,
}

impl ShardedInMemory {
pub fn open(config: &mut Config, prefix: impl AsKey, stores: &Stores) -> Option<Self> {
let prefix = prefix.as_key();
let store_ids = config
.values((&prefix, "stores"))
.map(|(_, v)| v.to_string())
.collect::<Vec<_>>();

let mut in_memory_stores = Vec::with_capacity(store_ids.len());
for store_id in store_ids {
if let Some(store) = stores
.in_memory_stores
.get(&store_id)
.filter(|store| store.is_redis())
{
in_memory_stores.push(store.clone());
} else {
config.new_build_error(
(&prefix, "stores"),
format!("In-memory store {store_id} not found"),
);
return None;
}
}
if !in_memory_stores.is_empty() {
Some(Self {
stores: in_memory_stores,
})
} else {
config.new_build_error((&prefix, "stores"), "No in-memory stores specified");
None
}
}

#[inline(always)]
fn get_store(&self, key: &[u8]) -> &InMemoryStore {
&self.stores[xxhash_rust::xxh3::xxh3_64(key) as usize % self.stores.len()]
}

pub async fn key_set(&self, kv: KeyValue<Vec<u8>>) -> trc::Result<()> {
Box::pin(async move {
match self.get_store(&kv.key) {
#[cfg(feature = "redis")]
InMemoryStore::Redis(store) => store.key_set(&kv.key, &kv.value, kv.expires).await,
InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()),
_ => Err(trc::StoreEvent::NotSupported.into_err()),
}
})
.await
}

pub async fn counter_incr(&self, kv: KeyValue<i64>) -> trc::Result<i64> {
Box::pin(async move {
match self.get_store(&kv.key) {
#[cfg(feature = "redis")]
InMemoryStore::Redis(store) => store.key_incr(&kv.key, kv.value, kv.expires).await,
InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()),
_ => Err(trc::StoreEvent::NotSupported.into_err()),
}
})
.await
}

pub async fn key_delete(&self, key: impl Into<LookupKey<'_>>) -> trc::Result<()> {
let key_ = key.into();
let key = key_.as_bytes();
Box::pin(async move {
match self.get_store(key) {
#[cfg(feature = "redis")]
InMemoryStore::Redis(store) => store.key_delete(key).await,
InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()),
_ => Err(trc::StoreEvent::NotSupported.into_err()),
}
})
.await
}

pub async fn counter_delete(&self, key: impl Into<LookupKey<'_>>) -> trc::Result<()> {
let key_ = key.into();
let key = key_.as_bytes();
Box::pin(async move {
match self.get_store(key) {
#[cfg(feature = "redis")]
InMemoryStore::Redis(store) => store.key_delete(key).await,
InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()),
_ => Err(trc::StoreEvent::NotSupported.into_err()),
}
})
.await
}

pub async fn key_delete_prefix(&self, prefix: &[u8]) -> trc::Result<()> {
Box::pin(async move {
for store in &self.stores {
match store {
#[cfg(feature = "redis")]
InMemoryStore::Redis(store) => store.key_delete_prefix(prefix).await?,
InMemoryStore::Static(_) => {
return Err(trc::StoreEvent::NotSupported.into_err())
}
_ => return Err(trc::StoreEvent::NotSupported.into_err()),
}
}

Ok(())
})
.await
}

pub async fn key_get<T: Deserialize + From<Value<'static>> + std::fmt::Debug + 'static>(
&self,
key: impl Into<LookupKey<'_>>,
) -> trc::Result<Option<T>> {
let key_ = key.into();
let key = key_.as_bytes();
Box::pin(async move {
match self.get_store(key) {
#[cfg(feature = "redis")]
InMemoryStore::Redis(store) => store.key_get(key).await,
InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()),
_ => Err(trc::StoreEvent::NotSupported.into_err()),
}
})
.await
}

pub async fn counter_get(&self, key: impl Into<LookupKey<'_>>) -> trc::Result<i64> {
let key_ = key.into();
let key = key_.as_bytes();
Box::pin(async move {
match self.get_store(key) {
#[cfg(feature = "redis")]
InMemoryStore::Redis(store) => store.counter_get(key).await,
InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()),
_ => Err(trc::StoreEvent::NotSupported.into_err()),
}
})
.await
}

pub async fn key_exists(&self, key: impl Into<LookupKey<'_>>) -> trc::Result<bool> {
let key_ = key.into();
let key = key_.as_bytes();
Box::pin(async move {
match self.get_store(key) {
#[cfg(feature = "redis")]
InMemoryStore::Redis(store) => store.key_exists(key).await,
InMemoryStore::Static(_) => Err(trc::StoreEvent::NotSupported.into_err()),
_ => Err(trc::StoreEvent::NotSupported.into_err()),
}
})
.await
}
}
71 changes: 51 additions & 20 deletions crates/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ use crate::backend::redis::RedisStore;
#[cfg(feature = "azure")]
use crate::backend::azure::AzureStore;

#[cfg(feature = "enterprise")]
enum CompositeStore {
SQLReadReplica(String),
ShardedBlob(String),
ShardedInMemory(String),
}

impl Stores {
pub async fn parse_all(config: &mut Config) -> Self {
let mut stores = Self::parse(config).await;
Expand All @@ -60,8 +67,8 @@ impl Stores {
.map(|id| id.to_string())
.collect::<Vec<_>>();

for id in store_ids {
let id = id.as_str();
for store_id in store_ids {
let id = store_id.as_str();
// Parse store
#[cfg(feature = "test_mode")]
{
Expand All @@ -78,7 +85,6 @@ impl Stores {
continue;
};
let prefix = ("store", id);
let store_id = id.to_string();
let compression_algo = config
.property_or_default::<CompressionAlgo>(("store", id, "compression"), "none")
.unwrap_or(CompressionAlgo::None);
Expand Down Expand Up @@ -213,8 +219,16 @@ impl Stores {
}
}
#[cfg(feature = "enterprise")]
"sql-read-replica" | "distributed-blob" => {
composite_stores.push((store_id, protocol));
"sql-read-replica" => {
composite_stores.push(CompositeStore::SQLReadReplica(store_id));
}
#[cfg(feature = "enterprise")]
"distributed-blob" | "sharded-blob" => {
composite_stores.push(CompositeStore::ShardedBlob(store_id));
}
#[cfg(feature = "enterprise")]
"sharded-in-memory" => {
composite_stores.push(CompositeStore::ShardedInMemory(store_id));
}
#[cfg(feature = "azure")]
"azure" => {
Expand All @@ -233,17 +247,11 @@ impl Stores {
}

#[cfg(feature = "enterprise")]
for (id, protocol) in composite_stores {
let prefix = ("store", id.as_str());
let compression = config
.property_or_default::<CompressionAlgo>(
("store", id.as_str(), "compression"),
"none",
)
.unwrap_or(CompressionAlgo::None);
match protocol.as_str() {
for composite_store in composite_stores {
match composite_store {
#[cfg(any(feature = "postgres", feature = "mysql"))]
"sql-read-replica" => {
CompositeStore::SQLReadReplica(id) => {
let prefix = ("store", id.as_str());
if let Some(db) = crate::backend::composite::read_replica::SQLReadReplica::open(
config,
prefix,
Expand All @@ -257,23 +265,46 @@ impl Stores {
self.fts_stores.insert(id.to_string(), db.clone().into());
self.blob_stores.insert(
id.to_string(),
BlobStore::from(db.clone()).with_compression(compression),
BlobStore::from(db.clone()).with_compression(
config
.property_or_default::<CompressionAlgo>(
("store", id.as_str(), "compression"),
"none",
)
.unwrap_or(CompressionAlgo::None),
),
);
self.in_memory_stores.insert(id.to_string(), db.into());
self.in_memory_stores.insert(id, db.into());
}
}
"sharded-blob" | "distributed-blob" => {
CompositeStore::ShardedBlob(id) => {
let prefix = ("store", id.as_str());
if let Some(db) = crate::backend::composite::sharded_blob::ShardedBlob::open(
config, prefix, self,
) {
let store = BlobStore {
backend: crate::BlobBackend::Sharded(db.into()),
compression,
compression: config
.property_or_default::<CompressionAlgo>(
("store", id.as_str(), "compression"),
"none",
)
.unwrap_or(CompressionAlgo::None),
};
self.blob_stores.insert(id, store);
}
}
_ => (),
CompositeStore::ShardedInMemory(id) => {
let prefix = ("store", id.as_str());
if let Some(db) =
crate::backend::composite::sharded_lookup::ShardedInMemory::open(
config, prefix, self,
)
{
self.in_memory_stores
.insert(id, InMemoryStore::Sharded(db.into()));
}
}
}
}
}
Expand Down
Loading

0 comments on commit 3530b66

Please sign in to comment.