Skip to main content

grafton_ndi/
async_runtime.rs

1//! Async runtime integration for Tokio and async-std.
2//!
3//! This module provides async wrappers around the synchronous NDI receiver API,
4//! allowing integration with async Rust applications using Tokio or async-std runtimes.
5//!
6//! The NDI SDK operations are inherently synchronous and blocking, so these wrappers
7//! use `spawn_blocking` internally to run NDI operations on a thread pool without
8//! blocking the async runtime.
9//!
10//! For reliable `capture_*` methods, the timeout budget starts when the async
11//! method is called. The blocking task receives only the remaining budget when it
12//! begins, so `spawn_blocking` queue delay does not expand the SDK wait budget.
13//! Runtime scheduling can still make the awaited future complete after the timeout;
14//! these wrappers do not use runtime-level cancellation because that would not stop
15//! a queued or running blocking NDI call.
16//!
17//! # Features
18//!
19//! - `tokio` - Enable Tokio runtime support
20//! - `async-std` - Enable async-std runtime support
21//!
22//! # Example with Tokio
23//!
24//! ```no_run
25//! # #[cfg(feature = "tokio")]
26//! # {
27//! use grafton_ndi::{NDI, ReceiverOptionsBuilder, tokio::AsyncReceiver};
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), grafton_ndi::Error> {
31//!     let ndi = NDI::new()?;
32//!     // ... obtain source from finder ...
33//!     # let source = grafton_ndi::Source {
34//!     #     name: "Test".into(),
35//!     #     address: grafton_ndi::SourceAddress::None
36//!     # };
37//!
38//!     let options = ReceiverOptionsBuilder::snapshot_preset(source).build();
39//!     let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
40//!
41//!     let async_receiver = AsyncReceiver::new(receiver);
42//!
43//!     // Capture frame asynchronously without blocking the runtime
44//!     let frame = async_receiver.video().capture(std::time::Duration::from_secs(5)).await?;
45//!     println!("Captured {}x{} frame", frame.width(), frame.height());
46//!
47//!     Ok(())
48//! }
49//! # }
50//! ```
51
52use std::{
53    future::Future,
54    marker::PhantomData,
55    sync::Arc,
56    time::{Duration, Instant},
57};
58
59use crate::{
60    capture::{AudioKind, CaptureKind, MetadataKind, VideoKind},
61    to_ms_checked, ConnectionStats, Receiver, Result,
62};
63
64#[cfg(feature = "tokio")]
65use crate::Error;
66
67/// Trait for async runtime spawn-blocking abstraction.
68///
69/// This trait enables runtime-agnostic async code by abstracting the spawn-blocking
70/// mechanism. Each runtime (Tokio, async-std) provides its own implementation.
71///
72/// The trait is sealed to prevent external implementations and ensure consistent
73/// error handling across all supported runtimes.
74pub trait SpawnBlocking: sealed::Sealed + Clone + Send + Sync + 'static {
75    /// Spawns a blocking operation on a thread pool and returns its result.
76    ///
77    /// # Errors
78    ///
79    /// Returns `Err(Error::SpawnFailed)` if the blocking task panics or is cancelled.
80    fn spawn_blocking<F, R>(f: F) -> impl Future<Output = Result<R>> + Send
81    where
82        F: FnOnce() -> R + Send + 'static,
83        R: Send + 'static;
84}
85
86mod sealed {
87    pub trait Sealed {}
88
89    #[cfg(feature = "tokio")]
90    impl Sealed for super::TokioRuntime {}
91
92    #[cfg(feature = "async-std")]
93    impl Sealed for super::AsyncStdRuntime {}
94}
95
96/// Tokio async runtime marker type.
97///
98/// Used as a type parameter for [`AsyncReceiverGeneric`] to select Tokio's
99/// `spawn_blocking` implementation.
100#[cfg(feature = "tokio")]
101#[derive(Clone, Copy, Debug, Default)]
102pub struct TokioRuntime;
103
104#[cfg(feature = "tokio")]
105impl SpawnBlocking for TokioRuntime {
106    // Using `impl Future` instead of `async fn` in trait because we need explicit
107    // Send bounds on the returned future. This pattern is intentional.
108    #[allow(clippy::manual_async_fn)]
109    fn spawn_blocking<F, R>(f: F) -> impl Future<Output = Result<R>> + Send
110    where
111        F: FnOnce() -> R + Send + 'static,
112        R: Send + 'static,
113    {
114        async {
115            ::tokio::task::spawn_blocking(f)
116                .await
117                .map_err(|e| Error::SpawnFailed(e.to_string()))
118        }
119    }
120}
121
122/// async-std runtime marker type.
123///
124/// Used as a type parameter for [`AsyncReceiverGeneric`] to select async-std's
125/// `spawn_blocking` implementation.
126#[cfg(feature = "async-std")]
127#[derive(Clone, Copy, Debug, Default)]
128pub struct AsyncStdRuntime;
129
130#[cfg(feature = "async-std")]
131impl SpawnBlocking for AsyncStdRuntime {
132    // Using `impl Future` instead of `async fn` in trait because we need explicit
133    // Send bounds on the returned future. This pattern is intentional.
134    #[allow(clippy::manual_async_fn)]
135    fn spawn_blocking<F, R>(f: F) -> impl Future<Output = Result<R>> + Send
136    where
137        F: FnOnce() -> R + Send + 'static,
138        R: Send + 'static,
139    {
140        async { Ok(::async_std::task::spawn_blocking(f).await) }
141    }
142}
143
144/// Generic async receiver wrapper parameterized by runtime.
145///
146/// This struct provides async versions of the [`Receiver`] methods by running
147/// blocking NDI operations on the runtime's thread pool using `spawn_blocking`.
148///
149/// # Type Parameters
150///
151/// - `R`: The async runtime type, implementing [`SpawnBlocking`]. Use
152///   [`TokioRuntime`] or [`AsyncStdRuntime`].
153///
154/// # Thread Safety
155///
156/// The underlying [`Receiver`] is wrapped in an [`Arc`] to allow sharing across
157/// async tasks and safe cloning. The NDI SDK receiver is inherently thread-safe.
158///
159/// # Example
160///
161/// ```no_run
162/// # #[cfg(feature = "tokio")]
163/// # {
164/// use grafton_ndi::{NDI, ReceiverOptionsBuilder, tokio::AsyncReceiver};
165///
166/// #[tokio::main]
167/// async fn main() -> Result<(), grafton_ndi::Error> {
168///     let ndi = NDI::new()?;
169///     // ... obtain source ...
170///     # let source = grafton_ndi::Source {
171///     #     name: "Test".into(),
172///     #     address: grafton_ndi::SourceAddress::None
173///     # };
174///
175///     let options = ReceiverOptionsBuilder::snapshot_preset(source).build();
176///     let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
177///     let async_receiver = AsyncReceiver::new(receiver);
178///
179///     // Non-blocking async capture
180///     match async_receiver.video().try_capture(std::time::Duration::from_millis(100)).await? {
181///         Some(frame) => println!("Got frame: {}x{}", frame.width(), frame.height()),
182///         None => println!("No frame available"),
183///     }
184///
185///     Ok(())
186/// }
187/// # }
188/// ```
189pub struct AsyncReceiverGeneric<R: SpawnBlocking> {
190    inner: Arc<Receiver>,
191    _runtime: PhantomData<R>,
192}
193
194fn validated_timeout_start(timeout: Duration) -> Result<Instant> {
195    to_ms_checked(timeout)?;
196    Ok(Instant::now())
197}
198
199fn remaining_timeout(timeout: Duration, start_time: Instant) -> Duration {
200    timeout.saturating_sub(start_time.elapsed())
201}
202
203impl<R: SpawnBlocking> AsyncReceiverGeneric<R> {
204    /// Create a new async receiver wrapper.
205    ///
206    /// The receiver is wrapped in an [`Arc`] to allow sharing across async tasks.
207    pub fn new(receiver: Receiver) -> Self {
208        Self {
209            inner: Arc::new(receiver),
210            _runtime: PhantomData,
211        }
212    }
213
214    /// Capture **video** frames without blocking the async runtime.
215    ///
216    /// Returns an [`AsyncCapture`] view; see [`AsyncCapture`] for its verbs
217    /// ([`capture`](AsyncCapture::capture),
218    /// [`try_capture`](AsyncCapture::try_capture)). Mirrors
219    /// [`Receiver::video`], running each capture on the blocking pool.
220    ///
221    /// # Example
222    ///
223    /// ```no_run
224    /// # #[cfg(feature = "tokio")]
225    /// # {
226    /// # use grafton_ndi::{NDI, ReceiverOptions, tokio::AsyncReceiver};
227    /// # use std::time::Duration;
228    /// # #[tokio::main]
229    /// # async fn main() -> Result<(), grafton_ndi::Error> {
230    /// # let ndi = NDI::new()?;
231    /// # let source = grafton_ndi::Source {
232    /// #     name: "Test".into(),
233    /// #     address: grafton_ndi::SourceAddress::None
234    /// # };
235    /// # let options = ReceiverOptions::builder(source).build();
236    /// # let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
237    /// let async_receiver = AsyncReceiver::new(receiver);
238    /// let frame = async_receiver.video().capture(Duration::from_secs(5)).await?;
239    /// println!("Captured {}x{} frame", frame.width(), frame.height());
240    /// # Ok(())
241    /// # }
242    /// # }
243    /// ```
244    #[must_use = "the AsyncCapture view does nothing until a capture verb is awaited"]
245    pub fn video(&self) -> AsyncCapture<'_, R, VideoKind> {
246        AsyncCapture::new(self)
247    }
248
249    /// Capture **audio** frames without blocking the async runtime.
250    ///
251    /// Returns an [`AsyncCapture`] view; see [`video`](Self::video) for usage.
252    /// Mirrors [`Receiver::audio`].
253    #[must_use = "the AsyncCapture view does nothing until a capture verb is awaited"]
254    pub fn audio(&self) -> AsyncCapture<'_, R, AudioKind> {
255        AsyncCapture::new(self)
256    }
257
258    /// Capture **metadata** frames without blocking the async runtime.
259    ///
260    /// Returns an [`AsyncCapture`] view; see [`video`](Self::video) for usage.
261    /// Mirrors [`Receiver::metadata`].
262    #[must_use = "the AsyncCapture view does nothing until a capture verb is awaited"]
263    pub fn metadata(&self) -> AsyncCapture<'_, R, MetadataKind> {
264        AsyncCapture::new(self)
265    }
266
267    /// Whether the underlying receiver currently has at least one active
268    /// connection to its source. See [`Receiver::is_connected`].
269    ///
270    /// A cheap, non-blocking SDK query, so it runs inline rather than on
271    /// the blocking pool.
272    pub fn is_connected(&self) -> bool {
273        self.inner.is_connected()
274    }
275
276    /// Connection and frame-throughput statistics for the underlying
277    /// receiver. See [`Receiver::connection_stats`].
278    ///
279    /// A cheap, non-blocking SDK query, so it runs inline rather than on
280    /// the blocking pool. `connection_stats().video_frames_received` is
281    /// the canonical liveness signal: it advances as the receiver pulls
282    /// frames off the network, independent of how often the caller
283    /// captures, so a frozen counter means the feed itself has stalled.
284    pub fn connection_stats(&self) -> ConnectionStats {
285        self.inner.connection_stats()
286    }
287
288    /// Re-establish the underlying receiver's connection to its source
289    /// in place. See [`Receiver::reconnect`].
290    ///
291    /// Unlike the liveness probes above, a reconnect takes the receiver's
292    /// capture guard exclusively, so it can block until in-flight captures
293    /// (which run on the blocking pool) drain. It therefore runs on the
294    /// blocking pool too, never on the async runtime's threads, and is safe to
295    /// call while captures on this receiver are in flight — they serialize
296    /// instead of racing. Confirm recovery via [`Self::connection_stats`].
297    pub async fn reconnect(&self) -> Result<()> {
298        let receiver = Arc::clone(&self.inner);
299        R::spawn_blocking(move || receiver.reconnect()).await?
300    }
301}
302
303/// A typed view over an async receiver for capturing frames of one kind,
304/// without blocking the async runtime.
305///
306/// Created by [`AsyncReceiverGeneric::video`], [`audio`](AsyncReceiverGeneric::audio),
307/// and [`metadata`](AsyncReceiverGeneric::metadata). Each verb runs the
308/// underlying synchronous capture on the runtime's blocking pool:
309///
310/// - [`capture`](Self::capture) — reliable owned capture with built-in retry.
311///   The timeout budget starts when the future is created; `spawn_blocking`
312///   queue delay is subtracted before the SDK begins waiting.
313/// - [`try_capture`](Self::try_capture) — a single owned poll; `Ok(None)` when
314///   no frame is ready.
315///
316/// There is no zero-copy `try_capture_ref` here: a borrowed frame is tied to
317/// the receiver and cannot cross the `spawn_blocking` boundary. Use
318/// [`Receiver::video`]'s [`Capture::try_capture_ref`](crate::Capture::try_capture_ref)
319/// on the synchronous receiver for in-place processing.
320pub struct AsyncCapture<'rx, R: SpawnBlocking, K: CaptureKind> {
321    recv: &'rx AsyncReceiverGeneric<R>,
322    _kind: PhantomData<K>,
323}
324
325impl<'rx, R: SpawnBlocking, K: CaptureKind> AsyncCapture<'rx, R, K>
326where
327    K::Owned: Send + 'static,
328{
329    fn new(recv: &'rx AsyncReceiverGeneric<R>) -> Self {
330        Self {
331            recv,
332            _kind: PhantomData,
333        }
334    }
335
336    /// Async version of [`Capture::capture`](crate::Capture::capture):
337    /// reliable owned capture that retries across the SDK's initial-sync
338    /// warm-up, run on the blocking pool.
339    ///
340    /// # Arguments
341    ///
342    /// * `timeout` - Total budget to wait for a frame, starting when this async
343    ///   method is called. Blocking task queue delay is subtracted before the
344    ///   synchronous receiver starts waiting. Must not exceed
345    ///   [`crate::MAX_TIMEOUT`] (~49.7 days).
346    ///
347    /// # Returns
348    ///
349    /// * `Ok(frame)` - Successfully captured a frame
350    /// * `Err(Error::FrameTimeout)` - No frame received within timeout (includes retry details)
351    /// * `Err(Error::SpawnFailed)` - The blocking task panicked or was cancelled
352    /// * `Err(_)` - Another error occurred during capture
353    pub async fn capture(&self, timeout: Duration) -> Result<K::Owned> {
354        let start_time = validated_timeout_start(timeout)?;
355        let receiver = Arc::clone(&self.recv.inner);
356        R::spawn_blocking(move || {
357            receiver.capture_kind::<K>(remaining_timeout(timeout, start_time))
358        })
359        .await?
360    }
361
362    /// Async version of
363    /// [`Capture::try_capture`](crate::Capture::try_capture): a single owned
364    /// poll, run on the blocking pool.
365    ///
366    /// # Arguments
367    ///
368    /// * `timeout` - Maximum time to wait for a frame. Must not exceed
369    ///   [`crate::MAX_TIMEOUT`] (~49.7 days).
370    ///
371    /// # Returns
372    ///
373    /// * `Ok(Some(frame))` - Successfully captured a frame
374    /// * `Ok(None)` - No frame available within timeout
375    /// * `Err(Error::SpawnFailed)` - The blocking task panicked or was cancelled
376    /// * `Err(_)` - An error occurred during capture
377    pub async fn try_capture(&self, timeout: Duration) -> Result<Option<K::Owned>> {
378        let receiver = Arc::clone(&self.recv.inner);
379        R::spawn_blocking(move || receiver.try_capture_kind::<K>(timeout)).await?
380    }
381}
382
383impl<R: SpawnBlocking> Clone for AsyncReceiverGeneric<R> {
384    fn clone(&self) -> Self {
385        Self {
386            inner: Arc::clone(&self.inner),
387            _runtime: PhantomData,
388        }
389    }
390}
391
392// Backward-compatible module re-exports
393
394#[cfg(feature = "tokio")]
395pub mod tokio {
396    //! Tokio async runtime integration.
397    //!
398    //! Provides [`AsyncReceiver`] wrapper that uses `tokio::task::spawn_blocking`
399    //! to run NDI operations without blocking the Tokio runtime.
400    //!
401    //! # Example
402    //!
403    //! ```no_run
404    //! # #[cfg(feature = "tokio")]
405    //! # {
406    //! use grafton_ndi::{NDI, ReceiverOptionsBuilder, tokio::AsyncReceiver};
407    //!
408    //! #[tokio::main]
409    //! async fn main() -> Result<(), grafton_ndi::Error> {
410    //!     let ndi = NDI::new()?;
411    //!     // ... obtain source ...
412    //!     # let source = grafton_ndi::Source {
413    //!     #     name: "Test".into(),
414    //!     #     address: grafton_ndi::SourceAddress::None
415    //!     # };
416    //!
417    //!     let options = ReceiverOptionsBuilder::snapshot_preset(source).build();
418    //!     let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
419    //!     let async_receiver = AsyncReceiver::new(receiver);
420    //!
421    //!     // Non-blocking async capture
422    //!     match async_receiver.video().try_capture(std::time::Duration::from_millis(100)).await? {
423    //!         Some(frame) => println!("Got frame: {}x{}", frame.width(), frame.height()),
424    //!         None => println!("No frame available"),
425    //!     }
426    //!
427    //!     Ok(())
428    //! }
429    //! # }
430    //! ```
431
432    use super::{AsyncReceiverGeneric, TokioRuntime};
433
434    /// Async receiver wrapper for Tokio runtime.
435    ///
436    /// This is a type alias for the generic async receiver parameterized with
437    /// the Tokio runtime. It provides async versions of the [`crate::Receiver`]
438    /// methods by running blocking NDI operations on Tokio's blocking thread
439    /// pool using `spawn_blocking`.
440    ///
441    /// # Thread Safety
442    ///
443    /// The underlying `Receiver` is wrapped in an `Arc` to allow sharing across
444    /// async tasks and safe cloning. The NDI SDK receiver is inherently thread-safe.
445    ///
446    /// # Error Handling
447    ///
448    /// All methods return [`crate::Result`], converting any task panic or cancellation
449    /// into [`crate::Error::SpawnFailed`] rather than propagating the panic.
450    pub type AsyncReceiver = AsyncReceiverGeneric<TokioRuntime>;
451}
452
453#[cfg(feature = "async-std")]
454pub mod async_std {
455    //! async-std runtime integration.
456    //!
457    //! Provides [`AsyncReceiver`] wrapper that uses `async_std::task::spawn_blocking`
458    //! to run NDI operations without blocking the async-std runtime.
459    //!
460    //! # Example
461    //!
462    //! ```no_run
463    //! # #[cfg(feature = "async-std")]
464    //! # {
465    //! use grafton_ndi::{NDI, ReceiverOptionsBuilder, async_std::AsyncReceiver};
466    //!
467    //! #[async_std::main]
468    //! async fn main() -> Result<(), grafton_ndi::Error> {
469    //!     let ndi = NDI::new()?;
470    //!     // ... obtain source ...
471    //!     # let source = grafton_ndi::Source {
472    //!     #     name: "Test".into(),
473    //!     #     address: grafton_ndi::SourceAddress::None
474    //!     # };
475    //!
476    //!     let options = ReceiverOptionsBuilder::snapshot_preset(source).build();
477    //!     let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
478    //!     let async_receiver = AsyncReceiver::new(receiver);
479    //!
480    //!     // Non-blocking async capture
481    //!     match async_receiver.video().try_capture(std::time::Duration::from_millis(100)).await? {
482    //!         Some(frame) => println!("Got frame: {}x{}", frame.width(), frame.height()),
483    //!         None => println!("No frame available"),
484    //!     }
485    //!
486    //!     Ok(())
487    //! }
488    //! # }
489    //! ```
490
491    use super::{AsyncReceiverGeneric, AsyncStdRuntime};
492
493    /// Async receiver wrapper for async-std runtime.
494    ///
495    /// This is a type alias for the generic async receiver parameterized with
496    /// the async-std runtime. It provides async versions of the [`crate::Receiver`]
497    /// methods by running blocking NDI operations on async-std's blocking thread
498    /// pool using `spawn_blocking`.
499    ///
500    /// # Thread Safety
501    ///
502    /// The underlying `Receiver` is wrapped in an `Arc` to allow sharing across
503    /// async tasks and safe cloning. The NDI SDK receiver is inherently thread-safe.
504    ///
505    /// # Error Handling
506    ///
507    /// All methods return [`crate::Result`]. Note that async-std's `spawn_blocking`
508    /// does not return a `Result`, so spawn failures from this runtime are less
509    /// common than with Tokio.
510    pub type AsyncReceiver = AsyncReceiverGeneric<AsyncStdRuntime>;
511}