Real-Time Processing#

The realtime module provides real-time audio I/O, ring buffering, and stream processing capabilities.

Overview#

The real-time processing system consists of:

  • Audio backends - Abstract interface for audio I/O with a PortAudio implementation via sounddevice

  • Ring buffer - Lock-free SPSC buffer for real-time tensor transfer

  • Real-time processor - Orchestrator connecting backend to effect chain

  • Stream processor - Chunk-based processing of large audio files

Quick Start#

Both processors support the context manager protocol for safe resource management.

Real-time processing (requires sounddevice):

from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.effect import Gain

config = StreamConfig(
    sample_rate=48000,
    buffer_size=512,
    channels_in=2,
    channels_out=2,
)
with RealtimeProcessor(
    effects=[Gain(0.5)],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    input("Press Enter to stop...")
# Stream is automatically stopped on exit

Stream processing (no extra dependencies):

from torchfx.realtime import StreamProcessor
from torchfx.effect import Gain

with StreamProcessor(effects=[Gain(0.5)], chunk_size=65536) as processor:
    processor.process_file("large_input.wav", "output.wav")

Configuration#

class torchfx.realtime.StreamConfig(sample_rate=48000, buffer_size=512, channels_in=0, channels_out=2, dtype='float32', device_in=None, device_out=None, latency='low')[source]#

Configuration for an audio stream.

Parameters:
  • sample_rate (int) – Sample rate in Hz (e.g., 44100, 48000). Default is 48000.

  • buffer_size (int) – Number of samples per buffer/callback frame. Default is 512.

  • channels_in (int) – Number of input channels (0 for output-only). Default is 0.

  • channels_out (int) – Number of output channels (0 for input-only). Default is 2.

  • dtype (str) – Sample format string. Default is "float32".

  • device_in (int | str | None) – Input device identifier. None for system default.

  • device_out (int | str | None) – Output device identifier. None for system default.

  • latency (str | float) – Latency hint: "low", "high", or seconds as float. Default is "low".

Examples

>>> config = StreamConfig(sample_rate=44100, buffer_size=256)
>>> config.direction
<StreamDirection.OUTPUT: 'output'>
>>> config.latency_ms
5.804988662131519
property direction: StreamDirection#

Infer stream direction from channel counts.

Returns:

DUPLEX if both in and out channels > 0, INPUT if only in > 0, OUTPUT otherwise.

Return type:

StreamDirection

property latency_ms: float#

Calculate theoretical minimum latency in milliseconds.

Returns:

Latency based on buffer_size / sample_rate.

Return type:

float
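Both derived properties follow directly from the constructor arguments. A stand-alone pure-Python sketch of the two rules (illustrative only, not the torchfx implementation):

```python
from enum import Enum


class StreamDirection(Enum):
    INPUT = "input"
    OUTPUT = "output"
    DUPLEX = "duplex"


def direction(channels_in: int, channels_out: int) -> StreamDirection:
    """DUPLEX if both counts > 0, INPUT if only input > 0, OUTPUT otherwise."""
    if channels_in > 0 and channels_out > 0:
        return StreamDirection.DUPLEX
    if channels_in > 0:
        return StreamDirection.INPUT
    return StreamDirection.OUTPUT


def latency_ms(buffer_size: int, sample_rate: int) -> float:
    """Theoretical minimum latency: one buffer of audio, in milliseconds."""
    return buffer_size / sample_rate * 1000.0
```

With buffer_size=256 and sample_rate=44100 this reproduces the doctest value above (about 5.805 ms).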

class torchfx.realtime.StreamDirection(value)[source]#

Audio stream direction.

class torchfx.realtime.StreamState(value)[source]#

Audio stream lifecycle state.

Audio Backend#

class torchfx.realtime.AudioBackend[source]#

Abstract base class for audio I/O backends.

All audio backends must implement this interface. The backend manages the lifecycle of audio streams and routes audio data through callbacks.

Subclasses should implement all abstract methods to provide a complete audio I/O solution.

Examples

Implementing a custom backend:

>>> class MyBackend(AudioBackend):
...     # Implement all abstract methods
...     pass
abstract close()[source]#

Close the audio stream and release resources.

Return type:

None

abstract get_default_device(direction)[source]#

Get the default device for a given direction.

Parameters:

direction (StreamDirection) – The stream direction to query.

Returns:

Device identifier.

Return type:

int | str

abstract get_devices()[source]#

List available audio devices.

Returns:

List of device info dicts with keys: "name", "index", "max_input_channels", "max_output_channels", "default_sample_rate".

Return type:

list[dict[str, Any]]

abstract open_stream(config, callback=None)[source]#

Open an audio stream with the given configuration.

Parameters:
  • config (StreamConfig) – Stream configuration.

  • callback (AudioCallback | None) – Callback for real-time processing. If None, use blocking API.

Return type:

None

abstract read(num_frames)[source]#

Read audio frames (blocking API).

Parameters:

num_frames (int) – Number of frames to read.

Returns:

Shape (channels_in, num_frames).

Return type:

Tensor

abstract start()[source]#

Start the audio stream.

Return type:

None

abstract stop()[source]#

Stop the audio stream.

Return type:

None

abstract write(data)[source]#

Write audio frames (blocking API).

Parameters:

data (Tensor) – Shape (channels_out, num_frames).

Return type:

None

abstract property is_available: bool#

Whether this backend’s dependencies are available.

abstract property name: str#

Human-readable name of the backend.

abstract property state: StreamState#

Current stream state.
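As a sketch of how a subclass fits together, here is a hypothetical no-op backend that records lifecycle calls. A real implementation would subclass torchfx.realtime.AudioBackend and return actual StreamState values; this stand-alone version mimics the state with plain strings.

```python
class NullBackend:
    """Hypothetical stand-in mirroring the AudioBackend interface (no real I/O)."""

    def __init__(self):
        self._state = "closed"
        self.calls = []  # record of lifecycle calls, for illustration

    def open_stream(self, config, callback=None):
        self.calls.append("open")
        self._state = "open"

    def start(self):
        self.calls.append("start")
        self._state = "running"

    def stop(self):
        self.calls.append("stop")
        self._state = "open"

    def close(self):
        self.calls.append("close")
        self._state = "closed"

    def get_devices(self):
        # Same dict keys as documented for get_devices()
        return [{"name": "null", "index": 0, "max_input_channels": 0,
                 "max_output_channels": 2, "default_sample_rate": 48000}]

    def get_default_device(self, direction):
        return 0

    def read(self, num_frames):
        raise RuntimeError("NullBackend has no input")

    def write(self, data):
        pass  # discard audio

    @property
    def is_available(self):
        return True

    @property
    def name(self):
        return "null"

    @property
    def state(self):
        return self._state
```

The typical lifecycle is open_stream → start → stop → close, which is the order a processor drives the backend through.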

class torchfx.realtime.sounddevice_backend.SoundDeviceBackend[source]#

Audio backend using sounddevice (PortAudio).

Provides real-time audio I/O through PortAudio with support for input, output, and duplex streams. Supports both callback-based and blocking APIs.

The sounddevice module is imported lazily on instantiation.

Raises:

BackendNotAvailableError – If sounddevice is not installed.

Examples

>>> backend = SoundDeviceBackend()  
>>> devices = backend.get_devices()  
close()[source]#

Close the audio stream and release resources.

Return type:

None

get_default_device(direction)[source]#

Get the default device for a given direction.

Parameters:

direction (StreamDirection) – The stream direction.

Returns:

Default device index or name.

Return type:

int | str

get_devices()[source]#

List available audio devices.

Returns:

Device information dictionaries.

Return type:

list[dict[str, Any]]

open_stream(config, callback=None)[source]#

Open an audio stream.

Parameters:
  • config (StreamConfig) – Stream configuration.

  • callback (AudioCallback | None) – Audio processing callback. If None, blocking API is used.

Raises:

StreamError – If stream is already open or opening fails.

Return type:

None

read(num_frames)[source]#

Read audio frames using blocking API.

Parameters:

num_frames (int) – Number of frames to read.

Returns:

Shape (channels_in, num_frames).

Return type:

Tensor

Raises:

StreamError – If stream is not configured for input.

start()[source]#

Start the audio stream.

Raises:

StreamError – If stream is not open.

Return type:

None

stop()[source]#

Stop the audio stream.

Raises:

StreamError – If stream is not running.

Return type:

None

write(data)[source]#

Write audio frames using blocking API.

Parameters:

data (Tensor) – Shape (channels_out, num_frames).

Raises:

StreamError – If stream is not configured for output.

Return type:

None

property is_available: bool#

Return True since instantiation succeeded.

property name: str#

Return backend name.

property state: StreamState#

Current stream state.

Ring Buffer#

class torchfx.realtime.TensorRingBuffer(capacity, channels=1, dtype=torch.float32, device='cpu')[source]#

Lock-free SPSC ring buffer for real-time audio tensor transfer.

Uses separate read/write indices with a pre-allocated tensor backing store. In the SPSC model, only the producer writes _write_idx and only the consumer writes _read_idx, so no locks are needed.

The capacity must be a power of 2 for efficient modular arithmetic (bitwise AND instead of modulo). If a non-power-of-2 value is provided, it is rounded up to the next power of 2.

Parameters:
  • capacity (int) – Total capacity in samples per channel. Rounded up to next power of 2.

  • channels (int) – Number of audio channels. Default is 1.

  • dtype (torch.dtype) – Data type for the buffer tensor. Default is torch.float32.

  • device (str) – Device for the buffer tensor. Default is "cpu".

buffer#

Pre-allocated backing tensor of shape (channels, capacity).

Type:

Tensor

Examples

>>> buf = TensorRingBuffer(capacity=512, channels=2)
>>> buf.capacity
512
>>> buf.available_read
0
>>> buf.available_write
512
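The power-of-2 requirement exists so the wrap-around can be a bitwise AND rather than a modulo. A stand-alone sketch of the index arithmetic (illustrative, not the torchfx internals):

```python
def next_pow2(n: int) -> int:
    """Round n up to the next power of 2; powers of 2 are unchanged."""
    return 1 << (n - 1).bit_length()


def wrap(index: int, capacity: int) -> int:
    """Map a monotonically increasing index into the backing store.

    Valid only when capacity is a power of 2, where (capacity - 1)
    is an all-ones bit mask.
    """
    return index & (capacity - 1)
```

With capacity 512, wrap(513, 512) gives the same result as 513 % 512 but avoids a division in the hot audio path.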
advance_read(num_samples)[source]#

Advance the read pointer without reading data.

Use after peek() to advance by the hop size in overlap-add processing.

Parameters:

num_samples (int) – Number of samples to advance.

Raises:

BufferUnderrunError – If advancing would go past the write pointer.

Return type:

None

clear()[source]#

Reset the buffer to empty state.

Examples

>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> _ = buf.write(torch.ones(1, 100))
>>> buf.clear()
>>> buf.available_read
0
Return type:

None

peek(num_samples)[source]#

Read samples without advancing the read pointer.

Useful for overlap-add processing where the consumer needs to read overlapping frames.

Parameters:

num_samples (int) – Number of samples to peek.

Returns:

Audio data of shape (channels, num_samples).

Return type:

Tensor

Raises:

BufferUnderrunError – If fewer samples are available than requested.
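Together, peek() and advance_read() support the classic overlap-add loop: read a full analysis frame, but only consume one hop. A stand-alone sketch of the consumer-side pattern, using a plain list in place of the tensor ring buffer:

```python
def overlap_frames(samples, frame_size, hop_size):
    """Yield overlapping frames: peek frame_size samples, advance by hop_size."""
    read_idx = 0
    while read_idx + frame_size <= len(samples):
        yield samples[read_idx:read_idx + frame_size]  # peek(frame_size)
        read_idx += hop_size                           # advance_read(hop_size)


frames = list(overlap_frames(list(range(8)), frame_size=4, hop_size=2))
# frames -> [[0, 1, 2, 3], [2, 3, 4, 5], [4, 5, 6, 7]]
```

Each frame shares frame_size - hop_size samples with its predecessor, which is exactly what windowed overlap-add processing needs.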

read(num_samples)[source]#

Read samples from the buffer.

Parameters:

num_samples (int) – Number of samples to read.

Returns:

Audio data of shape (channels, num_samples).

Return type:

Tensor

Raises:

BufferUnderrunError – If fewer samples are available than requested.

Examples

>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> _ = buf.write(torch.ones(1, 100))
>>> output = buf.read(50)
>>> output.shape
torch.Size([1, 50])
>>> buf.available_read
50
write(data)[source]#

Write samples into the buffer.

Parameters:

data (Tensor) – Audio data of shape (channels, samples) or (samples,) for single-channel buffers.

Returns:

Number of samples actually written. May be less than requested if the buffer has insufficient free space.

Return type:

int

Examples

>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> data = torch.ones(1, 100)
>>> buf.write(data)
100
>>> buf.available_read
100
property available_read: int#

Number of samples available for reading.

property available_write: int#

Number of samples available for writing.

property capacity: int#

Total buffer capacity in samples per channel.

property channels: int#

Number of audio channels.

Processors#

Real-Time Processor#

class torchfx.realtime.RealtimeProcessor(effects, backend, config, buffer_capacity=8192)[source]#

Real-time audio processor connecting an audio backend to an effect chain.

Orchestrates audio I/O through a backend and processing through a chain of FX effects. Supports thread-safe parameter updates during processing.

Parameters:
  • effects (Sequence[FX] | nn.Sequential) – Chain of effects to apply in order.

  • backend (AudioBackend) – Audio backend for I/O.

  • config (StreamConfig) – Stream configuration.

  • buffer_capacity (int) – Ring buffer capacity in samples per channel. Default is 8192.

Examples

>>> from torchfx.realtime import RealtimeProcessor, StreamConfig
>>> from torchfx.effect import Gain
>>> config = StreamConfig(sample_rate=48000, buffer_size=512,
...                       channels_in=1, channels_out=1)
reset_state()[source]#

Reset all internal state (filter states, ring buffers).

Useful after seeking in a file or switching audio sources.

Return type:

None

set_parameter(name, value)[source]#

Thread-safe parameter update.

Parameters are staged in a pending dict and applied at the next processing boundary (start of next audio callback). This avoids locks in the audio processing path.

Parameters:
  • name (str) – Dot-separated parameter path, e.g., "0.cutoff" for effect index 0, attribute cutoff.

  • value (Any) – New parameter value.

Return type:

None

Examples

>>> # processor.set_parameter("0.cutoff", 2000)
>>> # processor.set_parameter("1.gain", 0.8)
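The staging scheme can be sketched without torchfx: updates go into a pending dict guarded by a short lock, and the audio thread swaps the whole dict out at the start of each callback, so the processing path never holds a lock per parameter. The names here (PendingParams, apply, the minimal Gain stand-in) are illustrative, not the torchfx internals:

```python
import threading


class PendingParams:
    """Stage parameter updates; apply them at the next processing boundary."""

    def __init__(self, effects):
        self._effects = effects
        self._pending = {}
        self._lock = threading.Lock()

    def set_parameter(self, name: str, value) -> None:
        # Called from any thread; only records the update.
        with self._lock:
            self._pending[name] = value

    def apply(self) -> None:
        # Called by the audio thread at the start of each callback:
        # swap the dict out, then apply outside the lock.
        with self._lock:
            pending, self._pending = self._pending, {}
        for path, value in pending.items():
            index, attr = path.split(".", 1)  # "0.cutoff" -> effect 0, attr "cutoff"
            setattr(self._effects[int(index)], attr, value)


class Gain:  # minimal stand-in effect
    def __init__(self, gain):
        self.gain = gain


params = PendingParams([Gain(0.5)])
params.set_parameter("0.gain", 0.8)  # staged, not yet visible to the effect
params.apply()                       # applied at the processing boundary
```

Swapping the whole dict under the lock keeps the critical section tiny and bounded, regardless of how many parameters were staged.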
start()[source]#

Start real-time processing.

Opens and starts the audio stream. The backend callback routes audio through the effect chain.

Raises:

RealtimeError – If the processor is already running.

Return type:

None

stop()[source]#

Stop real-time processing and close the stream.

Raises:

RealtimeError – If the processor is not running.

Return type:

None

property config: StreamConfig#

The stream configuration.

property effects: list[FX]#

The current effect chain.

property is_running: bool#

Whether the processor is currently running.

property latency_ms: float#

Estimated total latency in milliseconds.

Stream Processor#

class torchfx.realtime.StreamProcessor(effects, chunk_size=65536, overlap=0, device='cpu')[source]#

Process audio files in chunks without loading the entire file.

Reads audio in configurable chunk sizes, applies an effect chain to each chunk, and writes output progressively. Supports an overlap parameter for effects that need context beyond chunk boundaries.

Parameters:
  • effects (Sequence[FX] | nn.Sequential) – Chain of effects to apply in order.

  • chunk_size (int) – Number of samples per processing chunk. Default is 65536.

  • overlap (int) – Number of overlap samples between chunks. Default is 0.

  • device (str) – Processing device ("cpu" or "cuda"). Default is "cpu".

Examples

>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
process_chunks(input_path)[source]#

Yield processed chunks as tensors.

Generator API for streaming to another process, network, or real-time playback.

Parameters:

input_path (str | Path) – Path to the input audio file.

Yields:

Tensor – Processed audio chunks of shape (channels, chunk_size).

Return type:

Generator[Tensor, None, None]

Examples

>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
>>> # for chunk in processor.process_chunks("input.wav"):
>>> #     print(chunk.shape)
process_file(input_path, output_path, format=None, subtype=None)[source]#

Process an audio file chunk by chunk.

Reads the input file in chunks, applies the effect chain to each chunk, and writes the result to the output file.

Parameters:
  • input_path (str | Path) – Path to the input audio file.

  • output_path (str | Path) – Path to the output audio file.

  • format (str | None) – Output format (e.g., "WAV", "FLAC"). Inferred from extension if None.

  • subtype (str | None) – Output subtype (e.g., "PCM_16", "FLOAT"). Uses default for format if None.

Return type:

None

Examples

>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
>>> # processor.process_file("input.wav", "output.wav")
property chunk_size: int#

Processing chunk size in samples.

property effects: list[FX]#

The current effect chain.

property overlap: int#

Overlap between chunks in samples.
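Conceptually, chunked processing with overlap carries a few samples of left context into each chunk and trims them from the output, so the trimmed chunks concatenate back into the full processed signal. A stand-alone sketch of that loop over an in-memory list (the real StreamProcessor streams from disk; the effect here is a plain per-sample function for illustration):

```python
def process_chunks(samples, effect, chunk_size, overlap=0):
    """Apply `effect` to chunks with `overlap` samples of left context,
    then trim the context so the outputs concatenate to the full signal."""
    pos = 0
    while pos < len(samples):
        start = max(0, pos - overlap)
        context = pos - start  # number of context samples prepended
        chunk = [effect(x) for x in samples[start:pos + chunk_size]]
        yield chunk[context:]  # drop the re-processed context
        pos += chunk_size


out = []
for chunk in process_chunks(list(range(10)), lambda x: x * 2,
                            chunk_size=4, overlap=2):
    out.extend(chunk)
# out reconstructs the processed signal: [0, 2, 4, ..., 18]
```

For a stateless effect the overlap is redundant; it matters for effects whose output at a sample depends on preceding samples, which is why overlap defaults to 0.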

Exceptions#

class torchfx.realtime.RealtimeError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#

Base exception for all real-time processing errors.

Parameters:
  • message (str) – Human-readable error message.

  • suggestion (str | None, optional) – A suggestion for fixing the error.

  • parameter_name (str | None)

  • actual_value (Any | None)

Examples

>>> raise RealtimeError("Processing failed")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.RealtimeError: Processing failed
class torchfx.realtime.BackendNotAvailableError(backend_name, suggestion=None)[source]#

Raised when a requested audio backend is not installed or available.

Parameters:
  • backend_name (str) – Name of the backend that is not available.

  • suggestion (str | None, optional) – Installation instructions or alternative.

Examples

>>> raise BackendNotAvailableError("sounddevice")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BackendNotAvailableError: ...
class torchfx.realtime.StreamError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#

Raised for audio stream lifecycle errors.

Parameters:
  • message (str) – Human-readable error message.

  • suggestion (str | None, optional) – A suggestion for fixing the error.

  • parameter_name (str | None)

  • actual_value (Any | None)

Examples

>>> raise StreamError("Failed to open audio stream")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.StreamError: Failed to open audio stream
class torchfx.realtime.BufferOverrunError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#

Raised when the ring buffer overflows.

This occurs when the producer writes data faster than the consumer reads it, causing data loss.

Parameters:
  • message (str) – Human-readable error message.

  • suggestion (str | None, optional) – A suggestion for fixing the error.

  • parameter_name (str | None)

  • actual_value (Any | None)

Examples

>>> raise BufferOverrunError("Ring buffer overflow")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BufferOverrunError: Ring buffer overflow
class torchfx.realtime.BufferUnderrunError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#

Raised when the ring buffer underflows.

This occurs when the consumer attempts to read more data than is available, typically because consumption outpaces production.

Parameters:
  • message (str) – Human-readable error message.

  • suggestion (str | None, optional) – A suggestion for fixing the error.

  • parameter_name (str | None)

  • actual_value (Any | None)

Examples

>>> raise BufferUnderrunError("Ring buffer underrun")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BufferUnderrunError: Ring buffer underrun

Usage Examples#

Real-Time Guitar Effect#

from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.filter.iir import LoButterworth, HiButterworth
from torchfx.effect import Gain, Reverb

config = StreamConfig(
    sample_rate=48000,
    buffer_size=256,
    channels_in=1,
    channels_out=1,
    latency="low",
)

with RealtimeProcessor(
    effects=[
        HiButterworth(80),         # Remove low rumble
        LoButterworth(8000),       # Tame high frequencies
        Gain(1.5),                 # Boost signal
        Reverb(room_size=0.3),     # Add ambience
    ],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    # Tweak parameters in real time
    processor.set_parameter("2.gain", 2.0)  # Increase gain
    input("Press Enter to stop...")

Processing Large Files#

from torchfx.realtime import StreamProcessor
from torchfx.effect import Gain, Normalize

# Process a large file in 64K sample chunks
with StreamProcessor(
    effects=[Gain(0.8), Normalize(peak=0.95)],
    chunk_size=65536,
) as processor:
    processor.process_file("large_podcast.wav", "normalized_podcast.wav")

    # Or use the generator API
    for chunk in processor.process_chunks("large_podcast.wav"):
        print(f"Processed chunk: {chunk.shape}")

Thread-Safe Parameter Updates#

import time
from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.filter.iir import LoButterworth

config = StreamConfig(sample_rate=48000, buffer_size=512,
                      channels_in=1, channels_out=1)

with RealtimeProcessor(
    effects=[LoButterworth(1000)],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    # Sweep filter cutoff from 1000Hz to 5000Hz
    for cutoff in range(1000, 5001, 100):
        processor.set_parameter("0.cutoff", cutoff)
        time.sleep(0.05)