diff options
Diffstat (limited to 'rtic-common/src/wait_queue.rs')
| -rw-r--r-- | rtic-common/src/wait_queue.rs | 58 |
1 files changed, 54 insertions, 4 deletions
diff --git a/rtic-common/src/wait_queue.rs b/rtic-common/src/wait_queue.rs index 0f3a59d..de27710 100644 --- a/rtic-common/src/wait_queue.rs +++ b/rtic-common/src/wait_queue.rs @@ -1,12 +1,17 @@ //! A wait queue implementation using a doubly linked list. -use core::marker::PhantomPinned; -use core::pin::Pin; -use core::ptr::null_mut; -use core::task::Waker; +use core::{ + future::poll_fn, + marker::PhantomPinned, + pin::{pin, Pin}, + ptr::null_mut, + task::{Poll, Waker}, +}; use critical_section as cs; use portable_atomic::{AtomicBool, AtomicPtr, Ordering}; +use crate::dropper::OnDropWith; + /// A helper definition of a wait queue. pub type WaitQueue = DoublyLinkedList<Waker>; @@ -220,6 +225,51 @@ impl<T: core::fmt::Debug + Clone> DoublyLinkedList<T> { } } +impl DoublyLinkedList<Waker> { + /// Wait until `f` returns `Some`. + pub async fn wait_until<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> T { + let link_place = pin!(None::<Link<Waker>>); + + let mut link_guard = OnDropWith::new(link_place, |link| { + if let Some(link) = link.as_ref().as_pin_ref() { + link.remove_from_list(self); + } + link.set(None); + }); + + poll_fn(move |cx| { + // clean up the old link, because we are going to invalidate it. + // we are doing it before returning `Poll::Ready` to handle cases + // where the future is polled after it is completed. + link_guard.execute(); + + if let Some(val) = f() { + return Poll::Ready(val); + } + + // note: we may introduce a more complex logic to try to reuse the old link + // with the old waker by using `Waker::will_wake` to avoid `Waker::clone`, + // but it is probably not needed as Rtic's `waker` is cheap to clone. + + // By the contract, each poll we should update the waker. + let new_link = Link::new(cx.waker().clone()); + + // Store the link into the pinned place. + link_guard.set(Some(new_link)); + + let new_link_pinned = link_guard.as_ref().as_pin_ref().expect("We just set it"); + + // SAFETY: we guarantee that `link` will live until removed by cleaning it up + // in the destructor of the future and that destructor is guaranteed to run + // before it's memory is reused or invalidated because the future is pinned. + unsafe { self.push(new_link_pinned) }; + + Poll::Pending + }) + .await + } +} + #[cfg(test)] impl<T: core::fmt::Debug + Clone> Link<T> { fn print(&self) { |
