grafton_ndi/
waitable_completion.rs1use std::{
8 sync::{
9 atomic::{AtomicBool, Ordering},
10 Condvar, Mutex,
11 },
12 time::Duration,
13};
14
/// A resettable completion flag that threads can block on with a timeout.
///
/// The flag itself is an [`AtomicBool`], so it can be polled without locking. The
/// [`Mutex`]/[`Condvar`] pair protects no data; it exists only so that waiters can sleep
/// until [`signal`](Self::signal) is called.
#[derive(Debug)]
pub struct WaitableCompletion {
    /// `true` once the operation has been signalled as complete.
    completed: AtomicBool,
    /// Guards nothing; held by waiters while they check the flag and by `signal` while it
    /// notifies.
    lock: Mutex<()>,
    /// Waiters sleep here until they are notified or their timeout elapses.
    cv: Condvar,
}

impl Clone for WaitableCompletion {
    /// Creates an independent value that starts with the current state of the flag.
    ///
    /// The mutex and condition variable are built fresh rather than shared, so signalling
    /// or resetting one value is never observed through the other.
    fn clone(&self) -> Self {
        Self::with_flag(self.is_complete())
    }
}

impl Default for WaitableCompletion {
    /// Same as [`WaitableCompletion::new`]: the flag starts cleared.
    fn default() -> Self {
        WaitableCompletion::new()
    }
}

impl WaitableCompletion {
    /// Creates a completion whose flag is not yet set.
    pub fn new() -> Self {
        Self::with_flag(false)
    }

    /// Creates a completion whose flag is already set, so waits return immediately.
    pub fn new_completed() -> Self {
        Self::with_flag(true)
    }

    /// Marks the operation as complete and wakes every thread blocked in
    /// [`wait_timeout`](Self::wait_timeout).
    pub fn signal(&self) {
        self.completed.store(true, Ordering::Release);
        // Waiters test the flag while holding this mutex, so taking it here means any
        // waiter that saw `false` has already released it by entering the condvar wait
        // and will therefore receive the notification below.
        let _held = self.lock_ignoring_poison();
        self.cv.notify_all();
    }

    /// Returns whether the flag is currently set. Never blocks.
    pub fn is_complete(&self) -> bool {
        self.completed.load(Ordering::Acquire)
    }

    /// Clears the flag so the value can be waited on again. Does not notify anyone.
    pub fn reset(&self) {
        self.completed.store(false, Ordering::Release);
    }

    /// Blocks the calling thread until the flag is set or `timeout` has elapsed.
    ///
    /// The timeout is measured from the moment the internal mutex has been acquired.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message if the flag is still unset once `timeout` has
    /// elapsed.
    pub fn wait_timeout(&self, timeout: Duration) -> Result<(), String> {
        let timed_out_error = || format!("Async operation did not complete within {timeout:?}");

        let mut guard = self.lock_ignoring_poison();
        let started = std::time::Instant::now();

        loop {
            if self.is_complete() {
                return Ok(());
            }

            // Give up once the whole budget is spent (elapsed >= timeout).
            let remaining = match timeout.checked_sub(started.elapsed()) {
                Some(left) if !left.is_zero() => left,
                _ => return Err(timed_out_error()),
            };

            let wait_timed_out = match self.cv.wait_timeout(guard, remaining) {
                Ok((reacquired, status)) => {
                    guard = reacquired;
                    status.timed_out()
                }
                // A poisoned wait only hands the guard back; the next loop iteration
                // re-evaluates both the flag and the elapsed time.
                Err(poisoned) => {
                    guard = poisoned.into_inner().0;
                    false
                }
            };

            // A timed-out wait still counts as success if the flag was set meanwhile.
            if wait_timed_out && !self.is_complete() {
                return Err(timed_out_error());
            }
        }
    }

    /// Builds a value with fresh synchronisation primitives and the given flag state.
    fn with_flag(completed: bool) -> Self {
        Self {
            completed: AtomicBool::new(completed),
            lock: Mutex::new(()),
            cv: Condvar::new(),
        }
    }

    /// Acquires the internal mutex, recovering the guard if the mutex is poisoned (it
    /// protects no data, so there is nothing that could have been left inconsistent).
    fn lock_ignoring_poison(&self) -> std::sync::MutexGuard<'_, ()> {
        match self.lock.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }
}
173
#[cfg(test)]
mod tests {
    //! Unit tests for `WaitableCompletion`: flag transitions, timeout handling, clone
    //! semantics and cross-thread signalling.

    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// `new` yields a completion whose flag is cleared.
    #[test]
    fn test_new_starts_incomplete() {
        let wc = WaitableCompletion::new();
        assert!(!wc.is_complete());
    }

    /// `new_completed` yields a completion whose flag is already set.
    #[test]
    fn test_new_completed_starts_complete() {
        let wc = WaitableCompletion::new_completed();
        assert!(wc.is_complete());
    }

    /// `signal` flips the flag from cleared to set.
    #[test]
    fn test_signal_sets_complete() {
        let wc = WaitableCompletion::new();
        assert!(!wc.is_complete());
        wc.signal();
        assert!(wc.is_complete());
    }

    /// `reset` clears a flag that was set.
    #[test]
    fn test_reset_clears_complete() {
        let wc = WaitableCompletion::new_completed();
        assert!(wc.is_complete());
        wc.reset();
        assert!(!wc.is_complete());
    }

    /// Waiting after the signal has already happened succeeds immediately.
    #[test]
    fn test_signal_before_wait() {
        let wc = WaitableCompletion::new();
        wc.signal();
        let result = wc.wait_timeout(Duration::from_millis(100));
        assert!(result.is_ok());
    }

    /// A waiter that is already blocked is woken by a later signal from another thread.
    #[test]
    fn test_wait_then_signal() {
        let wc = Arc::new(WaitableCompletion::new());
        let wc_clone = Arc::clone(&wc);

        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            wc_clone.signal();
        });

        let result = wc.wait_timeout(Duration::from_secs(1));
        assert!(result.is_ok());
        handle.join().unwrap();
    }

    /// Without a signal the wait fails with the timeout message.
    #[test]
    fn test_timeout_expires() {
        let wc = WaitableCompletion::new();
        let result = wc.wait_timeout(Duration::from_millis(10));
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("did not complete"));
    }

    /// A clone starts with the flag state the original had at clone time.
    #[test]
    fn test_clone_preserves_state() {
        let wc1 = WaitableCompletion::new();
        wc1.signal();

        let wc2 = wc1.clone();
        assert!(wc2.is_complete());
    }

    /// Signalling the original after cloning does not affect the clone.
    #[test]
    fn test_clone_is_independent() {
        let wc1 = WaitableCompletion::new();
        let wc2 = wc1.clone();

        wc1.signal();
        assert!(wc1.is_complete());
        assert!(!wc2.is_complete());
    }

    /// Signalling repeatedly leaves the flag set.
    #[test]
    fn test_multiple_signals_are_idempotent() {
        let wc = WaitableCompletion::new();
        wc.signal();
        wc.signal();
        wc.signal();
        assert!(wc.is_complete());
    }

    /// Races `signal` against `wait_timeout` many times to catch lost wake-ups.
    #[test]
    fn test_concurrent_signal_and_wait() {
        for _ in 0..100 {
            let wc = Arc::new(WaitableCompletion::new());
            let wc_clone = Arc::clone(&wc);

            let signaler = thread::spawn(move || {
                wc_clone.signal();
            });

            let result = wc.wait_timeout(Duration::from_secs(1));
            signaler.join().unwrap();

            // Only the wait result is meaningful here: once the signaler has been joined
            // `is_complete()` is always true, so accepting it as an alternative would
            // hide a wait that timed out because its wake-up was lost.
            assert!(
                result.is_ok(),
                "Expected completion but got {:?}",
                result
            );
        }
    }
}