Skip to main content

grafton_ndi/
waitable_completion.rs

1//! Zero-overhead completion signal for async operations.
2//!
3//! This module provides a reusable abstraction for waiting on completion events
4//! with timeout support. It encapsulates the atomic flag, mutex, and condvar
5//! pattern used throughout the sender module for async video completion tracking.
6
7use std::{
8    sync::{
9        atomic::{AtomicBool, Ordering},
10        Condvar, Mutex,
11    },
12    time::Duration,
13};
14
15/// A completion signal for synchronizing async operations.
16///
17/// This struct provides a thread-safe mechanism for one thread to signal completion
18/// while another thread waits for that signal with optional timeout. It handles
19/// mutex poisoning gracefully, preferring progress over panic.
20///
21/// # Thread Safety
22///
23/// All methods are safe to call from multiple threads. The implementation uses:
24/// - An atomic boolean for lock-free completion checks
25/// - A mutex + condvar for efficient blocking waits
26/// - Poison recovery to avoid panics in Drop contexts
27///
28/// # Example
29///
30/// ```
31/// use std::time::Duration;
32/// use std::thread;
33/// # use grafton_ndi::waitable_completion::WaitableCompletion;
34///
35/// let completion = WaitableCompletion::new();
36///
37/// // Spawn a thread that signals completion after a delay
38/// let completion_clone = completion.clone();
39/// thread::spawn(move || {
40///     thread::sleep(Duration::from_millis(10));
41///     completion_clone.signal();
42/// });
43///
44/// // Wait for completion with timeout
45/// match completion.wait_timeout(Duration::from_secs(1)) {
46///     Ok(()) => println!("Operation completed"),
47///     Err(e) => println!("Timed out: {e}"),
48/// }
49/// ```
50#[derive(Debug)]
51pub struct WaitableCompletion {
52    completed: AtomicBool,
53    lock: Mutex<()>,
54    cv: Condvar,
55}
56
57impl Clone for WaitableCompletion {
58    fn clone(&self) -> Self {
59        Self {
60            completed: AtomicBool::new(self.completed.load(Ordering::Acquire)),
61            lock: Mutex::new(()),
62            cv: Condvar::new(),
63        }
64    }
65}
66
67impl Default for WaitableCompletion {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl WaitableCompletion {
74    /// Creates a new completion signal in the incomplete state.
75    pub fn new() -> Self {
76        Self {
77            completed: AtomicBool::new(false),
78            lock: Mutex::new(()),
79            cv: Condvar::new(),
80        }
81    }
82
83    /// Creates a new completion signal in the completed state.
84    ///
85    /// Useful when initializing a sender that has no pending async operations.
86    pub fn new_completed() -> Self {
87        Self {
88            completed: AtomicBool::new(true),
89            lock: Mutex::new(()),
90            cv: Condvar::new(),
91        }
92    }
93
94    /// Signals completion and wakes all waiting threads.
95    ///
96    /// This method is safe to call multiple times; subsequent calls are no-ops
97    /// for the atomic flag but will still notify waiting threads.
98    pub fn signal(&self) {
99        self.completed.store(true, Ordering::Release);
100        let _lock = self
101            .lock
102            .lock()
103            .unwrap_or_else(|poisoned| poisoned.into_inner());
104        self.cv.notify_all();
105    }
106
107    /// Checks if the operation has completed.
108    ///
109    /// This is a non-blocking, lock-free check using atomic load.
110    pub fn is_complete(&self) -> bool {
111        self.completed.load(Ordering::Acquire)
112    }
113
114    /// Resets the completion state to incomplete.
115    ///
116    /// Call this before starting a new async operation to reuse the signal.
117    pub fn reset(&self) {
118        self.completed.store(false, Ordering::Release);
119    }
120
121    /// Waits for completion with timeout, returning a Result.
122    ///
123    /// This method blocks until either:
124    /// - The completion signal is received (returns `Ok(())`)
125    /// - The timeout elapses (returns `Err` with timeout message)
126    ///
127    /// # Poison Recovery
128    ///
129    /// If the mutex is poisoned (e.g., a thread panicked while holding it),
130    /// this method recovers the guard and continues waiting. This ensures
131    /// robust behavior even in exceptional circumstances.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error string if the timeout elapses before completion.
136    pub fn wait_timeout(&self, timeout: Duration) -> Result<(), String> {
137        let mut guard = self
138            .lock
139            .lock()
140            .unwrap_or_else(|poisoned| poisoned.into_inner());
141
142        let start = std::time::Instant::now();
143
144        while !self.completed.load(Ordering::Acquire) {
145            let elapsed = start.elapsed();
146            if elapsed >= timeout {
147                return Err(format!(
148                    "Async operation did not complete within {timeout:?}"
149                ));
150            }
151
152            let remaining = timeout - elapsed;
153            let wait_result = self.cv.wait_timeout(guard, remaining);
154            match wait_result {
155                Ok((new_guard, timeout_result)) => {
156                    guard = new_guard;
157                    if timeout_result.timed_out() && !self.completed.load(Ordering::Acquire) {
158                        return Err(format!(
159                            "Async operation did not complete within {timeout:?}"
160                        ));
161                    }
162                }
163                Err(poisoned) => {
164                    let (new_guard, _) = poisoned.into_inner();
165                    guard = new_guard;
166                }
167            }
168        }
169
170        Ok(())
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use super::*;
177    use std::sync::Arc;
178    use std::thread;
179
180    #[test]
181    fn test_new_starts_incomplete() {
182        let wc = WaitableCompletion::new();
183        assert!(!wc.is_complete());
184    }
185
186    #[test]
187    fn test_new_completed_starts_complete() {
188        let wc = WaitableCompletion::new_completed();
189        assert!(wc.is_complete());
190    }
191
192    #[test]
193    fn test_signal_sets_complete() {
194        let wc = WaitableCompletion::new();
195        assert!(!wc.is_complete());
196        wc.signal();
197        assert!(wc.is_complete());
198    }
199
200    #[test]
201    fn test_reset_clears_complete() {
202        let wc = WaitableCompletion::new_completed();
203        assert!(wc.is_complete());
204        wc.reset();
205        assert!(!wc.is_complete());
206    }
207
208    #[test]
209    fn test_signal_before_wait() {
210        let wc = WaitableCompletion::new();
211        wc.signal();
212        let result = wc.wait_timeout(Duration::from_millis(100));
213        assert!(result.is_ok());
214    }
215
216    #[test]
217    fn test_wait_then_signal() {
218        let wc = Arc::new(WaitableCompletion::new());
219        let wc_clone = Arc::clone(&wc);
220
221        let handle = thread::spawn(move || {
222            thread::sleep(Duration::from_millis(10));
223            wc_clone.signal();
224        });
225
226        let result = wc.wait_timeout(Duration::from_secs(1));
227        assert!(result.is_ok());
228        handle.join().unwrap();
229    }
230
231    #[test]
232    fn test_timeout_expires() {
233        let wc = WaitableCompletion::new();
234        let result = wc.wait_timeout(Duration::from_millis(10));
235        assert!(result.is_err());
236        assert!(result.unwrap_err().contains("did not complete"));
237    }
238
239    #[test]
240    fn test_clone_preserves_state() {
241        let wc1 = WaitableCompletion::new();
242        wc1.signal();
243
244        let wc2 = wc1.clone();
245        assert!(wc2.is_complete());
246    }
247
248    #[test]
249    fn test_clone_is_independent() {
250        let wc1 = WaitableCompletion::new();
251        let wc2 = wc1.clone();
252
253        wc1.signal();
254        assert!(wc1.is_complete());
255        assert!(!wc2.is_complete());
256    }
257
258    #[test]
259    fn test_multiple_signals_are_idempotent() {
260        let wc = WaitableCompletion::new();
261        wc.signal();
262        wc.signal();
263        wc.signal();
264        assert!(wc.is_complete());
265    }
266
267    #[test]
268    fn test_concurrent_signal_and_wait() {
269        for _ in 0..100 {
270            let wc = Arc::new(WaitableCompletion::new());
271            let wc_clone = Arc::clone(&wc);
272
273            let signaler = thread::spawn(move || {
274                wc_clone.signal();
275            });
276
277            let result = wc.wait_timeout(Duration::from_secs(1));
278            signaler.join().unwrap();
279
280            assert!(
281                result.is_ok() || wc.is_complete(),
282                "Expected completion but got {:?}",
283                result
284            );
285        }
286    }
287}