Tide is a comprehensive framework for time series data processing, analysis, and machine learning. Built with PyTorch integration at its core, Tide provides a unified interface for loading, processing, and analyzing time series data across diverse domains including sensor networks, biomedical signals, financial data, and industrial monitoring systems.
The framework offers modular components for dataset management, feature engineering, signal filtering, and temporal data splitting, making it suitable for both research and production environments.
- Core Features
- TSDataset - Universal Time Series Data Loader
- Time Series Feature Engineering
- Signal Processing and Filtering
- Temporal Data Splitting
- Domain-Specific Extensions
- Installation
- Getting Started
- Testing
- Contributing
- License
Tide provides a comprehensive suite of tools for time series data processing and analysis:
- Multi-format Support: Load data from HDF5, CSV, NumPy, and PyTorch formats
- Flexible Windowing: Configurable input/output windows with prediction offsets
- Memory Management: Efficient disk-based or RAM-based loading strategies
- Metadata Filtering: Advanced filtering based on file attributes
- Batch Processing: Optimized for large-scale time series datasets
- 20+ Built-in Features: Time-domain, frequency-domain, and advanced signal processing features
- Parallel Processing: Multi-threaded feature extraction for improved performance
- Custom Features: Extensible framework for domain-specific feature development
- PyTorch Integration: Direct tensor output for deep learning workflows
- Digital Filters: Butterworth bandpass, OneEuro, and custom filter implementations
- Filter Stacks: Composable filter chains with automatic dimensionality management
- Real-time Processing: Low-latency filtering suitable for real-time applications
- Time-aware Splitting: Preserve temporal dependencies in train/validation/test splits
- Multiple Strategies: Chronological, gap-based, rolling window, and blocked cross-validation
- Flexible Configuration: Customizable split ratios and minimum size constraints
The `TSDataset` class provides a powerful, flexible interface for loading time series data from various file formats. It's designed to work seamlessly with PyTorch's `DataLoader` and supports both single-file and multi-file datasets with sliding window configurations.
- Multiple File Formats: H5, CSV, NPZ, PyTorch tensors (.pt)
- Flexible Window Configuration: Configurable input/output windows with prediction offsets
- Memory Management: Choose between disk-based or RAM-based loading
- Metadata Filtering: Filter datasets based on file metadata
- PyTorch Integration: Native compatibility with `DataLoader` for training
- Multi-file Support: Automatic indexing across multiple files
from tsds._core.dataset import TSDataset
# Load a single CSV file
dataset = TSDataset(
location="data/pollution.csv",
features=['temperature', 'pressure', 'humidity'], # Input features
labels=['pollution'], # Target variable
input_window_length=10, # 10 timesteps input
output_window_length=5, # 5 timesteps output
prediction_offset=1, # 1-step ahead prediction
load2ram=True # Load to memory
)
print(f"Dataset length: {len(dataset)}")
sample = dataset[0]
print(f"Input shape: {sample['input'].shape}") # [10, 3]
print(f"Target shape: {sample['targets'].shape}") # [5, 1]
For larger datasets, use the multi-file approach with a `data.yaml` configuration inside the data directory:
# data.yaml
path: 'h5_files'
file_format: 'h5'
features: ['temperature', 'pressure', 'humidity']
labels: ['energy_consumption', 'power_usage']
description: 'Multi-building sensor data'
# Load multi-file dataset
dataset = TSDataset(
location="./data/sensor_network", # Directory containing data.yaml
input_window_length=15,
output_window_length=5,
prediction_offset=2,
load2ram=False, # Keep files on disk
max_open_files=8 # Limit open file handles
)
To use TSDataset with your own data, you need to structure your files and directories according to the supported formats. Here's how to organize your datasets:
For single files, TSDataset supports the following formats:
CSV Files (`.csv`)
data/
└── timeseries.csv
CSV structure requirements:
- First dimension: Time steps (rows)
- Columns: Features and labels
- Header: Column names (used for feature/label specification)
timestamp,temperature,pressure,humidity,energy_consumption
2024-01-01 00:00:00,22.5,1013.2,45.3,120.5
2024-01-01 01:00:00,22.1,1013.8,46.1,118.2
2024-01-01 02:00:00,21.8,1014.1,46.8,115.9
...
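As a minimal sketch of producing a CSV in this layout (assuming pandas is installed; the values here are synthetic):
import numpy as np
import pandas as pd
n = 1000
df = pd.DataFrame({
    "timestamp": pd.date_range("2024-01-01", periods=n, freq="h"),
    "temperature": 22 + np.random.randn(n),
    "pressure": 1013 + np.random.randn(n),
    "humidity": 45 + np.random.randn(n),
    "energy_consumption": 120 + np.random.randn(n),
})
# One row per time step; the header provides the feature/label names
df.to_csv("data/timeseries.csv", index=False)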
HDF5 Files (`.h5`, `.hdf5`)
data/
└── sensors.h5
H5 structure requirements:
- Datasets: Each feature/label as separate dataset
- Shape: `(time_steps,)` for each dataset
- Metadata: Optional attributes for filtering
# Example H5 structure
import h5py
with h5py.File('sensors.h5', 'w') as f:
f.create_dataset('temperature', data=temp_data) # Shape: (1000,)
f.create_dataset('pressure', data=pressure_data) # Shape: (1000,)
f.create_dataset('humidity', data=humidity_data) # Shape: (1000,)
f.create_dataset('energy', data=energy_data) # Shape: (1000,)
# Optional metadata
f.attrs['location'] = 'Building_A'
f.attrs['sensor_type'] = 'environmental'
NumPy Files (`.npz`)
data/
└── signals.npz
NPZ structure requirements:
- Arrays: Each feature/label as named array
- Shape: `(time_steps,)` for each array
# Example NPZ structure
import numpy as np
np.savez('signals.npz',
signal_1=data1, # Shape: (1000,)
signal_2=data2, # Shape: (1000,)
target=target_data) # Shape: (1000,)
PyTorch Files (`.pt`, `.pth`)
data/
└── tensors.pt
PT structure requirements:
- Dictionary: Keys as feature/label names, values as tensors
- Shape: `(time_steps,)` for each tensor
# Example PT structure
import torch
data_dict = {
'sensor_a': torch.tensor(data_a), # Shape: [1000]
'sensor_b': torch.tensor(data_b), # Shape: [1000]
'target': torch.tensor(target_data) # Shape: [1000]
}
torch.save(data_dict, 'tensors.pt')
For multi-file datasets, organize your data with a `data.yaml` configuration:
dataset_directory/
├── data.yaml # Configuration file
└── data_files/ # Data directory
├── file_001.h5
├── file_002.h5
├── file_003.h5
└── ...
Required `data.yaml` Structure:
# data.yaml - Required fields
path: 'data_files' # Relative path to data files
file_format: 'h5' # File format: 'h5', 'csv', 'npz', 'pt'
features: ['temperature', 'pressure', 'humidity'] # Input feature names
labels: ['energy_consumption'] # Target label names
# Optional fields
description: 'Multi-sensor environmental data'
sampling_rate: 1000 # Hz
units:
temperature: 'Celsius'
pressure: 'hPa'
humidity: 'percent'
energy_consumption: 'kWh'
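A quick sanity check of this configuration can catch problems before constructing a TSDataset. Below is a minimal sketch (assuming PyYAML is installed; `validate_data_yaml` is a hypothetical helper, not part of the library):
import yaml
from pathlib import Path
def validate_data_yaml(dataset_dir):
    """Check required data.yaml fields and that matching data files exist."""
    cfg = yaml.safe_load((Path(dataset_dir) / "data.yaml").read_text())
    missing = [k for k in ("path", "file_format", "features", "labels") if k not in cfg]
    if missing:
        raise ValueError(f"data.yaml is missing required fields: {missing}")
    files = sorted((Path(dataset_dir) / cfg["path"]).glob(f"*.{cfg['file_format']}"))
    if not files:
        raise FileNotFoundError(f"no .{cfg['file_format']} files found under {cfg['path']}")
    return cfg, files
cfg, files = validate_data_yaml("./data/sensor_network")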
Complete Multi-File Example:
environmental_sensors/
├── data.yaml
└── h5_files/
├── building_a_sensors.h5
├── building_b_sensors.h5
└── building_c_sensors.h5
Each H5 file contains the same structure:
# building_a_sensors.h5
import h5py
with h5py.File('building_a_sensors.h5', 'w') as f:
# Time series data (all same length)
f.create_dataset('temperature', data=temp_array) # Shape: (2000,)
f.create_dataset('pressure', data=pressure_array) # Shape: (2000,)
f.create_dataset('humidity', data=humidity_array) # Shape: (2000,)
f.create_dataset('energy_consumption', data=energy_array) # Shape: (2000,)
# Metadata for filtering (optional)
f.attrs['building'] = 'Building_A'
f.attrs['floor'] = 1
f.attrs['data_quality'] = 'high'
f.attrs['installation_year'] = 2023
data.yaml for this example:
path: 'h5_files'
file_format: 'h5'
features: ['temperature', 'pressure', 'humidity']
labels: ['energy_consumption']
description: 'Multi-building environmental sensor network'
You can filter files based on metadata attributes. The `metadata_filter` parameter accepts a callable that takes a metadata dictionary and returns `True` to include the file or `False` to exclude it.
def quality_filter(metadata):
"""Only include high-quality sensors from factories"""
return (metadata.get('data_quality') == 'high' and
metadata.get('location_type') == 'factory')
dataset = TSDataset(
location="path/to/your/dataset",
input_window_length=10,
output_window_length=5,
prediction_offset=1,
metadata_filter=quality_filter # Apply custom filter
)
Time Series Requirements:
- Consistent sampling: All files should have the same sampling rate
- Same features: All files must contain the same feature/label names
- Sequential data: Data should be temporally ordered
- No missing timestamps: Ensure continuous time series
File Naming:
- Use descriptive names: `sensor_building_a.h5` instead of `file1.h5`
- Include metadata in filenames when possible
- Use consistent naming patterns across files
Data Quality:
- Consistent dtypes: Use `float32` for memory efficiency
- Reasonable ranges: Ensure data values are within expected bounds
- Handle missing values: Remove or interpolate missing data points
- Normalize if needed: Consider scaling features to similar ranges (see the sketch below)
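A minimal pandas sketch of these checks (column names follow the CSV example above; the interpolation and scaling choices are illustrative, not library defaults):
import pandas as pd
df = pd.read_csv("data/timeseries.csv", parse_dates=["timestamp"])
# Consistent sampling: every timestamp delta should be identical
assert df["timestamp"].diff().dropna().nunique() == 1, "irregular sampling detected"
value_cols = df.columns.drop("timestamp")
# Handle missing values, then down-cast to float32 for memory efficiency
df[value_cols] = df[value_cols].interpolate(limit_direction="both").astype("float32")
# Normalize if needed: zero mean, unit variance per column
df[value_cols] = (df[value_cols] - df[value_cols].mean()) / df[value_cols].std()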
Directory Organization:
project/
├── data/
│ ├── raw/ # Original data files
│ ├── processed/ # TSDataset-ready files
│ │ ├── data.yaml
│ │ └── h5_files/
│ └── splits/ # Train/val/test splits
└── models/
Testing Your Data Structure:
# Verify your dataset loads correctly
from tsds._core.dataset import TSDataset
# Test loading
try:
dataset = TSDataset(
location="path/to/your/dataset",
input_window_length=10,
output_window_length=5,
prediction_offset=1
)
print(f"Dataset loaded: {len(dataset)} samples")
print(f"Features: {dataset.features}")
print(f"Labels: {dataset.labels}")
# Test sample access
sample = dataset[0]
print(f"Sample input shape: {sample['input'].shape}")
print(f"Sample target shape: {sample['targets'].shape}")
except Exception as e:
print(f"Error loading dataset: {e}")
The prediction offset defines the gap between input and output sequences:
Here is a visual representation for `input_window_length=9`, `output_window_length=7`, and `prediction_offset=6` at dataset index 0:
Time:    0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15
Input:  [------- 9 timesteps -----]
Output:                   [--- 7 timesteps ---]
         ↑                 ↑
         start=0           start=6 (offset=6)
- `prediction_offset=1`: Next-step prediction (immediate future)
- `prediction_offset>1`: Future forecasting with gap
- `prediction_offset=0`: Overlapping sequences (reconstruction tasks)
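The index arithmetic is easiest to see in code. Below is a hypothetical `window_slices` helper that follows the diagram above (output window starting `prediction_offset` steps after the start of the input window); it illustrates the diagram and is not the library's internal implementation:
def window_slices(index, input_window_length, output_window_length, prediction_offset):
    # Illustrative reading of the diagram above: the output window starts
    # prediction_offset steps after the start of the input window.
    input_start = index
    output_start = index + prediction_offset
    return (slice(input_start, input_start + input_window_length),
            slice(output_start, output_start + output_window_length))
inp, out = window_slices(0, 9, 7, 6)
print(inp, out)  # slice(0, 9, None) slice(6, 13, None) -- matches the picture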
Different configurations serve different use cases:
# Next-step prediction (real-time monitoring)
dataset = TSDataset(location=data_path, input_window_length=20,
output_window_length=1, prediction_offset=1)
# Short-term forecasting (operational planning)
dataset = TSDataset(location=data_path, input_window_length=24,
output_window_length=6, prediction_offset=1)
# Long-term forecasting (strategic planning)
dataset = TSDataset(location=data_path, input_window_length=48,
output_window_length=24, prediction_offset=12)
# Sequence-to-sequence (denoising, reconstruction)
dataset = TSDataset(location=data_path, input_window_length=16,
output_window_length=-1, prediction_offset=1) # -1 = same as input
from torch.utils.data import DataLoader, random_split
# Create train/validation split
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
# Training loop
for batch in train_loader:
inputs = batch['input'] # [batch_size, seq_len, features]
targets = batch['targets'] # [batch_size, seq_len, targets]
predictions = model(inputs)
loss = criterion(predictions, targets)
# ... backpropagation
Filter files based on metadata attributes:
def quality_filter(metadata):
"""Only include high-quality sensors from factories"""
return (metadata.get('data_quality') == 'high' and
metadata.get('location_type') == 'factory')
dataset = TSDataset(
location=data_dir,
input_window_length=10,
output_window_length=5,
prediction_offset=1,
metadata_filter=quality_filter # Apply custom filter
)
Choose the appropriate loading strategy:
# For small datasets (< 500MB): Load to RAM for speed
dataset = TSDataset(location=data_dir, load2ram=True)
# For large datasets: Stream from disk
dataset = TSDataset(
location=data_dir,
load2ram=False,
max_open_files=8 # Limit file handles
)
| Format | Extension | Use Case | Example |
|---|---|---|---|
| HDF5 | `.h5`, `.hdf5` | Large scientific datasets | Multi-sensor recordings |
| CSV | `.csv` | Tabular data | Weather, financial data |
| NumPy | `.npz`, `.npy` | Array data | Signal processing |
| PyTorch | `.pt`, `.pth` | Tensor data | Pre-processed features |
- Small datasets (< 100MB): Use `load2ram=True`
- Medium datasets (100MB - 1GB): Use `load2ram=False`, `max_open_files=16-32`
- Large datasets (> 1GB): Use `load2ram=False`, `max_open_files=4-8`, apply metadata filters
- Memory estimation: `sample_size × dataset_length ÷ (1024²)` MB
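A rough implementation of that estimate (a sketch assuming float32 storage; `estimate_ram_mb` is a hypothetical helper, not part of the library):
def estimate_ram_mb(input_window, output_window, n_features, n_labels,
                    dataset_length, bytes_per_value=4):
    """Rough RAM footprint of a fully loaded dataset, in MB (float32 assumed)."""
    sample_size = (input_window * n_features + output_window * n_labels) * bytes_per_value
    return sample_size * dataset_length / 1024**2
# e.g. 10-step inputs over 3 features, 5-step targets over 1 label, 100k windows
print(f"~{estimate_ram_mb(10, 5, 3, 1, 100_000):.1f} MB")  # ~13.4 MB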
See `example.ipynb` for comprehensive examples including:
- Different file format demonstrations
- Window configuration patterns
- Performance comparisons
- Real-world use cases with sensor data
The Time Series Feature Engineering module provides a comprehensive suite of time-domain, frequency-domain, and advanced signal processing features focused on electromyography (EMG) signal analysis. The `FeatureEngineer` class offers a unified interface for extracting multiple features from EMG data, with support for multithreading and seamless integration with PyTorch workflows.
For simplicity we will continue to refer to EMG data, but the features can be applied to any time series data.
The feature extraction library includes 20+ features organized into several categories:
- Mean Absolute Value (MAV): `weighted_mav`, `mav_type1`, `mav_type2`
- Root Mean Square (RMS): `rms`
- Statistical Moments: `variance`, `standard_deviation`, `temporal_moment`
- Signal Integration: `ssi` (Simple Squared Integral)
- Signal Morphology: `waveform_length`, `zero_crossings_improved`, `slope_sign_changes`
- Amplitude Analysis: `log_detector`, `myopulse_percentage_rate`
- Signal Distribution: `histogram`
- Spectral Moments: `mean_frequency`, `median_frequency`, `spectral_moments`
- Peak Analysis: `peak_frequency`
- Energy Distribution: `spectral_energy_bands`
- Wavelet Analysis: `wavelet_energy` (requires PyWavelets)
- Parametric Modeling: `autoregressive_coefficients`
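To make the time-domain entries concrete, here are textbook definitions of a few of them in NumPy (standard EMG feature formulas, not necessarily the library's exact implementations):
import numpy as np
x = np.random.randn(1000)                        # one channel of raw signal
mav = np.mean(np.abs(x))                         # Mean Absolute Value
rms = np.sqrt(np.mean(x**2))                     # Root Mean Square
ssi = np.sum(x**2)                               # Simple Squared Integral
wl = np.sum(np.abs(np.diff(x)))                  # Waveform Length
zc = np.sum(np.sign(x)[1:] != np.sign(x)[:-1])   # Zero Crossings (sign changes)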
The `FeatureEngineer` class provides a powerful interface for orchestrating feature extraction with advanced capabilities:
- Multithreading Support: Parallel feature computation for improved performance
- Flexible Configuration: YAML/dict-based configuration or programmatic setup
- PyTorch Integration: Direct tensor output for deep learning workflows
- Robust Error Handling: Graceful handling of missing dependencies and computation errors
- Batch Processing: Efficient processing of multi-dimensional data
from tsds.features.feature_engineer import FeatureEngineer
import numpy as np
# Create sample EMG data (batch_size, sequence_length, channels)
emg_data = np.random.randn(10, 1000, 8) # 10 trials, 1000 samples, 8 channels
# Configuration-based setup
config = {
'weighted_mav': {'axis': 1}, # Mean absolute value across time
'rms': {'axis': 1}, # Root mean square
'variance': {'axis': 1}, # Signal variance
'mean_frequency': {'fs': 1000.0, 'axis': 1}, # Mean frequency (1kHz sampling)
'spectral_energy_bands': {'fs': 1000.0, 'axis': 1} # Energy in frequency bands
}
# Create feature engineer with multithreading
fe = FeatureEngineer.from_config(
config,
num_workers=4, # Use 4 threads for parallel processing
return_tensor=True # Return PyTorch tensors
)
# Extract features
features, info = fe(emg_data)
print(f"Input shape: {emg_data.shape}") # (10, 1000, 8)
print(f"Features shape: {features.shape}") # (10, n_features)
print(f"Feature info: {list(info.keys())}") # ['weighted_mav', 'rms', ...]
# Quick setup with default parameters
feature_names = ['weighted_mav', 'rms', 'variance', 'waveform_length']
fe = FeatureEngineer.from_function_list(
feature_names,
num_workers=2,
default_axis=1 # Compute across time dimension
)
features, info = fe(emg_data)
# Complex configuration with custom parameters
advanced_config = {
'weighted_mav': {'axis': 1},
'temporal_moment': {'order': 3, 'axis': 1},
'zero_crossings_improved': {'threshold': 0.01, 'axis': 1},
'histogram': {'n_bins': 15, 'axis': 1},
'spectral_moments': {'fs': 2000.0, 'order': 2, 'axis': 1},
'wavelet_energy': {'wavelet': 'db6', 'levels': 5, 'axis': 1},
'autoregressive_coefficients': {'order': 6, 'axis': 1}
}
fe = FeatureEngineer.from_config(advanced_config, num_workers=6)
features, info = fe(emg_data)
# Access feature slices
mav_features = features[:, info['weighted_mav']['slice'][0]:info['weighted_mav']['slice'][1]]
The `FeatureEngineer` integrates seamlessly with `TSDataset` for end-to-end EMG processing workflows:
import torch
from torch.utils.data import DataLoader
from tsds._core.dataset import TSDataset
from tsds.features.feature_engineer import FeatureEngineer
# Load EMG dataset
emg_dataset = TSDataset(
location="data/emg_recordings",
input_window_length=200, # 200ms windows at 1kHz
output_window_length=50, # 50ms prediction window
prediction_offset=10, # 10ms prediction offset
features=['emg_ch1', 'emg_ch2', 'emg_ch3', 'emg_ch4'],
labels=['muscle_activation']
)
# Setup feature extraction
feature_config = {
'weighted_mav': {'axis': 1},
'rms': {'axis': 1},
'waveform_length': {'axis': 1},
'mean_frequency': {'fs': 1000.0, 'axis': 1},
'spectral_energy_bands': {'fs': 1000.0, 'axis': 1}
}
fe = FeatureEngineer.from_config(feature_config, num_workers=4)
# Custom collate function for DataLoader
def emg_collate_fn(batch):
"""Custom collate function that applies feature extraction"""
inputs = torch.stack([item['input'] for item in batch])
targets = torch.stack([item['targets'] for item in batch])
# Extract features from raw EMG signals
features, _ = fe(inputs.numpy())
return {
'raw_emg': inputs,
'features': features,
'targets': targets
}
# Create DataLoader with feature extraction
dataloader = DataLoader(
emg_dataset,
batch_size=32,
shuffle=True,
collate_fn=emg_collate_fn
)
# Training loop with features
for batch in dataloader:
raw_emg = batch['raw_emg'] # Original EMG signals
features = batch['features'] # Extracted features
targets = batch['targets'] # Labels
# Use either raw EMG or features for your model
predictions = model(features) # Feature-based model
# OR
predictions = model(raw_emg) # Raw signal model
The `FeatureEngineer` supports parallel feature computation for improved performance:
import time
import torch
# Single-threaded processing
fe_single = FeatureEngineer.from_config(config, num_workers=1)
start_time = time.time()
features_single, _ = fe_single(large_emg_data)
single_thread_time = time.time() - start_time
# Multi-threaded processing
fe_multi = FeatureEngineer.from_config(config, num_workers=8)
start_time = time.time()
features_multi, _ = fe_multi(large_emg_data)
multi_thread_time = time.time() - start_time
print(f"Single-threaded: {single_thread_time:.2f}s")
print(f"Multi-threaded: {multi_thread_time:.2f}s")
print(f"Speedup: {single_thread_time/multi_thread_time:.1f}x")
# Results are identical
assert torch.allclose(features_single, features_multi, rtol=1e-10)
Add your own custom feature functions to the extraction pipeline:
def custom_energy_ratio(x: np.ndarray, axis: int = 0) -> np.ndarray:
"""Custom feature: ratio of signal energy in first half vs second half"""
if axis == 0: # Across time
mid = x.shape[0] // 2
first_half_energy = np.sum(x[:mid]**2, axis=0)
second_half_energy = np.sum(x[mid:]**2, axis=0)
else: # Across channels
mid = x.shape[1] // 2
first_half_energy = np.sum(x[:, :mid]**2, axis=1)
second_half_energy = np.sum(x[:, mid:]**2, axis=1)
return first_half_energy / (second_half_energy + 1e-8)
# Register the custom function
FeatureEngineer.register_func(custom_energy_ratio)
# Use in configuration
config_with_custom = {
'weighted_mav': {'axis': 1},
'custom_energy_ratio': {'axis': 1}
}
fe = FeatureEngineer.from_config(config_with_custom)
Here's a complete example showing EMG feature extraction in a real-world scenario:
import numpy as np
import torch
from torch.utils.data import DataLoader
from tsds._core.dataset import TSDataset
from tsds.features.feature_engineer import FeatureEngineer
# 1. Load EMG dataset
dataset = TSDataset(
location="data/emg_gestures",
input_window_length=500, # 500ms windows
output_window_length=1, # Single gesture label
prediction_offset=1,
features=['emg_1', 'emg_2', 'emg_3', 'emg_4', 'emg_5', 'emg_6'],
labels=['gesture_id']
)
# 2. Configure comprehensive feature extraction
comprehensive_config = {
# Time domain
'weighted_mav': {'axis': 1},
'rms': {'axis': 1},
'variance': {'axis': 1},
'waveform_length': {'axis': 1},
'zero_crossings_improved': {'axis': 1},
'slope_sign_changes': {'axis': 1},
# Frequency domain
'mean_frequency': {'fs': 1000.0, 'axis': 1},
'median_frequency': {'fs': 1000.0, 'axis': 1},
'peak_frequency': {'fs': 1000.0, 'axis': 1},
'spectral_energy_bands': {'fs': 1000.0, 'axis': 1},
# Advanced
'wavelet_energy': {'wavelet': 'db4', 'levels': 4, 'axis': 1},
'autoregressive_coefficients': {'order': 4, 'axis': 1}
}
# 3. Create feature engineer with optimal settings
fe = FeatureEngineer.from_config(
comprehensive_config,
num_workers=6, # Parallel processing
return_tensor=True # PyTorch tensors
)
# 4. Process data with feature extraction
def feature_collate_fn(batch):
raw_signals = torch.stack([item['input'] for item in batch])
labels = torch.stack([item['targets'] for item in batch])
# Extract features
features, feature_info = fe(raw_signals.numpy())
return {
'features': features,
'labels': labels.squeeze(),
'feature_info': feature_info
}
# 5. Create DataLoader
train_loader = DataLoader(
dataset,
batch_size=64,
shuffle=True,
collate_fn=feature_collate_fn,
num_workers=4
)
# 6. Training loop
for batch in train_loader:
features = batch['features'] # Shape: (64, total_features)
labels = batch['labels'] # Shape: (64,)
# features now contains all extracted EMG features ready for ML models
print(f"Batch features shape: {features.shape}")
print(f"Feature names: {list(batch['feature_info'].keys())}")
# Train your gesture recognition model
# predictions = model(features)
# loss = criterion(predictions, labels)
break
print(f"Total features extracted: {features.shape[1]}")
print(f"Available feature types: {fe.get_feature_names()}")
- Small datasets (< 1000 samples): `num_workers=1-2`
- Medium datasets (1k-10k samples): `num_workers=4-6`
- Large datasets (> 10k samples): `num_workers=6-8`
- Memory usage: Each feature typically adds 1-8 values per channel
- Computation time: Frequency-domain features are more expensive than time-domain ones
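One way to encode these guidelines (a hypothetical helper, not part of the library; `config` and `emg_data` are the objects from the examples above):
def pick_num_workers(n_samples: int) -> int:
    """Rule-of-thumb worker count from the guidelines above."""
    if n_samples < 1_000:
        return 2
    if n_samples <= 10_000:
        return 6
    return 8
fe = FeatureEngineer.from_config(config, num_workers=pick_num_workers(len(emg_data)))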
The FeatureEngineer includes robust error handling:
# Missing optional dependencies (PyWavelets, scipy) are handled gracefully
config_with_optional = {
'rms': {'axis': 1}, # Always available
'wavelet_energy': {'axis': 1}, # Requires PyWavelets
'spectral_moments': {'axis': 1} # Requires scipy
}
fe = FeatureEngineer.from_config(config_with_optional)
# Functions with missing dependencies will log warnings and provide fallbacks
This comprehensive feature engineering system enables sophisticated time series analysis with minimal code while maintaining high performance and flexibility for research and real-world applications.
Tide provides a comprehensive signal processing toolkit designed for real-time and batch processing of time series data. The filtering system is built with modularity and performance in mind, supporting both individual filters and composable filter chains.
A high-performance digital filter for frequency-domain signal conditioning:
from tsds.preprocessing.filter import ButterBandPassFilter
# Configure filter parameters
filter = ButterBandPassFilter(
fs=1000, # Sampling frequency (Hz)
lowcut=20, # Low cutoff frequency (Hz)
highcut=450, # High cutoff frequency (Hz)
order=4 # Filter order
)
# Apply to time series data
filtered_signal = filter(raw_signal)
Applications:
- Noise reduction in sensor data
- Frequency band isolation
- Signal conditioning for feature extraction
An adaptive filter providing optimal balance between smoothing and responsiveness:
from tsds.preprocessing.filter import OneEuroFilter
# Configure for real-time applications
filter = OneEuroFilter(
min_cutoff=1.0, # Minimum cutoff frequency
beta=0.1 # Responsiveness parameter
)
# Real-time processing
smoothed_signal = filter(noisy_signal)
Applications:
- Real-time signal smoothing
- Gesture recognition systems
- Interactive applications requiring low latency
The `FilterStack` class enables composition of multiple filters with automatic dimensionality management:
from tsds.preprocessing.filter.stack import FilterStack
# Create multi-filter pipeline
filters = [
ButterBandPassFilter(fs=1000, lowcut=20, highcut=100, order=4),
ButterBandPassFilter(fs=1000, lowcut=100, highcut=300, order=4),
OneEuroFilter(min_cutoff=1.0, beta=0.1)
]
filter_stack = FilterStack(filters=filters)
# Optional: Add identity filter as bypass
filter_stack.add_identity()
# Process data through all filters
# Output shape: (batch, filters, window, channels)
multi_filtered = filter_stack(input_signal)
Automatic Reshaping:
Filter stacks automatically reshape outputs to `(B, F, W, C)` format (batch, filters, window, channels), making them directly compatible with 2D convolutional neural networks for signal processing applications.
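For instance, the filter axis can play the role of input channels in a `torch.nn.Conv2d`. A sketch with illustrative shapes (the `(B, F, W, C)` layout is taken from above):
import torch
import torch.nn as nn
# e.g. batch of 32 windows, 3 filters in the stack, 200 samples, 8 sensor channels
x = torch.randn(32, 3, 200, 8)  # (B, F, W, C) as produced by a FilterStack
conv = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=(5, 3), padding="same")
y = conv(x)                      # (32, 16, 200, 8): filters act as image channels
print(y.shape)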
Extend the filtering system with domain-specific implementations:
from tsds.preprocessing.filter.base import BaseFilter
import numpy as np
class MovingAverageFilter(BaseFilter):
def __init__(self, window_size: int):
super().__init__()
self.window_size = window_size
def apply(self, signal: np.ndarray) -> np.ndarray:
"""Apply moving average filter"""
return np.convolve(signal,
np.ones(self.window_size) / self.window_size,
mode='same')
# Use custom filter
ma_filter = MovingAverageFilter(window_size=10)
smoothed = ma_filter(raw_signal)
Tide provides sophisticated splitting strategies specifically designed for time series data, ensuring temporal dependencies are preserved and preventing data leakage between training, validation, and test sets.
Maintains temporal order with optional gap periods between splits:
from tsds.utils.splitting import ChronologicalSplitter
splitter = ChronologicalSplitter(
train_ratio=0.7,
val_ratio=0.15,
test_ratio=0.15,
gap_size=10 # Optional gap between splits
)
split_indices = splitter.split(dataset_length=10000)
Introduces deliberate gaps between splits to prevent temporal leakage:
from tsds.utils.splitting import GapBasedSplitter
splitter = GapBasedSplitter(
train_ratio=0.6,
val_ratio=0.2,
test_ratio=0.2,
train_val_gap=50, # Gap between train and validation
val_test_gap=50 # Gap between validation and test
)
Suitable for time series cross-validation with temporal constraints:
from tsds.utils.splitting import RollingWindowSplitter
splitter = RollingWindowSplitter(
n_splits=5,
train_size=1000,
test_size=200,
gap=50
)
# Generate multiple train/test splits
for train_idx, test_idx in splitter.split(dataset):
# Train and evaluate model
pass
from tsds._core.dataset import TSDataset
from tsds.utils.splitting import ChronologicalSplitter
# Load time series dataset
dataset = TSDataset(
location="data/sensor_data.csv",
features=['temperature', 'pressure', 'humidity'],
labels=['target_variable'],
input_window_length=24,
output_window_length=6,
prediction_offset=1
)
# Apply temporal splitting
splitter = ChronologicalSplitter(train_ratio=0.7, val_ratio=0.15, test_ratio=0.15)
split_indices = splitter.split(len(dataset))
# Create dataset splits
datasets = dataset.split_dataset(split_indices)
train_dataset = datasets['train']
val_dataset = datasets['val']
test_dataset = datasets['test']
The splitting utilities include comprehensive validation to ensure:
- No temporal overlap between splits
- Minimum size requirements are met
- Proper handling of edge cases
- Temporal ordering preservation
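A minimal sketch of the no-overlap check (assuming `split_indices` maps split names to index sequences, as in the splitter examples above; `assert_disjoint` is a hypothetical helper):
def assert_disjoint(split_indices):
    """Raise if any sample index appears in more than one split."""
    seen = {}
    for name, indices in split_indices.items():
        for i in indices:
            if i in seen:
                raise ValueError(f"index {i} is in both '{seen[i]}' and '{name}'")
            seen[i] = name
assert_disjoint(split_indices)  # e.g. the result of ChronologicalSplitter.split()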
While Tide is designed as a general-purpose time series framework, it includes specialized extensions for specific domains and applications.
Tide provides enhanced support for biomedical time series analysis, including specialized features and preprocessing methods commonly used in physiological signal processing.
The feature engineering module includes electromyography (EMG) specific features:
from tsds.features.feature_engineer import FeatureEngineer
# EMG-optimized feature configuration
emg_config = {
'weighted_mav': {'axis': 1}, # Mean Absolute Value
'rms': {'axis': 1}, # Root Mean Square
'waveform_length': {'axis': 1}, # Signal complexity
'zero_crossings_improved': {'axis': 1}, # Frequency content
'mean_frequency': {'fs': 1000.0, 'axis': 1}, # Spectral analysis
'spectral_energy_bands': {'fs': 1000.0, 'axis': 1} # Energy distribution
}
fe = FeatureEngineer.from_config(emg_config, num_workers=4)
features, info = fe(emg_data)
Tide includes plugin-style support for popular EMG datasets, providing standardized access while maintaining the framework's general-purpose design.
Tide includes comprehensive support for the NinaPro EMG datasets through specialized dataset classes that provide seamless access to multimodal EMG data with advanced time series capabilities:
from tsds.emg import NinaProDB1, NinaProDB2
# Load NinaPro DB1 with custom feature/label selection
dataset = NinaProDB1(
subject_idx=[1, 2, 3],
feature_columns=["emg"], # EMG signals as features
label_columns=["glove"], # Hand pose as labels
input_window_length=200,
output_window_length=1,
auto_download=True # Automatic data management
)
# Multi-modal configuration for DB2
multimodal = NinaProDB2(
feature_columns=["emg", "acc"], # EMG + accelerometer
label_columns=["glove", "force"], # Pose + force targets
input_window_length=250
)
# Check sampling rates and available modalities
print(f"EMG sampling rate: {dataset.get_sampling_rate('emg')} Hz")
print(f"Available modalities: {dataset.get_available_modalities()}")
Key Features:
- 9 NinaPro Databases: Complete support from DB1 to DB9
- Sampling Rate Awareness: Automatic handling of different acquisition frequencies (100 Hz to 2 kHz)
- Custom Modality Selection: Flexible feature/label column specification
- Automatic Data Management: Download, conversion, and caching handled automatically
For comprehensive usage examples, configuration options, and database specifications, see the NinaPro Integration Guide.
from emgdatasets.datasets import EMG2Pose
# Load pose estimation dataset
dataset = EMG2Pose(full=False) # Use smaller subset for development
# Access multimodal data
for batch in dataset:
emg_signals = batch["emg"]
joint_angles = batch["joint_angles"]
# Process multimodal time series data
For real-time applications, the framework includes efficient downsampling utilities:
from emgdatasets.preprocessing.sampling import DownSampler
# Reduce sampling rate for real-time processing
downsampler = DownSampler(target_freq=500)
resampled_dataset = downsampler(original_dataset)
Install Tide using Poetry (recommended) or pip:
# Clone the repository
git clone https://github.com/RobinU434/EMGDatasets
cd EMGDatasets
# Install with Poetry
poetry install
# Activate virtual environment
poetry shell
# Clone and install
git clone https://github.com/RobinU434/EMGDatasets
cd EMGDatasets
pip install -e .
Install additional packages for extended functionality:
# For wavelet analysis
pip install PyWavelets
# For advanced signal processing
pip install scipy
# For configuration management
pip install omegaconf
from tsds._core.dataset import TSDataset
# Load CSV time series data
dataset = TSDataset(
location="data/sensor_readings.csv",
features=['temperature', 'humidity', 'pressure'],
labels=['energy_consumption'],
input_window_length=24, # 24-hour input windows
output_window_length=6, # 6-hour predictions
prediction_offset=1 # 1-hour ahead forecasting
)
# Access samples
sample = dataset[0]
print(f"Input shape: {sample['input'].shape}") # [24, 3]
print(f"Target shape: {sample['targets'].shape}") # [6, 1]
from tsds.features.feature_engineer import FeatureEngineer
# Configure feature extraction
config = {
'rms': {'axis': 1}, # Root mean square
'variance': {'axis': 1}, # Signal variance
'mean_frequency': {'fs': 100.0, 'axis': 1}, # Spectral analysis
'waveform_length': {'axis': 1} # Signal complexity
}
# Extract features
fe = FeatureEngineer.from_config(config, num_workers=4)
features, info = fe(time_series_data)
from tsds.preprocessing.filter.stack import FilterStack
# Create multi-band filter
filter_stack = FilterStack.from_butter_configs(
lowcuts=[0.5, 10, 50], # Low cutoff frequencies
highcuts=[10, 50, 200], # High cutoff frequencies
orders=[4, 4, 4], # Filter orders
fs=1000 # Sampling frequency
)
# Apply filters
filtered_signals = filter_stack(raw_signals)
# Output: (batch, filters, time, channels)
import torch
from torch.utils.data import DataLoader
from tsds._core.dataset import TSDataset
from tsds.features.feature_engineer import FeatureEngineer
from tsds.utils.splitting import ChronologicalSplitter
# 1. Load dataset
dataset = TSDataset(
location="data/industrial_sensors",
input_window_length=100,
output_window_length=20,
prediction_offset=5
)
# 2. Split temporally
splitter = ChronologicalSplitter(train_ratio=0.7, val_ratio=0.15, test_ratio=0.15)
splits = dataset.split_dataset(splitter.split(len(dataset)))
# 3. Setup feature extraction
fe = FeatureEngineer.from_function_list(['rms', 'variance', 'mean_frequency'])
# 4. Create data loaders with feature extraction
def collate_fn(batch):
inputs = torch.stack([item['input'] for item in batch])
targets = torch.stack([item['targets'] for item in batch])
features, _ = fe(inputs.numpy())
return {'features': features, 'targets': targets}
train_loader = DataLoader(splits['train'], batch_size=32, collate_fn=collate_fn)
# 5. Training loop
for batch in train_loader:
features = batch['features'] # Extracted features
targets = batch['targets'] # Ground truth
# Train your model...
Run the comprehensive test suite:
# Run all tests
pytest
# Run specific test modules
pytest test/test_ts_dataset.py test/test_feature_engineer.py
# Run with coverage
pytest --cov=tsds
Tide welcomes contributions from the research and development community. Whether you're adding new features, fixing bugs, or improving documentation, your contributions help advance time series analysis capabilities.
# Clone and setup development environment
git clone https://github.com/RobinU434/EMGDatasets
cd EMGDatasets
poetry install --with dev
# Run tests
pytest
# Run linting
black tsds/
flake8 tsds/
- Fork and Branch: Create a feature branch from the main branch
- Code Quality: Follow PEP 8 and include comprehensive tests
- Documentation: Update documentation for new features
- Testing: Ensure all tests pass and add tests for new functionality
- Pull Request: Submit a clear pull request with description of changes
- New Data Loaders: Support for additional file formats
- Feature Functions: Domain-specific feature extraction methods
- Filter Implementations: Advanced signal processing filters
- Splitting Strategies: Novel temporal splitting approaches
- Dataset Plugins: Integration with new time series datasets
- Performance Optimizations: Efficiency improvements
- Documentation: Examples, tutorials, and API documentation
When contributing dataset support, please ensure:
- Compliance with dataset licensing terms
- Proper attribution and citations
- Standardized API following existing patterns
- Comprehensive testing and documentation
This project is licensed under the MIT License. The MIT License provides broad permissions for use, modification, and distribution while maintaining attribution requirements.
If you use Tide in your research, please consider citing:
@software{tide_framework,
title={Tide: Torch Integrated Data Engine for Time Series Analysis},
author={Uhrich, Robin},
year={2024},
url={https://github.com/RobinU434/Tide}
}
For questions, support, or collaboration opportunities, please create an issue in the repository or contact the maintainers directly.