std/sys/pal/unix/
futex.rs

1#![cfg(any(
2    target_os = "linux",
3    target_os = "android",
4    all(target_os = "emscripten", target_feature = "atomics"),
5    target_os = "freebsd",
6    target_os = "openbsd",
7    target_os = "dragonfly",
8    target_os = "fuchsia",
9))]
10
11use crate::sync::atomic::Atomic;
12use crate::time::Duration;
13
14/// An atomic for use as a futex that is at least 32-bits but may be larger
15pub type Futex = Atomic<Primitive>;
16/// Must be the underlying type of Futex
17pub type Primitive = u32;
18
19/// An atomic for use as a futex that is at least 8-bits but may be larger.
20pub type SmallFutex = Atomic<SmallPrimitive>;
21/// Must be the underlying type of SmallFutex
22pub type SmallPrimitive = u32;
23
24/// Waits for a `futex_wake` operation to wake us.
25///
26/// Returns directly if the futex doesn't hold the expected value.
27///
28/// Returns false on timeout, and true in all other cases.
29#[cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd"))]
30pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
31    use super::time::Timespec;
32    use crate::ptr::null;
33    use crate::sync::atomic::Ordering::Relaxed;
34
35    // Calculate the timeout as an absolute timespec.
36    //
37    // Overflows are rounded up to an infinite timeout (None).
38    let timespec = timeout
39        .and_then(|d| Timespec::now(libc::CLOCK_MONOTONIC).checked_add_duration(&d))
40        .and_then(|t| t.to_timespec());
41
42    loop {
43        // No need to wait if the value already changed.
44        if futex.load(Relaxed) != expected {
45            return true;
46        }
47
48        let r = unsafe {
49            cfg_if::cfg_if! {
50                if #[cfg(target_os = "freebsd")] {
51                    // FreeBSD doesn't have futex(), but it has
52                    // _umtx_op(UMTX_OP_WAIT_UINT_PRIVATE), which is nearly
53                    // identical. It supports absolute timeouts through a flag
54                    // in the _umtx_time struct.
55                    let umtx_timeout = timespec.map(|t| libc::_umtx_time {
56                        _timeout: t,
57                        _flags: libc::UMTX_ABSTIME,
58                        _clockid: libc::CLOCK_MONOTONIC as u32,
59                    });
60                    let umtx_timeout_ptr = umtx_timeout.as_ref().map_or(null(), |t| t as *const _);
61                    let umtx_timeout_size = umtx_timeout.as_ref().map_or(0, |t| size_of_val(t));
62                    libc::_umtx_op(
63                        futex as *const Atomic<u32> as *mut _,
64                        libc::UMTX_OP_WAIT_UINT_PRIVATE,
65                        expected as libc::c_ulong,
66                        crate::ptr::without_provenance_mut(umtx_timeout_size),
67                        umtx_timeout_ptr as *mut _,
68                    )
69                } else if #[cfg(any(target_os = "linux", target_os = "android"))] {
70                    // Use FUTEX_WAIT_BITSET rather than FUTEX_WAIT to be able to give an
71                    // absolute time rather than a relative time.
72                    libc::syscall(
73                        libc::SYS_futex,
74                        futex as *const Atomic<u32>,
75                        libc::FUTEX_WAIT_BITSET | libc::FUTEX_PRIVATE_FLAG,
76                        expected,
77                        timespec.as_ref().map_or(null(), |t| t as *const libc::timespec),
78                        null::<u32>(), // This argument is unused for FUTEX_WAIT_BITSET.
79                        !0u32,         // A full bitmask, to make it behave like a regular FUTEX_WAIT.
80                    )
81                } else {
82                    compile_error!("unknown target_os");
83                }
84            }
85        };
86
87        match (r < 0).then(super::os::errno) {
88            Some(libc::ETIMEDOUT) => return false,
89            Some(libc::EINTR) => continue,
90            _ => return true,
91        }
92    }
93}
94
95/// Wakes up one thread that's blocked on `futex_wait` on this futex.
96///
97/// Returns true if this actually woke up such a thread,
98/// or false if no thread was waiting on this futex.
99///
100/// On some platforms, this always returns false.
101#[cfg(any(target_os = "linux", target_os = "android"))]
102pub fn futex_wake(futex: &Atomic<u32>) -> bool {
103    let ptr = futex as *const Atomic<u32>;
104    let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG;
105    unsafe { libc::syscall(libc::SYS_futex, ptr, op, 1) > 0 }
106}
107
108/// Wakes up all threads that are waiting on `futex_wait` on this futex.
109#[cfg(any(target_os = "linux", target_os = "android"))]
110pub fn futex_wake_all(futex: &Atomic<u32>) {
111    let ptr = futex as *const Atomic<u32>;
112    let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG;
113    unsafe {
114        libc::syscall(libc::SYS_futex, ptr, op, i32::MAX);
115    }
116}
117
/// Wakes a single thread waiting on this futex.
///
/// FreeBSD does not report how many threads were woken, so the result is
/// always `false`.
#[cfg(target_os = "freebsd")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    use crate::ptr::null_mut;

    let addr = futex as *const Atomic<u32> as *mut libc::c_void;
    // The two trailing pointer arguments are passed as null for a wake.
    unsafe { libc::_umtx_op(addr, libc::UMTX_OP_WAKE_PRIVATE, 1, null_mut(), null_mut()) };
    false
}
133
/// Wakes every thread waiting on this futex.
#[cfg(target_os = "freebsd")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    use crate::ptr::null_mut;

    let addr = futex as *const Atomic<u32> as *mut libc::c_void;
    // `i32::MAX` is passed as the wake count.
    let wake_count = i32::MAX as libc::c_ulong;
    unsafe {
        libc::_umtx_op(addr, libc::UMTX_OP_WAKE_PRIVATE, wake_count, null_mut(), null_mut())
    };
}
147
/// Blocks until woken by a `futex_wake` operation, or until `timeout` elapses.
///
/// Returns `false` if the wait timed out and `true` otherwise.
#[cfg(target_os = "openbsd")]
pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
    use super::time::Timespec;
    use crate::ptr::{null, null_mut};

    // The timeout is built from a zero `Timespec` plus the duration.
    // A duration that overflows becomes `None`, i.e. an infinite wait.
    let relative = match timeout {
        Some(d) => Timespec::zero().checked_add_duration(&d).and_then(|t| t.to_timespec()),
        None => None,
    };
    let timeout_ptr = relative.as_ref().map_or(null(), |t| t as *const libc::timespec);

    let rc = unsafe {
        libc::futex(
            futex as *const Atomic<u32> as *mut u32,
            libc::FUTEX_WAIT,
            expected as i32,
            timeout_ptr,
            null_mut(),
        )
    };

    // errno is consulted only when the call did not return zero.
    match rc {
        0 => true,
        _ => super::os::errno() != libc::ETIMEDOUT,
    }
}
170
/// Wakes a single thread waiting on this futex.
///
/// Returns `true` if the `futex` call reports a positive result.
#[cfg(target_os = "openbsd")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    use crate::ptr::{null, null_mut};

    let word = futex as *const Atomic<u32> as *mut u32;
    let woken = unsafe { libc::futex(word, libc::FUTEX_WAKE, 1, null(), null_mut()) };
    woken > 0
}
184
/// Wakes every thread waiting on this futex.
#[cfg(target_os = "openbsd")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    use crate::ptr::{null, null_mut};

    let word = futex as *const Atomic<u32> as *mut u32;
    // `i32::MAX` is passed as the wake count; the result is ignored.
    unsafe {
        libc::futex(word, libc::FUTEX_WAKE, i32::MAX, null(), null_mut());
    }
}
198
/// Blocks until woken by a `futex_wake` operation, or until `timeout` elapses.
///
/// Returns `false` if the wait timed out and `true` otherwise.
#[cfg(target_os = "dragonfly")]
pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
    // `umtx_sleep` treats a timeout of 0 as "wait forever", so:
    //  - no timeout maps to 0,
    //  - a timeout below one millisecond is bumped up to 1,
    //  - a timeout too large for an `i32` also maps to 0 (infinite).
    let timeout_ms = match timeout {
        None => 0,
        Some(d) => match i32::try_from(d.as_millis()) {
            Ok(ms) => ms.max(1),
            Err(_) => 0,
        },
    };

    let addr = futex as *const Atomic<u32> as *const i32;
    let rc = unsafe { libc::umtx_sleep(addr, expected as i32, timeout_ms) };

    // errno is consulted only when the call did not return zero.
    match rc {
        0 => true,
        _ => super::os::errno() != libc::ETIMEDOUT,
    }
}
213
/// Wakes a single thread waiting on this futex.
///
/// DragonflyBSD does not report how many threads were woken, so the result
/// is always `false`.
#[cfg(target_os = "dragonfly")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    let addr = futex as *const Atomic<u32> as *const i32;
    unsafe { libc::umtx_wakeup(addr, 1) };
    false
}
220
/// Wakes every thread waiting on this futex.
#[cfg(target_os = "dragonfly")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    let addr = futex as *const Atomic<u32> as *const i32;
    // `i32::MAX` is passed as the wake count.
    unsafe { libc::umtx_wakeup(addr, i32::MAX) };
}
225
// Futex entry points declared for Emscripten targets.
#[cfg(target_os = "emscripten")]
unsafe extern "C" {
    // Waits on `addr` while it holds `val`, for at most `max_wait_ms`
    // milliseconds. The caller in this file compares the result against
    // `-libc::ETIMEDOUT` to detect a timeout.
    fn emscripten_futex_wait(
        addr: *const Atomic<u32>,
        val: libc::c_uint,
        max_wait_ms: libc::c_double,
    ) -> libc::c_int;

    // Wakes up to `count` waiters on `addr`. The caller in this file treats
    // a positive result as "at least one thread was woken".
    fn emscripten_futex_wake(addr: *const Atomic<u32>, count: libc::c_int) -> libc::c_int;
}
235
/// Blocks until woken by a `futex_wake` operation, or until `timeout` elapses.
///
/// Returns `false` if the wait timed out and `true` otherwise.
#[cfg(target_os = "emscripten")]
pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
    // The wait time is given in (fractional) milliseconds; no timeout is
    // expressed as positive infinity.
    let max_wait_ms = match timeout {
        Some(d) => d.as_secs_f64() * 1000.0,
        None => f64::INFINITY,
    };

    let status = unsafe { emscripten_futex_wait(futex, expected, max_wait_ms) };
    status != -libc::ETIMEDOUT
}
246
/// Wakes a single thread waiting on this futex.
///
/// Returns `true` if `emscripten_futex_wake` reports a positive result.
#[cfg(target_os = "emscripten")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    let woken = unsafe { emscripten_futex_wake(futex, 1) };
    woken > 0
}
251
/// Wakes every thread waiting on this futex.
#[cfg(target_os = "emscripten")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    // `i32::MAX` is passed as the wake count; the result is ignored.
    let _ = unsafe { emscripten_futex_wake(futex, i32::MAX) };
}
256
/// Declarations of the Zircon types, constants and functions used by the
/// Fuchsia futex implementation.
#[cfg(target_os = "fuchsia")]
pub mod zircon {
    // Type aliases mirroring the Zircon names.
    pub type zx_status_t = i32;
    pub type zx_handle_t = u32;
    pub type zx_time_t = i64;
    pub type zx_futex_t = crate::sync::atomic::Atomic<u32>;

    // Handle value used when no handle is being passed.
    pub const ZX_HANDLE_INVALID: zx_handle_t = 0;

    // Deadline value used for "no timeout".
    pub const ZX_TIME_INFINITE: zx_time_t = zx_time_t::MAX;

    // Status codes.
    pub const ZX_OK: zx_status_t = 0;
    pub const ZX_ERR_INVALID_ARGS: zx_status_t = -10;
    pub const ZX_ERR_BAD_HANDLE: zx_status_t = -11;
    pub const ZX_ERR_WRONG_TYPE: zx_status_t = -12;
    pub const ZX_ERR_BAD_STATE: zx_status_t = -20;
    pub const ZX_ERR_TIMED_OUT: zx_status_t = -21;

    unsafe extern "C" {
        // Clock and thread queries.
        pub fn zx_clock_get_monotonic() -> zx_time_t;
        pub fn zx_thread_self() -> zx_handle_t;

        // Futex operations.
        pub fn zx_futex_wait(
            value_ptr: *const zx_futex_t,
            current_value: zx_futex_t,
            new_futex_owner: zx_handle_t,
            deadline: zx_time_t,
        ) -> zx_status_t;
        pub fn zx_futex_wake(value_ptr: *const zx_futex_t, wake_count: u32) -> zx_status_t;
        pub fn zx_futex_wake_single_owner(value_ptr: *const zx_futex_t) -> zx_status_t;
    }
}
288
/// Blocks until woken by a `futex_wake` operation, or until `timeout` elapses.
///
/// Returns `false` if the wait timed out and `true` otherwise.
#[cfg(target_os = "fuchsia")]
pub fn futex_wait(futex: &Atomic<u32>, expected: u32, timeout: Option<Duration>) -> bool {
    // The deadline is the current monotonic time plus the timeout in
    // nanoseconds. If either the conversion to `i64` or the addition
    // overflows (or there is no timeout at all), sleep forever.
    let deadline = timeout
        .and_then(|d| i64::try_from(d.as_nanos()).ok())
        .and_then(|nanos| nanos.checked_add(unsafe { zircon::zx_clock_get_monotonic() }))
        .unwrap_or(zircon::ZX_TIME_INFINITE);

    let status = unsafe {
        zircon::zx_futex_wait(
            futex,
            core::sync::atomic::AtomicU32::new(expected),
            zircon::ZX_HANDLE_INVALID,
            deadline,
        )
    };
    status != zircon::ZX_ERR_TIMED_OUT
}
309
/// Wakes a single thread waiting on this futex.
///
/// Fuchsia does not report how many threads were woken, so the result is
/// always `false`.
#[cfg(target_os = "fuchsia")]
pub fn futex_wake(futex: &Atomic<u32>) -> bool {
    // The returned status is deliberately ignored.
    let _ = unsafe { zircon::zx_futex_wake(futex, 1) };
    false
}
316
/// Wakes every thread waiting on this futex.
#[cfg(target_os = "fuchsia")]
pub fn futex_wake_all(futex: &Atomic<u32>) {
    // `u32::MAX` is passed as the wake count; the returned status is ignored.
    let _ = unsafe { zircon::zx_futex_wake(futex, u32::MAX) };
}