SNN2.src.actions.windowing module
Windowing Actions Module
This module provides actions for creating time series windows and sequence processing within the SNN2 neural network framework. It contains utilities for:
- Time series windowing with configurable window sizes and strides
- Batch processing of multiple DataFrames with windowing operations
- Anomaly window flagging for time series data
- Post-processing operations on windowed data
- Masking operations to handle experiment boundaries
The module is designed to work with pandas DataFrames and TensorFlow tensors, providing essential functionality for preparing sequential data for neural network training and analysis.
Functions
- windowing (function)
Create time series windows from DataFrame data with masking.
- applyRequests (function)
Apply windowing requests using TensorFlow’s extract_patches for efficient processing.
- listWindowing (function)
Apply windowing operations to a list of DataFrames and concatenate results.
- flag_anomaly_windows (function)
Flag windows containing at least one anomaly for time series anomaly detection.
Notes
All functions in this module use the @action and @f_logger decorators for consistent logging and action tracking within the SNN2 framework. The module handles time series data with proper masking to avoid creating windows that span across different experiments or operational periods.
The windowing operations support various post-processing operations that can be applied to the generated windows, allowing for flexible data preprocessing pipelines.
Examples
Basic time series windowing workflow:
>>> import numpy as np
>>> # Define windowing requests
>>> requests = DataManager()
>>> requests['features'] = {
...     'columns': ['feature1', 'feature2'],
...     'dtype': np.float32,
...     'post_operation': None
... }
>>>
>>> # Apply windowing to single DataFrame
>>> windowed_data = windowing(df, requests, window=10)
>>>
>>> # Apply windowing to multiple DataFrames
>>> multi_windowed = listWindowing(df_list, window=6, requests=requests)
>>>
>>> # Flag anomaly windows
>>> flagged_df = flag_anomaly_windows(df, anomaly_column='anomaly')
See also
SNN2.src.core.data.DataManager: DataManager class for data handling
SNN2.src.decorators.decorators: Action and logging decorators
tensorflow.keras.preprocessing.timeseries_dataset_from_array: TensorFlow windowing utility
- SNN2.src.actions.windowing.applyRequests(df: DataFrame, requests: Dict[str, Dict[str, Any]], window_size: int = 120, stride: int = 1, *, logger=None, write_msg=<function f_logger.<locals>.__dummy_log>, **kwargs) -> Dict[str, Dict[str, Any]]
Apply windowing requests using TensorFlow’s extract_patches for efficient processing.
This function creates sliding windows from DataFrame data using TensorFlow’s image patch extraction technique adapted for time series data. It supports configurable stride lengths and handles multiple operational IDs with proper boundary masking.
- Parameters:
df (pd.DataFrame) – Input DataFrame containing time series data with ‘op_id’ column for grouping different operational periods.
requests (Dict[str, Dict[str, Any]]) – Dictionary of data extraction requests, where each request contains:
- ‘columns’: List of column names to extract
- ‘dtype’: Optional data type for conversion
- ‘post_operation’: Optional list of operations to apply
- ‘post_operation_args’: Arguments for post-operations
- ‘post_operation_kwargs’: Keyword arguments for post-operations
window_size (int, default=120) – Size of the sliding window (number of time steps).
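stride (int, default=1) – Number of time steps between the starts of consecutive windows. With stride=1, each window is shifted by a single step, so consecutive windows overlap as much as possible; larger values reduce the overlap and the number of windows.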
**kwargs (dict) – Additional keyword arguments containing logger and write_msg from decorators.
- Returns:
Updated requests dictionary with windowed TensorFlow tensors added as default values for each request.
- Return type:
Dict[str, Dict[str, Any]]
Notes
The function uses tf.image.extract_patches to create sliding windows from the reshaped time series data in a single vectorized call. This avoids a Python-level loop over window positions, which matters for large datasets.
Masking is applied to prevent windows from spanning across different operational IDs (op_id), ensuring data integrity in multi-experiment scenarios.
Special handling is provided for ‘timestamp’ columns, which are converted to Unix timestamps if present in the request.
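The windowing, stride and op_id masking described above can be illustrated with a minimal sketch. It is not the module's implementation: it swaps tf.image.extract_patches for NumPy's sliding_window_view so that it runs without TensorFlow, and the helper name sliding_windows_sketch and the toy data are hypothetical.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def sliding_windows_sketch(values, op_ids, window_size, stride=1):
    """Return windows of shape (n_windows, window_size, n_features).

    Hypothetical helper: windows whose rows mix several op_id values
    are dropped, mirroring the boundary masking described in the Notes.
    """
    values = np.asarray(values)
    op_ids = np.asarray(op_ids)
    # sliding_window_view appends the window axis last: (n, features, window)
    windows = sliding_window_view(values, window_size, axis=0)
    windows = np.moveaxis(windows, -1, 1)  # -> (n, window, features)
    id_windows = sliding_window_view(op_ids, window_size)  # (n, window)
    # Keep every `stride`-th window start
    windows = windows[::stride]
    id_windows = id_windows[::stride]
    # A window is valid only if all its rows share the first row's op_id
    mask = (id_windows == id_windows[:, :1]).all(axis=1)
    return windows[mask]


# 6 time steps, 2 features, two operational periods of 3 steps each
values = np.arange(12, dtype=np.float32).reshape(6, 2)
op_ids = np.array([0, 0, 0, 1, 1, 1])
result = sliding_windows_sketch(values, op_ids, window_size=3, stride=1)
strided = sliding_windows_sketch(values, op_ids, window_size=3, stride=2)
```

In this toy case four window positions exist, but only the two that stay inside a single op_id survive the mask. With stride=2 only the window starting at row 0 remains, because the window starting at row 2 crosses the boundary.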
Examples
>>> requests = {
...     'sensors': {
...         'columns': ['temp', 'pressure', 'timestamp'],
...         'dtype': np.float32,
...         'post_operation': [tf.nn.l2_normalize],
...         'post_operation_args': [()],
...         'post_operation_kwargs': [{'axis': -1}]
...     }
... }
>>> processed_requests = applyRequests(
...     df, requests, window_size=60, stride=10
... )
- SNN2.src.actions.windowing.flag_anomaly_windows(df: DataFrame, anomaly_column: str = 'anomaly', window_column: str = 'window', new_clm_name: str = 'anomaly_window', drop_old_clm: bool = True) DataFrame
Flag windows that contain at least one anomaly for time series anomaly detection.
This function creates a new binary column that flags entire windows as anomalous if they contain at least one anomalous data point. This is useful for window-based anomaly detection where the presence of any anomaly in a window makes the entire window suspicious.
- Parameters:
df (pd.DataFrame) – Input DataFrame containing time series data with anomaly and window information.
anomaly_column (str, default="anomaly") – Name of the column containing binary anomaly indicators (0/1 or False/True).
window_column (str, default="window") – Name of the column containing window identifiers that group data points into windows.
new_clm_name (str, default="anomaly_window") – Name of the new column that will contain the window-level anomaly flags.
drop_old_clm (bool, default=True) – Whether to drop the original anomaly column after creating the window flags.
- Returns:
DataFrame with the new window-level anomaly flag column added. If drop_old_clm is True, the original anomaly column is removed.
- Return type:
pd.DataFrame
Notes
The function uses pandas groupby with transform('max') to propagate any anomaly flag within a window to every row of that window. This is a conservative approach: a single anomalous point flags the whole window.
The input anomaly and window columns are converted to integers before processing to ensure consistent data types.
This approach is particularly useful in scenarios where anomalies might have temporal dependencies or where context around anomalous points is important for analysis.
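A minimal sketch of the groupby/transform step described in these Notes, assuming only what the Notes state (integer conversion, then transform('max') per window). The helper name flag_windows_sketch and the toy DataFrame are hypothetical, and the real function may differ in details.

```python
import pandas as pd


def flag_windows_sketch(df, anomaly_column="anomaly", window_column="window",
                        new_clm_name="anomaly_window", drop_old_clm=True):
    """Hypothetical re-creation of the window flagging logic."""
    out = df.copy()
    # Convert both columns to integers for consistent dtypes
    out[anomaly_column] = out[anomaly_column].astype(int)
    out[window_column] = out[window_column].astype(int)
    # The per-window maximum is 1 as soon as one row in the window is anomalous
    out[new_clm_name] = out.groupby(window_column)[anomaly_column].transform("max")
    if drop_old_clm:
        out = out.drop(columns=[anomaly_column])
    return out


df = pd.DataFrame({
    "window": [0, 0, 0, 1, 1, 1],
    "anomaly": [False, True, False, False, False, False],
})
flagged = flag_windows_sketch(df)
flags = flagged["anomaly_window"].tolist()
```

Because transform keeps the original row count, the flag is repeated on every row of a window. Counting anomalous windows therefore requires reducing to one value per window first.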
Examples
>>> # Flag windows containing anomalies
>>> flagged_df = flag_anomaly_windows(
...     df,
...     anomaly_column='is_anomaly',
...     window_column='time_window',
...     new_clm_name='window_anomaly'
... )
>>>
>>> # Count anomalous windows (the flag is repeated on every row of a window,
>>> # so reduce to one value per window before summing)
>>> anomaly_windows = flagged_df.groupby('time_window')['window_anomaly'].max().sum()
>>> print(f"Found {anomaly_windows} anomalous windows")
- SNN2.src.actions.windowing.listWindowing(dfs: List[DataFrame], window: int = 6, requests: DataManager | None = None, *, logger=None, write_msg=<function f_logger.<locals>.__dummy_log>, **kwargs) -> Dict[str, Dict[str, Any]]
Apply windowing operations to a list of DataFrames and concatenate results.
This function processes multiple DataFrames with the same windowing configuration and concatenates the results into unified tensors. It’s useful for batch processing multiple experimental runs or data files.
- Parameters:
dfs (List[pd.DataFrame]) – List of pandas DataFrames to be processed with windowing operations. Each DataFrame should have consistent structure and column names.
window (int, default=6) – Size of the sliding window (number of time steps) to apply to each DataFrame.
requests (Optional[DataManager], optional) – DataManager instance containing windowing requests configuration. Must not be None and should contain request specifications.
**kwargs (dict) – Additional keyword arguments containing logger and write_msg from decorators.
- Returns:
DataManager instance with concatenated windowed tensors from all input DataFrames. Each request contains the merged results.
- Return type:
Dict[str, Dict[str, Any]]
- Raises:
AssertionError – If requests parameter is None.
Notes
The function applies the windowing operation to each DataFrame individually using the same requests configuration, then concatenates all results along the first axis (batch dimension).
The DataFrames are windowed one at a time, so the intermediate windowing work is limited to a single DataFrame at any moment. The input list and the concatenated results are still held in memory.
All DataFrames in the input list should have compatible structures to ensure successful concatenation.
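The concatenation step can be sketched as follows. This is an illustration with NumPy arrays standing in for the windowed tensors; the helper name merge_requests_sketch and the plain-dict layout of the per-DataFrame results are assumptions, not the DataManager API.

```python
import numpy as np


def merge_requests_sketch(per_df_results):
    """Concatenate per-DataFrame window arrays, request by request.

    Hypothetical helper: each element of `per_df_results` maps a request
    name to an array of shape (n_windows_i, window, n_features).
    """
    merged = {}
    for name in per_df_results[0]:
        merged[name] = np.concatenate(
            [result[name] for result in per_df_results], axis=0
        )
    return merged


# Two runs with 4 and 7 windows, window length 12, 2 features each
run_a = {"measurements": np.zeros((4, 12, 2), dtype=np.float32)}
run_b = {"measurements": np.ones((7, 12, 2), dtype=np.float32)}
merged = merge_requests_sketch([run_a, run_b])
```

Only the first axis may differ between runs. A mismatch in window length or feature count makes the concatenation fail, which is why the DataFrames need compatible structures.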
Examples
>>> # Process multiple experiment files
>>> dataframes = [pd.read_csv(f'exp_{i}.csv') for i in range(10)]
>>> requests = DataManager()
>>> requests['measurements'] = {
...     'columns': ['sensor1', 'sensor2'],
...     'post_operation': None
... }
>>> combined_windows = listWindowing(dataframes, window=12, requests=requests)
- SNN2.src.actions.windowing.windowing(df: DataFrame, requests: Dict[str, Dict[str, Any]], window: int, *, logger=None, write_msg=<function f_logger.<locals>.__dummy_log>, **kwargs) -> List[Tensor]
Create time series windows from DataFrame data with experiment boundary masking.
This function creates sliding windows from time series data in a DataFrame, with intelligent masking to prevent windows from spanning across different experiments or operational periods. It supports post-processing operations and handles multiple data requests simultaneously.
- Parameters:
df (pd.DataFrame) – Input DataFrame containing time series data with a ‘second’ column indicating time progression within experiments.
requests (Dict[str, Dict[str, Any]]) – Dictionary of data extraction requests, where each request contains:
- ‘columns’: List of column names to extract
- ‘dtype’: Optional data type for conversion
- ‘post_operation’: Optional list of operations to apply
- ‘post_operation_args’: Arguments for post-operations
- ‘post_operation_kwargs’: Keyword arguments for post-operations
window (int) – Size of the sliding window (number of time steps).
**kwargs (dict) – Additional keyword arguments containing logger and write_msg from decorators.
- Returns:
List of TensorFlow tensors containing windowed data for each request, with experiment boundary masking applied.
- Return type:
List[tf.Tensor]
- Raises:
Exception – If window size is larger than the minimum experiment duration.
Notes
The function creates a boolean mask to exclude windows that would span across experiment boundaries. This ensures that each window contains data from a single experimental run.
The masking logic assumes that all experiments have the same duration, determined by the maximum value in the ‘second’ column.
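One way such a mask could be built is sketched below. It assumes, beyond what the Notes state, that ‘second’ starts at 0 in every experiment and increases by one per row; the helper name boundary_mask_sketch is hypothetical and the actual masking code may differ.

```python
import pandas as pd


def boundary_mask_sketch(df, window):
    """Mark the rows at which a full window can start.

    Hypothetical helper. Assumes 'second' runs 0..max_second in each
    experiment, one row per second, and that all experiments share the
    same duration.
    """
    max_second = df["second"].max()
    if window > max_second + 1:
        raise ValueError("window is larger than the experiment duration")
    # A window starting at second s covers s .. s + window - 1,
    # so it fits inside the experiment only if s <= max_second - window + 1
    return (df["second"] <= max_second - window + 1).to_numpy()


# Two experiments of 5 seconds each, stacked in one DataFrame
df = pd.DataFrame({"second": [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]})
mask = boundary_mask_sketch(df, window=3)
```

With a window of 3 and experiments of 5 rows, only the first three rows of each experiment are valid window starts; a window starting at second 3 or 4 would run into the next experiment or past the end of the data.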
Post-processing operations are applied sequentially if specified in the request configuration.
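A sketch of what sequential post-processing could look like, assuming the windowed data is passed as the first positional argument of each operation, followed by the matching entries of ‘post_operation_args’ and ‘post_operation_kwargs’. The helper name apply_post_operations_sketch is hypothetical, and NumPy functions stand in for TensorFlow ops.

```python
import numpy as np


def apply_post_operations_sketch(data, request):
    """Apply the request's post-operations one after another.

    Hypothetical helper: operation i is called as
    op(data, *post_operation_args[i], **post_operation_kwargs[i]).
    """
    ops = request.get("post_operation") or []
    args = request.get("post_operation_args") or [()] * len(ops)
    kwargs = request.get("post_operation_kwargs") or [{}] * len(ops)
    for op, op_args, op_kwargs in zip(ops, args, kwargs):
        # The output of one operation is the input of the next
        data = op(data, *op_args, **op_kwargs)
    return data


request = {
    "columns": ["sensor1", "sensor2"],
    "post_operation": [np.clip, np.mean],
    "post_operation_args": [(0.0, 1.0), ()],
    "post_operation_kwargs": [{}, {"axis": -1}],
}
windows = np.array([[[-1.0, 0.5], [2.0, 0.5]]])  # shape (1, 2, 2)
result = apply_post_operations_sketch(windows, request)
unchanged = apply_post_operations_sketch(windows, {"post_operation": None})
```

Here the values are first clipped to [0, 1] and then averaged over the feature axis. A request with ‘post_operation’ set to None leaves the data untouched.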
Examples
>>> requests = {
...     'features': {
...         'columns': ['sensor1', 'sensor2'],
...         'dtype': np.float32,
...         'post_operation': None
...     }
... }
>>> windowed_tensors = windowing(time_series_df, requests, window=10)