Skip to content

Commit

Permalink
WIP (blocked on blob support)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamdes committed Jan 1, 2025
1 parent 3d3cdd0 commit ea2cf1c
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 155 deletions.
4 changes: 2 additions & 2 deletions crates/store/src/backend/rqlite/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use std::ops::Range;

use rusqlite::OptionalExtension;

use super::{into_error, SqliteStore};
use super::{into_error, RqliteStore};

impl SqliteStore {
impl RqliteStore {
pub(crate) async fn get_blob(
&self,
key: &[u8],
Expand Down
15 changes: 10 additions & 5 deletions crates/store/src/backend/rqlite/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use rusqlite::{types::FromSql, Row, Rows, ToSql};

use crate::{IntoRows, QueryResult, QueryType, Value};

use super::{into_error, SqliteStore};
use super::{into_error, RqliteStore};

impl SqliteStore {
impl RqliteStore {
pub(crate) async fn query<T: QueryResult>(
&self,
query: &str,
Expand Down Expand Up @@ -45,6 +45,7 @@ impl SqliteStore {
}
}

/*
impl ToSql for Value<'_> {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
match self {
Expand All @@ -58,8 +59,9 @@ impl ToSql for Value<'_> {
)),
}
}
}
} */

/*
impl FromSql for Value<'static> {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
Ok(match value {
Expand All @@ -72,8 +74,9 @@ impl FromSql for Value<'static> {
rusqlite::types::ValueRef::Blob(v) => Value::Blob(v.to_vec().into()),
})
}
}
} */

/*
impl IntoRows for Rows<'_> {
fn into_rows(mut self) -> crate::Rows {
let column_count = self.as_ref().map(|s| s.column_count()).unwrap_or_default();
Expand Down Expand Up @@ -124,7 +127,8 @@ impl IntoRows for Rows<'_> {
unreachable!()
}
}

*/
/*
impl IntoRows for Option<&Row<'_>> {
fn into_row(self) -> Option<crate::Row> {
self.map(|row| crate::Row {
Expand All @@ -142,3 +146,4 @@ impl IntoRows for Option<&Row<'_>> {
unreachable!()
}
}
*/
66 changes: 23 additions & 43 deletions crates/store/src/backend/rqlite/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ use utils::config::{utils::AsKey, Config};

use crate::*;

use super::{into_error, pool::SqliteConnectionManager, SqliteStore};
use super::{into_error, pool::RqliteConnectionManager, RqliteStore};

impl SqliteStore {
pub fn open(config: &mut Config, prefix: impl AsKey) -> Option<Self> {
impl RqliteStore {
pub async fn open(config: &mut Config, prefix: impl AsKey) -> Option<Self> {
let prefix = prefix.as_key();
let endpoints = config.properties::<String>((&prefix, "endpoints"))
.into_iter()
.map(|(_key, addr_str)| addr_str)
.collect::<Vec<String>>();

let db = Self {
conn_pool: Pool::builder()
.max_size(
Expand All @@ -23,15 +28,7 @@ impl SqliteStore {
.unwrap_or_else(|| (num_cpus::get() * 4) as u32),
)
.build(
SqliteConnectionManager::file(config.value_require((&prefix, "path"))?)
.with_init(|c| {
c.execute_batch(concat!(
"PRAGMA journal_mode = WAL; ",
"PRAGMA synchronous = NORMAL; ",
"PRAGMA temp_store = memory;",
"PRAGMA busy_timeout = 30000;"
))
}),
RqliteConnectionManager::endpoints(endpoints)
)
.map_err(|err| {
config.new_build_error(
Expand All @@ -58,34 +55,14 @@ impl SqliteStore {
.ok()?,
};

if let Err(err) = db.create_tables() {
if let Err(err) = db.create_tables().await {
config.new_build_error(prefix.as_str(), format!("Failed to create tables: {err}"));
}

Some(db)
}

#[cfg(feature = "test_mode")]
pub fn open_memory() -> trc::Result<Self> {
use super::into_error;

let db = Self {
conn_pool: Pool::builder()
.max_size(1)
.build(SqliteConnectionManager::memory())
.map_err(into_error)?,
worker_pool: rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.build()
.map_err(|err| {
into_error(err).ctx(trc::Key::Reason, "Failed to build worker pool")
})?,
};
db.create_tables()?;
Ok(db)
}

pub(super) fn create_tables(&self) -> trc::Result<()> {
pub(super) async fn create_tables(&self) -> trc::Result<()> {
let conn = self.conn_pool.get().map_err(into_error)?;

for table in [
Expand All @@ -109,15 +86,16 @@ impl SqliteStore {
SUBSPACE_TELEMETRY_INDEX,
] {
let table = char::from(table);
conn.execute(
conn.exec(
rqlite_rs::query!(
&format!(
"CREATE TABLE IF NOT EXISTS {table} (
k BLOB PRIMARY KEY,
v BLOB NOT NULL
)"
),
[],
)),
)
.await
.map_err(into_error)?;
}

Expand All @@ -128,28 +106,30 @@ impl SqliteStore {
SUBSPACE_BITMAP_TEXT,
] {
let table = char::from(table);
conn.execute(
conn.exec(
rqlite_rs::query!(
&format!(
"CREATE TABLE IF NOT EXISTS {table} (
k BLOB PRIMARY KEY
)"
),
[],
))
)
.await
.map_err(into_error)?;
}

for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
conn.execute(
conn.exec(
rqlite_rs::query!(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY,
v INTEGER NOT NULL DEFAULT 0
)",
char::from(table)
),
[],
)),
)
.await
.map_err(into_error)?;
}

Expand Down
8 changes: 4 additions & 4 deletions crates/store/src/backend/rqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::fmt::Display;

use r2d2::Pool;

use self::pool::SqliteConnectionManager;
use self::pool::RqliteConnectionManager;

pub mod blob;
pub mod lookup;
Expand All @@ -17,12 +17,12 @@ pub mod pool;
pub mod read;
pub mod write;

pub struct SqliteStore {
pub(crate) conn_pool: Pool<SqliteConnectionManager>,
pub struct RqliteStore {
pub(crate) conn_pool: Pool<RqliteConnectionManager>,
pub(crate) worker_pool: rayon::ThreadPool,
}

#[inline(always)]
fn into_error(err: impl Display) -> trc::Error {
trc::StoreEvent::SqliteError.reason(err)
trc::StoreEvent::RqliteError.reason(err)
}
111 changes: 29 additions & 82 deletions crates/store/src/backend/rqlite/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,116 +4,63 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

use rusqlite::{Connection, Error, OpenFlags};
use rqlite_rs::client::RqliteClient;
use rqlite_rs::error::{ClientBuilderError, RequestError};
use rqlite_rs::RqliteClientBuilder;
use std::fmt;
use std::path::{Path, PathBuf};

#[derive(Debug)]
enum Source {
File(PathBuf),
Memory,
}

type InitFn = dyn Fn(&mut Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static;

/// An `r2d2::ManageConnection` for `rusqlite::Connection`s.
pub struct SqliteConnectionManager {
source: Source,
flags: OpenFlags,
init: Option<Box<InitFn>>,
pub struct RqliteConnectionManager {
endpoints: Vec<String>,
}

impl fmt::Debug for SqliteConnectionManager {
impl fmt::Debug for RqliteConnectionManager {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut builder = f.debug_struct("SqliteConnectionManager");
let _ = builder.field("source", &self.source);
let _ = builder.field("flags", &self.source);
let _ = builder.field("init", &self.init.as_ref().map(|_| "InitFn"));
let _ = builder.field("endpoints", &self.endpoints);
builder.finish()
}
}

impl SqliteConnectionManager {
/// Creates a new `SqliteConnectionManager` from file.
///
/// See `rusqlite::Connection::open`
pub fn file<P: AsRef<Path>>(path: P) -> Self {
Self {
source: Source::File(path.as_ref().to_path_buf()),
flags: OpenFlags::default(),
init: None,
}
}

/// Creates a new `SqliteConnectionManager` from memory.
pub fn memory() -> Self {
impl RqliteConnectionManager {
/// Creates a new `RqliteConnectionManager` from endpoints.
pub fn endpoints(endpoints: Vec<String>) -> Self {
Self {
source: Source::Memory,
flags: OpenFlags::default(),
init: None,
endpoints: endpoints,
}
}

/// Converts `SqliteConnectionManager` into one that sets OpenFlags upon
/// connection creation.
///
/// See `rustqlite::OpenFlags` for a list of available flags.
pub fn with_flags(self, flags: OpenFlags) -> Self {
Self { flags, ..self }
}

/// Converts `SqliteConnectionManager` into one that calls an initialization
/// function upon connection creation. Could be used to set PRAGMAs, for
/// example.
///
/// ### Example
///
/// Make a `SqliteConnectionManager` that sets the `foreign_keys` pragma to
/// true for every connection.
///
/// ```rust,no_run
/// # use r2d2_sqlite::{SqliteConnectionManager};
/// let manager = SqliteConnectionManager::file("app.db")
/// .with_init(|c| c.execute_batch("PRAGMA foreign_keys=1;"));
/// ```
pub fn with_init<F>(self, init: F) -> Self
where
F: Fn(&mut Connection) -> Result<(), rusqlite::Error> + Send + Sync + 'static,
{
let init: Option<Box<InitFn>> = Some(Box::new(init));
Self { init, ..self }
}
}

fn sleeper(_: i32) -> bool {
std::thread::sleep(std::time::Duration::from_millis(200));
true
}

impl r2d2::ManageConnection for SqliteConnectionManager {
type Connection = Connection;
type Error = rusqlite::Error;
impl r2d2::ManageConnection for RqliteConnectionManager {
type Connection = RqliteClient;
type Error = ClientBuilderError;

fn connect(&self) -> Result<RqliteClient, ClientBuilderError> {
let mut client_builder = RqliteClientBuilder::new();

fn connect(&self) -> Result<Connection, Error> {
match self.source {
Source::File(ref path) => Connection::open_with_flags(path, self.flags),
Source::Memory => Connection::open_in_memory_with_flags(self.flags),
for endpoint in &self.endpoints {
client_builder = client_builder.known_host(endpoint);
}

client_builder.build()
.map_err(Into::into)
.and_then(|mut c| {
c.busy_handler(Some(sleeper))?;
match self.init {
None => Ok(c),
Some(ref init) => init(&mut c).map(|_| c),
}
})
}

fn is_valid(&self, conn: &mut Connection) -> Result<(), Error> {
conn.execute_batch("").map_err(Into::into)
fn is_valid(&self, conn: &mut RqliteClient) -> Result<(), ClientBuilderError> {
Ok(())
/*let res = conn.exec(rqlite_rs::query!("SELECT 1;"));
match res.wait().map_err(Into::into) {
Ok(_) => Ok(()),
Err(err) => Err(err)
}*/
}

fn has_broken(&self, _: &mut Connection) -> bool {
fn has_broken(&self, _: &mut RqliteClient) -> bool {
false
}
}
Loading

0 comments on commit ea2cf1c

Please sign in to comment.