Skip to content

Commit

Permalink
Clean up AsyncAdapter (#277)
Browse files Browse the repository at this point in the history
Address TODOs, and improve comments including about why I believe the unsafe usage to be sound.
That said, I welcome additional eyes on the unsafe sections here . In some ways I find unsafe in Rust even harder to reason about than memory safety in C++ as Rust has a greater expectation that objects may be allowed to move.
  • Loading branch information
Electron100 authored Nov 12, 2024
1 parent 3d71616 commit b325162
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 74 deletions.
3 changes: 1 addition & 2 deletions async_checklist.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
* [x] Ensure sqlite works in async
* [x] Fully support sync too. Using async should not be required
* [ ] Clean up miscellaneous TODOs
* [ ] Establish soundness for unsafe sections of AsyncAdapter
* [ ] Consider publishing `AsyncAdapter` into its own crate
* [x] Establish soundness for unsafe sections of AsyncAdapter
* [ ] Should async and/or async_adapter be under a separate feature?
* [ ] Integrate deadpool or bb8 for async connection pool
184 changes: 113 additions & 71 deletions butane_core/src/db/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::query::Order;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use tokio::sync::oneshot;

enum Command {
Func(Box<dyn FnOnce() + Send>),
Expand All @@ -32,8 +31,7 @@ impl AsyncAdapterEnv {
match cmd {
Command::Func(func) => func(),
Command::Shutdown => {
// TODO should connection support an explicit close?
return;
return; // break out of the loop
}
}
}
Expand All @@ -44,32 +42,37 @@ impl AsyncAdapterEnv {
}
}

/// Invokes a blocking function `func` as if it were async. This
/// is implemented by running it on the special thread created when the `AsyncAdapterEnv` was created.
async fn invoke<'c, 's, 'result, F, T, U>(
&'s self,
context: &SyncSendPtrMut<T>,
func: F,
) -> Result<U>
// todo can this just be result
where
F: FnOnce(&'c T) -> Result<U> + Send,
F: 'result,
U: Send + 'result,
T: ?Sized + 'c, // TODO should this be Send
T: ?Sized + 'c,
's: 'result,
'c: 'result,
{
// todo parts of this can be shared with the other two invoke functions
let (tx, rx) = oneshot::channel();
let context_ptr = SendPtr::new(context.inner);
let func_taking_ptr = |ctx: SendPtr<T>| func(unsafe { ctx.inner.as_ref() }.unwrap());
let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr));
let boxed_func: Box<dyn FnOnce() + Send + 'result> = Box::new(wrapped_func);
let static_func: Box<dyn FnOnce() + Send + 'static> =
unsafe { std::mem::transmute(boxed_func) };
self.sender.send(Command::Func(static_func))?;
// https://stackoverflow.com/questions/52424449/
// https://docs.rs/crossbeam/0.8.2/crossbeam/fn.scope.html
// TODO ensure soundness and document why
// func itself must be `Send`, but we do not require &T to be
// Send (and thus don't reuire T to be Sync). We do this by
// basically unsafely sending our raw context pointer over to
// the worker thread anyway. The key observation on why we
// believe this to be sound is that we actually created the
// context over on the worker thread in the first place (see
// [AsyncAdapter::new]) and we do not allow direct access to
// it. So despite fact that we pass the context pointer back
// and forth, it's essentially owned by the worker thread -- all operations
// with context occur on that worker thread.
let (tx, rx) = tokio::sync::oneshot::channel();
let func_taking_ptr = |ctx: SyncSendPtrMut<T>| func(unsafe { ctx.inner.as_ref() }.unwrap());
unsafe {
let wrapped_func = move || _ = tx.send(func_taking_ptr(context.clone_unsafe()));
self.invoke_internal_unsafe(wrapped_func)?;
}
rx.await?
}

Expand All @@ -82,21 +85,16 @@ impl AsyncAdapterEnv {
F: FnOnce(&'c mut T) -> Result<U> + Send,
F: 'result,
U: Send + 'result,
T: ?Sized + 'c, // TODO should this be Send
T: ?Sized + 'c,
's: 'result,
'c: 'result,
{
let (tx, rx) = oneshot::channel();
let context_ptr = SendPtrMut::new(context.inner);
let func_taking_ptr = |ctx: SendPtrMut<T>| func(unsafe { ctx.inner.as_mut().unwrap() });
let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr));
let boxed_func: Box<dyn FnOnce() + Send + 'result> = Box::new(wrapped_func);
let static_func: Box<dyn FnOnce() + Send + 'static> =
unsafe { std::mem::transmute(boxed_func) };
self.sender.send(Command::Func(static_func))?;
// https://stackoverflow.com/questions/52424449/
// https://docs.rs/crossbeam/0.8.2/crossbeam/fn.scope.html
// TODO ensure soundness and document why
let (tx, rx) = tokio::sync::oneshot::channel();
let func_taking_ptr = |ctx: SyncSendPtrMut<T>| func(unsafe { ctx.inner.as_mut().unwrap() });
unsafe {
let wrapped_func = move || _ = tx.send(func_taking_ptr(context.clone_unsafe()));
self.invoke_internal_unsafe(wrapped_func)?;
}
rx.await?
}

Expand All @@ -110,71 +108,92 @@ impl AsyncAdapterEnv {
'c: 'result,
{
let (tx, rx) = crossbeam_channel::unbounded();
let context_ptr = SendPtr::new(context);
let context_ptr = unsafe { SendPtr::new(context) };
let func_taking_ptr = |ctx: SendPtr<T>| func(unsafe { ctx.inner.as_ref() }.unwrap());
let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr));
unsafe {
let wrapped_func = move || _ = tx.send(func_taking_ptr(context_ptr));
self.invoke_internal_unsafe(wrapped_func)?;
}
rx.recv()?
}

unsafe fn invoke_internal_unsafe<'s, 'result>(
&'s self,
// wrapped_func is a complete encapsulation of the function we
// want to invoke without any parameters left to provide
wrapped_func: impl FnOnce() + Send + 'result,
) -> Result<()> {
// We transmute wrapped_func (with an intermediate boxing
// step) solely to transform it's lifetime. The lifetime is an
// issue here because Rust itself has no way of knowing how
// long our sync worker thread is going to use it for. But
// *we* know that our worker thread will immediately execute
// the function and the caller to this method will wait to
// hear from the worker thread before proceeding (and thus
// before letting the lifetime lapse)
// https://stackoverflow.com/questions/52424449/
let boxed_func: Box<dyn FnOnce() + Send + 'result> = Box::new(wrapped_func);
let static_func: Box<dyn FnOnce() + Send + 'static> =
unsafe { std::mem::transmute(boxed_func) };
self.sender.send(Command::Func(static_func))?;
// TODO ensure soundness and document why
rx.recv()?
Ok(())
}
}

impl Drop for AsyncAdapterEnv {
fn drop(&mut self) {
self.sender
.send(Command::Shutdown)
.expect("Cannot send async adapter env shutdown command, cannot join thread");
let r = self.sender.send(Command::Shutdown);
if r.is_err() {
// editorconfig-checker-disable
crate::error!("Cannot send async adapter env shutdown command because channel is disconnected.\
Assuming this means thread died and is joinable. If it is not, join may hang indefinitely");
// editorconfig-checker-enable
}
self.thread_handle.take().map(|h| h.join());
}
}

/// Wrapper around a raw pointer that we assert is [Send]. Needless to
/// say, this requires care. See comments on `AsyncAdapterEnv::invoke`
/// for why we believe this to be sound.
struct SendPtr<T: ?Sized> {
inner: *const T,
}
impl<T: ?Sized> SendPtr<T> {
fn new(inner: *const T) -> Self {
unsafe fn new(inner: *const T) -> Self {
Self { inner }
}
}
unsafe impl<T: ?Sized> Send for SendPtr<T> {}

struct SendPtrMut<T: ?Sized> {
inner: *mut T,
}
impl<T: ?Sized> SendPtrMut<T> {
fn new(inner: *mut T) -> Self {
Self { inner }
}
}
unsafe impl<T: ?Sized> Send for SendPtrMut<T> {}

/// Like [SendPtrMut] but we also assert that it is [Sync]
struct SyncSendPtrMut<T: ?Sized> {
inner: *mut T,
}
impl<T: ?Sized> SyncSendPtrMut<T> {
fn new(inner: *mut T) -> Self {
// todo should this be unsafe
unsafe fn new(inner: *mut T) -> Self {
Self { inner }
}
unsafe fn clone_unsafe(&self) -> Self {
Self { inner: self.inner }
}
}
impl<T> From<T> for SyncSendPtrMut<T>
where
T: Debug + Sized,
T: Sized,
{
fn from(val: T) -> Self {
Self {
inner: Box::into_raw(Box::new(val)),
} // todo should this be unsafe
}
}
}
unsafe impl<T: Debug + ?Sized> Send for SyncSendPtrMut<T> {}
unsafe impl<T: ?Sized> Send for SyncSendPtrMut<T> {}
unsafe impl<T: ?Sized> Sync for SyncSendPtrMut<T> {}

impl<T: Debug + ?Sized> Debug for SyncSendPtrMut<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
// We enforce that inner is non-null and valid.
unsafe { (*self.inner).fmt(f) }
}
}
Expand All @@ -186,8 +205,8 @@ pub(super) struct AsyncAdapter<T: ?Sized> {
}

impl<T: ?Sized> AsyncAdapter<T> {
//todo document what this is for
fn new_internal<U: ?Sized>(&self, context_ptr: SyncSendPtrMut<U>) -> AsyncAdapter<U> {
/// Create a new AsyncAdapter with the given context and using the same `env` as self. Not a public method.
fn create_with_same_env<U: ?Sized>(&self, context_ptr: SyncSendPtrMut<U>) -> AsyncAdapter<U> {
AsyncAdapter {
env: self.env.clone(),
context: context_ptr,
Expand All @@ -204,7 +223,6 @@ impl<T: ?Sized> AsyncAdapter<T> {
'c: 'result,
's: 'c,
{
// todo verify the interior mutability won't panic here
self.env.invoke(&self.context, func).await
}

Expand All @@ -215,7 +233,6 @@ impl<T: ?Sized> AsyncAdapter<T> {
U: Send + 'result,
'c: 'result,
{
// todo verify the interior mutability won't panic here
self.env.invoke_mut(&self.context, func).await
}

Expand All @@ -226,22 +243,31 @@ impl<T: ?Sized> AsyncAdapter<T> {
U: Send + 'result,
'c: 'result,
{
// todo verify the interior mutability won't panic here
self.env.invoke_blocking(self.context.inner, func)
}
}

impl<T> AsyncAdapter<T> {
/// Create a new async adapter using `create_context` to create an instance of the inner type `T`.
pub(super) fn new<F>(create_context: F) -> Result<Self>
where
Self: Sized,
F: FnOnce() -> Result<T> + Send,
{
// TODO execute the create context function on the thread
let context = create_context()?;
let env = AsyncAdapterEnv::new();

// Execute the context creation function on our worker thread.
let dummy = (); // because we have to pass a context pointer to env.invoke
let context = env.invoke_blocking(&dummy, |_ctx: &()| {
let concrete_context = create_context()?;
// See comments about soundness on AsyncAdapterEnv::invoke
let context = unsafe { SyncSendPtrMut::new(Box::into_raw(Box::new(concrete_context))) };
Ok(context)
})?;

Ok(Self {
env: Arc::new(AsyncAdapterEnv::new()),
context: SyncSendPtrMut::new(Box::into_raw(Box::new(context))),
env: Arc::new(env),
context,
})
}
}
Expand Down Expand Up @@ -348,27 +374,44 @@ where
.invoke_mut(|conn| {
let transaction: Transaction = conn.transaction()?;
let transaction_ptr: *mut dyn BackendTransaction = Box::into_raw(transaction.trans);
Ok(SyncSendPtrMut::new(transaction_ptr))
Ok(unsafe { SyncSendPtrMut::new(transaction_ptr) })
})
.await?;
let transaction_adapter = self.new_internal(transaction_ptr);
let transaction_adapter = self.create_with_same_env(transaction_ptr);
Ok(TransactionAsync::new(Box::new(transaction_adapter)))
}

fn backend(&self) -> Box<dyn Backend> {
// todo clean up unwrap
self.invoke_blocking(|conn| Ok(conn.backend())).unwrap()
ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.backend())))
}

fn backend_name(&self) -> &'static str {
// todo clean up unwrap
self.invoke_blocking(|conn| Ok(conn.backend_name()))
.unwrap()
ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.backend_name())))
}

/// Tests if the connection has been closed. Backends which do not
/// support this check should return false.
fn is_closed(&self) -> bool {
// todo clean up unwrap
self.invoke_blocking(|conn| Ok(conn.is_closed())).unwrap()
ok_or_panic_with_adapter_error(self.invoke_blocking(|conn| Ok(conn.is_closed())))
}
}

fn ok_or_panic_with_adapter_error<T>(r: Result<T>) -> T {
match r {
Ok(ret) => ret,
// This is unfortunate, but should be rare. We never use it
// when invoking functions that can fail in their own right,
// so it indicates that the channel operation failed, which
// should only be possible if the other thread died
// unexpectedly.
Err(e) => panic!(
// editorconfig-checker-disable
"Internal error occurred within the sync->async adapter invoked when wrapping a function\
which does not permit error returns.\n\
Error: {}",
// editorconfig-checker-enable
e
)
}
}

Expand Down Expand Up @@ -401,7 +444,6 @@ where

/// Create an async connection using the synchronous `connect` method of `backend`. Use this when authoring
/// a backend which doesn't natively support async.
#[cfg(feature = "sqlite")] // todo expose this publicly for out-of-tree backends
pub async fn connect_async_via_sync<B>(backend: &B, conn_str: &str) -> Result<ConnectionAsync>
where
B: Backend + Clone + 'static,
Expand Down
2 changes: 1 addition & 1 deletion butane_core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::query::{BoolExpr, Order};
use crate::{migrations::adb, Error, Result, SqlVal, SqlValRef};

mod adapter;
pub use adapter::connect_async_via_sync;
pub(crate) mod dummy;
use dummy::DummyConnection;
mod sync_adapter;
Expand Down Expand Up @@ -81,7 +82,6 @@ mod internal {
/// Database connection.
#[maybe_async_cfg::maybe(
idents(
AsyncRequiresSend,
ConnectionMethods(sync = "ConnectionMethods", async = "ConnectionMethodsAsync"),
Transaction(sync = "Transaction", async = "TransactionAsync"),
),
Expand Down
9 changes: 9 additions & 0 deletions butane_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ impl std::fmt::Display for SqlType {
#[cfg(feature = "log")]
pub use log::debug;
#[cfg(feature = "log")]
pub use log::error;
#[cfg(feature = "log")]
pub use log::info;
#[cfg(feature = "log")]
pub use log::warn;
Expand Down Expand Up @@ -411,4 +413,11 @@ mod btlog {
(target: $target:expr, $($arg:tt)+) => {};
($($arg:tt)+) => {};
}

/// Noop for when feature log is not enabled.
#[macro_export]
macro_rules! error {
(target: $target:expr, $($arg:tt)+) => {};
($($arg:tt)+) => {};
}
}

0 comments on commit b325162

Please sign in to comment.