aboutsummaryrefslogtreecommitdiff
path: root/rtic-sync
diff options
context:
space:
mode:
Diffstat (limited to 'rtic-sync')
-rw-r--r--rtic-sync/src/channel.rs56
1 files changed, 33 insertions, 23 deletions
diff --git a/rtic-sync/src/channel.rs b/rtic-sync/src/channel.rs
index 17fd50a..d3a64b6 100644
--- a/rtic-sync/src/channel.rs
+++ b/rtic-sync/src/channel.rs
@@ -108,6 +108,30 @@ impl<T, const N: usize> Channel<T, N> {
}
}
}
+
+ /// Return free slot `slot` to the channel.
+ ///
+ /// This will do one of two things:
+ /// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`.
+ /// 2. else, insert `slot` into `self.freeq`.
+ ///
+ /// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`].
+ unsafe fn return_free_slot(&self, slot: u8) {
+ critical_section::with(|cs| {
+ fence(Ordering::SeqCst);
+
+ // If someone is waiting in the `wait_queue`, wake the first one up & hand it the free slot.
+ if let Some((wait_head, mut freeq_slot)) = self.wait_queue.pop() {
+ // SAFETY: `freeq_slot` is valid for writes: we are in a critical
+ // section & the `SlotPtr` lives for at least the duration of the wait queue link.
+ unsafe { freeq_slot.replace(Some(slot), cs) };
+ wait_head.wake();
+ } else {
+ assert!(!self.access(cs).freeq.is_full());
+ unsafe { self.access(cs).freeq.push_back_unchecked(slot) }
+ }
+ })
+ }
}
/// Creates a split channel with `'static` lifetime.
@@ -313,18 +337,16 @@ impl<T, const N: usize> Sender<'_, T, N> {
link.remove_from_list(&self.0.wait_queue);
}
+ // Return our potentially-unused free slot.
// Potentially unnecessary c-s because our link was already popped, so there
// is no way for anything else to access the free slot ptr. Gotta think
// about this a bit more...
- //
- // SAFETY(replace): `free_slot_ptr2` is valid for writes.
critical_section::with(|cs| {
if let Some(freed_slot) = unsafe { free_slot_ptr2.replace(None, cs) } {
- debug_assert!(!self.0.access(cs).freeq.is_full());
- // SAFETY: freeq is not full.
- unsafe {
- self.0.access(cs).freeq.push_back_unchecked(freed_slot);
- }
+ // SAFETY: freed slot is passed to us from `return_free_slot`, which either
+ // directly (through `try_recv`), or indirectly (through another `return_free_slot`)
+ // comes from `readyq`.
+ unsafe { self.0.return_free_slot(freed_slot) };
}
});
});
@@ -350,7 +372,7 @@ impl<T, const N: usize> Sender<'_, T, N> {
let slot = unsafe { free_slot_ptr.replace(None, cs) };
// If our link is popped, then:
- // 1. We were popped by `try_recv` and it provided us with a slot.
+ // 1. We were popped by `return_free_lot` and provided us with a slot.
// 2. We were popped by `Receiver::drop` and it did not provide us with a slot, and the channel is closed.
if let Some(slot) = slot {
Poll::Ready(Ok(slot))
@@ -482,22 +504,10 @@ impl<T, const N: usize> Receiver<'_, T, N> {
let r = unsafe { ptr::read(self.0.slots.get_unchecked(rs as usize).get() as *const T) };
// Return the index to the free queue after we've read the value.
- critical_section::with(|cs| {
- fence(Ordering::SeqCst);
-
- // If someone is waiting in the WaiterQueue, wake the first one up & hand it the free slot.
- if let Some((wait_head, mut freeq_slot)) = self.0.wait_queue.pop() {
- // SAFETY: `freeq_slot` is valid for writes: we are in a critical
- // section & the `SlotPtr` lives for at least the duration of the wait queue link.
- unsafe { freeq_slot.replace(Some(rs), cs) };
- wait_head.wake();
- } else {
- assert!(!self.0.access(cs).freeq.is_full());
- unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }
- }
+ // SAFETY: `rs` comes directly from `readyq`.
+ unsafe { self.0.return_free_slot(rs) };
- Ok(r)
- })
+ Ok(r)
} else if self.is_closed() {
Err(ReceiveError::NoSender)
} else {