std/sync/mpmc/list.rs
1//! Unbounded channel implemented as a linked list.
2
3use super::context::Context;
4use super::error::*;
5use super::select::{Operation, Selected, Token};
6use super::utils::{Backoff, CachePadded};
7use super::waker::SyncWaker;
8use crate::cell::UnsafeCell;
9use crate::marker::PhantomData;
10use crate::mem::MaybeUninit;
11use crate::ptr;
12use crate::sync::atomic::{self, Atomic, AtomicPtr, AtomicUsize, Ordering};
13use crate::time::Instant;
14
// Flags stored in `Slot::state`. Each flag occupies its own bit:
// * `WRITE` is set once a message has been written into the slot.
// * `READ` is set once the message has been read out of the slot.
// * `DESTROY` is set while the block owning the slot is being destroyed.
const WRITE: usize = 1;
const READ: usize = WRITE << 1;
const DESTROY: usize = READ << 1;
22
// Number of consecutive indices covered by one block (one "lap"). It is a power of two, which
// lets offsets be computed with a mask as well as with `%`.
const LAP: usize = 1 << 5;
// The maximum number of messages a block can hold: the last index of every lap is not backed
// by a slot.
const BLOCK_CAP: usize = LAP - 1;
// Number of low bits of an index that are reserved for metadata.
const SHIFT: usize = 1;
// Metadata bit stored in the low bit of an index. Its meaning depends on the index:
// * In the head index it indicates that the block is not the last one.
// * In the tail index it indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
33
34/// A slot in a block.
35struct Slot<T> {
36 /// The message.
37 msg: UnsafeCell<MaybeUninit<T>>,
38
39 /// The state of the slot.
40 state: Atomic<usize>,
41}
42
43impl<T> Slot<T> {
44 /// Waits until a message is written into the slot.
45 fn wait_write(&self) {
46 let backoff = Backoff::new();
47 while self.state.load(Ordering::Acquire) & WRITE == 0 {
48 backoff.spin_heavy();
49 }
50 }
51}
52
53/// A block in a linked list.
54///
55/// Each block in the list can hold up to `BLOCK_CAP` messages.
56struct Block<T> {
57 /// The next block in the linked list.
58 next: Atomic<*mut Block<T>>,
59
60 /// Slots for messages.
61 slots: [Slot<T>; BLOCK_CAP],
62}
63
64impl<T> Block<T> {
65 /// Creates an empty block.
66 fn new() -> Box<Block<T>> {
67 // SAFETY: This is safe because:
68 // [1] `Block::next` (Atomic<*mut _>) may be safely zero initialized.
69 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
70 // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
71 // holds a MaybeUninit.
72 // [4] `Slot::state` (Atomic<usize>) may be safely zero initialized.
73 unsafe { Box::new_zeroed().assume_init() }
74 }
75
76 /// Waits until the next pointer is set.
77 fn wait_next(&self) -> *mut Block<T> {
78 let backoff = Backoff::new();
79 loop {
80 let next = self.next.load(Ordering::Acquire);
81 if !next.is_null() {
82 return next;
83 }
84 backoff.spin_heavy();
85 }
86 }
87
88 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
89 unsafe fn destroy(this: *mut Block<T>, start: usize) {
90 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
91 // begun destruction of the block.
92 for i in start..BLOCK_CAP - 1 {
93 let slot = unsafe { (*this).slots.get_unchecked(i) };
94
95 // Mark the `DESTROY` bit if a thread is still using the slot.
96 if slot.state.load(Ordering::Acquire) & READ == 0
97 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
98 {
99 // If a thread is still using the slot, it will continue destruction of the block.
100 return;
101 }
102 }
103
104 // No thread is using the block, now it is safe to destroy it.
105 drop(unsafe { Box::from_raw(this) });
106 }
107}
108
109/// A position in a channel.
110#[derive(Debug)]
111struct Position<T> {
112 /// The index in the channel.
113 index: Atomic<usize>,
114
115 /// The block in the linked list.
116 block: Atomic<*mut Block<T>>,
117}
118
/// The token type for the list flavor.
///
/// It records which slot an operation has reserved: a type-erased block pointer plus the
/// offset of the slot inside that block.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots, stored as an untyped pointer. Null when no slot is reserved.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    /// Returns a token that refers to no slot: a null block pointer and offset zero.
    #[inline]
    fn default() -> Self {
        Self { offset: 0, block: ptr::null() }
    }
}
135
136/// Unbounded channel implemented as a linked list.
137///
138/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
139/// represented as numbers of type `usize` and wrap on overflow.
140///
141/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
142/// improve cache efficiency.
143pub(crate) struct Channel<T> {
144 /// The head of the channel.
145 head: CachePadded<Position<T>>,
146
147 /// The tail of the channel.
148 tail: CachePadded<Position<T>>,
149
150 /// Receivers waiting while the channel is empty and not disconnected.
151 receivers: SyncWaker,
152
153 /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
154 _marker: PhantomData<T>,
155}
156
157impl<T> Channel<T> {
158 /// Creates a new unbounded channel.
159 pub(crate) fn new() -> Self {
160 Channel {
161 head: CachePadded::new(Position {
162 block: AtomicPtr::new(ptr::null_mut()),
163 index: AtomicUsize::new(0),
164 }),
165 tail: CachePadded::new(Position {
166 block: AtomicPtr::new(ptr::null_mut()),
167 index: AtomicUsize::new(0),
168 }),
169 receivers: SyncWaker::new(),
170 _marker: PhantomData,
171 }
172 }
173
174 /// Attempts to reserve a slot for sending a message.
175 fn start_send(&self, token: &mut Token) -> bool {
176 let backoff = Backoff::new();
177 let mut tail = self.tail.index.load(Ordering::Acquire);
178 let mut block = self.tail.block.load(Ordering::Acquire);
179 let mut next_block = None;
180
181 loop {
182 // Check if the channel is disconnected.
183 if tail & MARK_BIT != 0 {
184 token.list.block = ptr::null();
185 return true;
186 }
187
188 // Calculate the offset of the index into the block.
189 let offset = (tail >> SHIFT) % LAP;
190
191 // If we reached the end of the block, wait until the next one is installed.
192 if offset == BLOCK_CAP {
193 backoff.spin_heavy();
194 tail = self.tail.index.load(Ordering::Acquire);
195 block = self.tail.block.load(Ordering::Acquire);
196 continue;
197 }
198
199 // If we're going to have to install the next block, allocate it in advance in order to
200 // make the wait for other threads as short as possible.
201 if offset + 1 == BLOCK_CAP && next_block.is_none() {
202 next_block = Some(Block::<T>::new());
203 }
204
205 // If this is the first message to be sent into the channel, we need to allocate the
206 // first block and install it.
207 if block.is_null() {
208 let new = Box::into_raw(Block::<T>::new());
209
210 if self
211 .tail
212 .block
213 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
214 .is_ok()
215 {
216 // This yield point leaves the channel in a half-initialized state where the
217 // tail.block pointer is set but the head.block is not. This is used to
218 // facilitate the test in src/tools/miri/tests/pass/issues/issue-139553.rs
219 #[cfg(miri)]
220 crate::thread::yield_now();
221 self.head.block.store(new, Ordering::Release);
222 block = new;
223 } else {
224 next_block = unsafe { Some(Box::from_raw(new)) };
225 tail = self.tail.index.load(Ordering::Acquire);
226 block = self.tail.block.load(Ordering::Acquire);
227 continue;
228 }
229 }
230
231 let new_tail = tail + (1 << SHIFT);
232
233 // Try advancing the tail forward.
234 match self.tail.index.compare_exchange_weak(
235 tail,
236 new_tail,
237 Ordering::SeqCst,
238 Ordering::Acquire,
239 ) {
240 Ok(_) => unsafe {
241 // If we've reached the end of the block, install the next one.
242 if offset + 1 == BLOCK_CAP {
243 let next_block = Box::into_raw(next_block.unwrap());
244 self.tail.block.store(next_block, Ordering::Release);
245 self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
246 (*block).next.store(next_block, Ordering::Release);
247 }
248
249 token.list.block = block as *const u8;
250 token.list.offset = offset;
251 return true;
252 },
253 Err(_) => {
254 backoff.spin_light();
255 tail = self.tail.index.load(Ordering::Acquire);
256 block = self.tail.block.load(Ordering::Acquire);
257 }
258 }
259 }
260 }
261
262 /// Writes a message into the channel.
263 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
264 // If there is no slot, the channel is disconnected.
265 if token.list.block.is_null() {
266 return Err(msg);
267 }
268
269 // Write the message into the slot.
270 let block = token.list.block as *mut Block<T>;
271 let offset = token.list.offset;
272 unsafe {
273 let slot = (*block).slots.get_unchecked(offset);
274 slot.msg.get().write(MaybeUninit::new(msg));
275 slot.state.fetch_or(WRITE, Ordering::Release);
276 }
277
278 // Wake a sleeping receiver.
279 self.receivers.notify();
280 Ok(())
281 }
282
283 /// Attempts to reserve a slot for receiving a message.
284 fn start_recv(&self, token: &mut Token) -> bool {
285 let backoff = Backoff::new();
286 let mut head = self.head.index.load(Ordering::Acquire);
287 let mut block = self.head.block.load(Ordering::Acquire);
288
289 loop {
290 // Calculate the offset of the index into the block.
291 let offset = (head >> SHIFT) % LAP;
292
293 // If we reached the end of the block, wait until the next one is installed.
294 if offset == BLOCK_CAP {
295 backoff.spin_heavy();
296 head = self.head.index.load(Ordering::Acquire);
297 block = self.head.block.load(Ordering::Acquire);
298 continue;
299 }
300
301 let mut new_head = head + (1 << SHIFT);
302
303 if new_head & MARK_BIT == 0 {
304 atomic::fence(Ordering::SeqCst);
305 let tail = self.tail.index.load(Ordering::Relaxed);
306
307 // If the tail equals the head, that means the channel is empty.
308 if head >> SHIFT == tail >> SHIFT {
309 // If the channel is disconnected...
310 if tail & MARK_BIT != 0 {
311 // ...then receive an error.
312 token.list.block = ptr::null();
313 return true;
314 } else {
315 // Otherwise, the receive operation is not ready.
316 return false;
317 }
318 }
319
320 // If head and tail are not in the same block, set `MARK_BIT` in head.
321 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
322 new_head |= MARK_BIT;
323 }
324 }
325
326 // The block can be null here only if the first message is being sent into the channel.
327 // In that case, just wait until it gets initialized.
328 if block.is_null() {
329 backoff.spin_heavy();
330 head = self.head.index.load(Ordering::Acquire);
331 block = self.head.block.load(Ordering::Acquire);
332 continue;
333 }
334
335 // Try moving the head index forward.
336 match self.head.index.compare_exchange_weak(
337 head,
338 new_head,
339 Ordering::SeqCst,
340 Ordering::Acquire,
341 ) {
342 Ok(_) => unsafe {
343 // If we've reached the end of the block, move to the next one.
344 if offset + 1 == BLOCK_CAP {
345 let next = (*block).wait_next();
346 let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
347 if !(*next).next.load(Ordering::Relaxed).is_null() {
348 next_index |= MARK_BIT;
349 }
350
351 self.head.block.store(next, Ordering::Release);
352 self.head.index.store(next_index, Ordering::Release);
353 }
354
355 token.list.block = block as *const u8;
356 token.list.offset = offset;
357 return true;
358 },
359 Err(_) => {
360 backoff.spin_light();
361 head = self.head.index.load(Ordering::Acquire);
362 block = self.head.block.load(Ordering::Acquire);
363 }
364 }
365 }
366 }
367
368 /// Reads a message from the channel.
369 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
370 if token.list.block.is_null() {
371 // The channel is disconnected.
372 return Err(());
373 }
374
375 // Read the message.
376 let block = token.list.block as *mut Block<T>;
377 let offset = token.list.offset;
378 unsafe {
379 let slot = (*block).slots.get_unchecked(offset);
380 slot.wait_write();
381 let msg = slot.msg.get().read().assume_init();
382
383 // Destroy the block if we've reached the end, or if another thread wanted to destroy but
384 // couldn't because we were busy reading from the slot.
385 if offset + 1 == BLOCK_CAP {
386 Block::destroy(block, 0);
387 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
388 Block::destroy(block, offset + 1);
389 }
390
391 Ok(msg)
392 }
393 }
394
395 /// Attempts to send a message into the channel.
396 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
397 self.send(msg, None).map_err(|err| match err {
398 SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
399 SendTimeoutError::Timeout(_) => unreachable!(),
400 })
401 }
402
403 /// Sends a message into the channel.
404 pub(crate) fn send(
405 &self,
406 msg: T,
407 _deadline: Option<Instant>,
408 ) -> Result<(), SendTimeoutError<T>> {
409 let token = &mut Token::default();
410 assert!(self.start_send(token));
411 unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
412 }
413
414 /// Attempts to receive a message without blocking.
415 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
416 let token = &mut Token::default();
417
418 if self.start_recv(token) {
419 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
420 } else {
421 Err(TryRecvError::Empty)
422 }
423 }
424
425 /// Receives a message from the channel.
426 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
427 let token = &mut Token::default();
428 loop {
429 if self.start_recv(token) {
430 unsafe {
431 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
432 }
433 }
434
435 if let Some(d) = deadline {
436 if Instant::now() >= d {
437 return Err(RecvTimeoutError::Timeout);
438 }
439 }
440
441 // Prepare for blocking until a sender wakes us up.
442 Context::with(|cx| {
443 let oper = Operation::hook(token);
444 self.receivers.register(oper, cx);
445
446 // Has the channel become ready just now?
447 if !self.is_empty() || self.is_disconnected() {
448 let _ = cx.try_select(Selected::Aborted);
449 }
450
451 // Block the current thread.
452 // SAFETY: the context belongs to the current thread.
453 let sel = unsafe { cx.wait_until(deadline) };
454
455 match sel {
456 Selected::Waiting => unreachable!(),
457 Selected::Aborted | Selected::Disconnected => {
458 self.receivers.unregister(oper).unwrap();
459 // If the channel was disconnected, we still have to check for remaining
460 // messages.
461 }
462 Selected::Operation(_) => {}
463 }
464 });
465 }
466 }
467
468 /// Returns the current number of messages inside the channel.
469 pub(crate) fn len(&self) -> usize {
470 loop {
471 // Load the tail index, then load the head index.
472 let mut tail = self.tail.index.load(Ordering::SeqCst);
473 let mut head = self.head.index.load(Ordering::SeqCst);
474
475 // If the tail index didn't change, we've got consistent indices to work with.
476 if self.tail.index.load(Ordering::SeqCst) == tail {
477 // Erase the lower bits.
478 tail &= !((1 << SHIFT) - 1);
479 head &= !((1 << SHIFT) - 1);
480
481 // Fix up indices if they fall onto block ends.
482 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
483 tail = tail.wrapping_add(1 << SHIFT);
484 }
485 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
486 head = head.wrapping_add(1 << SHIFT);
487 }
488
489 // Rotate indices so that head falls into the first block.
490 let lap = (head >> SHIFT) / LAP;
491 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
492 head = head.wrapping_sub((lap * LAP) << SHIFT);
493
494 // Remove the lower bits.
495 tail >>= SHIFT;
496 head >>= SHIFT;
497
498 // Return the difference minus the number of blocks between tail and head.
499 return tail - head - tail / LAP;
500 }
501 }
502 }
503
504 /// Returns the capacity of the channel.
505 pub(crate) fn capacity(&self) -> Option<usize> {
506 None
507 }
508
509 /// Disconnects senders and wakes up all blocked receivers.
510 ///
511 /// Returns `true` if this call disconnected the channel.
512 pub(crate) fn disconnect_senders(&self) -> bool {
513 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
514
515 if tail & MARK_BIT == 0 {
516 self.receivers.disconnect();
517 true
518 } else {
519 false
520 }
521 }
522
523 /// Disconnects receivers.
524 ///
525 /// Returns `true` if this call disconnected the channel.
526 pub(crate) fn disconnect_receivers(&self) -> bool {
527 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
528
529 if tail & MARK_BIT == 0 {
530 // If receivers are dropped first, discard all messages to free
531 // memory eagerly.
532 self.discard_all_messages();
533 true
534 } else {
535 false
536 }
537 }
538
539 /// Discards all messages.
540 ///
541 /// This method should only be called when all receivers are dropped.
542 fn discard_all_messages(&self) {
543 let backoff = Backoff::new();
544 let mut tail = self.tail.index.load(Ordering::Acquire);
545 loop {
546 let offset = (tail >> SHIFT) % LAP;
547 if offset != BLOCK_CAP {
548 break;
549 }
550
551 // New updates to tail will be rejected by MARK_BIT and aborted unless it's
552 // at boundary. We need to wait for the updates take affect otherwise there
553 // can be memory leaks.
554 backoff.spin_heavy();
555 tail = self.tail.index.load(Ordering::Acquire);
556 }
557
558 let mut head = self.head.index.load(Ordering::Acquire);
559 // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's attempts
560 // to initialize the first block before noticing that the receivers disconnected. Late allocations
561 // will be deallocated by the sender in Drop.
562 let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
563
564 // If we're going to be dropping messages we need to synchronize with initialization
565 if head >> SHIFT != tail >> SHIFT {
566 // The block can be null here only if a sender is in the process of initializing the
567 // channel while another sender managed to send a message by inserting it into the
568 // semi-initialized channel and advanced the tail.
569 // In that case, just wait until it gets initialized.
570 while block.is_null() {
571 backoff.spin_heavy();
572 block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
573 }
574 }
575 // After this point `head.block` is not modified again and it will be deallocated if it's
576 // non-null. The `Drop` code of the channel, which runs after this function, also attempts
577 // to deallocate `head.block` if it's non-null. Therefore this function must maintain the
578 // invariant that if a deallocation of head.block is attemped then it must also be set to
579 // NULL. Failing to do so will lead to the Drop code attempting a double free. For this
580 // reason both reads above do an atomic swap instead of a simple atomic load.
581
582 unsafe {
583 // Drop all messages between head and tail and deallocate the heap-allocated blocks.
584 while head >> SHIFT != tail >> SHIFT {
585 let offset = (head >> SHIFT) % LAP;
586
587 if offset < BLOCK_CAP {
588 // Drop the message in the slot.
589 let slot = (*block).slots.get_unchecked(offset);
590 slot.wait_write();
591 let p = &mut *slot.msg.get();
592 p.as_mut_ptr().drop_in_place();
593 } else {
594 (*block).wait_next();
595 // Deallocate the block and move to the next one.
596 let next = (*block).next.load(Ordering::Acquire);
597 drop(Box::from_raw(block));
598 block = next;
599 }
600
601 head = head.wrapping_add(1 << SHIFT);
602 }
603
604 // Deallocate the last remaining block.
605 if !block.is_null() {
606 drop(Box::from_raw(block));
607 }
608 }
609
610 head &= !MARK_BIT;
611 self.head.index.store(head, Ordering::Release);
612 }
613
614 /// Returns `true` if the channel is disconnected.
615 pub(crate) fn is_disconnected(&self) -> bool {
616 self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
617 }
618
619 /// Returns `true` if the channel is empty.
620 pub(crate) fn is_empty(&self) -> bool {
621 let head = self.head.index.load(Ordering::SeqCst);
622 let tail = self.tail.index.load(Ordering::SeqCst);
623 head >> SHIFT == tail >> SHIFT
624 }
625
626 /// Returns `true` if the channel is full.
627 pub(crate) fn is_full(&self) -> bool {
628 false
629 }
630}
631
632impl<T> Drop for Channel<T> {
633 fn drop(&mut self) {
634 let mut head = self.head.index.load(Ordering::Relaxed);
635 let mut tail = self.tail.index.load(Ordering::Relaxed);
636 let mut block = self.head.block.load(Ordering::Relaxed);
637
638 // Erase the lower bits.
639 head &= !((1 << SHIFT) - 1);
640 tail &= !((1 << SHIFT) - 1);
641
642 unsafe {
643 // Drop all messages between head and tail and deallocate the heap-allocated blocks.
644 while head != tail {
645 let offset = (head >> SHIFT) % LAP;
646
647 if offset < BLOCK_CAP {
648 // Drop the message in the slot.
649 let slot = (*block).slots.get_unchecked(offset);
650 let p = &mut *slot.msg.get();
651 p.as_mut_ptr().drop_in_place();
652 } else {
653 // Deallocate the block and move to the next one.
654 let next = (*block).next.load(Ordering::Relaxed);
655 drop(Box::from_raw(block));
656 block = next;
657 }
658
659 head = head.wrapping_add(1 << SHIFT);
660 }
661
662 // Deallocate the last remaining block.
663 if !block.is_null() {
664 drop(Box::from_raw(block));
665 }
666 }
667 }
668}