aboutsummaryrefslogtreecommitdiff
path: root/rtic-arbiter/src/lib.rs
diff options
context:
space:
mode:
authorEmil Fresk <emil.fresk@gmail.com>2023-01-30 14:49:24 +0100
committerHenrik Tjäder <henrik@tjaders.com>2023-03-01 00:33:38 +0100
commit5c1cefbf4e249c38467e3f6eb4e061e5b8073d6c (patch)
treec0e8ab5b535ec4a192b9f6e83f29da8409c7016a /rtic-arbiter/src/lib.rs
parente65e532c2a342f77080ac6fc8e5be11aa7d82575 (diff)
Add `rtic-arbiter`
Diffstat (limited to 'rtic-arbiter/src/lib.rs')
-rw-r--r--rtic-arbiter/src/lib.rs175
1 files changed, 175 insertions, 0 deletions
diff --git a/rtic-arbiter/src/lib.rs b/rtic-arbiter/src/lib.rs
new file mode 100644
index 0000000..487c64c
--- /dev/null
+++ b/rtic-arbiter/src/lib.rs
@@ -0,0 +1,175 @@
+//! Crate
+
+#![no_std]
+#![deny(missing_docs)]
+//deny_warnings_placeholder_for_ci
+
+use core::cell::UnsafeCell;
+use core::future::poll_fn;
+use core::ops::{Deref, DerefMut};
+use core::pin::Pin;
+use core::sync::atomic::{fence, AtomicBool, Ordering};
+use core::task::{Poll, Waker};
+
+use rtic_common::dropper::OnDrop;
+use rtic_common::wait_queue::{Link, WaitQueue};
+
+/// This is needed to make the async closure in `send` accept that we "share"
+/// the link possible between threads.
+#[derive(Clone)]
+struct LinkPtr(*mut Option<Link<Waker>>);
+
+impl LinkPtr {
+ /// This will dereference the pointer stored within and give out an `&mut`.
+ unsafe fn get(&mut self) -> &mut Option<Link<Waker>> {
+ &mut *self.0
+ }
+}
+
+unsafe impl Send for LinkPtr {}
+unsafe impl Sync for LinkPtr {}
+
+/// An FIFO waitqueue for use in shared bus usecases.
+pub struct Arbiter<T> {
+ wait_queue: WaitQueue,
+ inner: UnsafeCell<T>,
+ taken: AtomicBool,
+}
+
+impl<T> Arbiter<T> {
+ /// Create a new arbiter.
+ pub const fn new(inner: T) -> Self {
+ Self {
+ wait_queue: WaitQueue::new(),
+ inner: UnsafeCell::new(inner),
+ taken: AtomicBool::new(false),
+ }
+ }
+
+ /// Get access to the inner value in the `Arbiter`. This will wait until access is granted,
+ /// for non-blocking access use `try_access`.
+ pub async fn access(&self) -> ExclusiveAccess<'_, T> {
+ let mut link_ptr: Option<Link<Waker>> = None;
+
+ // Make this future `Drop`-safe, also shadow the original definition so we can't abuse it.
+ let mut link_ptr = LinkPtr(&mut link_ptr as *mut Option<Link<Waker>>);
+
+ let mut link_ptr2 = link_ptr.clone();
+ let dropper = OnDrop::new(|| {
+ // SAFETY: We only run this closure and dereference the pointer if we have
+ // exited the `poll_fn` below in the `drop(dropper)` call. The other dereference
+ // of this pointer is in the `poll_fn`.
+ if let Some(link) = unsafe { link_ptr2.get() } {
+ link.remove_from_list(&self.wait_queue);
+ }
+ });
+
+ poll_fn(|cx| {
+ critical_section::with(|_| {
+ fence(Ordering::SeqCst);
+
+ // The queue is empty and noone has taken the value.
+ if self.wait_queue.is_empty() && !self.taken.load(Ordering::Relaxed) {
+ self.taken.store(true, Ordering::Relaxed);
+
+ return Poll::Ready(());
+ }
+
+ // SAFETY: This pointer is only dereferenced here and on drop of the future
+ // which happens outside this `poll_fn`'s stack frame.
+ let link = unsafe { link_ptr.get() };
+ if let Some(link) = link {
+ if link.is_poped() {
+ return Poll::Ready(());
+ }
+ } else {
+ // Place the link in the wait queue on first run.
+ let link_ref = link.insert(Link::new(cx.waker().clone()));
+
+ // SAFETY: The address to the link is stable as it is hidden behind
+ // `link_ptr`, and `link_ptr` shadows the original making it unmovable.
+ self.wait_queue
+ .push(unsafe { Pin::new_unchecked(link_ref) });
+ }
+
+ Poll::Pending
+ })
+ })
+ .await;
+
+ // Make sure the link is removed from the queue.
+ drop(dropper);
+
+ // SAFETY: One only gets here if there is exlusive access.
+ ExclusiveAccess {
+ arbiter: self,
+ inner: unsafe { &mut *self.inner.get() },
+ }
+ }
+
+ /// Non-blockingly tries to access the underlying value.
+ /// If someone is in queue to get it, this will return `None`.
+ pub fn try_access(&self) -> Option<ExclusiveAccess<'_, T>> {
+ critical_section::with(|_| {
+ fence(Ordering::SeqCst);
+
+ // The queue is empty and noone has taken the value.
+ if self.wait_queue.is_empty() && !self.taken.load(Ordering::Relaxed) {
+ self.taken.store(true, Ordering::Relaxed);
+
+ // SAFETY: One only gets here if there is exlusive access.
+ Some(ExclusiveAccess {
+ arbiter: self,
+ inner: unsafe { &mut *self.inner.get() },
+ })
+ } else {
+ None
+ }
+ })
+ }
+}
+
+/// This token represents exclusive access to the value protected by the `Arbiter`.
+pub struct ExclusiveAccess<'a, T> {
+ arbiter: &'a Arbiter<T>,
+ inner: &'a mut T,
+}
+
+impl<'a, T> Drop for ExclusiveAccess<'a, T> {
+ fn drop(&mut self) {
+ critical_section::with(|_| {
+ fence(Ordering::SeqCst);
+
+ if self.arbiter.wait_queue.is_empty() {
+ // If noone is in queue and we release exclusive access, reset `taken`.
+ self.arbiter.taken.store(false, Ordering::Relaxed);
+ } else if let Some(next) = self.arbiter.wait_queue.pop() {
+ // Wake the next one in queue.
+ next.wake();
+ }
+ })
+ }
+}
+
+impl<'a, T> Deref for ExclusiveAccess<'a, T> {
+ type Target = T;
+
+ fn deref(&self) -> &Self::Target {
+ self.inner
+ }
+}
+
+impl<'a, T> DerefMut for ExclusiveAccess<'a, T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.inner
+ }
+}
+
+#[cfg(test)]
+#[macro_use]
+extern crate std;
+
+#[cfg(test)]
+mod tests {
+ // use super::*;
+}