From 3cddc61bd80c6f6092c77d78c7a30dae0e9c6529 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=83=A5=EB=83=90=EC=B1=A0?= Date: Thu, 28 Nov 2024 13:52:22 +0900 Subject: [PATCH] feat: support waiting for background tasks on the `per_worker` policy (#451) * chore: update deno manifest * chore: update `.gitignore` * feat: support waiting for background tasks on the `per_worker` policy * chore: add integration tests * chore: add an example * chore: add `global.d.ts` --- .gitignore | 3 +- crates/base/src/deno_runtime.rs | 20 +-- crates/base/src/rt_worker/supervisor/mod.rs | 2 + .../supervisor/strategy_per_worker.rs | 77 ++++++++---- crates/base/src/rt_worker/worker.rs | 12 +- crates/base/src/rt_worker/worker_ctx.rs | 2 + crates/base/test_cases/main/index.ts | 25 +++- .../mark-background-task-2/index.ts | 43 +++++++ .../test_cases/mark-background-task/index.ts | 43 +++++++ crates/base/tests/integration_tests.rs | 114 ++++++++++++++---- crates/sb_core/js/async_hook.js | 36 ++++++ crates/sb_core/js/bootstrap.js | 10 +- crates/sb_core/lib.rs | 48 ++++++++ deno.json | 3 + deno.lock | 41 ------- examples/mark-background-task/index.ts | 43 +++++++ types/global.d.ts | 1 + 17 files changed, 410 insertions(+), 113 deletions(-) create mode 100644 crates/base/test_cases/mark-background-task-2/index.ts create mode 100644 crates/base/test_cases/mark-background-task/index.ts create mode 100644 crates/sb_core/js/async_hook.js delete mode 100644 deno.lock create mode 100644 examples/mark-background-task/index.ts create mode 100644 types/global.d.ts diff --git a/.gitignore b/.gitignore index b7bd1eff0..a3bc80935 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ scripts/debug.sh node_modules/ .DS_Store -eszip.bin \ No newline at end of file +eszip.bin +deno.lock \ No newline at end of file diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 0b33680ee..18456551c 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -69,7 +69,7 @@ use sb_core::external_memory::CustomAllocator; use sb_core::net::sb_core_net; use sb_core::permissions::{sb_core_permissions, Permissions}; use sb_core::runtime::sb_core_runtime; -use sb_core::{sb_core_main_js, MemCheckWaker}; +use sb_core::{sb_core_main_js, MemCheckWaker, PromiseMetrics}; use sb_env::sb_env as sb_env_op; use sb_fs::deno_compile_fs::DenoCompileFileSystem; use sb_graph::emitter::EmitterFactory; @@ -254,6 +254,7 @@ pub struct DenoRuntime { main_module_id: ModuleId, maybe_inspector: Option, + promise_metrics: PromiseMetrics, mem_check: Arc, waker: Arc, @@ -322,6 +323,7 @@ where // TODO(Nyannyacha): Make sure `service_path` is an absolute path first. let drop_token = CancellationToken::default(); + let promise_metrics = PromiseMetrics::default(); let base_dir_path = std::env::current_dir().map(|p| p.join(&service_path))?; let Ok(mut main_module_url) = Url::from_directory_path(&base_dir_path) else { @@ -709,19 +711,12 @@ where { let main_context = js_runtime.main_context(); - let op_state = js_runtime.op_state(); let mut op_state = op_state.borrow_mut(); op_state.put(dispatch_fns); + op_state.put(promise_metrics.clone()); op_state.put(GlobalMainContext(main_context)); - } - - let version: Option<&str> = option_env!("GIT_V_TAG"); - - { - let op_state_rc = js_runtime.op_state(); - let mut op_state = op_state_rc.borrow_mut(); // NOTE(Andreespirela): We do this because "NODE_DEBUG" is trying to be read during // initialization, But we need the gotham state to be up-to-date. @@ -739,7 +734,7 @@ where // 2: isEventsWorker conf.is_events_worker(), // 3: edgeRuntimeVersion - version.unwrap_or("0.1.0"), + option_env!("GIT_V_TAG").unwrap_or("0.1.0"), // 4: denoVersion MAYBE_DENO_VERSION .get() @@ -884,6 +879,7 @@ where main_module_id, maybe_inspector, + promise_metrics, mem_check, waker: Arc::default(), @@ -1210,6 +1206,10 @@ where self.maybe_inspector.clone() } + pub fn promise_metrics(&self) -> PromiseMetrics { + self.promise_metrics.clone() + } + pub fn mem_check_state(&self) -> Arc> { self.mem_check.state.clone() } diff --git a/crates/base/src/rt_worker/supervisor/mod.rs b/crates/base/src/rt_worker/supervisor/mod.rs index ee3531e0c..277586454 100644 --- a/crates/base/src/rt_worker/supervisor/mod.rs +++ b/crates/base/src/rt_worker/supervisor/mod.rs @@ -8,6 +8,7 @@ use deno_core::v8; use enum_as_inner::EnumAsInner; use futures_util::task::AtomicWaker; use log::{error, warn}; +use sb_core::PromiseMetrics; use sb_workers::context::{Timing, UserWorkerMsgs, UserWorkerRuntimeOpts}; use tokio::sync::{ mpsc::{self, UnboundedReceiver}, @@ -129,6 +130,7 @@ pub struct Arguments { pub cpu_usage_metrics_rx: Option>, pub cpu_timer_param: CPUTimerParam, pub supervisor_policy: SupervisorPolicy, + pub promise_metrics: PromiseMetrics, pub timing: Option, pub memory_limit_rx: mpsc::UnboundedReceiver<()>, pub pool_msg_tx: Option>, diff --git a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs index 9fbc65c0d..a46add2bb 100644 --- a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs +++ b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs @@ -18,6 +18,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { let Arguments { key, runtime_opts, + promise_metrics, timing, mut memory_limit_rx, cpu_timer, @@ -55,11 +56,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { let is_wall_clock_limit_disabled = wall_clock_limit_ms == 0; let mut is_worker_entered = false; let mut is_wall_clock_beforeunload_armed = false; + let mut is_cpu_time_soft_limit_reached = false; + let mut is_termination_requested = false; + let mut have_all_reqs_been_acknowledged = false; let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap(); let mut cpu_usage_ms = 0i64; - let mut cpu_time_soft_limit_reached = false; let mut wall_clock_alerts = 0; let mut req_ack_count = 0usize; @@ -113,10 +116,10 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { tokio::pin!(wall_clock_duration_alert); tokio::pin!(wall_clock_beforeunload_alert); - loop { + let result = 'scope: loop { tokio::select! { _ = supervise.cancelled() => { - return (ShutdownReason::TerminationRequested, cpu_usage_ms); + break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms); } _ = async { @@ -124,9 +127,12 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { Some(token) => token.inbound.cancelled().await, None => pending().await, } - } => { - terminate_fn(); - return (ShutdownReason::TerminationRequested, cpu_usage_ms); + }, if !is_termination_requested => { + is_termination_requested = true; + if promise_metrics.have_all_promises_been_resolved() { + terminate_fn(); + break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms); + } } Some(metrics) = cpu_usage_metrics_rx.recv() => { @@ -160,17 +166,28 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { if cpu_usage_ms >= hard_limit_ms as i64 { terminate_fn(); error!("CPU time hard limit reached: isolate: {:?}", key); - return (ShutdownReason::CPUTime, cpu_usage_ms); - } else if cpu_usage_ms >= soft_limit_ms as i64 && !cpu_time_soft_limit_reached { + break 'scope (ShutdownReason::CPUTime, cpu_usage_ms); + } else if cpu_usage_ms >= soft_limit_ms as i64 && !is_cpu_time_soft_limit_reached { early_retire_fn(); error!("CPU time soft limit reached: isolate: {:?}", key); - cpu_time_soft_limit_reached = true; - if req_ack_count == demand.load(Ordering::Acquire) { + is_cpu_time_soft_limit_reached = true; + have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire); + + if have_all_reqs_been_acknowledged + && promise_metrics.have_all_promises_been_resolved() + { terminate_fn(); error!("early termination due to the last request being completed: isolate: {:?}", key); - return (ShutdownReason::EarlyDrop, cpu_usage_ms); + break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms); } + + } else if is_cpu_time_soft_limit_reached + && have_all_reqs_been_acknowledged + && promise_metrics.have_all_promises_been_resolved() + { + terminate_fn(); + break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms); } } } @@ -179,28 +196,33 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => { if is_worker_entered { - if !cpu_time_soft_limit_reached { + if !is_cpu_time_soft_limit_reached { early_retire_fn(); error!("CPU time soft limit reached: isolate: {:?}", key); - cpu_time_soft_limit_reached = true; - if req_ack_count == demand.load(Ordering::Acquire) { + is_cpu_time_soft_limit_reached = true; + have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire); + + if have_all_reqs_been_acknowledged + && promise_metrics.have_all_promises_been_resolved() + { terminate_fn(); error!("early termination due to the last request being completed: isolate: {:?}", key); - return (ShutdownReason::EarlyDrop, cpu_usage_ms); + break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms); } } else { terminate_fn(); error!("CPU time hard limit reached: isolate: {:?}", key); - return (ShutdownReason::CPUTime, cpu_usage_ms); + break 'scope (ShutdownReason::CPUTime, cpu_usage_ms); } } } Some(_) = req_end_rx.recv() => { req_ack_count += 1; + have_all_reqs_been_acknowledged = req_ack_count == demand.load(Ordering::Acquire); - if !cpu_time_soft_limit_reached { + if !is_cpu_time_soft_limit_reached { if let Some(tx) = pool_msg_tx.clone() { if tx.send(UserWorkerMsgs::Idle(key)).is_err() { error!("failed to send idle msg to pool: {:?}", key); @@ -208,13 +230,16 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { } } - if !cpu_time_soft_limit_reached || req_ack_count != demand.load(Ordering::Acquire) { + if !is_cpu_time_soft_limit_reached + || !have_all_reqs_been_acknowledged + || !promise_metrics.have_all_promises_been_resolved() + { continue; } terminate_fn(); error!("early termination due to the last request being completed: isolate: {:?}", key); - return (ShutdownReason::EarlyDrop, cpu_usage_ms); + break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms); } _ = wall_clock_duration_alert.tick(), if !is_wall_clock_limit_disabled => { @@ -229,10 +254,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire); terminate_fn(); - error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists); - - return (ShutdownReason::WallClockTime, cpu_usage_ms); + break 'scope (ShutdownReason::WallClockTime, cpu_usage_ms); } } @@ -252,8 +275,16 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { Some(_) = memory_limit_rx.recv() => { terminate_fn(); error!("memory limit reached for the worker: isolate: {:?}", key); - return (ShutdownReason::Memory, cpu_usage_ms); + break 'scope (ShutdownReason::Memory, cpu_usage_ms); } } + }; + + match result { + (ShutdownReason::EarlyDrop, cpu_usage_ms) if is_termination_requested => { + (ShutdownReason::TerminationRequested, cpu_usage_ms) + } + + result => result, } } diff --git a/crates/base/src/rt_worker/worker.rs b/crates/base/src/rt_worker/worker.rs index a1062423e..fad42ba66 100644 --- a/crates/base/src/rt_worker/worker.rs +++ b/crates/base/src/rt_worker/worker.rs @@ -242,12 +242,12 @@ impl Worker { }, )); - if !thread_safe_handle.request_interrupt( - supervisor::v8_handle_termination, - data_ptr_mut as *mut std::ffi::c_void, - ) { - drop(unsafe { Box::from_raw(data_ptr_mut) }); - } + if !thread_safe_handle.request_interrupt( + supervisor::v8_handle_termination, + data_ptr_mut as *mut std::ffi::c_void, + ) { + drop(unsafe { Box::from_raw(data_ptr_mut) }); + } while !is_terminated.is_raised() { waker.wake(); diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index 95e118040..5e5f3c395 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -352,6 +352,7 @@ pub fn create_supervisor( let _rt_guard = base_rt::SUPERVISOR_RT.enter(); let maybe_cpu_timer_inner = maybe_cpu_timer.clone(); let supervise_cancel_token_inner = supervise_cancel_token.clone(); + let promise_metrics = worker_runtime.promise_metrics(); tokio::spawn(async move { let (isolate_memory_usage_tx, isolate_memory_usage_rx) = @@ -364,6 +365,7 @@ pub fn create_supervisor( cpu_usage_metrics_rx, cpu_timer_param, supervisor_policy, + promise_metrics, timing, memory_limit_rx, pool_msg_tx, diff --git a/crates/base/test_cases/main/index.ts b/crates/base/test_cases/main/index.ts index 2873d9832..1c844f1e2 100644 --- a/crates/base/test_cases/main/index.ts +++ b/crates/base/test_cases/main/index.ts @@ -1,6 +1,20 @@ console.log('main function started'); -Deno.serve(async (req: Request) => { +function parseIntFromHeadersOrDefault(req: Request, key: string, val: number) { + const headerValue = req.headers.get(key); + if (!headerValue) { + return val; + } + + const parsedValue = parseInt(headerValue); + if (isNaN(parsedValue)) { + return val; + } + + return parsedValue; +} + +Deno.serve((req: Request) => { console.log(req.url); const url = new URL(req.url); const { pathname } = url; @@ -19,10 +33,11 @@ Deno.serve(async (req: Request) => { console.error(`serving the request with ${servicePath}`); const createWorker = async () => { - const memoryLimitMb = 150; - const workerTimeoutMs = 10 * 60 * 1000; - const cpuTimeSoftLimitMs = 10 * 60 * 1000; - const cpuTimeHardLimitMs = 10 * 60 * 1000; + const memoryLimitMb = parseIntFromHeadersOrDefault(req, "x-memory-limit-mb", 150); + const workerTimeoutMs = parseIntFromHeadersOrDefault(req, "x-worker-timeout-ms", 10 * 60 * 1000); + const cpuTimeSoftLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-soft-limit-ms", 10 * 60 * 1000); + const cpuTimeHardLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-hard-limit-ms", 10 * 60 * 1000); + console.log(cpuTimeSoftLimitMs); const noModuleCache = false; const importMapPath = null; const envVarsObj = Deno.env.toObject(); diff --git a/crates/base/test_cases/mark-background-task-2/index.ts b/crates/base/test_cases/mark-background-task-2/index.ts new file mode 100644 index 000000000..c8e73f382 --- /dev/null +++ b/crates/base/test_cases/mark-background-task-2/index.ts @@ -0,0 +1,43 @@ +function sleep(ms: number): Promise { + return new Promise(res => { + setTimeout(() => { + res("meow"); + }, ms) + }); +} + +function mySlowFunction(baseNumber: number) { + const now = Date.now(); + let result = 0; + for (let i = Math.pow(baseNumber, 7); i >= 0; i--) { + result += Math.atan(i) * Math.tan(i); + } + const duration = Date.now() - now; + return { result: result, duration: duration }; +} + +class MyBackgroundTaskEvent extends Event { + readonly taskPromise: Promise + + constructor(taskPromise: Promise) { + super('myBackgroundTask') + this.taskPromise = taskPromise + } +} + +globalThis.addEventListener('myBackgroundTask', async (event) => { + const str = await (event as MyBackgroundTaskEvent).taskPromise + console.log(str); +}); + + +export default { + fetch() { + // consumes lots of cpu time + mySlowFunction(10); + // however, this time we did not notify the runtime that it should wait for this promise. + // therefore, the above console.log(str) will not be output and the worker will terminate. + dispatchEvent(new MyBackgroundTaskEvent(sleep(5000))); + return new Response(); + } +} \ No newline at end of file diff --git a/crates/base/test_cases/mark-background-task/index.ts b/crates/base/test_cases/mark-background-task/index.ts new file mode 100644 index 000000000..5960e1d40 --- /dev/null +++ b/crates/base/test_cases/mark-background-task/index.ts @@ -0,0 +1,43 @@ +function sleep(ms: number): Promise { + return new Promise(res => { + setTimeout(() => { + res("meow"); + }, ms) + }); +} + +function mySlowFunction(baseNumber: number) { + const now = Date.now(); + let result = 0; + for (let i = Math.pow(baseNumber, 7); i >= 0; i--) { + result += Math.atan(i) * Math.tan(i); + } + const duration = Date.now() - now; + return { result: result, duration: duration }; +} + +class MyBackgroundTaskEvent extends Event { + readonly taskPromise: Promise + + constructor(taskPromise: Promise) { + super('myBackgroundTask') + this.taskPromise = taskPromise + } +} + +globalThis.addEventListener('myBackgroundTask', async (event) => { + const str = await (event as MyBackgroundTaskEvent).taskPromise + console.log(str); +}); + + +export default { + fetch() { + // consumes lots of cpu time + mySlowFunction(10); + // make a promise that waits for 5s, and at the same time, notify the runtime that it should + // wait for this promise. + dispatchEvent(new MyBackgroundTaskEvent(markAsBackgroundTask(sleep(5000)))); + return new Response(); + } +} \ No newline at end of file diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 09895a2aa..932b9bffb 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -3,7 +3,7 @@ use deno_config::JsxImportSourceConfig; use event_worker::events::{LogLevel, WorkerEvents}; -use http_v02 as http; +use http_v02::{self as http, HeaderValue}; use hyper_v014 as hyper; use reqwest_v011 as reqwest; use sb_graph::{emitter::EmitterFactory, generate_binary_eszip, EszipPayloadKind}; @@ -3105,8 +3105,6 @@ async fn test_runtime_beforeunload_event(kind: &'static str, pct: u8) { tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await; - let mut found_triggered = false; - while let Some(ev) = rx.recv().await { let WorkerEvents::Log(ev) = ev.event else { continue; @@ -3114,17 +3112,15 @@ async fn test_runtime_beforeunload_event(kind: &'static str, pct: u8) { if ev.level != LogLevel::Info { continue; } - - found_triggered = ev + if ev .msg - .contains(&format!("triggered {}", kind.replace('-', "_"))); - - if found_triggered { - break; + .contains(&format!("triggered {}", kind.replace('-', "_"))) + { + return; } } - assert!(found_triggered); + unreachable!("test failed"); } #[tokio::test] @@ -3147,6 +3143,7 @@ async fn test_runtime_event_beforeunload_mem() { // NOTE(Nyannyacha): We cannot enable this test unless we clarify the trigger point of the unload // event. +// // #[tokio::test] // #[serial] // async fn test_runtime_event_unload() { @@ -3156,7 +3153,7 @@ async fn test_runtime_event_beforeunload_mem() { // .with_worker_event_sender(Some(tx)) // .build() // .await; - +// // let resp = tb // .request(|b| { // b.uri("/unload") @@ -3166,14 +3163,12 @@ async fn test_runtime_event_beforeunload_mem() { // }) // .await // .unwrap(); - +// // assert_eq!(resp.status().as_u16(), StatusCode::OK); - +// // sleep(Duration::from_secs(8)).await; // tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await; - -// let mut found_triggered = false; - +// // while let Some(ev) = rx.recv().await { // let WorkerEvents::Log(ev) = ev.event else { // continue; @@ -3181,17 +3176,92 @@ async fn test_runtime_event_beforeunload_mem() { // if ev.level != LogLevel::Info { // continue; // } - -// found_triggered = ev.msg.contains("triggered unload"); - -// if found_triggered { +// if ev.msg.contains("triggered unload") { // break; // } // } - -// assert!(found_triggered); +// +// unreachable!("test failed"); // } +#[tokio::test] +#[serial] +async fn test_should_wait_for_background_tests() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let tb = TestBedBuilder::new("./test_cases/main") + // only the `per_worker` policy allows waiting for background tasks. + .with_per_worker_policy(None) + .with_worker_event_sender(Some(tx)) + .build() + .await; + + let resp = tb + .request(|b| { + b.uri("/mark-background-task") + .header("x-cpu-time-soft-limit-ms", HeaderValue::from_static("100")) + .body(Body::empty()) + .context("can't make request") + }) + .await + .unwrap(); + + assert_eq!(resp.status().as_u16(), StatusCode::OK); + + tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await; + + while let Some(ev) = rx.recv().await { + let WorkerEvents::Log(ev) = ev.event else { + continue; + }; + if ev.level != LogLevel::Info { + continue; + } + if ev.msg.contains("meow") { + return; + } + } + + unreachable!("test failed"); +} + +#[tokio::test] +#[serial] +async fn test_should_not_wait_for_background_tests() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let tb = TestBedBuilder::new("./test_cases/main") + // only the `per_worker` policy allows waiting for background tasks. + .with_per_worker_policy(None) + .with_worker_event_sender(Some(tx)) + .build() + .await; + + let resp = tb + .request(|b| { + b.uri("/mark-background-task-2") + .header("x-cpu-time-soft-limit-ms", HeaderValue::from_static("100")) + .body(Body::empty()) + .context("can't make request") + }) + .await + .unwrap(); + + assert_eq!(resp.status().as_u16(), StatusCode::OK); + + tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await; + + while let Some(ev) = rx.recv().await { + let WorkerEvents::Log(ev) = ev.event else { + continue; + }; + if ev.level != LogLevel::Info { + continue; + } + if ev.msg.contains("meow") { + unreachable!("test failed"); + } + } +} + #[derive(Deserialize)] struct ErrorResponsePayload { msg: String, diff --git a/crates/sb_core/js/async_hook.js b/crates/sb_core/js/async_hook.js new file mode 100644 index 000000000..e4c41e91e --- /dev/null +++ b/crates/sb_core/js/async_hook.js @@ -0,0 +1,36 @@ +import { core, primordials } from 'ext:core/mod.js'; + +const ops = core.ops; +const { + Promise +} = primordials; + +let COUNTER = 0; +const PROMISES = new Map(); + +function markAsBackgroundTask(maybePromise) { + if (maybePromise instanceof Promise) { + ops.op_tap_promise_metrics("init"); + PROMISES.set(maybePromise, ++COUNTER); + } + + return maybePromise; +} + +function installPromiseHook() { + core.setPromiseHooks( + null, + null, + null, + promise => { + if (PROMISES.delete(promise)) { + ops.op_tap_promise_metrics("resolve"); + } + }, + ); +} + +export { + markAsBackgroundTask, + installPromiseHook +} \ No newline at end of file diff --git a/crates/sb_core/js/bootstrap.js b/crates/sb_core/js/bootstrap.js index 831d66a71..228a477bf 100644 --- a/crates/sb_core/js/bootstrap.js +++ b/crates/sb_core/js/bootstrap.js @@ -24,6 +24,7 @@ import * as globalInterfaces from 'ext:deno_web/04_global_interfaces.js'; import { SUPABASE_ENV } from 'ext:sb_env/env.js'; import { USER_WORKER_API as ai } from 'ext:sb_ai/js/ai.js'; import 'ext:sb_ai/js/onnxruntime/cache_adapter.js'; +import { markAsBackgroundTask, installPromiseHook } from 'ext:sb_core_main_js/js/async_hook.js'; import { registerErrors } from 'ext:sb_core_main_js/js/errors.js'; import { formatException, @@ -356,13 +357,12 @@ function warnOnDeprecatedApi(apiName, stack, ...suggestions) { ObjectAssign(internals, { warnOnDeprecatedApi }); function runtimeStart(target) { - /* core.setMacrotaskCallback(timers.handleTimerMacrotask); - core.setMacrotaskCallback(promiseRejectMacrotaskCallback);*/ + // core.setMacrotaskCallback(timers.handleTimerMacrotask); + // core.setMacrotaskCallback(promiseRejectMacrotaskCallback); core.setWasmStreamingCallback(fetch.handleWasmStreaming); - ops.op_set_format_exception_callback(formatException); - core.setBuildInfo(target); + installPromiseHook(); // deno-lint-ignore prefer-primordials Error.prepareStackTrace = core.prepareStackTrace; @@ -594,8 +594,8 @@ globalThis.bootstrapSBEdge = (opts, extraCtx) => { if (isUserWorker) { delete globalThis.EdgeRuntime; - // override console ObjectDefineProperties(globalThis, { + markAsBackgroundTask: nonEnumerable(markAsBackgroundTask), console: nonEnumerable( new console.Console((msg, level) => { return ops.op_user_worker_log(msg, level > 1); diff --git a/crates/sb_core/lib.rs b/crates/sb_core/lib.rs index e050ceb39..d3dcb8bba 100644 --- a/crates/sb_core/lib.rs +++ b/crates/sb_core/lib.rs @@ -14,6 +14,7 @@ use futures::FutureExt; use log::error; use serde::Serialize; use tokio::sync::oneshot; +use tracing::{debug, debug_span}; mod upgrade; @@ -351,6 +352,51 @@ fn op_raise_segfault(_state: &mut OpState) { } } +#[derive(Debug, Default, Clone)] +pub struct PromiseMetrics { + init: Arc, + resolve: Arc, +} + +impl PromiseMetrics { + pub fn get_init_count(&self) -> usize { + self.init.load(Ordering::Acquire) + } + + pub fn get_resolve_count(&self) -> usize { + self.resolve.load(Ordering::Acquire) + } + + pub fn have_all_promises_been_resolved(&self) -> bool { + self.get_init_count() == self.get_resolve_count() + } +} + +#[op2(fast)] +fn op_tap_promise_metrics(state: &mut OpState, #[string] kind: &str) { + let _span = debug_span!("op_tap_promise_metrics", kind).entered(); + let metrics = if state.has::() { + state.borrow_mut::() + } else { + state.put(PromiseMetrics::default()); + state.borrow_mut() + }; + + match kind { + "init" => { + metrics.init.fetch_add(1, Ordering::Release); + } + + "resolve" => { + metrics.resolve.fetch_add(1, Ordering::Release); + } + + _ => {} + } + + debug!(?metrics); +} + #[op2] #[serde] pub fn op_bootstrap_unstable_args(_state: &mut OpState) -> Vec { @@ -371,9 +417,11 @@ deno_core::extension!( op_set_raw, op_bootstrap_unstable_args, op_raise_segfault, + op_tap_promise_metrics, ], esm_entry_point = "ext:sb_core_main_js/js/bootstrap.js", esm = [ + "js/async_hook.js", "js/permissions.js", "js/errors.js", "js/fieldUtils.js", diff --git a/deno.json b/deno.json index f830c46bc..04dc868d9 100644 --- a/deno.json +++ b/deno.json @@ -6,5 +6,8 @@ "singleQuote": true, "proseWrap": "preserve", "include": ["examples/**/*.ts"] + }, + "imports": { + "npm:@meowmeow/foobar": "npm:is-odd" } } diff --git a/deno.lock b/deno.lock deleted file mode 100644 index e6377b07c..000000000 --- a/deno.lock +++ /dev/null @@ -1,41 +0,0 @@ -{ - "version": "3", - "packages": { - "specifiers": { - "npm:is-even": "npm:is-even@1.0.0" - }, - "npm": { - "is-buffer@1.1.6": { - "integrity": "sha512-NcdALwpXkTm5Zvvbk7owOUSvVvBKDgKP5/ewfXEznmQFfs4ZRmanOeKBTjRVjka3QFoN6XJ+9F3USqfHqTaU5w==", - "dependencies": {} - }, - "is-even@1.0.0": { - "integrity": "sha512-LEhnkAdJqic4Dbqn58A0y52IXoHWlsueqQkKfMfdEnIYG8A1sm/GHidKkS6yvXlMoRrkM34csHnXQtOqcb+Jzg==", - "dependencies": { - "is-odd": "is-odd@0.1.2" - } - }, - "is-number@3.0.0": { - "integrity": "sha512-4cboCqIpliH+mAvFNegjZQ4kgKc3ZUhQVr3HvWbSh5q3WH2v82ct+T2Y1hdU5Gdtorx/cLifQjqCbL7bpznLTg==", - "dependencies": { - "kind-of": "kind-of@3.2.2" - } - }, - "is-odd@0.1.2": { - "integrity": "sha512-Ri7C2K7o5IrUU9UEI8losXJCCD/UtsaIrkR5sxIcFg4xQ9cRJXlWA5DQvTE0yDc0krvSNLsRGXN11UPS6KyfBw==", - "dependencies": { - "is-number": "is-number@3.0.0" - } - }, - "kind-of@3.2.2": { - "integrity": "sha512-NOW9QQXMoZGg/oqnVNoNTTIFEIid1627WCffUBJEdMxYApq7mNE7CpzucIPc+ZQg25Phej7IJSmX3hO+oblOtQ==", - "dependencies": { - "is-buffer": "is-buffer@1.1.6" - } - } - } - }, - "remote": { - "https://deno.land/x/is_even@v1.0/mod.ts": "f0596cc34079796565f5fb7ebc0496c1dfe435f1039a7751e1922c89e6f85d6e" - } -} diff --git a/examples/mark-background-task/index.ts b/examples/mark-background-task/index.ts new file mode 100644 index 000000000..5960e1d40 --- /dev/null +++ b/examples/mark-background-task/index.ts @@ -0,0 +1,43 @@ +function sleep(ms: number): Promise { + return new Promise(res => { + setTimeout(() => { + res("meow"); + }, ms) + }); +} + +function mySlowFunction(baseNumber: number) { + const now = Date.now(); + let result = 0; + for (let i = Math.pow(baseNumber, 7); i >= 0; i--) { + result += Math.atan(i) * Math.tan(i); + } + const duration = Date.now() - now; + return { result: result, duration: duration }; +} + +class MyBackgroundTaskEvent extends Event { + readonly taskPromise: Promise + + constructor(taskPromise: Promise) { + super('myBackgroundTask') + this.taskPromise = taskPromise + } +} + +globalThis.addEventListener('myBackgroundTask', async (event) => { + const str = await (event as MyBackgroundTaskEvent).taskPromise + console.log(str); +}); + + +export default { + fetch() { + // consumes lots of cpu time + mySlowFunction(10); + // make a promise that waits for 5s, and at the same time, notify the runtime that it should + // wait for this promise. + dispatchEvent(new MyBackgroundTaskEvent(markAsBackgroundTask(sleep(5000)))); + return new Response(); + } +} \ No newline at end of file diff --git a/types/global.d.ts b/types/global.d.ts new file mode 100644 index 000000000..fdf7d2a1a --- /dev/null +++ b/types/global.d.ts @@ -0,0 +1 @@ +declare function markAsBackgroundTask(promise: Promise): Promise; \ No newline at end of file