diff --git a/Cargo.lock b/Cargo.lock index 855bac7d8..97bb33ffe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2106,6 +2106,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -2192,6 +2202,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "ff" version = "0.13.0" @@ -2445,7 +2461,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" dependencies = [ - "fastrand", + "fastrand 1.9.0", "futures-core", "futures-io", "memchr", @@ -3015,6 +3031,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.5.2", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -3793,6 +3825,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + [[package]] name = "litemap" version = "0.7.4" @@ -4193,6 +4231,23 @@ dependencies = [ "zstd", ] +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -5513,12 +5568,14 @@ dependencies = [ "http-body-util", "hyper 1.5.2", "hyper-rustls 0.27.5", + "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", "mime_guess", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -5531,6 +5588,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", + "tokio-native-tls", "tokio-rustls 0.26.1", "tokio-util", "tower-service", @@ -5677,6 +5735,43 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rqlite-rs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a41582c369a716965927e8f505296778f14a44d45cfee3602eaf083c4d0113e8" +dependencies = [ + "base64 0.22.1", + "reqwest 0.12.9", + "rqlite-rs-core", + "rqlite-rs-macros", + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "rqlite-rs-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec4b2a118f208858ffac4b0be3d2cb7434b8f622d3048479235fef000ce1dc2" +dependencies = [ + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "rqlite-rs-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c067a7c8b0dea8e29478e82dc0abae18a8f633fda317b01dfa68cba49ad7de1" +dependencies = [ + "quote", + "rqlite-rs-core", + "syn 2.0.91", +] + [[package]] name = "rsa" version = "0.9.7" @@ -5846,6 +5941,19 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "0.38.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" +dependencies = [ + "bitflags 2.6.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.59.0", +] + [[package]] name = "rustls" version = "0.21.12" @@ -6642,6 +6750,7 @@ dependencies = [ "ring 0.17.8", "roaring", "rocksdb", + "rqlite-rs", "rusqlite", "rust-s3", "rustls 0.23.20", @@ -6795,6 +6904,19 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tempfile" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +dependencies = [ + "cfg-if", + "fastrand 2.3.0", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + [[package]] name = "term" version = "0.7.0" @@ -7005,6 +7127,16 @@ dependencies = [ "syn 2.0.91", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.12" diff --git a/README.md b/README.md index a9c640d46..785d2c95f 100644 --- a/README.md +++ b/README.md @@ -58,12 +58,12 @@ Key features: - **Greylisting** to temporarily defer unknown senders. - **Spam traps** to set up decoy email addresses that catch and analyze spam. - **Flexible and scalable**: - - Pluggable storage backends with **RocksDB**, **FoundationDB**, **PostgreSQL**, **mySQL**, **SQLite**, **S3-Compatible**, **Azure**, **Redis** and **ElasticSearch** support. + - Pluggable storage backends with **RocksDB**, **FoundationDB**, **PostgreSQL**, **mySQL**, **SQLite**,, **rqlite**, **S3-Compatible**, **Azure**, **Redis** and **ElasticSearch** support. - **Clustering** support with node autodiscovery and partition-tolerant failure detection. - Full-text search available in 17 languages. - Sieve scripting language with support for all [registered extensions](https://www.iana.org/assignments/sieve-extensions/sieve-extensions.xhtml). - Email aliases, mailing lists, subaddressing and catch-all addresses support. - - Automatic account configuration and discovery with [autoconfig](https://www.ietf.org/id/draft-bucksch-autoconfig-02.html) and [autodiscover](https://learn.microsoft.com/en-us/exchange/architecture/client-access/autodiscover?view=exchserver-2019). + - Automatic account configuration and discovery with [autoconfig](https://www.ietf.org/id/draft-bucksch-autoconfig-02.html) and [autodiscover](https://learn.microsoft.com/en-us/exchange/architecture/client-access/autodiscover?view=exchserver-2019). - Multi-tenancy support with domain and tenant isolation. - Disk quotas per user and tenant. - **Secure and robust**: @@ -77,7 +77,7 @@ Key features: - **OpenID Connect** authentication. - OAuth 2.0 authorization with [authorization code](https://www.rfc-editor.org/rfc/rfc8628) and [device authorization](https://www.rfc-editor.org/rfc/rfc8628) flows. - **LDAP**, **OIDC**, **SQL** or built-in authentication backend support. - - Two-factor authentication with Time-based One-Time Passwords (`2FA-TOTP`) + - Two-factor authentication with Time-based One-Time Passwords (`2FA-TOTP`) - Application passwords (App Passwords). - Roles and permissions. - Access Control Lists (ACLs). @@ -145,8 +145,8 @@ If you find the project useful you can help by [becoming a sponsor](https://open This project is dual-licensed under the **GNU Affero General Public License v3.0** (AGPL-3.0; as published by the Free Software Foundation) and the **Stalwart Enterprise License v1 (SELv1)**: -- The [GNU Affero General Public License v3.0](./LICENSES/AGPL-3.0-only.txt) is a free software license that ensures your freedom to use, modify, and distribute the software, with the condition that any modified versions of the software must also be distributed under the same license. -- The [Stalwart Enterprise License v1 (SELv1)](./LICENSES/LicenseRef-SEL.txt) is a proprietary license designed for commercial use. It offers additional features and greater flexibility for businesses that do not wish to comply with the AGPL-3.0 license requirements. +- The [GNU Affero General Public License v3.0](./LICENSES/AGPL-3.0-only.txt) is a free software license that ensures your freedom to use, modify, and distribute the software, with the condition that any modified versions of the software must also be distributed under the same license. +- The [Stalwart Enterprise License v1 (SELv1)](./LICENSES/LicenseRef-SEL.txt) is a proprietary license designed for commercial use. It offers additional features and greater flexibility for businesses that do not wish to comply with the AGPL-3.0 license requirements. Each file in this project contains a license notice at the top, indicating the applicable license(s). The license notice follows the [REUSE guidelines](https://reuse.software/) to ensure clarity and consistency. The full text of each license is available in the [LICENSES](./LICENSES/) directory. diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index 264d902d6..5883158b6 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -34,9 +34,10 @@ tokio = { version = "1.23", features = ["full"] } jemallocator = "0.5.0" [features] -#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"] +#default = ["sqlite", "rqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"] default = ["rocks", "enterprise"] sqlite = ["store/sqlite"] +rqlite = ["store/rqlite"] foundationdb = ["store/foundation", "common/foundation"] postgres = ["store/postgres"] mysql = ["store/mysql"] diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 537ab8055..9dbb3ef8c 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -11,6 +11,7 @@ trc = { path = "../trc" } rocksdb = { version = "0.23", optional = true, features = ["multi-threaded-cf"] } foundationdb = { version = "0.9.0", features = ["embedded-fdb-include", "fdb-7_1"], optional = true } rusqlite = { version = "0.32", features = ["bundled"], optional = true } +rqlite-rs = { version = "0.5.0", optional = true } rust-s3 = { version = "=0.35.0-alpha.2", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true } azure_core = { version = "0.21.0", optional = true } azure_storage = { version = "0.21.0", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"], optional = true } @@ -56,6 +57,7 @@ tokio = { version = "1.23", features = ["full"] } [features] rocks = ["rocksdb", "rayon", "num_cpus"] sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "lru-cache"] +rqlite = ["rqlite-rs"] postgres = ["tokio-postgres", "deadpool-postgres", "tokio-rustls", "rustls", "ring", "rustls-pki-types", "futures", "bytes"] elastic = ["elasticsearch", "serde_json"] mysql = ["mysql_async", "futures"] @@ -67,5 +69,3 @@ redis = ["dep:redis", "deadpool"] enterprise = [] test_mode = [] - - diff --git a/crates/store/src/backend/composite/sharded_blob.rs b/crates/store/src/backend/composite/sharded_blob.rs index 99088f4b0..960891698 100644 --- a/crates/store/src/backend/composite/sharded_blob.rs +++ b/crates/store/src/backend/composite/sharded_blob.rs @@ -63,6 +63,8 @@ impl ShardedBlob { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.get_blob(key, read_range).await, + #[cfg(feature = "rqlite")] + Store::RQLite(store) => store.get_blob(key, read_range).await, #[cfg(feature = "foundation")] Store::FoundationDb(store) => store.get_blob(key, read_range).await, #[cfg(feature = "postgres")] @@ -95,6 +97,8 @@ impl ShardedBlob { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.put_blob(key, data).await, + #[cfg(feature = "rqlite")] + Store::RQLite(store) => store.put_blob(key, data).await, #[cfg(feature = "foundation")] Store::FoundationDb(store) => store.put_blob(key, data).await, #[cfg(feature = "postgres")] @@ -127,6 +131,8 @@ impl ShardedBlob { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.delete_blob(key).await, + #[cfg(feature = "rqlite")] + Store::RQLite(store) => store.delete_blob(key).await, #[cfg(feature = "foundation")] Store::FoundationDb(store) => store.delete_blob(key).await, #[cfg(feature = "postgres")] diff --git a/crates/store/src/backend/mod.rs b/crates/store/src/backend/mod.rs index 909021dcf..199874239 100644 --- a/crates/store/src/backend/mod.rs +++ b/crates/store/src/backend/mod.rs @@ -23,6 +23,8 @@ pub mod postgres; pub mod redis; #[cfg(feature = "rocks")] pub mod rocksdb; +#[cfg(feature = "rqlite")] +pub mod rqlite; #[cfg(feature = "s3")] pub mod s3; #[cfg(feature = "sqlite")] diff --git a/crates/store/src/backend/rqlite/blob.rs b/crates/store/src/backend/rqlite/blob.rs new file mode 100644 index 000000000..f407ef69c --- /dev/null +++ b/crates/store/src/backend/rqlite/blob.rs @@ -0,0 +1,70 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::ops::Range; + +use rusqlite::OptionalExtension; + +use super::{into_error, RqliteStore}; + +impl RqliteStore { + pub(crate) async fn get_blob( + &self, + key: &[u8], + range: Range, + ) -> trc::Result>> { + let conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + let mut result = conn + .exec(rqlite_rs::query!("SELECT v FROM t WHERE k = ?", key)) + .await + .map_err(into_error)?; + result + .first() + .map(|row| { + Ok({ + let bytes = row.get_by_index(0)?.as_bytes()?; + if range.start == 0 && range.end == usize::MAX { + bytes.to_vec() + } else { + bytes + .get(range.start..std::cmp::min(bytes.len(), range.end)) + .unwrap_or_default() + .to_vec() + } + }) + }) + .map_err(into_error) + }) + .await + } + + pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> trc::Result<()> { + let conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + conn.exec(rqlite_rs::query!( + "INSERT OR REPLACE INTO t (k, v) VALUES (?, ?)", + key, + data + )) + .await + .map_err(into_error) + .map(|_| ()) + }) + .await + } + + pub(crate) async fn delete_blob(&self, key: &[u8]) -> trc::Result { + let conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + conn.exec(rqlite_rs::query!("DELETE FROM t WHERE k = ?", key)) + .await + .map_err(into_error) + .map(|_| true) + }) + .await + } +} diff --git a/crates/store/src/backend/rqlite/lookup.rs b/crates/store/src/backend/rqlite/lookup.rs new file mode 100644 index 000000000..07def9b45 --- /dev/null +++ b/crates/store/src/backend/rqlite/lookup.rs @@ -0,0 +1,156 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ +use rqlite_rs::query::arguments::RqliteArgument; +use rqlite_rs::query::{Operation, RqliteQuery}; + +use crate::{IntoRows, QueryResult, QueryType, Value}; + +use super::{into_error, RqliteStore}; + +impl RqliteStore { + pub(crate) async fn query( + &self, + query: &str, + params_: &[Value<'_>], + ) -> trc::Result { + let conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + let params: Vec = + params_.iter().map(|v| (&v.to_owned()).into()).collect(); + + let mut query = RqliteQuery { + query: query.to_string(), + args: params, + op: Operation::Select, + }; + + match T::query_type() { + QueryType::Execute => conn + .exec(query) + .await + .map_err(into_error)? + .map_or_else(|e| Err(into_error(e)), |r| Ok(T::from_exec(r))), + QueryType::Exists => conn + .fetch(query) + .await + .map_err(into_error)? + .first() + .map(T::from_exists) + .map_err(into_error), + QueryType::QueryOne => conn + .fetch(query) + .await + .map_err(into_error)? + .and_then(|mut rows| Ok(T::from_query_one(rows.first()?))) + .map_err(into_error), + QueryType::QueryAll => Ok(T::from_query_all( + conn.fetch(query).await.map_err(into_error)?, + )), + } + }) + .await + } +} + +impl From<&Value<'_>> for RqliteArgument { + fn from(value: &Value<'_>) -> RqliteArgument { + match value { + Value::Integer(u) => RqliteArgument::I64(*u as i64), + Value::Bool(b) => RqliteArgument::Bool(*b), + Value::Float(f) => RqliteArgument::F64(*f as f64), + Value::Text(s) => RqliteArgument::String(s.to_string()), + Value::Blob(blob) => RqliteArgument::Blob(blob.to_vec()), + Value::Null => RqliteArgument::Null, + } + } +} + +/* +impl FromSql for Value<'static> { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult { + Ok(match value { + rusqlite::types::ValueRef::Null => Value::Null, + rusqlite::types::ValueRef::Integer(v) => Value::Integer(v), + rusqlite::types::ValueRef::Real(v) => Value::Float(v), + rusqlite::types::ValueRef::Text(v) => { + Value::Text(String::from_utf8_lossy(v).into_owned().into()) + } + rusqlite::types::ValueRef::Blob(v) => Value::Blob(v.to_vec().into()), + }) + } +} */ + +/* +impl IntoRows for Rows<'_> { + fn into_rows(mut self) -> crate::Rows { + let column_count = self.as_ref().map(|s| s.column_count()).unwrap_or_default(); + let mut rows = crate::Rows { rows: Vec::new() }; + + while let Ok(Some(row)) = self.next() { + rows.rows.push(crate::Row { + values: (0..column_count) + .map(|idx| row.get::<_, Value>(idx).unwrap_or(Value::Null)) + .collect(), + }); + } + + rows + } + + fn into_named_rows(mut self) -> crate::NamedRows { + let (column_count, names) = self + .as_ref() + .map(|s| { + ( + s.column_count(), + s.column_names() + .into_iter() + .map(String::from) + .collect::>(), + ) + }) + .unwrap_or((0, Vec::new())); + + let mut rows = crate::NamedRows { + names, + rows: Vec::new(), + }; + + while let Ok(Some(row)) = self.next() { + rows.rows.push(crate::Row { + values: (0..column_count) + .map(|idx| row.get::<_, Value>(idx).unwrap_or(Value::Null)) + .collect(), + }); + } + + rows + } + + fn into_row(self) -> Option { + unreachable!() + } +} +*/ +/* +impl IntoRows for Option<&Row<'_>> { + fn into_row(self) -> Option { + self.map(|row| crate::Row { + values: (0..row.as_ref().column_count()) + .map(|idx| row.get::<_, Value>(idx).unwrap_or(Value::Null)) + .collect(), + }) + } + + fn into_rows(self) -> crate::Rows { + unreachable!() + } + + fn into_named_rows(self) -> crate::NamedRows { + unreachable!() + } +} +*/ diff --git a/crates/store/src/backend/rqlite/main.rs b/crates/store/src/backend/rqlite/main.rs new file mode 100644 index 000000000..cd9f7ef75 --- /dev/null +++ b/crates/store/src/backend/rqlite/main.rs @@ -0,0 +1,160 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use r2d2::Pool; +use tokio::sync::oneshot; +use utils::config::{utils::AsKey, Config}; + +use crate::*; + +use super::{into_error, pool::RqliteConnectionManager, RqliteStore}; + +impl RqliteStore { + pub async fn open(config: &mut Config, prefix: impl AsKey) -> Option { + let prefix = prefix.as_key(); + let endpoints = config + .properties::((&prefix, "endpoints")) + .into_iter() + .map(|(_key, addr_str)| addr_str) + .collect::>(); + + let db = Self { + conn_pool: Pool::builder() + .max_size( + config + .property((&prefix, "pool.max-connections")) + .unwrap_or_else(|| (num_cpus::get() * 4) as u32), + ) + .build(RqliteConnectionManager::endpoints(endpoints)) + .map_err(|err| { + config.new_build_error( + prefix.as_str(), + format!("Failed to build connection pool: {err}"), + ) + }) + .ok()?, + worker_pool: rayon::ThreadPoolBuilder::new() + .num_threads(std::cmp::max( + config + .property::((&prefix, "pool.workers")) + .filter(|v| *v > 0) + .unwrap_or_else(num_cpus::get), + 4, + )) + .build() + .map_err(|err| { + config.new_build_error( + prefix.as_str(), + format!("Failed to build worker pool: {err}"), + ) + }) + .ok()?, + }; + + if let Err(err) = db.create_tables().await { + config.new_build_error(prefix.as_str(), format!("Failed to create tables: {err}")); + } + + Some(db) + } + + pub(crate) async fn create_tables(&self) -> trc::Result<()> { + let conn = self.conn_pool.get().map_err(into_error)?; + + for table in [ + SUBSPACE_ACL, + SUBSPACE_DIRECTORY, + SUBSPACE_TASK_QUEUE, + SUBSPACE_BLOB_RESERVE, + SUBSPACE_BLOB_LINK, + SUBSPACE_LOOKUP_VALUE, + SUBSPACE_PROPERTY, + SUBSPACE_SETTINGS, + SUBSPACE_QUEUE_MESSAGE, + SUBSPACE_QUEUE_EVENT, + SUBSPACE_REPORT_OUT, + SUBSPACE_REPORT_IN, + SUBSPACE_FTS_INDEX, + SUBSPACE_LOGS, + SUBSPACE_TELEMETRY_SPAN, + SUBSPACE_TELEMETRY_METRIC, + SUBSPACE_TELEMETRY_INDEX, + ] { + let table = char::from(table); + conn.exec(rqlite_rs::query!(&format!( + "CREATE TABLE IF NOT EXISTS {table} ( + k TINYBLOB, + v MEDIUMBLOB NOT NULL, + PRIMARY KEY (k(255)) + ) ENGINE=InnoDB" + ))) + .await + .map_err(into_error)?; + } + + conn.exec(rqlite_rs::query!(&format!( + "CREATE TABLE IF NOT EXISTS {} ( + k TINYBLOB, + v LONGBLOB NOT NULL, + PRIMARY KEY (k(255)) + ) ENGINE=InnoDB", + char::from(SUBSPACE_BLOBS), + ))) + .await + .map_err(into_error)?; + + for table in [ + SUBSPACE_INDEXES, + SUBSPACE_BITMAP_ID, + SUBSPACE_BITMAP_TAG, + SUBSPACE_BITMAP_TEXT, + ] { + let table = char::from(table); + conn.exec(rqlite_rs::query!(&format!( + "CREATE TABLE IF NOT EXISTS {table} ( + k BLOB, + PRIMARY KEY (k(400)) + ) ENGINE=InnoDB" + ))) + .await + .map_err(into_error)?; + } + + for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] { + conn.exec(rqlite_rs::query!(&format!( + "CREATE TABLE IF NOT EXISTS {} ( + k TINYBLOB, + v BIGINT NOT NULL DEFAULT 0, + PRIMARY KEY (k(255)) + ) ENGINE=InnoDB", + char::from(table) + ))) + .await + .map_err(into_error)?; + } + + Ok(()) + } + + pub async fn spawn_worker(&self, mut f: U) -> trc::Result + where + U: FnMut() -> trc::Result + Send, + V: Sync + Send + 'static, + { + let (tx, rx) = oneshot::channel(); + + self.worker_pool.scope(|s| { + s.spawn(|_| { + tx.send(f()).ok(); + }); + }); + + match rx.await { + Ok(result) => result, + Err(err) => Err(trc::EventType::Server(trc::ServerEvent::ThreadError).reason(err)), + } + } +} diff --git a/crates/store/src/backend/rqlite/mod.rs b/crates/store/src/backend/rqlite/mod.rs new file mode 100644 index 000000000..3b5962be9 --- /dev/null +++ b/crates/store/src/backend/rqlite/mod.rs @@ -0,0 +1,28 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::fmt::Display; + +use r2d2::Pool; + +use self::pool::RqliteConnectionManager; + +pub mod blob; +pub mod lookup; +pub mod main; +pub mod pool; +pub mod read; +pub mod write; + +pub struct RqliteStore { + pub(crate) conn_pool: Pool, + pub(crate) worker_pool: rayon::ThreadPool, +} + +#[inline(always)] +fn into_error(err: impl Display) -> trc::Error { + trc::StoreEvent::RqliteError.reason(err) +} diff --git a/crates/store/src/backend/rqlite/pool.rs b/crates/store/src/backend/rqlite/pool.rs new file mode 100644 index 000000000..02c1afb19 --- /dev/null +++ b/crates/store/src/backend/rqlite/pool.rs @@ -0,0 +1,65 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use rqlite_rs::client::RqliteClient; +use rqlite_rs::error::{ClientBuilderError, RequestError}; +use rqlite_rs::RqliteClientBuilder; +use std::fmt; + +/// An `r2d2::ManageConnection` for `rusqlite::Connection`s. +pub struct RqliteConnectionManager { + endpoints: Vec, +} + +impl fmt::Debug for RqliteConnectionManager { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut builder = f.debug_struct("SqliteConnectionManager"); + let _ = builder.field("endpoints", &self.endpoints); + builder.finish() + } +} + +impl RqliteConnectionManager { + /// Creates a new `RqliteConnectionManager` from endpoints. + pub fn endpoints(endpoints: Vec) -> Self { + Self { + endpoints: endpoints, + } + } +} + +fn sleeper(_: i32) -> bool { + std::thread::sleep(std::time::Duration::from_millis(200)); + true +} + +impl r2d2::ManageConnection for RqliteConnectionManager { + type Connection = RqliteClient; + type Error = ClientBuilderError; + + fn connect(&self) -> Result { + let mut client_builder = RqliteClientBuilder::new(); + + for endpoint in &self.endpoints { + client_builder = client_builder.known_host(endpoint); + } + + client_builder.build().map_err(Into::into) + } + + fn is_valid(&self, conn: &mut RqliteClient) -> Result<(), ClientBuilderError> { + Ok(()) + /*let res = conn.exec(rqlite_rs::query!("SELECT 1;")); + match res.wait().map_err(Into::into) { + Ok(_) => Ok(()), + Err(err) => Err(err) + }*/ + } + + fn has_broken(&self, _: &mut RqliteClient) -> bool { + false + } +} diff --git a/crates/store/src/backend/rqlite/read.rs b/crates/store/src/backend/rqlite/read.rs new file mode 100644 index 000000000..8dc9b74a4 --- /dev/null +++ b/crates/store/src/backend/rqlite/read.rs @@ -0,0 +1,171 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use roaring::RoaringBitmap; +use rusqlite::OptionalExtension; + +use crate::{ + write::{key::DeserializeBigEndian, BitmapClass, ValueClass}, + BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, +}; + +use super::{into_error, RqliteStore}; + +impl RqliteStore { + pub(crate) async fn get_value(&self, key: impl Key) -> trc::Result> + where + U: Deserialize + 'static, + { + let conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + let query = rqlite_rs::query!( + &format!("SELECT v FROM {} WHERE k = ?", char::from(key.subspace())), + key.serialize(0) + ); + match conn.fetch(query).await.map_err(into_error) { + Ok(rows) => U::deserialize(rows.get_by_index_opt(0)?.as_bytes()?) + .map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into())), + } + }) + .await + } + + pub(crate) async fn get_bitmap( + &self, + mut key: BitmapKey>, + ) -> trc::Result> { + let begin = key.serialize(0); + key.document_id = u32::MAX; + let key_len = begin.len(); + let end = key.serialize(0); + let conn = self.conn_pool.get().map_err(into_error)?; + let table = char::from(key.subspace()); + + self.spawn_worker(move || { + let mut bm = RoaringBitmap::new(); + let mut rows = conn + .fetch(rqlite_rs::query!( + &format!("SELECT k FROM {table} WHERE k >= ? AND k <= ?"), + begin, + end + )) + .await + .map_err(into_error)?; + + while let Some(row) = rows.next().map_err(into_error)? { + let key = row + .get_by_index(0) + .map_err(into_error)? + .as_bytes() + .map_err(into_error)?; + if key.len() == key_len { + bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); + } + } + Ok(if !bm.is_empty() { Some(bm) } else { None }) + }) + .await + } + + pub(crate) async fn iterate( + &self, + params: IterateParams, + mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> trc::Result + Sync + Send, + ) -> trc::Result<()> { + let conn = self.conn_pool.get().map_err(into_error)?; + + self.spawn_worker(move || { + let table = char::from(params.begin.subspace()); + let begin = params.begin.serialize(0); + let end = params.end.serialize(0); + let keys = if params.values { "k, v" } else { "k" }; + + let mut query = conn + .fetch(rqlite_rs::query!(&match (params.first, params.ascending) { + (true, true) => { + format!( + "SELECT {keys} FROM {table} WHERE k >= ? AND k <= ? ORDER BY k ASC LIMIT 1" + ) + } + (true, false) => { + format!( + "SELECT {keys} FROM {table} WHERE k >= ? AND k <= ? ORDER BY k DESC LIMIT 1" + ) + } + (false, true) => { + format!("SELECT {keys} FROM {table} WHERE k >= ? AND k <= ? ORDER BY k ASC") + } + (false, false) => { + format!( + "SELECT {keys} FROM {table} WHERE k >= ? AND k <= ? ORDER BY k DESC" + ) + } + })) + .await + .map_err(into_error)?; + let mut rows = query.query([&begin, &end]).map_err(into_error)?; + + if params.values { + while let Some(row) = rows.next().map_err(into_error)? { + let key = row + .get_by_index(0) + .map_err(into_error)? + .as_bytes() + .map_err(into_error)?; + let value = row + .get_by_index(1) + .map_err(into_error)? + .as_bytes() + .map_err(into_error)?; + + if !cb(key, value)? { + break; + } + } + } else { + while let Some(row) = rows.next().map_err(into_error)? { + if !cb( + row.get_by_index(0) + .map_err(into_error)? + .as_bytes() + .map_err(into_error)?, + b"", + )? { + break; + } + } + } + + Ok(()) + }) + .await + } + + pub(crate) async fn get_counter( + &self, + key: impl Into>> + Sync + Send, + ) -> trc::Result { + let key = key.into(); + let table = char::from(key.subspace()); + let key = key.serialize(0); + let conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + match conn + .fetch(rqlite_rs::query!(&format!( + "SELECT v FROM {table} WHERE k = ?" + ))) + .await + .map_err(into_error)? + .query_row([&key], |row| row.get::<_, i64>(0)) + { + Ok(value) => Ok(value), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0), + Err(e) => Err(into_error(e)), + } + }) + .await + } +} diff --git a/crates/store/src/backend/rqlite/write.rs b/crates/store/src/backend/rqlite/write.rs new file mode 100644 index 000000000..2003df209 --- /dev/null +++ b/crates/store/src/backend/rqlite/write.rs @@ -0,0 +1,324 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use roaring::RoaringBitmap; +use rqlite_rs::query::RqliteQuery; + +use crate::{ + write::{ + key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId, + ValueOp, + }, + BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN, +}; + +use super::{into_error, RqliteStore}; + +impl RqliteStore { + pub(crate) async fn write(&self, batch: Batch) -> trc::Result { + let mut conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + let mut account_id = u32::MAX; + let mut collection = u8::MAX; + let mut document_id = u32::MAX; + let mut change_id = u64::MAX; + let queries: Vec = vec![]; + let mut result = AssignedIds::default(); + + for op in &batch.ops { + match op { + Operation::AccountId { + account_id: account_id_, + } => { + account_id = *account_id_; + } + Operation::Collection { + collection: collection_, + } => { + collection = *collection_; + } + Operation::DocumentId { + document_id: document_id_, + } => { + document_id = *document_id_; + } + Operation::ChangeId { + change_id: change_id_, + } => { + change_id = *change_id_; + } + Operation::Value { class, op } => { + let key = class.serialize( + account_id, + collection, + document_id, + 0, + (&result).into(), + ); + let table = char::from(class.subspace(collection)); + + match op { + ValueOp::Set(value) => { + queries.push( + rqlite_rs::query!( + &format!( + "INSERT OR REPLACE INTO {} (k, v) VALUES (?, ?)", + table + ), + key, + value.resolve(&result)?.as_ref() + ) + .await, + ); + } + ValueOp::AtomicAdd(by) => { + if *by >= 0 { + queries.push( + rqlite_rs::query!( + &format!( + concat!( + "INSERT INTO {} (k, v) VALUES (?, ?) ", + "ON CONFLICT(k) DO UPDATE SET v = v + excluded.v" + ), + table + ), + key, + *by + ) + .await, + ); + } else { + queries.push( + rqlite_rs::query!( + &format!("UPDATE {table} SET v = v + ? WHERE k = ?"), + *by, + key + ) + .await, + ); + } + } + ValueOp::AddAndGet(by) => { + // NOTE: escapes the transaction + result.push_counter_id( + conn.fetch( + rqlite_rs::query!( + &format!( + concat!( + "INSERT INTO {} (k, v) VALUES (?, ?) ", + "ON CONFLICT(k) DO UPDATE SET v = v + ", + "excluded.v RETURNING v" + ), + table + ), + key, + *by + ) + .await, + ) + .await + .map_err(into_error)? + .first() + .map(|row| row.get::<_, i64>(0)) + .map_err(into_error)?, + ); + } + ValueOp::Clear => { + queries.push(rqlite_rs::query!( + &format!("DELETE FROM {} WHERE k = ?", table), + key + )); + } + } + } + Operation::Index { field, key, set } => { + let key = IndexKey { + account_id, + collection, + document_id, + field: *field, + key, + } + .serialize(0); + + if *set { + queries.push( + rqlite_rs::query!("INSERT OR IGNORE INTO i (k) VALUES (?)", key) + .await, + ); + } else { + queries.push(rqlite_rs::query!("DELETE FROM i WHERE k = ?", key).await); + } + } + Operation::Bitmap { class, set } => { + // Find the next available document id + let is_document_id = matches!(class, BitmapClass::DocumentIds); + if *set && is_document_id && document_id == u32::MAX { + let begin = BitmapKey { + account_id, + collection, + class: BitmapClass::DocumentIds, + document_id: 0, + } + .serialize(0); + let end = BitmapKey { + account_id, + collection, + class: BitmapClass::DocumentIds, + document_id: u32::MAX, + } + .serialize(0); + let key_len = begin.len(); + + // NOTE: escapes the transaction + let rows = rqlite_rs::query!( + "SELECT k FROM b WHERE k >= ? AND k <= ?", + begin, + end + ) + .await + .map_err(into_error)?; + + let mut found_ids = RoaringBitmap::new(); + for row in rows { + let key = row + .get_by_index(0) + .map_err(into_error)? + .as_bytes() + .map_err(into_error)?; + if key.len() == key_len { + found_ids.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); + } + } + + document_id = found_ids.random_available_id(); + result.push_document_id(document_id); + } + let key = class.serialize( + account_id, + collection, + document_id, + 0, + (&result).into(), + ); + let table = char::from(class.subspace()); + + if *set { + if is_document_id { + queries.push( + rqlite_rs::query!("INSERT INTO b (k) VALUES (?)", key).await, + ); + } else { + queries.push( + rqlite_rs::query!( + &format!("INSERT OR IGNORE INTO {} (k) VALUES (?)", table), + key + ) + .await, + ); + } + } else { + queries.push( + rqlite_rs::query!( + &format!("DELETE FROM {} WHERE k = ?", table), + key + ) + .await, + ); + }; + } + Operation::Log { set } => { + let key = LogKey { + account_id, + collection, + change_id, + } + .serialize(0); + + queries.push( + rqlite_rs::query!( + "INSERT OR REPLACE INTO l (k, v) VALUES (?, ?)", + key, + set.resolve(&result).map_err(into_error)?.as_ref() + ) + .await, + ); + } + Operation::AssertValue { + class, + assert_value, + } => { + let key = class.serialize( + account_id, + collection, + document_id, + 0, + (&result).into(), + ); + let table = char::from(class.subspace(collection)); + + // NOTE: escapes the transaction + let matches = conn + .fetch( + rqlite_rs::query!( + &format!("SELECT v FROM {} WHERE k = ?", table), + key + ) + .await, + ) + .await + .map_err(into_error)? + .first() + .map(|row| Ok(assert_value.matches(row.get_by_index(0)?.as_bytes()?))) + .map_err(into_error)? + .unwrap_or_else(|| assert_value.is_none()); + + if !matches { + return Err(trc::StoreEvent::AssertValueFailed.into()); + } + } + } + } + + conn.transaction(queries).await.map_err(into_error)?; + }) + .await + } + + pub(crate) async fn purge_store(&self) -> trc::Result<()> { + let conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] { + conn.exec(rqlite_rs::query!(&format!( + "DELETE FROM {} WHERE v = 0", + char::from(subspace), + ))) + .await + .map_err(into_error)?; + } + + Ok(()) + }) + .await + } + + pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> trc::Result<()> { + let conn = self.conn_pool.get().map_err(into_error)?; + self.spawn_worker(move || { + conn.exec(rqlite_rs::query!(&format!( + "DELETE FROM {} WHERE k >= ? AND k < ?", + char::from(from.subspace()), + ))) + .await + .map_err(into_error)? + .execute([from.serialize(0), to.serialize(0)]) + .map_err(into_error)?; + + Ok(()) + }) + .await + } +} diff --git a/crates/store/src/config.rs b/crates/store/src/config.rs index 35839b43e..b751fb47d 100644 --- a/crates/store/src/config.rs +++ b/crates/store/src/config.rs @@ -23,6 +23,9 @@ use crate::backend::mysql::MysqlStore; #[cfg(feature = "sqlite")] use crate::backend::sqlite::SqliteStore; +#[cfg(feature = "rqlite")] +use crate::backend::rqlite::RqliteStore; + #[cfg(feature = "foundation")] use crate::backend::foundationdb::FdbStore; @@ -188,6 +191,28 @@ impl Stores { self.in_memory_stores.insert(store_id.clone(), db.into()); } } + #[cfg(feature = "rqlite")] + "rqlite" => { + // Avoid opening the same store twice + if is_reload + && self + .stores + .values() + .any(|store| matches!(store, Store::RQLite(_))) + { + continue; + } + + if let Some(db) = RqliteStore::open(config, prefix).await.map(Store::from) { + self.stores.insert(store_id.clone(), db.clone()); + self.fts_stores.insert(store_id.clone(), db.clone().into()); + self.blob_stores.insert( + store_id.clone(), + BlobStore::from(db.clone()).with_compression(compression_algo), + ); + self.in_memory_stores.insert(store_id.clone(), db.into()); + } + } "fs" => { if let Some(db) = FsStore::open(config, prefix).await.map(BlobStore::from) { self.blob_stores diff --git a/crates/store/src/dispatch/blob.rs b/crates/store/src/dispatch/blob.rs index 02a9bc247..1d4d07168 100644 --- a/crates/store/src/dispatch/blob.rs +++ b/crates/store/src/dispatch/blob.rs @@ -22,6 +22,8 @@ impl BlobStore { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.get_blob(key, read_range).await, + #[cfg(feature = "rqlite")] + Store::RQLite(store) => store.get_blob(key, read_range).await, #[cfg(feature = "foundation")] Store::FoundationDb(store) => store.get_blob(key, read_range).await, #[cfg(feature = "postgres")] @@ -104,6 +106,8 @@ impl BlobStore { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.put_blob(key, data.as_ref()).await, + #[cfg(feature = "rqlite")] + Store::RQLite(store) => store.put_blob(key, data.as_ref()).await, #[cfg(feature = "foundation")] Store::FoundationDb(store) => store.put_blob(key, data.as_ref()).await, #[cfg(feature = "postgres")] @@ -142,6 +146,8 @@ impl BlobStore { BlobBackend::Store(store) => match store { #[cfg(feature = "sqlite")] Store::SQLite(store) => store.delete_blob(key).await, + #[cfg(feature = "rqlite")] + Store::RQLite(store) => store.delete_blob(key).await, #[cfg(feature = "foundation")] Store::FoundationDb(store) => store.delete_blob(key).await, #[cfg(feature = "postgres")] diff --git a/crates/store/src/dispatch/mod.rs b/crates/store/src/dispatch/mod.rs index b6d1c32ac..a628b70f6 100644 --- a/crates/store/src/dispatch/mod.rs +++ b/crates/store/src/dispatch/mod.rs @@ -18,6 +18,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(_) => "sqlite", + #[cfg(feature = "rqlite")] + Self::RQLite(_) => "rqlite", #[cfg(feature = "foundation")] Self::FoundationDb(_) => "foundationdb", #[cfg(feature = "postgres")] diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs index d4eaf2404..0f4672dee 100644 --- a/crates/store/src/dispatch/store.rs +++ b/crates/store/src/dispatch/store.rs @@ -43,6 +43,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.get_value(key).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.get_value(key).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.get_value(key).await, #[cfg(feature = "postgres")] @@ -65,6 +67,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.get_bitmap(key).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.get_bitmap(key).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.get_bitmap(key).await, #[cfg(feature = "postgres")] @@ -111,6 +115,8 @@ impl Store { let result = match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.iterate(params, cb).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.iterate(params, cb).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.iterate(params, cb).await, #[cfg(feature = "postgres")] @@ -140,6 +146,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.get_counter(key).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.get_counter(key).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.get_counter(key).await, #[cfg(feature = "postgres")] @@ -230,6 +238,8 @@ impl Store { } match self { + #[cfg(feature = "rqlite")] + Sel::SQLiRe(store) => store.write(batch).await, #[cfg(feature = "sqlite")] Self::SQLite(store) => store.write(batch).await, #[cfg(feature = "foundation")] @@ -279,6 +289,8 @@ impl Store { let result = match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.write(batch).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.write(batch).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.write(batch).await, #[cfg(feature = "postgres")] @@ -335,6 +347,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.purge_store().await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.purge_store().await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.purge_store().await, #[cfg(feature = "postgres")] @@ -354,6 +368,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.delete_range(from, to).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.delete_range(from, to).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.delete_range(from, to).await, #[cfg(feature = "postgres")] @@ -527,6 +543,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.get_blob(key, range).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.get_blob(key, range).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.get_blob(key, range).await, #[cfg(feature = "postgres")] @@ -546,6 +564,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.put_blob(key, data).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.put_blob(key, data).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.put_blob(key, data).await, #[cfg(feature = "postgres")] @@ -565,6 +585,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(store) => store.delete_blob(key).await, + #[cfg(feature = "rqlite")] + Self::RQLite(store) => store.delete_blob(key).await, #[cfg(feature = "foundation")] Self::FoundationDb(store) => store.delete_blob(key).await, #[cfg(feature = "postgres")] diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 0b3ee6bb7..25a61f9e4 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -35,6 +35,9 @@ use backend::mysql::MysqlStore; #[cfg(feature = "sqlite")] use backend::sqlite::SqliteStore; +#[cfg(feature = "rqlite")] +use backend::rqlite::RqliteStore; + #[cfg(feature = "foundation")] use backend::foundationdb::FdbStore; @@ -180,6 +183,8 @@ pub struct Stores { pub enum Store { #[cfg(feature = "sqlite")] SQLite(Arc), + #[cfg(feature = "rqlite")] + RQLite(Arc), #[cfg(feature = "foundation")] FoundationDb(Arc), #[cfg(feature = "postgres")] @@ -243,6 +248,13 @@ impl From for Store { } } +#[cfg(feature = "rqlite")] +impl From for Store { + fn from(store: RqliteStore) -> Self { + Self::RQLite(Arc::new(store)) + } +} + #[cfg(feature = "foundation")] impl From for Store { fn from(store: FdbStore) -> Self { @@ -692,6 +704,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Store::SQLite(_) => true, + #[cfg(feature = "rqlite")] + Store::RQLite(_) => true, #[cfg(feature = "postgres")] Store::PostgreSQL(_) => true, #[cfg(feature = "mysql")] @@ -706,6 +720,8 @@ impl Store { match self { #[cfg(feature = "sqlite")] Store::SQLite(_) => true, + #[cfg(feature = "rqlite")] + Store::RQLite(_) => true, #[cfg(feature = "postgres")] Store::PostgreSQL(_) => true, _ => false, @@ -732,6 +748,8 @@ impl std::fmt::Debug for Store { match self { #[cfg(feature = "sqlite")] Self::SQLite(_) => f.debug_tuple("SQLite").finish(), + #[cfg(feature = "rqlite")] + Self::RQLite(_) => f.debug_tuple("RQLite").finish(), #[cfg(feature = "foundation")] Self::FoundationDb(_) => f.debug_tuple("FoundationDb").finish(), #[cfg(feature = "postgres")] diff --git a/crates/trc/src/event/description.rs b/crates/trc/src/event/description.rs index c7fc3e2bf..b9dbc55a5 100644 --- a/crates/trc/src/event/description.rs +++ b/crates/trc/src/event/description.rs @@ -1526,6 +1526,7 @@ impl StoreEvent { StoreEvent::PostgresqlError => "PostgreSQL error", StoreEvent::RocksdbError => "RocksDB error", StoreEvent::SqliteError => "SQLite error", + StoreEvent::RqliteError => "RQLite error", StoreEvent::LdapError => "LDAP error", StoreEvent::ElasticsearchError => "ElasticSearch error", StoreEvent::RedisError => "Redis error", @@ -1563,6 +1564,7 @@ impl StoreEvent { StoreEvent::PostgresqlError => "A PostgreSQL error occurred", StoreEvent::RocksdbError => "A RocksDB error occurred", StoreEvent::SqliteError => "An SQLite error occurred", + StoreEvent::RqliteError => "An RQLite error occurred", StoreEvent::LdapError => "An LDAP error occurred", StoreEvent::ElasticsearchError => "An ElasticSearch error occurred", StoreEvent::RedisError => "A Redis error occurred", diff --git a/crates/trc/src/event/level.rs b/crates/trc/src/event/level.rs index d0a47e937..9b6feddcb 100644 --- a/crates/trc/src/event/level.rs +++ b/crates/trc/src/event/level.rs @@ -27,6 +27,7 @@ impl EventType { | StoreEvent::PostgresqlError | StoreEvent::RocksdbError | StoreEvent::SqliteError + | StoreEvent::RqliteError | StoreEvent::LdapError | StoreEvent::ElasticsearchError | StoreEvent::RedisError diff --git a/crates/trc/src/event/mod.rs b/crates/trc/src/event/mod.rs index 61a1a0d48..43f951e11 100644 --- a/crates/trc/src/event/mod.rs +++ b/crates/trc/src/event/mod.rs @@ -302,7 +302,7 @@ impl StoreEvent { Self::MysqlError => "MySQL error", Self::PostgresqlError => "PostgreSQL error", Self::RocksdbError => "RocksDB error", - Self::SqliteError => "SQLite error", + Self::RqliteError => "RQLite error", Self::LdapError => "LDAP error", Self::ElasticsearchError => "ElasticSearch error", Self::RedisError => "Redis error", diff --git a/crates/trc/src/ipc/metrics.rs b/crates/trc/src/ipc/metrics.rs index ca13b63c1..fb07a9eda 100644 --- a/crates/trc/src/ipc/metrics.rs +++ b/crates/trc/src/ipc/metrics.rs @@ -436,6 +436,7 @@ impl EventType { | StoreEvent::PostgresqlError | StoreEvent::RocksdbError | StoreEvent::SqliteError + | StoreEvent::RqliteError | StoreEvent::LdapError | StoreEvent::ElasticsearchError | StoreEvent::RedisError diff --git a/crates/trc/src/lib.rs b/crates/trc/src/lib.rs index 3927caf0e..fd6ca1baa 100644 --- a/crates/trc/src/lib.rs +++ b/crates/trc/src/lib.rs @@ -820,6 +820,7 @@ pub enum StoreEvent { PostgresqlError, RocksdbError, SqliteError, + RqliteError, LdapError, ElasticsearchError, RedisError, diff --git a/crates/trc/src/serializers/binary.rs b/crates/trc/src/serializers/binary.rs index bd41386be..78367350d 100644 --- a/crates/trc/src/serializers/binary.rs +++ b/crates/trc/src/serializers/binary.rs @@ -863,6 +863,7 @@ impl EventType { EventType::Spam(SpamEvent::Dnsbl) => 562, EventType::Spam(SpamEvent::DnsblError) => 563, EventType::Spam(SpamEvent::Pyzor) => 564, + EventType::Store(StoreEvent::RqliteError) => 565, } } @@ -1466,6 +1467,7 @@ impl EventType { 562 => Some(EventType::Spam(SpamEvent::Dnsbl)), 563 => Some(EventType::Spam(SpamEvent::DnsblError)), 564 => Some(EventType::Spam(SpamEvent::Pyzor)), + 565 => Some(EventType::Store(StoreEvent::RqliteError)), _ => None, } } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 938381c25..a21f5986c 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -5,9 +5,10 @@ edition = "2021" resolver = "2" [features] -#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "foundationdb"] +#default = ["sqlite", "rqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "foundationdb"] default = ["rocks"] sqlite = ["store/sqlite"] +rqlite = ["store/rqlite"] foundationdb = ["store/foundation", "common/foundation"] postgres = ["store/postgres"] mysql = ["store/mysql"] diff --git a/tests/resources/acme/test_acme.sh b/tests/resources/acme/test_acme.sh index f266b917b..ccc435936 100644 --- a/tests/resources/acme/test_acme.sh +++ b/tests/resources/acme/test_acme.sh @@ -6,4 +6,4 @@ cp ./tests/resources/acme/config.toml /tmp/stalwart-temp-data/config.toml curl --request POST --data '{"ip":"192.168.5.2"}' http://localhost:8055/set-default-ipv4 -cargo run -p mail-server --no-default-features --features "sqlite foundationdb postgres mysql rocks elastic s3 redis" -- --config=/tmp/stalwart-temp-data/config.toml +cargo run -p mail-server --no-default-features --features "sqlite rqlite foundationdb postgres mysql rocks elastic s3 redis" -- --config=/tmp/stalwart-temp-data/config.toml diff --git a/tests/resources/scripts/create_test_env.sh b/tests/resources/scripts/create_test_env.sh index 434f1880e..62b30492f 100644 --- a/tests/resources/scripts/create_test_env.sh +++ b/tests/resources/scripts/create_test_env.sh @@ -1,7 +1,7 @@ #!/bin/bash BASE_DIR="/Users/me/Downloads/stalwart-test" -FEATURES="sqlite foundationdb postgres mysql rocks elastic s3 redis" +FEATURES="sqlite rqlite foundationdb postgres mysql rocks elastic s3 redis" # Delete previous tests rm -rf $BASE_DIR diff --git a/tests/src/directory/sql.rs b/tests/src/directory/sql.rs index 0eb5e1eb5..1b29d0f17 100644 --- a/tests/src/directory/sql.rs +++ b/tests/src/directory/sql.rs @@ -30,7 +30,7 @@ async fn sql_directory() { .unwrap();*/ // Obtain directory handle - for directory_id in ["sqlite", "postgresql", "mysql"] { + for directory_id in ["sqlite", "rqlite", "postgresql", "mysql"] { // Parse config let mut config = DirectoryTest::new(directory_id.into()).await;