Skip to content

Commit

Permalink
Merge pull request #32 from c-cube/simon/update-chan-2024-09-25
Browse files Browse the repository at this point in the history
update channels to make them bounded and more efficient
  • Loading branch information
c-cube authored Sep 27, 2024
2 parents 0d8767f + c7f517c commit f128e6c
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 224 deletions.
3 changes: 3 additions & 0 deletions bench_primes.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh
OPTS="--profile=release --display=quiet"
exec dune exec $OPTS -- benchs/primes.exe $@
2 changes: 1 addition & 1 deletion benchs/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(executables
(names fib_rec pi)
(names fib_rec pi primes)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
Expand Down
60 changes: 60 additions & 0 deletions benchs/primes.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
let ( let@ ) = ( @@ )
let spf = Printf.sprintf

let generate' chan =
for i = 2 to Int.max_int do
Moonpool.Chan.push chan i
done

let filter' in_chan out_chan prime =
let rec loop () =
let n = Moonpool.Chan.pop in_chan in
if n mod prime <> 0 then Moonpool.Chan.push out_chan n;
loop ()
in
loop ()

let main ~chan_size ~n ~on_prime () : unit =
let@ runner = Moonpool.Ws_pool.with_ () in
let@ () = Moonpool.Ws_pool.run_wait_block runner in
let primes = ref @@ Moonpool.Chan.create ~max_size:chan_size () in
Moonpool.run_async runner
(let chan = !primes in
fun () -> generate' chan);

for _i = 1 to n do
let prime = Moonpool.Chan.pop !primes in
on_prime prime;
let filtered_chan = Moonpool.Chan.create ~max_size:chan_size () in
Moonpool.run_async runner
(let in_chan = !primes in
fun () -> filter' in_chan filtered_chan prime);
primes := filtered_chan
done

let () =
let n = ref 10_000 in
let chan_size = ref 0 in
let time = ref true in
let opts =
[
"-n", Arg.Set_int n, " number of iterations";
"--no-time", Arg.Clear time, " do not compute time";
"--chan-size", Arg.Set_int chan_size, " channel size";
]
|> Arg.align
in
Arg.parse opts ignore "";
Printf.printf "computing %d primes\n%!" !n;

let t_start = Unix.gettimeofday () in

let n_primes = Atomic.make 0 in
main ~n:!n ~chan_size:!chan_size ~on_prime:(fun _ -> Atomic.incr n_primes) ();

let elapsed : float = Unix.gettimeofday () -. t_start in
Printf.printf "computed %d primes%s\n%!" (Atomic.get n_primes)
(if !time then
spf " in %.4fs" elapsed
else
"")
293 changes: 112 additions & 181 deletions src/core/chan.ml
Original file line number Diff line number Diff line change
@@ -1,193 +1,124 @@
module A = Atomic_

type 'a or_error = 'a Fut.or_error
type 'a waiter = 'a Fut.promise

let[@inline] list_is_empty_ = function
| [] -> true
| _ :: _ -> false

(** Simple functional queue *)
module Q : sig
type 'a t

val return : 'a -> 'a t
val is_empty : _ t -> bool

exception Empty

val pop_exn : 'a t -> 'a * 'a t
val push : 'a t -> 'a -> 'a t
val iter : ('a -> unit) -> 'a t -> unit
end = struct
type 'a t = {
hd: 'a list;
tl: 'a list;
}
(** Queue containing elements of type 'a.
invariant: if hd=[], then tl=[] *)

let[@inline] return x : _ t = { hd = [ x ]; tl = [] }

let[@inline] make_ hd tl =
match hd with
| [] -> { hd = List.rev tl; tl = [] }
| _ :: _ -> { hd; tl }

let[@inline] is_empty self = list_is_empty_ self.hd
let[@inline] push self x : _ t = make_ self.hd (x :: self.tl)

let iter f (self : _ t) : unit =
List.iter f self.hd;
List.iter f self.tl

exception Empty

let pop_exn self =
match self.hd with
| [] ->
assert (list_is_empty_ self.tl);
raise Empty
| x :: hd' ->
let self' = make_ hd' self.tl in
x, self'
end

exception Closed

type 'a state =
| Empty
| St_closed
| Elems of 'a Q.t
| Waiters of 'a waiter Q.t

type 'a t = { st: 'a state A.t } [@@unboxed]

let create () : _ t = { st = A.make Empty }

(** Produce a state from a queue of waiters *)
let[@inline] mk_st_waiters_ ws : _ state =
if Q.is_empty ws then
Empty
else
Waiters ws

(** Produce a state from a queue of elements *)
let[@inline] mk_st_elems_ q : _ state =
if Q.is_empty q then
Empty
else
Elems q
type 'a t = {
q: 'a Queue.t;
mutex: Mutex.t; (** protects critical section *)
mutable closed: bool;
max_size: int;
push_waiters: Trigger.t Queue.t;
pop_waiters: Trigger.t Queue.t;
}

let create ~max_size () : _ t =
if max_size < 0 then invalid_arg "Chan: max_size < 0";
{
max_size;
mutex = Mutex.create ();
closed = false;
q = Queue.create ();
push_waiters = Queue.create ();
pop_waiters = Queue.create ();
}

let push (self : _ t) x : unit =
while
let old_st = A.get self.st in
match old_st with
| St_closed -> raise Closed
| Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x)))
| Waiters ws ->
(* awake first waiter and give it [x] *)
let w, ws' = Q.pop_exn ws in
let new_st = mk_st_waiters_ ws' in
if A.compare_and_set self.st old_st new_st then (
Fut.fulfill w (Ok x);
false
) else
true
| Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x)))
do
Domain_.relax ()
done
let try_push (self : _ t) x : bool =
let res = ref false in
if Mutex.try_lock self.mutex then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);

match Queue.length self.q with
| 0 ->
let to_awake = Queue.create () in
Queue.push x self.q;
Queue.transfer self.pop_waiters to_awake;
res := true;
Mutex.unlock self.mutex;
(* wake up pop triggers if needed. Be careful to do that
outside the critical section*)
Queue.iter Trigger.signal to_awake
| n when n < self.max_size ->
Queue.push x self.q;
Mutex.unlock self.mutex
| _ -> Mutex.unlock self.mutex
);
!res

let try_pop (type elt) self : elt option =
let module M = struct
exception Found of elt
end in
try
(* a bit of spinning *)
for _retry = 1 to 10 do
let old_st = A.get self.st in
match old_st with
| Elems q ->
let x, q' = Q.pop_exn q in
let new_st = mk_st_elems_ q' in
if A.compare_and_set self.st old_st new_st then
raise_notrace (M.Found x)
else
Domain_.relax ()
| _ -> raise_notrace Exit
done;
None
with
| M.Found x -> Some x
| Exit -> None

let pop (type elt) (self : _ t) : elt Fut.t =
let module M = struct
exception Ret of elt
exception Fut of elt Fut.t
end in
try
while
let old_st = A.get self.st in
(match old_st with
| St_closed ->
let bt = Printexc.get_callstack 10 in
raise_notrace (M.Fut (Fut.fail Closed bt))
| Elems q ->
let x, q' = Q.pop_exn q in
let new_st = mk_st_elems_ q' in
if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x)
| Empty ->
let fut, promise = Fut.make () in
let new_st = Waiters (Q.return promise) in
if A.compare_and_set self.st old_st new_st then
raise_notrace (M.Fut fut)
| Waiters ws ->
let fut, promise = Fut.make () in
(* add new promise at the end of the queue of waiters *)
let new_st = Waiters (Q.push ws promise) in
if A.compare_and_set self.st old_st new_st then
raise_notrace (M.Fut fut));
true
do
Domain_.relax ()
done;
(* never reached *)
assert false
with
| M.Ret x -> Fut.return x
| M.Fut f -> f

let pop_block_exn (self : 'a t) : 'a =
match try_pop self with
| Some x -> x
| None -> Fut.wait_block_exn @@ pop self
let res = ref None in
if Mutex.try_lock self.mutex then (
(match Queue.pop self.q with
| exception Queue.Empty ->
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
)
| x -> res := Some x);
Mutex.unlock self.mutex
);
!res

let close (self : _ t) : unit =
while
let old_st = A.get self.st in
match old_st with
| St_closed -> false (* exit *)
| Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed)
| Waiters ws ->
if A.compare_and_set self.st old_st St_closed then (
(* fail all waiters with [Closed]. *)
let bt = Printexc.get_callstack 10 in
Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws;
false
) else
true
do
Domain_.relax ()
done
let q = Queue.create () in
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
Queue.transfer self.pop_waiters q;
Queue.transfer self.push_waiters q
);
Mutex.unlock self.mutex;
Queue.iter Trigger.signal q

[@@@ifge 5.0]

let pop_await self =
match try_pop self with
| Some x -> x
| None -> Fut.await @@ pop self
let rec push (self : _ t) x : unit =
Mutex.lock self.mutex;

if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);

match Queue.length self.q with
| 0 ->
Queue.push x self.q;
let to_wakeup = Queue.create () in
Queue.transfer self.pop_waiters to_wakeup;
Mutex.unlock self.mutex;
Queue.iter Trigger.signal to_wakeup
| n when n < self.max_size ->
Queue.push x self.q;
Mutex.unlock self.mutex
| _ ->
let tr = Trigger.create () in
Queue.push tr self.push_waiters;
Mutex.unlock self.mutex;
Trigger.await_exn tr;
push self x

let rec pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
match Queue.pop self.q with
| x ->
if Queue.is_empty self.q then (
let to_wakeup = Queue.create () in
Queue.transfer self.push_waiters to_wakeup;
Mutex.unlock self.mutex;
Queue.iter Trigger.signal to_wakeup
) else
Mutex.unlock self.mutex;
x
| exception Queue.Empty ->
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);

let tr = Trigger.create () in
Queue.push tr self.pop_waiters;
Mutex.unlock self.mutex;
Trigger.await_exn tr;
pop self

[@@@endif]
Loading

0 comments on commit f128e6c

Please sign in to comment.