From 71b5f9438e1beb5fe12b90415d9d6307e79c0cdf Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Mon, 23 Jan 2023 20:57:56 +0100 Subject: Fixed systick monotonic --- rtic-time/.gitignore | 6 + rtic-time/CHANGELOG.md | 0 rtic-time/Cargo.toml | 10 ++ rtic-time/rust-toolchain.toml | 4 + rtic-time/src/lib.rs | 336 ++++++++++++++++++++++++++++++++++++++++++ rtic-time/src/linked_list.rs | 173 ++++++++++++++++++++++ rtic-time/src/monotonic.rs | 60 ++++++++ 7 files changed, 589 insertions(+) create mode 100644 rtic-time/.gitignore create mode 100644 rtic-time/CHANGELOG.md create mode 100644 rtic-time/Cargo.toml create mode 100644 rtic-time/rust-toolchain.toml create mode 100644 rtic-time/src/lib.rs create mode 100644 rtic-time/src/linked_list.rs create mode 100644 rtic-time/src/monotonic.rs (limited to 'rtic-time') diff --git a/rtic-time/.gitignore b/rtic-time/.gitignore new file mode 100644 index 0000000..c400256 --- /dev/null +++ b/rtic-time/.gitignore @@ -0,0 +1,6 @@ +**/*.rs.bk +.#* +.gdb_history +/target +Cargo.lock +*.hex diff --git a/rtic-time/CHANGELOG.md b/rtic-time/CHANGELOG.md new file mode 100644 index 0000000..e69de29 diff --git a/rtic-time/Cargo.toml b/rtic-time/Cargo.toml new file mode 100644 index 0000000..ea05939 --- /dev/null +++ b/rtic-time/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "rtic-time" +version = "1.0.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +critical-section = "1" +futures-util = { version = "0.3.25", default-features = false } diff --git a/rtic-time/rust-toolchain.toml b/rtic-time/rust-toolchain.toml new file mode 100644 index 0000000..e28b55d --- /dev/null +++ b/rtic-time/rust-toolchain.toml @@ -0,0 +1,4 @@ +[toolchain] +channel = "nightly" +components = [ "rust-src", "rustfmt", "llvm-tools-preview" ] +targets = [ "thumbv6m-none-eabi", "thumbv7m-none-eabi" ] diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs new file mode 100644 index 0000000..d7faa07 --- /dev/null +++ b/rtic-time/src/lib.rs @@ -0,0 +1,336 @@ +//! Crate + +#![no_std] +#![no_main] +#![deny(missing_docs)] +#![allow(incomplete_features)] +#![feature(async_fn_in_trait)] + +pub mod monotonic; + +use core::future::{poll_fn, Future}; +use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use core::task::{Poll, Waker}; +use futures_util::{ + future::{select, Either}, + pin_mut, +}; +pub use monotonic::Monotonic; + +mod linked_list; + +use linked_list::{Link, LinkedList}; + +/// Holds a waker and at which time instant this waker shall be awoken. +struct WaitingWaker { + waker: Waker, + release_at: Mono::Instant, +} + +impl Clone for WaitingWaker { + fn clone(&self) -> Self { + Self { + waker: self.waker.clone(), + release_at: self.release_at, + } + } +} + +impl PartialEq for WaitingWaker { + fn eq(&self, other: &Self) -> bool { + self.release_at == other.release_at + } +} + +impl PartialOrd for WaitingWaker { + fn partial_cmp(&self, other: &Self) -> Option { + self.release_at.partial_cmp(&other.release_at) + } +} + +/// A generic timer queue for async executors. +/// +/// # Blocking +/// +/// The internal priority queue uses global critical sections to manage access. This means that +/// `await`ing a delay will cause a lock of the entire system for O(n) time. In practice the lock +/// duration is ~10 clock cycles per element in the queue. +/// +/// # Safety +/// +/// This timer queue is based on an intrusive linked list, and by extension the links are strored +/// on the async stacks of callers. The links are deallocated on `drop` or when the wait is +/// complete. +/// +/// Do not call `mem::forget` on an awaited future, or there will be dragons! +pub struct TimerQueue { + queue: LinkedList>, + initialized: AtomicBool, +} + +/// This indicates that there was a timeout. +pub struct TimeoutError; + +impl TimerQueue { + /// Make a new queue. + pub const fn new() -> Self { + Self { + queue: LinkedList::new(), + initialized: AtomicBool::new(false), + } + } + + /// Forwards the `Monotonic::now()` method. + #[inline(always)] + pub fn now(&self) -> Mono::Instant { + Mono::now() + } + + /// Takes the initialized monotonic to initialize the TimerQueue. + pub fn initialize(&self, monotonic: Mono) { + self.initialized.store(true, Ordering::SeqCst); + + // Don't run drop on `Mono` + core::mem::forget(monotonic); + } + + /// Call this in the interrupt handler of the hardware timer supporting the `Monotonic` + /// + /// # Safety + /// + /// It's always safe to call, but it must only be called from the interrupt of the + /// monotonic timer for correct operation. + pub unsafe fn on_monotonic_interrupt(&self) { + Mono::clear_compare_flag(); + Mono::on_interrupt(); + + loop { + let mut release_at = None; + let head = self.queue.pop_if(|head| { + release_at = Some(head.release_at); + + Mono::now() >= head.release_at + }); + + match (head, release_at) { + (Some(link), _) => { + link.waker.wake(); + } + (None, Some(instant)) => { + Mono::enable_timer(); + Mono::set_compare(instant); + + if Mono::now() >= instant { + // The time for the next instant passed while handling it, + // continue dequeueing + continue; + } + + break; + } + (None, None) => { + // Queue is empty + Mono::disable_timer(); + + break; + } + } + } + } + + /// Timeout at a specific time. + pub async fn timeout_at( + &self, + instant: Mono::Instant, + future: F, + ) -> Result { + let delay = self.delay_until(instant); + + pin_mut!(future); + pin_mut!(delay); + + match select(future, delay).await { + Either::Left((r, _)) => Ok(r), + Either::Right(_) => Err(TimeoutError), + } + } + + /// Timeout after a specific duration. + #[inline] + pub async fn timeout_after( + &self, + duration: Mono::Duration, + future: F, + ) -> Result { + self.timeout_at(Mono::now() + duration, future).await + } + + /// Delay for some duration of time. + #[inline] + pub async fn delay(&self, duration: Mono::Duration) { + let now = Mono::now(); + + self.delay_until(now + duration).await; + } + + /// Delay to some specific time instant. + pub async fn delay_until(&self, instant: Mono::Instant) { + if !self.initialized.load(Ordering::Relaxed) { + panic!( + "The timer queue is not initialized with a monotonic, you need to run `initialize`" + ); + } + + let mut first_run = true; + let queue = &self.queue; + let mut link = Link::new(WaitingWaker { + waker: poll_fn(|cx| Poll::Ready(cx.waker().clone())).await, + release_at: instant, + }); + + let marker = &AtomicUsize::new(0); + + let dropper = OnDrop::new(|| { + queue.delete(marker.load(Ordering::Relaxed)); + }); + + poll_fn(|_| { + if Mono::now() >= instant { + return Poll::Ready(()); + } + + if first_run { + first_run = false; + let (was_empty, addr) = queue.insert(&mut link); + marker.store(addr, Ordering::Relaxed); + + if was_empty { + // Pend the monotonic handler if the queue was empty to setup the timer. + Mono::pend_interrupt(); + } + } + + Poll::Pending + }) + .await; + + // Make sure that our link is deleted from the list before we drop this stack + drop(dropper); + } +} + +struct OnDrop { + f: core::mem::MaybeUninit, +} + +impl OnDrop { + pub fn new(f: F) -> Self { + Self { + f: core::mem::MaybeUninit::new(f), + } + } + + #[allow(unused)] + pub fn defuse(self) { + core::mem::forget(self) + } +} + +impl Drop for OnDrop { + fn drop(&mut self) { + unsafe { self.f.as_ptr().read()() } + } +} + +// -------- Test program --------- +// +// +// use systick_monotonic::{Systick, TimerQueue}; +// +// // same panicking *behavior* as `panic-probe` but doesn't print a panic message +// // this prevents the panic message being printed *twice* when `defmt::panic` is invoked +// #[defmt::panic_handler] +// fn panic() -> ! { +// cortex_m::asm::udf() +// } +// +// /// Terminates the application and makes `probe-run` exit with exit-code = 0 +// pub fn exit() -> ! { +// loop { +// cortex_m::asm::bkpt(); +// } +// } +// +// defmt::timestamp!("{=u64:us}", { +// let time_us: fugit::MicrosDurationU32 = MONO.now().duration_since_epoch().convert(); +// +// time_us.ticks() as u64 +// }); +// +// make_systick_timer_queue!(MONO, Systick<1_000>); +// +// #[rtic::app( +// device = nrf52832_hal::pac, +// dispatchers = [SWI0_EGU0, SWI1_EGU1, SWI2_EGU2, SWI3_EGU3, SWI4_EGU4, SWI5_EGU5], +// )] +// mod app { +// use super::{Systick, MONO}; +// use fugit::ExtU32; +// +// #[shared] +// struct Shared {} +// +// #[local] +// struct Local {} +// +// #[init] +// fn init(cx: init::Context) -> (Shared, Local) { +// defmt::println!("init"); +// +// let systick = Systick::start(cx.core.SYST, 64_000_000); +// +// defmt::println!("initializing monotonic"); +// +// MONO.initialize(systick); +// +// async_task::spawn().ok(); +// async_task2::spawn().ok(); +// async_task3::spawn().ok(); +// +// (Shared {}, Local {}) +// } +// +// #[idle] +// fn idle(_: idle::Context) -> ! { +// defmt::println!("idle"); +// +// loop { +// core::hint::spin_loop(); +// } +// } +// +// #[task] +// async fn async_task(_: async_task::Context) { +// loop { +// defmt::println!("async task waiting for 1 second"); +// MONO.delay(1.secs()).await; +// } +// } +// +// #[task] +// async fn async_task2(_: async_task2::Context) { +// loop { +// defmt::println!(" async task 2 waiting for 0.5 second"); +// MONO.delay(500.millis()).await; +// } +// } +// +// #[task] +// async fn async_task3(_: async_task3::Context) { +// loop { +// defmt::println!(" async task 3 waiting for 0.2 second"); +// MONO.delay(200.millis()).await; +// } +// } +// } +// diff --git a/rtic-time/src/linked_list.rs b/rtic-time/src/linked_list.rs new file mode 100644 index 0000000..42ff8cb --- /dev/null +++ b/rtic-time/src/linked_list.rs @@ -0,0 +1,173 @@ +//! ... + +use core::marker::PhantomPinned; +use core::sync::atomic::{AtomicPtr, Ordering}; +use critical_section as cs; + +/// A sorted linked list for the timer queue. +pub struct LinkedList { + head: AtomicPtr>, +} + +impl LinkedList { + /// Create a new linked list. + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(core::ptr::null_mut()), + } + } +} + +impl LinkedList { + /// Pop the first element in the queue if the closure returns true. + pub fn pop_if bool>(&self, f: F) -> Option { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let head = self.head.load(Ordering::Relaxed); + + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + if let Some(head) = unsafe { head.as_ref() } { + if f(&head.val) { + // Move head to the next element + self.head + .store(head.next.load(Ordering::Relaxed), Ordering::Relaxed); + + // We read the value at head + let head_val = head.val.clone(); + + return Some(head_val); + } + } + None + }) + } + + /// Delete a link at an address. + pub fn delete(&self, addr: usize) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let head = self.head.load(Ordering::Relaxed); + + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + let head_ref = if let Some(head_ref) = unsafe { head.as_ref() } { + head_ref + } else { + // 1. List is empty, do nothing + return; + }; + + if head as *const _ as usize == addr { + // 2. Replace head with head.next + self.head + .store(head_ref.next.load(Ordering::Relaxed), Ordering::Relaxed); + + return; + } + + // 3. search list for correct node + let mut curr = head_ref; + let mut next = head_ref.next.load(Ordering::Relaxed); + + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + while let Some(next_link) = unsafe { next.as_ref() } { + // Next is not null + + if next as *const _ as usize == addr { + curr.next + .store(next_link.next.load(Ordering::Relaxed), Ordering::Relaxed); + + return; + } + + // Continue searching + curr = next_link; + next = next_link.next.load(Ordering::Relaxed); + } + }) + } + + /// Insert a new link into the linked list. + /// The return is (was_empty, address), where the address of the link is for use with `delete`. + pub fn insert(&self, val: &mut Link) -> (bool, usize) { + cs::with(|_| { + let addr = val as *const _ as usize; + + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let head = self.head.load(Ordering::Relaxed); + + // 3 cases to handle + + // 1. List is empty, write to head + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + let head_ref = if let Some(head_ref) = unsafe { head.as_ref() } { + head_ref + } else { + self.head.store(val, Ordering::Relaxed); + return (true, addr); + }; + + // 2. val needs to go in first + if val.val < head_ref.val { + // Set current head as next of `val` + val.next.store(head, Ordering::Relaxed); + + // `val` is now first in the queue + self.head.store(val, Ordering::Relaxed); + + return (false, addr); + } + + // 3. search list for correct place + let mut curr = head_ref; + let mut next = head_ref.next.load(Ordering::Relaxed); + + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + while let Some(next_link) = unsafe { next.as_ref() } { + // Next is not null + + if val.val < next_link.val { + // Replace next with `val` + val.next.store(next, Ordering::Relaxed); + + // Insert `val` + curr.next.store(val, Ordering::Relaxed); + + return (false, addr); + } + + // Continue searching + curr = next_link; + next = next_link.next.load(Ordering::Relaxed); + } + + // No next, write link to last position in list + curr.next.store(val, Ordering::Relaxed); + + (false, addr) + }) + } +} + +/// A link in the linked list. +pub struct Link { + val: T, + next: AtomicPtr>, + _up: PhantomPinned, +} + +impl Link { + /// Create a new link. + pub const fn new(val: T) -> Self { + Self { + val, + next: AtomicPtr::new(core::ptr::null_mut()), + _up: PhantomPinned, + } + } +} diff --git a/rtic-time/src/monotonic.rs b/rtic-time/src/monotonic.rs new file mode 100644 index 0000000..9b3742f --- /dev/null +++ b/rtic-time/src/monotonic.rs @@ -0,0 +1,60 @@ +//! ... + +/// # A monotonic clock / counter definition. +/// +/// ## Correctness +/// +/// The trait enforces that proper time-math is implemented between `Instant` and `Duration`. This +/// is a requirement on the time library that the user chooses to use. +pub trait Monotonic { + /// The time at time zero. + const ZERO: Self::Instant; + + /// The type for instant, defining an instant in time. + /// + /// **Note:** In all APIs in RTIC that use instants from this monotonic, this type will be used. + type Instant: Ord + + Copy + + core::ops::Add + + core::ops::Sub + + core::ops::Sub; + + /// The type for duration, defining an duration of time. + /// + /// **Note:** In all APIs in RTIC that use duration from this monotonic, this type will be used. + type Duration; + + /// Get the current time. + fn now() -> Self::Instant; + + /// Set the compare value of the timer interrupt. + /// + /// **Note:** This method does not need to handle race conditions of the monotonic, the timer + /// queue in RTIC checks this. + fn set_compare(instant: Self::Instant); + + /// Clear the compare interrupt flag. + fn clear_compare_flag(); + + /// Pend the timer's interrupt. + fn pend_interrupt(); + + /// Optional. Runs on interrupt before any timer queue handling. + fn on_interrupt() {} + + /// Optional. This is used to save power, this is called when the timer queue is not empty. + /// + /// Enabling and disabling the monotonic needs to propagate to `now` so that an instant + /// based of `now()` is still valid. + /// + /// NOTE: This may be called more than once. + fn enable_timer() {} + + /// Optional. This is used to save power, this is called when the timer queue is empty. + /// + /// Enabling and disabling the monotonic needs to propagate to `now` so that an instant + /// based of `now()` is still valid. + /// + /// NOTE: This may be called more than once. + fn disable_timer() {} +} -- cgit v1.2.3 From 143cd136eeeb2856d06a1b83e3ef5682f720c251 Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Tue, 24 Jan 2023 11:55:48 +0100 Subject: Optimize linked list popping so delete is not run everytime --- rtic-time/src/lib.rs | 131 +++++++++---------------------------------- rtic-time/src/linked_list.rs | 4 +- 2 files changed, 28 insertions(+), 107 deletions(-) (limited to 'rtic-time') diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index d7faa07..850885b 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -6,9 +6,8 @@ #![allow(incomplete_features)] #![feature(async_fn_in_trait)] -pub mod monotonic; - use core::future::{poll_fn, Future}; +use core::mem::MaybeUninit; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use core::task::{Poll, Waker}; use futures_util::{ @@ -18,20 +17,25 @@ use futures_util::{ pub use monotonic::Monotonic; mod linked_list; +mod monotonic; use linked_list::{Link, LinkedList}; /// Holds a waker and at which time instant this waker shall be awoken. struct WaitingWaker { - waker: Waker, + // This is alway initialized when used, we create this struct on the async stack and then + // initialize the waker field in the `poll_fn` closure (we then know the waker) + waker: MaybeUninit, release_at: Mono::Instant, + was_poped: AtomicBool, } impl Clone for WaitingWaker { fn clone(&self) -> Self { Self { - waker: self.waker.clone(), + waker: MaybeUninit::new(unsafe { self.waker.assume_init_ref() }.clone()), release_at: self.release_at, + was_poped: AtomicBool::new(self.was_poped.load(Ordering::Relaxed)), } } } @@ -109,12 +113,15 @@ impl TimerQueue { let head = self.queue.pop_if(|head| { release_at = Some(head.release_at); - Mono::now() >= head.release_at + let should_pop = Mono::now() >= head.release_at; + head.was_poped.store(should_pop, Ordering::Relaxed); + + should_pop }); match (head, release_at) { (Some(link), _) => { - link.waker.wake(); + link.waker.assume_init().wake(); } (None, Some(instant)) => { Mono::enable_timer(); @@ -181,26 +188,28 @@ impl TimerQueue { ); } - let mut first_run = true; - let queue = &self.queue; let mut link = Link::new(WaitingWaker { - waker: poll_fn(|cx| Poll::Ready(cx.waker().clone())).await, + waker: MaybeUninit::uninit(), release_at: instant, + was_poped: AtomicBool::new(false), }); + let mut first_run = true; + let queue = &self.queue; let marker = &AtomicUsize::new(0); let dropper = OnDrop::new(|| { queue.delete(marker.load(Ordering::Relaxed)); }); - poll_fn(|_| { + poll_fn(|cx| { if Mono::now() >= instant { return Poll::Ready(()); } if first_run { first_run = false; + link.val.waker.write(cx.waker().clone()); let (was_empty, addr) = queue.insert(&mut link); marker.store(addr, Ordering::Relaxed); @@ -214,8 +223,13 @@ impl TimerQueue { }) .await; - // Make sure that our link is deleted from the list before we drop this stack - drop(dropper); + if link.val.was_poped.load(Ordering::Relaxed) { + // If it was poped from the queue there is no need to run delete + dropper.defuse(); + } else { + // Make sure that our link is deleted from the list before we drop this stack + drop(dropper); + } } } @@ -241,96 +255,3 @@ impl Drop for OnDrop { unsafe { self.f.as_ptr().read()() } } } - -// -------- Test program --------- -// -// -// use systick_monotonic::{Systick, TimerQueue}; -// -// // same panicking *behavior* as `panic-probe` but doesn't print a panic message -// // this prevents the panic message being printed *twice* when `defmt::panic` is invoked -// #[defmt::panic_handler] -// fn panic() -> ! { -// cortex_m::asm::udf() -// } -// -// /// Terminates the application and makes `probe-run` exit with exit-code = 0 -// pub fn exit() -> ! { -// loop { -// cortex_m::asm::bkpt(); -// } -// } -// -// defmt::timestamp!("{=u64:us}", { -// let time_us: fugit::MicrosDurationU32 = MONO.now().duration_since_epoch().convert(); -// -// time_us.ticks() as u64 -// }); -// -// make_systick_timer_queue!(MONO, Systick<1_000>); -// -// #[rtic::app( -// device = nrf52832_hal::pac, -// dispatchers = [SWI0_EGU0, SWI1_EGU1, SWI2_EGU2, SWI3_EGU3, SWI4_EGU4, SWI5_EGU5], -// )] -// mod app { -// use super::{Systick, MONO}; -// use fugit::ExtU32; -// -// #[shared] -// struct Shared {} -// -// #[local] -// struct Local {} -// -// #[init] -// fn init(cx: init::Context) -> (Shared, Local) { -// defmt::println!("init"); -// -// let systick = Systick::start(cx.core.SYST, 64_000_000); -// -// defmt::println!("initializing monotonic"); -// -// MONO.initialize(systick); -// -// async_task::spawn().ok(); -// async_task2::spawn().ok(); -// async_task3::spawn().ok(); -// -// (Shared {}, Local {}) -// } -// -// #[idle] -// fn idle(_: idle::Context) -> ! { -// defmt::println!("idle"); -// -// loop { -// core::hint::spin_loop(); -// } -// } -// -// #[task] -// async fn async_task(_: async_task::Context) { -// loop { -// defmt::println!("async task waiting for 1 second"); -// MONO.delay(1.secs()).await; -// } -// } -// -// #[task] -// async fn async_task2(_: async_task2::Context) { -// loop { -// defmt::println!(" async task 2 waiting for 0.5 second"); -// MONO.delay(500.millis()).await; -// } -// } -// -// #[task] -// async fn async_task3(_: async_task3::Context) { -// loop { -// defmt::println!(" async task 3 waiting for 0.2 second"); -// MONO.delay(200.millis()).await; -// } -// } -// } -// diff --git a/rtic-time/src/linked_list.rs b/rtic-time/src/linked_list.rs index 42ff8cb..52a955b 100644 --- a/rtic-time/src/linked_list.rs +++ b/rtic-time/src/linked_list.rs @@ -5,7 +5,7 @@ use core::sync::atomic::{AtomicPtr, Ordering}; use critical_section as cs; /// A sorted linked list for the timer queue. -pub struct LinkedList { +pub(crate) struct LinkedList { head: AtomicPtr>, } @@ -156,7 +156,7 @@ impl LinkedList { /// A link in the linked list. pub struct Link { - val: T, + pub(crate) val: T, next: AtomicPtr>, _up: PhantomPinned, } -- cgit v1.2.3 From 2e96229c912076829d4c2be83a8b5ce27d81ebaa Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Wed, 25 Jan 2023 15:24:32 +0100 Subject: Remove unnecessary MaybeUninit --- rtic-time/src/lib.rs | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) (limited to 'rtic-time') diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index 850885b..34f9362 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -7,7 +7,6 @@ #![feature(async_fn_in_trait)] use core::future::{poll_fn, Future}; -use core::mem::MaybeUninit; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use core::task::{Poll, Waker}; use futures_util::{ @@ -23,9 +22,7 @@ use linked_list::{Link, LinkedList}; /// Holds a waker and at which time instant this waker shall be awoken. struct WaitingWaker { - // This is alway initialized when used, we create this struct on the async stack and then - // initialize the waker field in the `poll_fn` closure (we then know the waker) - waker: MaybeUninit, + waker: Waker, release_at: Mono::Instant, was_poped: AtomicBool, } @@ -33,7 +30,7 @@ struct WaitingWaker { impl Clone for WaitingWaker { fn clone(&self) -> Self { Self { - waker: MaybeUninit::new(unsafe { self.waker.assume_init_ref() }.clone()), + waker: self.waker.clone(), release_at: self.release_at, was_poped: AtomicBool::new(self.was_poped.load(Ordering::Relaxed)), } @@ -121,7 +118,7 @@ impl TimerQueue { match (head, release_at) { (Some(link), _) => { - link.waker.assume_init().wake(); + link.waker.wake(); } (None, Some(instant)) => { Mono::enable_timer(); @@ -188,13 +185,8 @@ impl TimerQueue { ); } - let mut link = Link::new(WaitingWaker { - waker: MaybeUninit::uninit(), - release_at: instant, - was_poped: AtomicBool::new(false), - }); + let mut link = None; - let mut first_run = true; let queue = &self.queue; let marker = &AtomicUsize::new(0); @@ -207,10 +199,14 @@ impl TimerQueue { return Poll::Ready(()); } - if first_run { - first_run = false; - link.val.waker.write(cx.waker().clone()); - let (was_empty, addr) = queue.insert(&mut link); + if link.is_none() { + let mut link_ref = link.insert(Link::new(WaitingWaker { + waker: cx.waker().clone(), + release_at: instant, + was_poped: AtomicBool::new(false), + })); + + let (was_empty, addr) = queue.insert(&mut link_ref); marker.store(addr, Ordering::Relaxed); if was_empty { @@ -223,9 +219,11 @@ impl TimerQueue { }) .await; - if link.val.was_poped.load(Ordering::Relaxed) { - // If it was poped from the queue there is no need to run delete - dropper.defuse(); + if let Some(link) = link { + if link.val.was_poped.load(Ordering::Relaxed) { + // If it was poped from the queue there is no need to run delete + dropper.defuse(); + } } else { // Make sure that our link is deleted from the list before we drop this stack drop(dropper); -- cgit v1.2.3 From 51d4eccc726d4dc7950aac6f8d74a34ddf669f67 Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Thu, 26 Jan 2023 21:29:52 +0100 Subject: Fixes in MPSC linked list and dropper handling --- rtic-channel/Cargo.toml | 15 ++ rtic-channel/src/lib.rs | 380 +++++++++++++++++++++++++++++++++++++++++ rtic-channel/src/wait_queue.rs | 278 ++++++++++++++++++++++++++++++ rtic-time/src/lib.rs | 1 - 4 files changed, 673 insertions(+), 1 deletion(-) create mode 100644 rtic-channel/Cargo.toml create mode 100644 rtic-channel/src/lib.rs create mode 100644 rtic-channel/src/wait_queue.rs (limited to 'rtic-time') diff --git a/rtic-channel/Cargo.toml b/rtic-channel/Cargo.toml new file mode 100644 index 0000000..8962352 --- /dev/null +++ b/rtic-channel/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "rtic-channel" +version = "1.0.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +heapless = "0.7" +critical-section = "1" + + +[features] +default = [] +testing = ["critical-section/std"] diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs new file mode 100644 index 0000000..a7098ee --- /dev/null +++ b/rtic-channel/src/lib.rs @@ -0,0 +1,380 @@ +//! Crate + +#![no_std] +#![deny(missing_docs)] + +use core::{ + cell::UnsafeCell, + future::poll_fn, + mem::MaybeUninit, + ptr, + task::{Poll, Waker}, +}; +use heapless::Deque; +use wait_queue::WaitQueue; +use waker_registration::CriticalSectionWakerRegistration as WakerRegistration; + +mod wait_queue; +mod waker_registration; + +/// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue. +/// +/// This channel uses critical sections, however there are extremely small and all `memcpy` +/// operations of `T` are done without critical sections. +pub struct Channel { + // Here are all indexes that are not used in `slots` and ready to be allocated. + freeq: UnsafeCell>, + // Here are wakers and indexes to slots that are ready to be dequeued by the receiver. + readyq: UnsafeCell>, + // Waker for the receiver. + receiver_waker: WakerRegistration, + // Storage for N `T`s, so we don't memcpy around a lot of `T`s. + slots: [UnsafeCell>; N], + // If there is no room in the queue a `Sender`s can wait for there to be place in the queue. + wait_queue: WaitQueue, + // Keep track of the receiver. + receiver_dropped: UnsafeCell, + // Keep track of the number of senders. + num_senders: UnsafeCell, +} + +struct UnsafeAccess<'a, const N: usize> { + freeq: &'a mut Deque, + readyq: &'a mut Deque, + receiver_dropped: &'a mut bool, + num_senders: &'a mut usize, +} + +impl Channel { + const _CHECK: () = assert!(N < 256, "This queue support a maximum of 255 entries"); + + const INIT_SLOTS: UnsafeCell> = UnsafeCell::new(MaybeUninit::uninit()); + + /// Create a new channel. + pub const fn new() -> Self { + Self { + freeq: UnsafeCell::new(Deque::new()), + readyq: UnsafeCell::new(Deque::new()), + receiver_waker: WakerRegistration::new(), + slots: [Self::INIT_SLOTS; N], + wait_queue: WaitQueue::new(), + receiver_dropped: UnsafeCell::new(false), + num_senders: UnsafeCell::new(0), + } + } + + /// Split the queue into a `Sender`/`Receiver` pair. + pub fn split<'a>(&'a mut self) -> (Sender<'a, T, N>, Receiver<'a, T, N>) { + // Fill free queue + for idx in 0..(N - 1) as u8 { + debug_assert!(!self.freeq.get_mut().is_full()); + + // SAFETY: This safe as the loop goes from 0 to the capacity of the underlying queue. + unsafe { + self.freeq.get_mut().push_back_unchecked(idx); + } + } + + debug_assert!(self.freeq.get_mut().is_full()); + + // There is now 1 sender + *self.num_senders.get_mut() = 1; + + (Sender(self), Receiver(self)) + } + + fn access<'a>(&'a self, _cs: critical_section::CriticalSection) -> UnsafeAccess<'a, N> { + // SAFETY: This is safe as are in a critical section. + unsafe { + UnsafeAccess { + freeq: &mut *self.freeq.get(), + readyq: &mut *self.readyq.get(), + receiver_dropped: &mut *self.receiver_dropped.get(), + num_senders: &mut *self.num_senders.get(), + } + } + } +} + +/// Creates a split channel with `'static` lifetime. +#[macro_export] +macro_rules! make_channel { + ($type:path, $size:expr) => {{ + static mut CHANNEL: Channel<$type, $size> = Channel::new(); + + // SAFETY: This is safe as we hide the static mut from others to access it. + // Only this point is where the mutable access happens. + unsafe { CHANNEL.split() } + }}; +} + +// -------- Sender + +/// Error state for when the receiver has been dropped. +pub struct NoReceiver(pub T); + +/// A `Sender` can send to the channel and can be cloned. +pub struct Sender<'a, T, const N: usize>(&'a Channel); + +unsafe impl<'a, T, const N: usize> Send for Sender<'a, T, N> {} + +impl<'a, T, const N: usize> Sender<'a, T, N> { + #[inline(always)] + fn send_footer(&mut self, idx: u8, val: T) { + // Write the value to the slots, note; this memcpy is not under a critical section. + unsafe { + ptr::write( + self.0.slots.get_unchecked(idx as usize).get() as *mut T, + val, + ) + } + + // Write the value into the ready queue. + critical_section::with(|cs| unsafe { self.0.access(cs).readyq.push_back_unchecked(idx) }); + + // If there is a receiver waker, wake it. + self.0.receiver_waker.wake(); + } + + /// Try to send a value, non-blocking. If the channel is full this will return an error. + /// Note; this does not check if the channel is closed. + pub fn try_send(&mut self, val: T) -> Result<(), T> { + // If the wait queue is not empty, we can't try to push into the queue. + if !self.0.wait_queue.is_empty() { + return Err(val); + } + + let idx = + if let Some(idx) = critical_section::with(|cs| self.0.access(cs).freeq.pop_front()) { + idx + } else { + return Err(val); + }; + + self.send_footer(idx, val); + + Ok(()) + } + + /// Send a value. If there is no place left in the queue this will wait until there is. + /// If the receiver does not exist this will return an error. + pub async fn send(&mut self, val: T) -> Result<(), NoReceiver> { + if self.is_closed() {} + + let mut __hidden_link: Option> = None; + + // Make this future `Drop`-safe + let link_ptr = &mut __hidden_link as *mut Option>; + let dropper = OnDrop::new(|| { + // SAFETY: We only run this closure and dereference the pointer if we have + // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference + // of this pointer is in the `poll_fn`. + if let Some(link) = unsafe { &mut *link_ptr } { + link.remove_from_list(&self.0.wait_queue); + } + }); + + let idx = poll_fn(|cx| { + if self.is_closed() { + return Poll::Ready(Err(())); + } + + // Do all this in one critical section, else there can be race conditions + let queue_idx = critical_section::with(|cs| { + if !self.0.wait_queue.is_empty() || self.0.access(cs).freeq.is_empty() { + // SAFETY: This pointer is only dereferenced here and on drop of the future. + let link = unsafe { &mut *link_ptr }; + if link.is_none() { + // Place the link in the wait queue on first run. + let link_ref = link.insert(wait_queue::Link::new(cx.waker().clone())); + self.0.wait_queue.push(link_ref); + } + + return None; + } + + // Get index as the queue is guaranteed not empty and the wait queue is empty + let idx = unsafe { self.0.access(cs).freeq.pop_front_unchecked() }; + + Some(idx) + }); + + if let Some(idx) = queue_idx { + // Return the index + Poll::Ready(Ok(idx)) + } else { + return Poll::Pending; + } + }) + .await; + + // Make sure the link is removed from the queue. + drop(dropper); + + if let Ok(idx) = idx { + self.send_footer(idx, val); + + Ok(()) + } else { + Err(NoReceiver(val)) + } + } + + /// Returns true if there is no `Receiver`s. + pub fn is_closed(&self) -> bool { + critical_section::with(|cs| *self.0.access(cs).receiver_dropped) + } + + /// Is the queue full. + pub fn is_full(&self) -> bool { + critical_section::with(|cs| self.0.access(cs).freeq.is_empty()) + } + + /// Is the queue empty. + pub fn is_empty(&self) -> bool { + critical_section::with(|cs| self.0.access(cs).freeq.is_full()) + } +} + +impl<'a, T, const N: usize> Drop for Sender<'a, T, N> { + fn drop(&mut self) { + // Count down the reference counter + let num_senders = critical_section::with(|cs| { + *self.0.access(cs).num_senders -= 1; + + *self.0.access(cs).num_senders + }); + + // If there are no senders, wake the receiver to do error handling. + if num_senders == 0 { + self.0.receiver_waker.wake(); + } + } +} + +impl<'a, T, const N: usize> Clone for Sender<'a, T, N> { + fn clone(&self) -> Self { + // Count up the reference counter + critical_section::with(|cs| *self.0.access(cs).num_senders += 1); + + Self(self.0) + } +} + +// -------- Receiver + +/// A receiver of the channel. There can only be one receiver at any time. +pub struct Receiver<'a, T, const N: usize>(&'a Channel); + +/// Error state for when all senders has been dropped. +pub struct NoSender; + +impl<'a, T, const N: usize> Receiver<'a, T, N> { + /// Receives a value if there is one in the channel, non-blocking. + /// Note; this does not check if the channel is closed. + pub fn try_recv(&mut self) -> Option { + // Try to get a ready slot. + let ready_slot = + critical_section::with(|cs| self.0.access(cs).readyq.pop_front().map(|rs| rs)); + + if let Some(rs) = ready_slot { + // Read the value from the slots, note; this memcpy is not under a critical section. + let r = unsafe { ptr::read(self.0.slots.get_unchecked(rs as usize).get() as *const T) }; + + // Return the index to the free queue after we've read the value. + critical_section::with(|cs| unsafe { self.0.access(cs).freeq.push_back_unchecked(rs) }); + + // If someone is waiting in the WaiterQueue, wake the first one up. + if let Some(wait_head) = self.0.wait_queue.pop() { + wait_head.wake(); + } + + Some(r) + } else { + None + } + } + + /// Receives a value, waiting if the queue is empty. + /// If all senders are dropped this will error with `NoSender`. + pub async fn recv(&mut self) -> Result { + // There was nothing in the queue, setup the waiting. + poll_fn(|cx| { + // Register waker. + // TODO: Should it happen here or after the if? This might cause a spurious wake. + self.0.receiver_waker.register(cx.waker()); + + // Try to dequeue. + if let Some(val) = self.try_recv() { + return Poll::Ready(Ok(val)); + } + + // If the queue is empty and there is no sender, return the error. + if self.is_closed() { + return Poll::Ready(Err(NoSender)); + } + + Poll::Pending + }) + .await + } + + /// Returns true if there are no `Sender`s. + pub fn is_closed(&self) -> bool { + critical_section::with(|cs| *self.0.access(cs).num_senders == 0) + } + + /// Is the queue full. + pub fn is_full(&self) -> bool { + critical_section::with(|cs| self.0.access(cs).readyq.is_empty()) + } + + /// Is the queue empty. + pub fn is_empty(&self) -> bool { + critical_section::with(|cs| self.0.access(cs).readyq.is_empty()) + } +} + +impl<'a, T, const N: usize> Drop for Receiver<'a, T, N> { + fn drop(&mut self) { + // Mark the receiver as dropped and wake all waiters + critical_section::with(|cs| *self.0.access(cs).receiver_dropped = true); + + while let Some(waker) = self.0.wait_queue.pop() { + waker.wake(); + } + } +} + +struct OnDrop { + f: core::mem::MaybeUninit, +} + +impl OnDrop { + pub fn new(f: F) -> Self { + Self { + f: core::mem::MaybeUninit::new(f), + } + } + + #[allow(unused)] + pub fn defuse(self) { + core::mem::forget(self) + } +} + +impl Drop for OnDrop { + fn drop(&mut self) { + unsafe { self.f.as_ptr().read()() } + } +} + +#[cfg(test)] +#[macro_use] +extern crate std; + +#[cfg(test)] +mod tests { + #[test] + fn channel() {} +} diff --git a/rtic-channel/src/wait_queue.rs b/rtic-channel/src/wait_queue.rs new file mode 100644 index 0000000..90d762b --- /dev/null +++ b/rtic-channel/src/wait_queue.rs @@ -0,0 +1,278 @@ +//! ... + +use core::cell::UnsafeCell; +use core::marker::PhantomPinned; +use core::ptr::null_mut; +use core::sync::atomic::{AtomicPtr, Ordering}; +use core::task::Waker; +use critical_section as cs; + +pub type WaitQueue = LinkedList; + +struct MyLinkPtr(UnsafeCell<*mut Link>); + +impl MyLinkPtr { + #[inline(always)] + fn new(val: *mut Link) -> Self { + Self(UnsafeCell::new(val)) + } + + /// SAFETY: Only use this in a critical section, and don't forget them barriers. + #[inline(always)] + unsafe fn load_relaxed(&self) -> *mut Link { + unsafe { *self.0.get() } + } + + /// SAFETY: Only use this in a critical section, and don't forget them barriers. + #[inline(always)] + unsafe fn store_relaxed(&self, val: *mut Link) { + unsafe { self.0.get().write(val) } + } +} + +/// A FIFO linked list for a wait queue. +pub struct LinkedList { + head: AtomicPtr>, // UnsafeCell<*mut Link> + tail: AtomicPtr>, +} + +impl LinkedList { + /// Create a new linked list. + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(null_mut()), + tail: AtomicPtr::new(null_mut()), + } + } +} + +impl LinkedList { + const R: Ordering = Ordering::Relaxed; + + /// Pop the first element in the queue. + pub fn pop(&self) -> Option { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let head = self.head.load(Self::R); + + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + if let Some(head_ref) = unsafe { head.as_ref() } { + // Move head to the next element + self.head.store(head_ref.next.load(Self::R), Self::R); + + // We read the value at head + let head_val = head_ref.val.clone(); + + let tail = self.tail.load(Self::R); + if head == tail { + // The queue is empty + self.tail.store(null_mut(), Self::R); + } + + if let Some(next_ref) = unsafe { head_ref.next.load(Self::R).as_ref() } { + next_ref.prev.store(null_mut(), Self::R); + } + + // Clear the pointers in the node. + head_ref.next.store(null_mut(), Self::R); + head_ref.prev.store(null_mut(), Self::R); + + return Some(head_val); + } + + None + }) + } + + /// Put an element at the back of the queue. + pub fn push(&self, link: &mut Link) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let tail = self.tail.load(Self::R); + + if let Some(tail_ref) = unsafe { tail.as_ref() } { + // Queue is not empty + link.prev.store(tail, Self::R); + self.tail.store(link, Self::R); + tail_ref.next.store(link, Self::R); + } else { + // Queue is empty + self.tail.store(link, Self::R); + self.head.store(link, Self::R); + } + }); + } + + /// Check if the queue is empty. + pub fn is_empty(&self) -> bool { + self.head.load(Self::R).is_null() + } +} + +/// A link in the linked list. +pub struct Link { + pub(crate) val: T, + next: AtomicPtr>, + prev: AtomicPtr>, + _up: PhantomPinned, +} + +impl Link { + const R: Ordering = Ordering::Relaxed; + + /// Create a new link. + pub const fn new(val: T) -> Self { + Self { + val, + next: AtomicPtr::new(null_mut()), + prev: AtomicPtr::new(null_mut()), + _up: PhantomPinned, + } + } + + pub fn remove_from_list(&mut self, list: &LinkedList) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let prev = self.prev.load(Self::R); + let next = self.next.load(Self::R); + + match unsafe { (prev.as_ref(), next.as_ref()) } { + (None, None) => { + // Not in the list or alone in the list, check if list head == node address + let sp = self as *const _; + + if sp == list.head.load(Ordering::Relaxed) { + list.head.store(null_mut(), Self::R); + list.tail.store(null_mut(), Self::R); + } + } + (None, Some(next_ref)) => { + // First in the list + next_ref.prev.store(null_mut(), Self::R); + list.head.store(next, Self::R); + } + (Some(prev_ref), None) => { + // Last in the list + prev_ref.next.store(null_mut(), Self::R); + list.tail.store(prev, Self::R); + } + (Some(prev_ref), Some(next_ref)) => { + // Somewhere in the list + + // Connect the `prev.next` and `next.prev` with each other to remove the node + prev_ref.next.store(next, Self::R); + next_ref.prev.store(prev, Self::R); + } + } + }) + } +} + +#[cfg(test)] +impl LinkedList { + fn print(&self) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let mut head = self.head.load(Self::R); + let tail = self.tail.load(Self::R); + + println!( + "List - h = 0x{:x}, t = 0x{:x}", + head as usize, tail as usize + ); + + let mut i = 0; + + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + while let Some(head_ref) = unsafe { head.as_ref() } { + println!( + " {}: {:?}, s = 0x{:x}, n = 0x{:x}, p = 0x{:x}", + i, + head_ref.val, + head as usize, + head_ref.next.load(Ordering::Relaxed) as usize, + head_ref.prev.load(Ordering::Relaxed) as usize + ); + + head = head_ref.next.load(Self::R); + + i += 1; + } + }); + } +} + +#[cfg(test)] +impl Link { + fn print(&self) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + println!("Link:"); + + println!( + " val = {:?}, n = 0x{:x}, p = 0x{:x}", + self.val, + self.next.load(Ordering::Relaxed) as usize, + self.prev.load(Ordering::Relaxed) as usize + ); + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn linked_list() { + let mut wq = LinkedList::::new(); + + let mut i1 = Link::new(10); + let mut i2 = Link::new(11); + let mut i3 = Link::new(12); + let mut i4 = Link::new(13); + let mut i5 = Link::new(14); + + wq.push(&mut i1); + wq.push(&mut i2); + wq.push(&mut i3); + wq.push(&mut i4); + wq.push(&mut i5); + + wq.print(); + + wq.pop(); + i1.print(); + + wq.print(); + + i4.remove_from_list(&wq); + + wq.print(); + + // i1.remove_from_list(&wq); + // wq.print(); + + println!("i2"); + i2.remove_from_list(&wq); + wq.print(); + + println!("i3"); + i3.remove_from_list(&wq); + wq.print(); + + println!("i5"); + i5.remove_from_list(&wq); + wq.print(); + } +} diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index 34f9362..78ece1d 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -1,7 +1,6 @@ //! Crate #![no_std] -#![no_main] #![deny(missing_docs)] #![allow(incomplete_features)] #![feature(async_fn_in_trait)] -- cgit v1.2.3 From 1baa4a4228cae4576e194174618bf35f5c206959 Mon Sep 17 00:00:00 2001 From: Henrik Tjäder Date: Fri, 27 Jan 2023 13:18:29 +0100 Subject: CI: Don't let warnings get away --- rtic-channel/src/lib.rs | 1 + rtic-monotonics/src/lib.rs | 1 + rtic-time/src/lib.rs | 1 + 3 files changed, 3 insertions(+) (limited to 'rtic-time') diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs index a7098ee..166015f 100644 --- a/rtic-channel/src/lib.rs +++ b/rtic-channel/src/lib.rs @@ -2,6 +2,7 @@ #![no_std] #![deny(missing_docs)] +//deny_warnings_placeholder_for_ci use core::{ cell::UnsafeCell, diff --git a/rtic-monotonics/src/lib.rs b/rtic-monotonics/src/lib.rs index ce30c36..1e23088 100644 --- a/rtic-monotonics/src/lib.rs +++ b/rtic-monotonics/src/lib.rs @@ -3,6 +3,7 @@ #![no_std] #![no_main] #![deny(missing_docs)] +//deny_warnings_placeholder_for_ci #![allow(incomplete_features)] #![feature(async_fn_in_trait)] diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index 78ece1d..eeecd86 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -2,6 +2,7 @@ #![no_std] #![deny(missing_docs)] +//deny_warnings_placeholder_for_ci #![allow(incomplete_features)] #![feature(async_fn_in_trait)] -- cgit v1.2.3 From 9c6e2c1c99448304fb54354297c284c1a8810cd6 Mon Sep 17 00:00:00 2001 From: Henrik Tjäder Date: Sat, 28 Jan 2023 11:15:59 +0100 Subject: Add changelog templates --- rtic-channel/CHANGELOG.md | 16 ++++++++++++++++ rtic-monotonics/CHANGELOG.md | 16 ++++++++++++++++ rtic-time/CHANGELOG.md | 16 ++++++++++++++++ 3 files changed, 48 insertions(+) create mode 100644 rtic-channel/CHANGELOG.md (limited to 'rtic-time') diff --git a/rtic-channel/CHANGELOG.md b/rtic-channel/CHANGELOG.md new file mode 100644 index 0000000..d3a9d84 --- /dev/null +++ b/rtic-channel/CHANGELOG.md @@ -0,0 +1,16 @@ +# Change Log + +All notable changes to this project will be documented in this file. +This project adheres to [Semantic Versioning](http://semver.org/). + +For each category, *Added*, *Changed*, *Fixed* add new entries at the top! + +## [Unreleased] + +### Added + +### Changed + +### Fixed + +## [v1.0.0] - 2023-xx-xx diff --git a/rtic-monotonics/CHANGELOG.md b/rtic-monotonics/CHANGELOG.md index e69de29..e990223 100644 --- a/rtic-monotonics/CHANGELOG.md +++ b/rtic-monotonics/CHANGELOG.md @@ -0,0 +1,16 @@ +# Change Log + +All notable changes to this project will be documented in this file. +This project adheres to [Semantic Versioning](http://semver.org/). + +For each category, *Added*, *Changed*, *Fixed* add new entries at the top! + +## [Unreleased] + +### Added + +### Changed + +### Fixed + +## [v0.1.0] - 2023-xx-xx diff --git a/rtic-time/CHANGELOG.md b/rtic-time/CHANGELOG.md index e69de29..d3a9d84 100644 --- a/rtic-time/CHANGELOG.md +++ b/rtic-time/CHANGELOG.md @@ -0,0 +1,16 @@ +# Change Log + +All notable changes to this project will be documented in this file. +This project adheres to [Semantic Versioning](http://semver.org/). + +For each category, *Added*, *Changed*, *Fixed* add new entries at the top! + +## [Unreleased] + +### Added + +### Changed + +### Fixed + +## [v1.0.0] - 2023-xx-xx -- cgit v1.2.3 From 3050fc0591f087a4fbe08840c69633e89d3f58a7 Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Sat, 28 Jan 2023 13:21:44 +0100 Subject: Use `Pin` in the linked lists --- rtic-channel/src/lib.rs | 18 +++++++++++++----- rtic-channel/src/wait_queue.rs | 16 ++++++++++------ rtic-time/src/lib.rs | 18 +++++++++++++++--- rtic-time/src/linked_list.rs | 5 ++++- 4 files changed, 42 insertions(+), 15 deletions(-) (limited to 'rtic-time') diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs index 5ee2c71..20086ac 100644 --- a/rtic-channel/src/lib.rs +++ b/rtic-channel/src/lib.rs @@ -8,6 +8,7 @@ use core::{ cell::UnsafeCell, future::poll_fn, mem::MaybeUninit, + pin::Pin, ptr, task::{Poll, Waker}, }; @@ -177,10 +178,11 @@ impl<'a, T, const N: usize> Sender<'a, T, N> { pub async fn send(&mut self, val: T) -> Result<(), NoReceiver> { if self.is_closed() {} - let mut __hidden_link: Option> = None; + let mut link_ptr: Option> = None; + + // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. + let link_ptr = &mut link_ptr as *mut Option>; - // Make this future `Drop`-safe - let link_ptr = &mut __hidden_link as *mut Option>; let dropper = OnDrop::new(|| { // SAFETY: We only run this closure and dereference the pointer if we have // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference @@ -198,12 +200,18 @@ impl<'a, T, const N: usize> Sender<'a, T, N> { // Do all this in one critical section, else there can be race conditions let queue_idx = critical_section::with(|cs| { if !self.0.wait_queue.is_empty() || self.0.access(cs).freeq.is_empty() { - // SAFETY: This pointer is only dereferenced here and on drop of the future. + // SAFETY: This pointer is only dereferenced here and on drop of the future + // which happens outside this `poll_fn`'s stack frame. let link = unsafe { &mut *link_ptr }; if link.is_none() { // Place the link in the wait queue on first run. let link_ref = link.insert(wait_queue::Link::new(cx.waker().clone())); - self.0.wait_queue.push(link_ref); + + // SAFETY: The address to the link is stable as it is hidden behind + // `link_ptr`, and `link_ptr` shadows the original making it unmovable. + self.0 + .wait_queue + .push(unsafe { Pin::new_unchecked(link_ref) }); } return None; diff --git a/rtic-channel/src/wait_queue.rs b/rtic-channel/src/wait_queue.rs index 5b59983..ba05e6b 100644 --- a/rtic-channel/src/wait_queue.rs +++ b/rtic-channel/src/wait_queue.rs @@ -1,6 +1,7 @@ //! ... use core::marker::PhantomPinned; +use core::pin::Pin; use core::ptr::null_mut; use core::sync::atomic::{AtomicPtr, Ordering}; use core::task::Waker; @@ -65,13 +66,16 @@ impl LinkedList { } /// Put an element at the back of the queue. - pub fn push(&self, link: &mut Link) { + pub fn push(&self, link: Pin<&mut Link>) { cs::with(|_| { // Make sure all previous writes are visible core::sync::atomic::fence(Ordering::SeqCst); let tail = self.tail.load(Self::R); + // SAFETY: This datastructure does not move the underlying value. + let link = unsafe { link.get_unchecked_mut() }; + if let Some(tail_ref) = unsafe { tail.as_ref() } { // Queue is not empty link.prev.store(tail, Self::R); @@ -221,11 +225,11 @@ mod tests { let mut i4 = Link::new(13); let mut i5 = Link::new(14); - wq.push(&mut i1); - wq.push(&mut i2); - wq.push(&mut i3); - wq.push(&mut i4); - wq.push(&mut i5); + wq.push(unsafe { Pin::new_unchecked(&mut i1) }); + wq.push(unsafe { Pin::new_unchecked(&mut i2) }); + wq.push(unsafe { Pin::new_unchecked(&mut i3) }); + wq.push(unsafe { Pin::new_unchecked(&mut i4) }); + wq.push(unsafe { Pin::new_unchecked(&mut i5) }); wq.print(); diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index eeecd86..6b23f76 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -7,6 +7,7 @@ #![feature(async_fn_in_trait)] use core::future::{poll_fn, Future}; +use core::pin::Pin; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use core::task::{Poll, Waker}; use futures_util::{ @@ -185,7 +186,10 @@ impl TimerQueue { ); } - let mut link = None; + let mut link_ptr: Option>> = None; + + // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. + let link_ptr = &mut link_ptr as *mut Option>>; let queue = &self.queue; let marker = &AtomicUsize::new(0); @@ -199,6 +203,9 @@ impl TimerQueue { return Poll::Ready(()); } + // SAFETY: This pointer is only dereferenced here and on drop of the future + // which happens outside this `poll_fn`'s stack frame. + let link = unsafe { &mut *link_ptr }; if link.is_none() { let mut link_ref = link.insert(Link::new(WaitingWaker { waker: cx.waker().clone(), @@ -206,7 +213,9 @@ impl TimerQueue { was_poped: AtomicBool::new(false), })); - let (was_empty, addr) = queue.insert(&mut link_ref); + // SAFETY: The address to the link is stable as it is defined outside this stack + // frame. + let (was_empty, addr) = queue.insert(unsafe { Pin::new_unchecked(&mut link_ref) }); marker.store(addr, Ordering::Relaxed); if was_empty { @@ -219,7 +228,10 @@ impl TimerQueue { }) .await; - if let Some(link) = link { + // SAFETY: We only run this and dereference the pointer if we have + // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference + // of this pointer is in the `poll_fn`. + if let Some(link) = unsafe { &mut *link_ptr } { if link.val.was_poped.load(Ordering::Relaxed) { // If it was poped from the queue there is no need to run delete dropper.defuse(); diff --git a/rtic-time/src/linked_list.rs b/rtic-time/src/linked_list.rs index 52a955b..de5ea2a 100644 --- a/rtic-time/src/linked_list.rs +++ b/rtic-time/src/linked_list.rs @@ -1,6 +1,7 @@ //! ... use core::marker::PhantomPinned; +use core::pin::Pin; use core::sync::atomic::{AtomicPtr, Ordering}; use critical_section as cs; @@ -92,8 +93,10 @@ impl LinkedList { /// Insert a new link into the linked list. /// The return is (was_empty, address), where the address of the link is for use with `delete`. - pub fn insert(&self, val: &mut Link) -> (bool, usize) { + pub fn insert(&self, val: Pin<&mut Link>) -> (bool, usize) { cs::with(|_| { + // SAFETY: This datastructure does not move the underlying value. + let val = unsafe { val.get_unchecked_mut() }; let addr = val as *const _ as usize; // Make sure all previous writes are visible -- cgit v1.2.3 From 2bd70baeb9362050196d431f2801551066e27e59 Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Sat, 28 Jan 2023 21:11:18 +0100 Subject: rtic-time: Make Send happy --- rtic-channel/src/lib.rs | 2 +- rtic-channel/src/wait_queue.rs | 2 -- rtic-time/src/lib.rs | 28 +++++++++++++++++++++++++--- 3 files changed, 26 insertions(+), 6 deletions(-) (limited to 'rtic-time') diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs index eafa25c..acfa801 100644 --- a/rtic-channel/src/lib.rs +++ b/rtic-channel/src/lib.rs @@ -532,7 +532,7 @@ mod tests { #[tokio::test] async fn stress_channel() { - const NUM_RUNS: usize = 1_000000; + const NUM_RUNS: usize = 1_000; const QUEUE_SIZE: usize = 10; let (s, mut r) = make_channel!(u32, QUEUE_SIZE); diff --git a/rtic-channel/src/wait_queue.rs b/rtic-channel/src/wait_queue.rs index e6d5a8b..2de6311 100644 --- a/rtic-channel/src/wait_queue.rs +++ b/rtic-channel/src/wait_queue.rs @@ -105,8 +105,6 @@ pub struct Link { _up: PhantomPinned, } -unsafe impl Send for Link {} - impl Link { const R: Ordering = Ordering::Relaxed; diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index 6b23f76..44fdbce 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -73,6 +73,26 @@ pub struct TimerQueue { /// This indicates that there was a timeout. pub struct TimeoutError; +/// This is needed to make the async closure in `delay_until` accept that we "share" +/// the link possible between threads. +struct LinkPtr(*mut Option>>); + +impl Clone for LinkPtr { + fn clone(&self) -> Self { + LinkPtr(self.0) + } +} + +impl LinkPtr { + /// This will dereference the pointer stored within and give out an `&mut`. + unsafe fn get(&mut self) -> &mut Option>> { + &mut *self.0 + } +} + +unsafe impl Send for LinkPtr {} +unsafe impl Sync for LinkPtr {} + impl TimerQueue { /// Make a new queue. pub const fn new() -> Self { @@ -189,7 +209,9 @@ impl TimerQueue { let mut link_ptr: Option>> = None; // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. - let link_ptr = &mut link_ptr as *mut Option>>; + let mut link_ptr = + LinkPtr(&mut link_ptr as *mut Option>>); + let mut link_ptr2 = link_ptr.clone(); let queue = &self.queue; let marker = &AtomicUsize::new(0); @@ -205,7 +227,7 @@ impl TimerQueue { // SAFETY: This pointer is only dereferenced here and on drop of the future // which happens outside this `poll_fn`'s stack frame. - let link = unsafe { &mut *link_ptr }; + let link = unsafe { link_ptr2.get() }; if link.is_none() { let mut link_ref = link.insert(Link::new(WaitingWaker { waker: cx.waker().clone(), @@ -231,7 +253,7 @@ impl TimerQueue { // SAFETY: We only run this and dereference the pointer if we have // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference // of this pointer is in the `poll_fn`. - if let Some(link) = unsafe { &mut *link_ptr } { + if let Some(link) = unsafe { link_ptr.get() } { if link.val.was_poped.load(Ordering::Relaxed) { // If it was poped from the queue there is no need to run delete dropper.defuse(); -- cgit v1.2.3 From d0c51269608c18a105fd010f070bd9af6f443c60 Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Tue, 31 Jan 2023 22:05:43 +0100 Subject: Cleanup common code and clippy fixes --- rtic-channel/src/lib.rs | 12 +++++------- rtic-time/Cargo.toml | 1 + rtic-time/src/lib.rs | 27 ++------------------------- 3 files changed, 8 insertions(+), 32 deletions(-) (limited to 'rtic-time') diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs index fef546b..47e4a77 100644 --- a/rtic-channel/src/lib.rs +++ b/rtic-channel/src/lib.rs @@ -142,8 +142,8 @@ where { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { - TrySendError::NoReceiver(v) => write!(f, "NoReceiver({:?})", v), - TrySendError::Full(v) => write!(f, "Full({:?})", v), + TrySendError::NoReceiver(v) => write!(f, "NoReceiver({v:?})"), + TrySendError::Full(v) => write!(f, "Full({v:?})"), } } } @@ -401,12 +401,10 @@ impl<'a, T, const N: usize> Receiver<'a, T, N> { } Ok(r) + } else if self.is_closed() { + Err(ReceiveError::NoSender) } else { - if self.is_closed() { - Err(ReceiveError::NoSender) - } else { - Err(ReceiveError::Empty) - } + Err(ReceiveError::Empty) } } diff --git a/rtic-time/Cargo.toml b/rtic-time/Cargo.toml index ea05939..dbcf454 100644 --- a/rtic-time/Cargo.toml +++ b/rtic-time/Cargo.toml @@ -8,3 +8,4 @@ edition = "2021" [dependencies] critical-section = "1" futures-util = { version = "0.3.25", default-features = false } +rtic-common = { version = "1.0.0", path = "../rtic-common" } diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index 44fdbce..5e4457c 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -14,13 +14,13 @@ use futures_util::{ future::{select, Either}, pin_mut, }; +use linked_list::{Link, LinkedList}; pub use monotonic::Monotonic; +use rtic_common::dropper::OnDrop; mod linked_list; mod monotonic; -use linked_list::{Link, LinkedList}; - /// Holds a waker and at which time instant this waker shall be awoken. struct WaitingWaker { waker: Waker, @@ -264,26 +264,3 @@ impl TimerQueue { } } } - -struct OnDrop { - f: core::mem::MaybeUninit, -} - -impl OnDrop { - pub fn new(f: F) -> Self { - Self { - f: core::mem::MaybeUninit::new(f), - } - } - - #[allow(unused)] - pub fn defuse(self) { - core::mem::forget(self) - } -} - -impl Drop for OnDrop { - fn drop(&mut self) { - unsafe { self.f.as_ptr().read()() } - } -} -- cgit v1.2.3 From fe77b4538d6cd506d1a18bdc9e17216dc61881db Mon Sep 17 00:00:00 2001 From: Henrik Tjäder Date: Wed, 1 Feb 2023 21:55:05 +0100 Subject: Add alpha.0 and required Cargo fields --- rtic-arbiter/Cargo.toml | 7 +++++-- rtic-channel/Cargo.toml | 7 +++++-- rtic-common/Cargo.toml | 5 ++++- rtic-monotonics/Cargo.toml | 7 +++++-- rtic-time/Cargo.toml | 7 +++++-- 5 files changed, 24 insertions(+), 9 deletions(-) (limited to 'rtic-time') diff --git a/rtic-arbiter/Cargo.toml b/rtic-arbiter/Cargo.toml index b1afaf4..52daa27 100644 --- a/rtic-arbiter/Cargo.toml +++ b/rtic-arbiter/Cargo.toml @@ -1,13 +1,16 @@ [package] name = "rtic-arbiter" -version = "1.0.0" +version = "1.0.0-alpha.0" edition = "2021" +categories = ["concurrency", "embedded", "no-std", "asynchronous"] +description = "rtic-arbiter lib TODO" +license = "MIT OR Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] critical-section = "1" -rtic-common = { version = "1.0.0", path = "../rtic-common" } +rtic-common = { version = "1.0.0-alpha.0", path = "../rtic-common" } [dev-dependencies] tokio = { version = "1", features = ["rt", "macros", "time"] } diff --git a/rtic-channel/Cargo.toml b/rtic-channel/Cargo.toml index a0955bc..a6f98e4 100644 --- a/rtic-channel/Cargo.toml +++ b/rtic-channel/Cargo.toml @@ -1,14 +1,17 @@ [package] name = "rtic-channel" -version = "1.0.0" +version = "1.0.0-alpha.0" edition = "2021" +categories = ["concurrency", "embedded", "no-std", "asynchronous"] +description = "rtic-channel lib TODO" +license = "MIT OR Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] heapless = "0.7" critical-section = "1" -rtic-common = { version = "1.0.0", path = "../rtic-common" } +rtic-common = { version = "1.0.0-alpha.0", path = "../rtic-common" } [dev-dependencies] tokio = { version = "1", features = ["rt", "macros", "time"] } diff --git a/rtic-common/Cargo.toml b/rtic-common/Cargo.toml index 258caae..35b0f72 100644 --- a/rtic-common/Cargo.toml +++ b/rtic-common/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "rtic-common" -version = "1.0.0" +version = "1.0.0-alpha.0" edition = "2021" +categories = ["concurrency", "embedded", "no-std", "asynchronous"] +description = "rtic-common lib TODO" +license = "MIT OR Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/rtic-monotonics/Cargo.toml b/rtic-monotonics/Cargo.toml index 9d364c8..bb8ff6d 100644 --- a/rtic-monotonics/Cargo.toml +++ b/rtic-monotonics/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "rtic-monotonics" -version = "0.1.0" +version = "1.0.0-alpha.0" edition = "2021" +categories = ["concurrency", "embedded", "no-std", "asynchronous"] +description = "rtic-monotonics lib TODO" +license = "MIT OR Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -9,5 +12,5 @@ edition = "2021" cortex-m = { version = "0.7.6" } embedded-hal-async = "0.2.0-alpha.0" fugit = { version = "0.3.6", features = ["defmt"] } -rtic-time = { version = "1.0.0", path = "../rtic-time" } +rtic-time = { version = "1.0.0-alpha.0", path = "../rtic-time" } atomic-polyfill = "1" diff --git a/rtic-time/Cargo.toml b/rtic-time/Cargo.toml index dbcf454..0ba7b11 100644 --- a/rtic-time/Cargo.toml +++ b/rtic-time/Cargo.toml @@ -1,11 +1,14 @@ [package] name = "rtic-time" -version = "1.0.0" +version = "1.0.0-alpha.0" edition = "2021" +categories = ["concurrency", "embedded", "no-std", "asynchronous"] +description = "rtic-time lib TODO" +license = "MIT OR Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] critical-section = "1" futures-util = { version = "0.3.25", default-features = false } -rtic-common = { version = "1.0.0", path = "../rtic-common" } +rtic-common = { version = "1.0.0-alpha.0", path = "../rtic-common" } -- cgit v1.2.3 From c2d2b1ba38dd291cfcd7e32f11d725db0ec1cf85 Mon Sep 17 00:00:00 2001 From: Henrik Tjäder Date: Wed, 1 Feb 2023 22:11:50 +0100 Subject: Add authors to each Cargo.toml Copy-paste the default one used for the project --- rtic-arbiter/Cargo.toml | 8 ++++++++ rtic-channel/Cargo.toml | 8 ++++++++ rtic-common/Cargo.toml | 8 ++++++++ rtic-monotonics/Cargo.toml | 8 ++++++++ rtic-time/Cargo.toml | 8 ++++++++ 5 files changed, 40 insertions(+) (limited to 'rtic-time') diff --git a/rtic-arbiter/Cargo.toml b/rtic-arbiter/Cargo.toml index 52daa27..d8b18c4 100644 --- a/rtic-arbiter/Cargo.toml +++ b/rtic-arbiter/Cargo.toml @@ -1,7 +1,15 @@ [package] name = "rtic-arbiter" version = "1.0.0-alpha.0" + edition = "2021" +authors = [ + "The Real-Time Interrupt-driven Concurrency developers", + "Emil Fresk ", + "Henrik Tjäder ", + "Jorge Aparicio ", + "Per Lindgren ", +] categories = ["concurrency", "embedded", "no-std", "asynchronous"] description = "rtic-arbiter lib TODO" license = "MIT OR Apache-2.0" diff --git a/rtic-channel/Cargo.toml b/rtic-channel/Cargo.toml index a6f98e4..900f972 100644 --- a/rtic-channel/Cargo.toml +++ b/rtic-channel/Cargo.toml @@ -1,7 +1,15 @@ [package] name = "rtic-channel" version = "1.0.0-alpha.0" + edition = "2021" +authors = [ + "The Real-Time Interrupt-driven Concurrency developers", + "Emil Fresk ", + "Henrik Tjäder ", + "Jorge Aparicio ", + "Per Lindgren ", +] categories = ["concurrency", "embedded", "no-std", "asynchronous"] description = "rtic-channel lib TODO" license = "MIT OR Apache-2.0" diff --git a/rtic-common/Cargo.toml b/rtic-common/Cargo.toml index 35b0f72..726090a 100644 --- a/rtic-common/Cargo.toml +++ b/rtic-common/Cargo.toml @@ -1,7 +1,15 @@ [package] name = "rtic-common" version = "1.0.0-alpha.0" + edition = "2021" +authors = [ + "The Real-Time Interrupt-driven Concurrency developers", + "Emil Fresk ", + "Henrik Tjäder ", + "Jorge Aparicio ", + "Per Lindgren ", +] categories = ["concurrency", "embedded", "no-std", "asynchronous"] description = "rtic-common lib TODO" license = "MIT OR Apache-2.0" diff --git a/rtic-monotonics/Cargo.toml b/rtic-monotonics/Cargo.toml index bb8ff6d..5e6586e 100644 --- a/rtic-monotonics/Cargo.toml +++ b/rtic-monotonics/Cargo.toml @@ -1,7 +1,15 @@ [package] name = "rtic-monotonics" version = "1.0.0-alpha.0" + edition = "2021" +authors = [ + "The Real-Time Interrupt-driven Concurrency developers", + "Emil Fresk ", + "Henrik Tjäder ", + "Jorge Aparicio ", + "Per Lindgren ", +] categories = ["concurrency", "embedded", "no-std", "asynchronous"] description = "rtic-monotonics lib TODO" license = "MIT OR Apache-2.0" diff --git a/rtic-time/Cargo.toml b/rtic-time/Cargo.toml index 0ba7b11..c7d13bb 100644 --- a/rtic-time/Cargo.toml +++ b/rtic-time/Cargo.toml @@ -1,7 +1,15 @@ [package] name = "rtic-time" version = "1.0.0-alpha.0" + edition = "2021" +authors = [ + "The Real-Time Interrupt-driven Concurrency developers", + "Emil Fresk ", + "Henrik Tjäder ", + "Jorge Aparicio ", + "Per Lindgren ", +] categories = ["concurrency", "embedded", "no-std", "asynchronous"] description = "rtic-time lib TODO" license = "MIT OR Apache-2.0" -- cgit v1.2.3 From 1cda61fbda205920517f7b63af90c97c38ff9af6 Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Sat, 18 Feb 2023 09:43:06 +0100 Subject: Make some linked list operations unsafe, and document their safety at usage --- rtic-arbiter/src/lib.rs | 14 +++++++++----- rtic-channel/src/lib.rs | 15 +++++++++------ rtic-common/src/lib.rs | 4 ++++ rtic-common/src/wait_queue.rs | 38 ++++++++++++++++++++------------------ rtic-time/src/lib.rs | 19 +++++++++++++------ rtic-time/src/linked_list.rs | 18 ++++++++++++------ 6 files changed, 67 insertions(+), 41 deletions(-) (limited to 'rtic-time') diff --git a/rtic-arbiter/src/lib.rs b/rtic-arbiter/src/lib.rs index 09d1b2e..c70fbf5 100644 --- a/rtic-arbiter/src/lib.rs +++ b/rtic-arbiter/src/lib.rs @@ -54,7 +54,8 @@ impl Arbiter { pub async fn access(&self) -> ExclusiveAccess<'_, T> { let mut link_ptr: Option> = None; - // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. + // Make this future `Drop`-safe. + // SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it. let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option>); let mut link_ptr2 = link_ptr.clone(); @@ -89,10 +90,13 @@ impl Arbiter { // Place the link in the wait queue on first run. let link_ref = link.insert(Link::new(cx.waker().clone())); - // SAFETY: The address to the link is stable as it is hidden behind - // `link_ptr`, and `link_ptr` shadows the original making it unmovable. - self.wait_queue - .push(unsafe { Pin::new_unchecked(link_ref) }); + // SAFETY(new_unchecked): The address to the link is stable as it is defined + // outside this stack frame. + // SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed, + // and we make sure in `dropper` that the link is removed from the queue + // before dropping `link_ptr` AND `dropper` makes sure that the shadowed + // `link_ptr` lives until the end of the stack frame. + unsafe { self.wait_queue.push(Pin::new_unchecked(link_ref)) }; } Poll::Pending diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs index 47e4a77..a4e4935 100644 --- a/rtic-channel/src/lib.rs +++ b/rtic-channel/src/lib.rs @@ -240,7 +240,8 @@ impl<'a, T, const N: usize> Sender<'a, T, N> { pub async fn send(&mut self, val: T) -> Result<(), NoReceiver> { let mut link_ptr: Option> = None; - // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. + // Make this future `Drop`-safe. + // SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it. let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option>); let mut link_ptr2 = link_ptr.clone(); @@ -276,11 +277,13 @@ impl<'a, T, const N: usize> Sender<'a, T, N> { // Place the link in the wait queue on first run. let link_ref = link.insert(Link::new(cx.waker().clone())); - // SAFETY: The address to the link is stable as it is hidden behind - // `link_ptr`, and `link_ptr` shadows the original making it unmovable. - self.0 - .wait_queue - .push(unsafe { Pin::new_unchecked(link_ref) }); + // SAFETY(new_unchecked): The address to the link is stable as it is defined + // outside this stack frame. + // SAFETY(push): `link_ref` lifetime comes from `link_ptr` that is shadowed, + // and we make sure in `dropper` that the link is removed from the queue + // before dropping `link_ptr` AND `dropper` makes sure that the shadowed + // `link_ptr` lives until the end of the stack frame. + unsafe { self.0.wait_queue.push(Pin::new_unchecked(link_ref)) }; return None; } diff --git a/rtic-common/src/lib.rs b/rtic-common/src/lib.rs index b8b5e0d..03d0306 100644 --- a/rtic-common/src/lib.rs +++ b/rtic-common/src/lib.rs @@ -4,6 +4,10 @@ #![deny(missing_docs)] //deny_warnings_placeholder_for_ci +#[cfg(test)] +#[macro_use] +extern crate std; + pub mod dropper; pub mod wait_queue; pub mod waker_registration; diff --git a/rtic-common/src/wait_queue.rs b/rtic-common/src/wait_queue.rs index 4ced6ab..7387a98 100644 --- a/rtic-common/src/wait_queue.rs +++ b/rtic-common/src/wait_queue.rs @@ -68,7 +68,9 @@ impl LinkedList { } /// Put an element at the back of the queue. - pub fn push(&self, link: Pin<&mut Link>) { + /// + /// SAFETY: The link must live until it is removed from the queue. + pub unsafe fn push(&self, link: Pin<&Link>) { cs::with(|_| { // Make sure all previous writes are visible core::sync::atomic::fence(Ordering::SeqCst); @@ -76,17 +78,17 @@ impl LinkedList { let tail = self.tail.load(Self::R); // SAFETY: This datastructure does not move the underlying value. - let link = unsafe { link.get_unchecked_mut() }; + let link = link.get_ref(); if let Some(tail_ref) = unsafe { tail.as_ref() } { // Queue is not empty link.prev.store(tail, Self::R); - self.tail.store(link, Self::R); - tail_ref.next.store(link, Self::R); + self.tail.store(link as *const _ as *mut _, Self::R); + tail_ref.next.store(link as *const _ as *mut _, Self::R); } else { // Queue is empty - self.tail.store(link, Self::R); - self.head.store(link, Self::R); + self.tail.store(link as *const _ as *mut _, Self::R); + self.head.store(link as *const _ as *mut _, Self::R); } }); } @@ -126,7 +128,7 @@ impl Link { } /// Remove this link from a linked list. - pub fn remove_from_list(&mut self, list: &LinkedList) { + pub fn remove_from_list(&self, list: &LinkedList) { cs::with(|_| { // Make sure all previous writes are visible core::sync::atomic::fence(Ordering::SeqCst); @@ -230,17 +232,17 @@ mod tests { fn linked_list() { let wq = LinkedList::::new(); - let mut i1 = Link::new(10); - let mut i2 = Link::new(11); - let mut i3 = Link::new(12); - let mut i4 = Link::new(13); - let mut i5 = Link::new(14); - - wq.push(unsafe { Pin::new_unchecked(&mut i1) }); - wq.push(unsafe { Pin::new_unchecked(&mut i2) }); - wq.push(unsafe { Pin::new_unchecked(&mut i3) }); - wq.push(unsafe { Pin::new_unchecked(&mut i4) }); - wq.push(unsafe { Pin::new_unchecked(&mut i5) }); + let i1 = Link::new(10); + let i2 = Link::new(11); + let i3 = Link::new(12); + let i4 = Link::new(13); + let i5 = Link::new(14); + + unsafe { wq.push(Pin::new_unchecked(&i1)) }; + unsafe { wq.push(Pin::new_unchecked(&i2)) }; + unsafe { wq.push(Pin::new_unchecked(&i3)) }; + unsafe { wq.push(Pin::new_unchecked(&i4)) }; + unsafe { wq.push(Pin::new_unchecked(&i5)) }; wq.print(); diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index 5e4457c..3126e6b 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -208,7 +208,8 @@ impl TimerQueue { let mut link_ptr: Option>> = None; - // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. + // Make this future `Drop`-safe + // SAFETY(link_ptr): Shadow the original definition of `link_ptr` so we can't abuse it. let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option>>); let mut link_ptr2 = link_ptr.clone(); @@ -226,18 +227,24 @@ impl TimerQueue { } // SAFETY: This pointer is only dereferenced here and on drop of the future - // which happens outside this `poll_fn`'s stack frame. + // which happens outside this `poll_fn`'s stack frame, so this mutable access cannot + // happen at the same time as `dropper` runs. let link = unsafe { link_ptr2.get() }; if link.is_none() { - let mut link_ref = link.insert(Link::new(WaitingWaker { + let link_ref = link.insert(Link::new(WaitingWaker { waker: cx.waker().clone(), release_at: instant, was_poped: AtomicBool::new(false), })); - // SAFETY: The address to the link is stable as it is defined outside this stack - // frame. - let (was_empty, addr) = queue.insert(unsafe { Pin::new_unchecked(&mut link_ref) }); + // SAFETY(new_unchecked): The address to the link is stable as it is defined + //outside this stack frame. + // SAFETY(insert): `link_ref` lifetime comes from `link_ptr` that is shadowed, and + // we make sure in `dropper` that the link is removed from the queue before + // dropping `link_ptr` AND `dropper` makes sure that the shadowed `link_ptr` lives + // until the end of the stack frame. + let (was_empty, addr) = unsafe { queue.insert(Pin::new_unchecked(&link_ref)) }; + marker.store(addr, Ordering::Relaxed); if was_empty { diff --git a/rtic-time/src/linked_list.rs b/rtic-time/src/linked_list.rs index de5ea2a..d4256c9 100644 --- a/rtic-time/src/linked_list.rs +++ b/rtic-time/src/linked_list.rs @@ -93,10 +93,12 @@ impl LinkedList { /// Insert a new link into the linked list. /// The return is (was_empty, address), where the address of the link is for use with `delete`. - pub fn insert(&self, val: Pin<&mut Link>) -> (bool, usize) { + /// + /// SAFETY: The pinned link must live until it is removed from this list. + pub unsafe fn insert(&self, val: Pin<&Link>) -> (bool, usize) { cs::with(|_| { // SAFETY: This datastructure does not move the underlying value. - let val = unsafe { val.get_unchecked_mut() }; + let val = val.get_ref(); let addr = val as *const _ as usize; // Make sure all previous writes are visible @@ -111,7 +113,8 @@ impl LinkedList { let head_ref = if let Some(head_ref) = unsafe { head.as_ref() } { head_ref } else { - self.head.store(val, Ordering::Relaxed); + self.head + .store(val as *const _ as *mut _, Ordering::Relaxed); return (true, addr); }; @@ -121,7 +124,8 @@ impl LinkedList { val.next.store(head, Ordering::Relaxed); // `val` is now first in the queue - self.head.store(val, Ordering::Relaxed); + self.head + .store(val as *const _ as *mut _, Ordering::Relaxed); return (false, addr); } @@ -139,7 +143,8 @@ impl LinkedList { val.next.store(next, Ordering::Relaxed); // Insert `val` - curr.next.store(val, Ordering::Relaxed); + curr.next + .store(val as *const _ as *mut _, Ordering::Relaxed); return (false, addr); } @@ -150,7 +155,8 @@ impl LinkedList { } // No next, write link to last position in list - curr.next.store(val, Ordering::Relaxed); + curr.next + .store(val as *const _ as *mut _, Ordering::Relaxed); (false, addr) }) -- cgit v1.2.3 From ebd35b89a4abe147e11bd7f716788cf642368b6f Mon Sep 17 00:00:00 2001 From: Henrik Tjäder Date: Sat, 4 Mar 2023 20:52:50 +0100 Subject: rtic-time: clippy fixes --- rtic-time/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'rtic-time') diff --git a/rtic-time/src/lib.rs b/rtic-time/src/lib.rs index 3126e6b..78b57a4 100644 --- a/rtic-time/src/lib.rs +++ b/rtic-time/src/lib.rs @@ -243,7 +243,7 @@ impl TimerQueue { // we make sure in `dropper` that the link is removed from the queue before // dropping `link_ptr` AND `dropper` makes sure that the shadowed `link_ptr` lives // until the end of the stack frame. - let (was_empty, addr) = unsafe { queue.insert(Pin::new_unchecked(&link_ref)) }; + let (was_empty, addr) = unsafe { queue.insert(Pin::new_unchecked(link_ref)) }; marker.store(addr, Ordering::Relaxed); -- cgit v1.2.3