Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process based mutex #56

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions riot/lib/lib.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ module Supervisor = Supervisor
module Task = Task
module Telemetry = Telemetry_app
module Timeout = Timeout
module Mutex = Mutex
include Global
82 changes: 82 additions & 0 deletions riot/lib/mutex.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
module type Base = sig
type value
end

module MakeServer (B : Base) = struct
open Global
open Util

type value = B.value
type state = { value : value; queue : Pid.t Lf_queue.t }

type Message.t +=
| Lock of Pid.t
| Unlock of (Pid.t * B.value)
| LockAck of (Pid.t * value)

let rec loop_locked state locker_pid =
match receive () with
| Lock pid ->
let () = Lf_queue.push state.queue pid in
loop_locked state locker_pid
| Unlock (pid, value) when locker_pid = pid ->
let () = demonitor locker_pid in
loop_unlocked { state with value }
| Unlock (_, _) -> failwith "wrong pid tried to unlock mutex"
| Process.Messages.Monitor (Process_down fell_pid)
when locker_pid = fell_pid ->
Logger.debug (fun f -> f "locker process crashed");
loop_unlocked state
| _ -> failwith "unexpected message"

and loop_unlocked state =
match Lf_queue.pop state.queue with
| Some pid ->
let () = send pid (LockAck (self (), state.value)) in
let () = monitor pid in
loop_locked state pid
| None -> (
match receive () with
| Lock pid ->
let () = send pid (LockAck (self (), state.value)) in
let () = monitor pid in
loop_locked state pid
| _ -> failwith "unexpected message")

let start_link value =
let state = { queue = Lf_queue.create (); value } in
(fun () -> loop_unlocked state) |> spawn_link |> Result.ok
end

module type Intf = sig
type value
type t

val start_link : value -> (t, [> `Exn of exn ]) result
val lock : t -> value
val unlock : t -> value -> unit
end

module Make (B : Base) = struct
open Global
module Server = MakeServer (B)

type value = B.value
type t = Pid.t

let start_link = Server.start_link

(* PR-Note (from: @julien-leclercq): How to handle the case of a dead mutex process ?*)
let lock mutex =
let () = send mutex @@ Server.Lock (self ()) in
let rec do_receive () =
match receive () with
| Server.(LockAck (sender, value)) when sender = mutex -> value
| msg ->
send (self ()) msg;
do_receive ()
in
do_receive ()
Comment on lines +69 to +79
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

☝️ here is the part where we could be in trouble if the Mutex process is dead


let unlock mutex new_value = send mutex Server.(Unlock (self (), new_value))
end
17 changes: 17 additions & 0 deletions riot/riot.mli
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,23 @@ module Store : sig
module Make (B : Base) : Intf with type key = B.key and type value = B.value
end

module Mutex : sig
module type Base = sig
type value
end

module type Intf = sig
type value
type t

val start_link : value -> (t, [> `Exn of exn ]) result
val lock : t -> value
val unlock : t -> value -> unit
end

module Make (B : Base) : Intf with type value = B.value
end

module Crypto : sig
module Random : sig
val cstruct : int -> Cstruct.t
Expand Down
Loading