diff options
| author | datdenkikniet <jcdra1@gmail.com> | 2025-03-15 14:35:58 +0100 |
|---|---|---|
| committer | Emil Fresk <emil.fresk@gmail.com> | 2025-03-16 11:19:22 +0000 |
| commit | e6bd03051f7752a2148d144a22db56d1ecd418c3 (patch) | |
| tree | f62467792475c3ac3f58d73d42a72660ca855b8b | |
| parent | 4d58d2bcd5e08f0123d2f3ac39c6a090dfee23d6 (diff) | |
rtic-sync: always wake `wait_queue` when attempting to return an item
to freeq
| -rw-r--r-- | rtic-sync/src/channel.rs | 56 |
1 files changed, 33 insertions, 23 deletions
diff --git a/rtic-sync/src/channel.rs b/rtic-sync/src/channel.rs index 17fd50a..d3a64b6 100644 --- a/rtic-sync/src/channel.rs +++ b/rtic-sync/src/channel.rs @@ -108,6 +108,30 @@ impl<T, const N: usize> Channel<T, N> { } } } + + /// Return free slot `slot` to the channel. + /// + /// This will do one of two things: + /// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`. + /// 2. else, insert `slot` into `self.freeq`. + /// + /// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`]. + unsafe fn return_free_slot(&self, slot: u8) { + critical_section::with(|cs| { + fence(Ordering::SeqCst); + + // If someone is waiting in the `wait_queue`, wake the first one up & hand it the free slot. + if let Some((wait_head, mut freeq_slot)) = self.wait_queue.pop() { + // SAFETY: `freeq_slot` is valid for writes: we are in a critical + // section & the `SlotPtr` lives for at least the duration of the wait queue link. + unsafe { freeq_slot.replace(Some(slot), cs) }; + wait_head.wake(); + } else { + assert!(!self.access(cs).freeq.is_full()); + unsafe { self.access(cs).freeq.push_back_unchecked(slot) } + } + }) + } } /// Creates a split channel with `'static` lifetime. @@ -313,18 +337,16 @@ impl<T, const N: usize> Sender<'_, T, N> { link.remove_from_list(&self.0.wait_queue); } + // Return our potentially-unused free slot. // Potentially unnecessary c-s because our link was already popped, so there // is no way for anything else to access the free slot ptr. Gotta think // about this a bit more... - // - // SAFETY(replace): `free_slot_ptr2` is valid for writes. critical_section::with(|cs| { if let Some(freed_slot) = unsafe { free_slot_ptr2.replace(None, cs) } { - debug_assert!(!self.0.access(cs).freeq.is_full()); - // SAFETY: freeq is not full. - unsafe { - self.0.access(cs).freeq.push_back_unchecked(freed_slot); - } + // SAFETY: freed slot is passed to us from `return_free_slot`, which either + // directly (through `try_recv`), or indirectly (through another `return_free_slot`) + // comes from `readyq`. + unsafe { self.0.return_free_slot(freed_slot) }; } }); }); @@ -350,7 +372,7 @@ impl<T, const N: usize> Sender<'_, T, N> { let slot = unsafe { free_slot_ptr.replace(None, cs) }; // If our link is popped, then: - // 1. We were popped by `try_recv` and it provided us with a slot. + // 1. We were popped by `return_free_lot` and provided us with a slot. // 2. We were popped by `Receiver::drop` and it did not provide us with a slot, and the channel is closed. if let Some(slot) = slot { Poll::Ready(Ok(slot)) @@ -482,22 +504,10 @@ impl<T, const N: usize> Receiver<'_, T, N> { let r = unsafe { ptr::read(self.0.slots.get_unchecked(rs as usize).get() as *const T) }; // Return the index to the free queue after we've read the value. - critical_section::with(|cs| { - fence(Ordering::SeqCst); - - // If someone is waiting in the WaiterQueue, wake the first one up & hand it the free slot. - if let Some((wait_head, mut freeq_slot)) = self.0.wait_queue.pop() { - // SAFETY: `freeq_slot` is valid for writes: we are in a critical - // section & the `SlotPtr` lives for at least the duration of the wait queue link. - unsafe { freeq_slot.replace(Some(rs), cs) }; - wait_head.wake(); - } else { - assert!(!self.0.access(cs).freeq.is_full()); - unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) } - } + // SAFETY: `rs` comes directly from `readyq`. + unsafe { self.0.return_free_slot(rs) }; - Ok(r) - }) + Ok(r) } else if self.is_closed() { Err(ReceiveError::NoSender) } else { |
