From e0165b84672e8bbcf23913ddedcee43f6c2f65ed Mon Sep 17 00:00:00 2001 From: James Hugman Date: Thu, 5 Sep 2024 16:30:34 +0100 Subject: [PATCH 1/8] Fixup deadlock when hammering futures of a certain type --- cpp/includes/UniffiCallInvoker.h | 28 ++++++++++++++++------------ typescript/src/async-rust-call.ts | 29 ++++++++++++++++++++++++++--- 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/cpp/includes/UniffiCallInvoker.h b/cpp/includes/UniffiCallInvoker.h index 598305c5..90ddd424 100644 --- a/cpp/includes/UniffiCallInvoker.h +++ b/cpp/includes/UniffiCallInvoker.h @@ -5,9 +5,10 @@ */ #pragma once #include -#include +#include #include #include +#include #include namespace uniffi_runtime { @@ -57,23 +58,26 @@ class UniffiCallInvoker { if (std::this_thread::get_id() == threadId_) { func(rt); } else { - std::promise promise; - auto future = promise.get_future(); + std::mutex mtx; + std::condition_variable cv; + bool done = false; // The runtime argument was added to CallFunc in // https://github.com/facebook/react-native/pull/43375 // - // Once that is released, there will be a deprecation period. - // - // Any time during the deprecation period, we can switch `&rt` - // from being a captured variable to being an argument, i.e. - // commenting out one line, and uncommenting the other. - std::function wrapper = [&func, &promise, &rt]() { - // react::CallFunc wrapper = [&func, &promise](jsi::Runtime &rt) { + // This can be changed once that change is released. + // react::CallFunc wrapper = [&func, &mtx, &cv, &done](jsi::Runtime &rt) { + std::function wrapper = [&func, &rt, &mtx, &cv, &done]() { func(rt); - promise.set_value(); + { + std::lock_guard lock(mtx); + done = true; + } + cv.notify_one(); }; callInvoker_->invokeAsync(std::move(wrapper)); - future.wait(); + + std::unique_lock lock(mtx); + cv.wait(lock, [&done] { return done; }); } } }; diff --git a/typescript/src/async-rust-call.ts b/typescript/src/async-rust-call.ts index c7a7b59c..d3397227 100644 --- a/typescript/src/async-rust-call.ts +++ b/typescript/src/async-rust-call.ts @@ -34,6 +34,12 @@ type PollFunc = ( handle: UniffiHandle, ) => void; +// Calls setTimeout and then resolves the promise. +// This may be used as a simple yield. +export async function delayPromise(delayMs: number): Promise { + return new Promise((resolve) => setTimeout(resolve, delayMs)); +} + /** * This method calls an asynchronous method on the Rust side. * @@ -92,7 +98,7 @@ export async function uniffiRustCallAsync( pollResult = await pollRust((handle) => { pollFunc(rustFuture, uniffiFutureContinuationCallback, handle); }); - } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); + } while (pollResult !== UNIFFI_RUST_FUTURE_POLL_READY); // Now it's ready, all we need to do is pick up the result (and error). return liftFunc( @@ -103,7 +109,7 @@ export async function uniffiRustCallAsync( ), ); } finally { - freeFunc(rustFuture); + setTimeout(() => freeFunc(rustFuture), 0); } } @@ -128,7 +134,24 @@ const uniffiFutureContinuationCallback: UniffiRustFutureContinuationCallback = ( pollResult: number, ) => { const resolve = UNIFFI_RUST_FUTURE_RESOLVER_MAP.remove(handle); - resolve(pollResult); + if (pollResult === UNIFFI_RUST_FUTURE_POLL_READY) { + resolve(pollResult); + } else { + // From https://github.com/mozilla/uniffi-rs/pull/1837/files#diff-8a28c9cf1245b4f714d406ea4044d68e1000099928eaca1afb504ccbc008fe9fR35-R37 + // + // > WARNING: the call to [rust_future_poll] must be scheduled to happen soon after the callback is + // > called, but not inside the callback itself. If [rust_future_poll] is called inside the + // > callback, some futures will deadlock and our scheduler code might as well. + // + // This delay is to ensure that `uniffiFutureContinuationCallback` returns before the next poll, i.e. + // so that the next poll is outside of this callback. + // + // The length of the delay seems to be significant (at least in tests which hammer a network). + // I would like to understand this more: I am still seeing deadlocks when this drops below its current + // delay, but these maybe related to a different issue, as alluded to in + // https://github.com/mozilla/uniffi-rs/pull/1901 + setTimeout(() => resolve(pollResult), 20); + } }; // For testing only. From 72640599d045069e5db992e39096fa8836f2e2b0 Mon Sep 17 00:00:00 2001 From: James Hugman Date: Tue, 10 Sep 2024 17:35:49 +0100 Subject: [PATCH 2/8] Move to use setImmediate --- typescript/src/async-rust-call.ts | 36 +++++++++++++++---------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/typescript/src/async-rust-call.ts b/typescript/src/async-rust-call.ts index d3397227..c0c43964 100644 --- a/typescript/src/async-rust-call.ts +++ b/typescript/src/async-rust-call.ts @@ -40,6 +40,10 @@ export async function delayPromise(delayMs: number): Promise { return new Promise((resolve) => setTimeout(resolve, delayMs)); } +async function nextTickPromise(): Promise { + return new Promise((resolve) => setImmediate(resolve)); +} + /** * This method calls an asynchronous method on the Rust side. * @@ -98,6 +102,9 @@ export async function uniffiRustCallAsync( pollResult = await pollRust((handle) => { pollFunc(rustFuture, uniffiFutureContinuationCallback, handle); }); + // We yield here to allow other tasks to happen between + // the end of the poll and the beginning of the next one. + await nextTickPromise(); } while (pollResult !== UNIFFI_RUST_FUTURE_POLL_READY); // Now it's ready, all we need to do is pick up the result (and error). @@ -109,7 +116,7 @@ export async function uniffiRustCallAsync( ), ); } finally { - setTimeout(() => freeFunc(rustFuture), 0); + setImmediate(() => freeFunc(rustFuture)); } } @@ -134,24 +141,15 @@ const uniffiFutureContinuationCallback: UniffiRustFutureContinuationCallback = ( pollResult: number, ) => { const resolve = UNIFFI_RUST_FUTURE_RESOLVER_MAP.remove(handle); - if (pollResult === UNIFFI_RUST_FUTURE_POLL_READY) { - resolve(pollResult); - } else { - // From https://github.com/mozilla/uniffi-rs/pull/1837/files#diff-8a28c9cf1245b4f714d406ea4044d68e1000099928eaca1afb504ccbc008fe9fR35-R37 - // - // > WARNING: the call to [rust_future_poll] must be scheduled to happen soon after the callback is - // > called, but not inside the callback itself. If [rust_future_poll] is called inside the - // > callback, some futures will deadlock and our scheduler code might as well. - // - // This delay is to ensure that `uniffiFutureContinuationCallback` returns before the next poll, i.e. - // so that the next poll is outside of this callback. - // - // The length of the delay seems to be significant (at least in tests which hammer a network). - // I would like to understand this more: I am still seeing deadlocks when this drops below its current - // delay, but these maybe related to a different issue, as alluded to in - // https://github.com/mozilla/uniffi-rs/pull/1901 - setTimeout(() => resolve(pollResult), 20); - } + // From https://github.com/mozilla/uniffi-rs/pull/1837/files#diff-8a28c9cf1245b4f714d406ea4044d68e1000099928eaca1afb504ccbc008fe9fR35-R37 + // + // > WARNING: the call to [rust_future_poll] must be scheduled to happen soon after the callback is + // > called, but not inside the callback itself. If [rust_future_poll] is called inside the + // > callback, some futures will deadlock and our scheduler code might as well. + // + // This setImmediate is to ensure that `uniffiFutureContinuationCallback` returns + // before the next poll, i.e. so that the next poll is outside of this callback. + setImmediate(() => resolve(pollResult)); }; // For testing only. From 5b6a081e5dcfe4ba815d4f4e4b2b21774928515a Mon Sep 17 00:00:00 2001 From: James Hugman Date: Mon, 16 Sep 2024 20:32:12 +0100 Subject: [PATCH 3/8] Move away from non-standard setImmediate --- typescript/src/async-rust-call.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/typescript/src/async-rust-call.ts b/typescript/src/async-rust-call.ts index c0c43964..26132af6 100644 --- a/typescript/src/async-rust-call.ts +++ b/typescript/src/async-rust-call.ts @@ -41,7 +41,7 @@ export async function delayPromise(delayMs: number): Promise { } async function nextTickPromise(): Promise { - return new Promise((resolve) => setImmediate(resolve)); + return await delayPromise(0); } /** @@ -116,7 +116,7 @@ export async function uniffiRustCallAsync( ), ); } finally { - setImmediate(() => freeFunc(rustFuture)); + setTimeout(() => freeFunc(rustFuture), 0); } } @@ -149,7 +149,7 @@ const uniffiFutureContinuationCallback: UniffiRustFutureContinuationCallback = ( // // This setImmediate is to ensure that `uniffiFutureContinuationCallback` returns // before the next poll, i.e. so that the next poll is outside of this callback. - setImmediate(() => resolve(pollResult)); + setTimeout(() => resolve(pollResult), 0); }; // For testing only. From dfbc08e438cc81eddcf704c1b34746ef7c94fbd5 Mon Sep 17 00:00:00 2001 From: James Hugman Date: Mon, 16 Sep 2024 19:11:03 +0100 Subject: [PATCH 4/8] Add next ticks before polling instead of after --- typescript/src/async-rust-call.ts | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/typescript/src/async-rust-call.ts b/typescript/src/async-rust-call.ts index 26132af6..64dd61eb 100644 --- a/typescript/src/async-rust-call.ts +++ b/typescript/src/async-rust-call.ts @@ -40,6 +40,8 @@ export async function delayPromise(delayMs: number): Promise { return new Promise((resolve) => setTimeout(resolve, delayMs)); } +// We'd most likely want this to be a microTask, but hermes doesn't support the +// Javascript API for them yet. async function nextTickPromise(): Promise { return await delayPromise(0); } @@ -97,16 +99,22 @@ export async function uniffiRustCallAsync( try { let pollResult: number | undefined; do { + // Now we have a future, we should prompt some work to happen in Rust. + // We need to make sure we don't poll from the stack frame as we the end of the poll, + // so we wait until the next tick before polling. + await nextTickPromise(); + // Calling pollFunc with a callback that resolves the promise that pollRust // returns: pollRust makes the promise, uniffiFutureContinuationCallback resolves it. pollResult = await pollRust((handle) => { pollFunc(rustFuture, uniffiFutureContinuationCallback, handle); }); - // We yield here to allow other tasks to happen between - // the end of the poll and the beginning of the next one. - await nextTickPromise(); } while (pollResult !== UNIFFI_RUST_FUTURE_POLL_READY); + // Now we've finished polling, as a precaution, we wait until the next tick before + // picking up the results. + await nextTickPromise(); + // Now it's ready, all we need to do is pick up the result (and error). return liftFunc( makeRustCall( From 3a6174092f1527d89b7d5d43be131ba768318e55 Mon Sep 17 00:00:00 2001 From: James Hugman Date: Mon, 16 Sep 2024 19:11:34 +0100 Subject: [PATCH 5/8] Simplify futures test --- fixtures/futures/tests/bindings/test_futures.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/fixtures/futures/tests/bindings/test_futures.ts b/fixtures/futures/tests/bindings/test_futures.ts index 0b33d1b3..971bfa6a 100644 --- a/fixtures/futures/tests/bindings/test_futures.ts +++ b/fixtures/futures/tests/bindings/test_futures.ts @@ -138,11 +138,7 @@ function checkRemainingFutures(t: Asserts) { await asyncTest("Async methods", async (t) => { const megaphone = newMegaphone(); - let helloAlice = await t.asyncMeasure( - async () => megaphone.sayAfter(500, "Alice"), - 500, - 20, - ); + const helloAlice = await megaphone.sayAfter(500, "Alice"); t.assertEqual("HELLO, ALICE!", helloAlice); checkRemainingFutures(t); t.end(); From 22ddf7ba5fcb45745763cd413750959ffa91d042 Mon Sep 17 00:00:00 2001 From: James Hugman Date: Thu, 5 Sep 2024 18:11:12 +0100 Subject: [PATCH 6/8] tests --- Cargo.toml | 6 + fixtures/async-deadlock/Cargo.toml | 28 ++ fixtures/async-deadlock/src/lib.rs | 16 + .../tests/bindings/test_async_deadlock.py | 292 ++++++++++++++++++ .../tests/bindings/test_async_deadlock.ts | 126 ++++++++ fixtures/ext-types/Cargo.toml | 2 +- 6 files changed, 469 insertions(+), 1 deletion(-) create mode 100644 fixtures/async-deadlock/Cargo.toml create mode 100644 fixtures/async-deadlock/src/lib.rs create mode 100644 fixtures/async-deadlock/tests/bindings/test_async_deadlock.py create mode 100644 fixtures/async-deadlock/tests/bindings/test_async_deadlock.ts diff --git a/Cargo.toml b/Cargo.toml index aae31335..8164f4d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,9 @@ serde = { version = "1", features = ["derive"] } uniffi = "=0.28.0" uniffi_bindgen = "=0.28.0" uniffi_meta = "=0.28.0" +matrix-sdk-ffi = { rev = "8fbf1101c232bcd69583eaaa4605542653213822", git = "https://github.com/vimeiro-co/matrix-rust-sdk-mirror" } +matrix-sdk = { rev = "8fbf1101c232bcd69583eaaa4605542653213822", git = "https://github.com/vimeiro-co/matrix-rust-sdk-mirror" } + +[patch.crates-io] +# async-compat = { git = "https://github.com/jplatte/async-compat", rev = "16dc8597ec09a6102d58d4e7b67714a35dd0ecb8" } +# const_panic = { git = "https://github.com/jplatte/const_panic", rev = "9024a4cb3eac45c1d2d980f17aaee287b17be498" } diff --git a/fixtures/async-deadlock/Cargo.toml b/fixtures/async-deadlock/Cargo.toml new file mode 100644 index 00000000..37f7e9c8 --- /dev/null +++ b/fixtures/async-deadlock/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "regression-async-deadlock" +edition = "2021" +version = "0.0.1" +authors = ["Filament Team "] +license = "MPL-2.0" +publish = false + +[lib] +crate-type = ["lib", "staticlib", "cdylib"] +name = "regression_async_deadlock" + +[patch.crates-io] +async-compat = { git = "https://github.com/jplatte/async-compat", rev = "16dc8597ec09a6102d58d4e7b67714a35dd0ecb8" } +const_panic = { git = "https://github.com/jplatte/const_panic", rev = "9024a4cb3eac45c1d2d980f17aaee287b17be498" } + +[dependencies] +uniffi = { workspace = true } +matrix-sdk-ffi = { workspace = true } +matrix-sdk = { workspace = true } +thiserror = "1.0" + + +[build-dependencies] +uniffi = { workspace = true, features = ["build"] } + +[dev-dependencies] +uniffi = { workspace = true, features = ["bindgen-tests"] } diff --git a/fixtures/async-deadlock/src/lib.rs b/fixtures/async-deadlock/src/lib.rs new file mode 100644 index 00000000..c6281c15 --- /dev/null +++ b/fixtures/async-deadlock/src/lib.rs @@ -0,0 +1,16 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/ + */ + +use std::sync::Arc; + +use matrix_sdk_ffi::client_builder::ClientBuilder; + +#[uniffi::export] +pub fn get_matrix_client_builder() -> Arc { + ClientBuilder::new() +} + +uniffi::setup_scaffolding!(); diff --git a/fixtures/async-deadlock/tests/bindings/test_async_deadlock.py b/fixtures/async-deadlock/tests/bindings/test_async_deadlock.py new file mode 100644 index 00000000..91f9a135 --- /dev/null +++ b/fixtures/async-deadlock/tests/bindings/test_async_deadlock.py @@ -0,0 +1,292 @@ +from regression_async_deadlock import * +import unittest +from datetime import datetime +import asyncio +import typing +import futures + +def now(): + return datetime.now() + +class TestFutures(unittest.TestCase): + def test_always_ready(self): + async def test(): + self.assertEqual(await always_ready(), True) + + asyncio.run(test()) + + def test_void(self): + async def test(): + self.assertEqual(await void(), None) + + asyncio.run(test()) + + def test_sleep(self): + async def test(): + t0 = now() + await sleep(2000) + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 2) + + asyncio.run(test()) + + def test_sequential_futures(self): + async def test(): + t0 = now() + result_alice = await say_after(100, 'Alice') + result_bob = await say_after(200, 'Bob') + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.3) + self.assertEqual(result_alice, 'Hello, Alice!') + self.assertEqual(result_bob, 'Hello, Bob!') + + asyncio.run(test()) + + def test_concurrent_tasks(self): + async def test(): + alice = asyncio.create_task(say_after(100, 'Alice')) + bob = asyncio.create_task(say_after(200, 'Bob')) + + t0 = now() + result_alice = await alice + result_bob = await bob + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + self.assertEqual(result_alice, 'Hello, Alice!') + self.assertEqual(result_bob, 'Hello, Bob!') + + asyncio.run(test()) + + def test_async_methods(self): + async def test(): + megaphone = new_megaphone() + t0 = now() + result_alice = await megaphone.say_after(200, 'Alice') + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + self.assertEqual(result_alice, 'HELLO, ALICE!') + + asyncio.run(test()) + + def test_async_constructors(self): + # Check the default constructor has been disabled. + with self.assertRaises(ValueError) as e: + Megaphone() + self.assertTrue(str(e.exception).startswith("async constructors not supported")) + + async def test(): + megaphone = await Megaphone.secondary() + result_alice = await megaphone.say_after(0, 'Alice') + self.assertEqual(result_alice, 'HELLO, ALICE!') + + udl_megaphone = await UdlMegaphone.secondary() + result_udl = await udl_megaphone.say_after(0, 'udl') + self.assertEqual(result_udl, 'HELLO, UDL!') + + asyncio.run(test()) + + def test_async_trait_interface_methods(self): + async def test(): + traits = get_say_after_traits() + t0 = now() + result1 = await traits[0].say_after(100, 'Alice') + result2 = await traits[1].say_after(100, 'Bob') + t1 = now() + + self.assertEqual(result1, 'Hello, Alice!') + self.assertEqual(result2, 'Hello, Bob!') + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + + asyncio.run(test()) + + def test_udl_async_trait_interface_methods(self): + async def test(): + traits = get_say_after_udl_traits() + t0 = now() + result1 = await traits[0].say_after(100, 'Alice') + result2 = await traits[1].say_after(100, 'Bob') + t1 = now() + + self.assertEqual(result1, 'Hello, Alice!') + self.assertEqual(result2, 'Hello, Bob!') + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + + asyncio.run(test()) + + def test_foreign_async_trait_interface_methods(self): + class PyAsyncParser: + def __init__(self): + self.completed_delays = 0 + + async def as_string(self, delay_ms, value): + await asyncio.sleep(delay_ms / 1000.0) + return str(value) + + async def try_from_string(self, delay_ms, value): + await asyncio.sleep(delay_ms / 1000.0) + if value == "force-unexpected-exception": + raise RuntimeError("UnexpectedException") + try: + return int(value) + except: + raise ParserError.NotAnInt() + + async def delay(self, delay_ms): + await asyncio.sleep(delay_ms / 1000.0) + self.completed_delays += 1 + + async def try_delay(self, delay_ms): + try: + delay_ms = int(delay_ms) + except: + raise ParserError.NotAnInt() + await asyncio.sleep(delay_ms / 1000.0) + self.completed_delays += 1 + + async def test(): + trait_obj = PyAsyncParser() + self.assertEqual(await as_string_using_trait(trait_obj, 1, 42), "42") + self.assertEqual(await try_from_string_using_trait(trait_obj, 1, "42"), 42) + with self.assertRaises(ParserError.NotAnInt): + await try_from_string_using_trait(trait_obj, 1, "fourty-two") + with self.assertRaises(ParserError.UnexpectedError): + await try_from_string_using_trait(trait_obj, 1, "force-unexpected-exception") + await delay_using_trait(trait_obj, 1) + await try_delay_using_trait(trait_obj, "1") + with self.assertRaises(ParserError.NotAnInt): + await try_delay_using_trait(trait_obj, "one") + + completed_delays_before = trait_obj.completed_delays + await cancel_delay_using_trait(trait_obj, 10) + # sleep long enough so that the `delay()` call would finish if it wasn't cancelled. + await asyncio.sleep(0.1) + # If the task was cancelled, then completed_delays won't have increased + self.assertEqual(trait_obj.completed_delays, completed_delays_before) + + + asyncio.run(test()) + # check that all foreign future handles were released + self.assertEqual(len(futures._UNIFFI_FOREIGN_FUTURE_HANDLE_MAP), 0) + + def test_async_object_param(self): + async def test(): + megaphone = new_megaphone() + t0 = now() + result_alice = await say_after_with_megaphone(megaphone, 200, 'Alice') + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + self.assertEqual(result_alice, 'HELLO, ALICE!') + + asyncio.run(test()) + + def test_with_tokio_runtime(self): + async def test(): + t0 = now() + result_alice = await say_after_with_tokio(200, 'Alice') + t1 = now() + + t_delta = (t1 - t0).total_seconds() + self.assertGreater(t_delta, 0.2) + self.assertEqual(result_alice, 'Hello, Alice (with Tokio)!') + + asyncio.run(test()) + + def test_fallible(self): + async def test(): + result = await fallible_me(False) + self.assertEqual(result, 42) + + try: + result = await fallible_me(True) + self.assertTrue(False) # should never be reached + except MyError as exception: + self.assertTrue(True) + + megaphone = new_megaphone() + + result = await megaphone.fallible_me(False) + self.assertEqual(result, 42) + + try: + result = await megaphone.fallible_me(True) + self.assertTrue(False) # should never be reached + except MyError as exception: + self.assertTrue(True) + + asyncio.run(test()) + + def test_fallible_struct(self): + async def test(): + megaphone = await fallible_struct(False) + self.assertEqual(await megaphone.fallible_me(False), 42) + + try: + await fallible_struct(True) + self.assertTrue(False) # should never be reached + except MyError as exception: + pass + + asyncio.run(test()) + + def test_record(self): + async def test(): + result = await new_my_record("foo", 42) + self.assertEqual(result.__class__, MyRecord) + self.assertEqual(result.a, "foo") + self.assertEqual(result.b, 42) + + asyncio.run(test()) + + def test_cancel(self): + async def test(): + # Create a task + task = asyncio.create_task(say_after(200, 'Alice')) + # Wait to ensure that the polling has started, then cancel the task + await asyncio.sleep(0.1) + task.cancel() + # Wait long enough for the Rust callback to fire. This shouldn't cause an exception, + # even though the task is cancelled. + await asyncio.sleep(0.2) + # Awaiting the task should result in a CancelledError. + with self.assertRaises(asyncio.CancelledError): + await task + + asyncio.run(test()) + + # Test a future that uses a lock and that is cancelled. + def test_shared_resource_cancellation(self): + async def test(): + task = asyncio.create_task(use_shared_resource( + SharedResourceOptions(release_after_ms=5000, timeout_ms=100))) + # Wait some time to ensure the task has locked the shared resource + await asyncio.sleep(0.05) + task.cancel() + await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000)) + asyncio.run(test()) + + def test_shared_resource_no_cancellation(self): + async def test(): + await use_shared_resource(SharedResourceOptions(release_after_ms=100, timeout_ms=1000)) + await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000)) + asyncio.run(test()) + + def test_function_annotations(self): + async def test(): + self.assertEqual(typing.get_type_hints(sleep) , {"ms": int, "return": bool}) + self.assertEqual(typing.get_type_hints(sleep_no_return), {"ms": int, "return": type(None)}) + asyncio.run(test()) + +if __name__ == '__main__': + unittest.main() diff --git a/fixtures/async-deadlock/tests/bindings/test_async_deadlock.ts b/fixtures/async-deadlock/tests/bindings/test_async_deadlock.ts new file mode 100644 index 00000000..c6358dcb --- /dev/null +++ b/fixtures/async-deadlock/tests/bindings/test_async_deadlock.ts @@ -0,0 +1,126 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/ + */ + +import { asyncTest } from "@/asserts"; +import { console } from "@/hermes"; +import { + ClientBuilder, + ClientInterface, + mediaSourceFromUrl, + MediaSourceInterface, +} from "../../generated/matrix_sdk_ffi"; + +import { uniffiRustFutureHandleCount } from "uniffi-bindgen-react-native"; + +const url = "mxc://matrix.1badday.com/RUUIKNjovSSwYbULWuNQarDA"; + +(async () => { + await asyncTest("Test for deadlock 2", async (t) => { + await loadImages(2); + t.end(); + }); + + await asyncTest("Test for deadlock 4", async (t) => { + await loadImages(4); + t.end(); + }); + + await asyncTest("Test for deadlock 8", async (t) => { + await loadImages(8); + t.end(); + }); + + await asyncTest("Test for deadlock 16", async (t) => { + await loadImages(16); + t.end(); + }); + + await asyncTest("Test for deadlock 32", async (t) => { + await loadImages(32); + t.end(); + }); + + await asyncTest("Test for deadlock 64", async (t) => { + await loadImages(64); + t.end(); + }); + + await asyncTest("Test for deadlock 128", async (t) => { + await loadImages(128); + t.end(); + }); + + await asyncTest( + "Test for deadlock 256", + async (t) => { + await loadImages(256); + t.end(); + }, + 30 * 1000, + ); + + await asyncTest( + "Test for deadlock 512", + async (t) => { + await loadImages(512); + t.end(); + }, + 60 * 1000, + ); + + await asyncTest( + "Test for deadlock 512 2", + async (t) => { + await loadImages(512); + t.end(); + }, + 60 * 1000, + ); + + await asyncTest( + "Test for deadlock 1024", + async (t) => { + await loadImages(1024); + t.end(); + }, + 1024 * 1000, + ); +})(); + +async function loadImages(n: number): Promise { + const images = new Array(n).fill(url); + const sourcedImages = images.map((i) => ({ + source: mediaSourceFromUrl(i), + })); + const client = await new ClientBuilder() + .homeserverUrl("https://matrix.1badday.com/") + .build(); + console.log(`Starting to load… ${n}`); + let loaded = 0; + let start = Date.now(); + function elapsed(): number { + const current = Date.now(); + return Math.round((current - start) / 1000); + } + const interval = 1000; + let progress: any | undefined; + function show() { + console.log( + `-- ${elapsed()} sec: Loaded ${loaded}/${n}; currently waiting on ${uniffiRustFutureHandleCount()}`, + ); + progress = setTimeout(show, interval); + } + + const promises = sourcedImages.map((s, i) => { + return client.getMediaContent(s.source).then((content) => { + loaded++; + }); + }); + show(); + await Promise.allSettled(promises); + clearTimeout(progress!); + console.log(`… finished, after: ${elapsed()} sec`); +} diff --git a/fixtures/ext-types/Cargo.toml b/fixtures/ext-types/Cargo.toml index e009ed21..fc677016 100644 --- a/fixtures/ext-types/Cargo.toml +++ b/fixtures/ext-types/Cargo.toml @@ -21,7 +21,7 @@ name = "uniffi_ext_types_lib" [dependencies] anyhow = "1" -bytes = "1.3" +bytes = "^1.7.1" # Add the "scaffolding-ffi-buffer-fns" feature to make sure things can build correctly uniffi = { workspace = true } From 53149bc8546cc58c1c8db2364da90c338f960cee Mon Sep 17 00:00:00 2001 From: James Hugman Date: Tue, 10 Sep 2024 17:51:46 +0100 Subject: [PATCH 7/8] Add UniffiCallInvoker::invokeNonBlocking --- cpp/includes/UniffiCallInvoker.h | 10 ++++++++++ .../gen_cpp/templates/CallbackFunction.cpp | 4 ++++ crates/ubrn_bindgen/src/bindings/react_native/mod.rs | 4 ++++ 3 files changed, 18 insertions(+) diff --git a/cpp/includes/UniffiCallInvoker.h b/cpp/includes/UniffiCallInvoker.h index 90ddd424..520f462d 100644 --- a/cpp/includes/UniffiCallInvoker.h +++ b/cpp/includes/UniffiCallInvoker.h @@ -80,5 +80,15 @@ class UniffiCallInvoker { cv.wait(lock, [&done] { return done; }); } } + + /** + * Invokes the given function on the JS thread, by adding to + * the event queue. + */ + void invokeNonBlocking(jsi::Runtime &rt, UniffiCallFunc func) { + // react::CallFunc wrapper = [func](jsi::Runtime &rt) { + std::function wrapper = [func, &rt]() { func(rt); }; + callInvoker_->invokeAsync(std::move(wrapper)); + } }; } // namespace uniffi_runtime diff --git a/crates/ubrn_bindgen/src/bindings/react_native/gen_cpp/templates/CallbackFunction.cpp b/crates/ubrn_bindgen/src/bindings/react_native/gen_cpp/templates/CallbackFunction.cpp index 20aadbd2..385ad427 100644 --- a/crates/ubrn_bindgen/src/bindings/react_native/gen_cpp/templates/CallbackFunction.cpp +++ b/crates/ubrn_bindgen/src/bindings/react_native/gen_cpp/templates/CallbackFunction.cpp @@ -187,7 +187,11 @@ namespace {{ ns }} { }; // We'll then call that lambda from the callInvoker which will // look after calling it on the correct thread. + {% if callback.is_blocking() -%} callInvoker->invokeBlocking(rt, jsLambda); + {%- else %} + callInvoker->invokeNonBlocking(rt, jsLambda); + {%- endif %} }; return callback; } diff --git a/crates/ubrn_bindgen/src/bindings/react_native/mod.rs b/crates/ubrn_bindgen/src/bindings/react_native/mod.rs index cce42976..1f12d6e8 100644 --- a/crates/ubrn_bindgen/src/bindings/react_native/mod.rs +++ b/crates/ubrn_bindgen/src/bindings/react_native/mod.rs @@ -501,6 +501,10 @@ impl FfiCallbackFunction { .find(|a| a.is_return() && !a.type_().is_void()); arg.map(|a| a.type_()) } + + fn is_blocking(&self) -> bool { + self.name() != "RustFutureContinuationCallback" + } } fn is_future(nm: &str) -> bool { From 56ce2e9e1ee729f6fa757f676fe492284dcd9206 Mon Sep 17 00:00:00 2001 From: James Hugman Date: Tue, 10 Sep 2024 17:35:49 +0100 Subject: [PATCH 8/8] Changes to async-rust-call --- typescript/src/async-rust-call.ts | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/typescript/src/async-rust-call.ts b/typescript/src/async-rust-call.ts index 64dd61eb..f37db17b 100644 --- a/typescript/src/async-rust-call.ts +++ b/typescript/src/async-rust-call.ts @@ -34,18 +34,6 @@ type PollFunc = ( handle: UniffiHandle, ) => void; -// Calls setTimeout and then resolves the promise. -// This may be used as a simple yield. -export async function delayPromise(delayMs: number): Promise { - return new Promise((resolve) => setTimeout(resolve, delayMs)); -} - -// We'd most likely want this to be a microTask, but hermes doesn't support the -// Javascript API for them yet. -async function nextTickPromise(): Promise { - return await delayPromise(0); -} - /** * This method calls an asynchronous method on the Rust side. * @@ -99,11 +87,6 @@ export async function uniffiRustCallAsync( try { let pollResult: number | undefined; do { - // Now we have a future, we should prompt some work to happen in Rust. - // We need to make sure we don't poll from the stack frame as we the end of the poll, - // so we wait until the next tick before polling. - await nextTickPromise(); - // Calling pollFunc with a callback that resolves the promise that pollRust // returns: pollRust makes the promise, uniffiFutureContinuationCallback resolves it. pollResult = await pollRust((handle) => { @@ -111,10 +94,6 @@ export async function uniffiRustCallAsync( }); } while (pollResult !== UNIFFI_RUST_FUTURE_POLL_READY); - // Now we've finished polling, as a precaution, we wait until the next tick before - // picking up the results. - await nextTickPromise(); - // Now it's ready, all we need to do is pick up the result (and error). return liftFunc( makeRustCall( @@ -124,7 +103,7 @@ export async function uniffiRustCallAsync( ), ); } finally { - setTimeout(() => freeFunc(rustFuture), 0); + freeFunc(rustFuture); } } @@ -155,9 +134,8 @@ const uniffiFutureContinuationCallback: UniffiRustFutureContinuationCallback = ( // > called, but not inside the callback itself. If [rust_future_poll] is called inside the // > callback, some futures will deadlock and our scheduler code might as well. // - // This setImmediate is to ensure that `uniffiFutureContinuationCallback` returns - // before the next poll, i.e. so that the next poll is outside of this callback. - setTimeout(() => resolve(pollResult), 0); + // We avoid this by using UniffiCallInvoker::invokeNonBlocking for this callback. + resolve(pollResult); }; // For testing only.