From 64ec66ff57a4bb8228ea8620b55d86f219034267 Mon Sep 17 00:00:00 2001 From: youxingzhi Date: Sat, 11 May 2024 11:52:52 +0800 Subject: [PATCH 1/6] blog-13: fix bugs --- examples/hello-world/src/App.tsx | 25 +++++--------------- packages/react-reconciler/src/child_fiber.rs | 2 +- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/examples/hello-world/src/App.tsx b/examples/hello-world/src/App.tsx index 84830df..f70ebe1 100644 --- a/examples/hello-world/src/App.tsx +++ b/examples/hello-world/src/App.tsx @@ -3,30 +3,17 @@ import {useState} from 'react' function App() { const [num, updateNum] = useState(0); - const isOdd = num % 2 === 1; - - const before = [ -
  • 1
  • , -
  • 2
  • , -
  • 3
  • , -
  • 4
  • - ]; - const after = [ - //
  • 4
  • , -
  • 2
  • , -
  • 3
  • , -
  • 1
  • - ]; - - const listToUse = isOdd ? after : before; - console.log(num, listToUse) return ( ); } diff --git a/packages/react-reconciler/src/child_fiber.rs b/packages/react-reconciler/src/child_fiber.rs index 4c28126..740f4e0 100644 --- a/packages/react-reconciler/src/child_fiber.rs +++ b/packages/react-reconciler/src/child_fiber.rs @@ -215,7 +215,7 @@ fn update_from_map( } } let before = existing_children.get(&Key(key_to_use.clone())).clone(); - if type_of(element, "string") { + if type_of(element, "string") || type_of(element, "number") { let props = create_props_with_content(element.clone()); if before.is_some() { let before = (*before.clone().unwrap()).clone(); From e66852a8d3d68fb743b0199e0434c7d082fc53c3 Mon Sep 17 00:00:00 2001 From: youxingzhi Date: Sat, 11 May 2024 16:49:40 +0800 Subject: [PATCH 2/6] update readme --- readme.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/readme.md b/readme.md index c6ff4d1..34860a4 100644 --- a/readme.md +++ b/readme.md @@ -29,3 +29,7 @@ [从零实现 React v18,但 WASM 版 - [10] 实现单节点更新流程](https://www.paradeto.com/2024/04/26/big-react-wasm-10/) [从零实现 React v18,但 WASM 版 - [11] 实现事件系统](https://www.paradeto.com/2024/04/30/big-react-wasm-11/) + +[从零实现 React v18,但 WASM 版 - [12] 实现多节点更新流程](https://www.paradeto.com/2024/05/07/big-react-wasm-12/) + +[从零实现 React v18,但 WASM 版 - [13] 引入 Lane 模型,实现 Batch Update](https://www.paradeto.com/2024/05/11/big-react-wasm-13/) From 881fa88a7e9b7b1db3fd7a2b49d9e0359496848e Mon Sep 17 00:00:00 2001 From: youxingzhi Date: Wed, 15 May 2024 11:22:57 +0800 Subject: [PATCH 3/6] blog-14: add min heap --- Cargo.lock | 8 ++ Cargo.toml | 1 + packages/react-reconciler/tests/web.rs | 13 --- packages/scheduler/.gitignore | 1 + packages/scheduler/Cargo.toml | 11 ++ packages/scheduler/src/heap.rs | 135 +++++++++++++++++++++++++ packages/scheduler/src/lib.rs | 103 +++++++++++++++++++ packages/scheduler/tests/web.rs | 30 ++++++ 8 files changed, 289 insertions(+), 13 deletions(-) delete mode 100644 packages/react-reconciler/tests/web.rs create mode 100644 packages/scheduler/.gitignore create mode 100644 packages/scheduler/Cargo.toml create mode 100644 packages/scheduler/src/heap.rs create mode 100644 packages/scheduler/src/lib.rs create mode 100644 packages/scheduler/tests/web.rs diff --git a/Cargo.lock b/Cargo.lock index 5c73bd3..8089cb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -538,6 +538,14 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +[[package]] +name = "scheduler" +version = "0.1.0" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "scoped-tls" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 3270eac..9253f16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ members = [ "packages/react", "packages/react-dom", "packages/react-reconciler", + "packages/scheduler", "packages/shared" ] diff --git a/packages/react-reconciler/tests/web.rs b/packages/react-reconciler/tests/web.rs deleted file mode 100644 index de5c1da..0000000 --- a/packages/react-reconciler/tests/web.rs +++ /dev/null @@ -1,13 +0,0 @@ -//! Test suite for the Web and headless browsers. - -#![cfg(target_arch = "wasm32")] - -extern crate wasm_bindgen_test; -use wasm_bindgen_test::*; - -wasm_bindgen_test_configure!(run_in_browser); - -#[wasm_bindgen_test] -fn pass() { - assert_eq!(1 + 1, 2); -} diff --git a/packages/scheduler/.gitignore b/packages/scheduler/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/packages/scheduler/.gitignore @@ -0,0 +1 @@ +/target diff --git a/packages/scheduler/Cargo.toml b/packages/scheduler/Cargo.toml new file mode 100644 index 0000000..50fd5c6 --- /dev/null +++ b/packages/scheduler/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "scheduler" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +wasm-bindgen = "0.2.84" +web-sys = "0.3.69" + diff --git a/packages/scheduler/src/heap.rs b/packages/scheduler/src/heap.rs new file mode 100644 index 0000000..39c0990 --- /dev/null +++ b/packages/scheduler/src/heap.rs @@ -0,0 +1,135 @@ +use std::any::Any; + +pub trait Comparable { + // self lower than Self, return true + fn compare(&self, b: &dyn Comparable) -> bool; + fn as_any(&self) -> &dyn Any; +} + +fn push(mut heap: &mut Vec>, node: Box) { + heap.push(node); + sift_up(heap, heap.len() - 1); +} + +fn peek(heap: &Vec>) -> Option<&Box> { + if heap.is_empty() { + return None; + } + return Some(&heap[0]); +} + +fn pop(mut heap: &mut Vec>) -> Option> { + if heap.is_empty() { + None + } else { + let min = heap.swap_remove(0); + if !heap.is_empty() { + bubble_down(heap, 0); + } + Some(min) + } +} + +fn bubble_down(mut heap: &mut Vec>, index: usize) { + let mut parent = index; + + loop { + let mut child = 2 * parent + 1; + if child >= heap.len() { + break; + } + if child + 1 < heap.len() && heap[child + 1].compare(&*heap[child]) { + child += 1; + } + if heap[parent].compare(&*heap[child]) { + break; + } + heap.swap(parent, child); + parent = child; + } +} + +fn sift_up(mut heap: &mut Vec>, i: usize) { + let mut child = i; + if child <= 0 { + return; + } + let mut parent = (child - 1) / 2; + + while child > 0 && !&heap[parent].compare(&*heap[child]) { + heap.swap(parent, child); + child = parent; + parent = ((child as isize - 1) / 2) as usize; + } +} + +#[cfg(test)] +mod tests { + use std::any::Any; + + use crate::heap::{Comparable, pop, push}; + + struct Task { + id: u32, + sort_index: f64, + } + + impl Task { + fn new(id: u32, sort_index: f64) -> Self { + Self { + id, + sort_index, + } + } + } + + // 实现 Task 的 Debug trait,以便在测试失败时能够打印 Task 的值。 + impl std::fmt::Debug for Task { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "Task {{ id: {}, sort_index: {} }}", self.id, self.sort_index) + } + } + + // 实现 Task 的 PartialEq trait,以便能够在断言中比较 Task 的值。 + impl PartialEq for Task { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } + } + + impl Comparable for Task { + fn compare(&self, other: &dyn Comparable) -> bool { + let other = other.as_any().downcast_ref::().unwrap(); + let diff = self.sort_index - other.sort_index; + if diff != 0.0 { + return diff < 0.0; + } + (self.id as i32 - other.id as i32) < 0 + } + + fn as_any(&self) -> &dyn Any { + self + } + } + + #[test] + fn test_min_heap() { + let mut heap = &mut vec![]; + + + // 添加任务到堆中 + push(heap, Box::new(Task::new(1, 2.0))); + push(heap, Box::new(Task::new(2, 1.0))); + push(heap, Box::new(Task::new(3, 3.0))); + + // 按预期顺序弹出任务 + assert_eq!(pop(heap).unwrap().as_any().downcast_ref::().unwrap().id, 2); + assert_eq!(pop(heap).unwrap().as_any().downcast_ref::().unwrap().id, 1); + assert_eq!(pop(heap).unwrap().as_any().downcast_ref::().unwrap().id, 3); + + // 堆应该为空 + assert!(heap.pop().is_none()); + } +} + + diff --git a/packages/scheduler/src/lib.rs b/packages/scheduler/src/lib.rs new file mode 100644 index 0000000..0b5daa6 --- /dev/null +++ b/packages/scheduler/src/lib.rs @@ -0,0 +1,103 @@ +use std::cmp::PartialEq; + +use wasm_bindgen::prelude::*; +use web_sys::js_sys::Function; + +use crate::heap::Comparable; + +mod heap; + +static FRAME_YIELD_MS: u32 = 5; +static mut TASK_ID_COUNTER: u32 = 1; +static mut TASK_QUEUE: Vec = vec![]; + +#[derive(Clone)] +enum Priority { + Normal = 3 +} + +#[wasm_bindgen] +extern "C" { + type Performance; + + #[wasm_bindgen(static_method_of = Performance, catch, js_namespace = performance, js_name = now)] + fn now() -> Result; + + #[wasm_bindgen(js_namespace = Date, js_name = now)] + fn date_now() -> f64; +} + +struct Task { + id: u32, + callback: Function, + priority_level: Priority, + start_time: f64, + expiration_time: f64, + sort_index: f64, +} + +impl Task { + fn new(callback: Function, priority_level: Priority, start_time: f64, expiration_time: f64) -> Self { + unsafe { + let s = Self { + id: TASK_ID_COUNTER, + callback, + priority_level, + start_time, + expiration_time, + sort_index: -1.0, + }; + TASK_ID_COUNTER += TASK_ID_COUNTER; + s + } + } +} + +impl PartialEq for Task { + fn eq(&self, other: &Task) -> bool { + self.id == other.id + } +} + +// impl Comparable for Task { +// fn compare(&self, b: &dyn Comparable) -> bool { +// let diff = self.sort_index - b.sort_index; +// if diff != 0.0 { +// return diff < 0.0; +// } +// (self.id - b.id) < 0 +// } +// } + + +fn unstable_now() -> f64 { + Performance::now().unwrap_or_else(|_| date_now()) +} + +fn get_priority_timeout(priority_level: Priority) -> f64 { + match priority_level { + Priority::Normal => 5000.0 + } +} + +fn _unstable_schedule_callback(priority_level: Priority, callback: Function, delay: f64) { + let current_time = unstable_now(); + let mut start_time = current_time; + + if delay > 0.0 { + start_time += delay; + } + + let timeout = get_priority_timeout(priority_level.clone()); + let expiration_time = start_time + timeout; + let mut new_task = Task::new(callback, priority_level.clone(), start_time, expiration_time); + + if start_time > current_time { + new_task.sort_index = start_time; + } +} + +fn unstable_schedule_callback_no_delay(priority_level: Priority, callback: Function) { + _unstable_schedule_callback(priority_level, callback, 0.0) +} + diff --git a/packages/scheduler/tests/web.rs b/packages/scheduler/tests/web.rs new file mode 100644 index 0000000..9d7a049 --- /dev/null +++ b/packages/scheduler/tests/web.rs @@ -0,0 +1,30 @@ +//! Test suite for the Web and headless browsers. + +#![cfg(target_arch = "wasm32")] + +extern crate wasm_bindgen_test; + +use wasm_bindgen_test::*; +use web_sys::js_sys::Function; + +wasm_bindgen_test_configure!(run_in_browser); + +#[wasm_bindgen_test] +fn pass() { + // 使用假的 Function 实例,因为我们在这里不会真的调用它 + let fake_callback = Function::new_no_args(""); + + let start_time = 0.0; + // 添加任务到堆中 + push(Task::new(fake_callback.clone(), Priority::Normal, start_time, 1.0)); + push(Task::new(fake_callback.clone(), Priority::Normal, start_time, 2.0)); + push(Task::new(fake_callback, Priority::Normal, start_time, 3.0)); + + // 按预期顺序弹出任务 + assert_eq!(TASK_QUEUE.pop().unwrap().id, 1); + assert_eq!(TASK_QUEUE.pop().unwrap().id, 2); + assert_eq!(TASK_QUEUE.pop().unwrap().id, 3); + + // 堆应该为空 + assert!(TASK_QUEUE.pop().is_none()); +} From 19825804e47d98e0fb1854d0b5dc3cf6f1e82607 Mon Sep 17 00:00:00 2001 From: youxingzhi Date: Wed, 15 May 2024 18:56:44 +0800 Subject: [PATCH 4/6] blog-14: scheduler --- packages/scheduler/Cargo.toml | 2 +- packages/scheduler/src/heap.rs | 83 ++++++--- packages/scheduler/src/lib.rs | 319 ++++++++++++++++++++++++++++++--- 3 files changed, 358 insertions(+), 46 deletions(-) diff --git a/packages/scheduler/Cargo.toml b/packages/scheduler/Cargo.toml index 50fd5c6..1f6d89e 100644 --- a/packages/scheduler/Cargo.toml +++ b/packages/scheduler/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] wasm-bindgen = "0.2.84" -web-sys = "0.3.69" +web-sys = { version = "0.3.69", features = ["MessagePort", "MessageChannel"] } diff --git a/packages/scheduler/src/heap.rs b/packages/scheduler/src/heap.rs index 39c0990..387818f 100644 --- a/packages/scheduler/src/heap.rs +++ b/packages/scheduler/src/heap.rs @@ -4,21 +4,22 @@ pub trait Comparable { // self lower than Self, return true fn compare(&self, b: &dyn Comparable) -> bool; fn as_any(&self) -> &dyn Any; + fn as_mut_any(&mut self) -> &mut dyn Any; } -fn push(mut heap: &mut Vec>, node: Box) { +pub fn push(mut heap: &mut Vec>, node: Box) { heap.push(node); sift_up(heap, heap.len() - 1); } -fn peek(heap: &Vec>) -> Option<&Box> { +pub fn peek(heap: &mut Vec>) -> Option<&mut Box> { if heap.is_empty() { return None; } - return Some(&heap[0]); + return Some(&mut heap[0]); } -fn pop(mut heap: &mut Vec>) -> Option> { +pub fn pop(mut heap: &mut Vec>) -> Option> { if heap.is_empty() { None } else { @@ -69,6 +70,7 @@ mod tests { use crate::heap::{Comparable, pop, push}; + #[derive(Clone)] struct Task { id: u32, sort_index: f64, @@ -76,21 +78,20 @@ mod tests { impl Task { fn new(id: u32, sort_index: f64) -> Self { - Self { - id, - sort_index, - } + Self { id, sort_index } } } - // 实现 Task 的 Debug trait,以便在测试失败时能够打印 Task 的值。 impl std::fmt::Debug for Task { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "Task {{ id: {}, sort_index: {} }}", self.id, self.sort_index) + write!( + f, + "Task {{ id: {}, sort_index: {} }}", + self.id, self.sort_index + ) } } - // 实现 Task 的 PartialEq trait,以便能够在断言中比较 Task 的值。 impl PartialEq for Task { fn eq(&self, other: &Self) -> bool { self.id == other.id @@ -110,26 +111,64 @@ mod tests { fn as_any(&self) -> &dyn Any { self } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } } #[test] fn test_min_heap() { let mut heap = &mut vec![]; - + let task3 = Task::new(3, 3.0); + let task2 = Task::new(2, 2.0); + let task1 = Task::new(1, 1.0); + let task4 = Task::new(4, 4.0); // 添加任务到堆中 - push(heap, Box::new(Task::new(1, 2.0))); - push(heap, Box::new(Task::new(2, 1.0))); - push(heap, Box::new(Task::new(3, 3.0))); + push(heap, Box::new(task3.clone())); + push(heap, Box::new(task2.clone())); + push(heap, Box::new(task1.clone())); + push(heap, Box::new(task4.clone())); // 按预期顺序弹出任务 - assert_eq!(pop(heap).unwrap().as_any().downcast_ref::().unwrap().id, 2); - assert_eq!(pop(heap).unwrap().as_any().downcast_ref::().unwrap().id, 1); - assert_eq!(pop(heap).unwrap().as_any().downcast_ref::().unwrap().id, 3); - + assert_eq!( + pop(heap) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + == &task1, + true + ); + assert_eq!( + pop(heap) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + == &task2, + true + ); + assert_eq!( + pop(heap) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + == &task3, + true + ); + assert_eq!( + pop(heap) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + == &task4, + true + ); // 堆应该为空 - assert!(heap.pop().is_none()); + assert!(pop(heap).is_none()); } } - - diff --git a/packages/scheduler/src/lib.rs b/packages/scheduler/src/lib.rs index 0b5daa6..b9dafba 100644 --- a/packages/scheduler/src/lib.rs +++ b/packages/scheduler/src/lib.rs @@ -1,19 +1,33 @@ +use std::any::Any; use std::cmp::PartialEq; use wasm_bindgen::prelude::*; -use web_sys::js_sys::Function; +use web_sys::{MessageChannel, MessagePort}; +use web_sys::js_sys::{Function, global, Reflect}; -use crate::heap::Comparable; +use crate::heap::{Comparable, peek, pop, push}; mod heap; -static FRAME_YIELD_MS: u32 = 5; +static FRAME_YIELD_MS: f64 = 5.0; static mut TASK_ID_COUNTER: u32 = 1; -static mut TASK_QUEUE: Vec = vec![]; +static mut TASK_QUEUE: Vec> = vec![]; +static mut TIMER_QUEUE: Vec> = vec![]; +static mut IS_HOST_TIMEOUT_SCHEDULED: bool = false; +static mut IS_HOST_CALLBACK_SCHEDULED: bool = false; +static mut IS_PERFORMING_WORK: bool = false; +static mut TASK_TIMEOUT_ID: f64 = -1.0; +static mut SCHEDULED_HOST_CALLBACK: Option bool> = None; +static mut IS_MESSAGE_LOOP_RUNNING: bool = false; +static mut MESSAGE_CHANNEL: Option = None; +static mut MESSAGE_CHANNEL_LISTENED: bool = false; +static mut START_TIME: f64 = -1.0; +static mut CURRENT_PRIORITY_LEVEL: Priority = Priority::Normal; +static mut CURRENT_TASK: Option<&mut Box> = None; #[derive(Clone)] enum Priority { - Normal = 3 + Normal = 3, } #[wasm_bindgen] @@ -22,14 +36,21 @@ extern "C" { #[wasm_bindgen(static_method_of = Performance, catch, js_namespace = performance, js_name = now)] fn now() -> Result; - + #[wasm_bindgen] + fn clearTimeout(id: f64); + #[wasm_bindgen] + fn setTimeout(closure: &Function, timeout: f64) -> f64; #[wasm_bindgen(js_namespace = Date, js_name = now)] fn date_now() -> f64; + + #[wasm_bindgen] + fn setImmediate(f: &Function); } +#[derive(Clone)] struct Task { id: u32, - callback: Function, + callback: JsValue, priority_level: Priority, start_time: f64, expiration_time: f64, @@ -37,11 +58,16 @@ struct Task { } impl Task { - fn new(callback: Function, priority_level: Priority, start_time: f64, expiration_time: f64) -> Self { + fn new( + callback: Function, + priority_level: Priority, + start_time: f64, + expiration_time: f64, + ) -> Self { unsafe { let s = Self { id: TASK_ID_COUNTER, - callback, + callback: JsValue::from(callback), priority_level, start_time, expiration_time, @@ -59,16 +85,24 @@ impl PartialEq for Task { } } -// impl Comparable for Task { -// fn compare(&self, b: &dyn Comparable) -> bool { -// let diff = self.sort_index - b.sort_index; -// if diff != 0.0 { -// return diff < 0.0; -// } -// (self.id - b.id) < 0 -// } -// } +impl Comparable for Task { + fn compare(&self, other: &dyn Comparable) -> bool { + let other = other.as_any().downcast_ref::().unwrap(); + let diff = self.sort_index - other.sort_index; + if diff != 0.0 { + return diff < 0.0; + } + (self.id as i32 - other.id as i32) < 0 + } + + fn as_any(&self) -> &dyn Any { + self + } + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } +} fn unstable_now() -> f64 { Performance::now().unwrap_or_else(|_| date_now()) @@ -76,7 +110,226 @@ fn unstable_now() -> f64 { fn get_priority_timeout(priority_level: Priority) -> f64 { match priority_level { - Priority::Normal => 5000.0 + Priority::Normal => 5000.0, + } +} + +fn cancel_host_timeout() { + unsafe { + clearTimeout(TASK_TIMEOUT_ID); + TASK_TIMEOUT_ID = -1.0; + } +} + +pub fn schedule_perform_work_until_deadline() { + let perform_work_closure = + Closure::wrap(Box::new(perform_work_until_deadline) as Box); + let perform_work_function = perform_work_closure + .as_ref() + .unchecked_ref::() + .clone(); + // let schedule_closure = Closure::wrap(Box::new(schedule_perform_work_until_deadline) as Box); + + if Reflect::get(&global(), &JsValue::from_str("setImmediate")).is_ok() { + setImmediate(&perform_work_function); + } else if let Ok(message_channel) = MessageChannel::new() { + unsafe { + let initialized = false; + if MESSAGE_CHANNEL.is_none() { + MESSAGE_CHANNEL = Some(message_channel); + } + let mc = MESSAGE_CHANNEL.as_ref().unwrap(); + let port: MessagePort = message_channel.port2(); + if !MESSAGE_CHANNEL_LISTENED { + let on_message_closure = + Closure::wrap( + Box::new(move || perform_work_until_deadline()) as Box + ); + port.set_onmessage(Some(&on_message_closure.as_ref().unchecked_ref())); + on_message_closure.forget(); + } + port.post_message(&JsValue::null()); + } + } else { + setTimeout(&perform_work_function, 0.0); + } + + perform_work_closure.forget(); +} + +fn perform_work_until_deadline() { + unsafe { + if SCHEDULED_HOST_CALLBACK.is_some() { + let scheduled_host_callback = SCHEDULED_HOST_CALLBACK.unwrap(); + let current_time = unstable_now(); + + START_TIME = current_time; + let has_time_remaining = true; + let has_more_work = scheduled_host_callback(has_time_remaining, current_time); + if has_more_work { + schedule_perform_work_until_deadline(); + } else { + IS_MESSAGE_LOOP_RUNNING = false; + SCHEDULED_HOST_CALLBACK = None; + } + } else { + IS_MESSAGE_LOOP_RUNNING = false + } + } +} + +fn advance_timers(current_time: f64) { + unsafe { + let mut timer = peek(&mut TIMER_QUEUE); + while timer.is_some() { + let task = timer.unwrap().as_mut_any().downcast_mut::().unwrap(); + if task.callback.is_null() { + pop(&mut TIMER_QUEUE); + } else if task.start_time <= current_time { + pop(&mut TIMER_QUEUE); + task.sort_index = task.expiration_time; + push(&mut TASK_QUEUE, Box::new(task.clone())); + } else { + return; + } + timer = peek(&mut TIMER_QUEUE); + } + } +} + +fn flush_work(has_time_remaining: bool, initial_time: f64) -> bool { + unsafe { + IS_HOST_CALLBACK_SCHEDULED = false; + if IS_HOST_TIMEOUT_SCHEDULED { + IS_HOST_TIMEOUT_SCHEDULED = false; + cancel_host_timeout(); + } + + IS_PERFORMING_WORK = true; + let previous_priority_level = CURRENT_PRIORITY_LEVEL.clone(); + return true; + } +} + +fn should_yield_to_host() -> bool { + unsafe { + let time_elapsed = unstable_now() - START_TIME; + if time_elapsed < FRAME_YIELD_MS { + return false; + } + } + return true; +} + +fn work_loop(has_time_remaining: bool, initial_time: f64) -> bool { + unsafe { + let mut current_time = initial_time; + advance_timers(current_time); + CURRENT_TASK = peek(&mut TASK_QUEUE); + while CURRENT_TASK.is_some() { + let mut t = CURRENT_TASK + .as_ref() + .unwrap() + .as_any() + .downcast_mut::() + .unwrap(); + if t.expiration_time > current_time && (!has_time_remaining || should_yield_to_host()) { + break; + } + + let callback = &t.callback; + if callback.is_function() { + t.callback = JsValue::null(); + CURRENT_TASK = Some(&mut (Box::new(t.clone()) as Box)); + CURRENT_PRIORITY_LEVEL = t.priority_level.clone(); + let did_user_callback_timeout = t.expiration_time <= current_time; + let continuation_callback = callback + .dyn_ref::() + .unwrap() + .call1(JsValue::null(), did_user_callback_timeout)?; + current_time = unstable_now(); + + if continuation_callback.is_function() { + t.callback = continuation_callback; + CURRENT_TASK = Some(&mut (Box::new(t.clone()) as Box)); + } else { + if CURRENT_TASK == peek(&mut TASK_QUEUE) { + pop(&mut TASK_QUEUE); + } + } + + advance_timers(current_time); + } else { + pop(&mut TASK_QUEUE); + } + + CURRENT_TASK = peek(&mut TASK_QUEUE); + } + + + if CURRENT_TASK.is_some() { + return true; + } else { + let first_timer = peek(&mut TIMER_QUEUE); + if first_timer.is_some() { + let task = first_timer + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + request_host_timeout(handle_timeout, task.start_time - current_time); + } + + return false; + } + } +} + +fn request_host_callback(callback: fn(bool, f64) -> bool) { + unsafe { + SCHEDULED_HOST_CALLBACK = Some(callback); + if !IS_MESSAGE_LOOP_RUNNING { + IS_MESSAGE_LOOP_RUNNING = true; + schedule_perform_work_until_deadline(); + } + } +} + +fn handle_timeout(current_time: f64) { + unsafe { + IS_HOST_TIMEOUT_SCHEDULED = false; + advance_timers(current_time); + + if !IS_HOST_TIMEOUT_SCHEDULED { + if peek(&mut TASK_QUEUE).is_some() { + IS_HOST_CALLBACK_SCHEDULED = true; + request_host_callback(flush_work); + } else { + let first_timer = peek(&mut TIMER_QUEUE); + if first_timer.is_some() { + let first_timer_task = first_timer + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + request_host_timeout( + handle_timeout, + first_timer_task.start_time - current_time, + ); + } + } + } + } +} + +fn request_host_timeout(callback: fn(f64), ms: f64) { + unsafe { + let closure = Closure::wrap(Box::new(move || { + callback(unstable_now()); + }) as Box); + let function = closure.as_ref().unchecked_ref::().clone(); + closure.forget(); + TASK_TIMEOUT_ID = setTimeout(&function, ms); } } @@ -90,14 +343,34 @@ fn _unstable_schedule_callback(priority_level: Priority, callback: Function, del let timeout = get_priority_timeout(priority_level.clone()); let expiration_time = start_time + timeout; - let mut new_task = Task::new(callback, priority_level.clone(), start_time, expiration_time); + let mut new_task = Task::new( + callback, + priority_level.clone(), + start_time, + expiration_time, + ); + unsafe { + if start_time > current_time { + new_task.sort_index = start_time; + push(&mut TIMER_QUEUE, Box::new(new_task.clone())); - if start_time > current_time { - new_task.sort_index = start_time; + if peek(&mut TASK_QUEUE).is_none() { + if let Some(task) = peek(&mut TASK_QUEUE) { + let task = task.as_any().downcast_ref::().unwrap(); + if task == &new_task { + if (IS_HOST_TIMEOUT_SCHEDULED) { + cancel_host_timeout(); + } else { + IS_HOST_TIMEOUT_SCHEDULED = true; + } + request_host_timeout(handle_timeout, start_time - current_time); + } + } + } + } } } fn unstable_schedule_callback_no_delay(priority_level: Priority, callback: Function) { _unstable_schedule_callback(priority_level, callback, 0.0) } - From 4e194f5689e8214005b0140fb94b021f6736c1fc Mon Sep 17 00:00:00 2001 From: youxingzhi Date: Thu, 16 May 2024 15:49:45 +0800 Subject: [PATCH 5/6] blog-14: scheduler --- Cargo.lock | 2 + examples/hello-world/src/main.tsx | 43 +++++- packages/react-dom/Cargo.toml | 1 + packages/react-dom/src/lib.rs | 20 +++ packages/scheduler/Cargo.toml | 2 + packages/scheduler/src/lib.rs | 213 +++++++++++++++++++++++------- 6 files changed, 230 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8089cb4..e86c6c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -507,6 +507,7 @@ dependencies = [ "gloo", "js-sys", "react-reconciler", + "scheduler", "shared", "wasm-bindgen", "wasm-bindgen-test", @@ -542,6 +543,7 @@ checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" name = "scheduler" version = "0.1.0" dependencies = [ + "shared", "wasm-bindgen", "web-sys", ] diff --git a/examples/hello-world/src/main.tsx b/examples/hello-world/src/main.tsx index 8f70611..f5c0c12 100644 --- a/examples/hello-world/src/main.tsx +++ b/examples/hello-world/src/main.tsx @@ -1,6 +1,41 @@ -import {createRoot} from 'react-dom' -import App from './App.tsx' +// import App from './App.tsx' +// +// const root = createRoot(document.getElementById("root")) +// root.render() +import {Priority, scheduleCallback, shouldYieldToHost} from 'react-dom' -const root = createRoot(document.getElementById("root")) -root.render() + +// scheduleCallback(2, function func1() { +// console.log('1') +// }) +// +// const taskId = scheduleCallback(1, function func2() { +// console.log('2') +// }) + +// cancelCallback(taskId) + + +function func2(didTimeout) { + console.log(didTimeout) + if (!didTimeout) console.log(2) +} + +function func1() { + console.log(1) + return func2 +} + +scheduleCallback(Priority.NormalPriority, func1) + +function work() { + while (!shouldYieldToHost()) { + console.log('work') + } + console.log('yield to host') +} + +scheduleCallback(1, function func2() { + work() +}) diff --git a/packages/react-dom/Cargo.toml b/packages/react-dom/Cargo.toml index 23b4c4b..8a22f0a 100644 --- a/packages/react-dom/Cargo.toml +++ b/packages/react-dom/Cargo.toml @@ -15,6 +15,7 @@ wasm-bindgen = "0.2.84" web-sys = { version = "0.3.69", features = ["console", "Window", "Document", "Text", "Element", "EventListener"] } react-reconciler = { path = "../react-reconciler" } shared = { path = "../shared" } +scheduler = { path = "../scheduler" } # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires # all the `std::fmt` and `std::panicking` infrastructure, so isn't great for diff --git a/packages/react-dom/src/lib.rs b/packages/react-dom/src/lib.rs index 84b5cd4..835efde 100644 --- a/packages/react-dom/src/lib.rs +++ b/packages/react-dom/src/lib.rs @@ -1,9 +1,11 @@ use std::rc::Rc; +use js_sys::{Array, Function}; use wasm_bindgen::prelude::*; use web_sys::Node; use react_reconciler::Reconciler; +use scheduler::{Priority, unstable_cancel_callback, unstable_schedule_callback as origin_unstable_schedule_callback, unstable_should_yield_to_host}; use crate::host_config::ReactDomHostConfig; use crate::renderer::Renderer; @@ -28,3 +30,21 @@ pub fn create_root(container: &JsValue) -> Renderer { let renderer = Renderer::new(root, reconciler, container); renderer } + +#[wasm_bindgen(js_name = scheduleCallback, variadic)] +pub fn unstable_schedule_callback(priority_level: Priority, callback: Function, delay: &JsValue) -> u32 { + let delay = delay.dyn_ref::().unwrap(); + let d = delay.get(0).as_f64().unwrap_or_else(|| 0.0); + origin_unstable_schedule_callback(priority_level, callback, d) +} + +#[wasm_bindgen(js_name = cancelCallback)] +pub fn cancel_callback(id: u32) { + unstable_cancel_callback(id) +} + +#[wasm_bindgen(js_name = shouldYieldToHost)] +pub fn should_yield_to_host() -> bool { + unstable_should_yield_to_host() +} + diff --git a/packages/scheduler/Cargo.toml b/packages/scheduler/Cargo.toml index 1f6d89e..e1597ae 100644 --- a/packages/scheduler/Cargo.toml +++ b/packages/scheduler/Cargo.toml @@ -8,4 +8,6 @@ edition = "2021" [dependencies] wasm-bindgen = "0.2.84" web-sys = { version = "0.3.69", features = ["MessagePort", "MessageChannel"] } +shared = { path = "../shared" } + diff --git a/packages/scheduler/src/lib.rs b/packages/scheduler/src/lib.rs index b9dafba..fa2cd7e 100644 --- a/packages/scheduler/src/lib.rs +++ b/packages/scheduler/src/lib.rs @@ -3,7 +3,9 @@ use std::cmp::PartialEq; use wasm_bindgen::prelude::*; use web_sys::{MessageChannel, MessagePort}; -use web_sys::js_sys::{Function, global, Reflect}; +use web_sys::js_sys::{Function, global}; + +use shared::log; use crate::heap::{Comparable, peek, pop, push}; @@ -20,20 +22,27 @@ static mut TASK_TIMEOUT_ID: f64 = -1.0; static mut SCHEDULED_HOST_CALLBACK: Option bool> = None; static mut IS_MESSAGE_LOOP_RUNNING: bool = false; static mut MESSAGE_CHANNEL: Option = None; -static mut MESSAGE_CHANNEL_LISTENED: bool = false; +// static mut MESSAGE_CHANNEL_LISTENED: bool = false; static mut START_TIME: f64 = -1.0; -static mut CURRENT_PRIORITY_LEVEL: Priority = Priority::Normal; +static mut CURRENT_PRIORITY_LEVEL: Priority = Priority::NormalPriority; static mut CURRENT_TASK: Option<&mut Box> = None; +static mut PORT1: Option = None; +static mut PORT2: Option = None; -#[derive(Clone)] -enum Priority { - Normal = 3, +#[derive(Clone, Debug)] +#[wasm_bindgen] +pub enum Priority { + ImmediatePriority = 1, + UserBlockingPriority = 2, + NormalPriority = 3, + LowPriority = 4, + IdlePriority = 5, } #[wasm_bindgen] extern "C" { type Performance; - + type Global; #[wasm_bindgen(static_method_of = Performance, catch, js_namespace = performance, js_name = now)] fn now() -> Result; #[wasm_bindgen] @@ -45,9 +54,12 @@ extern "C" { #[wasm_bindgen] fn setImmediate(f: &Function); + + #[wasm_bindgen(method, getter, js_name = setImmediate)] + fn hasSetImmediate(this: &Global) -> JsValue; } -#[derive(Clone)] +#[derive(Clone, Debug)] struct Task { id: u32, callback: JsValue, @@ -110,7 +122,11 @@ fn unstable_now() -> f64 { fn get_priority_timeout(priority_level: Priority) -> f64 { match priority_level { - Priority::Normal => 5000.0, + Priority::NormalPriority => 5000.0, + Priority::ImmediatePriority => -1.0, + Priority::UserBlockingPriority => 250.0, + Priority::IdlePriority => 1073741823.0, + Priority::LowPriority => 10000.0, } } @@ -130,25 +146,27 @@ pub fn schedule_perform_work_until_deadline() { .clone(); // let schedule_closure = Closure::wrap(Box::new(schedule_perform_work_until_deadline) as Box); - if Reflect::get(&global(), &JsValue::from_str("setImmediate")).is_ok() { + if global() + .unchecked_into::() + .hasSetImmediate() + .is_function() + { setImmediate(&perform_work_function); } else if let Ok(message_channel) = MessageChannel::new() { unsafe { - let initialized = false; - if MESSAGE_CHANNEL.is_none() { - MESSAGE_CHANNEL = Some(message_channel); + if PORT1.is_none() { + PORT1 = Some(message_channel.port1()); + PORT2 = Some(message_channel.port2()) } - let mc = MESSAGE_CHANNEL.as_ref().unwrap(); - let port: MessagePort = message_channel.port2(); - if !MESSAGE_CHANNEL_LISTENED { - let on_message_closure = - Closure::wrap( - Box::new(move || perform_work_until_deadline()) as Box - ); - port.set_onmessage(Some(&on_message_closure.as_ref().unchecked_ref())); - on_message_closure.forget(); - } - port.post_message(&JsValue::null()); + PORT1 + .as_ref() + .unwrap() + .set_onmessage(Some(&perform_work_function)); + PORT2 + .as_ref() + .unwrap() + .post_message(&JsValue::null()) + .expect("port post message panic"); } } else { setTimeout(&perform_work_function, 0.0); @@ -178,6 +196,41 @@ fn perform_work_until_deadline() { } } +/** +static mut MY_V: Vec> = vec![]; + +#[derive(Debug)] +struct Task { + id: f64, +} + +fn peek<'a>(v: &'a mut Vec>) -> &'a Box { + &v[0] +} + +fn pop<'a>(v: &'a mut Vec>) -> Box { + let t = v.swap_remove(0); + t +} + +fn main() { + unsafe { + MY_V = vec![Box::new(Task { + id: 10000.0 + })]; + + let t = peek(&mut MY_V); + + println!("{:?}", t); + + pop(&mut MY_V); + // let a = pop(&mut MY_V); + + println!("{:?}", t); + }; +} + + */ fn advance_timers(current_time: f64) { unsafe { let mut timer = peek(&mut TIMER_QUEUE); @@ -186,7 +239,7 @@ fn advance_timers(current_time: f64) { if task.callback.is_null() { pop(&mut TIMER_QUEUE); } else if task.start_time <= current_time { - pop(&mut TIMER_QUEUE); + let t = pop(&mut TIMER_QUEUE); task.sort_index = task.expiration_time; push(&mut TASK_QUEUE, Box::new(task.clone())); } else { @@ -201,17 +254,28 @@ fn flush_work(has_time_remaining: bool, initial_time: f64) -> bool { unsafe { IS_HOST_CALLBACK_SCHEDULED = false; if IS_HOST_TIMEOUT_SCHEDULED { + log!("IS_HOST_TIMEOUT_SCHEDULED"); IS_HOST_TIMEOUT_SCHEDULED = false; cancel_host_timeout(); } IS_PERFORMING_WORK = true; let previous_priority_level = CURRENT_PRIORITY_LEVEL.clone(); - return true; + + let has_more = work_loop(has_time_remaining, initial_time).unwrap_or_else(|_| { + log!("work_loop error"); + false + }); + + CURRENT_TASK = None; + CURRENT_PRIORITY_LEVEL = previous_priority_level.clone(); + IS_PERFORMING_WORK = false; + + return has_more; } } -fn should_yield_to_host() -> bool { +pub fn unstable_should_yield_to_host() -> bool { unsafe { let time_elapsed = unstable_now() - START_TIME; if time_elapsed < FRAME_YIELD_MS { @@ -221,41 +285,61 @@ fn should_yield_to_host() -> bool { return true; } -fn work_loop(has_time_remaining: bool, initial_time: f64) -> bool { +fn work_loop(has_time_remaining: bool, initial_time: f64) -> Result { unsafe { let mut current_time = initial_time; advance_timers(current_time); - CURRENT_TASK = peek(&mut TASK_QUEUE); - while CURRENT_TASK.is_some() { - let mut t = CURRENT_TASK - .as_ref() + let mut current_task = peek(&mut TASK_QUEUE); + log!( + "current_task {:?}", + current_task.as_ref() .unwrap() .as_any() + .downcast_ref::() + .unwrap() + ); + + CURRENT_TASK = peek(&mut TASK_QUEUE); + while current_task.is_some() { + let mut t = current_task + .unwrap() + .as_mut_any() .downcast_mut::() .unwrap(); - if t.expiration_time > current_time && (!has_time_remaining || should_yield_to_host()) { + if t.expiration_time > current_time && (!has_time_remaining || unstable_should_yield_to_host()) { break; } - let callback = &t.callback; + let callback = t.callback.clone(); if callback.is_function() { t.callback = JsValue::null(); - CURRENT_TASK = Some(&mut (Box::new(t.clone()) as Box)); + // CURRENT_TASK = Some(&mut (Box::new(t.clone()) as Box)); CURRENT_PRIORITY_LEVEL = t.priority_level.clone(); let did_user_callback_timeout = t.expiration_time <= current_time; let continuation_callback = callback .dyn_ref::() .unwrap() - .call1(JsValue::null(), did_user_callback_timeout)?; + .call1(&JsValue::null(), &JsValue::from(did_user_callback_timeout))?; current_time = unstable_now(); if continuation_callback.is_function() { t.callback = continuation_callback; - CURRENT_TASK = Some(&mut (Box::new(t.clone()) as Box)); + // let mut boxed_t = Box::new(t.clone()) as Box; + // CURRENT_TASK = Some(&mut boxed_t.clone()); } else { - if CURRENT_TASK == peek(&mut TASK_QUEUE) { + if match peek(&mut TASK_QUEUE) { + None => false, + Some(task) => { + let task = task.as_any().downcast_ref::().unwrap(); + log!("{:?} {:?} {:?}", task, t, task == t); + task == t + } + } { pop(&mut TASK_QUEUE); } + // if t == peek(&mut TASK_QUEUE) { + // pop(&mut TASK_QUEUE); + // } } advance_timers(current_time); @@ -263,14 +347,15 @@ fn work_loop(has_time_remaining: bool, initial_time: f64) -> bool { pop(&mut TASK_QUEUE); } + current_task = peek(&mut TASK_QUEUE); CURRENT_TASK = peek(&mut TASK_QUEUE); } - if CURRENT_TASK.is_some() { - return true; + return Ok(true); } else { let first_timer = peek(&mut TIMER_QUEUE); + log!("request_host_timeout"); if first_timer.is_some() { let task = first_timer .unwrap() @@ -280,7 +365,7 @@ fn work_loop(has_time_remaining: bool, initial_time: f64) -> bool { request_host_timeout(handle_timeout, task.start_time - current_time); } - return false; + return Ok(false); } } } @@ -301,10 +386,14 @@ fn handle_timeout(current_time: f64) { advance_timers(current_time); if !IS_HOST_TIMEOUT_SCHEDULED { + log!("handle_timeout0 {:?}", TASK_QUEUE.len()); if peek(&mut TASK_QUEUE).is_some() { + log!("handle_timeout1"); IS_HOST_CALLBACK_SCHEDULED = true; request_host_callback(flush_work); } else { + log!("handle_timeout2"); + let first_timer = peek(&mut TIMER_QUEUE); if first_timer.is_some() { let first_timer_task = first_timer @@ -333,14 +422,33 @@ fn request_host_timeout(callback: fn(f64), ms: f64) { } } -fn _unstable_schedule_callback(priority_level: Priority, callback: Function, delay: f64) { +pub fn unstable_cancel_callback(id: u32) { + unsafe { + for mut task in &mut TASK_QUEUE { + let task = task.as_mut_any().downcast_mut::().unwrap(); + if task.id == id { + task.callback = JsValue::null(); + } + } + + for mut task in &mut TIMER_QUEUE { + let task = task.as_mut_any().downcast_mut::().unwrap(); + if task.id == id { + task.callback = JsValue::null(); + } + } + } +} + +pub fn unstable_schedule_callback(priority_level: Priority, callback: Function, delay: f64) -> u32 { let current_time = unstable_now(); let mut start_time = current_time; - + log!("starttime {:?} {:?} {:?}", start_time, delay, start_time + delay); if delay > 0.0 { start_time += delay; } + let timeout = get_priority_timeout(priority_level.clone()); let expiration_time = start_time + timeout; let mut new_task = Task::new( @@ -349,16 +457,17 @@ fn _unstable_schedule_callback(priority_level: Priority, callback: Function, del start_time, expiration_time, ); + let id = new_task.id; unsafe { if start_time > current_time { new_task.sort_index = start_time; push(&mut TIMER_QUEUE, Box::new(new_task.clone())); if peek(&mut TASK_QUEUE).is_none() { - if let Some(task) = peek(&mut TASK_QUEUE) { + if let Some(task) = peek(&mut TIMER_QUEUE) { let task = task.as_any().downcast_ref::().unwrap(); if task == &new_task { - if (IS_HOST_TIMEOUT_SCHEDULED) { + if IS_HOST_TIMEOUT_SCHEDULED { cancel_host_timeout(); } else { IS_HOST_TIMEOUT_SCHEDULED = true; @@ -367,10 +476,20 @@ fn _unstable_schedule_callback(priority_level: Priority, callback: Function, del } } } + } else { + new_task.sort_index = expiration_time; + push(&mut TASK_QUEUE, Box::new(new_task)); + + if !IS_HOST_CALLBACK_SCHEDULED && !IS_PERFORMING_WORK { + IS_HOST_CALLBACK_SCHEDULED = true; + request_host_callback(flush_work); + } } } + + id } -fn unstable_schedule_callback_no_delay(priority_level: Priority, callback: Function) { - _unstable_schedule_callback(priority_level, callback, 0.0) +pub fn unstable_schedule_callback_no_delay(priority_level: Priority, callback: Function) -> u32 { + unstable_schedule_callback(priority_level, callback, 0.0) } From abf1766b44c385f6dbf889693dcdfc78fd4b8cf5 Mon Sep 17 00:00:00 2001 From: youxingzhi Date: Fri, 17 May 2024 11:21:33 +0800 Subject: [PATCH 6/6] blog-14: refactor scheduler --- packages/scheduler/src/heap.rs | 191 ++++++------ packages/scheduler/src/lib.rs | 122 ++++---- packages/scheduler/src/scheduler.rs | 432 ++++++++++++++++++++++++++++ 3 files changed, 573 insertions(+), 172 deletions(-) create mode 100644 packages/scheduler/src/scheduler.rs diff --git a/packages/scheduler/src/heap.rs b/packages/scheduler/src/heap.rs index 387818f..e066aa3 100644 --- a/packages/scheduler/src/heap.rs +++ b/packages/scheduler/src/heap.rs @@ -1,74 +1,84 @@ -use std::any::Any; - -pub trait Comparable { - // self lower than Self, return true - fn compare(&self, b: &dyn Comparable) -> bool; - fn as_any(&self) -> &dyn Any; - fn as_mut_any(&mut self) -> &mut dyn Any; -} - -pub fn push(mut heap: &mut Vec>, node: Box) { - heap.push(node); +// 向堆中插入元素 +pub fn push(heap: &mut Vec, value: T) { + heap.push(value); sift_up(heap, heap.len() - 1); } -pub fn peek(heap: &mut Vec>) -> Option<&mut Box> { +// 从堆中取出最小的元素 +pub fn pop(heap: &mut Vec) -> Option { if heap.is_empty() { return None; } - return Some(&mut heap[0]); + let last_index = heap.len() - 1; + heap.swap(0, last_index); + let result = heap.pop(); + if !heap.is_empty() { + sift_down(heap, 0); + } + result } -pub fn pop(mut heap: &mut Vec>) -> Option> { - if heap.is_empty() { - None - } else { - let min = heap.swap_remove(0); - if !heap.is_empty() { - bubble_down(heap, 0); +// 向上调整堆 +fn sift_up(heap: &mut Vec, mut index: usize) { + while index != 0 { + let parent = (index - 1) / 2; + if heap[parent] <= heap[index] { + break; } - Some(min) + heap.swap(parent, index); + index = parent; } } -fn bubble_down(mut heap: &mut Vec>, index: usize) { - let mut parent = index; - +// 向下调整堆 +fn sift_down(heap: &mut Vec, mut index: usize) { + let len = heap.len(); loop { - let mut child = 2 * parent + 1; - if child >= heap.len() { - break; + let left_child = index * 2 + 1; + let right_child = left_child + 1; + + // 找出当前节点和它的子节点中最小的节点 + let mut smallest = index; + if left_child < len && heap[left_child] < heap[smallest] { + smallest = left_child; } - if child + 1 < heap.len() && heap[child + 1].compare(&*heap[child]) { - child += 1; + if right_child < len && heap[right_child] < heap[smallest] { + smallest = right_child; } - if heap[parent].compare(&*heap[child]) { + + // 如果当前节点是最小的,那么堆已经是正确的了 + if smallest == index { break; } - heap.swap(parent, child); - parent = child; + + // 否则,交换当前节点和最小的节点 + heap.swap(index, smallest); + index = smallest; } } -fn sift_up(mut heap: &mut Vec>, i: usize) { - let mut child = i; - if child <= 0 { - return; - } - let mut parent = (child - 1) / 2; +pub fn peek(heap: &Vec) -> Option<&T> { + heap.get(0) +} + +pub fn is_empty(heap: &Vec) -> bool { + heap.is_empty() +} - while child > 0 && !&heap[parent].compare(&*heap[child]) { - heap.swap(parent, child); - child = parent; - parent = ((child as isize - 1) / 2) as usize; +pub fn peek_mut(heap: &mut Vec) -> Option<&mut T> { + if heap.is_empty() { + None + } else { + Some(&mut heap[0]) } } + #[cfg(test)] mod tests { - use std::any::Any; + use std::cmp::Ordering; - use crate::heap::{Comparable, pop, push}; + use crate::heap::{pop, push}; #[derive(Clone)] struct Task { @@ -92,83 +102,64 @@ mod tests { } } + impl Eq for Task {} + impl PartialEq for Task { fn eq(&self, other: &Self) -> bool { - self.id == other.id + self.id.cmp(&other.id) == Ordering::Equal } } - impl Comparable for Task { - fn compare(&self, other: &dyn Comparable) -> bool { - let other = other.as_any().downcast_ref::().unwrap(); - let diff = self.sort_index - other.sort_index; - if diff != 0.0 { - return diff < 0.0; + impl PartialOrd for Task { + fn partial_cmp(&self, other: &Self) -> Option { + let mut sort_index_ordering; + + if self.sort_index.is_nan() { + if other.sort_index.is_nan() { + sort_index_ordering = Ordering::Equal + } else { + sort_index_ordering = Ordering::Less + } + } else if other.sort_index.is_nan() { + sort_index_ordering = (Ordering::Greater) + } else { + sort_index_ordering = self.sort_index.partial_cmp(&other.sort_index).unwrap() } - (self.id as i32 - other.id as i32) < 0 - } - fn as_any(&self) -> &dyn Any { - self + if sort_index_ordering != Ordering::Equal { + return Some(sort_index_ordering); + } + return self.id.partial_cmp(&other.id); } + } - fn as_mut_any(&mut self) -> &mut dyn Any { - self + impl Ord for Task { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap_or(Ordering::Equal) } } #[test] fn test_min_heap() { - let mut heap = &mut vec![]; + let mut heap = vec![]; let task3 = Task::new(3, 3.0); let task2 = Task::new(2, 2.0); let task1 = Task::new(1, 1.0); let task4 = Task::new(4, 4.0); - // 添加任务到堆中 - push(heap, Box::new(task3.clone())); - push(heap, Box::new(task2.clone())); - push(heap, Box::new(task1.clone())); - push(heap, Box::new(task4.clone())); + + push(&mut heap, task3); + push(&mut heap, task2); + push(&mut heap, task1); + push(&mut heap, task4); // 按预期顺序弹出任务 - assert_eq!( - pop(heap) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - == &task1, - true - ); - assert_eq!( - pop(heap) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - == &task2, - true - ); - assert_eq!( - pop(heap) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - == &task3, - true - ); - assert_eq!( - pop(heap) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - == &task4, - true - ); + assert_eq!(pop(&mut heap).unwrap().id == 1, true); + assert_eq!(pop(&mut heap).unwrap().id == 2, true); + assert_eq!(pop(&mut heap).unwrap().id == 3, true); + assert_eq!(pop(&mut heap).unwrap().id == 4, true); + // 堆应该为空 - assert!(pop(heap).is_none()); + assert!(heap.pop().is_none()); } } diff --git a/packages/scheduler/src/lib.rs b/packages/scheduler/src/lib.rs index fa2cd7e..b428580 100644 --- a/packages/scheduler/src/lib.rs +++ b/packages/scheduler/src/lib.rs @@ -1,5 +1,5 @@ use std::any::Any; -use std::cmp::PartialEq; +use std::cmp::{Ordering, PartialEq}; use wasm_bindgen::prelude::*; use web_sys::{MessageChannel, MessagePort}; @@ -7,14 +7,14 @@ use web_sys::js_sys::{Function, global}; use shared::log; -use crate::heap::{Comparable, peek, pop, push}; +use crate::heap::{peek, peek_mut, pop, push}; mod heap; static FRAME_YIELD_MS: f64 = 5.0; static mut TASK_ID_COUNTER: u32 = 1; -static mut TASK_QUEUE: Vec> = vec![]; -static mut TIMER_QUEUE: Vec> = vec![]; +static mut TASK_QUEUE: Vec = vec![]; +static mut TIMER_QUEUE: Vec = vec![]; static mut IS_HOST_TIMEOUT_SCHEDULED: bool = false; static mut IS_HOST_CALLBACK_SCHEDULED: bool = false; static mut IS_PERFORMING_WORK: bool = false; @@ -25,7 +25,7 @@ static mut MESSAGE_CHANNEL: Option = None; // static mut MESSAGE_CHANNEL_LISTENED: bool = false; static mut START_TIME: f64 = -1.0; static mut CURRENT_PRIORITY_LEVEL: Priority = Priority::NormalPriority; -static mut CURRENT_TASK: Option<&mut Box> = None; +static mut CURRENT_TASK: Option<&Task> = None; static mut PORT1: Option = None; static mut PORT2: Option = None; @@ -91,28 +91,40 @@ impl Task { } } +impl Eq for Task {} + impl PartialEq for Task { - fn eq(&self, other: &Task) -> bool { - self.id == other.id + fn eq(&self, other: &Self) -> bool { + self.id.cmp(&other.id) == Ordering::Equal } } -impl Comparable for Task { - fn compare(&self, other: &dyn Comparable) -> bool { - let other = other.as_any().downcast_ref::().unwrap(); - let diff = self.sort_index - other.sort_index; - if diff != 0.0 { - return diff < 0.0; +impl PartialOrd for Task { + fn partial_cmp(&self, other: &Self) -> Option { + let mut sort_index_ordering; + + if self.sort_index.is_nan() { + if other.sort_index.is_nan() { + sort_index_ordering = Ordering::Equal + } else { + sort_index_ordering = Ordering::Less + } + } else if other.sort_index.is_nan() { + sort_index_ordering = (Ordering::Greater) + } else { + sort_index_ordering = self.sort_index.partial_cmp(&other.sort_index).unwrap() } - (self.id as i32 - other.id as i32) < 0 - } - fn as_any(&self) -> &dyn Any { - self + if sort_index_ordering != Ordering::Equal { + return Some(sort_index_ordering); + } + return self.id.partial_cmp(&other.id); } +} - fn as_mut_any(&mut self) -> &mut dyn Any { - self +impl Ord for Task { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap_or(Ordering::Equal) } } @@ -233,19 +245,19 @@ fn main() { */ fn advance_timers(current_time: f64) { unsafe { - let mut timer = peek(&mut TIMER_QUEUE); + let mut timer = peek_mut(&mut TIMER_QUEUE); while timer.is_some() { - let task = timer.unwrap().as_mut_any().downcast_mut::().unwrap(); + let task = timer.unwrap(); if task.callback.is_null() { pop(&mut TIMER_QUEUE); } else if task.start_time <= current_time { let t = pop(&mut TIMER_QUEUE); task.sort_index = task.expiration_time; - push(&mut TASK_QUEUE, Box::new(task.clone())); + push(&mut TASK_QUEUE, task.clone()); } else { return; } - timer = peek(&mut TIMER_QUEUE); + timer = peek_mut(&mut TIMER_QUEUE); } } } @@ -254,7 +266,6 @@ fn flush_work(has_time_remaining: bool, initial_time: f64) -> bool { unsafe { IS_HOST_CALLBACK_SCHEDULED = false; if IS_HOST_TIMEOUT_SCHEDULED { - log!("IS_HOST_TIMEOUT_SCHEDULED"); IS_HOST_TIMEOUT_SCHEDULED = false; cancel_host_timeout(); } @@ -289,31 +300,21 @@ fn work_loop(has_time_remaining: bool, initial_time: f64) -> Result() - .unwrap() - ); + let mut current_task = peek_mut(&mut TASK_QUEUE); CURRENT_TASK = peek(&mut TASK_QUEUE); while current_task.is_some() { - let mut t = current_task - .unwrap() - .as_mut_any() - .downcast_mut::() - .unwrap(); - if t.expiration_time > current_time && (!has_time_remaining || unstable_should_yield_to_host()) { + let mut t = current_task.unwrap(); + + if t.expiration_time > current_time + && (!has_time_remaining || unstable_should_yield_to_host()) + { break; } let callback = t.callback.clone(); if callback.is_function() { t.callback = JsValue::null(); - // CURRENT_TASK = Some(&mut (Box::new(t.clone()) as Box)); CURRENT_PRIORITY_LEVEL = t.priority_level.clone(); let did_user_callback_timeout = t.expiration_time <= current_time; let continuation_callback = callback @@ -324,22 +325,15 @@ fn work_loop(has_time_remaining: bool, initial_time: f64) -> Result; - // CURRENT_TASK = Some(&mut boxed_t.clone()); } else { - if match peek(&mut TASK_QUEUE) { + if match peek(&TASK_QUEUE) { None => false, Some(task) => { - let task = task.as_any().downcast_ref::().unwrap(); - log!("{:?} {:?} {:?}", task, t, task == t); task == t } } { pop(&mut TASK_QUEUE); } - // if t == peek(&mut TASK_QUEUE) { - // pop(&mut TASK_QUEUE); - // } } advance_timers(current_time); @@ -347,21 +341,17 @@ fn work_loop(has_time_remaining: bool, initial_time: f64) -> Result() - .unwrap(); + let task = first_timer.unwrap(); + request_host_timeout(handle_timeout, task.start_time - current_time); } @@ -386,21 +376,13 @@ fn handle_timeout(current_time: f64) { advance_timers(current_time); if !IS_HOST_TIMEOUT_SCHEDULED { - log!("handle_timeout0 {:?}", TASK_QUEUE.len()); if peek(&mut TASK_QUEUE).is_some() { - log!("handle_timeout1"); IS_HOST_CALLBACK_SCHEDULED = true; request_host_callback(flush_work); } else { - log!("handle_timeout2"); - let first_timer = peek(&mut TIMER_QUEUE); if first_timer.is_some() { - let first_timer_task = first_timer - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(); + let first_timer_task = first_timer.unwrap(); request_host_timeout( handle_timeout, first_timer_task.start_time - current_time, @@ -425,14 +407,12 @@ fn request_host_timeout(callback: fn(f64), ms: f64) { pub fn unstable_cancel_callback(id: u32) { unsafe { for mut task in &mut TASK_QUEUE { - let task = task.as_mut_any().downcast_mut::().unwrap(); if task.id == id { task.callback = JsValue::null(); } } for mut task in &mut TIMER_QUEUE { - let task = task.as_mut_any().downcast_mut::().unwrap(); if task.id == id { task.callback = JsValue::null(); } @@ -443,12 +423,11 @@ pub fn unstable_cancel_callback(id: u32) { pub fn unstable_schedule_callback(priority_level: Priority, callback: Function, delay: f64) -> u32 { let current_time = unstable_now(); let mut start_time = current_time; - log!("starttime {:?} {:?} {:?}", start_time, delay, start_time + delay); + if delay > 0.0 { start_time += delay; } - let timeout = get_priority_timeout(priority_level.clone()); let expiration_time = start_time + timeout; let mut new_task = Task::new( @@ -461,11 +440,10 @@ pub fn unstable_schedule_callback(priority_level: Priority, callback: Function, unsafe { if start_time > current_time { new_task.sort_index = start_time; - push(&mut TIMER_QUEUE, Box::new(new_task.clone())); + push(&mut TIMER_QUEUE, new_task.clone()); if peek(&mut TASK_QUEUE).is_none() { if let Some(task) = peek(&mut TIMER_QUEUE) { - let task = task.as_any().downcast_ref::().unwrap(); if task == &new_task { if IS_HOST_TIMEOUT_SCHEDULED { cancel_host_timeout(); @@ -478,7 +456,7 @@ pub fn unstable_schedule_callback(priority_level: Priority, callback: Function, } } else { new_task.sort_index = expiration_time; - push(&mut TASK_QUEUE, Box::new(new_task)); + push(&mut TASK_QUEUE, new_task); if !IS_HOST_CALLBACK_SCHEDULED && !IS_PERFORMING_WORK { IS_HOST_CALLBACK_SCHEDULED = true; diff --git a/packages/scheduler/src/scheduler.rs b/packages/scheduler/src/scheduler.rs new file mode 100644 index 0000000..c7ffa24 --- /dev/null +++ b/packages/scheduler/src/scheduler.rs @@ -0,0 +1,432 @@ +use wasm_bindgen::JsValue; +use wasm_bindgen::prelude::*; +use web_sys::{MessageChannel, MessagePort}; +use web_sys::js_sys::Function; + +#[wasm_bindgen] +extern "C" { + type Performance; + type Global; + #[wasm_bindgen(static_method_of = Performance, catch, js_namespace = performance, js_name = now)] + fn now() -> Result; + #[wasm_bindgen] + fn clearTimeout(id: f64); + #[wasm_bindgen] + fn setTimeout(closure: &Function, timeout: f64) -> f64; + #[wasm_bindgen(js_namespace = Date, js_name = now)] + fn date_now() -> f64; + + #[wasm_bindgen] + fn setImmediate(f: &Function); + + #[wasm_bindgen(method, getter, js_name = setImmediate)] + fn hasSetImmediate(this: &Global) -> JsValue; +} + +#[derive(Clone, Debug)] +#[wasm_bindgen] +pub enum Priority { + ImmediatePriority = 1, + UserBlockingPriority = 2, + NormalPriority = 3, + LowPriority = 4, + IdlePriority = 5, +} + +static FRAME_YIELD_MS: f64 = 5.0; + + +#[derive(Clone, Debug)] +struct Task { + id: u32, + callback: JsValue, + priority_level: Priority, + start_time: f64, + expiration_time: f64, + sort_index: f64, +} + +impl Task { + fn new( + id: u32, + callback: Function, + priority_level: Priority, + start_time: f64, + expiration_time: f64, + ) -> Self { + Self { + id, + callback: JsValue::from(callback), + priority_level, + start_time, + expiration_time, + sort_index: -1.0, + } + } +} + +impl PartialEq for Task { + fn eq(&self, other: &Task) -> bool { + self.id == other.id + } +} + +struct Scheduler<'a> { + task_id_counter: u32, + task_queue: Vec, + timer_queue: Vec, + is_host_timeout_scheduled: bool, + is_host_callback_scheduled: bool, + is_performing_work: bool, + task_timeout_id: f64, + scheduled_host_callback: Option bool>, + is_message_loop_running: bool, + message_channel: Option, + start_time: f64, + current_priority_level: Priority, + current_task: Option<&'a Task>, + port1: Option, + port2: Option, +} + +fn unstable_now() -> f64 { + Performance::now().unwrap_or_else(|_| date_now()) +} + +fn get_priority_timeout(priority_level: Priority) -> f64 { + match priority_level { + Priority::NormalPriority => 5000.0, + Priority::ImmediatePriority => -1.0, + Priority::UserBlockingPriority => 250.0, + Priority::IdlePriority => 1073741823.0, + Priority::LowPriority => 10000.0, + } +} + +impl Scheduler { + fn cancel_host_timeout(&mut self) { + clearTimeout(self.task_timeout_id); + self.task_timeout_id = -1.0; + } + + pub fn schedule_perform_work_until_deadline(&self) { + let perform_work_closure = + Closure::wrap(Box::new(self.perform_work_until_deadline) as Box); + let perform_work_function = perform_work_closure + .as_ref() + .unchecked_ref::() + .clone(); + // let schedule_closure = Closure::wrap(Box::new(schedule_perform_work_until_deadline) as Box); + + if global() + .unchecked_into::() + .hasSetImmediate() + .is_function() + { + setImmediate(&perform_work_function); + } else if let Ok(message_channel) = MessageChannel::new() { + unsafe { + if PORT1.is_none() { + PORT1 = Some(message_channel.port1()); + PORT2 = Some(message_channel.port2()) + } + PORT1 + .as_ref() + .unwrap() + .set_onmessage(Some(&perform_work_function)); + PORT2 + .as_ref() + .unwrap() + .post_message(&JsValue::null()) + .expect("port post message panic"); + } + } else { + setTimeout(&perform_work_function, 0.0); + } + + perform_work_closure.forget(); + } + + fn perform_work_until_deadline(&self) { + // unsafe { + // if SCHEDULED_HOST_CALLBACK.is_some() { + // let scheduled_host_callback = SCHEDULED_HOST_CALLBACK.unwrap(); + // let current_time = unstable_now(); + // + // START_TIME = current_time; + // let has_time_remaining = true; + // let has_more_work = scheduled_host_callback(has_time_remaining, current_time); + // if has_more_work { + // schedule_perform_work_until_deadline(); + // } else { + // IS_MESSAGE_LOOP_RUNNING = false; + // SCHEDULED_HOST_CALLBACK = None; + // } + // } else { + // IS_MESSAGE_LOOP_RUNNING = false + // } + // } + } + fn advance_timers(current_time: f64) { + unsafe { + let mut timer = peek(&mut TIMER_QUEUE); + while timer.is_some() { + let task = timer.unwrap().as_mut_any().downcast_mut::().unwrap(); + if task.callback.is_null() { + pop(&mut TIMER_QUEUE); + } else if task.start_time <= current_time { + let t = pop(&mut TIMER_QUEUE); + task.sort_index = task.expiration_time; + push(&mut TASK_QUEUE, Box::new(task.clone())); + } else { + return; + } + timer = peek(&mut TIMER_QUEUE); + } + } + } + + fn flush_work(has_time_remaining: bool, initial_time: f64) -> bool { + unsafe { + IS_HOST_CALLBACK_SCHEDULED = false; + if IS_HOST_TIMEOUT_SCHEDULED { + log!("IS_HOST_TIMEOUT_SCHEDULED"); + IS_HOST_TIMEOUT_SCHEDULED = false; + cancel_host_timeout(); + } + + IS_PERFORMING_WORK = true; + let previous_priority_level = CURRENT_PRIORITY_LEVEL.clone(); + + let has_more = work_loop(has_time_remaining, initial_time).unwrap_or_else(|_| { + log!("work_loop error"); + false + }); + + CURRENT_TASK = None; + CURRENT_PRIORITY_LEVEL = previous_priority_level.clone(); + IS_PERFORMING_WORK = false; + + return has_more; + } + } + + pub fn unstable_should_yield_to_host() -> bool { + unsafe { + let time_elapsed = unstable_now() - START_TIME; + if time_elapsed < FRAME_YIELD_MS { + return false; + } + } + return true; + } + + fn work_loop(has_time_remaining: bool, initial_time: f64) -> Result { + unsafe { + let mut current_time = initial_time; + advance_timers(current_time); + let mut current_task = peek(&mut TASK_QUEUE); + log!( + "current_task {:?}", + current_task.as_ref() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + ); + + CURRENT_TASK = peek(&mut TASK_QUEUE); + while current_task.is_some() { + let mut t = current_task + .unwrap() + .as_mut_any() + .downcast_mut::() + .unwrap(); + if t.expiration_time > current_time && (!has_time_remaining || unstable_should_yield_to_host()) { + break; + } + + let callback = t.callback.clone(); + if callback.is_function() { + t.callback = JsValue::null(); + // CURRENT_TASK = Some(&mut (Box::new(t.clone()) as Box)); + CURRENT_PRIORITY_LEVEL = t.priority_level.clone(); + let did_user_callback_timeout = t.expiration_time <= current_time; + let continuation_callback = callback + .dyn_ref::() + .unwrap() + .call1(&JsValue::null(), &JsValue::from(did_user_callback_timeout))?; + current_time = unstable_now(); + + if continuation_callback.is_function() { + t.callback = continuation_callback; + // let mut boxed_t = Box::new(t.clone()) as Box; + // CURRENT_TASK = Some(&mut boxed_t.clone()); + } else { + if match peek(&mut TASK_QUEUE) { + None => false, + Some(task) => { + let task = task.as_any().downcast_ref::().unwrap(); + log!("{:?} {:?} {:?}", task, t, task == t); + task == t + } + } { + pop(&mut TASK_QUEUE); + } + // if t == peek(&mut TASK_QUEUE) { + // pop(&mut TASK_QUEUE); + // } + } + + advance_timers(current_time); + } else { + pop(&mut TASK_QUEUE); + } + + current_task = peek(&mut TASK_QUEUE); + CURRENT_TASK = peek(&mut TASK_QUEUE); + } + + if CURRENT_TASK.is_some() { + return Ok(true); + } else { + let first_timer = peek(&mut TIMER_QUEUE); + log!("request_host_timeout"); + if first_timer.is_some() { + let task = first_timer + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + request_host_timeout(handle_timeout, task.start_time - current_time); + } + + return Ok(false); + } + } + } + + fn request_host_callback(callback: fn(bool, f64) -> bool) { + unsafe { + SCHEDULED_HOST_CALLBACK = Some(callback); + if !IS_MESSAGE_LOOP_RUNNING { + IS_MESSAGE_LOOP_RUNNING = true; + schedule_perform_work_until_deadline(); + } + } + } + + fn handle_timeout(current_time: f64) { + unsafe { + IS_HOST_TIMEOUT_SCHEDULED = false; + advance_timers(current_time); + + if !IS_HOST_TIMEOUT_SCHEDULED { + log!("handle_timeout0 {:?}", TASK_QUEUE.len()); + if peek(&mut TASK_QUEUE).is_some() { + log!("handle_timeout1"); + IS_HOST_CALLBACK_SCHEDULED = true; + request_host_callback(flush_work); + } else { + log!("handle_timeout2"); + + let first_timer = peek(&mut TIMER_QUEUE); + if first_timer.is_some() { + let first_timer_task = first_timer + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + request_host_timeout( + handle_timeout, + first_timer_task.start_time - current_time, + ); + } + } + } + } + } + + fn request_host_timeout(callback: fn(f64), ms: f64) { + unsafe { + let closure = Closure::wrap(Box::new(move || { + callback(unstable_now()); + }) as Box); + let function = closure.as_ref().unchecked_ref::().clone(); + closure.forget(); + TASK_TIMEOUT_ID = setTimeout(&function, ms); + } + } + + pub fn unstable_cancel_callback(id: u32) { + unsafe { + for mut task in &mut TASK_QUEUE { + let task = task.as_mut_any().downcast_mut::().unwrap(); + if task.id == id { + task.callback = JsValue::null(); + } + } + + for mut task in &mut TIMER_QUEUE { + let task = task.as_mut_any().downcast_mut::().unwrap(); + if task.id == id { + task.callback = JsValue::null(); + } + } + } + } + + pub fn unstable_schedule_callback(&self, priority_level: Priority, callback: Function, delay: f64) -> u32 { + let current_time = unstable_now(); + let mut start_time = current_time; + log!("starttime {:?} {:?} {:?}", start_time, delay, start_time + delay); + if delay > 0.0 { + start_time += delay; + } + + + let timeout = get_priority_timeout(priority_level.clone()); + let expiration_time = start_time + timeout; + let mut new_task = Task::new( + callback, + priority_level.clone(), + start_time, + expiration_time, + ); + let id = new_task.id; + unsafe { + if start_time > current_time { + new_task.sort_index = start_time; + push(&mut TIMER_QUEUE, Box::new(new_task.clone())); + + if peek(&mut TASK_QUEUE).is_none() { + if let Some(task) = peek(&mut TIMER_QUEUE) { + let task = task.as_any().downcast_ref::().unwrap(); + if task == &new_task { + if IS_HOST_TIMEOUT_SCHEDULED { + cancel_host_timeout(); + } else { + IS_HOST_TIMEOUT_SCHEDULED = true; + } + request_host_timeout(handle_timeout, start_time - current_time); + } + } + } + } else { + new_task.sort_index = expiration_time; + push(&mut TASK_QUEUE, Box::new(new_task)); + + if !IS_HOST_CALLBACK_SCHEDULED && !IS_PERFORMING_WORK { + IS_HOST_CALLBACK_SCHEDULED = true; + request_host_callback(flush_work); + } + } + } + + id + } + + pub fn unstable_schedule_callback_no_delay(&self, priority_level: Priority, callback: Function) -> u32 { + self.unstable_schedule_callback(priority_level, callback, 0.0) + } +} \ No newline at end of file