aboutsummaryrefslogtreecommitdiff
path: root/rtic-common
diff options
context:
space:
mode:
authorOleksandr Babak <alexanderbabak@proton.me>2025-05-15 14:03:47 +0200
committerHenrik Tjäder <henrik@tjaders.com>2025-06-15 09:03:30 +0000
commit11f1bc60fa76a6d8bfaa86cf5f18fe91c5ff0278 (patch)
tree70a003896d55aa41907c555fc265c010c0e30751 /rtic-common
parentff3b011cef0ca6e5f77cb1112940337e913ca957 (diff)
feat: `wait_until` method for waker queue
Diffstat (limited to 'rtic-common')
-rw-r--r--rtic-common/src/wait_queue.rs58
1 files changed, 54 insertions, 4 deletions
diff --git a/rtic-common/src/wait_queue.rs b/rtic-common/src/wait_queue.rs
index 0f3a59d..de27710 100644
--- a/rtic-common/src/wait_queue.rs
+++ b/rtic-common/src/wait_queue.rs
@@ -1,12 +1,17 @@
//! A wait queue implementation using a doubly linked list.
-use core::marker::PhantomPinned;
-use core::pin::Pin;
-use core::ptr::null_mut;
-use core::task::Waker;
+use core::{
+ future::poll_fn,
+ marker::PhantomPinned,
+ pin::{pin, Pin},
+ ptr::null_mut,
+ task::{Poll, Waker},
+};
use critical_section as cs;
use portable_atomic::{AtomicBool, AtomicPtr, Ordering};
+use crate::dropper::OnDropWith;
+
/// A helper definition of a wait queue.
pub type WaitQueue = DoublyLinkedList<Waker>;
@@ -220,6 +225,51 @@ impl<T: core::fmt::Debug + Clone> DoublyLinkedList<T> {
}
}
+impl DoublyLinkedList<Waker> {
+ /// Wait until `f` returns `Some`.
+ pub async fn wait_until<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> T {
+ let link_place = pin!(None::<Link<Waker>>);
+
+ let mut link_guard = OnDropWith::new(link_place, |link| {
+ if let Some(link) = link.as_ref().as_pin_ref() {
+ link.remove_from_list(self);
+ }
+ link.set(None);
+ });
+
+ poll_fn(move |cx| {
+ // clean up the old link, because we are going to invalidate it.
+ // we are doing it before returning `Poll::Ready` to handle cases
+ // where the future is polled after it is completed.
+ link_guard.execute();
+
+ if let Some(val) = f() {
+ return Poll::Ready(val);
+ }
+
+ // note: we may introduce a more complex logic to try to reuse the old link
+ // with the old waker by using `Waker::will_wake` to avoid `Waker::clone`,
+ // but it is probably not needed as Rtic's `waker` is cheap to clone.
+
+ // By the contract, each poll we should update the waker.
+ let new_link = Link::new(cx.waker().clone());
+
+ // Store the link into the pinned place.
+ link_guard.set(Some(new_link));
+
+ let new_link_pinned = link_guard.as_ref().as_pin_ref().expect("We just set it");
+
+ // SAFETY: we guarantee that `link` will live until removed by cleaning it up
+ // in the destructor of the future and that destructor is guaranteed to run
+ // before it's memory is reused or invalidated because the future is pinned.
+ unsafe { self.push(new_link_pinned) };
+
+ Poll::Pending
+ })
+ .await
+ }
+}
+
#[cfg(test)]
impl<T: core::fmt::Debug + Clone> Link<T> {
fn print(&self) {