aboutsummaryrefslogtreecommitdiff
path: root/rtic-sync/src
diff options
context:
space:
mode:
authorAdinAck <adinackerman@gmail.com>2024-06-19 11:52:38 -0700
committerGitHub <noreply@github.com>2024-06-19 18:52:38 +0000
commitd516d9a214152e7bcecbc14bb3cbd22819039c3e (patch)
tree6368ae641333030c29117f21185c4c2147373877 /rtic-sync/src
parent689c4a068eddfe32956c1975cdc241b26d1751da (diff)
Add Signal to rtic-sync (#934)
* add signal to rtic-sync * woops update changelog * remove example, too comlicated for a doc TODO: add example to rtic-examples repo * fix @korken89's issues * ...remove fence * fix clippy warnings * add tests
Diffstat (limited to 'rtic-sync/src')
-rw-r--r--rtic-sync/src/lib.rs1
-rw-r--r--rtic-sync/src/signal.rs195
2 files changed, 196 insertions, 0 deletions
diff --git a/rtic-sync/src/lib.rs b/rtic-sync/src/lib.rs
index 90afff6..f884588 100644
--- a/rtic-sync/src/lib.rs
+++ b/rtic-sync/src/lib.rs
@@ -9,6 +9,7 @@ use defmt_03 as defmt;
pub mod arbiter;
pub mod channel;
pub use portable_atomic;
+pub mod signal;
#[cfg(test)]
#[macro_use]
diff --git a/rtic-sync/src/signal.rs b/rtic-sync/src/signal.rs
new file mode 100644
index 0000000..0a26b4c
--- /dev/null
+++ b/rtic-sync/src/signal.rs
@@ -0,0 +1,195 @@
+//! A "latest only" value store with unlimited writers and async waiting.
+
+use core::{cell::UnsafeCell, future::poll_fn, task::Poll};
+use rtic_common::waker_registration::CriticalSectionWakerRegistration;
+
+/// Basically an Option but for indicating
+/// whether the store has been set or not
+#[derive(Clone, Copy)]
+enum Store<T> {
+ Set(T),
+ Unset,
+}
+
+/// A "latest only" value store with unlimited writers and async waiting.
+pub struct Signal<T: Copy> {
+ waker: CriticalSectionWakerRegistration,
+ store: UnsafeCell<Store<T>>,
+}
+
+unsafe impl<T: Copy> Send for Signal<T> {}
+unsafe impl<T: Copy> Sync for Signal<T> {}
+
+impl<T: Copy> Signal<T> {
+ /// Create a new signal.
+ pub const fn new() -> Self {
+ Self {
+ waker: CriticalSectionWakerRegistration::new(),
+ store: UnsafeCell::new(Store::Unset),
+ }
+ }
+
+ /// Split the signal into a writer and reader.
+ pub fn split(&self) -> (SignalWriter<T>, SignalReader<T>) {
+ (SignalWriter { parent: self }, SignalReader { parent: self })
+ }
+}
+
+/// Fascilitates the writing of values to a Signal.
+#[derive(Clone)]
+pub struct SignalWriter<'a, T: Copy> {
+ parent: &'a Signal<T>,
+}
+
+impl<'a, T: Copy> SignalWriter<'a, T> {
+ /// Write a raw Store value to the Signal.
+ fn write_inner(&mut self, value: Store<T>) {
+ critical_section::with(|_| {
+ // SAFETY: in a cs: exclusive access
+ unsafe { self.parent.store.get().replace(value) };
+ });
+
+ self.parent.waker.wake();
+ }
+
+ /// Write a value to the Signal.
+ pub fn write(&mut self, value: T) {
+ self.write_inner(Store::Set(value));
+ }
+
+ /// Clear the stored value in the Signal (if any).
+ pub fn clear(&mut self) {
+ self.write_inner(Store::Unset);
+ }
+}
+
+/// Fascilitates the async reading of values from the Signal.
+pub struct SignalReader<'a, T: Copy> {
+ parent: &'a Signal<T>,
+}
+
+impl<'a, T: Copy> SignalReader<'a, T> {
+ /// Immediately read and evict the latest value stored in the Signal.
+ fn take(&mut self) -> Store<T> {
+ critical_section::with(|_| {
+ // SAFETY: in a cs: exclusive access
+ unsafe { self.parent.store.get().replace(Store::Unset) }
+ })
+ }
+
+ /// Returns a pending value if present, or None if no value is available.
+ ///
+ /// Upon read, the stored value is evicted.
+ pub fn try_read(&mut self) -> Option<T> {
+ match self.take() {
+ Store::Unset => None,
+ Store::Set(value) => Some(value),
+ }
+ }
+
+ /// Wait for a new value to be written and read it.
+ ///
+ /// If a value is already pending it will be returned immediately.
+ ///
+ /// Upon read, the stored value is evicted.
+ pub async fn wait(&mut self) -> T {
+ poll_fn(|ctx| {
+ self.parent.waker.register(ctx.waker());
+ match self.take() {
+ Store::Unset => Poll::Pending,
+ Store::Set(value) => Poll::Ready(value),
+ }
+ })
+ .await
+ }
+
+ /// Wait for a new value to be written and read it.
+ ///
+ /// If a value is already pending, it will be evicted and a new
+ /// value must be written for the wait to resolve.
+ ///
+ /// Upon read, the stored value is evicted.
+ pub async fn wait_fresh(&mut self) -> T {
+ self.take();
+ self.wait().await
+ }
+}
+
+/// Convenience macro for creating a Signal.
+#[macro_export]
+macro_rules! make_signal {
+ ( $T:ty ) => {{
+ static SIGNAL: Signal<$T> = Signal::new();
+
+ SIGNAL.split()
+ }};
+}
+
+#[cfg(test)]
+mod tests {
+ use static_cell::StaticCell;
+
+ use super::*;
+
+ #[test]
+ fn empty() {
+ let (_writer, mut reader) = make_signal!(u32);
+
+ assert!(reader.try_read().is_none());
+ }
+
+ #[test]
+ fn ping_pong() {
+ let (mut writer, mut reader) = make_signal!(u32);
+
+ writer.write(0xde);
+ assert!(reader.try_read().is_some_and(|value| value == 0xde));
+ }
+
+ #[test]
+ fn latest() {
+ let (mut writer, mut reader) = make_signal!(u32);
+
+ writer.write(0xde);
+ writer.write(0xad);
+ writer.write(0xbe);
+ writer.write(0xef);
+ assert!(reader.try_read().is_some_and(|value| value == 0xef));
+ }
+
+ #[test]
+ fn consumption() {
+ let (mut writer, mut reader) = make_signal!(u32);
+
+ writer.write(0xaa);
+ assert!(reader.try_read().is_some_and(|value| value == 0xaa));
+ assert!(reader.try_read().is_none());
+ }
+
+ #[tokio::test]
+ async fn pending() {
+ let (mut writer, mut reader) = make_signal!(u32);
+
+ writer.write(0xaa);
+
+ assert_eq!(reader.wait().await, 0xaa);
+ }
+
+ #[tokio::test]
+ async fn waiting() {
+ static READER: StaticCell<SignalReader<u32>> = StaticCell::new();
+ let (mut writer, reader) = make_signal!(u32);
+
+ writer.write(0xaa);
+
+ let reader = READER.init(reader);
+ let handle = tokio::spawn(reader.wait_fresh());
+
+ tokio::task::yield_now().await; // encourage tokio executor to poll reader future
+ assert!(!handle.is_finished()); // verify reader future did not resolve after poll
+
+ writer.write(0xab);
+
+ assert!(handle.await.is_ok_and(|value| value == 0xab));
+ }
+}