std/sync/mpmc/list.rs

//! Unbounded channel implemented as a linked list.

use super::context::Context;
use super::error::*;
use super::select::{Operation, Selected, Token};
use super::utils::{Backoff, CachePadded};
use super::waker::SyncWaker;
use crate::cell::UnsafeCell;
use crate::marker::PhantomData;
use crate::mem::MaybeUninit;
use crate::ptr;
use crate::sync::atomic::{self, Atomic, AtomicPtr, AtomicUsize, Ordering};
use crate::time::Instant;

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
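
// As a worked example of the encoding: with `SHIFT = 1`, an index value `i`
// stores the sequence number in `i >> SHIFT` and reserves bit 0 for
// `MARK_BIT`. A sequence number `s` lives at offset `s % LAP` in its block;
// offsets `0..BLOCK_CAP` hold messages, while offset `BLOCK_CAP` (31) never
// holds a message and instead signals that the next block must be installed.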

/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: Atomic<usize>,
}

impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.spin_heavy();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: Atomic<*mut Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Box<Block<T>> {
        // SAFETY: This is safe because:
        //  [1] `Block::next` (Atomic<*mut _>) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (Atomic<usize>) may be safely zero initialized.
        unsafe { Box::new_zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.spin_heavy();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = unsafe { (*this).slots.get_unchecked(i) };

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(unsafe { Box::from_raw(this) });
    }
}
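
// The destruction handshake in the `read` path works as follows: the reader
// of the last slot in a block calls `destroy`, which walks the remaining
// slots and sets `DESTROY` on any slot whose `READ` bit is not yet set. If
// such a slot is found, `destroy` returns, and the thread still reading that
// slot will observe `DESTROY` when it finishes and call `destroy` itself,
// resuming from the slot after its own. Exactly one thread thus ends up
// freeing the block.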

/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: Atomic<usize>,

    /// The block in the linked list.
    block: Atomic<*mut Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken { block: ptr::null(), offset: 0 }
    }
}

/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.spin_heavy();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    // This yield point leaves the channel in a half-initialized state where the
                    // tail.block pointer is set but the head.block is not. This is used to
                    // facilitate the test in src/tools/miri/tests/pass/issues/issue-139553.rs
                    #[cfg(miri)]
                    crate::thread::yield_now();
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(_) => {
                    backoff.spin_light();
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }
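
    // Note on the end-of-block step in `start_send`: after claiming the last
    // message slot (offset `BLOCK_CAP - 1`), the winning sender installs the
    // next block and bumps the tail index once more. That extra step moves
    // the tail past the reserved offset `BLOCK_CAP`, which other senders
    // interpret as "a new block is being installed" and spin on until the
    // bump lands.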

    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        unsafe {
            let slot = (*block).slots.get_unchecked(offset);
            slot.msg.get().write(MaybeUninit::new(msg));
            slot.state.fetch_or(WRITE, Ordering::Release);
        }

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }
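
    // Sending is split into two phases: `start_send` reserves a slot and
    // records it in the token, and `write` fills that slot. A minimal sketch
    // of how the two compose (this is essentially what `send` below does):
    //
    //     let token = &mut Token::default();
    //     assert!(channel.start_send(token));  // unbounded: always succeeds
    //     unsafe { channel.write(token, msg) } // Err(msg) if disconnected
    //
    // The split exists so that higher-level machinery (e.g. select) can
    // reserve a slot first and commit to the operation later.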

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.spin_heavy();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.spin_heavy();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(_) => {
                    backoff.spin_light();
                    head = self.head.index.load(Ordering::Acquire);
                    block = self.head.block.load(Ordering::Acquire);
                }
            }
        }
    }
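
    // `MARK_BIT` in the head index records that the current head block is
    // known not to be the last one. Receivers maintain it in two places
    // above: when head and tail fall in different laps after a reservation,
    // and when moving to a freshly installed block whose `next` pointer is
    // already set. While the bit is set, the empty/disconnected check (which
    // requires loading the tail) can be skipped on the fast path.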

    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        unsafe {
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let msg = slot.msg.get().read().assume_init();

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if offset + 1 == BLOCK_CAP {
                Block::destroy(block, 0);
            } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                Block::destroy(block, offset + 1);
            }

            Ok(msg)
        }
    }
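
    // A receiver can reserve a slot (advance the head) before the matching
    // sender has finished `write`, so `read` may observe a slot whose `WRITE`
    // bit is not yet set. `wait_write` spins until the message is actually
    // there, which is why the two-phase receive never returns an
    // uninitialized message.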

    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        assert!(self.start_send(token));
        unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            if self.start_recv(token) {
                unsafe {
                    return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                // SAFETY: the context belongs to the current thread.
                let sel = unsafe { cx.wait_until(deadline) };

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }
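
    // The register-then-recheck dance in `recv` avoids lost wakeups: a sender
    // could deliver a message between the failed `start_recv` and the call to
    // `receivers.register`. Re-checking `is_empty`/`is_disconnected` after
    // registering guarantees that either this receiver notices that message
    // and aborts the wait, or the sender sees the registered waker and calls
    // `notify`.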

    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
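
    // A worked example of the `len` arithmetic: with `LAP = 32` and
    // `BLOCK_CAP = 31`, suppose head is at sequence number 0 and tail at 33.
    // Sequence 31 is the reserved end-of-block offset and holds no message,
    // so the channel contains messages 0..=30 and 32, i.e. 32 messages. The
    // formula agrees: 33 - 0 - 33 / 32 = 32.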

    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free
            // memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to the tail will be rejected because `MARK_BIT` is
            // set, and will be aborted unless the tail is at a block
            // boundary. We need to wait for any in-flight update to take
            // effect, otherwise memory can leak.
            backoff.spin_heavy();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's
        // attempts to initialize the first block before noticing that the receivers disconnected.
        // Late allocations will be deallocated by the sender in Drop.
        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);

        // If we're going to be dropping messages we need to synchronize with initialization.
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.spin_heavy();
                block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
            }
        }
        // After this point `head.block` is not modified again and it will be deallocated if it's
        // non-null. The `Drop` code of the channel, which runs after this function, also attempts
        // to deallocate `head.block` if it's non-null. Therefore this function must maintain the
        // invariant that if a deallocation of `head.block` is attempted then it must also be set
        // to NULL. Failing to do so will lead to the `Drop` code attempting a double free. For
        // this reason both reads above do an atomic swap instead of a simple atomic load.

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }

        head &= !MARK_BIT;
        self.head.index.store(head, Ordering::Release);
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}
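
// A minimal sketch of how this flavor is driven through the public API (the
// surrounding `mpmc` module wires this up; the mapping shown is illustrative):
//
//     let (tx, rx) = std::sync::mpsc::channel::<i32>(); // unbounded: list flavor
//     tx.send(1).unwrap();                              // -> Channel::send
//     assert_eq!(rx.recv().unwrap(), 1);                // -> Channel::recv
//     drop(tx);                                         // -> disconnect_senders
//     assert!(rx.recv().is_err());                      // Disconnected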

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}