aboutsummaryrefslogtreecommitdiff
path: root/rtic-sync
diff options
context:
space:
mode:
Diffstat (limited to 'rtic-sync')
-rw-r--r--rtic-sync/src/channel.rs127
1 files changed, 28 insertions, 99 deletions
diff --git a/rtic-sync/src/channel.rs b/rtic-sync/src/channel.rs
index e665204..fe53608 100644
--- a/rtic-sync/src/channel.rs
+++ b/rtic-sync/src/channel.rs
@@ -102,62 +102,10 @@ impl<T, const N: usize> Channel<T, N> {
}
}
- /// Clear any remaining items from this `Channel`.
- pub fn clear(&mut self) {
- for item in self.queued_items() {
- drop(item);
- }
- }
-
- /// Return an iterator over the still-queued items, removing them
- /// from this channel.
- pub fn queued_items(&mut self) -> impl Iterator<Item = T> + '_ {
- struct Iter<'a, T, const N: usize> {
- inner: &'a mut Channel<T, N>,
- }
-
- impl<T, const N: usize> Iterator for Iter<'_, T, N> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- let slot = self.inner.readyq.as_mut().pop_back()?;
-
- let value = unsafe {
- // SAFETY: `ready` is a valid slot.
- let first_element = self.inner.slots.get_unchecked(slot as usize).get_mut();
- let ptr = first_element.deref().as_ptr();
- // SAFETY: `ptr` points to an initialized `T`.
- core::ptr::read(ptr)
- };
-
- // NOTE: do not `return_free_slot`, as we have mutable
- // access to this `Channel` and no `Receiver` or `Sender`
- // exist.
- debug_assert!(!self.inner.freeq.as_mut().is_full());
- unsafe {
- // SAFETY: `freeq` is not ful.
- self.inner.freeq.as_mut().push_back_unchecked(slot);
- }
-
- Some(value)
- }
- }
-
- Iter { inner: self }
- }
-
/// Split the queue into a `Sender`/`Receiver` pair.
- ///
- /// # Panics
- /// This function panics if there are items in this channel while splitting.
- ///
- /// Call [`Channel::clear`] to clear all items from it, or [`Channel::queued_items`] to retrieve
- /// an iterator that yields the values.
pub fn split(&mut self) -> (Sender<'_, T, N>, Receiver<'_, T, N>) {
- assert!(
- self.readyq.as_mut().is_empty(),
- "Cannot re-split non-empty queue. Call `Channel::clear()`."
- );
+ // NOTE(assert): queue is cleared by dropping the corresponding `Receiver`.
+ debug_assert!(self.readyq.as_mut().is_empty(),);
let freeq = self.freeq.as_mut();
@@ -193,7 +141,8 @@ impl<T, const N: usize> Channel<T, N> {
/// 1. If there are any waiting `send`-ers, wake the longest-waiting one and hand it `slot`.
/// 2. else, insert `slot` into `self.freeq`.
///
- /// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`].
+ /// SAFETY: `slot` must be a `u8` that is obtained by dequeueing from [`Self::readyq`], and that `slot`
+ /// is returned at most once.
unsafe fn return_free_slot(&self, slot: u8) {
critical_section::with(|cs| {
fence(Ordering::SeqCst);
@@ -216,11 +165,13 @@ impl<T, const N: usize> Channel<T, N> {
}
})
}
-}
-impl<T, const N: usize> Drop for Channel<T, N> {
- fn drop(&mut self) {
- self.clear();
+ /// SAFETY: the caller must guarantee that `slot` is an `u8` obtained by dequeueing from [`Self::readyq`],
+ /// and is read at most once.
+ unsafe fn read_slot(&self, slot: u8) -> T {
+ let first_element = self.slots.get_unchecked(slot as usize).get_mut();
+ let ptr = first_element.deref().as_ptr();
+ ptr::read(ptr)
}
}
@@ -641,14 +592,13 @@ impl<T, const N: usize> Receiver<'_, T, N> {
if let Some(rs) = ready_slot {
// Read the value from the slots, note; this memcpy is not under a critical section.
- let r = unsafe {
- let first_element = self.0.slots.get_unchecked(rs as usize).get_mut();
- let ptr = first_element.deref().as_ptr();
- ptr::read(ptr)
- };
+ // SAFETY: `rs` is directly obtained from `self.0.readyq` and is read exactly
+ // once.
+ let r = unsafe { self.0.read_slot(rs) };
// Return the index to the free queue after we've read the value.
- // SAFETY: `rs` comes directly from `readyq`.
+ // SAFETY: `rs` comes directly from `readyq` and is only returned
+ // once.
unsafe { self.0.return_free_slot(rs) };
Ok(r)
@@ -717,6 +667,19 @@ impl<T, const N: usize> Drop for Receiver<'_, T, N> {
self.0.receiver_dropped(cs, |v| *v = true);
});
+ let ready_slot = || {
+ critical_section::with(|cs| unsafe {
+ // SAFETY: `self.0.readyq` is not called recursively.
+ self.0.readyq(cs, |q| q.pop_back())
+ })
+ };
+
+ while let Some(slot) = ready_slot() {
+ // SAFETY: `slot` comes from `readyq` and is
+ // read exactly once.
+ drop(unsafe { self.0.read_slot(slot) })
+ }
+
while let Some((waker, _)) = self.0.wait_queue.pop() {
waker.wake();
}
@@ -886,44 +849,11 @@ mod tests {
tx.try_send(SetToTrueOnDrop(value.clone())).unwrap();
drop((tx, rx));
- drop(channel);
assert!(value.load(Ordering::SeqCst));
}
#[test]
- pub fn cleared_item_is_dropped() {
- let mut channel: Channel<SetToTrueOnDrop, 1> = Channel::new();
-
- let (mut tx, rx) = channel.split();
-
- let value = Arc::new(AtomicBool::new(false));
- tx.try_send(SetToTrueOnDrop(value.clone())).unwrap();
-
- drop((tx, rx));
-
- assert!(!value.load(Ordering::SeqCst));
-
- channel.clear();
-
- assert!(value.load(Ordering::SeqCst));
- }
-
- #[test]
- #[should_panic]
- pub fn splitting_non_empty_channel_panics() {
- let mut channel: Channel<(), 1> = Channel::new();
-
- let (mut tx, rx) = channel.split();
-
- tx.try_send(()).unwrap();
-
- drop((tx, rx));
-
- channel.split();
- }
-
- #[test]
pub fn splitting_empty_channel_works() {
let mut channel: Channel<(), 1> = Channel::new();
@@ -933,7 +863,6 @@ mod tests {
drop((tx, rx));
- channel.clear();
channel.split();
}
}