1use once_cell::sync::Lazy;
4
5use std::sync::{Condvar, Mutex, MutexGuard};
6
7use crate::{ndi_lib::*, Error, Result};
8
/// Lifecycle phase of the shared runtime tracked by `RuntimeState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    /// Starting phase, and the phase restored once a teardown has finished.
    Uninitialized,

    /// One thread is running initialization; other acquirers wait.
    Initializing,

    /// Initialization succeeded and references may be handed out.
    Running,

    /// The last reference was released and teardown is in progress.
    Destroying,

    /// The most recent initialization attempt failed; a later acquire retries.
    Failed,
}
29
30struct RuntimeState {
32 phase: Phase,
33 refcount: usize,
34}
35
36impl RuntimeState {
37 const fn new() -> Self {
38 Self {
39 phase: Phase::Uninitialized,
40 refcount: 0,
41 }
42 }
43}
44
45struct RuntimeManager {
53 state: Mutex<RuntimeState>,
54 cv: Condvar,
55}
56
57impl RuntimeManager {
58 const fn new() -> Self {
59 Self {
60 state: Mutex::new(RuntimeState::new()),
61 cv: Condvar::new(),
62 }
63 }
64
65 fn recover_guard<'a>(
67 result: std::sync::LockResult<MutexGuard<'a, RuntimeState>>,
68 ) -> MutexGuard<'a, RuntimeState> {
69 result.unwrap_or_else(|poisoned| poisoned.into_inner())
70 }
71
72 fn acquire(&self) -> Result<()> {
73 let mut guard = Self::recover_guard(self.state.lock());
74
75 loop {
76 match guard.phase {
77 Phase::Uninitialized | Phase::Failed => {
78 guard.phase = Phase::Initializing;
80 drop(guard);
81
82 #[cfg(all(target_os = "windows", debug_assertions))]
84 {
85 if std::env::var("CI").is_ok() {
86 eprintln!("[NDI] Initializing NDI runtime in CI environment...");
87 if let Ok(sdk_dir) = std::env::var("NDI_SDK_DIR") {
88 eprintln!("[NDI] NDI_SDK_DIR: {}", sdk_dir);
89 }
90 }
91 }
92
93 let succeeded = unsafe { NDIlib_initialize() };
94
95 guard = Self::recover_guard(self.state.lock());
97
98 if succeeded {
99 guard.phase = Phase::Running;
100 guard.refcount = 1;
101 self.cv.notify_all();
102 return Ok(());
103 } else {
104 guard.phase = Phase::Failed;
105 self.cv.notify_all();
106 return Err(Error::InitializationFailed(
107 "NDIlib_initialize failed".into(),
108 ));
109 }
110 }
111
112 Phase::Initializing | Phase::Destroying => {
113 guard = Self::recover_guard(self.cv.wait(guard));
115 }
117
118 Phase::Running => {
119 guard.refcount += 1;
121 return Ok(());
122 }
123 }
124 }
125 }
126
127 fn release(&self) {
128 let mut guard = Self::recover_guard(self.state.lock());
129
130 debug_assert!(
131 guard.refcount > 0,
132 "release() called when refcount was already 0 (double-free or unbalanced release)"
133 );
134 debug_assert!(
135 guard.phase == Phase::Running,
136 "release() called when phase was {:?}, expected Running",
137 guard.phase
138 );
139
140 guard.refcount -= 1;
141
142 if guard.refcount == 0 {
143 guard.phase = Phase::Destroying;
145 drop(guard);
146
147 unsafe { NDIlib_destroy() };
149
150 let mut guard = Self::recover_guard(self.state.lock());
152 guard.phase = Phase::Uninitialized;
153 self.cv.notify_all();
154 }
155 }
156
157 fn is_running(&self) -> bool {
158 let guard = Self::recover_guard(self.state.lock());
159 guard.phase == Phase::Running && guard.refcount > 0
160 }
161}
162
163static RUNTIME: Lazy<RuntimeManager> = Lazy::new(RuntimeManager::new);
164
/// Handle representing one reference on the shared NDI runtime.
///
/// Creating or cloning a handle acquires a reference; dropping it releases
/// that reference.
#[derive(Debug)]
pub struct NDI;
189
190impl NDI {
191 pub fn new() -> Result<Self> {
213 RUNTIME.acquire()?;
214 Ok(Self)
215 }
216
217 pub fn is_supported_cpu() -> bool {
231 unsafe { NDIlib_is_supported_CPU() }
232 }
233
234 pub fn version() -> Result<String> {
254 unsafe {
255 let version_ptr = NDIlib_version();
256 if version_ptr.is_null() {
257 return Err(Error::NullPointer("NDIlib_version".into()));
258 }
259 let c_str = std::ffi::CStr::from_ptr(version_ptr);
260 c_str
261 .to_str()
262 .map(|s| s.to_owned())
263 .map_err(|e| Error::InvalidUtf8(e.to_string()))
264 }
265 }
266 pub fn is_running() -> bool {
278 RUNTIME.is_running()
279 }
280}
281
282impl Clone for NDI {
283 fn clone(&self) -> Self {
284 RUNTIME
285 .acquire()
286 .expect("Runtime should be initialized when cloning existing NDI handle");
287 Self
288 }
289}
290
291impl Drop for NDI {
292 fn drop(&mut self) {
293 RUNTIME.release();
294 }
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
301 use std::sync::Arc;
302 use std::thread;
303 use std::time::Duration;
304
305 struct TestableRuntimeManager {
308 state: Mutex<RuntimeState>,
309 cv: Condvar,
310 init_count: AtomicUsize,
311 destroy_count: AtomicUsize,
312 init_should_fail: AtomicBool,
313 init_delay_ms: AtomicUsize,
314 destroy_delay_ms: AtomicUsize,
315 }
316
317 impl TestableRuntimeManager {
318 fn new() -> Self {
319 Self {
320 state: Mutex::new(RuntimeState::new()),
321 cv: Condvar::new(),
322 init_count: AtomicUsize::new(0),
323 destroy_count: AtomicUsize::new(0),
324 init_should_fail: AtomicBool::new(false),
325 init_delay_ms: AtomicUsize::new(0),
326 destroy_delay_ms: AtomicUsize::new(0),
327 }
328 }
329
330 fn recover_guard<'a>(
331 result: std::sync::LockResult<MutexGuard<'a, RuntimeState>>,
332 ) -> MutexGuard<'a, RuntimeState> {
333 result.unwrap_or_else(|poisoned| poisoned.into_inner())
334 }
335
336 fn mock_initialize(&self) -> bool {
337 let delay = self.init_delay_ms.load(Ordering::Acquire);
338 if delay > 0 {
339 thread::sleep(Duration::from_millis(delay as u64));
340 }
341 self.init_count.fetch_add(1, Ordering::AcqRel);
342 !self.init_should_fail.load(Ordering::Acquire)
343 }
344
345 fn mock_destroy(&self) {
346 let delay = self.destroy_delay_ms.load(Ordering::Acquire);
347 if delay > 0 {
348 thread::sleep(Duration::from_millis(delay as u64));
349 }
350 self.destroy_count.fetch_add(1, Ordering::AcqRel);
351 }
352
353 fn acquire(&self) -> Result<()> {
354 let mut guard = Self::recover_guard(self.state.lock());
355
356 loop {
357 match guard.phase {
358 Phase::Uninitialized | Phase::Failed => {
359 guard.phase = Phase::Initializing;
360 drop(guard);
361
362 let succeeded = self.mock_initialize();
363
364 guard = Self::recover_guard(self.state.lock());
365
366 if succeeded {
367 guard.phase = Phase::Running;
368 guard.refcount = 1;
369 self.cv.notify_all();
370 return Ok(());
371 } else {
372 guard.phase = Phase::Failed;
373 self.cv.notify_all();
374 return Err(Error::InitializationFailed(
375 "Mock NDIlib_initialize failed".into(),
376 ));
377 }
378 }
379
380 Phase::Initializing | Phase::Destroying => {
381 guard = Self::recover_guard(self.cv.wait(guard));
382 }
383
384 Phase::Running => {
385 guard.refcount += 1;
386 return Ok(());
387 }
388 }
389 }
390 }
391
392 fn release(&self) {
393 let mut guard = Self::recover_guard(self.state.lock());
394
395 assert!(guard.refcount > 0, "release() called with refcount 0");
396 assert!(
397 guard.phase == Phase::Running,
398 "release() called in phase {:?}",
399 guard.phase
400 );
401
402 guard.refcount -= 1;
403
404 if guard.refcount == 0 {
405 guard.phase = Phase::Destroying;
406 drop(guard);
407
408 self.mock_destroy();
409
410 let mut guard = Self::recover_guard(self.state.lock());
411 guard.phase = Phase::Uninitialized;
412 self.cv.notify_all();
413 }
414 }
415
416 fn is_running(&self) -> bool {
417 let guard = Self::recover_guard(self.state.lock());
418 guard.phase == Phase::Running && guard.refcount > 0
419 }
420
421 fn phase(&self) -> Phase {
422 let guard = Self::recover_guard(self.state.lock());
423 guard.phase
424 }
425
426 fn refcount(&self) -> usize {
427 let guard = Self::recover_guard(self.state.lock());
428 guard.refcount
429 }
430 }
431
/// A full acquire/release cycle can be repeated: each cycle runs exactly one
/// initialize and one destroy.
#[test]
fn test_reinit_after_teardown() {
    let mgr = Arc::new(TestableRuntimeManager::new());
    let inits = || mgr.init_count.load(Ordering::Acquire);
    let teardowns = || mgr.destroy_count.load(Ordering::Acquire);

    // First cycle.
    mgr.acquire().expect("First init should succeed");
    assert_eq!(inits(), 1);
    assert!(mgr.is_running());

    mgr.release();
    assert_eq!(teardowns(), 1);
    assert!(!mgr.is_running());
    assert_eq!(mgr.phase(), Phase::Uninitialized);

    // Second cycle on the same manager.
    mgr.acquire().expect("Second init should succeed");
    assert_eq!(inits(), 2);
    assert!(mgr.is_running());

    mgr.release();
    assert_eq!(teardowns(), 2);
    assert!(!mgr.is_running());
}
459
/// A failed initialization leaves the manager in `Failed`, and a later
/// acquire retries and can succeed.
#[test]
fn test_init_failure_retry() {
    let mgr = Arc::new(TestableRuntimeManager::new());
    let inits = || mgr.init_count.load(Ordering::Acquire);

    // Inject a failure for the first attempt.
    mgr.init_should_fail.store(true, Ordering::Release);

    let first_attempt = mgr.acquire();
    assert!(first_attempt.is_err());
    assert_eq!(inits(), 1);
    assert_eq!(mgr.phase(), Phase::Failed);
    assert!(!mgr.is_running());

    // Clear the failure; the retry must run initialization again.
    mgr.init_should_fail.store(false, Ordering::Release);

    let second_attempt = mgr.acquire();
    assert!(second_attempt.is_ok());
    assert_eq!(inits(), 2);
    assert!(mgr.is_running());

    mgr.release();
}
484
/// Initialization happens only on the first acquire of a cycle; additional
/// acquires just bump the reference count.
#[test]
fn test_no_ok_while_uninitialized() {
    let mgr = Arc::new(TestableRuntimeManager::new());
    let inits = || mgr.init_count.load(Ordering::Acquire);

    // Nothing has happened yet.
    assert_eq!(mgr.phase(), Phase::Uninitialized);
    assert!(!mgr.is_running());
    assert_eq!(inits(), 0);

    mgr.acquire().expect("Should succeed");
    assert_eq!(inits(), 1);

    // A second reference must not re-run initialization.
    mgr.acquire().expect("Clone should succeed");
    assert_eq!(inits(), 1);
    assert_eq!(mgr.refcount(), 2);

    mgr.release();
    mgr.release();

    // Fully released: the next acquire starts a new cycle.
    assert_eq!(mgr.phase(), Phase::Uninitialized);
    mgr.acquire().expect("Re-init should succeed");
    assert_eq!(inits(), 2);

    mgr.release();
}
515
/// Destroy runs only when the last reference of a cycle is released.
#[test]
fn test_destroy_called_exactly_once_per_cycle() {
    let mgr = Arc::new(TestableRuntimeManager::new());
    let teardowns = || mgr.destroy_count.load(Ordering::Acquire);

    // Three references in the first cycle.
    for label in ["Init", "Clone 1", "Clone 2"].iter() {
        mgr.acquire().expect(label);
    }
    assert_eq!(mgr.refcount(), 3);
    assert_eq!(teardowns(), 0);

    // The first two releases must not destroy anything.
    for _ in 0..2 {
        mgr.release();
        assert_eq!(teardowns(), 0);
    }

    // The third release is the last one and triggers the destroy.
    mgr.release();
    assert_eq!(teardowns(), 1);

    // Second cycle with two references.
    mgr.acquire().expect("Re-init");
    mgr.acquire().expect("Clone");
    assert_eq!(teardowns(), 1);

    mgr.release();
    mgr.release();
    assert_eq!(teardowns(), 2);
}
544
/// Ten threads acquiring at once must result in exactly one initialization
/// and ten outstanding references.
#[test]
fn test_concurrent_acquire_single_init() {
    const WORKERS: usize = 10;

    let shared = Arc::new(TestableRuntimeManager::new());

    // Slow the mocked initialize down so the acquirers overlap with it.
    shared.init_delay_ms.store(5, Ordering::Release);

    let mut workers = Vec::with_capacity(WORKERS);
    for _ in 0..WORKERS {
        let mgr = Arc::clone(&shared);
        workers.push(thread::spawn(move || mgr.acquire()));
    }

    // Join every thread before inspecting any outcome.
    let outcomes: Vec<_> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();

    for outcome in outcomes.iter() {
        assert!(outcome.is_ok());
    }

    assert_eq!(shared.init_count.load(Ordering::Acquire), 1);
    assert_eq!(shared.refcount(), WORKERS);

    for _ in 0..WORKERS {
        shared.release();
    }
}
578
/// An acquire issued once the last release has started must end up
/// re-initializing: one destroy and two initializations in total.
#[test]
fn test_concurrent_acquire_during_destroy() {
    let manager = Arc::new(TestableRuntimeManager::new());

    manager.acquire().expect("Init");
    assert!(manager.is_running());

    // Keep the mocked destroy slow so the acquirer normally arrives while the
    // phase is still `Destroying`.
    manager.destroy_delay_ms.store(50, Ordering::Release);

    let mgr_clone = Arc::clone(&manager);

    let acquirer = thread::spawn(move || {
        // Poll until the main thread's release() has moved the phase out of
        // `Running`, rather than sleeping a fixed time. A fixed sleep could
        // let this acquire run before the release on a loaded machine, which
        // would only bump the reference count and break the assertions below.
        while mgr_clone.phase() == Phase::Running {
            thread::sleep(Duration::from_millis(1));
        }
        mgr_clone.acquire()
    });

    manager.release();

    let result = acquirer.join().unwrap();
    assert!(result.is_ok());

    // The acquirer saw `Destroying` (and waited) or `Uninitialized`; in both
    // cases it had to run a fresh initialization after the single destroy.
    assert_eq!(manager.init_count.load(Ordering::Acquire), 2);
    assert_eq!(manager.destroy_count.load(Ordering::Acquire), 1);

    manager.release();
}
613
/// Twenty threads each perform ten acquire/release pairs; every acquire must
/// succeed and the manager must end fully released.
#[test]
fn test_mixed_acquire_release_concurrent() {
    let shared = Arc::new(TestableRuntimeManager::new());
    let success_count = Arc::new(AtomicUsize::new(0));
    let failure_count = Arc::new(AtomicUsize::new(0));

    let mut workers = Vec::new();
    for worker_id in 0..20 {
        let mgr = Arc::clone(&shared);
        let successes = Arc::clone(&success_count);
        let failures = Arc::clone(&failure_count);

        workers.push(thread::spawn(move || {
            // Every third worker holds its reference a little longer.
            let lingers = worker_id % 3 == 0;

            for _ in 0..10 {
                if mgr.acquire().is_ok() {
                    successes.fetch_add(1, Ordering::Relaxed);
                    if lingers {
                        thread::sleep(Duration::from_micros(100));
                    }
                    mgr.release();
                } else {
                    failures.fetch_add(1, Ordering::Relaxed);
                }
            }
        }));
    }

    for worker in workers {
        worker.join().expect("Thread panicked");
    }

    assert_eq!(success_count.load(Ordering::Relaxed), 200);
    assert_eq!(failure_count.load(Ordering::Relaxed), 0);

    // Every reference was returned, so the manager is back at rest.
    assert!(!shared.is_running());
    assert_eq!(shared.refcount(), 0);
    assert_eq!(shared.phase(), Phase::Uninitialized);
}
660
/// With a failure injected, every concurrent acquire reports an error; once
/// the failure is cleared, a retry succeeds.
#[test]
fn test_concurrent_init_with_failures() {
    let shared = Arc::new(TestableRuntimeManager::new());
    shared.init_should_fail.store(true, Ordering::Release);
    shared.init_delay_ms.store(5, Ordering::Release);

    let mut workers = Vec::with_capacity(5);
    for _ in 0..5 {
        let mgr = Arc::clone(&shared);
        workers.push(thread::spawn(move || mgr.acquire()));
    }

    // Join every thread before inspecting any outcome.
    let outcomes: Vec<_> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();

    for outcome in outcomes.iter() {
        assert!(outcome.is_err());
    }

    assert_eq!(shared.phase(), Phase::Failed);
    assert!(!shared.is_running());

    // Clear the injected failure and retry from the `Failed` phase.
    shared.init_should_fail.store(false, Ordering::Release);

    shared.acquire().expect("Retry should succeed");
    assert!(shared.is_running());

    shared.release();
}
695
/// Several references held and then all released, followed by a fresh
/// acquire: one destroy for the first cycle and a second initialization.
#[test]
fn test_source_cache_pattern() {
    let mgr = Arc::new(TestableRuntimeManager::new());
    let inits = || mgr.init_count.load(Ordering::Acquire);

    mgr.acquire().expect("First source lookup");
    assert_eq!(inits(), 1);

    mgr.acquire().expect("Second source");
    mgr.acquire().expect("Third source");
    assert_eq!(mgr.refcount(), 3);

    // Release all three references.
    for _ in 0..3 {
        mgr.release();
    }

    assert_eq!(mgr.destroy_count.load(Ordering::Acquire), 1);
    assert!(!mgr.is_running());

    // A new acquire after the teardown starts a fresh cycle.
    mgr.acquire().expect("New lookup after clear");
    assert_eq!(inits(), 2);
    assert!(mgr.is_running());

    mgr.release();
}
726}