diff --git a/src/concurrency/sync.rs b/src/concurrency/sync.rs index 14c72e9398..268268848e 100644 --- a/src/concurrency/sync.rs +++ b/src/concurrency/sync.rs @@ -128,7 +128,7 @@ struct Condvar { /// The futex state. #[derive(Default, Debug)] struct Futex { - waiters: VecDeque, + waiters: Vec, /// Tracks the happens-before relationship /// between a futex-wake and a futex-wait /// during a non-spurious wake event. @@ -140,6 +140,12 @@ struct Futex { #[derive(Default, Clone)] pub struct FutexRef(Rc>); +impl FutexRef { + pub fn waiters(&self) -> usize { + self.0.borrow().waiters.len() + } +} + impl VisitProvenance for FutexRef { fn visit_provenance(&self, _visit: &mut VisitWith<'_>) { // No provenance in `Futex`. @@ -728,25 +734,21 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { interp_ok(true) } - /// Wait for the futex to be signaled, or a timeout. - /// On a signal, `retval_succ` is written to `dest`. - /// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error. + /// Wait for the futex to be signaled, or a timeout. Once the thread is + /// unblocked, `callback` is called with the unblock reason. fn futex_wait( &mut self, futex_ref: FutexRef, bitset: u32, timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>, - retval_succ: Scalar, - retval_timeout: Scalar, - dest: MPlaceTy<'tcx>, - errno_timeout: IoError, + callback: DynUnblockCallback<'tcx>, ) { let this = self.eval_context_mut(); let thread = this.active_thread(); let mut futex = futex_ref.0.borrow_mut(); let waiters = &mut futex.waiters; assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting"); - waiters.push_back(FutexWaiter { thread, bitset }); + waiters.push(FutexWaiter { thread, bitset }); drop(futex); this.block_thread( @@ -755,10 +757,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { callback!( @capture<'tcx> { futex_ref: FutexRef, - retval_succ: Scalar, - retval_timeout: Scalar, - dest: MPlaceTy<'tcx>, - errno_timeout: IoError, + callback: DynUnblockCallback<'tcx>, } |this, unblock: UnblockKind| { match unblock { @@ -768,29 +767,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { if let Some(data_race) = &this.machine.data_race { data_race.acquire_clock(&futex.clock, &this.machine.threads); } - // Write the return value. - this.write_scalar(retval_succ, &dest)?; - interp_ok(()) }, UnblockKind::TimedOut => { // Remove the waiter from the futex. let thread = this.active_thread(); let mut futex = futex_ref.0.borrow_mut(); futex.waiters.retain(|waiter| waiter.thread != thread); - // Set errno and write return value. - this.set_last_error(errno_timeout)?; - this.write_scalar(retval_timeout, &dest)?; - interp_ok(()) }, } + + callback.call(this, unblock) } ), ); } - /// Wake up the first thread in the queue that matches any of the bits in the bitset. - /// Returns whether anything was woken. - fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> { + /// Wake up `count` of the threads in the queue that match any of the bits + /// in the bitset. Returns how many threads were woken. + fn futex_wake( + &mut self, + futex_ref: &FutexRef, + bitset: u32, + count: usize, + ) -> InterpResult<'tcx, usize> { let this = self.eval_context_mut(); let mut futex = futex_ref.0.borrow_mut(); let data_race = &this.machine.data_race; @@ -800,13 +799,18 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { data_race.release_clock(&this.machine.threads, |clock| futex.clock.clone_from(clock)); } - // Wake up the first thread in the queue that matches any of the bits in the bitset. - let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else { - return interp_ok(false); - }; - let waiter = futex.waiters.remove(i).unwrap(); + // Remove `count` of the threads in the queue that match any of the bits in the bitset. + // We collect all of them before unblocking because the unblock callback may access the + // futex state to retrieve the remaining number of waiters on macOS. + let waiters: Vec<_> = + futex.waiters.extract_if(.., |w| w.bitset & bitset != 0).take(count).collect(); drop(futex); - this.unblock_thread(waiter.thread, BlockReason::Futex)?; - interp_ok(true) + + let woken = waiters.len(); + for waiter in waiters { + this.unblock_thread(waiter.thread, BlockReason::Futex)?; + } + + interp_ok(woken) } } diff --git a/src/lib.rs b/src/lib.rs index 5d3204a527..45054c37c4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,8 @@ #![feature(unqualified_local_imports)] #![feature(derive_coerce_pointee)] #![feature(arbitrary_self_types)] +#![feature(unsigned_is_multiple_of)] +#![feature(extract_if)] // Configure clippy and other lints #![allow( clippy::collapsible_else_if, diff --git a/src/shims/unix/linux_like/sync.rs b/src/shims/unix/linux_like/sync.rs index 8369811336..280bee4800 100644 --- a/src/shims/unix/linux_like/sync.rs +++ b/src/shims/unix/linux_like/sync.rs @@ -158,14 +158,24 @@ pub fn futex<'tcx>( .futex .clone(); + let dest = dest.clone(); ecx.futex_wait( futex_ref, bitset, timeout, - Scalar::from_target_isize(0, ecx), // retval_succ - Scalar::from_target_isize(-1, ecx), // retval_timeout - dest.clone(), - LibcError("ETIMEDOUT"), // errno_timeout + callback!( + @capture<'tcx> { + dest: MPlaceTy<'tcx>, + } + |ecx, unblock: UnblockKind| match unblock { + UnblockKind::Ready => { + ecx.write_int(0, &dest) + } + UnblockKind::TimedOut => { + ecx.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest) + } + } + ), ); } else { // The futex value doesn't match the expected value, so we return failure @@ -209,16 +219,8 @@ pub fn futex<'tcx>( // will see the latest value on addr which could be changed by our caller // before doing the syscall. ecx.atomic_fence(AtomicFenceOrd::SeqCst)?; - let mut n = 0; - #[expect(clippy::arithmetic_side_effects)] - for _ in 0..val { - if ecx.futex_wake(&futex_ref, bitset)? { - n += 1; - } else { - break; - } - } - ecx.write_scalar(Scalar::from_target_isize(n, ecx), dest)?; + let woken = ecx.futex_wake(&futex_ref, bitset, val.try_into().unwrap())?; + ecx.write_scalar(Scalar::from_target_isize(woken.try_into().unwrap(), ecx), dest)?; } op => throw_unsup_format!("Miri does not support `futex` syscall with op={}", op), } diff --git a/src/shims/unix/macos/foreign_items.rs b/src/shims/unix/macos/foreign_items.rs index 37fb6c1919..918fd8dd52 100644 --- a/src/shims/unix/macos/foreign_items.rs +++ b/src/shims/unix/macos/foreign_items.rs @@ -2,12 +2,20 @@ use rustc_middle::ty::Ty; use rustc_span::Symbol; use rustc_target::callconv::{Conv, FnAbi}; -use super::sync::EvalContextExt as _; +use super::sync::{EvalContextExt as _, MacOsFutexTimeout}; use crate::shims::unix::*; use crate::*; -pub fn is_dyn_sym(_name: &str) -> bool { - false +pub fn is_dyn_sym(name: &str) -> bool { + match name { + // These only became available with macOS 11.0, so std looks them up dynamically. + "os_sync_wait_on_address" + | "os_sync_wait_on_address_with_deadline" + | "os_sync_wait_on_address_with_timeout" + | "os_sync_wake_by_address_any" + | "os_sync_wake_by_address_all" => true, + _ => false, + } } impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {} @@ -214,6 +222,58 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { this.write_scalar(res, dest)?; } + // Futex primitives + "os_sync_wait_on_address" => { + let [addr_op, value_op, size_op, flags_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wait_on_address( + addr_op, + value_op, + size_op, + flags_op, + MacOsFutexTimeout::None, + dest, + )?; + } + "os_sync_wait_on_address_with_deadline" => { + let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wait_on_address( + addr_op, + value_op, + size_op, + flags_op, + MacOsFutexTimeout::Absolute { clock_op, timeout_op }, + dest, + )?; + } + "os_sync_wait_on_address_with_timeout" => { + let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wait_on_address( + addr_op, + value_op, + size_op, + flags_op, + MacOsFutexTimeout::Relative { clock_op, timeout_op }, + dest, + )?; + } + "os_sync_wake_by_address_any" => { + let [addr_op, size_op, flags_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wake_by_address( + addr_op, size_op, flags_op, /* all */ false, dest, + )?; + } + "os_sync_wake_by_address_all" => { + let [addr_op, size_op, flags_op] = + this.check_shim(abi, Conv::C, link_name, args)?; + this.os_sync_wake_by_address( + addr_op, size_op, flags_op, /* all */ true, dest, + )?; + } + "os_unfair_lock_lock" => { let [lock_op] = this.check_shim(abi, Conv::C, link_name, args)?; this.os_unfair_lock_lock(lock_op)?; diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index 5442d38d52..3c4ade59ae 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -10,8 +10,12 @@ //! and we do not detect copying of the lock, but macOS doesn't guarantee anything //! in that case either. +use std::cell::Cell; +use std::time::Duration; + use rustc_abi::Size; +use crate::concurrency::sync::FutexRef; use crate::*; #[derive(Clone)] @@ -20,6 +24,26 @@ enum MacOsUnfairLock { Active { mutex_ref: MutexRef }, } +pub enum MacOsFutexTimeout<'a, 'tcx> { + None, + Relative { clock_op: &'a OpTy<'tcx>, timeout_op: &'a OpTy<'tcx> }, + Absolute { clock_op: &'a OpTy<'tcx>, timeout_op: &'a OpTy<'tcx> }, +} + +/// Metadata for a macOS futex. +/// +/// Since macOS 11.0, Apple has exposed the previously private futex API consisting +/// of `os_sync_wait_on_address` (and friends) and `os_sync_wake_by_address_{any, all}`. +/// These work with different value sizes and flags, which are validated to be consistent. +/// This structure keeps track of both the futex queue and these values. +struct MacOsFutex { + futex: FutexRef, + /// The size in bytes of the atomic primitive underlying this futex. + size: Cell, + /// Whether the futex is shared across process boundaries. + shared: Cell, +} + impl<'tcx> EvalContextExtPriv<'tcx> for crate::MiriInterpCx<'tcx> {} trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> { fn os_unfair_lock_get_data<'a>( @@ -54,6 +78,194 @@ trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> { impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {} pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { + /// Implements [`os_sync_wait_on_address`], [`os_sync_wait_on_address_with_deadline`] + /// and [`os_sync_wait_on_address_with_timeout`]. + /// + /// [`os_sync_wait_on_address`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address?language=objc + /// [`os_sync_wait_on_address_with_deadline`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address_with_deadline?language=objc + /// [`os_sync_wait_on_address_with_timeout`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address_with_timeout?language=objc + fn os_sync_wait_on_address( + &mut self, + addr_op: &OpTy<'tcx>, + value_op: &OpTy<'tcx>, + size_op: &OpTy<'tcx>, + flags_op: &OpTy<'tcx>, + timeout: MacOsFutexTimeout<'_, 'tcx>, + dest: &MPlaceTy<'tcx>, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let none = this.eval_libc_u32("OS_SYNC_WAIT_ON_ADDRESS_NONE"); + let shared = this.eval_libc_u32("OS_SYNC_WAIT_ON_ADDRESS_SHARED"); + let absolute_clock = this.eval_libc_u32("OS_CLOCK_MACH_ABSOLUTE_TIME"); + + let ptr = this.read_pointer(addr_op)?; + let value = this.read_scalar(value_op)?.to_u64()?; + let size = this.read_target_usize(size_op)?; + let flags = this.read_scalar(flags_op)?.to_u32()?; + + let clock_timeout = match timeout { + MacOsFutexTimeout::None => None, + MacOsFutexTimeout::Relative { clock_op, timeout_op } => { + let clock = this.read_scalar(clock_op)?.to_u32()?; + let timeout = this.read_scalar(timeout_op)?.to_u64()?; + Some((clock, TimeoutAnchor::Relative, timeout)) + } + MacOsFutexTimeout::Absolute { clock_op, timeout_op } => { + let clock = this.read_scalar(clock_op)?.to_u32()?; + let timeout = this.read_scalar(timeout_op)?.to_u64()?; + Some((clock, TimeoutAnchor::Absolute, timeout)) + } + }; + + // Perform validation of the arguments. + let addr = ptr.addr().bytes(); + if addr == 0 + || !matches!(size, 4 | 8) + || !addr.is_multiple_of(size) + || (flags != none && flags != shared) + || clock_timeout + .is_some_and(|(clock, _, timeout)| clock != absolute_clock || timeout == 0) + { + this.set_last_error_and_return(LibcError("EINVAL"), dest)?; + return interp_ok(()); + } + + let is_shared = flags == shared; + let timeout = clock_timeout.map(|(_, anchor, timeout)| { + (TimeoutClock::Monotonic, anchor, Duration::from_nanos(timeout)) + }); + + // See the Linux futex implementation for why this fence exists. + this.atomic_fence(AtomicFenceOrd::SeqCst)?; + + let layout = this.machine.layouts.uint(Size::from_bytes(size)).unwrap(); + let futex_val = this + .read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Acquire)? + .to_bits(Size::from_bytes(size))?; + + let futex = this + .get_sync_or_init(ptr, |_| { + MacOsFutex { + futex: Default::default(), + size: Cell::new(size), + shared: Cell::new(is_shared), + } + }) + .unwrap(); + + // Detect mismatches between the flags and sizes used on this address + // by comparing it with the parameters used by the other waiters in + // the current list. If the list is currently empty, update those + // parameters. + if futex.futex.waiters() == 0 { + futex.size.set(size); + futex.shared.set(is_shared); + } else if futex.size.get() != size || futex.shared.get() != is_shared { + this.set_last_error_and_return(LibcError("EINVAL"), dest)?; + return interp_ok(()); + } + + if futex_val == value.into() { + // If the values are the same, we have to block. + let futex_ref = futex.futex.clone(); + let dest = dest.clone(); + this.futex_wait( + futex_ref.clone(), + u32::MAX, // bitset + timeout, + callback!( + @capture<'tcx> { + dest: MPlaceTy<'tcx>, + futex_ref: FutexRef, + } + |this, unblock: UnblockKind| { + match unblock { + UnblockKind::Ready => { + let remaining = futex_ref.waiters().try_into().unwrap(); + this.write_scalar(Scalar::from_i32(remaining), &dest) + } + UnblockKind::TimedOut => { + this.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest) + } + } + } + ), + ); + } else { + // else retrieve the current number of waiters. + let waiters = futex.futex.waiters().try_into().unwrap(); + this.write_scalar(Scalar::from_i32(waiters), dest)?; + } + + interp_ok(()) + } + + /// Implements [`os_sync_wake_by_address_all`] and [`os_sync_wake_by_address_any`]. + /// + /// [`os_sync_wake_by_address_all`]: https://developer.apple.com/documentation/os/os_sync_wake_by_address_all?language=objc + /// [`os_sync_wake_by_address_any`]: https://developer.apple.com/documentation/os/os_sync_wake_by_address_any?language=objc + fn os_sync_wake_by_address( + &mut self, + addr_op: &OpTy<'tcx>, + size_op: &OpTy<'tcx>, + flags_op: &OpTy<'tcx>, + all: bool, + dest: &MPlaceTy<'tcx>, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let none = this.eval_libc_u32("OS_SYNC_WAKE_BY_ADDRESS_NONE"); + let shared = this.eval_libc_u32("OS_SYNC_WAKE_BY_ADDRESS_SHARED"); + + let ptr = this.read_pointer(addr_op)?; + let size = this.read_target_usize(size_op)?; + let flags = this.read_scalar(flags_op)?.to_u32()?; + + // Perform validation of the arguments. + let addr = ptr.addr().bytes(); + if addr == 0 || !matches!(size, 4 | 8) || (flags != none && flags != shared) { + this.set_last_error_and_return(LibcError("EINVAL"), dest)?; + return interp_ok(()); + } + + let is_shared = flags == shared; + + let Some(futex) = this.get_sync_or_init(ptr, |_| { + MacOsFutex { + futex: Default::default(), + size: Cell::new(size), + shared: Cell::new(is_shared), + } + }) else { + // No AllocId, or no live allocation at that AllocId. Return an + // error code. (That seems nicer than silently doing something + // non-intuitive.) This means that if an address gets reused by a + // new allocation, we'll use an independent futex queue for this... + // that seems acceptable. + this.set_last_error_and_return(LibcError("ENOENT"), dest)?; + return interp_ok(()); + }; + + if futex.futex.waiters() == 0 { + this.set_last_error_and_return(LibcError("ENOENT"), dest)?; + return interp_ok(()); + // If there are waiters in the queue, they have all used the parameters + // stored in `futex` (we check this in `os_sync_wait_on_address` above). + // Detect mismatches between "our" parameters and the parameters used by + // the waiters and return an error in that case. + } else if futex.size.get() != size || futex.shared.get() != is_shared { + this.set_last_error_and_return(LibcError("EINVAL"), dest)?; + return interp_ok(()); + } + + let futex_ref = futex.futex.clone(); + + // See the Linux futex implementation for why this fence exists. + this.atomic_fence(AtomicFenceOrd::SeqCst)?; + this.futex_wake(&futex_ref, u32::MAX, if all { usize::MAX } else { 1 })?; + this.write_scalar(Scalar::from_i32(0), dest)?; + interp_ok(()) + } + fn os_unfair_lock_lock(&mut self, lock_op: &OpTy<'tcx>) -> InterpResult<'tcx> { let this = self.eval_context_mut(); diff --git a/src/shims/windows/sync.rs b/src/shims/windows/sync.rs index 808a7786eb..8d5ea7db9e 100644 --- a/src/shims/windows/sync.rs +++ b/src/shims/windows/sync.rs @@ -212,14 +212,27 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { .futex .clone(); + let dest = dest.clone(); this.futex_wait( futex_ref, u32::MAX, // bitset timeout, - Scalar::from_i32(1), // retval_succ - Scalar::from_i32(0), // retval_timeout - dest.clone(), - IoError::WindowsError("ERROR_TIMEOUT"), // errno_timeout + callback!( + @capture<'tcx> { + dest: MPlaceTy<'tcx> + } + |this, unblock: UnblockKind| { + match unblock { + UnblockKind::Ready => { + this.write_int(1, &dest) + } + UnblockKind::TimedOut => { + this.set_last_error(IoError::WindowsError("ERROR_TIMEOUT"))?; + this.write_int(0, &dest) + } + } + } + ), ); } @@ -244,7 +257,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { }; let futex_ref = futex_ref.futex.clone(); - this.futex_wake(&futex_ref, u32::MAX)?; + this.futex_wake(&futex_ref, u32::MAX, 1)?; interp_ok(()) } @@ -264,7 +277,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { }; let futex_ref = futex_ref.futex.clone(); - while this.futex_wake(&futex_ref, u32::MAX)? {} + this.futex_wake(&futex_ref, u32::MAX, usize::MAX)?; interp_ok(()) } diff --git a/tests/pass-dep/concurrency/apple-futex.rs b/tests/pass-dep/concurrency/apple-futex.rs new file mode 100644 index 0000000000..969b3cdf72 --- /dev/null +++ b/tests/pass-dep/concurrency/apple-futex.rs @@ -0,0 +1,274 @@ +//@only-target: darwin + +use std::time::{Duration, Instant}; +use std::{io, ptr, thread}; + +fn wake_nobody() { + let futex = 0; + + // Wake 1 waiter. Expect ENOENT woken up, as nobody is waiting. + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr::from_ref(&futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE + ), + -1 + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ENOENT); + } +} + +fn wake_dangling() { + let futex = Box::new(0); + let ptr = ptr::from_ref(&futex).cast_mut().cast(); + drop(futex); + + // Expect error since this is now "unmapped" memory. + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr, + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE + ), + -1 + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ENOENT); + } +} + +fn wait_wrong_val() { + let futex: i32 = 123; + + // Only wait if the futex value is 456. + unsafe { + assert_eq!( + libc::os_sync_wait_on_address( + ptr::from_ref(&futex).cast_mut().cast(), + 456, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE + ), + 0, + ); + } +} + +fn wait_timeout() { + let start = Instant::now(); + + let futex: i32 = 123; + + // Wait for 200ms, with nobody waking us up early. + unsafe { + assert_eq!( + libc::os_sync_wait_on_address_with_timeout( + ptr::from_ref(&futex).cast_mut().cast(), + 123, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, + 200_000_000, + ), + -1, + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT); + } + + assert!((200..1000).contains(&start.elapsed().as_millis())); +} + +fn wait_absolute_timeout() { + let start = Instant::now(); + + // Get the current monotonic timestamp. + #[allow(deprecated)] + let mut deadline = unsafe { libc::mach_absolute_time() }; + + // Add 200ms. + // What we should be doing here is call `mach_timebase_info` to determine the + // unit used for `deadline`, but we know what Miri returns for that function: + // the unit is nanoseconds. + deadline += 200_000_000; + + let futex: i32 = 123; + + // Wait for 200ms from now, with nobody waking us up early. + unsafe { + assert_eq!( + libc::os_sync_wait_on_address_with_deadline( + ptr::from_ref(&futex).cast_mut().cast(), + 123, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, + deadline, + ), + -1, + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT); + } + + assert!((200..1000).contains(&start.elapsed().as_millis())); +} + +fn wait_wake() { + let start = Instant::now(); + + static mut FUTEX: i32 = 0; + + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(200)); + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + (&raw const FUTEX).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ), + 0, + ); + } + }); + + unsafe { + assert_eq!( + libc::os_sync_wait_on_address( + (&raw const FUTEX).cast_mut().cast(), + 0, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + ), + 0, + ); + } + + // When running this in stress-gc mode, things can take quite long. + // So the timeout is 3000 ms. + assert!((200..3000).contains(&start.elapsed().as_millis())); + t.join().unwrap(); +} + +fn wait_wake_multiple() { + let val = 0i32; + let futex = &val; + + thread::scope(|s| { + // Spawn some threads and make them wait on the futex. + for i in 0..4 { + s.spawn(move || unsafe { + assert_eq!( + libc::os_sync_wait_on_address( + ptr::from_ref(futex).cast_mut().cast(), + 0, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + ), + // The last two threads will be woken at the same time, + // but for the first two threads the remaining number + // of waiters should be strictly decreasing. + if i < 2 { 3 - i } else { 0 }, + ); + }); + + thread::sleep(Duration::from_millis(200)); + } + + // Wake the threads up again. + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr::from_ref(futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ), + 0 + ); + + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr::from_ref(futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ), + 0 + ); + + // Wake both remaining threads at the same time. + assert_eq!( + libc::os_sync_wake_by_address_all( + ptr::from_ref(futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAKE_BY_ADDRESS_NONE, + ), + 0 + ); + } + }) +} + +fn param_mismatch() { + let futex = 0; + thread::scope(|s| { + s.spawn(|| { + unsafe { + assert_eq!( + libc::os_sync_wait_on_address_with_timeout( + ptr::from_ref(&futex).cast_mut().cast(), + 0, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_NONE, + libc::OS_CLOCK_MACH_ABSOLUTE_TIME, + 400_000_000, + ), + -1, + ); + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT); + } + }); + + s.spawn(|| { + thread::sleep(Duration::from_millis(200)); + unsafe { + assert_eq!( + libc::os_sync_wait_on_address( + ptr::from_ref(&futex).cast_mut().cast(), + 0, + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_SHARED, + ), + -1, + ); + // This call fails because it uses the shared flag whereas the first waiter didn't. + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EINVAL); + } + }); + + thread::sleep(Duration::from_millis(200)); + unsafe { + assert_eq!( + libc::os_sync_wake_by_address_any( + ptr::from_ref(&futex).cast_mut().cast(), + size_of::(), + libc::OS_SYNC_WAIT_ON_ADDRESS_SHARED, + ), + -1, + ); + // This call fails because it uses the shared flag whereas the waiter didn't. + assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EINVAL); + } + }); +} + +fn main() { + wake_nobody(); + wake_dangling(); + wait_wrong_val(); + wait_timeout(); + wait_absolute_timeout(); + wait_wake(); + wait_wake_multiple(); + param_mismatch(); +}