Skip to content

Commit

Permalink
Statement Stream for AsyncConnection::Load (#961)
Browse files Browse the repository at this point in the history
* statement stream

* fix lifetimes on load query

* execute returning count

* transaction manager

* cleanup
  • Loading branch information
insipx authored Aug 15, 2024
1 parent 68e71a8 commit d7e509c
Show file tree
Hide file tree
Showing 12 changed files with 355 additions and 359 deletions.
3 changes: 2 additions & 1 deletion diesel-wasm-sqlite/.vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"napi-derive": ["napi"],
"async-recursion": ["async_recursion"],
"ctor": ["ctor"],
"tokio": ["test"]
"tokio": ["test"],
"diesel": ["table"],
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion diesel-wasm-sqlite/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion diesel-wasm-sqlite/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub enum SqliteType {

impl Backend for WasmSqlite {
type QueryBuilder = SqliteQueryBuilder;
type RawValue<'a> = SqliteValue<'a, 'a, 'a>;
type RawValue<'a> = SqliteValue<'a, 'a>;
type BindCollector<'a> = SqliteBindCollector<'a>;
}

Expand Down
4 changes: 2 additions & 2 deletions diesel-wasm-sqlite/src/connection/bind_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<'a> BindCollector<'a, WasmSqlite> for SqliteBindCollector<'a> {

#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum OwnedSqliteBindValue {
pub enum OwnedSqliteBindValue {
String(Box<str>),
Binary(Box<[u8]>),
I32(i32),
Expand Down Expand Up @@ -229,7 +229,7 @@ impl<'a> std::convert::From<&OwnedSqliteBindValue> for InternalSqliteBindValue<'
#[derive(Debug)]
/// Sqlite bind collector data that is movable across threads
pub struct SqliteBindCollectorData {
binds: Vec<(OwnedSqliteBindValue, SqliteType)>,
pub binds: Vec<(OwnedSqliteBindValue, SqliteType)>,
}

impl MoveableBindCollector<WasmSqlite> for SqliteBindCollector<'_> {
Expand Down
138 changes: 68 additions & 70 deletions diesel-wasm-sqlite/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod raw;
mod row;
// mod serialized_database;
mod sqlite_value;
// mod statement_iterator;
mod statement_stream;
mod stmt;

pub(crate) use self::bind_collector::SqliteBindCollector;
Expand All @@ -17,15 +17,19 @@ use self::raw::RawConnection;
// use self::statement_iterator::*;
use self::stmt::{Statement, StatementUse};
use crate::query_builder::*;
use diesel::{connection::{statement_cache::StatementCacheKey, DefaultLoadingMode, LoadConnection}, deserialize::{FromSqlRow, StaticallySizedRow}, expression::QueryMetadata, query_builder::QueryBuilder as _, result::*, serialize::ToSql, sql_types::HasSqlType};
use futures::{FutureExt, TryFutureExt};
use diesel::query_builder::MoveableBindCollector;
use diesel::{connection::{statement_cache::StatementCacheKey}, query_builder::QueryBuilder as _, result::*};
use futures::future::LocalBoxFuture;
use futures::stream::LocalBoxStream;
use futures::FutureExt;
use owned_row::OwnedSqliteRow;
use statement_stream::StatementStream;
use std::future::Future;
use std::sync::{Arc, Mutex};


use diesel::{connection::{ConnectionSealed, Instrumentation, WithMetadataLookup}, query_builder::{AsQuery, QueryFragment, QueryId}, sql_types::TypeMetadata, QueryResult};
use diesel::{connection::{ConnectionSealed, Instrumentation}, query_builder::{AsQuery, QueryFragment, QueryId}, QueryResult};
pub use diesel_async::{AnsiTransactionManager, AsyncConnection, SimpleAsyncConnection, TransactionManager, stmt_cache::StmtCache};
use futures::{future::BoxFuture, stream::BoxStream};
use row::SqliteRow;

use crate::{get_sqlite_unchecked, WasmSqlite, WasmSqliteError};

Expand All @@ -35,10 +39,9 @@ pub struct WasmSqliteConnection {
// connection itself
statement_cache: StmtCache<WasmSqlite, Statement>,
pub raw_connection: RawConnection,
transaction_state: AnsiTransactionManager,
transaction_manager: AnsiTransactionManager,
// this exists for the sole purpose of implementing `WithMetadataLookup` trait
// and avoiding static mut which will be deprecated in 2024 edition
metadata_lookup: (),
instrumentation: Arc<Mutex<Option<Box<dyn Instrumentation>>>>,
}

Expand Down Expand Up @@ -69,37 +72,44 @@ impl SimpleAsyncConnection for WasmSqliteConnection {
impl AsyncConnection for WasmSqliteConnection {
type Backend = WasmSqlite;
type TransactionManager = AnsiTransactionManager;
type ExecuteFuture<'conn, 'query> = BoxFuture<'query, QueryResult<usize>>;
type LoadFuture<'conn, 'query> = BoxFuture<'query, QueryResult<Self::Stream<'conn, 'query>>>;
type Stream<'conn, 'query> = BoxStream<'static, QueryResult<SqliteRow<'conn, 'query>>>;
type Row<'conn, 'query> = SqliteRow<'conn, 'query>;
type ExecuteFuture<'conn, 'query> = LocalBoxFuture<'conn, QueryResult<usize>>;
type LoadFuture<'conn, 'query> = LocalBoxFuture<'conn, QueryResult<Self::Stream<'conn, 'query>>>;
type Stream<'conn, 'query> = LocalBoxStream<'conn, QueryResult<Self::Row<'conn, 'query>>>;
type Row<'conn, 'query> = OwnedSqliteRow;

async fn establish(database_url: &str) -> diesel::prelude::ConnectionResult<Self> {
WasmSqliteConnection::establish_inner(database_url).await
}

fn load<'conn, 'query, T>(&'conn mut self, _source: T) -> Self::LoadFuture<'conn, 'query>
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
where
T: AsQuery + 'query,
T::Query: QueryFragment<Self::Backend> + QueryId + 'query,
T: AsQuery,
T::Query: QueryFragment<Self::Backend> + QueryId,
{
todo!()
let query = source.as_query();
self.with_prepared_statement(query, |_, statement| async move {
Ok(StatementStream::new(statement).stream())
})
}

fn execute_returning_count<'conn, 'query, T>(
&'conn mut self,
_source: T,
query: T,
) -> Self::ExecuteFuture<'conn, 'query>
where
T: QueryFragment<Self::Backend> + QueryId + 'query,
{
todo!()
self.with_prepared_statement(query, |conn, statement| async move {
statement.run().await.map(|_| {
conn.rows_affected_by_last_query()
})
})
}

fn transaction_state(
&mut self,
) -> &mut <Self::TransactionManager as diesel_async::TransactionManager<Self>>::TransactionStateData{
todo!()
&mut self.transaction_manager
}

fn instrumentation(&mut self) -> &mut dyn Instrumentation {
Expand All @@ -111,32 +121,7 @@ impl AsyncConnection for WasmSqliteConnection {
}
}

/*
impl LoadConnection<DefaultLoadingMode> for WasmSqliteConnection {
type Cursor<'conn, 'query> = StatementIterator<'conn, 'query>;
type Row<'conn, 'query> = self::row::SqliteRow<'conn, 'query>;
fn load<'conn, 'query, T>(
&'conn mut self,
source: T,
) -> QueryResult<Self::Cursor<'conn, 'query>>
where
T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
Self::Backend: QueryMetadata<T::SqlType>,
{
let statement = self.prepared_query(source)?;

Ok(StatementIterator::new(statement))
}
}
*/
/*
impl WithMetadataLookup for WasmSqliteConnection {
fn metadata_lookup(&mut self) -> &mut <WasmSqlite as TypeMetadata>::MetadataLookup {
&mut self.metadata_lookup
}
}
*/

#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for crate::sqlite::SqliteConnection {
Expand Down Expand Up @@ -243,39 +228,53 @@ impl WasmSqliteConnection {
}
}

async fn prepared_query<'conn, 'query, T>(
fn with_prepared_statement<'conn, Q, F, R>(
&'conn mut self,
source: T,
) -> QueryResult<StatementUse<'conn, 'query>>
query: Q,
callback: impl (FnOnce(&'conn mut RawConnection, StatementUse<'conn>) -> F) + 'conn
) -> LocalBoxFuture<'_, QueryResult<R>>
where
T: QueryFragment<WasmSqlite> + QueryId + 'query,
Q: QueryFragment<WasmSqlite> + QueryId,
F: Future<Output = QueryResult<R>>,
{
let raw_connection = &self.raw_connection;
let cache = &mut self.statement_cache;
let maybe_type_id = T::query_id();
let cache_key = StatementCacheKey::for_source(maybe_type_id, &source, &[], &WasmSqlite)?;


let is_safe_to_cache_prepared = source.is_safe_to_cache_prepared(&WasmSqlite)?;
let mut qb = SqliteQueryBuilder::new();
let sql = source.to_sql(&mut qb, &WasmSqlite).map(|()| qb.finish())?;
let WasmSqliteConnection {
ref mut raw_connection,
ref mut statement_cache,
ref mut instrumentation,
..
} = self;

let statement = cache.cached_prepared_statement(
cache_key,
sql,
is_safe_to_cache_prepared,
&[],
raw_connection.clone(),
&self.instrumentation,
).await?.0; // Cloned RawConnection is dropped here
let maybe_type_id = Q::query_id();
let instrumentation = instrumentation.clone();


Ok(StatementUse::bind(statement, source, self.instrumentation.as_ref())?)
let cache_key = StatementCacheKey::for_source(maybe_type_id, &query, &[], &WasmSqlite);
let is_safe_to_cache_prepared = query.is_safe_to_cache_prepared(&WasmSqlite);

// C put this in box to avoid virtual fn call for SQLite C
// not sure if that still applies here
let query = Box::new(query);
let mut bind_collector = SqliteBindCollector::new();
let bind_collector = query.collect_binds(&mut bind_collector, &mut (), &WasmSqlite).map(|()| bind_collector.moveable());

let mut qb = SqliteQueryBuilder::new();
let sql = query.to_sql(&mut qb, &WasmSqlite).map(|()| qb.finish());

async move {
let (statement, conn) = statement_cache.cached_prepared_statement(
cache_key?,
sql?,
is_safe_to_cache_prepared?,
&[],
raw_connection,
&instrumentation,
).await?; // Cloned RawConnection is dropped here
let statement = StatementUse::bind(statement, bind_collector?, instrumentation)?;
callback(conn, statement).await
}.boxed_local()
}

async fn establish_inner(database_url: &str) -> Result<WasmSqliteConnection, ConnectionError> {
use diesel::result::ConnectionError::CouldntSetupConfiguration;
// use diesel::result::ConnectionError::CouldntSetupConfiguration;
let raw_connection = RawConnection::establish(database_url).await.unwrap();
let sqlite3 = crate::get_sqlite().await;

Expand All @@ -284,8 +283,7 @@ impl WasmSqliteConnection {
Ok(Self {
statement_cache: StmtCache::new(),
raw_connection,
transaction_state: AnsiTransactionManager::default(),
metadata_lookup: (),
transaction_manager: AnsiTransactionManager::default(),
instrumentation: Arc::new(Mutex::new(None)),
})
}
Expand Down
5 changes: 4 additions & 1 deletion diesel-wasm-sqlite/src/connection/raw.rs
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#![allow(dead_code)]
// functions are needed, but missing functionality means they aren't used yet.

use crate::{
sqlite_types::{SqliteFlags, SqliteOpenFlags},
SqliteType, WasmSqlite, WasmSqliteError,
Expand Down Expand Up @@ -159,7 +162,7 @@ impl RawConnection {
}

#[async_trait::async_trait(?Send)]
impl diesel_async::stmt_cache::PrepareCallback<Statement, SqliteType> for RawConnection {
impl diesel_async::stmt_cache::PrepareCallback<Statement, SqliteType> for &'_ mut RawConnection {
async fn prepare(
self,
sql: &str,
Expand Down
30 changes: 15 additions & 15 deletions diesel-wasm-sqlite/src/connection/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ use diesel::{
};

#[allow(missing_debug_implementations)]
pub struct SqliteRow<'stmt, 'query> {
pub(super) inner: Rc<RefCell<PrivateSqliteRow<'stmt, 'query>>>,
pub struct SqliteRow<'stmt> {
pub(super) inner: Rc<RefCell<PrivateSqliteRow<'stmt>>>,
pub(super) field_count: usize,
}

pub(super) enum PrivateSqliteRow<'stmt, 'query> {
Direct(StatementUse<'stmt, 'query>),
pub(super) enum PrivateSqliteRow<'stmt> {
Direct(StatementUse<'stmt>),
Duplicated {
values: Vec<Option<OwnedSqliteValue>>,
column_names: Rc<[Option<String>]>,
},
}

impl<'stmt> IntoOwnedRow<'stmt, WasmSqlite> for SqliteRow<'stmt, '_> {
impl<'stmt> IntoOwnedRow<'stmt, WasmSqlite> for SqliteRow<'stmt> {
type OwnedRow = OwnedSqliteRow;

type Cache = Option<Arc<[Option<String>]>>;
Expand All @@ -35,11 +35,11 @@ impl<'stmt> IntoOwnedRow<'stmt, WasmSqlite> for SqliteRow<'stmt, '_> {
}
}

impl<'stmt, 'query> PrivateSqliteRow<'stmt, 'query> {
impl<'stmt> PrivateSqliteRow<'stmt> {
pub(super) fn duplicate(
&mut self,
column_names: &mut Option<Rc<[Option<String>]>>,
) -> PrivateSqliteRow<'stmt, 'query> {
) -> PrivateSqliteRow<'stmt> {
match self {
PrivateSqliteRow::Direct(stmt) => {
let column_names = if let Some(column_names) = column_names {
Expand Down Expand Up @@ -129,10 +129,10 @@ impl<'stmt, 'query> PrivateSqliteRow<'stmt, 'query> {
}
}

impl<'stmt, 'query> RowSealed for SqliteRow<'stmt, 'query> {}
impl<'stmt> RowSealed for SqliteRow<'stmt> {}

impl<'stmt, 'query> Row<'stmt, WasmSqlite> for SqliteRow<'stmt, 'query> {
type Field<'field> = SqliteField<'field, 'field> where 'stmt: 'field, Self: 'field;
impl<'stmt> Row<'stmt, WasmSqlite> for SqliteRow<'stmt> {
type Field<'field> = SqliteField<'field> where 'stmt: 'field, Self: 'field;
type InnerPartialRow = Self;

fn field_count(&self) -> usize {
Expand All @@ -156,7 +156,7 @@ impl<'stmt, 'query> Row<'stmt, WasmSqlite> for SqliteRow<'stmt, 'query> {
}
}

impl<'stmt, 'query> RowIndex<usize> for SqliteRow<'stmt, 'query> {
impl<'stmt> RowIndex<usize> for SqliteRow<'stmt> {
fn idx(&self, idx: usize) -> Option<usize> {
if idx < self.field_count {
Some(idx)
Expand All @@ -166,7 +166,7 @@ impl<'stmt, 'query> RowIndex<usize> for SqliteRow<'stmt, 'query> {
}
}

impl<'stmt, 'idx, 'query> RowIndex<&'idx str> for SqliteRow<'stmt, 'query> {
impl<'stmt, 'idx> RowIndex<&'idx str> for SqliteRow<'stmt> {
fn idx(&self, field_name: &'idx str) -> Option<usize> {
match &mut *self.inner.borrow_mut() {
PrivateSqliteRow::Direct(stmt) => stmt.index_for_column_name(field_name),
Expand All @@ -178,12 +178,12 @@ impl<'stmt, 'idx, 'query> RowIndex<&'idx str> for SqliteRow<'stmt, 'query> {
}

#[allow(missing_debug_implementations)]
pub struct SqliteField<'stmt, 'query> {
pub(super) row: Ref<'stmt, PrivateSqliteRow<'stmt, 'query>>,
pub struct SqliteField<'stmt> {
pub(super) row: Ref<'stmt, PrivateSqliteRow<'stmt>>,
pub(super) col_idx: i32,
}

impl<'stmt, 'query> Field<'stmt, WasmSqlite> for SqliteField<'stmt, 'query> {
impl<'stmt> Field<'stmt, WasmSqlite> for SqliteField<'stmt> {
fn field_name(&self) -> Option<&str> {
match &*self.row {
PrivateSqliteRow::Direct(stmt) => stmt.field_name(self.col_idx),
Expand Down
Loading

0 comments on commit d7e509c

Please sign in to comment.