Real-Time Processing#
The realtime module provides real-time audio I/O, ring buffering, and stream processing capabilities.
Overview#
The real-time processing system consists of:
Audio backends - Abstract interface for audio I/O with a PortAudio implementation via sounddevice
Ring buffer - Lock-free SPSC buffer for real-time tensor transfer
Real-time processor - Orchestrator connecting backend to effect chain
Stream processor - Chunk-based file processing for large files
Quick Start#
Both processors support the context manager protocol for safe resource management.
Real-time processing (requires sounddevice):
from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.effect import Gain

config = StreamConfig(
    sample_rate=48000,
    buffer_size=512,
    channels_in=2,
    channels_out=2,
)

with RealtimeProcessor(
    effects=[Gain(0.5)],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    input("Press Enter to stop...")
# Stream is automatically stopped on exit
Stream processing (no extra dependencies):
from torchfx.realtime import StreamProcessor
from torchfx.effect import Gain

with StreamProcessor(effects=[Gain(0.5)], chunk_size=65536) as processor:
    processor.process_file("large_input.wav", "output.wav")
Configuration#
- class torchfx.realtime.StreamConfig(sample_rate=48000, buffer_size=512, channels_in=0, channels_out=2, dtype='float32', device_in=None, device_out=None, latency='low')[source]#
Configuration for an audio stream.
- Parameters:
sample_rate (int) – Sample rate in Hz (e.g., 44100, 48000). Default is 48000.
buffer_size (int) – Number of samples per buffer/callback frame. Default is 512.
channels_in (int) – Number of input channels (0 for output-only). Default is 0.
channels_out (int) – Number of output channels (0 for input-only). Default is 2.
dtype (str) – Sample format string. Default is "float32".
device_in (int | str | None) – Input device identifier. None for system default.
device_out (int | str | None) – Output device identifier. None for system default.
latency (str | float) – Latency hint: "low", "high", or seconds as float. Default is "low".
Examples
>>> config = StreamConfig(sample_rate=44100, buffer_size=256)
>>> config.direction
<StreamDirection.OUTPUT: 'output'>
>>> config.latency_ms
5.804988662131519
- property direction: StreamDirection#
Infer stream direction from channel counts.
- Returns:
DUPLEX if both in and out channels > 0, INPUT if only in > 0, OUTPUT otherwise.
- Return type:
StreamDirection
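The two derived properties can be reproduced from the documented fields alone. The following is a minimal sketch, assuming that latency_ms is the duration of one buffer (buffer_size / sample_rate) expressed in milliseconds, which agrees with the value shown in the example above. The function names are hypothetical and are not part of torchfx, and the direction is returned as a plain string rather than a StreamDirection member.

```python
def infer_direction(channels_in: int, channels_out: int) -> str:
    """Mirror the documented rule for StreamConfig.direction (illustrative only)."""
    if channels_in > 0 and channels_out > 0:
        return "DUPLEX"
    if channels_in > 0:
        return "INPUT"
    return "OUTPUT"


def buffer_latency_ms(sample_rate: int, buffer_size: int) -> float:
    """Duration of one buffer in milliseconds (assumed meaning of latency_ms)."""
    return buffer_size / sample_rate * 1000.0


print(infer_direction(0, 2))                     # output-only configuration
print(infer_direction(2, 2))                     # input and output channels
print(round(buffer_latency_ms(44100, 256), 3))   # one 256-sample buffer at 44.1 kHz
```

Halving buffer_size halves this per-buffer latency, at the cost of more frequent callbacks.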
Audio Backend#
- class torchfx.realtime.AudioBackend[source]#
Abstract base class for audio I/O backends.
All audio backends must implement this interface. The backend manages the lifecycle of audio streams and routes audio data through callbacks.
Subclasses should implement all abstract methods to provide a complete audio I/O solution.
Examples
Implementing a custom backend:
>>> class MyBackend(AudioBackend):
...     # Implement all abstract methods
...     pass
- abstract get_default_device(direction)[source]#
Get the default device for a given direction.
- Parameters:
direction (StreamDirection) – The stream direction to query.
- Returns:
Device identifier.
- Return type:
- abstract open_stream(config, callback=None)[source]#
Open an audio stream with the given configuration.
- Parameters:
config (StreamConfig) – Stream configuration.
callback (AudioCallback | None) – Callback for real-time processing. If None, use blocking API.
- Return type:
None
- abstract read(num_frames)[source]#
Read audio frames (blocking API).
- Parameters:
num_frames (int) – Number of frames to read.
- Returns:
Shape (channels_in, num_frames).
- Return type:
Tensor
- abstract write(data)[source]#
Write audio frames (blocking API).
- Parameters:
data (Tensor) – Shape (channels_out, num_frames).
- Return type:
None
- abstract property state: StreamState#
Current stream state.
- class torchfx.realtime.sounddevice_backend.SoundDeviceBackend[source]#
Audio backend using sounddevice (PortAudio).
Provides real-time audio I/O through PortAudio with support for input, output, and duplex streams. Supports both callback-based and blocking APIs.
The sounddevice module is imported lazily on instantiation.
- Raises:
BackendNotAvailableError – If sounddevice is not installed.
Examples
>>> backend = SoundDeviceBackend()
>>> devices = backend.get_devices()
- get_default_device(direction)[source]#
Get the default device for a given direction.
- Parameters:
direction (StreamDirection) – The stream direction.
- Returns:
Default device index or name.
- Return type:
- open_stream(config, callback=None)[source]#
Open an audio stream.
- Parameters:
config (StreamConfig) – Stream configuration.
callback (AudioCallback | None) – Audio processing callback. If None, blocking API is used.
- Raises:
StreamError – If stream is already open or opening fails.
- Return type:
None
- read(num_frames)[source]#
Read audio frames using blocking API.
- Parameters:
num_frames (int) – Number of frames to read.
- Returns:
Shape (channels_in, num_frames).
- Return type:
Tensor
- Raises:
StreamError – If stream is not configured for input.
- start()[source]#
Start the audio stream.
- Raises:
StreamError – If stream is not open.
- Return type:
None
- stop()[source]#
Stop the audio stream.
- Raises:
StreamError – If stream is not running.
- Return type:
None
- write(data)[source]#
Write audio frames using blocking API.
- Parameters:
data (Tensor) – Shape (channels_out, num_frames).
- Raises:
StreamError – If stream is not configured for output.
- Return type:
None
- property state: StreamState#
Current stream state.
Ring Buffer#
- class torchfx.realtime.TensorRingBuffer(capacity, channels=1, dtype=torch.float32, device='cpu')[source]#
Lock-free SPSC ring buffer for real-time audio tensor transfer.
Uses separate read/write indices with a pre-allocated tensor backing store. In the SPSC (single-producer, single-consumer) model, only the producer writes _write_idx and only the consumer writes _read_idx, so no locks are needed.
The capacity must be a power of 2 for efficient modular arithmetic (bitwise AND instead of modulo). If a non-power-of-2 value is provided, it is rounded up to the next power of 2.
- Parameters:
capacity (int) – Total capacity in samples per channel. Rounded up to next power of 2.
channels (int) – Number of audio channels. Default is 1.
dtype (torch.dtype) – Data type for the buffer tensor. Default is torch.float32.
device (str) – Device for the buffer tensor. Default is "cpu".
- buffer#
Pre-allocated backing tensor of shape (channels, capacity).
- Type:
Tensor
Examples
>>> buf = TensorRingBuffer(capacity=512, channels=2)
>>> buf.capacity
512
>>> buf.available_read
0
>>> buf.available_write
512
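The rounding and masking mentioned in the class description can be illustrated in a few lines of plain Python. This is a sketch of the general technique, not the code TensorRingBuffer uses internally. The helper name next_power_of_two is hypothetical.

```python
def next_power_of_two(n: int) -> int:
    """Return the smallest power of two that is >= n (for n >= 1)."""
    if n < 1:
        raise ValueError("capacity must be positive")
    return 1 << (n - 1).bit_length()


capacity = next_power_of_two(500)  # a requested capacity of 500 becomes 512
mask = capacity - 1                # all low bits set: 0b1_1111_1111

# For a power-of-two capacity, `index & mask` equals `index % capacity`,
# so wrap-around needs no division.
for index in (0, 511, 512, 1000, 5000):
    assert index & mask == index % capacity
```

The identity only holds when the capacity is a power of two, which is why other values are rounded up rather than used directly.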
- advance_read(num_samples)[source]#
Advance the read pointer without reading data.
Use after peek() to advance by the hop size in overlap-add processing.
- Parameters:
num_samples (int) – Number of samples to advance.
- Raises:
BufferUnderrunError – If advancing would go past the write pointer.
- Return type:
None
- clear()[source]#
Reset the buffer to empty state.
Examples
>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> _ = buf.write(torch.ones(1, 100))
>>> buf.clear()
>>> buf.available_read
0
- Return type:
None
- peek(num_samples)[source]#
Read samples without advancing the read pointer.
Useful for overlap-add processing where the consumer needs to read overlapping frames.
- Parameters:
num_samples (int) – Number of samples to peek.
- Returns:
Audio data of shape (channels, num_samples).
- Return type:
Tensor
- Raises:
BufferUnderrunError – If fewer samples are available than requested.
- read(num_samples)[source]#
Read samples from the buffer.
- Parameters:
num_samples (int) – Number of samples to read.
- Returns:
Audio data of shape (channels, num_samples).
- Return type:
Tensor
- Raises:
BufferUnderrunError – If fewer samples are available than requested.
Examples
>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> _ = buf.write(torch.ones(1, 100))
>>> output = buf.read(50)
>>> output.shape
torch.Size([1, 50])
>>> buf.available_read
50
- write(data)[source]#
Write samples into the buffer.
- Parameters:
data (Tensor) – Audio data of shape (channels, samples) or (samples,) for single-channel buffers.
- Returns:
Number of samples actually written. May be less than requested if the buffer is near full.
- Return type:
int
Examples
>>> buf = TensorRingBuffer(capacity=256, channels=1)
>>> data = torch.ones(1, 100)
>>> buf.write(data)
100
>>> buf.available_read
100
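How peek() and advance_read() combine to produce overlapping frames can be shown with a small pure-Python stand-in. This is a sketch under stated assumptions, not the torchfx implementation. TinyRing is a hypothetical single-channel class backed by a list instead of a tensor. It runs single-threaded here, raises IndexError where the real buffer raises BufferUnderrunError, and mirrors only the documented semantics (separate read and write indices, masked indexing, partial writes when nearly full).

```python
class TinyRing:
    """Single-channel ring buffer over a Python list (illustrative stand-in)."""

    def __init__(self, capacity: int) -> None:
        # Round up to a power of two so indices can be wrapped with a bitmask.
        self.capacity = 1 << max(capacity - 1, 0).bit_length()
        self._mask = self.capacity - 1
        self._data = [0.0] * self.capacity
        self._write_idx = 0  # only advanced by the producer
        self._read_idx = 0   # only advanced by the consumer

    @property
    def available_read(self) -> int:
        return self._write_idx - self._read_idx

    @property
    def available_write(self) -> int:
        return self.capacity - self.available_read

    def write(self, samples: list) -> int:
        # Accept as many samples as fit and report how many were written.
        n = min(len(samples), self.available_write)
        for i in range(n):
            self._data[(self._write_idx + i) & self._mask] = samples[i]
        self._write_idx += n
        return n

    def peek(self, num_samples: int) -> list:
        # Copy samples out without moving the read index.
        if num_samples > self.available_read:
            raise IndexError("underrun: not enough samples buffered")
        return [self._data[(self._read_idx + i) & self._mask] for i in range(num_samples)]

    def advance_read(self, num_samples: int) -> None:
        if num_samples > self.available_read:
            raise IndexError("underrun: cannot advance past the write index")
        self._read_idx += num_samples


ring = TinyRing(capacity=8)
ring.write([float(i) for i in range(8)])

frame_size, hop = 4, 2
frames = []
while ring.available_read >= frame_size:
    frames.append(ring.peek(frame_size))  # read a full, overlapping window
    ring.advance_read(hop)                # consume only the hop

print(frames)
# [[0.0, 1.0, 2.0, 3.0], [2.0, 3.0, 4.0, 5.0], [4.0, 5.0, 6.0, 7.0]]
```

Consecutive frames share frame_size - hop samples, which is the overlap an overlap-add consumer needs. The two samples left in the buffer wait for more input before another full frame can be peeked.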
Processors#
Real-Time Processor#
- class torchfx.realtime.RealtimeProcessor(effects, backend, config, buffer_capacity=8192)[source]#
Real-time audio processor connecting an audio backend to an effect chain.
Orchestrates audio I/O through a backend and processing through a chain of FX effects. Supports thread-safe parameter updates during processing.
- Parameters:
effects (Sequence[FX] | nn.Sequential) – Chain of effects to apply in order.
backend (AudioBackend) – Audio backend for I/O.
config (StreamConfig) – Stream configuration.
buffer_capacity (int) – Ring buffer capacity in samples per channel. Default is 8192.
Examples
>>> from torchfx.realtime import RealtimeProcessor, StreamConfig
>>> from torchfx.effect import Gain
>>> config = StreamConfig(sample_rate=48000, buffer_size=512,
...                       channels_in=1, channels_out=1)
- reset_state()[source]#
Reset all internal state (filter states, ring buffers).
Useful after seeking in a file or switching audio sources.
- Return type:
None
- set_parameter(name, value)[source]#
Thread-safe parameter update.
Parameters are staged in a pending dict and applied at the next processing boundary (start of next audio callback). This avoids locks in the audio processing path.
- Parameters:
name (str) – Dot-separated parameter path, e.g., "0.cutoff" for effect index 0, attribute cutoff.
value (Any) – New parameter value.
- Return type:
None
Examples
>>> # processor.set_parameter("0.cutoff", 2000)
>>> # processor.set_parameter("1.gain", 0.8)
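The idea of staging updates and applying them at a processing boundary can be sketched in plain Python. This is an illustration only and not torchfx's code. ParameterStager, stage, apply_pending, and FakeLowpass are hypothetical names. The sketch also swaps in a collections.deque for the pending dict described above, because deque appends and pops are documented as thread-safe in CPython, which keeps the example free of explicit locks.

```python
from collections import deque
from typing import Any


class ParameterStager:
    """Queue parameter updates and apply them between audio blocks (sketch)."""

    def __init__(self, effects: list) -> None:
        self._effects = effects
        self._pending = deque()  # holds (dotted_name, value) pairs

    def stage(self, name: str, value: Any) -> None:
        # Called from the control thread: enqueue only, never touch the effect.
        self._pending.append((name, value))

    def apply_pending(self) -> int:
        # Called at the start of each audio callback, before processing.
        applied = 0
        while True:
            try:
                name, value = self._pending.popleft()
            except IndexError:  # queue drained
                return applied
            index, attribute = name.split(".", 1)
            setattr(self._effects[int(index)], attribute, value)
            applied += 1


class FakeLowpass:
    """Stand-in effect with a single tunable attribute."""

    def __init__(self, cutoff: float) -> None:
        self.cutoff = cutoff


effects = [FakeLowpass(1000.0)]
stager = ParameterStager(effects)

stager.stage("0.cutoff", 2000.0)
print(effects[0].cutoff)  # 1000.0, nothing has been applied yet
stager.apply_pending()
print(effects[0].cutoff)  # 2000.0, applied at the processing boundary
```

Because the effect is only mutated inside apply_pending, a block of audio is always processed with one consistent set of parameter values.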
- start()[source]#
Start real-time processing.
Opens and starts the audio stream. The backend callback routes audio through the effect chain.
- Raises:
RealtimeError – If the processor is already running.
- Return type:
None
- stop()[source]#
Stop real-time processing and close the stream.
- Raises:
RealtimeError – If the processor is not running.
- Return type:
None
- property config: StreamConfig#
The stream configuration.
Stream Processor#
- class torchfx.realtime.StreamProcessor(effects, chunk_size=65536, overlap=0, device='cpu')[source]#
Process audio files in chunks without loading the entire file.
Reads audio in configurable chunk sizes, applies an effect chain to each chunk, and writes output progressively. Supports an overlap parameter for effects that need context beyond chunk boundaries.
- Parameters:
Examples
>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
- process_chunks(input_path)[source]#
Yield processed chunks as tensors.
Generator API for streaming to another process, network, or real-time playback.
- Parameters:
input_path (str | Path) – Path to the input audio file.
- Yields:
Tensor – Processed audio chunks of shape (channels, chunk_size).
- Return type:
Examples
>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
>>> # for chunk in processor.process_chunks("input.wav"):
>>> #     print(chunk.shape)
- process_file(input_path, output_path, format=None, subtype=None)[source]#
Process an audio file chunk by chunk.
Reads the input file in chunks, applies the effect chain to each chunk, and writes the result to the output file.
- Parameters:
input_path (str | Path) – Path to the input audio file.
output_path (str | Path) – Path to the output audio file.
format (str | None) – Output format (e.g., "WAV", "FLAC"). Inferred from extension if None.
subtype (str | None) – Output subtype (e.g., "PCM_16", "FLOAT"). Uses default for format if None.
- Return type:
None
Examples
>>> from torchfx.realtime import StreamProcessor
>>> from torchfx.effect import Gain
>>> processor = StreamProcessor(effects=[Gain(0.5)])
>>> # processor.process_file("input.wav", "output.wav")
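One way to picture chunked processing with overlap is to hand each chunk some preceding samples as context and then drop that context from the output. The sketch below works on a plain list and is an assumption about how such a scheme could look, not a description of how StreamProcessor uses its overlap parameter. The names iter_chunks, process_in_chunks, and half_gain are hypothetical.

```python
from typing import Callable, Iterator


def iter_chunks(total: int, chunk_size: int, overlap: int = 0) -> Iterator[tuple]:
    """Yield (context_start, start, end) index triples covering range(total)."""
    if chunk_size <= 0 or overlap < 0:
        raise ValueError("chunk_size must be positive and overlap non-negative")
    for start in range(0, total, chunk_size):
        end = min(start + chunk_size, total)      # the last chunk may be shorter
        context_start = max(start - overlap, 0)   # extra samples before the chunk
        yield context_start, start, end


def process_in_chunks(
    samples: list,
    effect: Callable[[list], list],
    chunk_size: int,
    overlap: int = 0,
) -> list:
    """Apply a length-preserving effect chunk by chunk, discarding the context."""
    out = []
    for context_start, start, end in iter_chunks(len(samples), chunk_size, overlap):
        processed = effect(samples[context_start:end])
        out.extend(processed[start - context_start:])  # keep only the new samples
    return out


def half_gain(block: list) -> list:
    return [0.5 * x for x in block]


signal = [float(i) for i in range(10)]
print(list(iter_chunks(len(signal), chunk_size=4, overlap=2)))
# [(0, 0, 4), (2, 4, 8), (6, 8, 10)]
print(process_in_chunks(signal, half_gain, chunk_size=4, overlap=2) == half_gain(signal))
# True
```

For a memoryless effect such as a gain, the chunked result matches processing the whole signal at once regardless of overlap. The extra context only matters for effects whose output depends on earlier samples, such as filters.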
Exceptions#
- class torchfx.realtime.RealtimeError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#
Base exception for all real-time processing errors.
- Parameters:
Examples
>>> raise RealtimeError("Processing failed")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.RealtimeError: Processing failed
- class torchfx.realtime.BackendNotAvailableError(backend_name, suggestion=None)[source]#
Raised when a requested audio backend is not installed or available.
- Parameters:
Examples
>>> raise BackendNotAvailableError("sounddevice")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BackendNotAvailableError: ...
- class torchfx.realtime.StreamError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#
Raised for audio stream lifecycle errors.
- Parameters:
Examples
>>> raise StreamError("Failed to open audio stream")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.StreamError: Failed to open audio stream
- class torchfx.realtime.BufferOverrunError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#
Raised when the ring buffer overflows.
This occurs when the producer writes data faster than the consumer reads it, causing data loss.
- Parameters:
Examples
>>> raise BufferOverrunError("Ring buffer overflow")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BufferOverrunError: Ring buffer overflow
- class torchfx.realtime.BufferUnderrunError(message, parameter_name=None, actual_value=None, suggestion=None)[source]#
Raised when the ring buffer underflows.
This occurs when the consumer attempts to read more data than is available, typically because the consumer is draining the buffer faster than the producer fills it.
- Parameters:
Examples
>>> raise BufferUnderrunError("Ring buffer underrun")
Traceback (most recent call last):
    ...
torchfx.realtime.exceptions.BufferUnderrunError: Ring buffer underrun
Usage Examples#
Real-Time Guitar Effect#
from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.filter.iir import LoButterworth, HiButterworth
from torchfx.effect import Gain, Reverb

config = StreamConfig(
    sample_rate=48000,
    buffer_size=256,
    channels_in=1,
    channels_out=1,
    latency="low",
)

with RealtimeProcessor(
    effects=[
        HiButterworth(80),       # Remove low rumble
        LoButterworth(8000),     # Tame high frequencies
        Gain(1.5),               # Boost signal
        Reverb(room_size=0.3),   # Add ambience
    ],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    # Tweak parameters in real time
    processor.set_parameter("2.gain", 2.0)  # Increase gain
    input("Press Enter to stop...")
Processing Large Files#
from torchfx.realtime import StreamProcessor
from torchfx.effect import Gain, Normalize

# Process a large file in 64K sample chunks
with StreamProcessor(
    effects=[Gain(0.8), Normalize(peak=0.95)],
    chunk_size=65536,
) as processor:
    processor.process_file("large_podcast.wav", "normalized_podcast.wav")

    # Or use the generator API
    for chunk in processor.process_chunks("large_podcast.wav"):
        print(f"Processed chunk: {chunk.shape}")
Thread-Safe Parameter Updates#
import time

from torchfx.realtime import RealtimeProcessor, SoundDeviceBackend, StreamConfig
from torchfx.filter.iir import LoButterworth

config = StreamConfig(sample_rate=48000, buffer_size=512,
                      channels_in=1, channels_out=1)

with RealtimeProcessor(
    effects=[LoButterworth(1000)],
    backend=SoundDeviceBackend(),
    config=config,
) as processor:
    # Sweep filter cutoff from 1000Hz to 5000Hz
    for cutoff in range(1000, 5001, 100):
        processor.set_parameter("0.cutoff", cutoff)
        time.sleep(0.05)