grafton_ndi/async_runtime.rs
1//! Async runtime integration for Tokio and async-std.
2//!
3//! This module provides async wrappers around the synchronous NDI receiver API,
4//! allowing integration with async Rust applications using Tokio or async-std runtimes.
5//!
6//! The NDI SDK operations are inherently synchronous and blocking, so these wrappers
7//! use `spawn_blocking` internally to run NDI operations on a thread pool without
8//! blocking the async runtime.
9//!
//! For the reliable `capture` verb, the timeout budget starts when the returned
//! future is first polled, which for `capture(..).await` is effectively the moment
//! the method is called. The blocking task receives only the remaining budget when
//! it begins, so `spawn_blocking` queue delay does not expand the SDK wait budget.
//! Runtime scheduling can still make the awaited future complete after the timeout;
//! these wrappers do not use runtime-level cancellation because that would not stop
//! a queued or running blocking NDI call.
16//!
17//! # Features
18//!
19//! - `tokio` - Enable Tokio runtime support
20//! - `async-std` - Enable async-std runtime support
21//!
22//! # Example with Tokio
23//!
24//! ```no_run
25//! # #[cfg(feature = "tokio")]
26//! # {
27//! use grafton_ndi::{NDI, ReceiverOptionsBuilder, tokio::AsyncReceiver};
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<(), grafton_ndi::Error> {
31//! let ndi = NDI::new()?;
32//! // ... obtain source from finder ...
33//! # let source = grafton_ndi::Source {
34//! # name: "Test".into(),
35//! # address: grafton_ndi::SourceAddress::None
36//! # };
37//!
38//! let options = ReceiverOptionsBuilder::snapshot_preset(source).build();
39//! let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
40//!
41//! let async_receiver = AsyncReceiver::new(receiver);
42//!
43//! // Capture frame asynchronously without blocking the runtime
44//! let frame = async_receiver.video().capture(std::time::Duration::from_secs(5)).await?;
45//! println!("Captured {}x{} frame", frame.width(), frame.height());
46//!
47//! Ok(())
48//! }
49//! # }
50//! ```
51
52use std::{
53 future::Future,
54 marker::PhantomData,
55 sync::Arc,
56 time::{Duration, Instant},
57};
58
59use crate::{
60 capture::{AudioKind, CaptureKind, MetadataKind, VideoKind},
61 to_ms_checked, ConnectionStats, Receiver, Result,
62};
63
64#[cfg(feature = "tokio")]
65use crate::Error;
66
67/// Trait for async runtime spawn-blocking abstraction.
68///
69/// This trait enables runtime-agnostic async code by abstracting the spawn-blocking
70/// mechanism. Each runtime (Tokio, async-std) provides its own implementation.
71///
72/// The trait is sealed to prevent external implementations and ensure consistent
73/// error handling across all supported runtimes.
74pub trait SpawnBlocking: sealed::Sealed + Clone + Send + Sync + 'static {
75 /// Spawns a blocking operation on a thread pool and returns its result.
76 ///
77 /// # Errors
78 ///
79 /// Returns `Err(Error::SpawnFailed)` if the blocking task panics or is cancelled.
80 fn spawn_blocking<F, R>(f: F) -> impl Future<Output = Result<R>> + Send
81 where
82 F: FnOnce() -> R + Send + 'static,
83 R: Send + 'static;
84}
85
/// Private home of the sealing supertrait for [`SpawnBlocking`].
///
/// Because this module is not `pub`, only the runtime marker types listed here
/// can satisfy the `Sealed` bound.
mod sealed {
    /// Marker supertrait; implemented only for the runtime types below.
    pub trait Sealed {}

    // One impl per feature-gated runtime marker type.
    #[cfg(feature = "async-std")]
    impl Sealed for super::AsyncStdRuntime {}

    #[cfg(feature = "tokio")]
    impl Sealed for super::TokioRuntime {}
}
95
/// Zero-sized marker selecting the Tokio runtime.
///
/// Pass it as the type parameter of [`AsyncReceiverGeneric`] to have blocking
/// NDI work dispatched through Tokio's `spawn_blocking`.
#[cfg(feature = "tokio")]
#[derive(Debug, Default, Clone, Copy)]
pub struct TokioRuntime;
103
#[cfg(feature = "tokio")]
impl SpawnBlocking for TokioRuntime {
    /// Hands `f` to `tokio::task::spawn_blocking` and awaits the join handle.
    ///
    /// A join error from Tokio is converted to `Error::SpawnFailed` carrying the
    /// error's display text, so a failed blocking task surfaces as an `Err`
    /// instead of unwinding into the caller.
    // Written as `fn -> impl Future + Send` rather than `async fn` on purpose:
    // the trait needs an explicit `Send` bound on the returned future.
    #[allow(clippy::manual_async_fn)]
    fn spawn_blocking<F, R>(f: F) -> impl Future<Output = Result<R>> + Send
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        async move {
            match ::tokio::task::spawn_blocking(f).await {
                Ok(value) => Ok(value),
                Err(join_error) => Err(Error::SpawnFailed(join_error.to_string())),
            }
        }
    }
}
121
/// Zero-sized marker selecting the async-std runtime.
///
/// Pass it as the type parameter of [`AsyncReceiverGeneric`] to have blocking
/// NDI work dispatched through async-std's `spawn_blocking`.
#[cfg(feature = "async-std")]
#[derive(Debug, Default, Clone, Copy)]
pub struct AsyncStdRuntime;
129
#[cfg(feature = "async-std")]
impl SpawnBlocking for AsyncStdRuntime {
    /// Hands `f` to `async_std::task::spawn_blocking` and awaits its output.
    ///
    /// async-std's handle resolves directly to `f`'s return value, so this
    /// implementation always yields `Ok` and never produces
    /// `Error::SpawnFailed` itself.
    // NOTE(review): a panic inside `f` is not converted to an error here; how it
    // surfaces is decided by async-std — confirm against its documentation.
    //
    // Written as `fn -> impl Future + Send` rather than `async fn` on purpose:
    // the trait needs an explicit `Send` bound on the returned future.
    #[allow(clippy::manual_async_fn)]
    fn spawn_blocking<F, R>(f: F) -> impl Future<Output = Result<R>> + Send
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        async move {
            let output = ::async_std::task::spawn_blocking(f).await;
            Ok(output)
        }
    }
}
143
144/// Generic async receiver wrapper parameterized by runtime.
145///
146/// This struct provides async versions of the [`Receiver`] methods by running
147/// blocking NDI operations on the runtime's thread pool using `spawn_blocking`.
148///
149/// # Type Parameters
150///
151/// - `R`: The async runtime type, implementing [`SpawnBlocking`]. Use
152/// [`TokioRuntime`] or [`AsyncStdRuntime`].
153///
154/// # Thread Safety
155///
156/// The underlying [`Receiver`] is wrapped in an [`Arc`] to allow sharing across
157/// async tasks and safe cloning. The NDI SDK receiver is inherently thread-safe.
158///
159/// # Example
160///
161/// ```no_run
162/// # #[cfg(feature = "tokio")]
163/// # {
164/// use grafton_ndi::{NDI, ReceiverOptionsBuilder, tokio::AsyncReceiver};
165///
166/// #[tokio::main]
167/// async fn main() -> Result<(), grafton_ndi::Error> {
168/// let ndi = NDI::new()?;
169/// // ... obtain source ...
170/// # let source = grafton_ndi::Source {
171/// # name: "Test".into(),
172/// # address: grafton_ndi::SourceAddress::None
173/// # };
174///
175/// let options = ReceiverOptionsBuilder::snapshot_preset(source).build();
176/// let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
177/// let async_receiver = AsyncReceiver::new(receiver);
178///
179/// // Non-blocking async capture
180/// match async_receiver.video().try_capture(std::time::Duration::from_millis(100)).await? {
181/// Some(frame) => println!("Got frame: {}x{}", frame.width(), frame.height()),
182/// None => println!("No frame available"),
183/// }
184///
185/// Ok(())
186/// }
187/// # }
188/// ```
189pub struct AsyncReceiverGeneric<R: SpawnBlocking> {
190 inner: Arc<Receiver>,
191 _runtime: PhantomData<R>,
192}
193
194fn validated_timeout_start(timeout: Duration) -> Result<Instant> {
195 to_ms_checked(timeout)?;
196 Ok(Instant::now())
197}
198
/// Returns how much of `timeout` is left, given that its budget started at
/// `start_time`.
///
/// The result never goes negative: once the elapsed time reaches or exceeds
/// `timeout`, the remaining budget is `Duration::ZERO`.
fn remaining_timeout(timeout: Duration, start_time: Instant) -> Duration {
    let spent = start_time.elapsed();
    timeout.checked_sub(spent).unwrap_or(Duration::ZERO)
}
202
203impl<R: SpawnBlocking> AsyncReceiverGeneric<R> {
204 /// Create a new async receiver wrapper.
205 ///
206 /// The receiver is wrapped in an [`Arc`] to allow sharing across async tasks.
207 pub fn new(receiver: Receiver) -> Self {
208 Self {
209 inner: Arc::new(receiver),
210 _runtime: PhantomData,
211 }
212 }
213
214 /// Capture **video** frames without blocking the async runtime.
215 ///
216 /// Returns an [`AsyncCapture`] view; see [`AsyncCapture`] for its verbs
217 /// ([`capture`](AsyncCapture::capture),
218 /// [`try_capture`](AsyncCapture::try_capture)). Mirrors
219 /// [`Receiver::video`], running each capture on the blocking pool.
220 ///
221 /// # Example
222 ///
223 /// ```no_run
224 /// # #[cfg(feature = "tokio")]
225 /// # {
226 /// # use grafton_ndi::{NDI, ReceiverOptions, tokio::AsyncReceiver};
227 /// # use std::time::Duration;
228 /// # #[tokio::main]
229 /// # async fn main() -> Result<(), grafton_ndi::Error> {
230 /// # let ndi = NDI::new()?;
231 /// # let source = grafton_ndi::Source {
232 /// # name: "Test".into(),
233 /// # address: grafton_ndi::SourceAddress::None
234 /// # };
235 /// # let options = ReceiverOptions::builder(source).build();
236 /// # let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
237 /// let async_receiver = AsyncReceiver::new(receiver);
238 /// let frame = async_receiver.video().capture(Duration::from_secs(5)).await?;
239 /// println!("Captured {}x{} frame", frame.width(), frame.height());
240 /// # Ok(())
241 /// # }
242 /// # }
243 /// ```
244 #[must_use = "the AsyncCapture view does nothing until a capture verb is awaited"]
245 pub fn video(&self) -> AsyncCapture<'_, R, VideoKind> {
246 AsyncCapture::new(self)
247 }
248
249 /// Capture **audio** frames without blocking the async runtime.
250 ///
251 /// Returns an [`AsyncCapture`] view; see [`video`](Self::video) for usage.
252 /// Mirrors [`Receiver::audio`].
253 #[must_use = "the AsyncCapture view does nothing until a capture verb is awaited"]
254 pub fn audio(&self) -> AsyncCapture<'_, R, AudioKind> {
255 AsyncCapture::new(self)
256 }
257
258 /// Capture **metadata** frames without blocking the async runtime.
259 ///
260 /// Returns an [`AsyncCapture`] view; see [`video`](Self::video) for usage.
261 /// Mirrors [`Receiver::metadata`].
262 #[must_use = "the AsyncCapture view does nothing until a capture verb is awaited"]
263 pub fn metadata(&self) -> AsyncCapture<'_, R, MetadataKind> {
264 AsyncCapture::new(self)
265 }
266
267 /// Whether the underlying receiver currently has at least one active
268 /// connection to its source. See [`Receiver::is_connected`].
269 ///
270 /// A cheap, non-blocking SDK query, so it runs inline rather than on
271 /// the blocking pool.
272 pub fn is_connected(&self) -> bool {
273 self.inner.is_connected()
274 }
275
276 /// Connection and frame-throughput statistics for the underlying
277 /// receiver. See [`Receiver::connection_stats`].
278 ///
279 /// A cheap, non-blocking SDK query, so it runs inline rather than on
280 /// the blocking pool. `connection_stats().video_frames_received` is
281 /// the canonical liveness signal: it advances as the receiver pulls
282 /// frames off the network, independent of how often the caller
283 /// captures, so a frozen counter means the feed itself has stalled.
284 pub fn connection_stats(&self) -> ConnectionStats {
285 self.inner.connection_stats()
286 }
287
288 /// Re-establish the underlying receiver's connection to its source
289 /// in place. See [`Receiver::reconnect`].
290 ///
291 /// Unlike the liveness probes above, a reconnect takes the receiver's
292 /// capture guard exclusively, so it can block until in-flight captures
293 /// (which run on the blocking pool) drain. It therefore runs on the
294 /// blocking pool too, never on the async runtime's threads, and is safe to
295 /// call while captures on this receiver are in flight — they serialize
296 /// instead of racing. Confirm recovery via [`Self::connection_stats`].
297 pub async fn reconnect(&self) -> Result<()> {
298 let receiver = Arc::clone(&self.inner);
299 R::spawn_blocking(move || receiver.reconnect()).await?
300 }
301}
302
303/// A typed view over an async receiver for capturing frames of one kind,
304/// without blocking the async runtime.
305///
306/// Created by [`AsyncReceiverGeneric::video`], [`audio`](AsyncReceiverGeneric::audio),
307/// and [`metadata`](AsyncReceiverGeneric::metadata). Each verb runs the
308/// underlying synchronous capture on the runtime's blocking pool:
309///
310/// - [`capture`](Self::capture) — reliable owned capture with built-in retry.
311/// The timeout budget starts when the future is created; `spawn_blocking`
312/// queue delay is subtracted before the SDK begins waiting.
313/// - [`try_capture`](Self::try_capture) — a single owned poll; `Ok(None)` when
314/// no frame is ready.
315///
316/// There is no zero-copy `try_capture_ref` here: a borrowed frame is tied to
317/// the receiver and cannot cross the `spawn_blocking` boundary. Use
318/// [`Receiver::video`]'s [`Capture::try_capture_ref`](crate::Capture::try_capture_ref)
319/// on the synchronous receiver for in-place processing.
320pub struct AsyncCapture<'rx, R: SpawnBlocking, K: CaptureKind> {
321 recv: &'rx AsyncReceiverGeneric<R>,
322 _kind: PhantomData<K>,
323}
324
325impl<'rx, R: SpawnBlocking, K: CaptureKind> AsyncCapture<'rx, R, K>
326where
327 K::Owned: Send + 'static,
328{
329 fn new(recv: &'rx AsyncReceiverGeneric<R>) -> Self {
330 Self {
331 recv,
332 _kind: PhantomData,
333 }
334 }
335
336 /// Async version of [`Capture::capture`](crate::Capture::capture):
337 /// reliable owned capture that retries across the SDK's initial-sync
338 /// warm-up, run on the blocking pool.
339 ///
340 /// # Arguments
341 ///
342 /// * `timeout` - Total budget to wait for a frame, starting when this async
343 /// method is called. Blocking task queue delay is subtracted before the
344 /// synchronous receiver starts waiting. Must not exceed
345 /// [`crate::MAX_TIMEOUT`] (~49.7 days).
346 ///
347 /// # Returns
348 ///
349 /// * `Ok(frame)` - Successfully captured a frame
350 /// * `Err(Error::FrameTimeout)` - No frame received within timeout (includes retry details)
351 /// * `Err(Error::SpawnFailed)` - The blocking task panicked or was cancelled
352 /// * `Err(_)` - Another error occurred during capture
353 pub async fn capture(&self, timeout: Duration) -> Result<K::Owned> {
354 let start_time = validated_timeout_start(timeout)?;
355 let receiver = Arc::clone(&self.recv.inner);
356 R::spawn_blocking(move || {
357 receiver.capture_kind::<K>(remaining_timeout(timeout, start_time))
358 })
359 .await?
360 }
361
362 /// Async version of
363 /// [`Capture::try_capture`](crate::Capture::try_capture): a single owned
364 /// poll, run on the blocking pool.
365 ///
366 /// # Arguments
367 ///
368 /// * `timeout` - Maximum time to wait for a frame. Must not exceed
369 /// [`crate::MAX_TIMEOUT`] (~49.7 days).
370 ///
371 /// # Returns
372 ///
373 /// * `Ok(Some(frame))` - Successfully captured a frame
374 /// * `Ok(None)` - No frame available within timeout
375 /// * `Err(Error::SpawnFailed)` - The blocking task panicked or was cancelled
376 /// * `Err(_)` - An error occurred during capture
377 pub async fn try_capture(&self, timeout: Duration) -> Result<Option<K::Owned>> {
378 let receiver = Arc::clone(&self.recv.inner);
379 R::spawn_blocking(move || receiver.try_capture_kind::<K>(timeout)).await?
380 }
381}
382
383impl<R: SpawnBlocking> Clone for AsyncReceiverGeneric<R> {
384 fn clone(&self) -> Self {
385 Self {
386 inner: Arc::clone(&self.inner),
387 _runtime: PhantomData,
388 }
389 }
390}
391
392// Backward-compatible module re-exports
393
#[cfg(feature = "tokio")]
pub mod tokio {
    //! Tokio async runtime integration.
    //!
    //! Exposes [`AsyncReceiver`], which dispatches NDI operations through
    //! `tokio::task::spawn_blocking` so they do not block the Tokio runtime.
    //!
    //! # Example
    //!
    //! ```no_run
    //! # #[cfg(feature = "tokio")]
    //! # {
    //! use grafton_ndi::{NDI, ReceiverOptionsBuilder, tokio::AsyncReceiver};
    //!
    //! #[tokio::main]
    //! async fn main() -> Result<(), grafton_ndi::Error> {
    //!     let ndi = NDI::new()?;
    //!     // ... obtain source ...
    //! #   let source = grafton_ndi::Source {
    //! #       name: "Test".into(),
    //! #       address: grafton_ndi::SourceAddress::None
    //! #   };
    //!
    //!     let options = ReceiverOptionsBuilder::snapshot_preset(source).build();
    //!     let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
    //!     let async_receiver = AsyncReceiver::new(receiver);
    //!
    //!     // Non-blocking async capture
    //!     match async_receiver.video().try_capture(std::time::Duration::from_millis(100)).await? {
    //!         Some(frame) => println!("Got frame: {}x{}", frame.width(), frame.height()),
    //!         None => println!("No frame available"),
    //!     }
    //!
    //!     Ok(())
    //! }
    //! # }
    //! ```

    /// Async receiver wrapper for the Tokio runtime.
    ///
    /// A type alias for the generic async receiver with its runtime parameter
    /// fixed to Tokio. It offers async versions of the [`crate::Receiver`]
    /// methods, running the blocking NDI operations on Tokio's blocking thread
    /// pool via `spawn_blocking`.
    ///
    /// # Thread Safety
    ///
    /// The underlying `Receiver` is held in an `Arc`, which allows sharing
    /// across async tasks and cheap cloning. The NDI SDK receiver is inherently
    /// thread-safe.
    ///
    /// # Error Handling
    ///
    /// Methods that dispatch to the blocking pool return [`crate::Result`]; a
    /// task panic or cancellation is converted into
    /// [`crate::Error::SpawnFailed`] rather than propagating the panic.
    pub type AsyncReceiver = super::AsyncReceiverGeneric<super::TokioRuntime>;
}
452
#[cfg(feature = "async-std")]
pub mod async_std {
    //! async-std runtime integration.
    //!
    //! Exposes [`AsyncReceiver`], which dispatches NDI operations through
    //! `async_std::task::spawn_blocking` so they do not block the async-std
    //! runtime.
    //!
    //! # Example
    //!
    //! ```no_run
    //! # #[cfg(feature = "async-std")]
    //! # {
    //! use grafton_ndi::{NDI, ReceiverOptionsBuilder, async_std::AsyncReceiver};
    //!
    //! #[async_std::main]
    //! async fn main() -> Result<(), grafton_ndi::Error> {
    //!     let ndi = NDI::new()?;
    //!     // ... obtain source ...
    //! #   let source = grafton_ndi::Source {
    //! #       name: "Test".into(),
    //! #       address: grafton_ndi::SourceAddress::None
    //! #   };
    //!
    //!     let options = ReceiverOptionsBuilder::snapshot_preset(source).build();
    //!     let receiver = grafton_ndi::Receiver::new(&ndi, &options)?;
    //!     let async_receiver = AsyncReceiver::new(receiver);
    //!
    //!     // Non-blocking async capture
    //!     match async_receiver.video().try_capture(std::time::Duration::from_millis(100)).await? {
    //!         Some(frame) => println!("Got frame: {}x{}", frame.width(), frame.height()),
    //!         None => println!("No frame available"),
    //!     }
    //!
    //!     Ok(())
    //! }
    //! # }
    //! ```

    /// Async receiver wrapper for the async-std runtime.
    ///
    /// A type alias for the generic async receiver with its runtime parameter
    /// fixed to async-std. It offers async versions of the [`crate::Receiver`]
    /// methods, running the blocking NDI operations on async-std's blocking
    /// thread pool via `spawn_blocking`.
    ///
    /// # Thread Safety
    ///
    /// The underlying `Receiver` is held in an `Arc`, which allows sharing
    /// across async tasks and cheap cloning. The NDI SDK receiver is inherently
    /// thread-safe.
    ///
    /// # Error Handling
    ///
    /// Methods that dispatch to the blocking pool return [`crate::Result`].
    /// async-std's `spawn_blocking` does not return a `Result`, so this runtime's
    /// spawn layer always reports success and does not itself produce
    /// [`crate::Error::SpawnFailed`]; errors come from the NDI operation.
    pub type AsyncReceiver = super::AsyncReceiverGeneric<super::AsyncStdRuntime>;
}