runtime: reimplement atomics

mertcandav committed Dec 6, 2024
1 parent 3f94ad6 commit f981aca

Showing 6 changed files with 66 additions and 72 deletions.
30 changes: 19 additions & 11 deletions std/runtime/atomic.jule
@@ -26,27 +26,35 @@ cpp unsafe fn __atomic_compare_exchange(*unsafe, *unsafe, *unsafe, int, atomicMe
 #cdef
 cpp unsafe fn __atomic_fetch_add[T](*unsafe, T, atomicMemoryOrder): T
 
-unsafe fn atomicSwap[T](mut p: *T, new: *T, mo: atomicMemoryOrder): (old: T) {
+// Atomically stores new value to p with memory order mo, and returns the old value.
+fn atomicSwap[T](mut &p: T, new: T, mo: atomicMemoryOrder): (old: T) {
 	let mut tmp: T
-	cpp.__atomic_exchange[T](p, new, &tmp, mo)
+	unsafe { cpp.__atomic_exchange[T](&p, &new, &tmp, mo) }
 	ret tmp
 }
 
-unsafe fn atomicLoad[T](p: *T, mo: atomicMemoryOrder): T {
+// Atomically reads value of the p with memory order mo and returns.
+fn atomicLoad[T](&p: T, mo: atomicMemoryOrder): T {
 	let mut tmp: T
-	cpp.__atomic_load(p, &tmp, mo)
+	unsafe { cpp.__atomic_load(&p, &tmp, mo) }
 	ret tmp
 }
 
-unsafe fn atomicCompareSwap[T](mut p: *T, old: *T, new: *T, suc: atomicMemoryOrder, fail: atomicMemoryOrder): (swapped: bool) {
-	const Magic = 0x0
-	ret cpp.__atomic_compare_exchange(p, old, new, Magic, suc, fail)
+// Atomically reads value of the p and compares it with old.
+// If comparison results with true, it atomically stores new value into p.
+// In every case, it will use memory order mo.
+// Reports whether swap succeeded.
+fn atomicCompareAndSwap[T](mut &p: T, old: T, new: T, mo: atomicMemoryOrder): (swapped: bool) {
+	const Strong = 0 // strong atomicity
+	ret unsafe { cpp.__atomic_compare_exchange(&p, &old, &new, Strong, mo, mo) }
 }
 
-unsafe fn atomicAdd[T](mut p: *T, delta: T, mo: atomicMemoryOrder): (old: T) {
-	ret cpp.__atomic_fetch_add[T](p, delta, mo)
+// Atomically adds delta to p with memory order mo and returns the new value.
+fn atomicAdd[T](mut &p: T, delta: T, mo: atomicMemoryOrder): (new: T) {
+	ret unsafe { cpp.__atomic_fetch_add[T](&p, delta, mo) } + delta
 }
 
-unsafe fn atomicStore[T](mut p: *T, val: *T, mo: atomicMemoryOrder) {
-	cpp.__atomic_store(p, val, mo)
+// Atomically stores new value to p with memory order mo.
+fn atomicStore[T](mut &p: T, val: T, mo: atomicMemoryOrder) {
+	unsafe { cpp.__atomic_store(&p, &val, mo) }
 }
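
The switch from raw pointers to mutable references moves the unsafe blocks inside the runtime, so call sites no longer need them, and atomicAdd now reports the new value (the old value that __atomic_fetch_add yields, plus delta). A minimal sketch of the new call style, with a hypothetical local counter (not part of this commit):

    fn sketch() {
        mut counter := i32(0)
        // atomicAdd returns the incremented value, not the previous one.
        n := atomicAdd(counter, 1, atomicSeqCst) // n == 1
        // CAS takes plain values and one memory order for both success and failure.
        if atomicCompareAndSwap(counter, n, n+10, atomicSeqCst) {
            // counter is now 11
        }
    }
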
32 changes: 10 additions & 22 deletions std/runtime/lock.jule
@@ -22,16 +22,14 @@ impl fmutex {
 	}
 
 	fn unlock(self) {
-		old := unsafe { atomicAdd(&self.state, -mutexLocked, atomicSeqCst) }
-		if old == 0 {
+		new := atomicAdd(self.state, -mutexLocked, atomicSeqCst)
+		if new != 0 {
 			panic("runtime: mutex: unlock of unlocked mutex")
 		}
 	}
 
 	fn tryLock(self): bool {
-		locked := i32(mutexLocked)
-		unlocked := i32(0)
-		ret unsafe { atomicCompareSwap(&self.state, &unlocked, &locked, atomicSeqCst, atomicSeqCst) }
+		ret atomicCompareAndSwap(self.state, 0, mutexLocked, atomicSeqCst)
 	}
 }
 
@@ -81,9 +79,7 @@ impl mutex {
 	// See the [sync::Mutex.Lock] for documentation.
 	fn lock(self) {
 		// Fast path: grab unlocked mutex.
-		old := i32(0)
-		new := i32(mutexLocked)
-		if unsafe { atomicCompareSwap(&self.state, &old, &new, atomicSeqCst, atomicSeqCst) } {
+		if atomicCompareAndSwap(self.state, 0, mutexLocked, atomicSeqCst) {
 			ret
 		}
 		// Slow path (outlined so that the fast path may be inlined)
@@ -104,9 +100,7 @@ impl mutex {
 			// Try to set mutexWoken flag to inform Unlock
 			// to not wake other blocked threads.
 			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 {
-				old2 := old
-				new := old | mutexWoken
-				awoke = unsafe { atomicCompareSwap(&self.state, &old2, &new, atomicSeqCst, atomicSeqCst) }
+				awoke = atomicCompareAndSwap(self.state, old, old|mutexWoken, atomicSeqCst)
 			}
 			iter <<= 1
 			old = self.state
@@ -135,9 +129,7 @@ impl mutex {
 				}
 				new &= ^mutexWoken
 			}
-			old2 := old
-			new2 := new
-			if unsafe { atomicCompareSwap(&self.state, &old2, &new2, atomicSeqCst, atomicSeqCst) } {
+			if atomicCompareAndSwap(self.state, old, new, atomicSeqCst) {
 				if old&(mutexLocked|mutexStarving) == 0 {
 					break // locked the mutex with CAS
 				}
@@ -166,7 +158,7 @@ impl mutex {
 					// to starvation mode.
 					delta -= mutexStarving
 				}
-				unsafe { atomicAdd(&self.state, delta, atomicSeqCst) }
+				atomicAdd(self.state, delta, atomicSeqCst)
 				break
 			}
 			awoke = true
@@ -188,16 +180,14 @@ impl mutex {
 		// There may be a thread waiting for the mutex, but we are
 		// running now and can try to grab the mutex before that
 		// thread wakes up.
-		new := old | mutexLocked
-		ret unsafe { atomicCompareSwap(&self.state, &old, &new, atomicSeqCst, atomicSeqCst) }
+		ret atomicCompareAndSwap(self.state, old, old|mutexLocked, atomicSeqCst)
 	}
 
 	// Unlocks the mutex.
 	// See the [sync::Mutex.Unlock] for documentation.
 	fn unlock(self) {
 		// Fast path: drop lock bit.
-		mut new := unsafe { atomicAdd(&self.state, -mutexLocked, atomicSeqCst) }
-		new += -mutexLocked
+		new := atomicAdd(self.state, -mutexLocked, atomicSeqCst)
 		if new != 0 {
 			// Outlined slow path to may allow inlining the fast path.
 			self.unlockSlow(new)
@@ -223,9 +213,7 @@ impl mutex {
 			// Grab the right to wake someone.
 			new = (old - 1<<mutexWaiterShift) | mutexWoken
 			{
-				old2 := old
-				new2 := new
-				if unsafe { atomicCompareSwap(&self.state, &old2, &new2, atomicSeqCst, atomicSeqCst) } {
+				if atomicCompareAndSwap(self.state, old, new, atomicSeqCst) {
 					semrelease(self.sema, false)
 					ret
 				}
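
Passing values instead of pointers also removes the addressable temporaries (old2, new2, locked, unlocked) that existed only so their addresses could be taken, and a single memory order now covers both the success and failure paths of each CAS. The fast-path pattern, sketched against a hypothetical state word (not part of this commit):

    fn fastPathSketch(mut &state: i32): bool {
        // Before: locals existed only to take their addresses:
        //   old := i32(0); new := i32(mutexLocked)
        //   unsafe { atomicCompareSwap(&state, &old, &new, atomicSeqCst, atomicSeqCst) }
        // After: literals pass directly, with one memory order.
        ret atomicCompareAndSwap(state, 0, mutexLocked, atomicSeqCst)
    }
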
16 changes: 8 additions & 8 deletions std/runtime/rc.jule
@@ -22,7 +22,7 @@ const _RCMo = atomicSeqCst
 fn _RCNew(): _RCPtr {
 	const Bits = comptime::TypeOf(_RCType).Bits()
 	const BitsPerByte = 8
-	mut p := unsafe { (_RCPtr)(cpp.malloc(Bits / BitsPerByte)) }
+	mut p := unsafe { _RCPtr(cpp.malloc(Bits / BitsPerByte)) }
 	if p == nil {
 		panic("runtime: memory allocation failed for reference counting data")
 	}
@@ -44,7 +44,7 @@ unsafe fn _RCLoad(p: _RCPtr): _RCType {
 // Same as _RCLoad but have thread-safe implementation.
 #export "__jule_RCLoadAtomic"
 unsafe fn _RCLoadAtomic(p: _RCPtr): _RCType {
-	ret atomicLoad[_RCType](p, _RCMo)
+	ret atomicLoad[_RCType](*p, _RCMo)
 }
 
 // Adds strong reference to reference pointer.
@@ -61,7 +61,7 @@ unsafe fn _RCAdd(mut p: _RCPtr) {
 // Same as _RCAdd but have thread-safe implementation.
 #export "__jule_RCAddAtomic"
 unsafe fn _RCAddAtomic(mut p: _RCPtr) {
-	atomicAdd[_RCType](p, RCDelta, _RCMo)
+	atomicAdd[_RCType](*p, RCDelta, _RCMo)
 }
 
 // Drops strong reference from reference pointer.
@@ -73,18 +73,18 @@ unsafe fn _RCAddAtomic(mut p: _RCPtr) {
 // See memory model of concurrency.
 #export "__jule_RCDrop"
 unsafe fn _RCDrop(mut p: _RCPtr): bool {
-	// If old data equals to delta, means references zeroed.
-	alive := *p > RCDelta
+	// If new data equals to zero, means references zeroed.
+	alive := *p >= RCDelta
 	*p -= RCDelta
 	ret alive
 }
 
 // Same as _RCDrop but have thread-safe implementation.
 #export "__jule_RCDropAtomic"
 unsafe fn _RCDropAtomic(mut p: _RCPtr): bool {
-	// The atomicAdd function returns old data of pointer.
-	// So if old data equals to delta, means references zeroed.
-	ret atomicAdd[_RCType](p, -RCDelta, _RCMo) > RCDelta
+	// The atomicAdd function returns new data of pointer.
+	// So if new data equals to zero, means references zeroed.
+	ret atomicAdd[_RCType](*p, -RCDelta, _RCMo) >= RCDelta
 }
 
 // Deallocates reference counting data allocation.
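
Since atomicAdd now yields the decremented count, _RCDropAtomic compares the new value against RCDelta: a result of at least RCDelta means strong references remain, and a result of zero means the last one was just dropped. A worked trace, assuming RCDelta == 1 (illustrative; the constant is defined elsewhere in the runtime):

    // count == 2: atomicAdd(*p, -1, _RCMo) returns 1; 1 >= RCDelta -> still alive
    // count == 1: atomicAdd(*p, -1, _RCMo) returns 0; 0 >= RCDelta is false ->
    //             last reference dropped, the caller may deallocate
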
23 changes: 11 additions & 12 deletions std/runtime/sema.jule
@@ -107,14 +107,13 @@ fn semtable_rootFor(&sema: u32): &semaRoot {
 	ret unsafe { (&semaRoot)(&semtable[(uintptr(&sema)>>3)%semTabSize].root) }
 }
 
-fn cansemacquire(&sema: u32): bool {
+fn cansemacquire(mut &sema: u32): bool {
 	for {
-		v := unsafe { atomicLoad(&sema, atomicSeqCst) }
+		v := atomicLoad(sema, atomicSeqCst)
 		if v == 0 {
 			ret false
 		}
-		new := v - 1
-		if unsafe { atomicCompareSwap(&sema, &v, &new, atomicSeqCst, atomicSeqCst) } {
+		if atomicCompareAndSwap(sema, v, v-1, atomicSeqCst) {
 			ret true
 		}
 	}
@@ -132,7 +131,7 @@ fn semapark(&lock: fmutex, &deq: bool) {
 // It is intended as a simple sleep primitive for use by the synchronization
 // library and should not be used directly.
 // If lifo is true, queue waiter at the head of wait queue.
-fn semacquire(&sema: u32, lifo: bool) {
+fn semacquire(mut &sema: u32, lifo: bool) {
 	// Easy case.
 	if cansemacquire(sema) {
 		ret
@@ -149,10 +148,10 @@ fn semacquire(&sema: u32, lifo: bool) {
 	for {
 		root.lock.lock()
 		// Add ourselves to nwait to disable "easy case" in semrelease.
-		unsafe { atomicAdd(&root.nwait, 1, atomicSeqCst) }
+		atomicAdd(root.nwait, 1, atomicSeqCst)
 		// Check cansemacquire to avoid missed wakeup.
 		if cansemacquire(sema) {
-			unsafe { atomicAdd(&root.nwait, ^u32(0), atomicSeqCst) }
+			atomicAdd(root.nwait, ^u32(0), atomicSeqCst)
 			root.lock.unlock()
 			break
 		}
@@ -171,20 +170,20 @@ fn semacquire(&sema: u32, lifo: bool) {
 // It is intended as a simple wakeup primitive for use by the synchronization
 // library and should not be used directly.
 // If handoff is true, pass count directly to the first waiter.
-fn semrelease(&sema: u32, handoff: bool) {
+fn semrelease(mut &sema: u32, handoff: bool) {
 	mut root := semtable_rootFor(sema)
-	unsafe { atomicAdd(&sema, 1, atomicSeqCst) }
+	atomicAdd(sema, 1, atomicSeqCst)
 
 	// Easy case: no waiters?
 	// This check must happen after the atomicAdd, to avoid a missed wakeup
 	// (see loop in semacquire).
-	if unsafe { atomicLoad(&root.nwait, atomicSeqCst) } == 0 {
+	if atomicLoad(root.nwait, atomicSeqCst) == 0 {
 		ret
 	}
 
 	// Harder case: search for a waiter and wake it.
 	root.lock.lock()
-	if unsafe { atomicLoad(&root.nwait, atomicSeqCst) } == 0 {
+	if atomicLoad(root.nwait, atomicSeqCst) == 0 {
 		// The count is already consumed by another thread,
 		// so no need to wake up another thread.
 		root.lock.unlock()
@@ -193,7 +192,7 @@ fn semrelease(&sema: u32, handoff: bool) {
 
 	mut sl := root.dequeue(sema)
 	if sl != nil {
-		unsafe { atomicAdd(&root.nwait, ^u32(0), atomicSeqCst) }
+		atomicAdd(root.nwait, ^u32(0), atomicSeqCst)
 	}
 	root.lock.unlock()
 	if sl != nil {
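
cansemacquire is the usual load/compare-and-swap retry loop: reread the count whenever a racing thread wins the CAS, and give up only when the count is zero. The same pattern with the new value-based API, on a hypothetical counter (not part of this commit):

    // Decrement v if it is nonzero; retry on CAS contention.
    fn tryDecrement(mut &v: u32): bool {
        for {
            n := atomicLoad(v, atomicSeqCst)
            if n == 0 {
                ret false
            }
            if atomicCompareAndSwap(v, n, n-1, atomicSeqCst) {
                ret true
            }
        }
    }
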
32 changes: 16 additions & 16 deletions std/sync/atomic/atomic.jule
@@ -43,27 +43,27 @@ struct number[T] {
 impl number {
 	// Atomically stores new value and returns the previous value.
 	fn Swap(mut self, new: T, order: memoryOrder): (old: T) {
-		ret unsafe { runtime::atomicSwap[T](&self.n, &new, order) }
+		ret runtime::atomicSwap[T](self.n, new, order)
 	}
 
 	// Executes the compare-and-swap operation.
-	fn CompareSwap(mut self, old: T, new: T, order: memoryOrder): (swapped: bool) {
-		ret unsafe { runtime::atomicCompareSwap[T](&self.n, &old, &new, order, order) }
+	fn CompareAndSwap(mut self, old: T, new: T, order: memoryOrder): (swapped: bool) {
+		ret runtime::atomicCompareAndSwap[T](self.n, old, new, order)
 	}
 
-	// Atomically adds delta to value and returns the previous value.
-	fn Add(mut self, delta: T, order: memoryOrder): (old: T) {
-		ret unsafe { runtime::atomicAdd[T](&self.n, delta, order) }
+	// Atomically adds delta to value and returns the new value.
+	fn Add(mut self, delta: T, order: memoryOrder): (new: T) {
+		ret runtime::atomicAdd[T](self.n, delta, order)
 	}
 
 	// Atomically reads and returns value.
 	fn Load(self, order: memoryOrder): T {
-		ret unsafe { runtime::atomicLoad[T](&self.n, order) }
+		ret runtime::atomicLoad[T](self.n, order)
 	}
 
 	// Atomically assigns to value.
 	fn Store(mut self, val: T, order: memoryOrder) {
-		unsafe { runtime::atomicStore[T](&self.n, &val, order) }
+		runtime::atomicStore[T](self.n, val, order)
 	}
 }
 
@@ -110,29 +110,29 @@ type Uintptr = number[uintptr]
 // Atomically stores new into addr and returns the previous addr value.
 // Only integer types are supported.
 fn Swap[T: int | uint | i8 | i16 | i32 | i64 | u8 | u16 | u32 | u64 | uintptr](mut &addr: T, new: T, order: memoryOrder): (old: T) {
-	ret unsafe { runtime::atomicSwap[T](&addr, &new, order) }
+	ret runtime::atomicSwap[T](addr, new, order)
 }
 
 // Executes the compare-and-swap operation for value.
 // Only integer types are supported.
-fn CompareSwap[T: int | uint | i8 | i16 | i32 | i64 | u8 | u16 | u32 | u64 | uintptr](mut &addr: T, old: T, new: T, order: memoryOrder): (swapped: bool) {
-	ret unsafe { runtime::atomicCompareSwap[T](&addr, &old, &new, order, order) }
+fn CompareAndSwap[T: int | uint | i8 | i16 | i32 | i64 | u8 | u16 | u32 | u64 | uintptr](mut &addr: T, old: T, new: T, order: memoryOrder): (swapped: bool) {
+	ret runtime::atomicCompareAndSwap[T](addr, old, new, order)
 }
 
-// Atomically adds delta to addr and returns the previous addr value.
+// Atomically adds delta to addr and returns the new addr value.
 // Only integer types are supported.
-fn Add[T: int | uint | i8 | i16 | i32 | i64 | u8 | u16 | u32 | u64 | uintptr](mut &addr: T, delta: T, order: memoryOrder): (old: T) {
-	ret unsafe { runtime::atomicAdd[T](&addr, delta, order) }
+fn Add[T: int | uint | i8 | i16 | i32 | i64 | u8 | u16 | u32 | u64 | uintptr](mut &addr: T, delta: T, order: memoryOrder): (new: T) {
+	ret runtime::atomicAdd[T](addr, delta, order)
}
 
 // Atomically loads addr.
 // Only integer types are supported.
 fn Load[T: int | uint | i8 | i16 | i32 | i64 | u8 | u16 | u32 | u64 | uintptr](&addr: T, order: memoryOrder): T {
-	ret unsafe { runtime::atomicLoad[T](&addr, order) }
+	ret runtime::atomicLoad[T](addr, order)
 }
 
 // Atomically stores val into addr.
 // Only integer types are supported.
 fn Store[T: int | uint | i8 | i16 | i32 | i64 | u8 | u16 | u32 | u64 | uintptr](mut &addr: T, val: T, order: memoryOrder) {
-	unsafe { runtime::atomicStore[T](&addr, &val, order) }
+	runtime::atomicStore[T](addr, val, order)
 }
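
At the public surface, the rename to CompareAndSwap and the new-value return of Add read as follows; a hypothetical snippet, assuming the std/sync/atomic import path (not part of this commit):

    use "std/sync/atomic"

    fn example() {
        mut n := i32(0)
        new := atomic::Add(n, 1, atomic::SeqCst) // new == 1: the updated value, not the old 0
        if atomic::CompareAndSwap(n, new, 42, atomic::SeqCst) {
            // n is now 42
        }
    }
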
5 changes: 2 additions & 3 deletions std/sync/waitgroup.jule
@@ -28,8 +28,7 @@ impl WaitGroup {
 	// and unblocks any wait() calls if task count becomes zero.
 	// Panics if task count reaches below zero.
 	fn Add(mut self, delta: int) {
-		oldTask := int(self.taskN.Add(u32(delta), atomic::SeqCst))
-		nTask := oldTask + delta
+		nTask := int(self.taskN.Add(u32(delta), atomic::SeqCst))
 		if nTask < 0 {
 			panic("sync: WaitGroup.Add: negative number of tasks")
 		}
@@ -47,7 +46,7 @@ impl WaitGroup {
 			ret
 		}
 
-		if self.waitN.CompareSwap(nWaiters, 0, atomic::SeqCst) {
+		if self.waitN.CompareAndSwap(nWaiters, 0, atomic::SeqCst) {
 			ret
 		}
 	}
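
Add now takes the counter straight from taskN.Add, since the atomic returns the post-increment value; the old two-step reconstruction is gone. A worked trace (illustrative values):

    // taskN == 2, Add(1):
    //   before: oldTask = 2 (old value), nTask = oldTask + delta = 3
    //   after:  nTask = int(self.taskN.Add(u32(1), atomic::SeqCst)) = 3 directly
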
