Real-Time Processing
The realtime module provides real-time audio I/O, ring buffering, and stream processing capabilities.
Overview
The real-time processing system consists of:
- Audio backends: an abstract interface for audio I/O, with a PortAudio implementation via sounddevice
- Ring buffer: a lock-free SPSC buffer for real-time tensor transfer
- Real-time processor: an orchestrator connecting a backend to an effect chain
- Stream processor: chunk-based file processing for large files
Quick Start
Both processors support the context manager protocol for safe resource management.
Real-time processing (requires sounddevice):
```python
from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.effect import Gain

config = StreamConfig(
    sample_rate=48000,
    buffer_size=512,
    channels_in=2,
    channels_out=2,
)

with RealtimeProcessor(
    effects=[Gain(0.5)],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    input("Press Enter to stop...")
# Stream is automatically stopped on exit
```
Stream processing (no extra dependencies):
```python
from torchfx.realtime import StreamProcessor
from torchfx.effect import Gain

with StreamProcessor(effects=[Gain(0.5)], chunk_size=65536) as processor:
    processor.process_file("large_input.wav", "output.wav")
```
Configuration
- class torchfx.realtime.StreamConfig(sample_rate=48000, buffer_size=512, channels_in=0, channels_out=2, dtype='float32', device_in=None, device_out=None, latency='low')
Configuration for an audio stream.
- Parameters:
sample_rate (int) – Sample rate in Hz (e.g., 44100, 48000). Default is 48000.
buffer_size (int) – Number of samples per buffer/callback frame. Default is 512.
channels_in (int) – Number of input channels (0 for output-only). Default is 0.
channels_out (int) – Number of output channels (0 for input-only). Default is 2.
dtype (str) – Sample format string. Default is "float32".
device_in (int | str | None) – Input device identifier. None for system default.
device_out (int | str | None) – Output device identifier. None for system default.
latency (str | float) – Latency hint: "low", "high", or a value in seconds as a float. Default is "low".
Examples
```pycon
>>> config = StreamConfig(sample_rate=44100, buffer_size=256)
>>> config.direction
<StreamDirection.OUTPUT: 'output'>
>>> config.latency_ms
5.804988662131519
```
- property direction: StreamDirection
Infer stream direction from channel counts.
- Returns:
DUPLEX if both in and out channels > 0, INPUT if only in > 0, OUTPUT otherwise.
- Return type:
StreamDirection
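The direction rule above, and the latency_ms value in the example, can be mirrored in a few lines of standard-library Python. This is a hypothetical stand-in, not the torchfx source: SketchConfig and Direction are illustrative names, the enum values for INPUT and DUPLEX are assumed, and latency_ms is assumed to be the duration of one buffer in milliseconds (which matches the doctest value).

```python
from dataclasses import dataclass
from enum import Enum


class Direction(Enum):
    # Hypothetical stand-in for torchfx.realtime.StreamDirection
    INPUT = "input"
    OUTPUT = "output"
    DUPLEX = "duplex"


@dataclass
class SketchConfig:
    sample_rate: int = 48000
    buffer_size: int = 512
    channels_in: int = 0
    channels_out: int = 2

    @property
    def direction(self) -> Direction:
        # DUPLEX if both sides have channels, INPUT if only input does,
        # OUTPUT otherwise (the documented rule).
        if self.channels_in > 0 and self.channels_out > 0:
            return Direction.DUPLEX
        if self.channels_in > 0:
            return Direction.INPUT
        return Direction.OUTPUT

    @property
    def latency_ms(self) -> float:
        # Assumed: duration of one buffer in milliseconds.
        return self.buffer_size / self.sample_rate * 1000.0


cfg = SketchConfig(sample_rate=44100, buffer_size=256)
print(cfg.direction, cfg.latency_ms)
```

Note that a configuration with both channel counts at 0 falls through to OUTPUT under this rule.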
Audio Backend
- class torchfx.realtime.AudioBackend
Abstract base class for audio I/O backends.
All audio backends must implement this interface. The backend manages the lifecycle of audio streams and routes audio data through callbacks.
Subclasses should implement all abstract methods to provide a complete audio I/O solution.
Examples
Implementing a custom backend:
```pycon
>>> class MyBackend(AudioBackend):
...     # Implement all abstract methods
...     pass
```
- abstract get_default_device(direction)
Get the default device for a given direction.
- Parameters:
direction (StreamDirection) – The stream direction to query.
- Returns:
Device identifier.
- Return type:
- abstract open_stream(config, callback=None)
Open an audio stream with the given configuration.
- Parameters:
config (StreamConfig) – Stream configuration.
callback (AudioCallback | None) – Callback for real-time processing. If None, use blocking API.
- Return type:
None
- abstract read(num_frames)
Read audio frames (blocking API).
- Parameters:
num_frames (int) – Number of frames to read.
- Returns:
Frames of shape (channels_in, num_frames).
- Return type:
Tensor
- abstract write(data)
Write audio frames (blocking API).
- Parameters:
data (Tensor) – Frames of shape (channels_out, num_frames).
- Return type:
None
- abstract property state: StreamState
Current stream state.
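As a sketch of what implementing the interface involves, the block below defines a hypothetical SketchBackend ABC that mirrors the documented abstract members, plus an in-memory LoopbackBackend that supports only the blocking API. To stay runnable without torchfx or PyTorch, it uses per-channel Python lists in place of tensors and plain strings in place of StreamState. The real base class may require more members than shown here.

```python
from abc import ABC, abstractmethod


class SketchBackend(ABC):
    """Hypothetical mirror of the documented AudioBackend members."""

    @abstractmethod
    def get_default_device(self, direction): ...

    @abstractmethod
    def open_stream(self, config, callback=None): ...

    @abstractmethod
    def read(self, num_frames): ...

    @abstractmethod
    def write(self, data): ...

    @property
    @abstractmethod
    def state(self): ...


class LoopbackBackend(SketchBackend):
    """Blocking-API backend whose read() returns previously written frames."""

    def __init__(self, channels=1):
        self._queue = [[] for _ in range(channels)]  # one FIFO list per channel
        self._state = "closed"

    def get_default_device(self, direction):
        return 0  # a single virtual device

    def open_stream(self, config, callback=None):
        if callback is not None:
            raise NotImplementedError("this sketch supports the blocking API only")
        self._state = "open"

    def read(self, num_frames):
        # Return shape (channels, num_frames), padding with silence if short.
        out = []
        for ch in self._queue:
            frames = ch[:num_frames]
            del ch[:num_frames]
            out.append(frames + [0.0] * (num_frames - len(frames)))
        return out

    def write(self, data):
        # data has shape (channels, num_frames).
        for ch, frames in zip(self._queue, data):
            ch.extend(frames)

    @property
    def state(self):
        return self._state


backend = LoopbackBackend(channels=2)
backend.open_stream(config=None)
backend.write([[0.1, 0.2, 0.3], [1.0, 2.0, 3.0]])
first_two = backend.read(2)
print(first_two)
```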
- class torchfx.realtime.sounddevice_backend.SoundDeviceBackend
Audio backend using sounddevice (PortAudio).
Provides real-time audio I/O through PortAudio with support for input, output, and duplex streams. Supports both callback-based and blocking APIs.
The sounddevice module is imported lazily on instantiation.
- Raises:
BackendNotAvailableError – If sounddevice is not installed.
Examples
```pycon
>>> backend = SoundDeviceBackend()
>>> devices = backend.get_devices()
```
- get_default_device(direction)
Get the default device for a given direction.
- Parameters:
direction (StreamDirection) – The stream direction.
- Returns:
Default device index or name.
- Return type:
- open_stream(config, callback=None)
Open an audio stream.
- Parameters:
config (StreamConfig) – Stream configuration.
callback (AudioCallback | None) – Audio processing callback. If None, blocking API is used.
- Raises:
StreamError – If stream is already open or opening fails.
- Return type:
None
- read(num_frames)
Read audio frames using blocking API.
- Parameters:
num_frames (int) – Number of frames to read.
- Returns:
Frames of shape (channels_in, num_frames).
- Return type:
Tensor
- Raises:
StreamError – If stream is not configured for input.
- start()
Start the audio stream.
- Raises:
StreamError – If stream is not open.
- Return type:
None
- stop()
Stop the audio stream.
- Raises:
StreamError – If stream is not running.
- Return type:
None
- write(data)
Write audio frames using blocking API.
- Parameters:
data (Tensor) – Frames of shape (channels_out, num_frames).
- Raises:
StreamError – If stream is not configured for output.
- Return type:
None
- property state: StreamState
Current stream state.
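The lazy import noted above keeps sounddevice optional until a backend is actually created. Below is a minimal sketch of that pattern using importlib. LazyBackend and the local BackendNotAvailableError class are hypothetical stand-ins, and the install hint in the message is an assumption.

```python
import importlib


class BackendNotAvailableError(Exception):
    """Local stand-in for torchfx.realtime.BackendNotAvailableError."""


class LazyBackend:
    def __init__(self, module_name="sounddevice"):
        # Import the optional dependency only when a backend is created,
        # so importing the surrounding package never requires it.
        try:
            self._sd = importlib.import_module(module_name)
        except ImportError as exc:
            raise BackendNotAvailableError(
                f"{module_name} is not installed; try: pip install {module_name}"
            ) from exc


try:
    LazyBackend("definitely_not_a_real_module")
except BackendNotAvailableError as err:
    message = str(err)
print("backend unavailable:", message)
```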
Ring Buffer
- class torchfx.realtime.TensorRingBuffer(capacity, channels=1, dtype=torch.float32, device='cpu')
Lock-free SPSC ring buffer for real-time audio tensor transfer.
Uses separate read/write indices with a pre-allocated tensor backing store. In the single-producer, single-consumer (SPSC) model, only the producer writes _write_idx and only the consumer writes _read_idx, so no locks are needed.
The capacity is always a power of 2 so that index wrapping can use a bitwise AND instead of a modulo. A non-power-of-2 value is rounded up to the next power of 2.
- Parameters:
capacity (int) – Total capacity in samples per channel. Rounded up to next power of 2.
channels (int) – Number of audio channels. Default is 1.
dtype (torch.dtype) – Data type for the buffer tensor. Default is torch.float32.
device (str) – Device for the buffer tensor. Default is "cpu".
- buffer
Pre-allocated backing tensor of shape (channels, capacity).
- Type:
Tensor
Examples
```pycon
>>> buf = TensorRingBuffer(capacity=512, channels=2)
>>> buf.capacity
512
>>> buf.available_read
0
>>> buf.available_write
512
```
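The index scheme described above can be sketched in pure Python. SketchRingBuffer is hypothetical: per-channel lists stand in for the backing tensor, indices grow monotonically and are wrapped with a bitwise AND against capacity - 1, and write() stores only what fits and returns that count. It is single-threaded as written and is not torchfx's implementation.

```python
class BufferUnderrun(Exception):
    """Stand-in for torchfx's BufferUnderrunError."""


def next_pow2(n):
    # Smallest power of two that is >= n (for n >= 1).
    return 1 << (n - 1).bit_length()


class SketchRingBuffer:
    def __init__(self, capacity, channels=1):
        self.capacity = next_pow2(capacity)
        self._mask = self.capacity - 1
        self._buf = [[0.0] * self.capacity for _ in range(channels)]
        self._write_idx = 0  # only the producer advances this
        self._read_idx = 0   # only the consumer advances this

    @property
    def available_read(self):
        return self._write_idx - self._read_idx

    @property
    def available_write(self):
        return self.capacity - self.available_read

    def write(self, data):
        # data: per-channel sample lists; store as many samples as fit.
        n = min(len(data[0]), self.available_write)
        for ch, samples in zip(self._buf, data):
            for i in range(n):
                ch[(self._write_idx + i) & self._mask] = samples[i]
        self._write_idx += n
        return n

    def peek(self, n):
        # Copy n samples per channel without moving the read index.
        if n > self.available_read:
            raise BufferUnderrun(f"requested {n}, have {self.available_read}")
        return [[ch[(self._read_idx + i) & self._mask] for i in range(n)]
                for ch in self._buf]

    def advance_read(self, n):
        if n > self.available_read:
            raise BufferUnderrun(f"cannot advance {n}, have {self.available_read}")
        self._read_idx += n

    def read(self, n):
        out = self.peek(n)
        self.advance_read(n)
        return out


buf = SketchRingBuffer(capacity=300, channels=1)  # rounded up to 512
written = buf.write([[1.0] * 600])                 # only 512 samples fit
print(buf.capacity, written, buf.available_read)
```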
- advance_read(num_samples)
Advance the read pointer without reading data.
Use after peek() to advance by the hop size in overlap-add processing.
- Parameters:
num_samples (int) – Number of samples to advance.
- Raises:
BufferUnderrunError – If advancing would go past the write pointer.
- Return type:
None
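In overlap-add processing the consumer peeks a full frame and then advances only by the hop, so consecutive frames share frame_size - hop samples. The sketch below shows that access pattern with a hypothetical single-channel HopReader that exposes peek() and advance_read() style methods. It produces only the analysis frames; windowing and the add step are left out.

```python
class HopReader:
    """Hypothetical single-channel stand-in with peek/advance_read semantics."""

    def __init__(self, samples):
        self._samples = list(samples)
        self._pos = 0

    @property
    def available_read(self):
        return len(self._samples) - self._pos

    def peek(self, n):
        # Look at the next n samples without consuming them.
        if n > self.available_read:
            raise ValueError("underrun")
        return self._samples[self._pos:self._pos + n]

    def advance_read(self, n):
        # Consume n samples (the hop) without returning them.
        if n > self.available_read:
            raise ValueError("underrun")
        self._pos += n


def frames(reader, frame_size, hop):
    # Yield overlapping frames: peek a whole frame, then move by the hop.
    while reader.available_read >= frame_size:
        yield reader.peek(frame_size)
        reader.advance_read(hop)


result = list(frames(HopReader(range(8)), frame_size=4, hop=2))
print(result)
```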
- clear()
Reset the buffer to empty state.
Examples
```pycon
>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> _ = buf.write(torch.ones(1, 100))
>>> buf.clear()
>>> buf.available_read
0
```
- Return type:
None
- peek(num_samples)
Read samples without advancing the read pointer.
Useful for overlap-add processing where the consumer needs to read overlapping frames.
- Parameters:
num_samples (int) – Number of samples to peek.
- Returns:
Audio data of shape (channels, num_samples).
- Return type:
Tensor
- Raises:
BufferUnderrunError – If fewer samples are available than requested.
- read(num_samples)
Read samples from the buffer.
- Parameters:
num_samples (int) – Number of samples to read.
- Returns:
Audio data of shape (channels, num_samples).
- Return type:
Tensor
- Raises:
BufferUnderrunError – If fewer samples are available than requested.
Examples
```pycon
>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> _ = buf.write(torch.ones(1, 100))
>>> output = buf.read(50)
>>> output.shape
torch.Size([1, 50])
>>> buf.available_read
50
```
- write(data)
Write samples into the buffer.
- Parameters:
data (Tensor) – Audio data of shape (channels, samples), or (samples,) for single-channel buffers.
- Returns:
Number of samples actually written. This may be less than requested if the buffer is nearly full.
- Return type:
int
Examples
```pycon
>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> data = torch.ones(1, 100)
>>> buf.write(data)
100
>>> buf.available_read
100
```
Processors
Real-Time Processor
- class torchfx.realtime.RealtimeProcessor(effects, backend, config, ring_blocks=4, latency_log_size=65536, prime_output=True, start_worker=True)
Real-time audio processor connecting an audio backend to an effect chain.
- Parameters:
effects (Sequence[FX] | nn.Sequential) – Chain of effects to apply in order. Each effect must preserve chunk length; effects that change the time dimension (e.g., Delay with feedback that grows the buffer) are rejected at runtime.
backend (AudioBackend) – Audio backend driving the I/O thread.
config (StreamConfig) – Stream configuration. config.buffer_size defines the DSP chunk size.
ring_blocks (int) – Capacity of each ring buffer in units of buffer_size. Default is 4 (≈43 ms of tolerance at 512 samples / 48 kHz).
latency_log_size (int) – Maximum number of per-callback latency samples retained. Default is 65 536 (≈11.7 min at 512 samples / 48 kHz). Older entries are evicted FIFO.
prime_output (bool) – If True (default), write one chunk of silence into the output ring at start so the first callback never underflows.
start_worker (bool) – If True (default), start() spawns a dedicated DSP worker thread that drains the input ring continuously. If False, the caller is responsible for calling process_pending() after each callback round-trip. The latter mode is intended for deterministic testing and for embedding the processor inside a host-driven scheduler.
Examples
```pycon
>>> from torchfx.realtime import RealtimeProcessor, StreamConfig
>>> from torchfx.effect import Gain
>>> config = StreamConfig(sample_rate=48000, buffer_size=512,
...                       channels_in=1, channels_out=1)
```
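The following is a deterministic model of the design described above, in the spirit of start_worker=False combined with process_pending(). The callback only copies samples into and out of two rings, and a separate step runs the effect chain on whole chunks. SketchProcessor is hypothetical and simplified: it handles a single channel on one thread, uses deques as rings, and represents effects as plain functions on lists.

```python
from collections import deque


class SketchProcessor:
    """Single-channel, single-threaded model of the callback/worker split."""

    def __init__(self, effects, buffer_size, ring_blocks=4, prime_output=True):
        self.effects = list(effects)
        self.n = buffer_size
        self.capacity = ring_blocks * buffer_size  # per-ring limit
        self.in_ring = deque()
        self.out_ring = deque()
        self.input_overflow_count = 0
        self.output_underflow_count = 0
        if prime_output:
            # One chunk of silence so the first callback has output ready.
            self.out_ring.extend([0.0] * buffer_size)

    def callback(self, indata):
        # "Audio thread": no DSP here, only copies into and out of the rings.
        if len(self.in_ring) + len(indata) > self.capacity:
            self.input_overflow_count += 1  # drop this block
        else:
            self.in_ring.extend(indata)
        if len(self.out_ring) < self.n:
            self.output_underflow_count += 1
            return [0.0] * self.n  # emit silence on underflow
        return [self.out_ring.popleft() for _ in range(self.n)]

    def process_pending(self):
        # "Worker": run every complete chunk through the effect chain.
        chunks = 0
        while len(self.in_ring) >= self.n:
            chunk = [self.in_ring.popleft() for _ in range(self.n)]
            for fx in self.effects:
                chunk = fx(chunk)
            self.out_ring.extend(chunk)
            chunks += 1
        return chunks


def half_gain(chunk):
    return [0.5 * x for x in chunk]


proc = SketchProcessor([half_gain], buffer_size=4)
first = proc.callback([1.0] * 4)    # primed silence comes out
done = proc.process_pending()       # worker processes the first block
second = proc.callback([2.0] * 4)   # the processed first block comes out
print(first, done, second)
```

In this sketch, the primed chunk of silence means output trails input by exactly one buffer. That delay is the latency cost of ensuring the first callback never underflows.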
- latency_log_ns()
Snapshot of per-callback wall-clock durations in nanoseconds.
The returned list is a copy; safe to consume from any thread.
Examples
```pycon
>>> # samples = processor.latency_log_ns()
>>> # p99 = sorted(samples)[int(len(samples) * 0.99)] / 1e6  # ms
```
- latency_stats_ms()
Summary statistics of per-callback latency in milliseconds.
- Returns:
A dict with keys count, min, median, mean, p95, p99, and max. For an empty log, every value except count is zero.
- Return type:
dict
Examples
```pycon
>>> # stats = processor.latency_stats_ms()
>>> # print(stats["p99"], stats["count"])
```
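The statistics can be reproduced from a raw latency_log_ns() snapshot. This sketch rests on two assumptions. First, the percentile uses the same index rule as the latency_log_ns() example, which may not be torchfx's own method. Second, an empty log is taken to yield a count of 0 with all other values 0.0.

```python
import statistics


def latency_stats_ms(samples_ns):
    # Convert nanoseconds to milliseconds and summarise.
    ms = sorted(s / 1e6 for s in samples_ns)
    n = len(ms)
    if n == 0:
        # Assumed behaviour for an empty log.
        return {"count": 0, "min": 0.0, "median": 0.0, "mean": 0.0,
                "p95": 0.0, "p99": 0.0, "max": 0.0}

    def pct(q):
        # Same index rule as the latency_log_ns() example, clamped to the end.
        return ms[min(int(n * q), n - 1)]

    return {
        "count": n,
        "min": ms[0],
        "median": statistics.median(ms),
        "mean": statistics.fmean(ms),
        "p95": pct(0.95),
        "p99": pct(0.99),
        "max": ms[-1],
    }


log = [i * 1_000_000 for i in range(1, 101)]  # 1 ms .. 100 ms in nanoseconds
stats = latency_stats_ms(log)
print(stats)
```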
- process_pending()
Drive one DSP draining pass synchronously.
Intended for tests and host-scheduled deployments using start_worker=False. Reads as many buffer_size-aligned chunks as are available in the input ring, runs the effect chain on each, and writes the results to the output ring.
- Returns:
Number of chunks processed.
- Return type:
int
- reset_metrics()
Reset latency log and xrun counters.
Useful between benchmark phases to discard startup transients.
- Return type:
None
- reset_state()
Reset internal state: ring buffers and stateful effects.
Useful after seeking in a file or switching audio sources. Safe to call while running, but typically called between sessions.
- Return type:
None
- set_parameter(name, value)
Thread-safe parameter update.
Parameters are staged in a pending dict and applied at the next DSP worker iteration boundary, not in the audio callback. This keeps the callback allocation-free.
- Parameters:
name (str) – Dot-separated parameter path, e.g., "0.cutoff" for effect index 0, attribute cutoff.
value (Any) – New parameter value.
- Return type:
None
Examples
```pycon
>>> # processor.set_parameter("0.cutoff", 2000)
>>> # processor.set_parameter("1.gain", 0.8)
```
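Here is a minimal sketch of the staging scheme described above. Any thread writes into a pending dict under a lock, and the worker swaps the dict out and applies it between chunks, so the audio callback never touches it. ParamStager and the Gain class here are hypothetical stand-ins, and the path handling assumes the "index.attribute" form shown in the examples.

```python
import threading


class Gain:
    # Hypothetical effect with a single tunable attribute.
    def __init__(self, gain):
        self.gain = gain


class ParamStager:
    def __init__(self, effects):
        self.effects = effects
        self._pending = {}
        self._lock = threading.Lock()

    def set_parameter(self, name, value):
        # Any thread: stage the update; the latest value per name wins.
        with self._lock:
            self._pending[name] = value

    def apply_pending(self):
        # Worker thread, between chunks: swap out and apply staged updates.
        with self._lock:
            pending, self._pending = self._pending, {}
        for name, value in pending.items():
            index, attr = name.split(".", 1)
            setattr(self.effects[int(index)], attr, value)
        return len(pending)


fx = [Gain(1.0), Gain(0.5)]
stager = ParamStager(fx)
stager.set_parameter("1.gain", 0.8)
stager.set_parameter("1.gain", 0.9)  # replaces the staged 0.8
applied = stager.apply_pending()
print(applied, fx[1].gain)
```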
- start()
Start real-time processing.
Spawns the DSP worker thread, primes the output ring, and starts the audio backend. The audio callback is registered with the backend; once running, sample movement between the backend’s buffers and the ring buffers happens on every callback while the effect chain runs in the worker thread.
- Raises:
RealtimeError – If the processor is already running.
- Return type:
None
- stop()
Stop real-time processing and close the stream.
Stops the audio backend first (so no more callbacks fire) and then signals the worker thread to exit. Any exception captured from the worker thread is re-raised here so callers see DSP errors at a known point in their code.
- Raises:
RealtimeError – If the processor is not running.
- Return type:
None
- property backend_xrun_count: int
Xruns reported by the audio backend itself (e.g. PortAudio paInputOverflow).
- property callback_count: int
Number of audio callbacks serviced since start (or since the last reset_metrics() call).
- property config: StreamConfig
The stream configuration.
- property input_overflow_count: int
Times the audio callback couldn’t push all input samples into the ring.
Stream Processor
- class torchfx.realtime.StreamProcessor(effects, chunk_size=65536, overlap=0, device='cpu')
Process audio files in chunks without loading the entire file.
Reads audio in configurable chunk sizes, applies an effect chain to each chunk, and writes output progressively. Supports an overlap parameter for effects that need context beyond chunk boundaries.
Examples
```pycon
>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
```
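The overlap parameter addresses effects whose output depends on earlier samples. The sketch below shows one common scheme. It is an assumption, not a description of torchfx's behaviour: the last overlap input samples are prepended to each chunk as context, and the output produced for them is discarded. With a causal 3-tap moving average, overlap=2 reproduces the whole-signal result exactly, while overlap=0 does not.

```python
def moving_average3(x):
    # Causal 3-tap FIR; samples before the start of x are treated as 0.
    padded = [0.0, 0.0] + list(x)
    return [(padded[i] + padded[i + 1] + padded[i + 2]) / 3 for i in range(len(x))]


def process_chunked(signal, effect, chunk_size, overlap):
    out = []
    for start in range(0, len(signal), chunk_size):
        ctx_start = max(0, start - overlap)
        # Feed up to `overlap` samples of history, then keep only the new part.
        block = signal[ctx_start:start + chunk_size]
        out.extend(effect(block)[start - ctx_start:])
    return out


x = [float(v) for v in [3, 1, 4, 1, 5, 9, 2, 6, 5, 3]]
whole = moving_average3(x)
with_overlap = process_chunked(x, moving_average3, chunk_size=4, overlap=2)
without_overlap = process_chunked(x, moving_average3, chunk_size=4, overlap=0)
print(with_overlap == whole, without_overlap == whole)
```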
- process_chunks(input_path)
Yield processed chunks as tensors.
Generator API for streaming to another process, network, or real-time playback.
- Parameters:
input_path (str | Path) – Path to the input audio file.
- Yields:
Tensor – Processed audio chunks of shape (channels, chunk_size).
- Return type:
Examples
```pycon
>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
>>> # for chunk in processor.process_chunks("input.wav"):
>>> #     print(chunk.shape)
```
- process_file(input_path, output_path, format=None, subtype=None)
Process an audio file chunk by chunk.
Reads the input file in chunks, applies the effect chain to each chunk, and writes the result to the output file.
- Parameters:
input_path (str | Path) – Path to the input audio file.
output_path (str | Path) – Path to the output audio file.
format (str | None) – Output format (e.g., "WAV", "FLAC"). Inferred from the extension if None.
subtype (str | None) – Output subtype (e.g., "PCM_16", "FLOAT"). Uses the default for the format if None.
- Return type:
None
Examples
```pycon
>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
>>> # processor.process_file("input.wav", "output.wav")
```
CUDA Graph Runner
Captures a fixed-shape GPU filter forward pass into a torch.cuda.CUDAGraph and replays it per chunk, collapsing the per-section kernel launches of an SOS cascade into a single graph launch. The benefit is largest in the short-chunk / real-time regime (up to ~4× lower per-chunk latency on an RTX 3070). Streaming Direct Form I (DF1) filter state is carried across replays.
```python
import torch
from torchfx.filter import HiButterworth, LoButterworth
from torchfx.filter.fused import FusedSOSCascade
from torchfx.realtime import CudaGraphRunner

chain = FusedSOSCascade(
    HiButterworth(80, order=2, fs=48000),
    LoButterworth(8000, order=4, fs=48000),
)
runner = CudaGraphRunner(chain, torch.randn(2, 512, device="cuda"))

# Placeholder source: any iterable of [2, 512] CUDA chunks matching the example.
stream = (torch.randn(2, 512, device="cuda") for _ in range(100))
for chunk in stream:
    # run() returns a shared output buffer, so clone before the next call.
    y = runner.run(chunk).clone()
```
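Carrying the filter state across chunks is what makes chunked filtering match one-shot filtering. As a CPU-only illustration of that continuity (not of CudaGraphRunner itself), here is a pure-Python Direct Form I biquad, i.e. one SOS section, whose two past inputs and two past outputs persist between calls. The coefficients are arbitrary illustrative values with a0 taken as 1.

```python
class BiquadDF1:
    """One second-order section in Direct Form I with persistent state."""

    def __init__(self, b, a):
        self.b0, self.b1, self.b2 = b
        self.a1, self.a2 = a[1], a[2]  # a[0] assumed to be 1
        self.reset_state()

    def reset_state(self):
        self.x1 = self.x2 = self.y1 = self.y2 = 0.0

    def __call__(self, chunk):
        out = []
        for x in chunk:
            # y[n] = b0 x[n] + b1 x[n-1] + b2 x[n-2] - a1 y[n-1] - a2 y[n-2]
            y = (self.b0 * x + self.b1 * self.x1 + self.b2 * self.x2
                 - self.a1 * self.y1 - self.a2 * self.y2)
            # Shift the delay lines; this state survives to the next call.
            self.x2, self.x1 = self.x1, x
            self.y2, self.y1 = self.y1, y
            out.append(y)
        return out


b, a = (0.2, 0.4, 0.2), (1.0, -0.5, 0.3)
signal = [1.0] + [0.0] * 15  # impulse

one_shot = BiquadDF1(b, a)(signal)
streamed = BiquadDF1(b, a)
chunked = [y for i in range(0, 16, 4) for y in streamed(signal[i:i + 4])]
print(chunked == one_shot)
```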
- class torchfx.realtime.CudaGraphRunner(module, example, warmup=3)
Capture a fixed-shape GPU forward into a CUDA graph and replay it.
- Parameters:
module (nn.Module) – A GPU filter/cascade whose forward updates its state in place (the native SOS path does). The captured graph reuses the module's state buffers, so streaming continuity is preserved across run() calls.
example (Tensor) – A representative input chunk on CUDA. Its shape, dtype, and device fix the graph; every run() input must match.
warmup (int, default 3) – Eager iterations on a side stream before capture, so coefficient caches and scratch allocations are established (required for stable capture).
Notes
The captured filter must have static coefficients: the SOS taps are baked into the graph at capture time. Call reset_state() to restart streaming from a zero state (e.g., between files). The tensor returned from run() is the module's shared output buffer; clone it before the next run().
- reset_state()
Zero the captured streaming state in place to restart a new stream.
Does not call module.reset_state(), which may drop coefficients or rebind the state buffers the graph was captured against; instead, the existing state buffers are zeroed in place.
Caveat: graph continuation is exact, but the very first chunk after a reset can show a small initial transient compared with a never-run eager filter, because the capture warmup is baked into the recorded kernels. For exact fresh-stream behaviour, warm up the runner with input representative of the stream's start (e.g., silence). The transient decays within a few samples.
- Return type:
None
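Because run() returns the module's shared output buffer, keeping the returned tensor across calls is an aliasing bug: the next replay overwrites it. The pure-Python analogy below uses a hypothetical SharedBufferRunner that reuses one list in place of a CUDA tensor. Copying the result plays the role of .clone().

```python
class SharedBufferRunner:
    """Analogy: every run() writes into, and returns, the same list."""

    def __init__(self, size):
        self._out = [0.0] * size

    def run(self, chunk):
        self._out[:] = [2.0 * x for x in chunk]  # overwrite in place
        return self._out


runner = SharedBufferRunner(3)
kept = [runner.run([1.0, 1.0, 1.0]), runner.run([5.0, 5.0, 5.0])]
copied = [list(runner.run([1.0, 1.0, 1.0])), list(runner.run([5.0, 5.0, 5.0]))]
print(kept)    # both entries show the last result
print(copied)  # each entry keeps its own result
```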
Exceptions
- class torchfx.realtime.RealtimeError(message, parameter_name=None, actual_value=None, suggestion=None)
Base exception for all real-time processing errors.
Examples
```pycon
>>> raise RealtimeError("Processing failed")
Traceback (most recent call last):
  ...
torchfx.realtime.exceptions.RealtimeError: Processing failed
```
- class torchfx.realtime.BackendNotAvailableError(backend_name, suggestion=None)
Raised when a requested audio backend is not installed or available.
Examples
```pycon
>>> raise BackendNotAvailableError("sounddevice")
Traceback (most recent call last):
  ...
torchfx.realtime.exceptions.BackendNotAvailableError: ...
```
- class torchfx.realtime.StreamError(message, parameter_name=None, actual_value=None, suggestion=None)
Raised for audio stream lifecycle errors.
Examples
```pycon
>>> raise StreamError("Failed to open audio stream")
Traceback (most recent call last):
  ...
torchfx.realtime.exceptions.StreamError: Failed to open audio stream
```
- class torchfx.realtime.BufferOverrunError(message, parameter_name=None, actual_value=None, suggestion=None)
Raised when the ring buffer overflows.
This occurs when the producer writes data faster than the consumer reads it, causing data loss.
Examples
```pycon
>>> raise BufferOverrunError("Ring buffer overflow")
Traceback (most recent call last):
  ...
torchfx.realtime.exceptions.BufferOverrunError: Ring buffer overflow
```
- class torchfx.realtime.BufferUnderrunError(message, parameter_name=None, actual_value=None, suggestion=None)
Raised when the ring buffer underflows.
This occurs when the consumer attempts to read more data than is available, typically because the consumer is draining the buffer faster than the producer fills it.
Examples
```pycon
>>> raise BufferUnderrunError("Ring buffer underrun")
Traceback (most recent call last):
  ...
torchfx.realtime.exceptions.BufferUnderrunError: Ring buffer underrun
```
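In an audio path, an underrun is usually answered with silence rather than an exception. Below is a minimal sketch of that pattern. RealtimeError and BufferUnderrunError here are local stand-ins, the subclass relationship is assumed from the "base exception for all real-time processing errors" wording, and read_or_silence is hypothetical.

```python
class RealtimeError(Exception):
    """Local stand-in for torchfx.realtime.RealtimeError."""


class BufferUnderrunError(RealtimeError):
    """Local stand-in; assumed to subclass RealtimeError."""


def read_or_silence(read, num_samples, counters):
    # Try to read; on underrun, count the event and emit zeros instead.
    try:
        return read(num_samples)
    except BufferUnderrunError:
        counters["underruns"] = counters.get("underruns", 0) + 1
        return [0.0] * num_samples


def empty_read(n):
    raise BufferUnderrunError(f"requested {n} samples, 0 available")


counters = {}
out = read_or_silence(empty_read, 4, counters)
print(out, counters)
```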
Usage Examples
Real-Time Guitar Effect
```python
from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.filter.iir import LoButterworth, HiButterworth
from torchfx.effect import Gain, Reverb

config = StreamConfig(
    sample_rate=48000,
    buffer_size=256,
    channels_in=1,
    channels_out=1,
    latency="low",
)

with RealtimeProcessor(
    effects=[
        HiButterworth(80),      # Remove low rumble
        LoButterworth(8000),    # Tame high frequencies
        Gain(1.5),              # Boost signal
        Reverb(room_size=0.3),  # Add ambience
    ],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    # Tweak parameters in real time
    processor.set_parameter("2.gain", 2.0)  # Increase gain
    input("Press Enter to stop...")
```
Processing Large Files
```python
from torchfx.realtime import StreamProcessor
from torchfx.effect import Gain, Normalize

# Process a large file in 64K-sample chunks.
# The chain runs on each chunk separately, so unless Normalize keeps state
# across calls, its peak is measured per chunk rather than over the file.
with StreamProcessor(
    effects=[Gain(0.8), Normalize(peak=0.95)],
    chunk_size=65536,
) as processor:
    processor.process_file("large_podcast.wav", "normalized_podcast.wav")

    # Or use the generator API
    for chunk in processor.process_chunks("large_podcast.wav"):
        print(f"Processed chunk: {chunk.shape}")
```
Thread-Safe Parameter Updates
```python
import time

from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.filter.iir import LoButterworth

config = StreamConfig(sample_rate=48000, buffer_size=512,
                      channels_in=1, channels_out=1)

with RealtimeProcessor(
    effects=[LoButterworth(1000)],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    # Sweep filter cutoff from 1000 Hz to 5000 Hz
    for cutoff in range(1000, 5001, 100):
        processor.set_parameter("0.cutoff", cutoff)
        time.sleep(0.05)
```