Add FIFOLock
A reentrant lock that provides strict FIFO ordering for lock
acquisitions.
kpamnany committed Feb 2, 2025
1 parent a01385c commit a96a787
Showing 3 changed files with 190 additions and 1 deletion.
src/ConcurrentUtilities.jl: 3 additions & 1 deletion
@@ -1,8 +1,9 @@
module ConcurrentUtilities

import Base: AbstractLock, islocked, trylock, lock, unlock
export Lockable, OrderedSynchronizer, reset!, ReadWriteLock, readlock, readunlock, @wkspawn,
Workers, remote_eval, remote_fetch, Worker, terminate!, WorkerTerminatedException,
Pool, acquire, release, drain!, try_with_timeout, TimeoutException
Pool, acquire, release, drain!, try_with_timeout, TimeoutException, FIFOLock

macro samethreadpool_spawn(expr)
if VERSION >= v"1.9.2"
@@ -21,6 +22,7 @@ include("synchronizer.jl")
include("rwlock.jl")
include("pools.jl")
using .Pools
include("fifolock.jl")

function clear_current_task()
current_task().storage = nothing
src/fifolock.jl: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
@static if VERSION >= v"1.10"

const LOCKED_BIT = 0b01

"""
FIFOLock()
Like Base.ReentrantLock, but with strict FIFO ordering.
"""
mutable struct FIFOLock <: AbstractLock
@atomic locked_by::Union{Task, Nothing}
reentrancy_cnt::UInt32
@atomic havelock::UInt8
cond_wait::Base.ThreadSynchronizer

FIFOLock() = new(nothing, 0x0000_0000, 0x00, Base.ThreadSynchronizer())
end

Base.assert_havelock(l::FIFOLock) = Base.assert_havelock(l, l.locked_by)  # qualify with Base so this extends Base.assert_havelock rather than defining a new function
islocked(l::FIFOLock) = (@atomic :monotonic l.havelock) & LOCKED_BIT != 0

# Correctness reasoning:
#
# `havelock` can only be unset with the `cond_wait` lock held.
# Locking then first tries to set `havelock`; on failure, we
# acquire the `cond_wait` lock and try to set `havelock` again
# to ensure that we didn't coincide with an `unlock`. If we
# fail again, then we are assured that we will not miss an
# `unlock`, because we hold the `cond_wait` lock. Thus we can
# safely wait on `cond_wait`.
#
# FIFO ordering is ensured in `unlock`, which first acquires
# the `cond_wait` lock. If `cond_wait`'s wait queue is empty,
# the lock is released. Otherwise, we pop the first task in
# the wait queue, transfer ownership to it, schedule it, and
# return. Thus when one or more tasks are waiting, `havelock`
# is never reset.
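#
# As an illustrative (hypothetical) two-task interleaving, suppose task A
# holds the lock and task B calls `lock`:
#
#   B: `@atomicreplace` on `havelock` fails (A still holds the lock)
#   B: acquires the `cond_wait` lock and retries `_trylock`; it fails again
#   B: calls `wait(cond_wait)`, which enqueues B and releases `cond_wait`
#   A: calls `unlock`, acquires `cond_wait`, finds B in the wait queue,
#      sets `locked_by = B`, schedules B, and returns without clearing
#      `havelock`
#
# B cannot miss the handoff: because B retries `_trylock` while holding
# the `cond_wait` lock, A's `unlock` either completes before B's retry
# (so the retry succeeds) or must wait until B is in the wait queue.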

"""
trylock(l::FIFOLock)
Try to acquire lock `l`. If successful, return `true`. If the lock is
held by another task, do not wait and return `false`.
"""
@inline function trylock(l::FIFOLock)
ct = current_task()
if l.locked_by === ct
l.reentrancy_cnt += 0x0000_0001
return true
end
return _trylock(l, ct)
end
@noinline function _trylock(l::FIFOLock, ct::Task)
GC.disable_finalizers()
if (@atomicreplace :acquire l.havelock 0x00 => LOCKED_BIT).success
l.reentrancy_cnt = 0x0000_0001
@atomic :release l.locked_by = ct
return true
end
GC.enable_finalizers()
return false
end

"""
lock(l::FIFOLock)
Acquire lock `l`. If the calling task has already acquired the lock
previously, increment an internal counter and return to support
reentrancy. Each `lock` call must be matched with an `unlock` call.
As with `Base.ReentrantLock`, acquiring a lock will inhibit running
finalizers on that thread until the lock is released.
FIFO behavior is handled in `unlock`.
"""
@inline function lock(l::FIFOLock)
trylock(l) || _lock(l)
end
@noinline function _lock(l::FIFOLock)
ct = current_task()
c = l.cond_wait
lock(c)
try
_trylock(l, ct) && return
wait(c)
GC.disable_finalizers()  # ownership was handed to us by `_unlock`; match the `enable_finalizers` in our own eventual `_unlock`
l.reentrancy_cnt = 0x0000_0001
@atomic :release l.locked_by = ct
finally
unlock(c)
end
return
end

"""
unlock(l::FIFOLock)
Release ownership of the lock `l`. If the lock was acquired recursively,
the number of unlocks must match the number of locks before `l` is
actually released.
FIFO behavior is enforced here, in `unlock`: if one or more tasks are
waiting on the lock, we do not actually unlock; just hand ownership to
the first waiting task and schedule it.
"""
@inline function unlock(l::FIFOLock)
ct = current_task()
if l.locked_by !== ct
error("unlock from wrong thread")
end
if l.reentrancy_cnt == 0x0000_0000
error("unlock count must match lock count")
end
_unlock(l)
end
@noinline function _unlock(l::FIFOLock)
ct = current_task()
c = l.cond_wait
n = l.reentrancy_cnt - 0x0000_0001
if n == 0x0000_0000
lock(c)
try
if isempty(c.waitq)
l.reentrancy_cnt = n
@atomic :release l.locked_by = nothing
@atomic :release l.havelock = 0x00
else
t = popfirst!(c.waitq)
@atomic :release l.locked_by = t
schedule(t)
end
GC.enable_finalizers()
finally
unlock(c)
end
else
l.reentrancy_cnt = n
end
return
end

const Condition = Base.GenericCondition{FIFOLock}

end
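For reference, a minimal usage sketch of the new lock (not part of the commit). It relies on the same timing assumption as the test below: each spawned task must reach `lock` before the next one is spawned for the FIFO order to be observable.

using ConcurrentUtilities: FIFOLock

fl = FIFOLock()
order = Int[]
tasks = Task[]

lock(fl)                  # hold the lock so the spawned tasks queue up behind it
for i in 1:4
    push!(tasks, Threads.@spawn begin
        lock(fl)          # blocks; ownership is handed over in arrival order
        try
            push!(order, $i)
        finally
            unlock(fl)
        end
    end)
    sleep(0.1)            # let task i block on `lock` before spawning task i+1
end
unlock(fl)                # hand the lock to the first waiter

foreach(wait, tasks)
@show order               # expected: [1, 2, 3, 4]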
test/runtests.jl: 44 additions & 0 deletions
@@ -226,6 +226,50 @@ else
end # @static if VERSION < v"1.8"
end

@testset "FIFOLock" begin
@static if VERSION < v"1.10"
@warn "skipping FIFOLock tests since VERSION ($VERSION) < v\"1.10\""
else
fl = FIFOLock()
lock(fl)
tot = 0
ttsks = Task[]
tordr = Int[]
try
# we assume here that the tasks spawned below run in the order
# they were spawned
for i in 1:16
t = Threads.@spawn begin
lock(fl)
try
tot = tot + 1
if tot != i
error("non-atomic access in lock")
end
push!(tordr, i)
finally
unlock(fl)
end
end
push!(ttsks, t)
sleep(0.1)
end
finally
unlock(fl)
end
for t in ttsks
@test try
wait(t)
true
catch
false
end
end
@test tot == 16
@test all([tordr[i] == i for i in 1:16])
end # @static if VERSION < v"1.10"
end

# track all workers ever created
ALL_WORKERS = []
ConcurrentUtilities.Workers.GLOBAL_CALLBACK_PER_WORKER[] = w -> push!(ALL_WORKERS, w)
