author     datdenkikniet <jcdra1@gmail.com>     2025-03-13 21:25:46 +0100
committer  Emil Fresk <emil.fresk@gmail.com>    2025-03-16 11:19:22 +0000
commit     70f57c3160565dc3f2c0b060b1349eb2dd2a13b5 (patch)
tree       3c933b87b8eb8e6fa010c51c1511122257f44217 /rtic-sync
parent     6903d208b69822608bc8e94e1384b4a7e20c1f91 (diff)
rtic-sync: explicitly send an awoken Sender the slot it can use
Diffstat (limited to 'rtic-sync')
-rw-r--r--    rtic-sync/src/channel.rs    104
1 file changed, 71 insertions(+), 33 deletions(-)
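
In short: when the receiver frees a slot while a sender is parked on the wait queue, the freed slot index is now written directly into that sender's `SlotPtr` before it is woken, instead of being pushed back onto the shared free queue for the woken sender to race for. Below is a rough std-based sketch of that handoff; the names (`Shared`, `release_slot`, `take_slot`) are invented for illustration, and `Mutex`/`VecDeque` stand in for the crate's critical sections and intrusive, no-alloc queues.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::Waker;

/// Simplified stand-in for the channel's shared state (illustration only).
struct Shared {
    /// Indices of free buffer slots.
    freeq: VecDeque<u8>,
    /// Parked senders: a waker plus a per-waiter cell the receiver can
    /// write a slot index into before waking them.
    wait_queue: VecDeque<(Waker, Arc<Mutex<Option<u8>>>)>,
}

impl Shared {
    /// Receiver side: slot `rs` has just been read and is free again.
    fn release_slot(&mut self, rs: u8) {
        if let Some((waker, slot_cell)) = self.wait_queue.pop_front() {
            // Hand the freed slot directly to the woken sender instead of
            // pushing it onto `freeq`, so the sender is guaranteed to find
            // a slot when it is polled again.
            *slot_cell.lock().unwrap() = Some(rs);
            waker.wake();
        } else {
            // Nobody is waiting: return the slot to the free queue.
            self.freeq.push_back(rs);
        }
    }

    /// Sender side, inside its poll: first consume the cell a receiver may
    /// have filled for us, then fall back to the shared free queue.
    fn take_slot(&mut self, my_cell: &Mutex<Option<u8>>) -> Option<u8> {
        my_cell
            .lock()
            .unwrap()
            .take()
            .or_else(|| self.freeq.pop_back())
    }
}
```

In the patched `poll_fn` the same order applies: the sender first drains its own slot pointer, then the free queue, and if neither yields a slot the channel must be closed, hence the `debug_assert!(self.is_closed())` before returning an error.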
diff --git a/rtic-sync/src/channel.rs b/rtic-sync/src/channel.rs
index aca8cf4..5b83fce 100644
--- a/rtic-sync/src/channel.rs
+++ b/rtic-sync/src/channel.rs
@@ -12,15 +12,17 @@ use core::{
#[doc(hidden)]
pub use critical_section;
use heapless::Deque;
-use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
use rtic_common::{
- dropper::OnDrop,
- wait_queue::{Link, WaitQueue},
+ dropper::OnDrop, wait_queue::DoublyLinkedList, wait_queue::Link,
+ waker_registration::CriticalSectionWakerRegistration as WakerRegistration,
};
#[cfg(feature = "defmt-03")]
use crate::defmt;
+type WaitQueueData = (Waker, SlotPtr);
+type WaitQueue = DoublyLinkedList<WaitQueueData>;
+
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
///
/// This channel uses critical sections, however there are extremely small and all `memcpy`
@@ -192,11 +194,11 @@ unsafe impl<T, const N: usize> Send for Sender<'_, T, N> {}
/// This is needed to make the async closure in `send` accept that we "share"
/// the link possible between threads.
#[derive(Clone)]
-struct LinkPtr(*mut Option<Link<Waker>>);
+struct LinkPtr(*mut Option<Link<WaitQueueData>>);
impl LinkPtr {
/// This will dereference the pointer stored within and give out an `&mut`.
- unsafe fn get(&mut self) -> &mut Option<Link<Waker>> {
+ unsafe fn get(&mut self) -> &mut Option<Link<WaitQueueData>> {
&mut *self.0
}
}
@@ -205,6 +207,28 @@ unsafe impl Send for LinkPtr {}
unsafe impl Sync for LinkPtr {}
+/// This is needed to make the async closure in `send` accept that we "share"
+/// the link possible between threads.
+#[derive(Clone)]
+struct SlotPtr(*mut Option<u8>);
+
+impl SlotPtr {
+ /// Replace the value of this slot with `new_value`, and return
+ /// the old value.
+ fn replace(
+ &mut self,
+ new_value: Option<u8>,
+ _cs: critical_section::CriticalSection,
+ ) -> Option<u8> {
+ // SAFETY: we are in a critical section.
+ unsafe { core::ptr::replace(self.0, new_value) }
+ }
+}
+
+unsafe impl Send for SlotPtr {}
+
+unsafe impl Sync for SlotPtr {}
+
impl<T, const N: usize> core::fmt::Debug for Sender<'_, T, N> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "Sender")
@@ -268,13 +292,17 @@ impl<T, const N: usize> Sender<'_, T, N> {
/// Send a value. If there is no place left in the queue this will wait until there is.
/// If the receiver does not exist this will return an error.
pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> {
- let mut link_ptr: Option<Link<Waker>> = None;
+ let mut free_slot_ptr: Option<u8> = None;
+ let mut link_ptr: Option<Link<WaitQueueData>> = None;
// Make this future `Drop`-safe.
// SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it.
- let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>);
+ let mut link_ptr = LinkPtr(core::ptr::addr_of_mut!(link_ptr));
+ // SAFETY(freed_slot): Shadow the original definition of `free_slot_ptr` so we can't abuse it.
+ let mut free_slot_ptr = SlotPtr(core::ptr::addr_of_mut!(free_slot_ptr));
let mut link_ptr2 = link_ptr.clone();
+ let mut free_slot_ptr2 = free_slot_ptr.clone();
let dropper = OnDrop::new(|| {
// SAFETY: We only run this closure and dereference the pointer if we have
// exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
@@ -282,30 +310,41 @@ impl<T, const N: usize> Sender<'_, T, N> {
if let Some(link) = unsafe { link_ptr2.get() } {
link.remove_from_list(&self.0.wait_queue);
}
+
+ // Potentially unnecessary c-s because our link was already popped, so there
+ // is no way for anything else to access the free slot ptr. Gotta think
+ // about this a bit more...
+ critical_section::with(|cs| {
+ if let Some(freed_slot) = free_slot_ptr2.replace(None, cs) {
+ debug_assert!(!self.0.access(cs).freeq.is_full());
+ // SAFETY: freeq is not full.
+ unsafe {
+ self.0.access(cs).freeq.push_back_unchecked(freed_slot);
+ }
+ }
+ });
});
let idx = poll_fn(|cx| {
- if self.is_closed() {
- return Poll::Ready(Err(()));
- }
-
// Do all this in one critical section, else there can be race conditions
- let queue_idx = critical_section::with(|cs| {
+ critical_section::with(|cs| {
let wq_empty = self.0.wait_queue.is_empty();
let fq_empty = self.0.access(cs).freeq.is_empty();
+
if !wq_empty || fq_empty {
// SAFETY: This pointer is only dereferenced here and on drop of the future
// which happens outside this `poll_fn`'s stack frame.
let link = unsafe { link_ptr.get() };
if let Some(link) = link {
if !link.is_popped() {
- return None;
+ return Poll::Pending;
} else {
// Fall through to dequeue
}
} else {
// Place the link in the wait queue on first run.
- let link_ref = link.insert(Link::new(cx.waker().clone()));
+ let link_ref =
+ link.insert(Link::new((cx.waker().clone(), free_slot_ptr.clone())));
// SAFETY(new_unchecked): The address to the link is stable as it is defined
// outside this stack frame.
@@ -315,23 +354,21 @@ impl<T, const N: usize> Sender<'_, T, N> {
// `link_ptr` lives until the end of the stack frame.
unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) };
- return None;
+ return Poll::Pending;
}
}
- assert!(!self.0.access(cs).freeq.is_empty());
- // Get index as the queue is guaranteed not empty and the wait queue is empty
- let idx = unsafe { self.0.access(cs).freeq.pop_front_unchecked() };
-
- Some(idx)
- });
+ let slot = free_slot_ptr
+ .replace(None, cs)
+ .or_else(|| self.0.access(cs).freeq.pop_back());
- if let Some(idx) = queue_idx {
- // Return the index
- Poll::Ready(Ok(idx))
- } else {
- Poll::Pending
- }
+ if let Some(slot) = slot {
+ Poll::Ready(Ok(slot))
+ } else {
+ debug_assert!(self.is_closed());
+ Poll::Ready(Err(()))
+ }
+ })
})
.await;
@@ -430,14 +467,15 @@ impl<T, const N: usize> Receiver<'_, T, N> {
// Return the index to the free queue after we've read the value.
critical_section::with(|cs| {
- assert!(!self.0.access(cs).freeq.is_full());
- unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }
-
fence(Ordering::SeqCst);
- // If someone is waiting in the WaiterQueue, wake the first one up.
- if let Some(wait_head) = self.0.wait_queue.pop() {
+ // If someone is waiting in the WaiterQueue, wake the first one up & hand it the free slot.
+ if let Some((wait_head, mut freeq_slot)) = self.0.wait_queue.pop() {
+ freeq_slot.replace(Some(rs), cs);
wait_head.wake();
+ } else {
+ assert!(!self.0.access(cs).freeq.is_full());
+ unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }
}
Ok(r)
@@ -495,7 +533,7 @@ impl<T, const N: usize> Drop for Receiver<'_, T, N> {
// Mark the receiver as dropped and wake all waiters
critical_section::with(|cs| *self.0.access(cs).receiver_dropped = true);
- while let Some(waker) = self.0.wait_queue.pop() {
+ while let Some((waker, _)) = self.0.wait_queue.pop() {
waker.wake();
}
}
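
For context on what the changed future means for users, here is a rough usage sketch (not part of this commit, and assuming the crate's documented `make_channel!` macro and async `send`/`recv`): a sender that finds the queue full parks itself on the wait queue, and with this patch the receiver hands a freed slot straight to that sender rather than letting it race other senders for the free queue.

```rust
use rtic_sync::make_channel;

/// Hypothetical example, not taken from the crate or this commit.
async fn example() {
    // A bounded channel with three slots; senders beyond capacity wait.
    let (mut tx, mut rx) = make_channel!(u32, 3);

    // Takes a free slot, or waits until the receiver hands one over.
    tx.send(42).await.expect("receiver was dropped");

    // Reading a value frees its slot; with this change the slot goes
    // straight to the longest-waiting sender, if any, otherwise back
    // onto the free queue.
    let value = rx.recv().await.expect("all senders were dropped");
    assert_eq!(value, 42);
}
```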