Real-Time Processing#

The realtime module provides real-time audio I/O, ring buffering, and stream processing capabilities.

Overview#

The real-time processing system consists of:

  • Audio backends - Abstract interface for audio I/O with a PortAudio implementation via sounddevice

  • Ring buffer - Lock-free SPSC buffer for real-time tensor transfer

  • Real-time processor - Orchestrator connecting backend to effect chain

  • Stream processor - Chunk-based file processing for large files

Quick Start#

Both processors support the context manager protocol for safe resource management.

Real-time processing (requires sounddevice):

from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.effect import Gain

config = StreamConfig(
    sample_rate=48000,
    buffer_size=512,
    channels_in=2,
    channels_out=2,
)
with RealtimeProcessor(
    effects=[Gain(0.5)],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    input("Press Enter to stop...")
# Stream is automatically stopped on exit

Stream processing (no extra dependencies):

from torchfx.realtime import StreamProcessor
from torchfx.effect import Gain

with StreamProcessor(effects=[Gain(0.5)], chunk_size=65536) as processor:
    processor.process_file("large_input.wav", "output.wav")

Configuration#

class torchfx.realtime.StreamConfig(sample_rate=48000, buffer_size=512, channels_in=0, channels_out=2, dtype='float32', device_in=None, device_out=None, latency='low')[source]#

Configuration for an audio stream.

Parameters:
  • sample_rate (int) – Sample rate in Hz (e.g., 44100, 48000). Default is 48000.

  • buffer_size (int) – Number of samples per buffer/callback frame. Default is 512.

  • channels_in (int) – Number of input channels (0 for output-only). Default is 0.

  • channels_out (int) – Number of output channels (0 for input-only). Default is 2.

  • dtype (str) – Sample format string. Default is "float32".

  • device_in (int | str | None) – Input device identifier. None for system default.

  • device_out (int | str | None) – Output device identifier. None for system default.

  • latency (str | float) – Latency hint: "low", "high", or seconds as float. Default is "low".

Examples

>>> config = StreamConfig(sample_rate=44100, buffer_size=256)
>>> config.direction
<StreamDirection.OUTPUT: 'output'>
>>> config.latency_ms
5.804988662131519

property direction: StreamDirection#

Infer stream direction from channel counts.

Returns:

DUPLEX if both channels_in and channels_out are greater than 0, INPUT if only channels_in is greater than 0, OUTPUT otherwise.

Return type:

StreamDirection
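The inference rule above can be sketched in a few lines. This is a stand-in, not the library's code: the enum mirrors StreamDirection, and only the "output" value is confirmed by the example above. The "input" and "duplex" values and the infer_direction helper are assumptions for illustration.

```python
from enum import Enum


class Direction(Enum):
    """Stand-in for StreamDirection; only OUTPUT's value appears in the docs."""

    INPUT = "input"    # assumed value
    OUTPUT = "output"
    DUPLEX = "duplex"  # assumed value


def infer_direction(channels_in: int, channels_out: int) -> Direction:
    """Hypothetical helper applying the documented rule."""
    if channels_in > 0 and channels_out > 0:
        return Direction.DUPLEX
    if channels_in > 0:
        return Direction.INPUT
    return Direction.OUTPUT  # also covers the 0-in / 0-out case


print(infer_direction(0, 2))  # Direction.OUTPUT
```

Note that "OUTPUT otherwise" means a configuration with no channels at all is also reported as OUTPUT under this rule.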

property latency_ms: float#

Calculate theoretical minimum latency in milliseconds.

Returns:

Latency in milliseconds, computed as buffer_size / sample_rate × 1000.

Return type:

float
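As a worked example of the formula, the sketch below assumes latency_ms is one buffer's duration scaled to milliseconds, which matches the 5.80 ms value shown in the StreamConfig example. The helper name theoretical_latency_ms is hypothetical.

```python
def theoretical_latency_ms(buffer_size: int, sample_rate: int) -> float:
    """Duration of one buffer in milliseconds (hypothetical helper)."""
    return buffer_size / sample_rate * 1000.0


for buffer_size, sample_rate in [(256, 44100), (512, 48000), (1024, 48000)]:
    ms = theoretical_latency_ms(buffer_size, sample_rate)
    print(f"{buffer_size:>5} samples @ {sample_rate} Hz -> {ms:.2f} ms")
```

Halving buffer_size halves this figure, at the cost of giving each callback half as much time to finish.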

class torchfx.realtime.StreamDirection(value)[source]#

Audio stream direction.

class torchfx.realtime.StreamState(value)[source]#

Audio stream lifecycle state.

Audio Backend#

class torchfx.realtime.AudioBackend[source]#

Abstract base class for audio I/O backends.

All audio backends must implement this interface. The backend manages the lifecycle of audio streams and routes audio data through callbacks.

Subclasses should implement all abstract methods to provide a complete audio I/O solution.

Examples

Implementing a custom backend:

>>> class MyBackend(AudioBackend):
...     # Implement all abstract methods
...     pass

abstract close()[source]#

Close the audio stream and release resources.

Return type:

None

abstract get_default_device(direction)[source]#

Get the default device for a given direction.

Parameters:

direction (StreamDirection) – The stream direction to query.

Returns:

Device identifier.

Return type:

int | str

abstract get_devices()[source]#

List available audio devices.

Returns:

List of device info dicts with keys: "name", "index", "max_input_channels", "max_output_channels", "default_sample_rate".

Return type:

list[dict[str, Any]]

abstract open_stream(config, callback=None)[source]#

Open an audio stream with the given configuration.

Parameters:
  • config (StreamConfig) – Stream configuration.

  • callback (AudioCallback | None) – Callback for real-time processing. If None, use blocking API.

Return type:

None

abstract read(num_frames)[source]#

Read audio frames (blocking API).

Parameters:

num_frames (int) – Number of frames to read.

Returns:

Shape (channels_in, num_frames).

Return type:

Tensor

abstract start()[source]#

Start the audio stream.

Return type:

None

abstract stop()[source]#

Stop the audio stream.

Return type:

None

abstract write(data)[source]#

Write audio frames (blocking API).

Parameters:

data (Tensor) – Shape (channels_out, num_frames).

Return type:

None

abstract property is_available: bool#

Whether this backend’s dependencies are available.

abstract property name: str#

Human-readable name of the backend.

abstract property state: StreamState#

Current stream state.

class torchfx.realtime.sounddevice_backend.SoundDeviceBackend[source]#

Audio backend using sounddevice (PortAudio).

Provides real-time audio I/O through PortAudio with support for input, output, and duplex streams. Supports both callback-based and blocking APIs.

The sounddevice module is imported lazily on instantiation.

Raises:

BackendNotAvailableError – If sounddevice is not installed.

Examples

>>> backend = SoundDeviceBackend()
>>> devices = backend.get_devices()

close()[source]#

Close the audio stream and release resources.

Return type:

None

get_default_device(direction)[source]#

Get the default device for a given direction.

Parameters:

direction (StreamDirection) – The stream direction.

Returns:

Default device index or name.

Return type:

int | str

get_devices()[source]#

List available audio devices.

Returns:

Device information dictionaries.

Return type:

list[dict[str, Any]]

open_stream(config, callback=None)[source]#

Open an audio stream.

Parameters:
  • config (StreamConfig) – Stream configuration.

  • callback (AudioCallback | None) – Audio processing callback. If None, blocking API is used.

Raises:

StreamError – If stream is already open or opening fails.

Return type:

None

read(num_frames)[source]#

Read audio frames using blocking API.

Parameters:

num_frames (int) – Number of frames to read.

Returns:

Shape (channels_in, num_frames).

Return type:

Tensor

Raises:

StreamError – If stream is not configured for input.

start()[source]#

Start the audio stream.

Raises:

StreamError – If stream is not open.

Return type:

None

stop()[source]#

Stop the audio stream.

Raises:

StreamError – If stream is not running.

Return type:

None

write(data)[source]#

Write audio frames using blocking API.

Parameters:

data (Tensor) – Shape (channels_out, num_frames).

Raises:

StreamError – If stream is not configured for output.

Return type:

None

property is_available: bool#

Return True since instantiation succeeded.

property name: str#

Return backend name.

property state: StreamState#

Current stream state.
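The Raises clauses above imply an ordering: open_stream(), then start(), then stop(), then close(). The sketch below models that ordering with a toy backend that performs no audio I/O. The state names are hypothetical, since the members of StreamState are not listed on this page, and LifecycleError is a stand-in for StreamError.

```python
from enum import Enum


class State(Enum):
    """Hypothetical states; StreamState's real members are not listed here."""

    CLOSED = "closed"
    OPEN = "open"
    RUNNING = "running"


class LifecycleError(RuntimeError):
    """Stand-in for StreamError."""


class NullBackend:
    """Toy backend that only tracks lifecycle state."""

    def __init__(self) -> None:
        self.state = State.CLOSED

    def open_stream(self) -> None:
        if self.state is not State.CLOSED:
            raise LifecycleError("stream is already open")
        self.state = State.OPEN

    def start(self) -> None:
        if self.state is State.CLOSED:
            raise LifecycleError("stream is not open")
        self.state = State.RUNNING

    def stop(self) -> None:
        if self.state is not State.RUNNING:
            raise LifecycleError("stream is not running")
        self.state = State.OPEN

    def close(self) -> None:
        self.state = State.CLOSED  # always allowed; releases the stream


backend = NullBackend()
backend.open_stream()
backend.start()
backend.stop()
backend.close()
print(backend.state)  # State.CLOSED
```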

Ring Buffer#

class torchfx.realtime.TensorRingBuffer(capacity, channels=1, dtype=torch.float32, device='cpu')[source]#

Lock-free SPSC ring buffer for real-time audio tensor transfer.

Uses separate read/write indices with a pre-allocated tensor backing store. In the SPSC model, only the producer writes _write_idx and only the consumer writes _read_idx, so no locks are needed.

The capacity must be a power of 2 for efficient modular arithmetic (bitwise AND instead of modulo). If a non-power-of-2 value is provided, it is rounded up to the next power of 2.
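The index bookkeeping described above can be illustrated with a single-channel toy ring over a plain Python list. This is a sketch under stated assumptions, not TensorRingBuffer's implementation: ToyRing and next_power_of_two are hypothetical names, and the sketch shows only the masking and the producer/consumer split, not memory-ordering concerns.

```python
def next_power_of_two(n: int) -> int:
    """Smallest power of two that is >= n (n must be positive)."""
    if n < 1:
        raise ValueError("capacity must be positive")
    return 1 << (n - 1).bit_length()


class ToyRing:
    """Single-channel SPSC ring over a list (illustration only)."""

    def __init__(self, capacity: int) -> None:
        self.capacity = next_power_of_two(capacity)
        self._mask = self.capacity - 1  # idx & mask == idx % capacity
        self._data = [0.0] * self.capacity
        self._write_idx = 0  # advanced only by the producer
        self._read_idx = 0   # advanced only by the consumer

    @property
    def available_read(self) -> int:
        return self._write_idx - self._read_idx

    @property
    def available_write(self) -> int:
        return self.capacity - self.available_read

    def write(self, samples: list) -> int:
        n = min(len(samples), self.available_write)  # short write when nearly full
        for i in range(n):
            self._data[(self._write_idx + i) & self._mask] = samples[i]
        self._write_idx += n  # publish only after the data is in place
        return n

    def read(self, n: int) -> list:
        if n > self.available_read:
            raise IndexError("underrun: not enough samples")
        out = [self._data[(self._read_idx + i) & self._mask] for i in range(n)]
        self._read_idx += n
        return out


ring = ToyRing(6)
print(ring.capacity)  # 8
```

The indices grow monotonically and are masked only when addressing the storage, so available_read is a plain subtraction.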

Parameters:
  • capacity (int) – Total capacity in samples per channel. Rounded up to next power of 2.

  • channels (int) – Number of audio channels. Default is 1.

  • dtype (torch.dtype) – Data type for the buffer tensor. Default is torch.float32.

  • device (str) – Device for the buffer tensor. Default is "cpu".

buffer#

Pre-allocated backing tensor of shape (channels, capacity).

Type:

Tensor

Examples

>>> buf = TensorRingBuffer(capacity=512, channels=2)
>>> buf.capacity
512
>>> buf.available_read
0
>>> buf.available_write
512

advance_read(num_samples)[source]#

Advance the read pointer without reading data.

Use after peek() to advance by the hop size in overlap-add processing.

Parameters:

num_samples (int) – Number of samples to advance.

Raises:

BufferUnderrunError – If advancing would go past the write pointer.

Return type:

None

clear()[source]#

Reset the buffer to empty state.

Examples

>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> _ = buf.write(torch.ones(1, 100))
>>> buf.clear()
>>> buf.available_read
0

Return type:

None

peek(num_samples)[source]#

Read samples without advancing the read pointer.

Useful for overlap-add processing where the consumer needs to read overlapping frames.

Parameters:

num_samples (int) – Number of samples to peek.

Returns:

Audio data of shape (channels, num_samples).

Return type:

Tensor

Raises:

BufferUnderrunError – If fewer samples are available than requested.
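The peek-then-advance pattern can be sketched with a list-backed stand-in. PeekableQueue and overlapping_frames are hypothetical names used only for illustration; the stand-in raises IndexError where the real buffer raises BufferUnderrunError.

```python
class PeekableQueue:
    """List-backed stand-in exposing peek() and advance_read()."""

    def __init__(self, samples) -> None:
        self._samples = list(samples)
        self._read = 0

    @property
    def available_read(self) -> int:
        return len(self._samples) - self._read

    def peek(self, n: int) -> list:
        if n > self.available_read:
            raise IndexError("underrun")
        return self._samples[self._read:self._read + n]  # read pointer unchanged

    def advance_read(self, n: int) -> None:
        if n > self.available_read:
            raise IndexError("underrun")
        self._read += n


def overlapping_frames(queue: PeekableQueue, frame_size: int, hop: int):
    """Yield frames of frame_size samples whose starts are hop samples apart."""
    while queue.available_read >= frame_size:
        yield queue.peek(frame_size)
        queue.advance_read(hop)


frames = list(overlapping_frames(PeekableQueue(range(10)), frame_size=4, hop=2))
print(frames)  # [[0, 1, 2, 3], [2, 3, 4, 5], [4, 5, 6, 7], [6, 7, 8, 9]]
```

With hop smaller than frame_size, consecutive frames share frame_size - hop samples, which is what overlap-add processing relies on.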

read(num_samples)[source]#

Read samples from the buffer.

Parameters:

num_samples (int) – Number of samples to read.

Returns:

Audio data of shape (channels, num_samples).

Return type:

Tensor

Raises:

BufferUnderrunError – If fewer samples are available than requested.

Examples

>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> _ = buf.write(torch.ones(1, 100))
>>> output = buf.read(50)
>>> output.shape
torch.Size([1, 50])
>>> buf.available_read
50

write(data)[source]#

Write samples into the buffer.

Parameters:

data (Tensor) – Audio data of shape (channels, samples) or (samples,) for single-channel buffers.

Returns:

Number of samples actually written. May be less than requested if the buffer is nearly full.

Return type:

int

Examples

>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> data = torch.ones(1, 100)
>>> buf.write(data)
100
>>> buf.available_read
100

property available_read: int#

Number of samples available for reading.

property available_write: int#

Number of samples available for writing.

property capacity: int#

Total buffer capacity in samples per channel.

property channels: int#

Number of audio channels.

Processors#

Real-Time Processor#

class torchfx.realtime.RealtimeProcessor(effects, backend, config, ring_blocks=4, latency_log_size=65536, prime_output=True, start_worker=True)[source]#

Real-time audio processor connecting an audio backend to an effect chain.

Parameters:
  • effects (Sequence[FX] | nn.Sequential) – Chain of effects to apply in order. Each effect must preserve chunk length; effects that change the time dimension (e.g., Delay with feedback that grows the buffer) are rejected at runtime.

  • backend (AudioBackend) – Audio backend driving the I/O thread.

  • config (StreamConfig) – Stream configuration. config.buffer_size defines the DSP chunk size.

  • ring_blocks (int) – Capacity of each ring buffer in units of buffer_size. Default is 4 (≈43 ms tolerance at 512-sample / 48 kHz).

  • latency_log_size (int) – Maximum number of per-callback latency samples retained. Default is 65536 (≈ 11.7 min at 512-sample / 48 kHz). Older entries are evicted FIFO.

  • prime_output (bool) – If True (default), write one chunk of silence into the output ring at start so the first callback never underflows.

  • start_worker (bool) – If True (default), start() spawns a dedicated DSP worker thread that drains the input ring continuously. If False, the caller is responsible for calling process_pending() after each callback round-trip. The latter mode is intended for deterministic testing and for embedding the processor inside a host-driven scheduler.
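The two time figures quoted for ring_blocks and latency_log_size follow from the buffer duration. The sketch below redoes that arithmetic, assuming one callback per buffer_size samples; the helper names are hypothetical.

```python
def ring_tolerance_ms(ring_blocks: int, buffer_size: int, sample_rate: int) -> float:
    """Audio time a ring of ring_blocks buffers can hold, in milliseconds."""
    return ring_blocks * buffer_size / sample_rate * 1000.0


def latency_log_minutes(log_size: int, buffer_size: int, sample_rate: int) -> float:
    """Wall-clock span covered by log_size callbacks, in minutes."""
    return log_size * buffer_size / sample_rate / 60.0


print(f"{ring_tolerance_ms(4, 512, 48000):.1f} ms")         # 42.7 ms
print(f"{latency_log_minutes(65536, 512, 48000):.2f} min")  # 11.65 min
```

Smaller buffers shrink both figures proportionally, so a low-latency configuration may call for a larger ring_blocks value to keep the same tolerance.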

Examples

>>> from torchfx.realtime import RealtimeProcessor, StreamConfig
>>> from torchfx.effect import Gain
>>> config = StreamConfig(sample_rate=48000, buffer_size=512,
...                       channels_in=1, channels_out=1)

latency_log_ns()[source]#

Snapshot of per-callback wall-clock durations in nanoseconds.

The returned list is a copy; safe to consume from any thread.

Examples

>>> # samples = processor.latency_log_ns()
>>> # p99 = sorted(samples)[int(len(samples) * 0.99)] / 1e6  # ms

Return type:

list[int]

latency_stats_ms()[source]#

Summary statistics of per-callback latency in milliseconds.

Returns:

Keys count, min, median, mean, p95, p99, max. Empty log returns zeros except count.

Return type:

dict
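For readers who want to derive the same keys from latency_log_ns() output, here is a minimal sketch. It is not the library's implementation: the percentile rule reuses the index expression from the latency_log_ns example, and treating an empty log as all zeros (including count) is an assumption. summarize_ns is a hypothetical name.

```python
import statistics


def summarize_ns(samples_ns: list) -> dict:
    """Summarize nanosecond durations as millisecond statistics."""
    keys = ("min", "median", "mean", "p95", "p99", "max")
    if not samples_ns:
        return {"count": 0, **{key: 0.0 for key in keys}}
    ordered = sorted(s / 1e6 for s in samples_ns)  # ns -> ms

    def percentile(p: float) -> float:
        # Index rule from the latency_log_ns example, clamped to the last item.
        return ordered[min(int(len(ordered) * p), len(ordered) - 1)]

    return {
        "count": len(ordered),
        "min": ordered[0],
        "median": statistics.median(ordered),
        "mean": statistics.fmean(ordered),
        "p95": percentile(0.95),
        "p99": percentile(0.99),
        "max": ordered[-1],
    }


stats = summarize_ns([n * 1_000_000 for n in range(1, 101)])  # 1 ms .. 100 ms
print(stats["count"], stats["min"], stats["max"])  # 100 1.0 100.0
```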

Examples

>>> # stats = processor.latency_stats_ms()
>>> # print(stats["p99"], stats["count"])

process_pending()[source]#

Drive one DSP draining pass synchronously.

Intended for tests and host-scheduled deployments using start_worker=False. Reads as many buffer_size-aligned chunks as are available in the input ring, runs the effect chain on each, and writes results to the output ring.

Returns:

Number of chunks processed.

Return type:

int
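The draining pass can be pictured as follows. This sketch uses deques of floats in place of the tensor rings and a plain callable in place of the effect chain; drain_aligned is a hypothetical name, not part of the API.

```python
from collections import deque
from typing import Callable


def drain_aligned(
    input_ring: deque,
    output_ring: deque,
    buffer_size: int,
    effect: Callable[[list], list],
) -> int:
    """Move whole buffer_size chunks from input to output through effect."""
    chunks = len(input_ring) // buffer_size  # leftover samples stay queued
    for _ in range(chunks):
        chunk = [input_ring.popleft() for _ in range(buffer_size)]
        output_ring.extend(effect(chunk))
    return chunks


pending = deque(float(i) for i in range(10))
processed = deque()
count = drain_aligned(pending, processed, 4, lambda c: [0.5 * x for x in c])
print(count, len(pending), len(processed))  # 2 2 8
```

Only complete chunks are processed; the two leftover samples wait in the input until enough data arrives for another full chunk.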

reset_metrics()[source]#

Reset latency log and xrun counters.

Useful between benchmark phases to discard startup transients.

Return type:

None

reset_state()[source]#

Reset internal state: ring buffers and stateful effects.

Useful after seeking in a file or switching audio sources. Safe to call while running, but typically called between sessions.

Return type:

None

set_parameter(name, value)[source]#

Thread-safe parameter update.

Parameters are staged in a pending dict and applied at the next DSP worker iteration boundary, not in the audio callback. This keeps the callback allocation-free.

Parameters:
  • name (str) – Dot-separated parameter path, e.g., "0.cutoff" for effect index 0, attribute cutoff.

  • value (Any) – New parameter value.

Return type:

None
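The staging scheme described above can be sketched with a lock-protected dict that the worker swaps out between chunks. This is an illustration, not the library's implementation: ParameterStager and apply_pending are hypothetical names, and SimpleNamespace objects stand in for effects.

```python
import threading
from types import SimpleNamespace
from typing import Any


class ParameterStager:
    """Toy version of staged parameter updates."""

    def __init__(self, effects: list) -> None:
        self._effects = effects
        self._pending: dict = {}
        self._lock = threading.Lock()

    def set_parameter(self, name: str, value: Any) -> None:
        # Callable from any thread: only records the request.
        with self._lock:
            self._pending[name] = value

    def apply_pending(self) -> int:
        # Called by the worker between chunks: swap the dict, then apply.
        with self._lock:
            staged, self._pending = self._pending, {}
        for path, value in staged.items():
            index, attribute = path.split(".", 1)  # "0.cutoff" -> "0", "cutoff"
            setattr(self._effects[int(index)], attribute, value)
        return len(staged)


effects = [SimpleNamespace(cutoff=1000), SimpleNamespace(gain=0.5)]
stager = ParameterStager(effects)
stager.set_parameter("0.cutoff", 2000)
stager.set_parameter("1.gain", 0.8)
print(effects[0].cutoff)  # 1000, nothing is applied until the worker runs
stager.apply_pending()
print(effects[0].cutoff, effects[1].gain)  # 2000 0.8
```

Because the lock is held only while swapping the dict, the caller never waits on DSP work and the audio callback is never involved.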

Examples

>>> # processor.set_parameter("0.cutoff", 2000)
>>> # processor.set_parameter("1.gain", 0.8)

start()[source]#

Start real-time processing.

Spawns the DSP worker thread, primes the output ring, and starts the audio backend. The audio callback is registered with the backend; once running, sample movement between the backend’s buffers and the ring buffers happens on every callback while the effect chain runs in the worker thread.

Raises:

RealtimeError – If the processor is already running.

Return type:

None

stop()[source]#

Stop real-time processing and close the stream.

Stops the audio backend first (so no more callbacks fire) and then signals the worker thread to exit. Any exception captured from the worker thread is re-raised here so callers see DSP errors at a known point in their code.

Raises:

RealtimeError – If the processor is not running.

Return type:

None

property backend_xrun_count: int#

Xruns reported by the audio backend itself (e.g. PortAudio paInputOverflow).

property callback_count: int#

Number of audio callbacks serviced since start (or last reset_metrics).

property config: StreamConfig#

The stream configuration.

property deadline_ms: float#

Per-callback deadline in milliseconds (buffer_size / sample_rate).
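One way to use the deadline is to count how many logged callback durations exceeded it. The sketch below assumes the samples come from latency_log_ns() and are in nanoseconds; the helper names and the sample durations are hypothetical.

```python
def deadline_ns(buffer_size: int, sample_rate: int) -> float:
    """Per-callback time budget in nanoseconds."""
    return buffer_size / sample_rate * 1e9


def missed_deadlines(samples_ns: list, buffer_size: int, sample_rate: int) -> int:
    """Count callback durations longer than one buffer period."""
    budget = deadline_ns(buffer_size, sample_rate)
    return sum(1 for sample in samples_ns if sample > budget)


# Hypothetical durations: three fast callbacks and one slow one (12 ms).
durations_ns = [200_000, 350_000, 12_000_000, 500_000]
print(missed_deadlines(durations_ns, buffer_size=512, sample_rate=48000))  # 1
```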

property effects: list[FX]#

The current effect chain.

property input_overflow_count: int#

Times the audio callback couldn’t push all input samples into the ring.

property is_running: bool#

Whether the processor is currently running.

property latency_ms: float#

Estimated total latency in milliseconds.

Computed as (buffer_size + 1 block of output priming) / sample_rate. With output priming enabled this is two buffers of latency; without priming it is one.

property output_underflow_count: int#

Times the audio callback found the output ring empty.

property xrun_count: int#

Total number of xruns seen since start (or last reset_metrics).

Includes input ring overflows, output ring underflows, and backend-reported xruns. Use input_overflow_count, output_underflow_count, and backend_xrun_count to attribute xruns by source.

Stream Processor#

class torchfx.realtime.StreamProcessor(effects, chunk_size=65536, overlap=0, device='cpu')[source]#

Process audio files in chunks without loading the entire file.

Reads audio in configurable chunk sizes, applies an effect chain to each chunk, and writes output progressively. Supports an overlap parameter for effects that need context beyond chunk boundaries.

Parameters:
  • effects (Sequence[FX] | nn.Sequential) – Chain of effects to apply in order.

  • chunk_size (int) – Number of samples per processing chunk. Default is 65536.

  • overlap (int) – Number of overlap samples between chunks. Default is 0.

  • device (str) – Processing device ("cpu" or "cuda"). Default is "cpu".
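How StreamProcessor applies overlap internally is not specified on this page. One common scheme, assumed here purely for illustration, reads each chunk with overlap samples of leading context and keeps only the new samples after processing. chunk_ranges is a hypothetical helper that computes the corresponding index ranges.

```python
def chunk_ranges(total: int, chunk_size: int, overlap: int = 0):
    """Yield (read_start, keep_start, stop) triples covering [0, total)."""
    if chunk_size <= 0 or overlap < 0:
        raise ValueError("chunk_size must be positive and overlap non-negative")
    start = 0
    while start < total:
        stop = min(start + chunk_size, total)  # last chunk may be shorter
        read_start = max(0, start - overlap)   # extra leading context
        yield read_start, start, stop
        start = stop


for read_start, keep_start, stop in chunk_ranges(10, chunk_size=4, overlap=2):
    print(read_start, keep_start, stop)
# 0 0 4
# 2 4 8
# 6 8 10
```

Samples in [read_start, keep_start) are context only; samples in [keep_start, stop) are the ones written to the output under this scheme.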

Examples

>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])

process_chunks(input_path)[source]#

Yield processed chunks as tensors.

Generator API for streaming to another process, network, or real-time playback.

Parameters:

input_path (str | Path) – Path to the input audio file.

Yields:

Tensor – Processed audio chunks of shape (channels, chunk_size).

Return type:

Generator[Tensor, None, None]

Examples

>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
>>> # for chunk in processor.process_chunks("input.wav"):
>>> #     print(chunk.shape)

process_file(input_path, output_path, format=None, subtype=None)[source]#

Process an audio file chunk by chunk.

Reads the input file in chunks, applies the effect chain to each chunk, and writes the result to the output file.

Parameters:
  • input_path (str | Path) – Path to the input audio file.

  • output_path (str | Path) – Path to the output audio file.

  • format (str | None) – Output format (e.g., "WAV", "FLAC"). Inferred from extension if None.

  • subtype (str | None) – Output subtype (e.g., "PCM_16", "FLOAT"). Uses default for format if None.

Return type:

None

Examples

>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
>>> # processor.process_file("input.wav", "output.wav")

reset_state()[source]#

Reset stateful effects in the chain.

This clears filter/effect history so a new file starts without residual state from previous processing.

Return type:

None

property chunk_size: int#

Processing chunk size in samples.

property effects: list[FX]#

The current effect chain.

property overlap: int#

Overlap between chunks in samples.

CUDA Graph Runner#

Captures a fixed-shape GPU filter forward into a torch.cuda.CUDAGraph and replays it per chunk, collapsing the per-section kernel launches of an SOS cascade into a single graph launch. The win is largest in the short-chunk / realtime regime (up to ~4× lower per-chunk latency on an RTX 3070). Streaming DF1 state is carried across replays.

import torch
from torchfx.filter import HiButterworth, LoButterworth
from torchfx.filter.fused import FusedSOSCascade
from torchfx.realtime import CudaGraphRunner

chain = FusedSOSCascade(
    HiButterworth(80, order=2, fs=48000),
    LoButterworth(8000, order=4, fs=48000),
)
runner = CudaGraphRunner(chain, torch.randn(2, 512, device="cuda"))
# `stream` is a placeholder for any iterable of [2, 512] CUDA tensors
for chunk in stream:
    y = runner.run(chunk).clone()    # clone: run() returns a shared output buffer

class torchfx.realtime.CudaGraphRunner(module, example, warmup=3)[source]#

Capture a fixed-shape GPU forward into a CUDA graph and replay it.

Parameters:
  • module (nn.Module) – A GPU filter/cascade whose forward updates its state in place (the native SOS path does). The captured graph reuses the module’s state buffers, so streaming continuity is preserved across run() calls.

  • example (Tensor) – A representative input chunk on CUDA. Its shape, dtype, and device fix the graph; every run() input must match.

  • warmup (int, default 3) – Eager iterations on a side stream before capture, so coefficient caches and scratch allocations are established (required for stable capture).

Notes

The captured filter must have static coefficients — the SOS taps are baked into the graph at capture time. Call reset_state() to restart streaming from a zero state (e.g. between files). The returned tensor from run() is the module’s shared output buffer; clone it before the next run().
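The reason for cloning can be shown without a GPU. The toy runner below reuses one Python list as its output, which is only an analogy for the shared output buffer; SharedOutputRunner is a hypothetical stand-in, and list(...) plays the role of .clone().

```python
class SharedOutputRunner:
    """Toy runner that reuses one output list, mimicking a shared buffer."""

    def __init__(self, size: int) -> None:
        self._out = [0.0] * size

    def run(self, chunk: list) -> list:
        for i, x in enumerate(chunk):
            self._out[i] = 2.0 * x  # overwritten in place on every call
        return self._out            # the same object each time


runner = SharedOutputRunner(2)
aliased = runner.run([1.0, 2.0])       # keeps a reference to the buffer
copied = list(runner.run([1.0, 2.0]))  # copy, analogous to .clone()
runner.run([10.0, 20.0])
print(aliased)  # [20.0, 40.0], overwritten by the later run
print(copied)   # [2.0, 4.0]
```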

reset_state()[source]#

Zero the captured streaming state in place to restart a new stream.

Does not call module.reset_state() — that may drop coefficients or rebind the state buffers the graph was captured against; instead the existing state buffers are zeroed in place.

Caveat: graph continuation is exact, but the very first chunk after a reset can show a small initial transient versus a never-run eager filter, because the capture warmup is baked into the recorded kernels. For exact fresh-stream behaviour, warm up the runner with input representative of the stream’s start (e.g. silence). The transient decays within a few samples.

Return type:

None

run(chunk)[source]#

Process one fixed-shape chunk via graph replay.

Returns the module’s shared output buffer (clone it before the next call).

Parameters:

chunk (Tensor) – Input chunk. Its shape, dtype, and device must match the example tensor used at capture time.

Return type:

Tensor

Exceptions#

class torchfx.realtime.RealtimeError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#

Base exception for all real-time processing errors.

Parameters:
  • message (str) – Human-readable error message.

  • suggestion (str | None, optional) – A suggestion for fixing the error.

  • parameter_name (str | None)

  • actual_value (Any | None)

Examples

>>> raise RealtimeError("Processing failed")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.RealtimeError: Processing failed

class torchfx.realtime.BackendNotAvailableError(backend_name, suggestion=None)[source]#

Raised when a requested audio backend is not installed or available.

Parameters:
  • backend_name (str) – Name of the backend that is not available.

  • suggestion (str | None, optional) – Installation instructions or alternative.

Examples

>>> raise BackendNotAvailableError("sounddevice")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BackendNotAvailableError: ...

class torchfx.realtime.StreamError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#

Raised for audio stream lifecycle errors.

Parameters:
  • message (str) – Human-readable error message.

  • suggestion (str | None, optional) – A suggestion for fixing the error.

  • parameter_name (str | None)

  • actual_value (Any | None)

Examples

>>> raise StreamError("Failed to open audio stream")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.StreamError: Failed to open audio stream

class torchfx.realtime.BufferOverrunError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#

Raised when the ring buffer overflows.

This occurs when the producer writes data faster than the consumer reads it, causing data loss.

Parameters:
  • message (str) – Human-readable error message.

  • suggestion (str | None, optional) – A suggestion for fixing the error.

  • parameter_name (str | None)

  • actual_value (Any | None)

Examples

>>> raise BufferOverrunError("Ring buffer overflow")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BufferOverrunError: Ring buffer overflow

class torchfx.realtime.BufferUnderrunError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#

Raised when the ring buffer underflows.

This occurs when the consumer attempts to read more data than is available, typically because the consumer is reading faster than the producer writes.

Parameters:
  • message (str) – Human-readable error message.

  • suggestion (str | None, optional) – A suggestion for fixing the error.

  • parameter_name (str | None)

  • actual_value (Any | None)

Examples

>>> raise BufferUnderrunError("Ring buffer underrun")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BufferUnderrunError: Ring buffer underrun

Usage Examples#

Real-Time Guitar Effect#

from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.filter.iir import LoButterworth, HiButterworth
from torchfx.effect import Gain, Reverb

config = StreamConfig(
    sample_rate=48000,
    buffer_size=256,
    channels_in=1,
    channels_out=1,
    latency="low",
)

with RealtimeProcessor(
    effects=[
        HiButterworth(80),        # Remove low rumble
        LoButterworth(8000),       # Tame high frequencies
        Gain(1.5),                 # Boost signal
        Reverb(room_size=0.3),     # Add ambience
    ],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    # Tweak parameters in real time
    processor.set_parameter("2.gain", 2.0)  # Increase gain
    input("Press Enter to stop...")

Processing Large Files#

from torchfx.realtime import StreamProcessor
from torchfx.effect import Gain, Normalize

# Process a large file in 64K sample chunks
with StreamProcessor(
    effects=[Gain(0.8), Normalize(peak=0.95)],
    chunk_size=65536,
) as processor:
    processor.process_file("large_podcast.wav", "normalized_podcast.wav")

    # Or use the generator API
    for chunk in processor.process_chunks("large_podcast.wav"):
        print(f"Processed chunk: {chunk.shape}")

Thread-Safe Parameter Updates#

import time
from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.filter.iir import LoButterworth

config = StreamConfig(sample_rate=48000, buffer_size=512,
                      channels_in=1, channels_out=1)

with RealtimeProcessor(
    effects=[LoButterworth(1000)],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    # Sweep filter cutoff from 1000 Hz to 5000 Hz
    for cutoff in range(1000, 5001, 100):
        processor.set_parameter("0.cutoff", cutoff)
        time.sleep(0.05)