Skip to content

Commit

Permalink
wip: debug, and handle suspensions in the main callback resolution loop
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Feb 11, 2024
1 parent c135d03 commit 360e634
Showing 1 changed file with 62 additions and 51 deletions.
113 changes: 62 additions & 51 deletions src/core/lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ struct

type backtrace = Printexc.raw_backtrace


(* Promises proper. *)

type ('a, 'u, 'c) promise = {
Expand Down Expand Up @@ -532,6 +531,10 @@ struct
end
open Main_internal_types

(* This effect is used, on OCaml 5.xx, to implement [await]. Unlike regular
promise chaining using [bind] and the likes, [await] operates
purely within a function execution tied to a single promise.
*)
type _ Effect.t +=
| Await : 'a callbacks -> 'a Effect.t

Expand Down Expand Up @@ -571,6 +574,7 @@ struct
let state_of_result = function
| Ok x -> Fulfilled x
| Error exn ->
(* NOTE: this is new and makes [Obj.magic] a non-starter. *)
let bt = Printexc.get_raw_backtrace () in
Rejected (exn, bt)
end
Expand Down Expand Up @@ -1009,7 +1013,6 @@ struct
end
open Pending_callbacks


let await (p: 'a t) : 'a =
let Internal p = Public_types.to_internal_promise p in
let p = underlying p in
Expand All @@ -1019,6 +1022,17 @@ let await (p: 'a t) : 'a =
| Pending cbs ->
Effect.perform (Await cbs)

let new_pending_promise ~how_to_cancel : _ promise =
let state =
Pending {
regular_callbacks = Regular_callback_list_empty;
cancel_callbacks = Cancel_callback_list_empty;
how_to_cancel;
cleanups_deferred = 0;
}
in
{state}

module Resolution_loop :
sig
(* All user-provided callbacks are called by Lwt only through this module. It
Expand Down Expand Up @@ -1053,6 +1067,8 @@ sig

val run_with_effect : (unit -> unit) -> unit

val run_in_resolution_loop : (unit -> 'a) -> 'a

(* Public interface *)
exception Canceled

Expand Down Expand Up @@ -1157,7 +1173,29 @@ struct
with exn when Exception_filter.run exn ->
!async_exception_hook exn

type task =
| Deferred : 'a callbacks * 'a resolved_state -> task
| Fiber : ('a, unit) Effect.Deep.continuation * 'a resolved_state -> task

let tasks : task Queue.t = Queue.create ()

let run_with_effect (f: unit -> unit) : unit =
let eff_handler = {
Effect.Deep.effc=function
| Await cbs ->
Some (fun k ->
add_implicitly_removed_callback cbs
(fun x ->
(* postpone the resuming of the continuation *)
Queue.push (Fiber (k, x)) tasks))
| _ -> None
}
in
Effect.Deep.try_with
f () eff_handler

let handle_with_effect_and_async_exception_hook f v =
run_with_effect (fun () -> handle_with_async_exception_hook f v)

exception Canceled

Expand All @@ -1182,7 +1220,7 @@ struct
iter_list rest
| Cancel_callback_list_callback (storage, f) ->
current_storage := storage;
handle_with_async_exception_hook f ();
handle_with_effect_and_async_exception_hook f ();
iter_list rest
| Cancel_callback_list_remove_sequence_node node ->
Lwt_sequence.remove node;
Expand Down Expand Up @@ -1239,54 +1277,36 @@ struct
run_cancel_callbacks callbacks.cancel_callbacks;
run_regular_callbacks callbacks.regular_callbacks



let default_maximum_callback_nesting_depth = 42

let current_callback_nesting_depth = ref 0

type deferred_callbacks =
Deferred : ('a callbacks * 'a resolved_state) -> deferred_callbacks
[@@ocaml.unboxed]

let deferred_callbacks : deferred_callbacks Queue.t = Queue.create ()

(* Before entering a resolution loop, it is necessary to take a snapshot of
the current state of sequence-associated storage. This is because many of
the callbacks that will be run will modify the storage. The storage is
restored to the snapshot when the resolution loop is exited. *)
let enter_resolution_loop () =
current_callback_nesting_depth := !current_callback_nesting_depth + 1;
Printf.printf "enter resolution loop (depth=%d)\n%!" !current_callback_nesting_depth;
let storage_snapshot = !current_storage in
storage_snapshot

let leave_resolution_loop (storage_snapshot : storage) : unit =
if !current_callback_nesting_depth = 1 then begin
while not (Queue.is_empty deferred_callbacks) do
let Deferred (callbacks, result) = Queue.pop deferred_callbacks in
run_callbacks callbacks result
while not (Queue.is_empty tasks) do
Printf.printf "run loop iteration\n%!";
match Queue.pop tasks with
| Deferred (callbacks, result) -> run_callbacks callbacks result
| Fiber (k, x) ->
( match x with
| Fulfilled x -> Effect.Deep.continue k x
| Rejected (exn, bt) -> Effect.Deep.discontinue_with_backtrace k exn bt)
done
end;
Printf.printf "done with loop\n%!";
current_callback_nesting_depth := !current_callback_nesting_depth - 1;
current_storage := storage_snapshot

let eff_handler : unit Effect.Deep.effect_handler = {
Effect.Deep.effc=function
| Await cbs ->
Some (fun k ->
add_implicitly_removed_callback cbs
(fun x -> match x with
| Fulfilled x -> Effect.Deep.continue k x
| Rejected (exn, _) -> Effect.Deep.discontinue k exn
)
)
| _ -> None
}

let run_with_effect (f: unit -> unit) : unit =
Effect.Deep.try_with
f () eff_handler

let run_in_resolution_loop f =
let storage_snapshot = enter_resolution_loop () in
let result = f () in
Expand Down Expand Up @@ -1316,11 +1336,10 @@ struct
in

if should_defer then
Queue.push (Deferred (callbacks, result)) deferred_callbacks
Queue.push (Deferred (callbacks, result)) tasks
else
run_with_effect (fun () ->
run_in_resolution_loop (fun () ->
run_callbacks callbacks result))
run_callbacks callbacks result)

let resolve ?allow_deferring ?maximum_callback_nesting_depth p result =
let Pending callbacks = p.state in
Expand Down Expand Up @@ -1358,7 +1377,7 @@ struct
}
in
Queue.push
(Deferred (deferred_record, deferred_result)) deferred_callbacks;
(Deferred (deferred_record, deferred_result)) tasks;
immediate_result
end
else
Expand Down Expand Up @@ -1564,16 +1583,7 @@ sig
val no_cancel : 'a t -> 'a t
end =
struct
let new_pending ~how_to_cancel =
let state =
Pending {
regular_callbacks = Regular_callback_list_empty;
cancel_callbacks = Cancel_callback_list_empty;
how_to_cancel;
cleanups_deferred = 0;
}
in
{state}
let new_pending = new_pending_promise

let propagate_cancel_to_several ps =
(* Using a dirty cast here to avoid rebuilding the list :( Not bothering
Expand Down Expand Up @@ -2555,13 +2565,8 @@ struct
in
add_implicitly_removed_callback p_callbacks callback

let async f =
let p =
try f ()
with exn when Exception_filter.run exn -> fail exn
in
let handle_promise_computed_in_async (p: _ t) =
let Internal p = to_internal_promise p in

match (underlying p).state with
| Fulfilled _ ->
()
Expand All @@ -2578,6 +2583,12 @@ struct
in
add_implicitly_removed_callback p_callbacks callback

let async f =
(* run [f()] in a context where [await] is handled *)
run_in_resolution_loop (fun () ->
run_with_effect (fun () ->
handle_promise_computed_in_async (f ())))

let ignore_result p =
let Internal p = to_internal_promise p in

Expand Down

0 comments on commit 360e634

Please sign in to comment.