aboutsummaryrefslogtreecommitdiff
path: root/rtic-sync/src
diff options
context:
space:
mode:
authordatdenkikniet <jcdra1@gmail.com>2025-03-13 22:41:16 +0100
committerEmil Fresk <emil.fresk@gmail.com>2025-03-16 11:19:22 +0000
commit4fa3f5ddbac646f0cec0ddb4b95828ce29182ece (patch)
tree661d0dbe9e3b52d5a45571f8722309b1c997f44a /rtic-sync/src
parentdaf977dcff780b70897d4e64d6594ecdaa3f3282 (diff)
rtic-sync: Channel: Sender: rewriter `send` logic to be easier to validate
Diffstat (limited to 'rtic-sync/src')
-rw-r--r--rtic-sync/src/channel.rs71
1 files changed, 39 insertions, 32 deletions
diff --git a/rtic-sync/src/channel.rs b/rtic-sync/src/channel.rs
index d5574d0..00e9476 100644
--- a/rtic-sync/src/channel.rs
+++ b/rtic-sync/src/channel.rs
@@ -333,44 +333,51 @@ impl<T, const N: usize> Sender<'_, T, N> {
// Do all this in one critical section, else there can be race conditions
critical_section::with(|cs| {
let wq_empty = self.0.wait_queue.is_empty();
- let fq_empty = self.0.access(cs).freeq.is_empty();
-
- if !wq_empty || fq_empty {
- // SAFETY: This pointer is only dereferenced here and on drop of the future
- // which happens outside this `poll_fn`'s stack frame.
- let link = unsafe { link_ptr.get() };
- if let Some(link) = link {
- if !link.is_popped() {
- return Poll::Pending;
+ let freeq_empty = self.0.access(cs).freeq.is_empty();
+
+ // SAFETY: This pointer is only dereferenced here and on drop of the future
+ // which happens outside this `poll_fn`'s stack frame.
+ let link = unsafe { link_ptr.get() };
+
+ // We are already in the wait queue.
+ if let Some(link) = link {
+ if link.is_popped() {
+ // If our link is popped, then:
+ // 1. We were popped by `try_recv` and it provided us with a slot.
+ // 2. We were popped by `Receiver::drop` and it did not provide us with a slot, and the channel is closed.
+ let slot = unsafe { free_slot_ptr.replace(None, cs) };
+
+ if let Some(slot) = slot {
+ Poll::Ready(Ok(slot))
} else {
- // Fall through to dequeue
+ Poll::Ready(Err(()))
}
} else {
- // Place the link in the wait queue on first run.
- let link_ref =
- link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
-
- // SAFETY(new_unchecked): The address to the link is stable as it is defined
- // outside this stack frame.
- // SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed,
- // and we make sure in `dropper` that the link is removed from the queue
- // before dropping `link_ptr` AND `dropper` makes sure that the shadowed
- // `link_ptr` lives until the end of the stack frame.
- unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
-
- return Poll::Pending;
+ Poll::Pending
}
}
-
- // SAFETY: `free_slot_ptr` is valid for writes, as `free_slot_ptr` is still alive.
- let slot = unsafe { free_slot_ptr.replace(None, cs) }
- .or_else(|| self.0.access(cs).freeq.pop_back());
-
- if let Some(slot) = slot {
+ // We are not in the wait queue, but others are, or there is currently no free
+ // slot available.
+ else if !wq_empty || freeq_empty {
+ // Place the link in the wait queue.
+ let link_ref =
+ link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
+
+ // SAFETY(new_unchecked): The address to the link is stable as it is defined
+ // outside this stack frame.
+ // SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed,
+ // and we make sure in `dropper` that the link is removed from the queue
+ // before dropping `link_ptr` AND `dropper` makes sure that the shadowed
+ // `link_ptr` lives until the end of the stack frame.
+ unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
+
+ Poll::Pending
+ }
+ // We are not in the wait queue, no one else is waiting, and there is a free slot available.
+ else {
+ assert!(!self.0.access(cs).freeq.is_empty());
+ let slot = unsafe { self.0.access(cs).freeq.pop_back_unchecked() };
Poll::Ready(Ok(slot))
- } else {
- debug_assert!(self.is_closed());
- Poll::Ready(Err(()))
}
})
})