From e65e532c2a342f77080ac6fc8e5be11aa7d82575 Mon Sep 17 00:00:00 2001 From: Emil Fresk Date: Sun, 29 Jan 2023 20:16:23 +0100 Subject: Move common data structures to `rtic-common` --- rtic-channel/Cargo.toml | 3 +- rtic-channel/src/lib.rs | 17 +-- rtic-channel/src/wait_queue.rs | 268 -------------------------------- rtic-channel/src/waker_registration.rs | 64 -------- rtic-common/.gitignore | 2 + rtic-common/CHANGELOG.md | 16 ++ rtic-common/Cargo.toml | 13 ++ rtic-common/src/lib.rs | 8 + rtic-common/src/wait_queue.rs | 271 +++++++++++++++++++++++++++++++++ rtic-common/src/waker_registration.rs | 66 ++++++++ 10 files changed, 385 insertions(+), 343 deletions(-) delete mode 100644 rtic-channel/src/wait_queue.rs delete mode 100644 rtic-channel/src/waker_registration.rs create mode 100644 rtic-common/.gitignore create mode 100644 rtic-common/CHANGELOG.md create mode 100644 rtic-common/Cargo.toml create mode 100644 rtic-common/src/lib.rs create mode 100644 rtic-common/src/wait_queue.rs create mode 100644 rtic-common/src/waker_registration.rs diff --git a/rtic-channel/Cargo.toml b/rtic-channel/Cargo.toml index 5d4cbd0..a0955bc 100644 --- a/rtic-channel/Cargo.toml +++ b/rtic-channel/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] heapless = "0.7" critical-section = "1" +rtic-common = { version = "1.0.0", path = "../rtic-common" } [dev-dependencies] tokio = { version = "1", features = ["rt", "macros", "time"] } @@ -15,4 +16,4 @@ tokio = { version = "1", features = ["rt", "macros", "time"] } [features] default = [] -testing = ["critical-section/std"] +testing = ["critical-section/std", "rtic-common/testing"] diff --git a/rtic-channel/src/lib.rs b/rtic-channel/src/lib.rs index acfa801..2b237f6 100644 --- a/rtic-channel/src/lib.rs +++ b/rtic-channel/src/lib.rs @@ -14,11 +14,8 @@ use core::{ task::{Poll, Waker}, }; use heapless::Deque; -use wait_queue::WaitQueue; -use waker_registration::CriticalSectionWakerRegistration as WakerRegistration; - -mod wait_queue; -mod waker_registration; +use rtic_common::wait_queue::{Link, WaitQueue}; +use rtic_common::waker_registration::CriticalSectionWakerRegistration as WakerRegistration; /// An MPSC channel for use in no-alloc systems. `N` sets the size of the queue. /// @@ -136,11 +133,11 @@ unsafe impl<'a, T, const N: usize> Send for Sender<'a, T, N> {} /// This is needed to make the async closure in `send` accept that we "share" /// the link possible between threads. #[derive(Clone)] -struct LinkPtr(*mut Option>); +struct LinkPtr(*mut Option>); impl LinkPtr { /// This will dereference the pointer stored within and give out an `&mut`. - unsafe fn get(&mut self) -> &mut Option> { + unsafe fn get(&mut self) -> &mut Option> { &mut *self.0 } } @@ -200,10 +197,10 @@ impl<'a, T, const N: usize> Sender<'a, T, N> { /// Send a value. If there is no place left in the queue this will wait until there is. /// If the receiver does not exist this will return an error. pub async fn send(&mut self, val: T) -> Result<(), NoReceiver> { - let mut link_ptr: Option> = None; + let mut link_ptr: Option> = None; // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it. - let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option>); + let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option>); let mut link_ptr2 = link_ptr.clone(); let dropper = OnDrop::new(|| { @@ -236,7 +233,7 @@ impl<'a, T, const N: usize> Sender<'a, T, N> { } } else { // Place the link in the wait queue on first run. 
- let link_ref = link.insert(wait_queue::Link::new(cx.waker().clone())); + let link_ref = link.insert(Link::new(cx.waker().clone())); // SAFETY: The address to the link is stable as it is hidden behind // `link_ptr`, and `link_ptr` shadows the original making it unmovable. diff --git a/rtic-channel/src/wait_queue.rs b/rtic-channel/src/wait_queue.rs deleted file mode 100644 index 2de6311..0000000 --- a/rtic-channel/src/wait_queue.rs +++ /dev/null @@ -1,268 +0,0 @@ -//! ... - -use core::marker::PhantomPinned; -use core::pin::Pin; -use core::ptr::null_mut; -use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; -use core::task::Waker; -use critical_section as cs; - -pub type WaitQueue = LinkedList; - -/// A FIFO linked list for a wait queue. -pub struct LinkedList { - head: AtomicPtr>, // UnsafeCell<*mut Link> - tail: AtomicPtr>, -} - -impl LinkedList { - /// Create a new linked list. - pub const fn new() -> Self { - Self { - head: AtomicPtr::new(null_mut()), - tail: AtomicPtr::new(null_mut()), - } - } -} - -impl LinkedList { - const R: Ordering = Ordering::Relaxed; - - /// Pop the first element in the queue. - pub fn pop(&self) -> Option { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - let head = self.head.load(Self::R); - - // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link - if let Some(head_ref) = unsafe { head.as_ref() } { - // Move head to the next element - self.head.store(head_ref.next.load(Self::R), Self::R); - - // We read the value at head - let head_val = head_ref.val.clone(); - - let tail = self.tail.load(Self::R); - if head == tail { - // The queue is empty - self.tail.store(null_mut(), Self::R); - } - - if let Some(next_ref) = unsafe { head_ref.next.load(Self::R).as_ref() } { - next_ref.prev.store(null_mut(), Self::R); - } - - // Clear the pointers in the node. - head_ref.next.store(null_mut(), Self::R); - head_ref.prev.store(null_mut(), Self::R); - head_ref.is_poped.store(true, Self::R); - - return Some(head_val); - } - - None - }) - } - - /// Put an element at the back of the queue. - pub fn push(&self, link: Pin<&mut Link>) { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - let tail = self.tail.load(Self::R); - - // SAFETY: This datastructure does not move the underlying value. - let link = unsafe { link.get_unchecked_mut() }; - - if let Some(tail_ref) = unsafe { tail.as_ref() } { - // Queue is not empty - link.prev.store(tail, Self::R); - self.tail.store(link, Self::R); - tail_ref.next.store(link, Self::R); - } else { - // Queue is empty - self.tail.store(link, Self::R); - self.head.store(link, Self::R); - } - }); - } - - /// Check if the queue is empty. - pub fn is_empty(&self) -> bool { - self.head.load(Self::R).is_null() - } -} - -/// A link in the linked list. -pub struct Link { - pub(crate) val: T, - next: AtomicPtr>, - prev: AtomicPtr>, - is_poped: AtomicBool, - _up: PhantomPinned, -} - -impl Link { - const R: Ordering = Ordering::Relaxed; - - /// Create a new link. 
- pub const fn new(val: T) -> Self { - Self { - val, - next: AtomicPtr::new(null_mut()), - prev: AtomicPtr::new(null_mut()), - is_poped: AtomicBool::new(false), - _up: PhantomPinned, - } - } - - pub fn is_poped(&self) -> bool { - self.is_poped.load(Self::R) - } - - pub fn remove_from_list(&mut self, list: &LinkedList) { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - let prev = self.prev.load(Self::R); - let next = self.next.load(Self::R); - self.is_poped.store(true, Self::R); - - match unsafe { (prev.as_ref(), next.as_ref()) } { - (None, None) => { - // Not in the list or alone in the list, check if list head == node address - let sp = self as *const _; - - if sp == list.head.load(Ordering::Relaxed) { - list.head.store(null_mut(), Self::R); - list.tail.store(null_mut(), Self::R); - } - } - (None, Some(next_ref)) => { - // First in the list - next_ref.prev.store(null_mut(), Self::R); - list.head.store(next, Self::R); - } - (Some(prev_ref), None) => { - // Last in the list - prev_ref.next.store(null_mut(), Self::R); - list.tail.store(prev, Self::R); - } - (Some(prev_ref), Some(next_ref)) => { - // Somewhere in the list - - // Connect the `prev.next` and `next.prev` with each other to remove the node - prev_ref.next.store(next, Self::R); - next_ref.prev.store(prev, Self::R); - } - } - }) - } -} - -#[cfg(test)] -impl LinkedList { - fn print(&self) { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - let mut head = self.head.load(Self::R); - let tail = self.tail.load(Self::R); - - println!( - "List - h = 0x{:x}, t = 0x{:x}", - head as usize, tail as usize - ); - - let mut i = 0; - - // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link - while let Some(head_ref) = unsafe { head.as_ref() } { - println!( - " {}: {:?}, s = 0x{:x}, n = 0x{:x}, p = 0x{:x}", - i, - head_ref.val, - head as usize, - head_ref.next.load(Ordering::Relaxed) as usize, - head_ref.prev.load(Ordering::Relaxed) as usize - ); - - head = head_ref.next.load(Self::R); - - i += 1; - } - }); - } -} - -#[cfg(test)] -impl Link { - fn print(&self) { - cs::with(|_| { - // Make sure all previous writes are visible - core::sync::atomic::fence(Ordering::SeqCst); - - println!("Link:"); - - println!( - " val = {:?}, n = 0x{:x}, p = 0x{:x}", - self.val, - self.next.load(Ordering::Relaxed) as usize, - self.prev.load(Ordering::Relaxed) as usize - ); - }); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn linked_list() { - let wq = LinkedList::::new(); - - let mut i1 = Link::new(10); - let mut i2 = Link::new(11); - let mut i3 = Link::new(12); - let mut i4 = Link::new(13); - let mut i5 = Link::new(14); - - wq.push(unsafe { Pin::new_unchecked(&mut i1) }); - wq.push(unsafe { Pin::new_unchecked(&mut i2) }); - wq.push(unsafe { Pin::new_unchecked(&mut i3) }); - wq.push(unsafe { Pin::new_unchecked(&mut i4) }); - wq.push(unsafe { Pin::new_unchecked(&mut i5) }); - - wq.print(); - - wq.pop(); - i1.print(); - - wq.print(); - - i4.remove_from_list(&wq); - - wq.print(); - - // i1.remove_from_list(&wq); - // wq.print(); - - println!("i2"); - i2.remove_from_list(&wq); - wq.print(); - - println!("i3"); - i3.remove_from_list(&wq); - wq.print(); - - println!("i5"); - i5.remove_from_list(&wq); - wq.print(); - } -} diff --git a/rtic-channel/src/waker_registration.rs b/rtic-channel/src/waker_registration.rs deleted file mode 100644 index c30df7f..0000000 --- 
a/rtic-channel/src/waker_registration.rs +++ /dev/null @@ -1,64 +0,0 @@ -use core::cell::UnsafeCell; -use core::task::Waker; - -/// A critical section based waker handler. -pub struct CriticalSectionWakerRegistration { - waker: UnsafeCell>, -} - -unsafe impl Send for CriticalSectionWakerRegistration {} -unsafe impl Sync for CriticalSectionWakerRegistration {} - -impl CriticalSectionWakerRegistration { - /// Create a new waker registration. - pub const fn new() -> Self { - Self { - waker: UnsafeCell::new(None), - } - } - - /// Register a waker. - /// This will overwrite the previous waker if there was one. - pub fn register(&self, new_waker: &Waker) { - critical_section::with(|_| { - // SAFETY: This access is protected by the critical section. - let self_waker = unsafe { &mut *self.waker.get() }; - - // From embassy - // https://github.com/embassy-rs/embassy/blob/b99533607ceed225dd12ae73aaa9a0d969a7365e/embassy-sync/src/waitqueue/waker.rs#L59-L61 - match self_waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. (In most executor implementations, - // cloning a waker is somewhat expensive, comparable to cloning an Arc). - Some(ref w2) if (w2.will_wake(new_waker)) => {} - _ => { - // clone the new waker and store it - if let Some(old_waker) = core::mem::replace(self_waker, Some(new_waker.clone())) - { - // We had a waker registered for another task. Wake it, so the other task can - // reregister itself if it's still interested. - // - // If two tasks are waiting on the same thing concurrently, this will cause them - // to wake each other in a loop fighting over this WakerRegistration. This wastes - // CPU but things will still work. - // - // If the user wants to have two tasks waiting on the same thing they should use - // a more appropriate primitive that can store multiple wakers. - old_waker.wake() - } - } - } - }); - } - - /// Wake the waker. - pub fn wake(&self) { - critical_section::with(|_| { - // SAFETY: This access is protected by the critical section. - let self_waker = unsafe { &mut *self.waker.get() }; - if let Some(waker) = self_waker.take() { - waker.wake() - } - }); - } -} diff --git a/rtic-common/.gitignore b/rtic-common/.gitignore new file mode 100644 index 0000000..1e7caa9 --- /dev/null +++ b/rtic-common/.gitignore @@ -0,0 +1,2 @@ +Cargo.lock +target/ diff --git a/rtic-common/CHANGELOG.md b/rtic-common/CHANGELOG.md new file mode 100644 index 0000000..d3a9d84 --- /dev/null +++ b/rtic-common/CHANGELOG.md @@ -0,0 +1,16 @@ +# Change Log + +All notable changes to this project will be documented in this file. +This project adheres to [Semantic Versioning](http://semver.org/). + +For each category, *Added*, *Changed*, *Fixed* add new entries at the top! + +## [Unreleased] + +### Added + +### Changed + +### Fixed + +## [v1.0.0] - 2023-xx-xx diff --git a/rtic-common/Cargo.toml b/rtic-common/Cargo.toml new file mode 100644 index 0000000..258caae --- /dev/null +++ b/rtic-common/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "rtic-common" +version = "1.0.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +critical-section = "1" + +[features] +default = [] +testing = ["critical-section/std"] diff --git a/rtic-common/src/lib.rs b/rtic-common/src/lib.rs new file mode 100644 index 0000000..3c75856 --- /dev/null +++ b/rtic-common/src/lib.rs @@ -0,0 +1,8 @@ +//! 
Crate + +#![no_std] +#![deny(missing_docs)] +//deny_warnings_placeholder_for_ci + +pub mod wait_queue; +pub mod waker_registration; diff --git a/rtic-common/src/wait_queue.rs b/rtic-common/src/wait_queue.rs new file mode 100644 index 0000000..ba8ff54 --- /dev/null +++ b/rtic-common/src/wait_queue.rs @@ -0,0 +1,271 @@ +//! ... + +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::ptr::null_mut; +use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; +use core::task::Waker; +use critical_section as cs; + +/// A helper definition of a wait queue. +pub type WaitQueue = LinkedList; + +/// A FIFO linked list for a wait queue. +pub struct LinkedList { + head: AtomicPtr>, // UnsafeCell<*mut Link> + tail: AtomicPtr>, +} + +impl LinkedList { + /// Create a new linked list. + pub const fn new() -> Self { + Self { + head: AtomicPtr::new(null_mut()), + tail: AtomicPtr::new(null_mut()), + } + } +} + +impl LinkedList { + const R: Ordering = Ordering::Relaxed; + + /// Pop the first element in the queue. + pub fn pop(&self) -> Option { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let head = self.head.load(Self::R); + + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + if let Some(head_ref) = unsafe { head.as_ref() } { + // Move head to the next element + self.head.store(head_ref.next.load(Self::R), Self::R); + + // We read the value at head + let head_val = head_ref.val.clone(); + + let tail = self.tail.load(Self::R); + if head == tail { + // The queue is empty + self.tail.store(null_mut(), Self::R); + } + + if let Some(next_ref) = unsafe { head_ref.next.load(Self::R).as_ref() } { + next_ref.prev.store(null_mut(), Self::R); + } + + // Clear the pointers in the node. + head_ref.next.store(null_mut(), Self::R); + head_ref.prev.store(null_mut(), Self::R); + head_ref.is_poped.store(true, Self::R); + + return Some(head_val); + } + + None + }) + } + + /// Put an element at the back of the queue. + pub fn push(&self, link: Pin<&mut Link>) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let tail = self.tail.load(Self::R); + + // SAFETY: This datastructure does not move the underlying value. + let link = unsafe { link.get_unchecked_mut() }; + + if let Some(tail_ref) = unsafe { tail.as_ref() } { + // Queue is not empty + link.prev.store(tail, Self::R); + self.tail.store(link, Self::R); + tail_ref.next.store(link, Self::R); + } else { + // Queue is empty + self.tail.store(link, Self::R); + self.head.store(link, Self::R); + } + }); + } + + /// Check if the queue is empty. + pub fn is_empty(&self) -> bool { + self.head.load(Self::R).is_null() + } +} + +/// A link in the linked list. +pub struct Link { + pub(crate) val: T, + next: AtomicPtr>, + prev: AtomicPtr>, + is_poped: AtomicBool, + _up: PhantomPinned, +} + +impl Link { + const R: Ordering = Ordering::Relaxed; + + /// Create a new link. + pub const fn new(val: T) -> Self { + Self { + val, + next: AtomicPtr::new(null_mut()), + prev: AtomicPtr::new(null_mut()), + is_poped: AtomicBool::new(false), + _up: PhantomPinned, + } + } + + /// Return true if this link has been poped from the list. + pub fn is_poped(&self) -> bool { + self.is_poped.load(Self::R) + } + + /// Remove this link from a linked list. 
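+ ///
+ /// If the link is not a member of `list` (both neighbour pointers are null
+ /// and it is not the list head), the list is left untouched; the link is
+ /// still marked as popped afterwards.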
+ pub fn remove_from_list(&mut self, list: &LinkedList) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let prev = self.prev.load(Self::R); + let next = self.next.load(Self::R); + self.is_poped.store(true, Self::R); + + match unsafe { (prev.as_ref(), next.as_ref()) } { + (None, None) => { + // Not in the list or alone in the list, check if list head == node address + let sp = self as *const _; + + if sp == list.head.load(Ordering::Relaxed) { + list.head.store(null_mut(), Self::R); + list.tail.store(null_mut(), Self::R); + } + } + (None, Some(next_ref)) => { + // First in the list + next_ref.prev.store(null_mut(), Self::R); + list.head.store(next, Self::R); + } + (Some(prev_ref), None) => { + // Last in the list + prev_ref.next.store(null_mut(), Self::R); + list.tail.store(prev, Self::R); + } + (Some(prev_ref), Some(next_ref)) => { + // Somewhere in the list + + // Connect the `prev.next` and `next.prev` with each other to remove the node + prev_ref.next.store(next, Self::R); + next_ref.prev.store(prev, Self::R); + } + } + }) + } +} + +#[cfg(test)] +impl LinkedList { + fn print(&self) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + let mut head = self.head.load(Self::R); + let tail = self.tail.load(Self::R); + + println!( + "List - h = 0x{:x}, t = 0x{:x}", + head as usize, tail as usize + ); + + let mut i = 0; + + // SAFETY: `as_ref` is safe as `insert` requires a valid reference to a link + while let Some(head_ref) = unsafe { head.as_ref() } { + println!( + " {}: {:?}, s = 0x{:x}, n = 0x{:x}, p = 0x{:x}", + i, + head_ref.val, + head as usize, + head_ref.next.load(Ordering::Relaxed) as usize, + head_ref.prev.load(Ordering::Relaxed) as usize + ); + + head = head_ref.next.load(Self::R); + + i += 1; + } + }); + } +} + +#[cfg(test)] +impl Link { + fn print(&self) { + cs::with(|_| { + // Make sure all previous writes are visible + core::sync::atomic::fence(Ordering::SeqCst); + + println!("Link:"); + + println!( + " val = {:?}, n = 0x{:x}, p = 0x{:x}", + self.val, + self.next.load(Ordering::Relaxed) as usize, + self.prev.load(Ordering::Relaxed) as usize + ); + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn linked_list() { + let wq = LinkedList::::new(); + + let mut i1 = Link::new(10); + let mut i2 = Link::new(11); + let mut i3 = Link::new(12); + let mut i4 = Link::new(13); + let mut i5 = Link::new(14); + + wq.push(unsafe { Pin::new_unchecked(&mut i1) }); + wq.push(unsafe { Pin::new_unchecked(&mut i2) }); + wq.push(unsafe { Pin::new_unchecked(&mut i3) }); + wq.push(unsafe { Pin::new_unchecked(&mut i4) }); + wq.push(unsafe { Pin::new_unchecked(&mut i5) }); + + wq.print(); + + wq.pop(); + i1.print(); + + wq.print(); + + i4.remove_from_list(&wq); + + wq.print(); + + // i1.remove_from_list(&wq); + // wq.print(); + + println!("i2"); + i2.remove_from_list(&wq); + wq.print(); + + println!("i3"); + i3.remove_from_list(&wq); + wq.print(); + + println!("i5"); + i5.remove_from_list(&wq); + wq.print(); + } +} diff --git a/rtic-common/src/waker_registration.rs b/rtic-common/src/waker_registration.rs new file mode 100644 index 0000000..174765c --- /dev/null +++ b/rtic-common/src/waker_registration.rs @@ -0,0 +1,66 @@ +//! Waker registration utility. + +use core::cell::UnsafeCell; +use core::task::Waker; + +/// A critical section based waker handler. 
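+///
+/// A minimal usage sketch (illustrative, not taken from this patch): a future
+/// registers itself in `poll` before returning `Pending`, and the notifier
+/// calls `wake`:
+///
+/// ```ignore
+/// static WAKER: CriticalSectionWakerRegistration = CriticalSectionWakerRegistration::new();
+///
+/// // In `Future::poll`, with `cx: &mut Context<'_>`:
+/// WAKER.register(cx.waker());
+///
+/// // From the producer or interrupt handler:
+/// WAKER.wake();
+/// ```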
+pub struct CriticalSectionWakerRegistration { + waker: UnsafeCell>, +} + +unsafe impl Send for CriticalSectionWakerRegistration {} +unsafe impl Sync for CriticalSectionWakerRegistration {} + +impl CriticalSectionWakerRegistration { + /// Create a new waker registration. + pub const fn new() -> Self { + Self { + waker: UnsafeCell::new(None), + } + } + + /// Register a waker. + /// This will overwrite the previous waker if there was one. + pub fn register(&self, new_waker: &Waker) { + critical_section::with(|_| { + // SAFETY: This access is protected by the critical section. + let self_waker = unsafe { &mut *self.waker.get() }; + + // From embassy + // https://github.com/embassy-rs/embassy/blob/b99533607ceed225dd12ae73aaa9a0d969a7365e/embassy-sync/src/waitqueue/waker.rs#L59-L61 + match self_waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. (In most executor implementations, + // cloning a waker is somewhat expensive, comparable to cloning an Arc). + Some(ref w2) if (w2.will_wake(new_waker)) => {} + _ => { + // clone the new waker and store it + if let Some(old_waker) = core::mem::replace(self_waker, Some(new_waker.clone())) + { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + old_waker.wake() + } + } + } + }); + } + + /// Wake the waker. + pub fn wake(&self) { + critical_section::with(|_| { + // SAFETY: This access is protected by the critical section. + let self_waker = unsafe { &mut *self.waker.get() }; + if let Some(waker) = self_waker.take() { + waker.wake() + } + }); + } +} -- cgit v1.2.3
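For context, this is the usage pattern the moved `wait_queue` supports,
reconstructed only from the API shown in this patch (`cx` is assumed to be
the `core::task::Context` available inside a `poll` implementation):

    use core::pin::Pin;
    use rtic_common::wait_queue::{Link, WaitQueue};

    // A shared queue of parked tasks. `WaitQueue::new()` is const.
    static QUEUE: WaitQueue = WaitQueue::new();

    // Waiter side (inside `poll`, before returning `Pending`). The link is
    // stored in the future's state so its address stays stable; it must be
    // removed from the queue if the future is dropped early.
    let mut link = Link::new(cx.waker().clone());
    QUEUE.push(unsafe { Pin::new_unchecked(&mut link) });

    // Notifier side: pop the longest-waiting task's waker and wake it.
    if let Some(waker) = QUEUE.pop() {
        waker.wake();
    }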
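The `send` change above also shows the drop-safety idiom this queue requires:
the `Link` is stack-allocated inside the future, so a cancelled `send` must
unlink it before the stack frame is reused. The `OnDrop` guard body is not
visible in the hunk context; a plausible shape, using the names from the diff
(`wait_queue` stands in for the channel's actual queue field), is:

    let mut link_ptr: Option<Link<Waker>> = None;
    // Shadow the slot behind a raw pointer so it can no longer be moved.
    let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>);
    let mut link_ptr2 = link_ptr.clone();

    let dropper = OnDrop::new(|| {
        // Runs when the future is dropped: if the link is still queued,
        // remove it so the queue never holds a dangling pointer.
        if let Some(link) = unsafe { link_ptr2.get() } {
            link.remove_from_list(&wait_queue);
        }
    });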