aboutsummaryrefslogtreecommitdiff
path: root/rtic-channel
diff options
context:
space:
mode:
authorEmil Fresk <emil.fresk@gmail.com>2023-01-29 20:16:23 +0100
committerHenrik Tjäder <henrik@tjaders.com>2023-03-01 00:33:38 +0100
commite65e532c2a342f77080ac6fc8e5be11aa7d82575 (patch)
tree5a1f21ad66e277bd13e75c8f29bbd89ba4e10a46 /rtic-channel
parent58692a35e87ddc8b8faca5bb262070d343ceb869 (diff)
Move common data structures to `rtic-common`
Diffstat (limited to 'rtic-channel')
-rw-r--r--rtic-channel/Cargo.toml3
-rw-r--r--rtic-channel/src/lib.rs17
-rw-r--r--rtic-channel/src/wait_queue.rs268
-rw-r--r--rtic-channel/src/waker_registration.rs64
4 files changed, 9 insertions, 343 deletions
diff --git a/rtic-channel/Cargo.toml b/rtic-channel/Cargo.toml
index 5d4cbd0..a0955bc 100644
--- a/rtic-channel/Cargo.toml
+++ b/rtic-channel/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
heapless = "0.7"
critical-section = "1"
+rtic-common = { version = "1.0.0", path = "../rtic-common" }
[dev-dependencies]
tokio = { version = "1", features = ["rt", "macros", "time"] }
@@ -15,4 +16,4 @@ tokio = { version = "1", features = ["rt", "macros", "time"] }
[features]
default = []
-testing = ["critical-section/std"]
+testing = ["critical-section/std", "rtic-common/testing"]
diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs
index acfa801..2b237f6 100644
--- a/rtic-channel/src/lib.rs
+++ b/rtic-channel/src/lib.rs
@@ -14,11 +14,8 @@ use core::{
task::{Poll, Waker},
};
use heapless::Deque;
-use wait_queue::WaitQueue;
-use waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
-
-mod wait_queue;
-mod waker_registration;
+use rtic_common::wait_queue::{Link, WaitQueue};
+use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration;
/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue.
///
@@ -136,11 +133,11 @@ unsafe impl<'a, T, const N: usize> Send for Sender<'a, T, N> {}
/// This is needed to make the async closure in `send` accept that we "share"
/// the link possible between threads.
#[derive(Clone)]
-struct LinkPtr(*mut Option<wait_queue::Link<Waker>>);
+struct LinkPtr(*mut Option<Link<Waker>>);
impl LinkPtr {
/// This will dereference the pointer stored within and give out an `&mut`.
- unsafe fn get(&mut self) -> &mut Option<wait_queue::Link<Waker>> {
+ unsafe fn get(&mut self) -> &mut Option<Link<Waker>> {
&mut *self.0
}
}
@@ -200,10 +197,10 @@ impl<'a, T, const N: usize> Sender<'a, T, N> {
/// Send a value. If there is no place left in the queue this will wait until there is.
/// If the receiver does not exist this will return an error.
pub async fn send(&mut self, val: T) -> Result<(), NoReceiver<T>> {
- let mut link_ptr: Option<wait_queue::Link<Waker>> = None;
+ let mut link_ptr: Option<Link<Waker>> = None;
// Make this future `Drop`-safe, also shadow the original definition so we can't abuse it.
- let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<wait_queue::Link<Waker>>);
+ let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>);
let mut link_ptr2 = link_ptr.clone();
let dropper = OnDrop::new(|| {
@@ -236,7 +233,7 @@ impl<'a, T, const N: usize> Sender<'a, T, N> {
}
} else {
// Place the link in the wait queue on first run.
- let link_ref = link.insert(wait_queue::Link::new(cx.waker().clone()));
+ let link_ref = link.insert(Link::new(cx.waker().clone()));
// SAFETY: The address to the link is stable as it is hidden behind
// `link_ptr`, and `link_ptr` shadows the original making it unmovable.
diff --git a/rtic-channel/src/wait_queue.rs b/rtic-channel/src/wait_queue.rs
deleted file mode 100644
index 2de6311..0000000
--- a/rtic-channel/src/wait_queue.rs
+++ /dev/null
@@ -1,268 +0,0 @@
-//! ...
-
-use core::marker::PhantomPinned;
-use core::pin::Pin;
-use core::ptr::null_mut;
-use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
-use core::task::Waker;
-use critical_section as cs;
-
-pub type WaitQueue = LinkedList<Waker>;
-
-/// A FIFO linked list for a wait queue.
-pub struct LinkedList<T> {
- head: AtomicPtr<Link<T>>, // UnsafeCell<*mut Link<T>>
- tail: AtomicPtr<Link<T>>,
-}
-
-impl<T> LinkedList<T> {
- /// Create a new linked list.
- pub const fn new() -> Self {
- Self {
- head: AtomicPtr::new(null_mut()),
- tail: AtomicPtr::new(null_mut()),
- }
- }
-}
-
-impl<T: Clone> LinkedList<T> {
- const R: Ordering = Ordering::Relaxed;
-
- /// Pop the first element in the queue.
- pub fn pop(&self) -> Option<T> {
- cs::with(|_| {
- // Make sure all previous writes are visible
- core::sync::atomic::fence(Ordering::SeqCst);
-
- let head = self.head.load(Self::R);
-
- // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link
- if let Some(head_ref) = unsafe { head.as_ref() } {
- // Move head to the next element
- self.head.store(head_ref.next.load(Self::R), Self::R);
-
- // We read the value at head
- let head_val = head_ref.val.clone();
-
- let tail = self.tail.load(Self::R);
- if head == tail {
- // The queue is empty
- self.tail.store(null_mut(), Self::R);
- }
-
- if let Some(next_ref) = unsafe { head_ref.next.load(Self::R).as_ref() } {
- next_ref.prev.store(null_mut(), Self::R);
- }
-
- // Clear the pointers in the node.
- head_ref.next.store(null_mut(), Self::R);
- head_ref.prev.store(null_mut(), Self::R);
- head_ref.is_poped.store(true, Self::R);
-
- return Some(head_val);
- }
-
- None
- })
- }
-
- /// Put an element at the back of the queue.
- pub fn push(&self, link: Pin<&mut Link<T>>) {
- cs::with(|_| {
- // Make sure all previous writes are visible
- core::sync::atomic::fence(Ordering::SeqCst);
-
- let tail = self.tail.load(Self::R);
-
- // SAFETY: This datastructure does not move the underlying value.
- let link = unsafe { link.get_unchecked_mut() };
-
- if let Some(tail_ref) = unsafe { tail.as_ref() } {
- // Queue is not empty
- link.prev.store(tail, Self::R);
- self.tail.store(link, Self::R);
- tail_ref.next.store(link, Self::R);
- } else {
- // Queue is empty
- self.tail.store(link, Self::R);
- self.head.store(link, Self::R);
- }
- });
- }
-
- /// Check if the queue is empty.
- pub fn is_empty(&self) -> bool {
- self.head.load(Self::R).is_null()
- }
-}
-
-/// A link in the linked list.
-pub struct Link<T> {
- pub(crate) val: T,
- next: AtomicPtr<Link<T>>,
- prev: AtomicPtr<Link<T>>,
- is_poped: AtomicBool,
- _up: PhantomPinned,
-}
-
-impl<T: Clone> Link<T> {
- const R: Ordering = Ordering::Relaxed;
-
- /// Create a new link.
- pub const fn new(val: T) -> Self {
- Self {
- val,
- next: AtomicPtr::new(null_mut()),
- prev: AtomicPtr::new(null_mut()),
- is_poped: AtomicBool::new(false),
- _up: PhantomPinned,
- }
- }
-
- pub fn is_poped(&self) -> bool {
- self.is_poped.load(Self::R)
- }
-
- pub fn remove_from_list(&mut self, list: &LinkedList<T>) {
- cs::with(|_| {
- // Make sure all previous writes are visible
- core::sync::atomic::fence(Ordering::SeqCst);
-
- let prev = self.prev.load(Self::R);
- let next = self.next.load(Self::R);
- self.is_poped.store(true, Self::R);
-
- match unsafe { (prev.as_ref(), next.as_ref()) } {
- (None, None) => {
- // Not in the list or alone in the list, check if list head == node address
- let sp = self as *const _;
-
- if sp == list.head.load(Ordering::Relaxed) {
- list.head.store(null_mut(), Self::R);
- list.tail.store(null_mut(), Self::R);
- }
- }
- (None, Some(next_ref)) => {
- // First in the list
- next_ref.prev.store(null_mut(), Self::R);
- list.head.store(next, Self::R);
- }
- (Some(prev_ref), None) => {
- // Last in the list
- prev_ref.next.store(null_mut(), Self::R);
- list.tail.store(prev, Self::R);
- }
- (Some(prev_ref), Some(next_ref)) => {
- // Somewhere in the list
-
- // Connect the `prev.next` and `next.prev` with each other to remove the node
- prev_ref.next.store(next, Self::R);
- next_ref.prev.store(prev, Self::R);
- }
- }
- })
- }
-}
-
-#[cfg(test)]
-impl<T: core::fmt::Debug + Clone> LinkedList<T> {
- fn print(&self) {
- cs::with(|_| {
- // Make sure all previous writes are visible
- core::sync::atomic::fence(Ordering::SeqCst);
-
- let mut head = self.head.load(Self::R);
- let tail = self.tail.load(Self::R);
-
- println!(
- "List - h = 0x{:x}, t = 0x{:x}",
- head as usize, tail as usize
- );
-
- let mut i = 0;
-
- // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link
- while let Some(head_ref) = unsafe { head.as_ref() } {
- println!(
- " {}: {:?}, s = 0x{:x}, n = 0x{:x}, p = 0x{:x}",
- i,
- head_ref.val,
- head as usize,
- head_ref.next.load(Ordering::Relaxed) as usize,
- head_ref.prev.load(Ordering::Relaxed) as usize
- );
-
- head = head_ref.next.load(Self::R);
-
- i += 1;
- }
- });
- }
-}
-
-#[cfg(test)]
-impl<T: core::fmt::Debug + Clone> Link<T> {
- fn print(&self) {
- cs::with(|_| {
- // Make sure all previous writes are visible
- core::sync::atomic::fence(Ordering::SeqCst);
-
- println!("Link:");
-
- println!(
- " val = {:?}, n = 0x{:x}, p = 0x{:x}",
- self.val,
- self.next.load(Ordering::Relaxed) as usize,
- self.prev.load(Ordering::Relaxed) as usize
- );
- });
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn linked_list() {
- let wq = LinkedList::<u32>::new();
-
- let mut i1 = Link::new(10);
- let mut i2 = Link::new(11);
- let mut i3 = Link::new(12);
- let mut i4 = Link::new(13);
- let mut i5 = Link::new(14);
-
- wq.push(unsafe { Pin::new_unchecked(&mut i1) });
- wq.push(unsafe { Pin::new_unchecked(&mut i2) });
- wq.push(unsafe { Pin::new_unchecked(&mut i3) });
- wq.push(unsafe { Pin::new_unchecked(&mut i4) });
- wq.push(unsafe { Pin::new_unchecked(&mut i5) });
-
- wq.print();
-
- wq.pop();
- i1.print();
-
- wq.print();
-
- i4.remove_from_list(&wq);
-
- wq.print();
-
- // i1.remove_from_list(&wq);
- // wq.print();
-
- println!("i2");
- i2.remove_from_list(&wq);
- wq.print();
-
- println!("i3");
- i3.remove_from_list(&wq);
- wq.print();
-
- println!("i5");
- i5.remove_from_list(&wq);
- wq.print();
- }
-}
diff --git a/rtic-channel/src/waker_registration.rs b/rtic-channel/src/waker_registration.rs
deleted file mode 100644
index c30df7f..0000000
--- a/rtic-channel/src/waker_registration.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-use core::cell::UnsafeCell;
-use core::task::Waker;
-
-/// A critical section based waker handler.
-pub struct CriticalSectionWakerRegistration {
- waker: UnsafeCell<Option<Waker>>,
-}
-
-unsafe impl Send for CriticalSectionWakerRegistration {}
-unsafe impl Sync for CriticalSectionWakerRegistration {}
-
-impl CriticalSectionWakerRegistration {
- /// Create a new waker registration.
- pub const fn new() -> Self {
- Self {
- waker: UnsafeCell::new(None),
- }
- }
-
- /// Register a waker.
- /// This will overwrite the previous waker if there was one.
- pub fn register(&self, new_waker: &Waker) {
- critical_section::with(|_| {
- // SAFETY: This access is protected by the critical section.
- let self_waker = unsafe { &mut *self.waker.get() };
-
- // From embassy
- // https://github.com/embassy-rs/embassy/blob/b99533607ceed225dd12ae73aaa9a0d969a7365e/embassy-sync/src/waitqueue/waker.rs#L59-L61
- match self_waker {
- // Optimization: If both the old and new Wakers wake the same task, we can simply
- // keep the old waker, skipping the clone. (In most executor implementations,
- // cloning a waker is somewhat expensive, comparable to cloning an Arc).
- Some(ref w2) if (w2.will_wake(new_waker)) => {}
- _ => {
- // clone the new waker and store it
- if let Some(old_waker) = core::mem::replace(self_waker, Some(new_waker.clone()))
- {
- // We had a waker registered for another task. Wake it, so the other task can
- // reregister itself if it's still interested.
- //
- // If two tasks are waiting on the same thing concurrently, this will cause them
- // to wake each other in a loop fighting over this WakerRegistration. This wastes
- // CPU but things will still work.
- //
- // If the user wants to have two tasks waiting on the same thing they should use
- // a more appropriate primitive that can store multiple wakers.
- old_waker.wake()
- }
- }
- }
- });
- }
-
- /// Wake the waker.
- pub fn wake(&self) {
- critical_section::with(|_| {
- // SAFETY: This access is protected by the critical section.
- let self_waker = unsafe { &mut *self.waker.get() };
- if let Some(waker) = self_waker.take() {
- waker.wake()
- }
- });
- }
-}