Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Picos_std_sync.Queue #302

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions bench/bench_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
open Multicore_bench
open Picos_std_sync

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create ~padded:true () in

let op push =
if push then Queue.push t 101
else match Queue.pop_exn t with _ -> () | exception Queue.Empty -> ()
in

let init _ =
assert (
match Queue.pop_exn t with _ -> false | exception Queue.Empty -> true);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ~n_adders ~n_takers () =
let n_domains = n_adders + n_takers in

let n_msgs = 50 * Util.iter_factor in

let t = Queue.create ~padded:true () in

let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in
let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in

let init _ =
assert (
match Queue.pop_exn t with _ -> false | exception Queue.Empty -> true);
Countdown.non_atomic_set n_msgs_to_add n_msgs;
Countdown.non_atomic_set n_msgs_to_take n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let i = i - n_adders in
let rec work () =
let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in
if 0 < n then
let rec loop n =
if 0 < n then begin
match Queue.pop_exn t with
| _ -> loop (n - 1)
| exception Queue.Empty ->
Backoff.once Backoff.default |> ignore;
loop n
end
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in
Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
if Picos_domain.recommended_domain_count () < n_adders + n_takers then []
else run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
(run %{test} -brief "Picos Mutex")
(run %{test} -brief "Picos Semaphore")
(run %{test} -brief "Picos Spawn")
(run %{test} -brief "Picos Queue")
(run %{test} -brief "Picos Yield")
(run %{test} -brief "Picos Cancel_after with Picos_select")
(run %{test} -brief "Ref with Picos_sync.Mutex")
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ let benchmarks =
("Picos DLS", Bench_dls.run_suite);
("Picos Mutex", Bench_mutex.run_suite);
("Picos Semaphore", Bench_semaphore.run_suite);
("Picos Queue", Bench_queue.run_suite);
("Picos Spawn", Bench_spawn.run_suite);
("Picos Yield", Bench_yield.run_suite);
("Picos Cancel_after with Picos_select", Bench_cancel_after.run_suite);
Expand Down
1 change: 1 addition & 0 deletions lib/picos_std.sync/picos_std_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ module Semaphore = Semaphore
module Lazy = Lazy
module Latch = Latch
module Ivar = Ivar
module Queue = Queue
module Stream = Stream
77 changes: 77 additions & 0 deletions lib/picos_std.sync/picos_std_sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,81 @@ module Ivar : sig
variable has either been assigned a value or has been poisoned. *)
end

module Queue : sig
(** A lock-free multi-producer, multi-consumer queue. *)

(** {1 API} *)

type !'a t
(** A multi-producer, multi-consumer queue. *)

val create : ?padded:bool -> unit -> 'a t
(** [create ()] returns a new empty multi-producer, multi-consumer queue. *)

val push : 'a t -> 'a -> unit
(** [push queue value] adds the [value] to the tail of the [queue]. *)

val push_head : 'a t -> 'a -> unit
(** [push_head queue value] adds the [value] to the head of the [queue]. *)

exception Empty
(** Raised by {!pop_exn} in case it finds the queue empty. *)

val pop_exn : 'a t -> 'a
(** [pop_exn queue] tries to remove the value at the head of the [queue].
Returns the removed value or raises {!Empty} in case the queue was empty.

@raise Empty in case the queue was empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt queue] tries to remove the value at the head of the [queue].
Returns the removed value or [None] in case the queue was empty. *)

val pop : 'a t -> 'a
(** [pop queue] waits until the queue is not empty, removes the value at the
head of the [queue], and returns it. *)

val length : 'a t -> int
(** [length queue] returns the length or the number of values in the [queue].
*)

val is_empty : 'a t -> bool
(** [is_empty queue] is equivalent to {{!length} [length queue = 0]}. *)

(** {1 Examples}

An example top-level session:
{[
# let q : int Queue.t =
Queue.create ()
val q : int Picos_std_sync.Queue.t = <abstr>

# Queue.push q 42
- : unit = ()

# Queue.push_head q 76
- : unit = ()

# Queue.length q
- : int = 2

# Queue.push q 101
- : unit = ()

# Queue.pop_exn q
- : int = 76

# Queue.pop_exn q
- : int = 42

# Queue.pop_exn q
- : int = 101

# Queue.pop_exn q
Exception: Picos_std_sync__Queue.Empty.
]} *)
end

module Stream : sig
(** A lock-free, poisonable, many-to-many, stream.

Expand Down Expand Up @@ -424,6 +499,8 @@ end
val push : 'a t -> 'a -> unit
val pop : 'a t -> 'a
end = struct
module Queue = Stdlib.Queue

type 'a t = {
mutex : Mutex.t;
queue : 'a Queue.t;
Expand Down
Loading
Loading