Skip to content

Commit

Permalink
Delete in-memory keys by prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Dec 26, 2024
1 parent 1ba229a commit e5ebc5c
Show file tree
Hide file tree
Showing 13 changed files with 565 additions and 214 deletions.
312 changes: 182 additions & 130 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions crates/common/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,14 @@ pub enum HousekeeperEvent {

pub enum PurgeType {
Data(Store),
Blobs { store: Store, blob_store: BlobStore },
Lookup(InMemoryStore),
Blobs {
store: Store,
blob_store: BlobStore,
},
Lookup {
store: InMemoryStore,
prefix: Option<Vec<u8>>,
},
Account(Option<u32>),
}

Expand Down
19 changes: 18 additions & 1 deletion crates/jmap/src/api/management/principal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use std::sync::{atomic::Ordering, Arc};

use common::{auth::AccessToken, Server};
use common::{auth::AccessToken, Server, KV_BAYES_MODEL_USER};
use directory::{
backend::internal::{
lookup::DirectoryStore,
Expand Down Expand Up @@ -355,6 +355,23 @@ impl PrincipalManager for Server {
self.core.storage.fts.remove_all(account_id).await?;
}

// Delete bayes model
if self
.core
.spam
.bayes
.as_ref()
.map_or(false, |c| c.account_classify)
{
let mut key = Vec::with_capacity(std::mem::size_of::<u32>() + 1);
key.push(KV_BAYES_MODEL_USER);
key.extend_from_slice(&account_id.to_be_bytes());

if let Err(err) = self.in_memory_store().key_delete_prefix(&key).await {
trc::error!(err.details("Failed to delete user bayes model"));
}
}

// Remove entries from cache
self.inner
.data
Expand Down
61 changes: 55 additions & 6 deletions crates/jmap/src/api/management/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use common::{
auth::AccessToken,
ipc::{HousekeeperEvent, PurgeType},
manager::webadmin::Resource,
Server,
*,
};
use directory::{
backend::internal::manage::{self, ManageDirectory},
Expand Down Expand Up @@ -108,7 +108,7 @@ impl ManageStore for Server {
// Validate the access token
access_token.assert_has_permission(Permission::PurgeDataStore)?;

let store = if let Some(id) = id {
let store = if let Some(id) = id.filter(|id| *id != "default") {
if let Some(store) = self.core.storage.stores.get(id) {
store.clone()
} else {
Expand All @@ -121,11 +121,11 @@ impl ManageStore for Server {
self.housekeeper_request(HousekeeperEvent::Purge(PurgeType::Data(store)))
.await
}
(Some("purge"), Some("lookup"), id, &Method::GET) => {
(Some("purge"), Some("in-memory"), id, &Method::GET) => {
// Validate the access token
access_token.assert_has_permission(Permission::PurgeInMemoryStore)?;

let store = if let Some(id) = id {
let store = if let Some(id) = id.filter(|id| *id != "default") {
if let Some(store) = self.core.storage.lookups.get(id) {
store.clone()
} else {
Expand All @@ -135,8 +135,57 @@ impl ManageStore for Server {
self.core.storage.lookup.clone()
};

self.housekeeper_request(HousekeeperEvent::Purge(PurgeType::Lookup(store)))
.await
let prefix = match path.get(4).copied() {
Some("acme") => vec![KV_ACME].into(),
Some("oauth") => vec![KV_OAUTH].into(),
Some("rate-rcpt") => vec![KV_RATE_LIMIT_RCPT].into(),
Some("rate-scan") => vec![KV_RATE_LIMIT_SCAN].into(),
Some("rate-loiter") => vec![KV_RATE_LIMIT_LOITER].into(),
Some("rate-auth") => vec![KV_RATE_LIMIT_AUTH].into(),
Some("rate-hash") => vec![KV_RATE_LIMIT_HASH].into(),
Some("rate-contact") => vec![KV_RATE_LIMIT_CONTACT].into(),
Some("rate-jmap") => vec![KV_RATE_LIMIT_JMAP].into(),
Some("rate-jmap-auth") => vec![KV_RATE_LIMIT_JMAP_AUTH].into(),
Some("rate-http-anonymous") => vec![KV_RATE_LIMIT_HTTP_ANONYM].into(),
Some("rate-imap") => vec![KV_RATE_LIMIT_IMAP].into(),
Some("reputation-ip") => vec![KV_REPUTATION_IP].into(),
Some("reputation-from") => vec![KV_REPUTATION_FROM].into(),
Some("reputation-domain") => vec![KV_REPUTATION_DOMAIN].into(),
Some("reputation-asn") => vec![KV_REPUTATION_ASN].into(),
Some("greylist") => vec![KV_GREYLIST].into(),
Some("bayes-account") => {
if let Some(account) = path.get(5).copied() {
let account_id = self
.core
.storage
.data
.get_principal_id(decode_path_element(account).as_ref())
.await?
.ok_or_else(|| trc::ManageEvent::NotFound.into_err())?;

let mut key = Vec::with_capacity(std::mem::size_of::<u32>() + 1);
key.push(KV_BAYES_MODEL_USER);
key.extend_from_slice(&account_id.to_be_bytes());
key.into()
} else {
vec![KV_BAYES_MODEL_USER].into()
}
}
Some("bayes-global") => vec![KV_BAYES_MODEL_GLOBAL].into(),
Some("trusted-reply") => vec![KV_TRUSTED_REPLY].into(),
Some("lock-purge-account") => vec![KV_LOCK_PURGE_ACCOUNT].into(),
Some("lock-queue-message") => vec![KV_LOCK_QUEUE_MESSAGE].into(),
Some("lock-queue-report") => vec![KV_LOCK_QUEUE_REPORT].into(),
Some("lock-email-task") => vec![KV_LOCK_EMAIL_TASK].into(),
Some("lock-housekeeper") => vec![KV_LOCK_HOUSEKEEPER].into(),
_ => None,
};

self.housekeeper_request(HousekeeperEvent::Purge(PurgeType::Lookup {
store,
prefix,
}))
.await
}
(Some("purge"), Some("account"), id, &Method::GET) => {
// Validate the access token
Expand Down
18 changes: 14 additions & 4 deletions crates/jmap/src/services/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,10 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
PurgeType::Blobs { store, blob_store }
}
PurgeStore::Lookup(in_memory_store) => {
PurgeType::Lookup(in_memory_store)
PurgeType::Lookup {
store: in_memory_store,
prefix: None,
}
}
},
idx as u32,
Expand Down Expand Up @@ -638,14 +641,15 @@ impl Purge for Server {
.collect::<Vec<_>>()
.into(),
),
PurgeType::Lookup(_) => (
PurgeType::Lookup { prefix: None, .. } => (
"in-memory",
[2u8]
.into_iter()
.chain(store_idx.to_be_bytes().into_iter())
.collect::<Vec<_>>()
.into(),
),
PurgeType::Lookup { .. } => ("in-memory-prefix", None),
PurgeType::Account(_) => ("account", None),
};
if let Some(lock_name) = &lock_name {
Expand Down Expand Up @@ -719,8 +723,14 @@ impl Purge for Server {
trc::error!(err.details("Failed to purge blob store"));
}
}
PurgeType::Lookup(store) => {
if let Err(err) = store.purge_in_memory_store().await {
PurgeType::Lookup { store, prefix } => {
if let Some(prefix) = prefix {
if let Err(err) = store.key_delete_prefix(&prefix).await {
trc::error!(err
.details("Failed to delete key prefix")
.ctx(trc::Key::Key, prefix));
}
} else if let Err(err) = store.purge_in_memory_store().await {
trc::error!(err.details("Failed to purge lookup store"));
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ resolver = "2"
utils = { path = "../utils" }
nlp = { path = "../nlp" }
trc = { path = "../trc" }
rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] }
rocksdb = { version = "0.23", optional = true, features = ["multi-threaded-cf"] }
foundationdb = { version = "0.9.0", features = ["embedded-fdb-include", "fdb-7_1"], optional = true }
rusqlite = { version = "0.32", features = ["bundled"], optional = true }
rust-s3 = { version = "=0.35.0-alpha.2", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true }
Expand Down Expand Up @@ -44,7 +44,7 @@ serde_json = {version = "1.0.64", optional = true }
regex = "1.7.0"
flate2 = "1.0"
async-trait = "0.1.68"
redis = { version = "0.26", features = [ "tokio-comp", "tokio-rustls-comp", "tls-rustls-insecure", "tls-rustls-webpki-roots", "cluster-async"], optional = true }
redis = { version = "0.27", features = [ "tokio-comp", "tokio-rustls-comp", "tls-rustls-insecure", "tls-rustls-webpki-roots", "cluster-async"], optional = true }
deadpool = { version = "0.12", features = ["managed"], optional = true }
bincode = "1.3.3"
arc-swap = "1.6.0"
Expand Down
88 changes: 62 additions & 26 deletions crates/store/src/backend/redis/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@ use crate::Deserialize;
use super::{into_error, RedisPool, RedisStore};

impl RedisStore {
pub async fn key_set(
&self,
key: Vec<u8>,
value: Vec<u8>,
expires: Option<u64>,
) -> trc::Result<()> {
pub async fn key_set(&self, key: &[u8], value: &[u8], expires: Option<u64>) -> trc::Result<()> {
match &self.pool {
RedisPool::Single(pool) => {
self.key_set_(
Expand All @@ -39,12 +34,7 @@ impl RedisStore {
}
}

pub async fn key_incr(
&self,
key: Vec<u8>,
value: i64,
expires: Option<u64>,
) -> trc::Result<i64> {
pub async fn key_incr(&self, key: &[u8], value: i64, expires: Option<u64>) -> trc::Result<i64> {
match &self.pool {
RedisPool::Single(pool) => {
self.key_incr_(
Expand All @@ -67,7 +57,7 @@ impl RedisStore {
}
}

pub async fn key_delete(&self, key: Vec<u8>) -> trc::Result<()> {
pub async fn key_delete(&self, key: &[u8]) -> trc::Result<()> {
match &self.pool {
RedisPool::Single(pool) => {
self.key_delete_(pool.get().await.map_err(into_error)?.as_mut(), key)
Expand All @@ -80,9 +70,22 @@ impl RedisStore {
}
}

pub async fn key_delete_prefix(&self, prefix: &[u8]) -> trc::Result<()> {
match &self.pool {
RedisPool::Single(pool) => {
self.key_delete_prefix_(pool.get().await.map_err(into_error)?.as_mut(), prefix)
.await
}
RedisPool::Cluster(pool) => {
self.key_delete_prefix_(pool.get().await.map_err(into_error)?.as_mut(), prefix)
.await
}
}
}

pub async fn key_get<T: Deserialize + std::fmt::Debug + 'static>(
&self,
key: Vec<u8>,
key: &[u8],
) -> trc::Result<Option<T>> {
match &self.pool {
RedisPool::Single(pool) => {
Expand All @@ -96,7 +99,7 @@ impl RedisStore {
}
}

pub async fn counter_get(&self, key: Vec<u8>) -> trc::Result<i64> {
pub async fn counter_get(&self, key: &[u8]) -> trc::Result<i64> {
match &self.pool {
RedisPool::Single(pool) => {
self.counter_get_(pool.get().await.map_err(into_error)?.as_mut(), key)
Expand All @@ -109,7 +112,7 @@ impl RedisStore {
}
}

pub async fn key_exists(&self, key: Vec<u8>) -> trc::Result<bool> {
pub async fn key_exists(&self, key: &[u8]) -> trc::Result<bool> {
match &self.pool {
RedisPool::Single(pool) => {
self.key_exists_(pool.get().await.map_err(into_error)?.as_mut(), key)
Expand All @@ -125,7 +128,7 @@ impl RedisStore {
async fn key_get_<T: Deserialize + std::fmt::Debug + 'static>(
&self,
conn: &mut impl AsyncCommands,
key: Vec<u8>,
key: &[u8],
) -> trc::Result<Option<T>> {
if let Some(value) = redis::cmd("GET")
.arg(key)
Expand All @@ -139,7 +142,7 @@ impl RedisStore {
}
}

async fn counter_get_(&self, conn: &mut impl AsyncCommands, key: Vec<u8>) -> trc::Result<i64> {
async fn counter_get_(&self, conn: &mut impl AsyncCommands, key: &[u8]) -> trc::Result<i64> {
redis::cmd("GET")
.arg(key)
.query_async::<Option<i64>>(conn)
Expand All @@ -148,15 +151,15 @@ impl RedisStore {
.map_err(into_error)
}

async fn key_exists_(&self, conn: &mut impl AsyncCommands, key: Vec<u8>) -> trc::Result<bool> {
async fn key_exists_(&self, conn: &mut impl AsyncCommands, key: &[u8]) -> trc::Result<bool> {
conn.exists(key).await.map_err(into_error)
}

async fn key_set_(
&self,
conn: &mut impl AsyncCommands,
key: Vec<u8>,
value: Vec<u8>,
key: &[u8],
value: &[u8],
expires: Option<u64>,
) -> trc::Result<()> {
if let Some(expires) = expires {
Expand All @@ -169,26 +172,59 @@ impl RedisStore {
async fn key_incr_(
&self,
conn: &mut impl AsyncCommands,
key: Vec<u8>,
key: &[u8],
value: i64,
expires: Option<u64>,
) -> trc::Result<i64> {
if let Some(expires) = expires {
redis::pipe()
.atomic()
.incr(&key, value)
.expire(&key, expires as i64)
.incr(key, value)
.expire(key, expires as i64)
.ignore()
.query_async::<Vec<i64>>(conn)
.await
.map_err(into_error)
.map(|v| v.first().copied().unwrap_or(0))
} else {
conn.incr(&key, value).await.map_err(into_error)
conn.incr(key, value).await.map_err(into_error)
}
}

async fn key_delete_(&self, conn: &mut impl AsyncCommands, key: Vec<u8>) -> trc::Result<()> {
async fn key_delete_(&self, conn: &mut impl AsyncCommands, key: &[u8]) -> trc::Result<()> {
conn.del(key).await.map_err(into_error)
}

async fn key_delete_prefix_(
&self,
conn: &mut impl AsyncCommands,
prefix: &[u8],
) -> trc::Result<()> {
let mut pattern = Vec::with_capacity(prefix.len() + 1);
pattern.extend_from_slice(prefix);
pattern.push(b'*');

let mut cursor = 0;
loop {
let (new_cursor, keys): (u64, Vec<Vec<u8>>) = redis::cmd("SCAN")
.cursor_arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(100)
.query_async(conn)
.await
.map_err(into_error)?;

if !keys.is_empty() {
conn.del::<_, ()>(&keys).await.map_err(into_error)?;
}

if new_cursor != 0 {
cursor = new_cursor;
} else {
return Ok(());
}
}
}
}
Loading

0 comments on commit e5ebc5c

Please sign in to comment.