// grafton_ndi/runtime.rs

1//! NDI runtime management and initialization.
2
3use once_cell::sync::Lazy;
4
5use std::sync::{Condvar, Mutex, MutexGuard};
6
7use crate::{ndi_lib::*, Error, Result};
8
/// Lifecycle phase of the shared NDI runtime.
///
/// Permitted transitions:
/// - `Uninitialized` → `Initializing` → `Running` (initialization succeeded)
/// - `Uninitialized` → `Initializing` → `Failed` (initialization failed)
/// - `Running` → `Destroying` → `Uninitialized` (last reference released)
/// - `Failed` → `Initializing` (a later caller retries)
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Phase {
    /// No runtime exists; the next acquirer performs the initialization.
    Uninitialized,
    /// A thread is initializing right now; other threads should wait.
    Initializing,
    /// The runtime is up and held by one or more live handles.
    Running,
    /// A thread is tearing the runtime down; other threads should wait.
    Destroying,
    /// The most recent initialization attempt failed; retrying is allowed.
    Failed,
}
29
30/// Internal state protected by the mutex.
31struct RuntimeState {
32    phase: Phase,
33    refcount: usize,
34}
35
36impl RuntimeState {
37    const fn new() -> Self {
38        Self {
39            phase: Phase::Uninitialized,
40            refcount: 0,
41        }
42    }
43}
44
45/// Process-global runtime manager for NDI.
46///
47/// This implementation uses a `Mutex` + `Condvar` state machine that:
48/// - Allows re-initialization after teardown
49/// - Allows retry after initialization failure
50/// - Avoids spin loops by using `Condvar` waits
51/// - Maintains the invariant: `NDI::new()` returns `Ok` only when runtime is initialized
52struct RuntimeManager {
53    state: Mutex<RuntimeState>,
54    cv: Condvar,
55}
56
57impl RuntimeManager {
58    const fn new() -> Self {
59        Self {
60            state: Mutex::new(RuntimeState::new()),
61            cv: Condvar::new(),
62        }
63    }
64
65    /// Recover from mutex poisoning, preferring progress over panic.
66    fn recover_guard<'a>(
67        result: std::sync::LockResult<MutexGuard<'a, RuntimeState>>,
68    ) -> MutexGuard<'a, RuntimeState> {
69        result.unwrap_or_else(|poisoned| poisoned.into_inner())
70    }
71
72    fn acquire(&self) -> Result<()> {
73        let mut guard = Self::recover_guard(self.state.lock());
74
75        loop {
76            match guard.phase {
77                Phase::Uninitialized | Phase::Failed => {
78                    // We're responsible for initialization
79                    guard.phase = Phase::Initializing;
80                    drop(guard);
81
82                    // Call NDIlib_initialize outside the lock
83                    #[cfg(all(target_os = "windows", debug_assertions))]
84                    {
85                        if std::env::var("CI").is_ok() {
86                            eprintln!("[NDI] Initializing NDI runtime in CI environment...");
87                            if let Ok(sdk_dir) = std::env::var("NDI_SDK_DIR") {
88                                eprintln!("[NDI] NDI_SDK_DIR: {}", sdk_dir);
89                            }
90                        }
91                    }
92
93                    let succeeded = unsafe { NDIlib_initialize() };
94
95                    // Re-acquire lock to update state
96                    guard = Self::recover_guard(self.state.lock());
97
98                    if succeeded {
99                        guard.phase = Phase::Running;
100                        guard.refcount = 1;
101                        self.cv.notify_all();
102                        return Ok(());
103                    } else {
104                        guard.phase = Phase::Failed;
105                        self.cv.notify_all();
106                        return Err(Error::InitializationFailed(
107                            "NDIlib_initialize failed".into(),
108                        ));
109                    }
110                }
111
112                Phase::Initializing | Phase::Destroying => {
113                    // Wait for state transition to complete
114                    guard = Self::recover_guard(self.cv.wait(guard));
115                    // Loop again to check the new state
116                }
117
118                Phase::Running => {
119                    // Runtime is already initialized, just increment refcount
120                    guard.refcount += 1;
121                    return Ok(());
122                }
123            }
124        }
125    }
126
127    fn release(&self) {
128        let mut guard = Self::recover_guard(self.state.lock());
129
130        debug_assert!(
131            guard.refcount > 0,
132            "release() called when refcount was already 0 (double-free or unbalanced release)"
133        );
134        debug_assert!(
135            guard.phase == Phase::Running,
136            "release() called when phase was {:?}, expected Running",
137            guard.phase
138        );
139
140        guard.refcount -= 1;
141
142        if guard.refcount == 0 {
143            // Last reference - destroy the runtime
144            guard.phase = Phase::Destroying;
145            drop(guard);
146
147            // Call NDIlib_destroy outside the lock
148            unsafe { NDIlib_destroy() };
149
150            // Re-acquire lock to reset state
151            let mut guard = Self::recover_guard(self.state.lock());
152            guard.phase = Phase::Uninitialized;
153            self.cv.notify_all();
154        }
155    }
156
157    fn is_running(&self) -> bool {
158        let guard = Self::recover_guard(self.state.lock());
159        guard.phase == Phase::Running && guard.refcount > 0
160    }
161}
162
/// The single, lazily constructed runtime manager that `NDI::new`, `NDI::clone`,
/// `NDI::is_running` and `NDI`'s `Drop` impl all go through.
static RUNTIME: Lazy<RuntimeManager> = Lazy::new(RuntimeManager::new);
164
165/// Manages the NDI runtime lifecycle.
166///
167/// The `NDI` struct is the entry point for all NDI operations. It ensures the NDI
168/// runtime is properly initialized and cleaned up. Multiple instances can exist
169/// simultaneously - they share the same underlying runtime through reference counting.
170///
171/// # Examples
172///
173/// ```no_run
174/// use grafton_ndi::NDI;
175///
176/// # fn main() -> Result<(), grafton_ndi::Error> {
177/// // Create an NDI instance
178/// let ndi = NDI::new()?;
179///
180/// // The runtime stays alive as long as any NDI instance exists
181/// let ndi2 = ndi.clone(); // Cheap reference-counted clone
182///
183/// // Runtime is automatically cleaned up when all instances are dropped
184/// # Ok(())
185/// # }
186/// ```
187#[derive(Debug)]
188pub struct NDI;
189
190impl NDI {
191    /// Creates a new NDI instance.
192    ///
193    /// This method is the single entry point for creating NDI instances. It is thread-safe
194    /// and can be called from multiple threads. The first call initializes the NDI runtime,
195    /// subsequent calls increment a reference count. When the last instance is dropped, the
196    /// runtime is automatically destroyed.
197    ///
198    /// # Errors
199    ///
200    /// Returns [`Error::InitializationFailed`] if the NDI SDK fails to initialize.
201    ///
202    /// # Examples
203    ///
204    /// ```
205    /// # use grafton_ndi::NDI;
206    /// # fn main() -> Result<(), grafton_ndi::Error> {
207    /// let ndi = NDI::new()?;
208    /// // Use NDI operations...
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub fn new() -> Result<Self> {
213        RUNTIME.acquire()?;
214        Ok(Self)
215    }
216
217    /// Checks if the current CPU is supported by the NDI SDK.
218    ///
219    /// The NDI SDK requires certain CPU features (e.g., SSE4.2 on x86_64).
220    ///
221    /// # Examples
222    ///
223    /// ```
224    /// if grafton_ndi::NDI::is_supported_cpu() {
225    ///     println!("CPU is supported by NDI");
226    /// } else {
227    ///     eprintln!("CPU lacks required features for NDI");
228    /// }
229    /// ```
230    pub fn is_supported_cpu() -> bool {
231        unsafe { NDIlib_is_supported_CPU() }
232    }
233
234    /// Returns the version string of the NDI runtime.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if the version string cannot be retrieved or contains
239    /// invalid UTF-8.
240    ///
241    /// # Examples
242    ///
243    /// ```no_run
244    /// # use grafton_ndi::NDI;
245    /// # fn main() -> Result<(), grafton_ndi::Error> {
246    /// match NDI::version() {
247    ///     Ok(version) => println!("NDI version: {}", version),
248    ///     Err(e) => eprintln!("Failed to get version: {}", e),
249    /// }
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub fn version() -> Result<String> {
254        unsafe {
255            let version_ptr = NDIlib_version();
256            if version_ptr.is_null() {
257                return Err(Error::NullPointer("NDIlib_version".into()));
258            }
259            let c_str = std::ffi::CStr::from_ptr(version_ptr);
260            c_str
261                .to_str()
262                .map(|s| s.to_owned())
263                .map_err(|e| Error::InvalidUtf8(e.to_string()))
264        }
265    }
266    /// Checks if the NDI runtime is currently initialized.
267    ///
268    /// This can be useful for diagnostic purposes or conditional initialization.
269    ///
270    /// # Examples
271    ///
272    /// ```
273    /// if grafton_ndi::NDI::is_running() {
274    ///     println!("NDI runtime is active");
275    /// }
276    /// ```
277    pub fn is_running() -> bool {
278        RUNTIME.is_running()
279    }
280}
281
/// Cloning a handle takes one more reference on the shared runtime.
impl Clone for NDI {
    /// Acquires another runtime reference and returns a new handle.
    ///
    /// # Panics
    ///
    /// Panics if `RUNTIME.acquire()` returns an error.
    fn clone(&self) -> Self {
        // While `self` holds a reference obtained through `NDI::new()`, the runtime
        // is in the `Running` phase and acquiring only increments the count.
        RUNTIME
            .acquire()
            .expect("Runtime should be initialized when cloning existing NDI handle");
        Self
    }
}
290
/// Dropping a handle gives its runtime reference back.
impl Drop for NDI {
    /// Releases this handle's reference; releasing the last one destroys the runtime.
    fn drop(&mut self) {
        RUNTIME.release();
    }
}
296
297#[cfg(test)]
298mod tests {
299    use super::*;
300    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
301    use std::sync::Arc;
302    use std::thread;
303    use std::time::Duration;
304
305    /// A testable runtime manager that uses mock init/destroy functions.
306    /// This allows testing lifecycle invariants without the real NDI SDK.
307    struct TestableRuntimeManager {
308        state: Mutex<RuntimeState>,
309        cv: Condvar,
310        init_count: AtomicUsize,
311        destroy_count: AtomicUsize,
312        init_should_fail: AtomicBool,
313        init_delay_ms: AtomicUsize,
314        destroy_delay_ms: AtomicUsize,
315    }
316
317    impl TestableRuntimeManager {
318        fn new() -> Self {
319            Self {
320                state: Mutex::new(RuntimeState::new()),
321                cv: Condvar::new(),
322                init_count: AtomicUsize::new(0),
323                destroy_count: AtomicUsize::new(0),
324                init_should_fail: AtomicBool::new(false),
325                init_delay_ms: AtomicUsize::new(0),
326                destroy_delay_ms: AtomicUsize::new(0),
327            }
328        }
329
330        fn recover_guard<'a>(
331            result: std::sync::LockResult<MutexGuard<'a, RuntimeState>>,
332        ) -> MutexGuard<'a, RuntimeState> {
333            result.unwrap_or_else(|poisoned| poisoned.into_inner())
334        }
335
336        fn mock_initialize(&self) -> bool {
337            let delay = self.init_delay_ms.load(Ordering::Acquire);
338            if delay > 0 {
339                thread::sleep(Duration::from_millis(delay as u64));
340            }
341            self.init_count.fetch_add(1, Ordering::AcqRel);
342            !self.init_should_fail.load(Ordering::Acquire)
343        }
344
345        fn mock_destroy(&self) {
346            let delay = self.destroy_delay_ms.load(Ordering::Acquire);
347            if delay > 0 {
348                thread::sleep(Duration::from_millis(delay as u64));
349            }
350            self.destroy_count.fetch_add(1, Ordering::AcqRel);
351        }
352
353        fn acquire(&self) -> Result<()> {
354            let mut guard = Self::recover_guard(self.state.lock());
355
356            loop {
357                match guard.phase {
358                    Phase::Uninitialized | Phase::Failed => {
359                        guard.phase = Phase::Initializing;
360                        drop(guard);
361
362                        let succeeded = self.mock_initialize();
363
364                        guard = Self::recover_guard(self.state.lock());
365
366                        if succeeded {
367                            guard.phase = Phase::Running;
368                            guard.refcount = 1;
369                            self.cv.notify_all();
370                            return Ok(());
371                        } else {
372                            guard.phase = Phase::Failed;
373                            self.cv.notify_all();
374                            return Err(Error::InitializationFailed(
375                                "Mock NDIlib_initialize failed".into(),
376                            ));
377                        }
378                    }
379
380                    Phase::Initializing | Phase::Destroying => {
381                        guard = Self::recover_guard(self.cv.wait(guard));
382                    }
383
384                    Phase::Running => {
385                        guard.refcount += 1;
386                        return Ok(());
387                    }
388                }
389            }
390        }
391
392        fn release(&self) {
393            let mut guard = Self::recover_guard(self.state.lock());
394
395            assert!(guard.refcount > 0, "release() called with refcount 0");
396            assert!(
397                guard.phase == Phase::Running,
398                "release() called in phase {:?}",
399                guard.phase
400            );
401
402            guard.refcount -= 1;
403
404            if guard.refcount == 0 {
405                guard.phase = Phase::Destroying;
406                drop(guard);
407
408                self.mock_destroy();
409
410                let mut guard = Self::recover_guard(self.state.lock());
411                guard.phase = Phase::Uninitialized;
412                self.cv.notify_all();
413            }
414        }
415
416        fn is_running(&self) -> bool {
417            let guard = Self::recover_guard(self.state.lock());
418            guard.phase == Phase::Running && guard.refcount > 0
419        }
420
421        fn phase(&self) -> Phase {
422            let guard = Self::recover_guard(self.state.lock());
423            guard.phase
424        }
425
426        fn refcount(&self) -> usize {
427            let guard = Self::recover_guard(self.state.lock());
428            guard.refcount
429        }
430    }
431
432    // ========== Lifecycle Invariant Tests ==========
433
434    #[test]
435    fn test_reinit_after_teardown() {
436        // Issue requirement: create NDI, drop all, create NDI again
437        // => init called twice, destroy called twice, both NDI::new() return Ok
438        let manager = Arc::new(TestableRuntimeManager::new());
439
440        // First cycle
441        manager.acquire().expect("First init should succeed");
442        assert_eq!(manager.init_count.load(Ordering::Acquire), 1);
443        assert!(manager.is_running());
444
445        manager.release();
446        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 1);
447        assert!(!manager.is_running());
448        assert_eq!(manager.phase(), Phase::Uninitialized);
449
450        // Second cycle - must re-initialize
451        manager.acquire().expect("Second init should succeed");
452        assert_eq!(manager.init_count.load(Ordering::Acquire), 2);
453        assert!(manager.is_running());
454
455        manager.release();
456        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 2);
457        assert!(!manager.is_running());
458    }
459
460    #[test]
461    fn test_init_failure_retry() {
462        // Issue requirement: first init fails => error, next call succeeds => Ok
463        let manager = Arc::new(TestableRuntimeManager::new());
464
465        // Configure first init to fail
466        manager.init_should_fail.store(true, Ordering::Release);
467
468        let result = manager.acquire();
469        assert!(result.is_err());
470        assert_eq!(manager.init_count.load(Ordering::Acquire), 1);
471        assert_eq!(manager.phase(), Phase::Failed);
472        assert!(!manager.is_running());
473
474        // Configure next init to succeed
475        manager.init_should_fail.store(false, Ordering::Release);
476
477        let result = manager.acquire();
478        assert!(result.is_ok());
479        assert_eq!(manager.init_count.load(Ordering::Acquire), 2);
480        assert!(manager.is_running());
481
482        manager.release();
483    }
484
485    #[test]
486    fn test_no_ok_while_uninitialized() {
487        // Issue requirement: cannot get Ok from acquire without init in that cycle
488        let manager = Arc::new(TestableRuntimeManager::new());
489
490        // Verify initial state
491        assert_eq!(manager.phase(), Phase::Uninitialized);
492        assert!(!manager.is_running());
493        assert_eq!(manager.init_count.load(Ordering::Acquire), 0);
494
495        // Acquire must call init
496        manager.acquire().expect("Should succeed");
497        assert_eq!(manager.init_count.load(Ordering::Acquire), 1);
498
499        // Clone/acquire with running does NOT call init
500        manager.acquire().expect("Clone should succeed");
501        assert_eq!(manager.init_count.load(Ordering::Acquire), 1);
502        assert_eq!(manager.refcount(), 2);
503
504        // Release both
505        manager.release();
506        manager.release();
507
508        // After teardown, next acquire MUST call init
509        assert_eq!(manager.phase(), Phase::Uninitialized);
510        manager.acquire().expect("Re-init should succeed");
511        assert_eq!(manager.init_count.load(Ordering::Acquire), 2);
512
513        manager.release();
514    }
515
516    #[test]
517    fn test_destroy_called_exactly_once_per_cycle() {
518        // Issue requirement: destroy called exactly once per successful init cycle
519        let manager = Arc::new(TestableRuntimeManager::new());
520
521        // First cycle with multiple refs
522        manager.acquire().expect("Init");
523        manager.acquire().expect("Clone 1");
524        manager.acquire().expect("Clone 2");
525        assert_eq!(manager.refcount(), 3);
526        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 0);
527
528        manager.release();
529        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 0);
530        manager.release();
531        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 0);
532        manager.release(); // Last one
533        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 1);
534
535        // Second cycle
536        manager.acquire().expect("Re-init");
537        manager.acquire().expect("Clone");
538        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 1);
539
540        manager.release();
541        manager.release();
542        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 2);
543    }
544
545    // ========== Concurrency Stress Tests ==========
546
547    #[test]
548    fn test_concurrent_acquire_single_init() {
549        // Issue requirement: concurrent NDI::new() calls result in at most one init per cycle
550        let manager = Arc::new(TestableRuntimeManager::new());
551
552        // Add a small delay to init to increase chance of race
553        manager.init_delay_ms.store(5, Ordering::Release);
554
555        let handles: Vec<_> = (0..10)
556            .map(|_| {
557                let mgr = Arc::clone(&manager);
558                thread::spawn(move || mgr.acquire())
559            })
560            .collect();
561
562        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
563
564        // All should succeed
565        for result in &results {
566            assert!(result.is_ok());
567        }
568
569        // Only one init call should have happened
570        assert_eq!(manager.init_count.load(Ordering::Acquire), 1);
571        assert_eq!(manager.refcount(), 10);
572
573        // Cleanup
574        for _ in 0..10 {
575            manager.release();
576        }
577    }
578
579    #[test]
580    fn test_concurrent_acquire_during_destroy() {
581        // Issue requirement: callers block during destroy, then succeed after
582        let manager = Arc::new(TestableRuntimeManager::new());
583
584        // Initialize
585        manager.acquire().expect("Init");
586        assert!(manager.is_running());
587
588        // Add delay to destroy
589        manager.destroy_delay_ms.store(50, Ordering::Release);
590
591        let mgr_clone = Arc::clone(&manager);
592
593        // Start a thread that will try to acquire while destroy is in progress
594        let acquirer = thread::spawn(move || {
595            // Wait a bit to ensure destroy starts
596            thread::sleep(Duration::from_millis(10));
597            mgr_clone.acquire()
598        });
599
600        // Trigger destroy
601        manager.release();
602
603        // The acquirer should have blocked and then succeeded
604        let result = acquirer.join().unwrap();
605        assert!(result.is_ok());
606
607        // A new init cycle should have started
608        assert_eq!(manager.init_count.load(Ordering::Acquire), 2);
609        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 1);
610
611        manager.release();
612    }
613
614    #[test]
615    fn test_mixed_acquire_release_concurrent() {
616        // Stress test with mixed operations
617        let manager = Arc::new(TestableRuntimeManager::new());
618        let success_count = Arc::new(AtomicUsize::new(0));
619        let failure_count = Arc::new(AtomicUsize::new(0));
620
621        let handles: Vec<_> = (0..20)
622            .map(|i| {
623                let mgr = Arc::clone(&manager);
624                let success = Arc::clone(&success_count);
625                let failure = Arc::clone(&failure_count);
626
627                thread::spawn(move || {
628                    for _ in 0..10 {
629                        match mgr.acquire() {
630                            Ok(()) => {
631                                success.fetch_add(1, Ordering::Relaxed);
632                                // Hold for a bit
633                                if i % 3 == 0 {
634                                    thread::sleep(Duration::from_micros(100));
635                                }
636                                mgr.release();
637                            }
638                            Err(_) => {
639                                failure.fetch_add(1, Ordering::Relaxed);
640                            }
641                        }
642                    }
643                })
644            })
645            .collect();
646
647        for handle in handles {
648            handle.join().expect("Thread panicked");
649        }
650
651        // All should succeed (no failures configured)
652        assert_eq!(success_count.load(Ordering::Relaxed), 200);
653        assert_eq!(failure_count.load(Ordering::Relaxed), 0);
654
655        // Final state should be clean
656        assert!(!manager.is_running());
657        assert_eq!(manager.refcount(), 0);
658        assert_eq!(manager.phase(), Phase::Uninitialized);
659    }
660
661    #[test]
662    fn test_concurrent_init_with_failures() {
663        // Test that failures during concurrent init are handled correctly
664        let manager = Arc::new(TestableRuntimeManager::new());
665        manager.init_should_fail.store(true, Ordering::Release);
666        manager.init_delay_ms.store(5, Ordering::Release);
667
668        let handles: Vec<_> = (0..5)
669            .map(|_| {
670                let mgr = Arc::clone(&manager);
671                thread::spawn(move || mgr.acquire())
672            })
673            .collect();
674
675        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
676
677        // All should fail
678        for result in &results {
679            assert!(result.is_err());
680        }
681
682        // State should be Failed
683        assert_eq!(manager.phase(), Phase::Failed);
684        assert!(!manager.is_running());
685
686        // Now allow init to succeed
687        manager.init_should_fail.store(false, Ordering::Release);
688
689        // Retry should work
690        manager.acquire().expect("Retry should succeed");
691        assert!(manager.is_running());
692
693        manager.release();
694    }
695
696    #[test]
697    fn test_source_cache_pattern() {
698        // Test the pattern used by SourceCache: create, cache, clear, recreate
699        let manager = Arc::new(TestableRuntimeManager::new());
700
701        // Simulate SourceCache.find_by_host() - creates NDI, caches it
702        manager.acquire().expect("First source lookup");
703        assert_eq!(manager.init_count.load(Ordering::Acquire), 1);
704
705        // Simulate multiple cached sources
706        manager.acquire().expect("Second source");
707        manager.acquire().expect("Third source");
708        assert_eq!(manager.refcount(), 3);
709
710        // Simulate SourceCache.clear() - drops all cached NDI handles
711        manager.release();
712        manager.release();
713        manager.release();
714
715        // Runtime should be destroyed
716        assert_eq!(manager.destroy_count.load(Ordering::Acquire), 1);
717        assert!(!manager.is_running());
718
719        // Simulate new find_by_host() after clear - must reinit
720        manager.acquire().expect("New lookup after clear");
721        assert_eq!(manager.init_count.load(Ordering::Acquire), 2);
722        assert!(manager.is_running());
723
724        manager.release();
725    }
726}