diff --git a/bench_primes.sh b/bench_primes.sh new file mode 100755 index 00000000..52b618c5 --- /dev/null +++ b/bench_primes.sh @@ -0,0 +1,3 @@ +#!/bin/sh +OPTS="--profile=release --display=quiet" +exec dune exec $OPTS -- benchs/primes.exe $@ diff --git a/benchs/dune b/benchs/dune index 14393230..6d2ec5ff 100644 --- a/benchs/dune +++ b/benchs/dune @@ -1,5 +1,5 @@ (executables - (names fib_rec pi) + (names fib_rec pi primes) (preprocess (action (run %{project_root}/src/cpp/cpp.exe %{input-file}))) diff --git a/benchs/primes.ml b/benchs/primes.ml new file mode 100644 index 00000000..1a8557d9 --- /dev/null +++ b/benchs/primes.ml @@ -0,0 +1,60 @@ +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf + +let generate' chan = + for i = 2 to Int.max_int do + Moonpool.Chan.push chan i + done + +let filter' in_chan out_chan prime = + let rec loop () = + let n = Moonpool.Chan.pop in_chan in + if n mod prime <> 0 then Moonpool.Chan.push out_chan n; + loop () + in + loop () + +let main ~chan_size ~n ~on_prime () : unit = + let@ runner = Moonpool.Ws_pool.with_ () in + let@ () = Moonpool.Ws_pool.run_wait_block runner in + let primes = ref @@ Moonpool.Chan.create ~max_size:chan_size () in + Moonpool.run_async runner + (let chan = !primes in + fun () -> generate' chan); + + for _i = 1 to n do + let prime = Moonpool.Chan.pop !primes in + on_prime prime; + let filtered_chan = Moonpool.Chan.create ~max_size:chan_size () in + Moonpool.run_async runner + (let in_chan = !primes in + fun () -> filter' in_chan filtered_chan prime); + primes := filtered_chan + done + +let () = + let n = ref 10_000 in + let chan_size = ref 0 in + let time = ref true in + let opts = + [ + "-n", Arg.Set_int n, " number of iterations"; + "--no-time", Arg.Clear time, " do not compute time"; + "--chan-size", Arg.Set_int chan_size, " channel size"; + ] + |> Arg.align + in + Arg.parse opts ignore ""; + Printf.printf "computing %d primes\n%!" !n; + + let t_start = Unix.gettimeofday () in + + let n_primes = Atomic.make 0 in + main ~n:!n ~chan_size:!chan_size ~on_prime:(fun _ -> Atomic.incr n_primes) (); + + let elapsed : float = Unix.gettimeofday () -. t_start in + Printf.printf "computed %d primes%s\n%!" (Atomic.get n_primes) + (if !time then + spf " in %.4fs" elapsed + else + "") diff --git a/src/core/chan.ml b/src/core/chan.ml index 5ce82376..be4ac3b9 100644 --- a/src/core/chan.ml +++ b/src/core/chan.ml @@ -1,193 +1,124 @@ -module A = Atomic_ - -type 'a or_error = 'a Fut.or_error -type 'a waiter = 'a Fut.promise - -let[@inline] list_is_empty_ = function - | [] -> true - | _ :: _ -> false - -(** Simple functional queue *) -module Q : sig - type 'a t - - val return : 'a -> 'a t - val is_empty : _ t -> bool - - exception Empty - - val pop_exn : 'a t -> 'a * 'a t - val push : 'a t -> 'a -> 'a t - val iter : ('a -> unit) -> 'a t -> unit -end = struct - type 'a t = { - hd: 'a list; - tl: 'a list; - } - (** Queue containing elements of type 'a. - - invariant: if hd=[], then tl=[] *) - - let[@inline] return x : _ t = { hd = [ x ]; tl = [] } - - let[@inline] make_ hd tl = - match hd with - | [] -> { hd = List.rev tl; tl = [] } - | _ :: _ -> { hd; tl } - - let[@inline] is_empty self = list_is_empty_ self.hd - let[@inline] push self x : _ t = make_ self.hd (x :: self.tl) - - let iter f (self : _ t) : unit = - List.iter f self.hd; - List.iter f self.tl - - exception Empty - - let pop_exn self = - match self.hd with - | [] -> - assert (list_is_empty_ self.tl); - raise Empty - | x :: hd' -> - let self' = make_ hd' self.tl in - x, self' -end - exception Closed -type 'a state = - | Empty - | St_closed - | Elems of 'a Q.t - | Waiters of 'a waiter Q.t - -type 'a t = { st: 'a state A.t } [@@unboxed] - -let create () : _ t = { st = A.make Empty } - -(** Produce a state from a queue of waiters *) -let[@inline] mk_st_waiters_ ws : _ state = - if Q.is_empty ws then - Empty - else - Waiters ws - -(** Produce a state from a queue of elements *) -let[@inline] mk_st_elems_ q : _ state = - if Q.is_empty q then - Empty - else - Elems q +type 'a t = { + q: 'a Queue.t; + mutex: Mutex.t; (** protects critical section *) + mutable closed: bool; + max_size: int; + push_waiters: Trigger.t Queue.t; + pop_waiters: Trigger.t Queue.t; +} + +let create ~max_size () : _ t = + if max_size < 0 then invalid_arg "Chan: max_size < 0"; + { + max_size; + mutex = Mutex.create (); + closed = false; + q = Queue.create (); + push_waiters = Queue.create (); + pop_waiters = Queue.create (); + } -let push (self : _ t) x : unit = - while - let old_st = A.get self.st in - match old_st with - | St_closed -> raise Closed - | Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x))) - | Waiters ws -> - (* awake first waiter and give it [x] *) - let w, ws' = Q.pop_exn ws in - let new_st = mk_st_waiters_ ws' in - if A.compare_and_set self.st old_st new_st then ( - Fut.fulfill w (Ok x); - false - ) else - true - | Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x))) - do - Domain_.relax () - done +let try_push (self : _ t) x : bool = + let res = ref false in + if Mutex.try_lock self.mutex then ( + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + match Queue.length self.q with + | 0 -> + let to_awake = Queue.create () in + Queue.push x self.q; + Queue.transfer self.pop_waiters to_awake; + res := true; + Mutex.unlock self.mutex; + (* wake up pop triggers if needed. Be careful to do that + outside the critical section*) + Queue.iter Trigger.signal to_awake + | n when n < self.max_size -> + Queue.push x self.q; + Mutex.unlock self.mutex + | _ -> Mutex.unlock self.mutex + ); + !res let try_pop (type elt) self : elt option = - let module M = struct - exception Found of elt - end in - try - (* a bit of spinning *) - for _retry = 1 to 10 do - let old_st = A.get self.st in - match old_st with - | Elems q -> - let x, q' = Q.pop_exn q in - let new_st = mk_st_elems_ q' in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Found x) - else - Domain_.relax () - | _ -> raise_notrace Exit - done; - None - with - | M.Found x -> Some x - | Exit -> None - -let pop (type elt) (self : _ t) : elt Fut.t = - let module M = struct - exception Ret of elt - exception Fut of elt Fut.t - end in - try - while - let old_st = A.get self.st in - (match old_st with - | St_closed -> - let bt = Printexc.get_callstack 10 in - raise_notrace (M.Fut (Fut.fail Closed bt)) - | Elems q -> - let x, q' = Q.pop_exn q in - let new_st = mk_st_elems_ q' in - if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x) - | Empty -> - let fut, promise = Fut.make () in - let new_st = Waiters (Q.return promise) in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Fut fut) - | Waiters ws -> - let fut, promise = Fut.make () in - (* add new promise at the end of the queue of waiters *) - let new_st = Waiters (Q.push ws promise) in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Fut fut)); - true - do - Domain_.relax () - done; - (* never reached *) - assert false - with - | M.Ret x -> Fut.return x - | M.Fut f -> f - -let pop_block_exn (self : 'a t) : 'a = - match try_pop self with - | Some x -> x - | None -> Fut.wait_block_exn @@ pop self + let res = ref None in + if Mutex.try_lock self.mutex then ( + (match Queue.pop self.q with + | exception Queue.Empty -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ) + | x -> res := Some x); + Mutex.unlock self.mutex + ); + !res let close (self : _ t) : unit = - while - let old_st = A.get self.st in - match old_st with - | St_closed -> false (* exit *) - | Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed) - | Waiters ws -> - if A.compare_and_set self.st old_st St_closed then ( - (* fail all waiters with [Closed]. *) - let bt = Printexc.get_callstack 10 in - Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws; - false - ) else - true - do - Domain_.relax () - done + let q = Queue.create () in + Mutex.lock self.mutex; + if not self.closed then ( + self.closed <- true; + Queue.transfer self.pop_waiters q; + Queue.transfer self.push_waiters q + ); + Mutex.unlock self.mutex; + Queue.iter Trigger.signal q [@@@ifge 5.0] -let pop_await self = - match try_pop self with - | Some x -> x - | None -> Fut.await @@ pop self +let rec push (self : _ t) x : unit = + Mutex.lock self.mutex; + + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + match Queue.length self.q with + | 0 -> + Queue.push x self.q; + let to_wakeup = Queue.create () in + Queue.transfer self.pop_waiters to_wakeup; + Mutex.unlock self.mutex; + Queue.iter Trigger.signal to_wakeup + | n when n < self.max_size -> + Queue.push x self.q; + Mutex.unlock self.mutex + | _ -> + let tr = Trigger.create () in + Queue.push tr self.push_waiters; + Mutex.unlock self.mutex; + Trigger.await_exn tr; + push self x + +let rec pop (self : 'a t) : 'a = + Mutex.lock self.mutex; + match Queue.pop self.q with + | x -> + if Queue.is_empty self.q then ( + let to_wakeup = Queue.create () in + Queue.transfer self.push_waiters to_wakeup; + Mutex.unlock self.mutex; + Queue.iter Trigger.signal to_wakeup + ) else + Mutex.unlock self.mutex; + x + | exception Queue.Empty -> + if self.closed then ( + Mutex.unlock self.mutex; + raise Closed + ); + + let tr = Trigger.create () in + Queue.push tr self.pop_waiters; + Mutex.unlock self.mutex; + Trigger.await_exn tr; + pop self [@@@endif] diff --git a/src/core/chan.mli b/src/core/chan.mli index 083cf8d5..7ec1163d 100644 --- a/src/core/chan.mli +++ b/src/core/chan.mli @@ -1,42 +1,29 @@ (** Channels. - Channels are pipelines of values where threads can push into - one end, and pull from the other end. + The channels have bounded size. Push/pop return futures or can use effects + to provide an [await]-friendly version. - Unlike {!Moonpool.Blocking_queue}, channels are designed so - that pushing never blocks, and pop'ing values returns a future. - - @since 0.3 + The channels became bounded since @NEXT_RELEASE . *) -type 'a or_error = 'a Fut.or_error - type 'a t (** Channel carrying values of type ['a]. *) -val create : unit -> 'a t +val create : max_size:int -> unit -> 'a t (** Create a channel. *) exception Closed -val push : 'a t -> 'a -> unit -(** [push chan x] pushes [x] into [chan]. This does not block. +val try_push : 'a t -> 'a -> bool +(** [try_push chan x] pushes [x] into [chan]. This does not block. + Returns [true] if it succeeded in pushing. @raise Closed if the channel is closed. *) -val pop : 'a t -> 'a Fut.t -(** Pop an element. This returns a future that will be - fulfilled when an element is available. - @raise Closed if the channel is closed, or fails the future - if the channel is closed before an element is available for it. *) - val try_pop : 'a t -> 'a option (** [try_pop chan] pops and return an element if one is available - immediately. Otherwise it returns [None]. *) - -val pop_block_exn : 'a t -> 'a -(** Like [pop], but blocks if an element is not available immediately. - The precautions around blocking from inside a thread pool - are the same as explained in {!Fut.wait_block}. *) + immediately. Otherwise it returns [None]. + @raise Closed if the channel is closed and empty. + *) val close : _ t -> unit (** Close the channel. Further push and pop calls will fail. @@ -44,9 +31,23 @@ val close : _ t -> unit [@@@ifge 5.0] -val pop_await : 'a t -> 'a -(** Like {!pop} but suspends the current thread until an element is - available. See {!Fut.await} for more details. - @since 0.3 *) +val push : 'a t -> 'a -> unit +(** Push the value into the channel, suspending the current task + if the channel is currently full. + @raise Closed if the channel is closed + @since NEXT_RELEASE *) + +val pop : 'a t -> 'a +(** Pop an element. This might suspend the current task if the + channel is currently empty. + @raise Closed if the channel is empty and closed. + @since NEXT_RELEASE *) + +(* +val pop_block_exn : 'a t -> 'a +(** Like [pop], but blocks if an element is not available immediately. + The precautions around blocking from inside a thread pool + are the same as explained in {!Fut.wait_block}. *) +*) [@@@endif] diff --git a/src/sync/moonpool_sync.ml b/src/sync/moonpool_sync.ml index 99065305..f2c29ba7 100644 --- a/src/sync/moonpool_sync.ml +++ b/src/sync/moonpool_sync.ml @@ -1,4 +1,5 @@ module Mutex = Picos_std_sync.Mutex +module Chan = Chan module Condition = Picos_std_sync.Condition module Lock = Lock module Event = Event diff --git a/test/dune b/test/dune index c10d08f4..af881591 100644 --- a/test/dune +++ b/test/dune @@ -7,7 +7,6 @@ t_futs1 t_tree_futs t_props - t_chan_train t_resource t_unfair t_ws_deque diff --git a/test/effect-based/dune b/test/effect-based/dune index bf1feb81..faa9254d 100644 --- a/test/effect-based/dune +++ b/test/effect-based/dune @@ -3,6 +3,7 @@ t_fib1 t_futs1 t_many + t_chan_train t_fib_fork_join t_fib_fork_join_all t_sort diff --git a/test/t_chan_train.ml b/test/effect-based/t_chan_train.ml similarity index 88% rename from test/t_chan_train.ml rename to test/effect-based/t_chan_train.ml index 20645a73..e972dabc 100644 --- a/test/t_chan_train.ml +++ b/test/effect-based/t_chan_train.ml @@ -3,17 +3,15 @@ open Moonpool (* large pool, some of our tasks below are long lived *) let pool = Ws_pool.create ~num_threads:30 () -open Fut.Infix - type event = | E_int of int | E_close let mk_chan (ic : event Chan.t) : event Chan.t = - let out = Chan.create () in + let out = Chan.create ~max_size:16 () in let rec loop () = - let* ev = Chan.pop ic in + let ev = Chan.pop ic in Chan.push out ev; match ev with | E_close -> Fut.return () @@ -44,7 +42,7 @@ let run () = (* start trains *) let trains = List.init n_trains (fun _ -> - let c = Chan.create () in + let c = Chan.create ~max_size:16 () in let out = mk_train len_train c in c, out) in @@ -66,7 +64,9 @@ let run () = let sum = ref 0 in try while true do - match Chan.pop_block_exn oc with + match + Fut.spawn ~on:pool (fun () -> Chan.pop oc) |> Fut.wait_block_exn + with | E_close -> raise Exit | E_int x -> sum := !sum + x done; diff --git a/test/fiber/t_fib1.ml b/test/fiber/t_fib1.ml index 7ceedbf4..77360b2b 100644 --- a/test/fiber/t_fib1.ml +++ b/test/fiber/t_fib1.ml @@ -52,14 +52,14 @@ let () = let clock = ref TS.init in let fib = F.spawn_top ~on:runner @@ fun () -> - let chan_progress = Chan.create () in - let chans = Array.init 5 (fun _ -> Chan.create ()) in + let chan_progress = Chan.create ~max_size:4 () in + let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in let subs = List.init 5 (fun i -> F.spawn ~protect:false @@ fun _n -> Thread.delay (float i *. 0.01); - Chan.pop_await chans.(i); + Chan.pop chans.(i); Chan.push chan_progress i; F.check_if_cancelled (); i) @@ -70,7 +70,7 @@ let () = F.spawn_ignore (fun () -> for i = 0 to 4 do Chan.push chans.(i) (); - let i' = Chan.pop_await chan_progress in + let i' = Chan.pop chan_progress in assert (i = i') done); @@ -110,8 +110,8 @@ let () = @@ Exn_bt.show ebt) in - let chans_unblock = Array.init 10 (fun _i -> Chan.create ()) in - let chan_progress = Chan.create () in + let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in + let chan_progress = Chan.create ~max_size:4 () in logf (TS.tick_get clock) "start fibers"; let subs = @@ -126,7 +126,7 @@ let () = Thread.delay 0.002; (* sync for determinism *) - Chan.pop_await chans_unblock.(i); + Chan.pop chans_unblock.(i); Chan.push chan_progress i; if i = 7 then ( @@ -150,7 +150,7 @@ let () = F.spawn_ignore (fun () -> for j = 0 to 9 do Chan.push chans_unblock.(j) (); - let j' = Chan.pop_await chan_progress in + let j' = Chan.pop chan_progress in assert (j = j') done);