diff options
| author | Emil Fresk <emil.fresk@gmail.com> | 2023-01-29 20:16:23 +0100 |
|---|---|---|
| committer | Henrik Tjäder <henrik@tjaders.com> | 2023-03-01 00:33:38 +0100 |
| commit | e65e532c2a342f77080ac6fc8e5be11aa7d82575 (patch) | |
| tree | 5a1f21ad66e277bd13e75c8f29bbd89ba4e10a46 /rtic-channel | |
| parent | 58692a35e87ddc8b8faca5bb262070d343ceb869 (diff) | |
Move common data structures to `rtic-common`
Diffstat (limited to 'rtic-channel')
| -rw-r--r-- | rtic-channel/Cargo.toml | 3 | ||||
| -rw-r--r-- | rtic-channel/src/lib.rs | 17 | ||||
| -rw-r--r-- | rtic-channel/src/wait_queue.rs | 268 | ||||
| -rw-r--r-- | rtic-channel/src/waker_registration.rs | 64 |
4 files changed, 9 insertions, 343 deletions
diff --git a/rtic-channel/Cargo.toml b/rtic-channel/Cargo.toml index 5d4cbd0..a0955bc 100644 --- a/rtic-channel/Cargo.toml +++ b/rtic-channel/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] heapless = "0.7" critical-section = "1" +rtic-common = { version = "1.0.0", path = "../rtic-common" } [dev-dependencies] tokio = { version = "1", features = ["rt", "macros", "time"] } @@ -15,4 +16,4 @@ tokio = { version = "1", features = ["rt", "macros", "time"] } [features] default = [] -testing = ["critical-section/std"] +testing = ["critical-section/std", "rtic-common/testing"] diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs index acfa801..2b237f6 100644 --- a/rtic-channel/src/lib.rs +++ b/rtic-channel/src/lib.rs @@ -14,11 +14,8 @@ use core::{ task::{Poll, Waker}, }; use heapless::Deque; -use wait_queue::WaitQueue; -use waker_registration::CriticalSectionWakerRegistration as WakerRegistration; - -mod wait_queue; -mod waker_registration; +use rtic_common::wait_queue::{Link, WaitQueue}; +use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration; /// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue. /// @@ -136,11 +133,11 @@ unsafe impl<'a, T, const N: usize> Send for Sender<'a, T, N> {} /// This is needed to make the async closure in `send` accept that we "share" /// the link possible between threads. #[derive(Clone)] -struct LinkPtr(*mut Option<wait_queue::Link<Waker>>); +struct LinkPtr(*mut Option<Link<Waker>>); impl LinkPtr { /// This will dereference the pointer stored within and give out an `&mut`. - unsafe fn get(&mut self) -> &mut Option<wait_queue::Link<Waker>> { + unsafe fn get(&mut self) -> &mut Option<Link<Waker>> { &mut *self.0 } } @@ -200,10 +197,10 @@ impl<'a, T, const N: usize> Sender<'a, T, N> { /// Send a value. If there is no place left in the queue this will wait until there is. /// If the receiver does not exist this will return an error. pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> { - let mut link_ptr: Option<wait_queue::Link<Waker>> = None; + let mut link_ptr: Option<Link<Waker>> = None; // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. - let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<wait_queue::Link<Waker>>); + let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>); let mut link_ptr2 = link_ptr.clone(); let dropper = OnDrop::new(|| { @@ -236,7 +233,7 @@ impl<'a, T, const N: usize> Sender<'a, T, N> { } } else { // Place the link in the wait queue on first run. - let link_ref = link.insert(wait_queue::Link::new(cx.waker().clone())); + let link_ref = link.insert(Link::new(cx.waker().clone())); // SAFETY: The address to the link is stable as it is hidden behind // `link_ptr`, and `link_ptr` shadows the original making it unmovable. diff --git a/rtic-channel/src/wait_queue.rs b/rtic-channel/src/wait_queue.rs deleted file mode 100644 index 2de6311..0000000 --- a/rtic-channel/src/wait_queue.rs +++ /dev/null @@ -1,268 +0,0 @@ -//! ... - -use core::marker::PhantomPinned; -use core::pin::Pin; -use core::ptr::null_mut; -use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; -use core::task::Waker; -use critical_section as cs; - -pub type WaitQueue = LinkedList<Waker>; - -/// A FIFO linked list for a wait queue. -pub struct LinkedList<T> { - head: AtomicPtr<Link<T>>, // UnsafeCell<*mut Link<T>> - tail: AtomicPtr<Link<T>>, -} - -impl<T> LinkedList<T> { - /// Create a new linked list. - pub const fn new() -> Self { - Self { - head: AtomicPtr::new(null_mut()), - tail: AtomicPtr::new(null_mut()), - } - } -} - -impl<T: Clone> LinkedList<T> { - const R: Ordering = Ordering::Relaxed; - - /// Pop the first element in the queue. - pub fn pop(&self) -> Option<T> { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - let head = self.head.load(Self::R); - - // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link - if let Some(head_ref) = unsafe { head.as_ref() } { - // Move head to the next element - self.head.store(head_ref.next.load(Self::R), Self::R); - - // We read the value at head - let head_val = head_ref.val.clone(); - - let tail = self.tail.load(Self::R); - if head == tail { - // The queue is empty - self.tail.store(null_mut(), Self::R); - } - - if let Some(next_ref) = unsafe { head_ref.next.load(Self::R).as_ref() } { - next_ref.prev.store(null_mut(), Self::R); - } - - // Clear the pointers in the node. - head_ref.next.store(null_mut(), Self::R); - head_ref.prev.store(null_mut(), Self::R); - head_ref.is_poped.store(true, Self::R); - - return Some(head_val); - } - - None - }) - } - - /// Put an element at the back of the queue. - pub fn push(&self, link: Pin<&mut Link<T>>) { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - let tail = self.tail.load(Self::R); - - // SAFETY: This datastructure does not move the underlying value. - let link = unsafe { link.get_unchecked_mut() }; - - if let Some(tail_ref) = unsafe { tail.as_ref() } { - // Queue is not empty - link.prev.store(tail, Self::R); - self.tail.store(link, Self::R); - tail_ref.next.store(link, Self::R); - } else { - // Queue is empty - self.tail.store(link, Self::R); - self.head.store(link, Self::R); - } - }); - } - - /// Check if the queue is empty. - pub fn is_empty(&self) -> bool { - self.head.load(Self::R).is_null() - } -} - -/// A link in the linked list. -pub struct Link<T> { - pub(crate) val: T, - next: AtomicPtr<Link<T>>, - prev: AtomicPtr<Link<T>>, - is_poped: AtomicBool, - _up: PhantomPinned, -} - -impl<T: Clone> Link<T> { - const R: Ordering = Ordering::Relaxed; - - /// Create a new link. - pub const fn new(val: T) -> Self { - Self { - val, - next: AtomicPtr::new(null_mut()), - prev: AtomicPtr::new(null_mut()), - is_poped: AtomicBool::new(false), - _up: PhantomPinned, - } - } - - pub fn is_poped(&self) -> bool { - self.is_poped.load(Self::R) - } - - pub fn remove_from_list(&mut self, list: &LinkedList<T>) { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - let prev = self.prev.load(Self::R); - let next = self.next.load(Self::R); - self.is_poped.store(true, Self::R); - - match unsafe { (prev.as_ref(), next.as_ref()) } { - (None, None) => { - // Not in the list or alone in the list, check if list head == node address - let sp = self as *const _; - - if sp == list.head.load(Ordering::Relaxed) { - list.head.store(null_mut(), Self::R); - list.tail.store(null_mut(), Self::R); - } - } - (None, Some(next_ref)) => { - // First in the list - next_ref.prev.store(null_mut(), Self::R); - list.head.store(next, Self::R); - } - (Some(prev_ref), None) => { - // Last in the list - prev_ref.next.store(null_mut(), Self::R); - list.tail.store(prev, Self::R); - } - (Some(prev_ref), Some(next_ref)) => { - // Somewhere in the list - - // Connect the `prev.next` and `next.prev` with each other to remove the node - prev_ref.next.store(next, Self::R); - next_ref.prev.store(prev, Self::R); - } - } - }) - } -} - -#[cfg(test)] -impl<T: core::fmt::Debug + Clone> LinkedList<T> { - fn print(&self) { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - let mut head = self.head.load(Self::R); - let tail = self.tail.load(Self::R); - - println!( - "List - h = 0x{:x}, t = 0x{:x}", - head as usize, tail as usize - ); - - let mut i = 0; - - // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link - while let Some(head_ref) = unsafe { head.as_ref() } { - println!( - " {}: {:?}, s = 0x{:x}, n = 0x{:x}, p = 0x{:x}", - i, - head_ref.val, - head as usize, - head_ref.next.load(Ordering::Relaxed) as usize, - head_ref.prev.load(Ordering::Relaxed) as usize - ); - - head = head_ref.next.load(Self::R); - - i += 1; - } - }); - } -} - -#[cfg(test)] -impl<T: core::fmt::Debug + Clone> Link<T> { - fn print(&self) { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - println!("Link:"); - - println!( - " val = {:?}, n = 0x{:x}, p = 0x{:x}", - self.val, - self.next.load(Ordering::Relaxed) as usize, - self.prev.load(Ordering::Relaxed) as usize - ); - }); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn linked_list() { - let wq = LinkedList::<u32>::new(); - - let mut i1 = Link::new(10); - let mut i2 = Link::new(11); - let mut i3 = Link::new(12); - let mut i4 = Link::new(13); - let mut i5 = Link::new(14); - - wq.push(unsafe { Pin::new_unchecked(&mut i1) }); - wq.push(unsafe { Pin::new_unchecked(&mut i2) }); - wq.push(unsafe { Pin::new_unchecked(&mut i3) }); - wq.push(unsafe { Pin::new_unchecked(&mut i4) }); - wq.push(unsafe { Pin::new_unchecked(&mut i5) }); - - wq.print(); - - wq.pop(); - i1.print(); - - wq.print(); - - i4.remove_from_list(&wq); - - wq.print(); - - // i1.remove_from_list(&wq); - // wq.print(); - - println!("i2"); - i2.remove_from_list(&wq); - wq.print(); - - println!("i3"); - i3.remove_from_list(&wq); - wq.print(); - - println!("i5"); - i5.remove_from_list(&wq); - wq.print(); - } -} diff --git a/rtic-channel/src/waker_registration.rs b/rtic-channel/src/waker_registration.rs deleted file mode 100644 index c30df7f..0000000 --- a/rtic-channel/src/waker_registration.rs +++ /dev/null @@ -1,64 +0,0 @@ -use core::cell::UnsafeCell; -use core::task::Waker; - -/// A critical section based waker handler. -pub struct CriticalSectionWakerRegistration { - waker: UnsafeCell<Option<Waker>>, -} - -unsafe impl Send for CriticalSectionWakerRegistration {} -unsafe impl Sync for CriticalSectionWakerRegistration {} - -impl CriticalSectionWakerRegistration { - /// Create a new waker registration. - pub const fn new() -> Self { - Self { - waker: UnsafeCell::new(None), - } - } - - /// Register a waker. - /// This will overwrite the previous waker if there was one. - pub fn register(&self, new_waker: &Waker) { - critical_section::with(|_| { - // SAFETY: This access is protected by the critical section. - let self_waker = unsafe { &mut *self.waker.get() }; - - // From embassy - // https://github.com/embassy-rs/embassy/blob/b99533607ceed225dd12ae73aaa9a0d969a7365e/embassy-sync/src/waitqueue/waker.rs#L59-L61 - match self_waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. (In most executor implementations, - // cloning a waker is somewhat expensive, comparable to cloning an Arc). - Some(ref w2) if (w2.will_wake(new_waker)) => {} - _ => { - // clone the new waker and store it - if let Some(old_waker) = core::mem::replace(self_waker, Some(new_waker.clone())) - { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. - old_waker.wake() - } - } - } - }); - } - - /// Wake the waker. - pub fn wake(&self) { - critical_section::with(|_| { - // SAFETY: This access is protected by the critical section. - let self_waker = unsafe { &mut *self.waker.get() }; - if let Some(waker) = self_waker.take() { - waker.wake() - } - }); - } -} |
