Add TiKV as a data store backend #668

Draft · wants to merge 20 commits into base: main
Changes from 17 commits
637 changes: 491 additions & 146 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
@@ -73,7 +73,7 @@ Key features:
- **Greylisting** to temporarily defer unknown senders.
- **Spam traps** to set up decoy email addresses that catch and analyze spam.
- **Flexible and scalable**:
- Pluggable storage backends with **RocksDB**, **FoundationDB**, **PostgreSQL**, **mySQL**, **SQLite**, **S3-Compatible**, **Redis** and **ElasticSearch** support.
- Pluggable storage backends with **RocksDB**, **FoundationDB**, **TiKV**, **PostgreSQL**, **mySQL**, **SQLite**, **S3-Compatible**, **Redis** and **ElasticSearch** support.
- **Clustering** support with node autodiscovery and partition-tolerant failure detection.
- Built-in, **LDAP** or **SQL** authentication backend support.
- Full-text search available in 17 languages.
6 changes: 3 additions & 3 deletions crates/common/src/telemetry/metrics/prometheus.rs
@@ -28,7 +28,7 @@ impl Core {
metric.set_name(metric_name(counter.id().name()));
metric.set_help(counter.id().description().into());
metric.set_field_type(MetricType::COUNTER);
metric.set_metric(vec![new_counter(counter.value())]);
metric.set_metric(vec![new_counter(counter.value())].into());
metrics.push(metric);
}

@@ -38,7 +38,7 @@ impl Core {
metric.set_name(metric_name(gauge.id().name()));
metric.set_help(gauge.id().description().into());
metric.set_field_type(MetricType::GAUGE);
metric.set_metric(vec![new_gauge(gauge.get())]);
metric.set_metric(vec![new_gauge(gauge.get())].into());
metrics.push(metric);
}

@@ -48,7 +48,7 @@ impl Core {
metric.set_name(metric_name(histogram.id().name()));
metric.set_help(histogram.id().description().into());
metric.set_field_type(MetricType::HISTOGRAM);
metric.set_metric(vec![new_histogram(histogram)]);
metric.set_metric(vec![new_histogram(histogram)].into());
metrics.push(metric);
}

5 changes: 3 additions & 2 deletions crates/main/Cargo.toml
@@ -34,13 +34,14 @@ tokio = { version = "1.23", features = ["full"] }
jemallocator = "0.5.0"

[features]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "enterprise"]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "foundationdb", "enterprise"]
default = ["sqlite", "postgres", "mysql", "rocks", "tikv", "elastic", "s3", "redis", "enterprise"]
#default = ["sqlite", "postgres", "mysql", "rocks", "tikv", "elastic", "s3", "redis", "foundationdb", "enterprise"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres"]
mysql = ["store/mysql"]
rocks = ["store/rocks"]
tikv = ["store/tikv"]
elastic = ["store/elastic"]
s3 = ["store/s3"]
redis = ["store/redis"]
2 changes: 2 additions & 0 deletions crates/store/Cargo.toml
@@ -11,6 +11,7 @@ trc = { path = "../trc" }
rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] }
foundationdb = { version = "0.9.0", features = ["embedded-fdb-include", "fdb-7_1"], optional = true }
rusqlite = { version = "0.32", features = ["bundled"], optional = true }
tikv-client = { version = "0.3.0", optional = true }
rust-s3 = { version = "=0.35.0-alpha.2", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true }
tokio = { version = "1.23", features = ["sync", "fs", "io-util"] }
r2d2 = { version = "0.8.10", optional = true }
@@ -57,6 +58,7 @@ elastic = ["elasticsearch", "serde_json"]
mysql = ["mysql_async", "futures"]
s3 = ["rust-s3"]
foundation = ["foundationdb", "futures"]
tikv = ["tikv-client"]
fdb-chunked-bm = []
redis = ["dep:redis", "deadpool"]
enterprise = []
6 changes: 6 additions & 0 deletions crates/store/src/backend/composite/distributed_blob.rs
@@ -71,6 +71,8 @@ impl DistributedBlob {
Store::MySQL(store) => store.get_blob(key, read_range).await,
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.get_blob(key, read_range).await,
#[cfg(feature = "tikv")]
Store::TiKV(store) => store.get_blob(key, read_range).await,
#[cfg(all(
feature = "enterprise",
any(feature = "postgres", feature = "mysql")
@@ -101,6 +103,8 @@ impl DistributedBlob {
Store::MySQL(store) => store.put_blob(key, data).await,
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.put_blob(key, data).await,
#[cfg(feature = "tikv")]
Store::TiKV(store) => store.put_blob(key, data).await,
#[cfg(all(
feature = "enterprise",
any(feature = "postgres", feature = "mysql")
@@ -131,6 +135,8 @@ impl DistributedBlob {
Store::MySQL(store) => store.delete_blob(key).await,
#[cfg(feature = "rocks")]
Store::RocksDb(store) => store.delete_blob(key).await,
#[cfg(feature = "tikv")]
Store::TiKV(store) => store.delete_blob(key).await,
#[cfg(all(
feature = "enterprise",
any(feature = "postgres", feature = "mysql")
2 changes: 2 additions & 0 deletions crates/store/src/backend/mod.rs
@@ -24,6 +24,8 @@ pub mod rocksdb;
pub mod s3;
#[cfg(feature = "sqlite")]
pub mod sqlite;
#[cfg(feature = "tikv")]
pub mod tikv;

pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize;
pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1;
167 changes: 167 additions & 0 deletions crates/store/src/backend/tikv/blob.rs
@@ -0,0 +1,167 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <[email protected]>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

use std::ops::{Bound, Range};
use roaring::RoaringBitmap;
use tikv_client::{BoundRange, Key as TikvKey};
use trc::EventType::Store;
use trc::StoreEvent;
use utils::BLOB_HASH_LEN;
use crate::{write::key::KeySerializer, SUBSPACE_BLOBS};
use super::write::chunking::{delete_chunked_value, put_chunked_value};
use super::read::chunking::get_chunked_value;
use super::{into_error, MAX_KEY_SIZE, MAX_SCAN_KEYS_SIZE, MAX_SCAN_VALUES_SIZE, MAX_VALUE_SIZE, TikvStore, MAX_CHUNKED_SIZED};

// TODO: Allow handling of more than MAX_SCAN_KEYS_SIZE

impl TikvStore {
    pub(crate) async fn get_blob(
        &self,
        key: &[u8],
        range: Range<usize>,
    ) -> trc::Result<Option<Vec<u8>>> {
        let mut trx = self.snapshot_read().await?;

        let block_start = range.start / MAX_VALUE_SIZE;
        let bytes_start = range.start % MAX_VALUE_SIZE;
        let block_end = (range.end / MAX_VALUE_SIZE) + 1;

        let mut begin = KeySerializer::new(1 + key.len() + 2)
            .write(SUBSPACE_BLOBS)
            .write(key)
            .write(block_start as u16)
            .finalize();
        let end = KeySerializer::new(1 + key.len() + 2)
            .write(SUBSPACE_BLOBS)
            .write(key)
            .write(block_end as u16)
            .write(u8::MIN) // Null byte to make the end inclusive
            .finalize();

        let mut blob_data_opt: Option<Vec<u8>> = None;
        let mut blob_range = range.end - range.start;

        'outer: loop {
            let mut keys = trx.scan((begin, end.clone()), MAX_SCAN_VALUES_SIZE)
                .await
                .map_err(into_error)?;

            let mut counter = 0;
            let mut last_key = None;
            while let Some(kv_pair) = keys.next() {
                // Count returned entries so the scan can be resumed when the limit is hit
                counter += 1;
                let key: Vec<u8> = kv_pair.0.into();
                let mut value: Vec<u8> = kv_pair.1.into();

                if let Some(blob_data) = &mut blob_data_opt {
                    blob_data.extend_from_slice(
                        value
                            .get(
                                ..std::cmp::min(
                                    blob_range.saturating_sub(blob_data.len()),
                                    value.len(),
                                ),
                            )
                            .unwrap_or(&[]),
                    );
                    if blob_data.len() == blob_range {
                        break 'outer;
                    }
                } else {
                    let blob_size = if blob_range <= (5 * (1 << 20)) {
                        blob_range
                    } else if value.len() == MAX_VALUE_SIZE {
                        MAX_VALUE_SIZE * 2
                    } else {
                        value.len()
                    };
                    let mut blob_data = Vec::with_capacity(blob_size);
                    blob_data.extend_from_slice(
                        value
                            .get(bytes_start..std::cmp::min(bytes_start + blob_range, value.len()))
                            .unwrap_or(&[]),
                    );
                    if blob_data.len() == blob_range {
                        return Ok(Some(blob_data));
                    }
                    blob_data_opt = Some(blob_data)
                }

                last_key = Some(key);
            }

            if counter == MAX_SCAN_VALUES_SIZE {
                // Guaranteed to have the last key
                begin = last_key.unwrap();
                continue;
            } else {
                break;
            }
        }

        Ok(blob_data_opt)
    }

    pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> trc::Result<()> {
        let mut trx = self.write_trx_with_backoff().await?;

        for (chunk_pos, chunk_value) in data.chunks(MAX_VALUE_SIZE).enumerate() {
            let chunk_key = KeySerializer::new(1 + key.len() + 2)
                .write(SUBSPACE_BLOBS)
                .write(key)
                .write(chunk_pos as u16)
                .finalize();

            trx.put(chunk_key, chunk_value).await.map_err(into_error)?;
        }

        trx.commit().await.map_err(into_error)?;
        Ok(())
    }

    pub(crate) async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> {
        if key.len() < BLOB_HASH_LEN {
            return Ok(false);
        }

        let begin = KeySerializer::new(1 + key.len() + 1)
            .write(SUBSPACE_BLOBS)
            .write(key)
            .write(u16::MIN)
            .finalize();
        let end = KeySerializer::new(1 + key.len() + 3)
            .write(SUBSPACE_BLOBS)
            .write(key)
            .write(u16::MAX)
            .write(u8::MIN) // Null byte to make the end inclusive
            .finalize();

        let range = BoundRange::from((begin, end));

        let mut trx = self.write_trx_with_backoff().await?;

        loop {
            let keys = trx
                .scan_keys(range.clone(), MAX_SCAN_KEYS_SIZE)
                .await
                .map_err(into_error)?;

            let mut count = 0;
            for key in keys {
                count += 1;
                trx.delete(key).await.map_err(into_error)?;
            }

            if count < MAX_SCAN_KEYS_SIZE {
                break;
            }
        }

        trx.commit().await.map_err(into_error)?;

        Ok(true)
    }
}
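
For orientation, the blob backend above stores each blob as consecutive chunks keyed by the SUBSPACE_BLOBS prefix, the blob key and a u16 chunk index, and get_blob maps a requested byte range onto chunk indices with integer division. A minimal sketch of that arithmetic, assuming a purely illustrative chunk size of 100,000 bytes (the actual MAX_VALUE_SIZE constant is defined elsewhere in this backend):

    // Illustration only: the real chunk size is the backend's MAX_VALUE_SIZE constant.
    const MAX_VALUE_SIZE: usize = 100_000;

    // Mirrors the calculation at the top of get_blob().
    fn chunk_window(range: std::ops::Range<usize>) -> (usize, usize, usize) {
        let block_start = range.start / MAX_VALUE_SIZE; // first chunk index to scan
        let bytes_start = range.start % MAX_VALUE_SIZE; // byte offset inside that first chunk
        let block_end = (range.end / MAX_VALUE_SIZE) + 1; // generous upper bound for the scan
        (block_start, bytes_start, block_end)
    }

    fn main() {
        // Bytes 250_000..400_000 live in chunks 2 and 3: the scan starts at chunk 2,
        // 50_000 bytes in, and stops once range.end - range.start bytes are collected.
        assert_eq!(chunk_window(250_000..400_000), (2, 50_000, 5));
    }
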
96 changes: 96 additions & 0 deletions crates/store/src/backend/tikv/main.rs
@@ -0,0 +1,96 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <[email protected]>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::time::Duration;
use tikv_client::{Backoff, CheckLevel, RetryOptions, TransactionClient, TransactionOptions};
use utils::config::{utils::AsKey, Config};
use super::TikvStore;

impl TikvStore {
    pub async fn open(config: &mut Config, prefix: impl AsKey) -> Option<Self> {
        let prefix = prefix.as_key();

        // The PD endpoints could be validated as SocketAddr, but TransactionClient only accepts a Vec<String>
        let pd_endpoints = config.properties::<String>((&prefix, "pd-endpoints"))
            .into_iter()
            .map(|(_key, addr_str)| addr_str)
            .collect::<Vec<String>>();

        let trx_client = TransactionClient::new(pd_endpoints.clone())
            .await
            .map_err(|err| {
                config.new_build_error(
                    prefix.as_str(),
                    format!("Failed to create TiKV database: {err:?}"),
                )
            })
            .ok()?;

        let backoff_min_delay = config
            .property::<Duration>((&prefix, "transaction.backoff-min-delay"))
            .unwrap_or_else(|| Duration::from_millis(2));

        let backoff_max_delay = config
            .property::<Duration>((&prefix, "transaction.backoff-max-delay"))
            .unwrap_or_else(|| Duration::from_millis(500));

        let max_attempts = config
            .property::<u32>((&prefix, "transaction.backoff-retry-limit"))
            .unwrap_or(10);

        let backoff = if let Some(backoff_type) = config
            .property::<String>((&prefix, "transaction.backoff-type"))
        {
            match backoff_type.as_str() {
                // Exponential backoff without jitter
                "expo-jitter" => Backoff::no_jitter_backoff(
                    backoff_min_delay.as_millis() as u64,
                    backoff_max_delay.as_millis() as u64,
                    max_attempts
                ),
                "equal-jitter" => Backoff::equal_jitter_backoff(
                    backoff_min_delay.as_millis() as u64,
                    backoff_max_delay.as_millis() as u64,
                    max_attempts
                ),
                "decor-jitter" => Backoff::decorrelated_jitter_backoff(
                    backoff_min_delay.as_millis() as u64,
                    backoff_max_delay.as_millis() as u64,
                    max_attempts
                ),
                "none" => Backoff::no_backoff(),
                // Default
                "full-jitter" | &_ => Backoff::full_jitter_backoff(
                    backoff_min_delay.as_millis() as u64,
                    backoff_max_delay.as_millis() as u64,
                    max_attempts
                ),
            }
        } else {
            // Default
            Backoff::full_jitter_backoff(
                backoff_min_delay.as_millis() as u64,
                backoff_max_delay.as_millis() as u64,
                max_attempts
            )
        };

        let write_trx_options = TransactionOptions::new_pessimistic()
            .drop_check(CheckLevel::Warn)
            .retry_options(RetryOptions::new(backoff.clone(), backoff.clone()));

        let read_trx_options = TransactionOptions::new_optimistic()
            .drop_check(CheckLevel::None)
            .retry_options(RetryOptions::none())
            .read_only();

        let store = Self {
            trx_client,
            write_trx_options,
            read_trx_options,
            backoff,
        };

        Some(store)
    }
}
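
For context, the options read by open() map onto a store definition roughly as sketched below. This is an illustration, not documentation shipped with this PR: the section name, the type key and the endpoint addresses are assumptions, while the pd-endpoints and transaction.backoff-* key names and the backoff-type variants come from the code above, with the shown values matching its defaults.

    [store."tikv"]
    type = "tikv"                               # assumed backend identifier
    pd-endpoints = ["127.0.0.1:2379"]           # PD (Placement Driver) addresses, placeholders
    transaction.backoff-type = "full-jitter"    # expo-jitter | equal-jitter | decor-jitter | none
    transaction.backoff-min-delay = "2ms"
    transaction.backoff-max-delay = "500ms"
    transaction.backoff-retry-limit = 10

The delay and retry values mirror the unwrap_or defaults in open(), and full-jitter is the fallback whenever backoff-type is absent or unrecognized.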