Skip to content

Commit

Permalink
fix: include more detailed memory metric in mem check record (#364)
Browse files Browse the repository at this point in the history
* chore: add `base_mem_check` crate

* chore: update `Cargo.lock`

* fix: include more detailed memory metric in mem check record

* chore: update event manager example

* stamp: simplify
  • Loading branch information
nyannyacha authored Jun 14, 2024
1 parent d7addff commit ac6a0a4
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 139 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"./crates/base",
"./crates/base_mem_check",
"./crates/cli",
"./crates/sb_workers",
"./crates/sb_env",
Expand Down
1 change: 1 addition & 0 deletions crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
base_mem_check = { version = "0.1.0", path = "../base_mem_check" }
http_utils = { version = "0.1.0", path = "../http_utils" }
async-trait.workspace = true
thiserror.workspace = true
Expand Down
91 changes: 46 additions & 45 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::rt_worker::worker::DuplexStreamEntry;
use crate::utils::units::{bytes_to_display, mib_to_bytes};

use anyhow::{anyhow, bail, Context, Error};
use base_mem_check::{MemCheckState, WorkerHeapStatistics};
use cooked_waker::{IntoWaker, WakeRef};
use cpu_timer::get_thread_time;
use ctor::ctor;
Expand Down Expand Up @@ -33,8 +34,7 @@ use std::collections::HashMap;
use std::ffi::c_void;
use std::fmt;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::Poll;
use std::time::Duration;
use tokio::sync::{mpsc, Notify};
Expand Down Expand Up @@ -118,25 +118,22 @@ fn get_error_class_name(e: &AnyError) -> &'static str {
sb_core::errors_rt::get_error_class_name(e).unwrap_or("Error")
}

#[derive(Default, Clone)]
struct MemCheckState {
#[derive(Default)]
struct MemCheck {
drop_token: CancellationToken,
limit: Option<usize>,
waker: Arc<AtomicWaker>,
notify: Arc<Notify>,
current_bytes: Arc<AtomicUsize>,

#[cfg(debug_assertions)]
exceeded: Arc<AtomicFlag>,
state: Arc<RwLock<MemCheckState>>,
}

impl Drop for MemCheckState {
impl Drop for MemCheck {
fn drop(&mut self) {
self.drop_token.cancel();
}
}

impl MemCheckState {
impl MemCheck {
fn check(&self, isolate: &mut Isolate) -> usize {
let Some(limit) = self.limit else {
return 0;
Expand All @@ -158,25 +155,22 @@ impl MemCheckState {
.saturating_add(used_heap_bytes)
.saturating_add(external_bytes);

self.current_bytes.store(total_bytes, Ordering::Release);
let heap_stats = WorkerHeapStatistics::from(&stats);
let mut state = self.state.write().unwrap();

if total_bytes >= limit {
self.notify.notify_waiters();
state.current = heap_stats;

#[cfg(debug_assertions)]
if !self.exceeded.is_raised() {
self.exceeded.raise();
if total_bytes >= limit {
if !state.exceeded {
state.exceeded = true;
}

drop(state);
self.notify.notify_waiters();
}

total_bytes
}

#[allow(dead_code)]
#[cfg(debug_assertions)]
fn is_exceeded(&self) -> bool {
self.exceeded.is_raised()
}
}

pub trait GetRuntimeContext {
Expand All @@ -201,7 +195,7 @@ pub struct DenoRuntime<RuntimeContext = ()> {
main_module_id: ModuleId,
maybe_inspector: Option<Inspector>,

mem_check_state: Arc<MemCheckState>,
mem_check: Arc<MemCheck>,
waker: Arc<AtomicWaker>,

_phantom_runtime_context: PhantomData<RuntimeContext>,
Expand All @@ -212,7 +206,7 @@ impl<RuntimeContext> Drop for DenoRuntime<RuntimeContext> {
if self.conf.is_user_worker() {
self.js_runtime.v8_isolate().remove_gc_prologue_callback(
mem_check_gc_prologue_callback_fn,
Arc::as_ptr(&self.mem_check_state) as *mut _,
Arc::as_ptr(&self.mem_check) as *mut _,
);
}
}
Expand Down Expand Up @@ -458,25 +452,25 @@ where
];

let mut create_params = None;
let mut mem_check_state = MemCheckState::default();
let mut mem_check = MemCheck::default();

if conf.is_user_worker() {
let memory_limit =
mib_to_bytes(conf.as_user_worker().unwrap().memory_limit_mb) as usize;

let allocator = CustomAllocator::new(memory_limit);

allocator.set_waker(mem_check_state.waker.clone());
allocator.set_waker(mem_check.waker.clone());

mem_check_state.limit = Some(memory_limit);
mem_check.limit = Some(memory_limit);
create_params = Some(
deno_core::v8::CreateParams::default()
.heap_limits(mib_to_bytes(0) as usize, memory_limit)
.array_buffer_allocator(allocator.into_v8_allocator()),
)
};

let mem_check_state = Arc::new(mem_check_state);
let mem_check = Arc::new(mem_check);
let runtime_options = RuntimeOptions {
extensions,
is_main: true,
Expand Down Expand Up @@ -543,14 +537,14 @@ where
if is_user_worker {
js_runtime.v8_isolate().add_gc_prologue_callback(
mem_check_gc_prologue_callback_fn,
Arc::as_ptr(&mem_check_state) as *mut _,
Arc::as_ptr(&mem_check) as *mut _,
GCType::ALL,
);

js_runtime
.op_state()
.borrow_mut()
.put(MemCheckWaker::from(mem_check_state.waker.clone()));
.put(MemCheckWaker::from(mem_check.waker.clone()));
}

js_runtime
Expand Down Expand Up @@ -607,8 +601,8 @@ where

if is_user_worker {
drop(rt::SUPERVISOR_RT.spawn({
let drop_token = mem_check_state.drop_token.clone();
let waker = mem_check_state.waker.clone();
let drop_token = mem_check.drop_token.clone();
let waker = mem_check.waker.clone();

async move {
// TODO(Nyannyacha): Should we introduce exponential
Expand Down Expand Up @@ -641,7 +635,7 @@ where
main_module_id,
maybe_inspector,

mem_check_state,
mem_check,
waker: Arc::default(),

_phantom_runtime_context: PhantomData,
Expand Down Expand Up @@ -735,7 +729,7 @@ where
let is_termination_requested = self.is_termination_requested.clone();
let is_user_worker = self.conf.is_user_worker();
let global_waker = self.waker.clone();
let mem_check_state = is_user_worker.then(|| self.mem_check_state.clone());
let mem_check = is_user_worker.then(|| self.mem_check.clone());

let poll_result = poll_fn(|cx| unsafe {
// INVARIANT: Only can steal current task by other threads when LIFO
Expand Down Expand Up @@ -809,7 +803,7 @@ where
}));

if is_user_worker {
let mem_state = mem_check_state.as_ref().unwrap();
let mem_state = mem_check.as_ref().unwrap();
let total_malloced_bytes = mem_state.check(js_runtime.v8_isolate().as_mut());

mem_state.waker.register(waker);
Expand Down Expand Up @@ -859,24 +853,31 @@ where
self.maybe_inspector.clone()
}

pub fn mem_check_captured_bytes(&self) -> Arc<AtomicUsize> {
self.mem_check_state.current_bytes.clone()
pub fn mem_check_state(&self) -> Arc<RwLock<MemCheckState>> {
self.mem_check.state.clone()
}

pub fn add_memory_limit_callback<C>(&self, mut cb: C)
where
// XXX(Nyannyacha): Should we relax bounds a bit more?
C: FnMut(usize) -> bool + Send + 'static,
C: FnMut(MemCheckState) -> bool + Send + 'static,
{
let notify = self.mem_check_state.notify.clone();
let drop_token = self.mem_check_state.drop_token.clone();
let current_bytes = self.mem_check_state.current_bytes.clone();
let notify = self.mem_check.notify.clone();
let drop_token = self.mem_check.drop_token.clone();
let state = self.mem_check_state();

drop(rt::SUPERVISOR_RT.spawn(async move {
loop {
tokio::select! {
_ = notify.notified() => {
if cb(current_bytes.load(Ordering::Acquire)) {
let state = tokio::task::spawn_blocking({
let state = state.clone();
move || {
*state.read().unwrap()
}
}).await.unwrap();

if cb(state) {
break;
}
}
Expand Down Expand Up @@ -931,7 +932,7 @@ extern "C" fn mem_check_gc_prologue_callback_fn(
data: *mut c_void,
) {
unsafe {
(*(data as *mut MemCheckState)).check(&mut *isolate);
(*(data as *mut MemCheck)).check(&mut *isolate);
}
}

Expand Down Expand Up @@ -1607,7 +1608,7 @@ mod test {
assert!(result.is_ok(), "expected no errors");

// however, mem checker must be raised because it aggregates heap usage
assert!(user_rt.mem_check_state.is_exceeded());
assert!(user_rt.mem_check.state.read().unwrap().exceeded);
}

#[tokio::test]
Expand Down Expand Up @@ -1661,7 +1662,7 @@ mod test {

callback_rx.recv().await.unwrap();

assert!(user_rt.mem_check_state.is_exceeded());
assert!(user_rt.mem_check.state.read().unwrap().exceeded);
};

if timeout(Duration::from_secs(10), wait_fut).await.is_err() {
Expand Down
3 changes: 2 additions & 1 deletion crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::rt_worker::utils::{get_event_metadata, parse_worker_conf};
use crate::rt_worker::worker_ctx::create_supervisor;
use crate::utils::send_event_if_event_worker_available;
use anyhow::{anyhow, Error};
use base_mem_check::MemCheckState;
use event_worker::events::{
EventLoopCompletedEvent, EventMetadata, ShutdownEvent, ShutdownReason, UncaughtExceptionEvent,
WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed,
Expand Down Expand Up @@ -224,7 +225,7 @@ impl Worker {
total: 0,
heap: 0,
external: 0,
mem_check_captured: 0,
mem_check_captured: MemCheckState::default(),
},
},
));
Expand Down
Loading

0 comments on commit ac6a0a4

Please sign in to comment.