Skip to content

Commit

Permalink
testing delay.wait
Browse files Browse the repository at this point in the history
  • Loading branch information
psafont committed Dec 31, 2024
1 parent f606c57 commit b8a1d66
Show file tree
Hide file tree
Showing 38 changed files with 191 additions and 228 deletions.
2 changes: 1 addition & 1 deletion ocaml/database/block_device_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ let accept_conn s timer =
match Timer.remaining timer with
| Remaining time ->
(* Await an incoming connection... *)
let timeout = Scheduler.span_to_s time in
let timeout = Clock.Timer.span_to_s time in
let ready_to_read, _, _ =
Xapi_stdext_unix.Unixext.select [s] [] [] timeout
in
Expand Down
5 changes: 2 additions & 3 deletions ocaml/database/master_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Timer = Clock.Timer

let my_connection : Stunnel.t option ref = ref None

let delay = Scheduler.PipeDelay.make ()
let delay = Scheduler.Delay.make ()

exception Uninitialised

Expand Down Expand Up @@ -358,8 +358,7 @@ let do_db_xml_rpc_persistent_with_reopen ~host:_ ~path (req : string) :
) ;
debug "Sleeping %a before retrying master connection..."
Debug.Pp.mtime_span !backoff_delay ;
let delay_s = Scheduler.span_to_s !backoff_delay in
let timed_out = Scheduler.PipeDelay.wait delay delay_s in
let timed_out = Scheduler.Delay.wait delay !backoff_delay in
if not timed_out then
debug "%s: Sleep interrupted, retrying master connection now"
__FUNCTION__ ;
Expand Down
1 change: 1 addition & 0 deletions ocaml/libs/tracing/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
(libraries
cohttp
cohttp-posix
mtime
ptime
ptime.clock.os
rpclib.core
Expand Down
6 changes: 3 additions & 3 deletions ocaml/libs/tracing/tracing_export.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ open Tracing

let ( let@ ) f x = f x

let export_interval = ref 30.
let export_interval = ref Mtime.Span.(30 * s)

let set_export_interval t = export_interval := t

Expand Down Expand Up @@ -315,8 +315,8 @@ module Destination = struct
(fun () ->
let signaled = ref false in
while not !signaled do
debug "Tracing: Waiting %d seconds before exporting spans"
(int_of_float !export_interval) ;
debug "Tracing: Waiting %a before exporting spans" Debug.Pp.mtime_span
!export_interval ;
if not (Delay.wait delay !export_interval) then (
debug "Tracing: we are signaled, export spans now and exit" ;
signaled := true
Expand Down
28 changes: 14 additions & 14 deletions ocaml/libs/tracing/tracing_export.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,36 @@
* GNU Lesser General Public License for more details.
*)

(** [Tracing_export] is a module dedicated for the creation and management of
(** [Tracing_export] is a module dedicated for the creation and management of
threads that export the tracing data.
*)

val set_export_interval : float -> unit
(** [set_export_interval seconds] sets the time interval between consecutive
exports of the finished spans to [seconds].
val set_export_interval : Mtime.Span.t -> unit
(** [set_export_interval period] sets the time interval between consecutive
exports of the finished spans to [period].
Default is every [30.] seconds.
*)

val set_host_id : string -> unit
(** [set_host_id id] sets the id of the host to [id].
(** [set_host_id id] sets the id of the host to [id].
Default is ["localhost"].
*)

val set_service_name : string -> unit
(** [set_service_name name] sets the name of the service to [name].
All spans will be exported under this service's name.
(** [set_service_name name] sets the name of the service to [name].
All spans will be exported under this service's name.
Default name is ["unknown"].
*)

(** [Destination] is a module for managing the export of tracing data to
different types of endpoints, whether is exporting it to a [File] or an
[Http] endpoint.
(** [Destination] is a module for managing the export of tracing data to
different types of endpoints, whether is exporting it to a [File] or an
[Http] endpoint.
*)
module Destination : sig
(** [File] is a module for managing the files in which the tracing data is
(** [File] is a module for managing the files in which the tracing data is
exported.
*)
module File : sig
Expand Down Expand Up @@ -70,11 +70,11 @@ module Destination : sig
end

val flush_spans : unit -> unit
(** [flush_spans ()] forcefully flushes the spans to the current enabled
(** [flush_spans ()] forcefully flushes the spans to the current enabled
endpoints.
*)

(** [Http] is a module for managing exporting tracing data to an http
(** [Http] is a module for managing exporting tracing data to an http
endpoint.
*)
module Http : sig
Expand Down
8 changes: 2 additions & 6 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,7 @@ let loop () =
add_periodic_pending ()
| None -> (
(* Sleep until next event. *)
let sleep =
Mtime.Span.abs_diff deadline now
|> Mtime.Span.(add ms)
|> Clock.Timer.span_to_s
in
let sleep = Mtime.Span.abs_diff deadline now |> Mtime.Span.(add ms) in
try ignore (Delay.wait delay sleep)
with e ->
let detailed_msg =
Expand All @@ -112,7 +108,7 @@ let loop () =
"Could not schedule interruptable delay (%s). Falling back to \
normal delay. New events may be missed."
detailed_msg ;
Thread.delay sleep
Thread.delay (Clock.Timer.span_to_s sleep)
)
done
with _ ->
Expand Down
18 changes: 8 additions & 10 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,16 @@ module Delay = struct
external wait : t -> int64 -> bool = "caml_xapi_delay_wait"

let wait d t =
if t <= 0. then
if t = Mtime.Span.zero then
true
else
match Mtime.Span.of_float_ns (t *. 1e9) with
| Some span ->
let now = Mtime_clock.now () in
let deadline =
Mtime.add_span now span |> Option.value ~default:Mtime.max_stamp
in
wait d (Mtime.to_uint64_ns deadline)
| None ->
invalid_arg "Time specified too big"
(* Against Mtime docs, we use the absolute value of the monotonic clock,
because that's what pthread_cond_timedwait has been set up to use *)
let now = Mtime_clock.now () in
let deadline =
Mtime.add_span now t |> Option.value ~default:Mtime.max_stamp
in
wait d (Mtime.to_uint64_ns deadline)
end

let wait_timed_read fd timeout =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module Delay : sig

val make : unit -> t

val wait : t -> float -> bool
val wait : t -> Mtime.Span.t -> bool
(** Blocks the calling thread for a given period of time with the option of
returning early if someone calls 'signal'. Returns true if the full time
period elapsed and false if signalled. Note that multiple 'signals' are
Expand Down
61 changes: 46 additions & 15 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,53 @@

module Delay = Xapi_stdext_threads.Threadext.Delay

let delay_wait_check ~min ~max delay timeout expected =
let cnt = Mtime_clock.counter () in
let res = Delay.wait delay timeout in
let elapsed = (Mtime_clock.count cnt |> Mtime.Span.to_float_ns) *. 1e-9 in
Alcotest.(check bool) "expected result" expected res ;
if elapsed < min || elapsed > max then
let msg = Printf.sprintf "%f not in range %f-%f" elapsed min max in
Alcotest.(check bool) msg true false
let span_between ~at_least ~at_most actual =
Mtime.Span.is_longer actual ~than:at_least
&& Mtime.Span.is_shorter actual ~than:at_most

let delay_test delay wait_for ~at_least ~at_most ~times_out =
let c = Mtime_clock.counter () in
let timed_out = Delay.wait delay wait_for in
let waited = Mtime_clock.count c in

let msg =
Fmt.str "Value %a must be between %a and %a" Mtime.Span.pp waited
Mtime.Span.pp at_least Mtime.Span.pp at_most
in
let actual = span_between ~at_least ~at_most waited in
Alcotest.(check' bool) ~msg ~expected:true ~actual ;

let msg =
if timed_out then
"Must have been signaled"
else
"Must have timed\n out"
in
Alcotest.(check' bool) ~msg ~expected:times_out ~actual:timed_out

(*
Single simple signal stored
- signal
- wait on same thread should succeed quickly
*)
let simple () =
let signal () =
let d = Delay.make () in
let wait_for = Mtime.Span.(1 * s) in
let at_least = Mtime.Span.zero in
let at_most = Mtime.Span.(5 * ms) in
Delay.signal d ;
delay_wait_check ~min:0. ~max:0.05 d 1.0 false
delay_test d wait_for ~at_least ~at_most ~times_out:false

(*
No signal
- wait on same thread should timeout more or less on delay
*)
let no_signal () =
let d = Delay.make () in
delay_wait_check ~min:0.2 ~max:0.25 d 0.2 true
let wait_for = Mtime.Span.(200 * ms) in
let at_least = wait_for in
let at_most = Mtime.Span.(250 * ms) in
delay_test d wait_for ~at_least ~at_most ~times_out:true

(*
Signal twice, collapsed
Expand All @@ -52,8 +73,15 @@ let collapsed () =
let d = Delay.make () in
Delay.signal d ;
Delay.signal d ;
delay_wait_check ~min:0. ~max:0.05 d 0.2 false ;
delay_wait_check ~min:0.2 ~max:0.25 d 0.2 true

let wait_for = Mtime.Span.(200 * ms) in

let at_least = Mtime.Span.zero in
let at_most = Mtime.Span.(50 * ms) in
delay_test d wait_for ~at_least ~at_most ~times_out:false ;
let at_least = Mtime.Span.(200 * ms) in
let at_most = Mtime.Span.(250 * ms) in
delay_test d wait_for ~at_least ~at_most ~times_out:true

(*
Signal from another thread
Expand All @@ -62,13 +90,16 @@ Signal from another thread
*)
let other_thread () =
let d = Delay.make () in
let wait_for = Mtime.Span.(1 * s) in
let at_least = Mtime.Span.(200 * ms) in
let at_most = Mtime.Span.(250 * ms) in
let th = Thread.create (fun d -> Thread.delay 0.2 ; Delay.signal d) d in
delay_wait_check ~min:0.2 ~max:0.25 d 1.0 false ;
delay_test d wait_for ~at_least ~at_most ~times_out:false ;
Thread.join th

let tests =
[
("simple", `Quick, simple)
("simple", `Quick, signal)
; ("no_signal", `Quick, no_signal)
; ("collapsed", `Quick, collapsed)
; ("other_thread", `Quick, other_thread)
Expand Down
6 changes: 2 additions & 4 deletions ocaml/message-switch/cli/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,13 @@ let day = 86400_000_000_000L

let int64_of_id = Mtime.Span.to_uint64_ns

let is_longer s ~than = Mtime.Span.compare s than > 0

let diagnostics common_opts =
Client.connect ~switch:common_opts.Common.path () >>|= fun t ->
Client.diagnostics ~t () >>|= fun d ->
let open Message_switch_core.Protocol in
let time_ago x =
let ts =
if is_longer ~than:d.Diagnostics.time_since_start x then
if Mtime.Span.is_longer ~than:d.Diagnostics.time_since_start x then
Mtime.Span.zero
else
Mtime.Span.(abs_diff d.Diagnostics.time_since_start x)
Expand Down Expand Up @@ -186,7 +184,7 @@ let diagnostics common_opts =
Mtime.Span.add d.Diagnostics.time_since_start unresponsive_timeout
in
let elapsed = Mtime_clock.elapsed () in
if is_longer ~than:deadline elapsed then
if Mtime.Span.is_longer ~than:deadline elapsed then
`Crashed_or_deadlocked t
else
`Ok
Expand Down
4 changes: 1 addition & 3 deletions ocaml/message-switch/core_test/client_unix_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ let ( >>|= ) m f =
Format.pp_print_flush fmt () ;
raise (Failure (Buffer.contents b))

let is_shorter ~than s = Mtime.Span.compare s than < 0

let main () =
Client.connect ~switch:!path () >>|= fun c ->
let counter = ref 0 in
Expand All @@ -52,7 +50,7 @@ let main () =
| None ->
one ()
| Some t ->
while is_shorter ~than:t (Mtime_clock.count start) do
while Mtime.Span.is_shorter ~than:t (Mtime_clock.count start) do
one ()
done
) ;
Expand Down
4 changes: 1 addition & 3 deletions ocaml/message-switch/core_test/lwt/client_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ let ( >>|= ) m f =
Format.pp_print_flush fmt () ;
fail (Failure (Buffer.contents b))

let is_shorter ~than s = Mtime.Span.compare s than < 0

let main () =
Client.connect ~switch:!path () >>|= fun t ->
let counter = ref 0 in
Expand All @@ -57,7 +55,7 @@ let main () =
| Some timeout ->
let rec thread n =
let elapsed = Mtime_clock.count start in
if is_shorter ~than:timeout elapsed then
if Mtime.Span.is_shorter ~than:timeout elapsed then
one () >>= fun () -> thread n
else
return ()
Expand Down
6 changes: 2 additions & 4 deletions ocaml/message-switch/switch/switch_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ let exn_hook e =

let () = Lwt.async_exception_hook := exn_hook

let is_longer s ~than = Mtime.Span.compare s than > 0

let elapsed () = Clock.elapsed_ns () |> Mtime.Span.of_uint64_ns

let make_server config trace_config =
Expand Down Expand Up @@ -245,7 +243,7 @@ let make_server config trace_config =
if Q.transfer !queues from names = [] then
let timeout =
let elapsed = elapsed () in
if is_longer ~than:deadline elapsed then
if Mtime.Span.is_longer ~than:deadline elapsed then
0.
else
Mtime.Span.abs_diff deadline elapsed
Expand All @@ -255,7 +253,7 @@ let make_server config trace_config =
in
Lwt.pick [Lwt_unix.sleep timeout; Lwt_condition.wait queues_c]
>>= fun () ->
if is_longer ~than:deadline (elapsed ()) then
if Mtime.Span.is_longer ~than:deadline (elapsed ()) then
return ()
else
wait ()
Expand Down
10 changes: 2 additions & 8 deletions ocaml/message-switch/unix/protocol_unix_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ module Mutex = struct
end

module WaitMap = Map.Make (Mtime.Span)

module Delay = Xapi_stdext_threads.Threadext.Delay

type item = {id: int; name: string; fn: unit -> unit}
Expand Down Expand Up @@ -133,13 +132,8 @@ let rec main_loop () =
Mtime.Span.(add (Mtime_clock.count elapsed) (1 * hour))
)
in
let seconds =
Mtime.Span.abs_diff sleep_until (Mtime_clock.count elapsed)
|> Mtime.Span.to_uint64_ns
|> Int64.div 1_000_000_000L
|> Int64.to_float
in
let (_ : bool) = Delay.wait delay seconds in
let wait_for = Mtime.Span.abs_diff sleep_until (Mtime_clock.count elapsed) in
let (_ : bool) = Delay.wait delay wait_for in
main_loop ()

let start =
Expand Down
1 change: 1 addition & 0 deletions ocaml/tests/dune
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
(libraries
alcotest
bos
clock
fmt
mtime
pam
Expand Down
Loading

0 comments on commit b8a1d66

Please sign in to comment.