src.actions.windowing module

Windowing Actions Module

This module provides actions for creating time series windows and sequence processing within the SNN2 neural network framework. It contains utilities for:

  • Time series windowing with configurable window sizes and strides

  • Batch processing of multiple DataFrames with windowing operations

  • Anomaly window flagging for time series data

  • Post-processing operations on windowed data

  • Masking operations to handle experiment boundaries

The module is designed to work with pandas DataFrames and TensorFlow tensors, providing essential functionality for preparing sequential data for neural network training and analysis.

Functions

windowing

Create time series windows from DataFrame data with masking.

applyRequests

Apply windowing requests using TensorFlow’s extract_patches for efficient processing.

listWindowing

Apply windowing operations to a list of DataFrames and concatenate results.

flag_anomaly_windows

Flag windows containing at least one anomaly for time series anomaly detection.

Notes

All functions in this module use the @action and @f_logger decorators for consistent logging and action tracking within the SNN2 framework. The module handles time series data with proper masking to avoid creating windows that span across different experiments or operational periods.

The windowing operations support various post-processing operations that can be applied to the generated windows, allowing for flexible data preprocessing pipelines.

Examples

Basic time series windowing workflow:

>>> import numpy as np
>>>
>>> # Define windowing requests
>>> requests = DataManager()
>>> requests['features'] = {
...     'columns': ['feature1', 'feature2'],
...     'dtype': np.float32,
...     'post_operation': None
... }
>>>
>>> # Apply windowing to single DataFrame
>>> windowed_data = windowing(df, requests, window=10)
>>>
>>> # Apply windowing to multiple DataFrames
>>> multi_windowed = listWindowing(df_list, window=6, requests=requests)
>>>
>>> # Flag anomaly windows
>>> flagged_df = flag_anomaly_windows(df, anomaly_column='anomaly')

See also

SNN2.src.core.data.DataManager

DataManager class for data handling

SNN2.src.decorators.decorators

Action and logging decorators

tensorflow.keras.preprocessing.timeseries_dataset_from_array

TensorFlow windowing utility

src.actions.windowing.applyRequests(df: DataFrame, requests: Dict[str, Dict[str, Any]], window_size: int = 120, stride: int = 1, *, logger=None, write_msg=<function f_logger.<locals>.__dummy_log>, **kwargs) → Dict[str, Dict[str, Any]]

Apply windowing requests using TensorFlow’s extract_patches for efficient processing.

This function creates sliding windows from DataFrame data using TensorFlow’s image patch extraction technique adapted for time series data. It supports configurable stride lengths and handles multiple operational IDs with proper boundary masking.

Parameters:
  • df (pd.DataFrame) – Input DataFrame containing time series data with ‘op_id’ column for grouping different operational periods.

  • requests (Dict[str, Dict[str, Any]]) – Dictionary of data extraction requests, where each request contains:
      - ‘columns’: List of column names to extract
      - ‘dtype’: Optional data type for conversion
      - ‘post_operation’: Optional list of operations to apply
      - ‘post_operation_args’: Arguments for post-operations
      - ‘post_operation_kwargs’: Keyword arguments for post-operations

  • window_size (int, default=120) – Size of the sliding window (number of time steps).

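  • stride (int, default=1) – Number of time steps between the starts of consecutive windows. A stride of 1 produces every possible window, so consecutive windows overlap as much as possible.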

  • **kwargs (dict) – Additional keyword arguments containing logger and write_msg from decorators.

Returns:

Updated requests dictionary with windowed TensorFlow tensors added as default values for each request.

Return type:

Dict[str, Dict[str, Any]]

Notes

The function uses tf.image.extract_patches to create sliding windows from reshaped time series data. Because the extraction is vectorized, it is typically faster than building windows in a Python loop, especially on large datasets.

Masking is applied to prevent windows from spanning across different operational IDs (op_id), ensuring data integrity in multi-experiment scenarios.

Special handling is provided for ‘timestamp’ columns, which are converted to Unix timestamps if present in the request.
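
The stride and op_id masking described above can be illustrated without TensorFlow. The following is a minimal NumPy sketch, not the module's implementation: it swaps tf.image.extract_patches for numpy.lib.stride_tricks.sliding_window_view, and the helper name sliding_windows and its arguments are hypothetical.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def sliding_windows(values, op_ids, window_size, stride=1):
    """Return windows of shape (n, window_size, n_features) within a single op_id.

    Hypothetical helper for illustration only; not part of src.actions.windowing.
    """
    values = np.asarray(values)
    op_ids = np.asarray(op_ids)
    # Candidate windows over the time axis: (n_candidates, n_features, window_size)
    windows = sliding_window_view(values, window_size, axis=0)
    # Reorder to (n_candidates, window_size, n_features), then keep every stride-th start
    windows = np.moveaxis(windows, -1, 1)[::stride]
    # Matching windows over the op_id column: (n_candidates, window_size)
    id_windows = sliding_window_view(op_ids, window_size)[::stride]
    # Keep a window only if every row in it carries the same op_id
    keep = (id_windows == id_windows[:, :1]).all(axis=1)
    return windows[keep]


values = np.arange(12).reshape(6, 2)       # 6 time steps, 2 features
op_ids = np.array([0, 0, 0, 1, 1, 1])      # two operational periods
dense = sliding_windows(values, op_ids, window_size=2)
sparse = sliding_windows(values, op_ids, window_size=2, stride=2)
```

With these inputs, five candidate windows exist at stride 1 and the one that straddles the change from op_id 0 to op_id 1 is dropped, leaving four. At stride 2, three candidates exist and two survive.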

Examples

>>> import numpy as np
>>> import tensorflow as tf
>>> requests = {
...     'sensors': {
...         'columns': ['temp', 'pressure', 'timestamp'],
...         'dtype': np.float32,
...         'post_operation': [tf.nn.l2_normalize],
...         'post_operation_args': [()],
...         'post_operation_kwargs': [{'axis': -1}]
...     }
... }
>>> processed_requests = applyRequests(
...     df, requests, window_size=60, stride=10
... )

src.actions.windowing.flag_anomaly_windows(df: DataFrame, anomaly_column: str = 'anomaly', window_column: str = 'window', new_clm_name: str = 'anomaly_window', drop_old_clm: bool = True) → DataFrame

Flag windows that contain at least one anomaly for time series anomaly detection.

This function creates a new binary column that flags entire windows as anomalous if they contain at least one anomalous data point. This is useful for window-based anomaly detection where the presence of any anomaly in a window makes the entire window suspicious.

Parameters:
  • df (pd.DataFrame) – Input DataFrame containing time series data with anomaly and window information.

  • anomaly_column (str, default="anomaly") – Name of the column containing binary anomaly indicators (0/1 or False/True).

  • window_column (str, default="window") – Name of the column containing window identifiers that group data points into windows.

  • new_clm_name (str, default="anomaly_window") – Name of the new column that will contain the window-level anomaly flags.

  • drop_old_clm (bool, default=True) – Whether to drop the original anomaly column after creating the window flags.

Returns:

DataFrame with the new window-level anomaly flag column added. If drop_old_clm is True, the original anomaly column is removed.

Return type:

pd.DataFrame

Notes

The function uses pandas groupby with transform('max') to propagate any anomaly flag within a window to the entire window. This is a conservative approach: a single anomalous point flags the whole window.

The input anomaly and window columns are converted to integers before processing to ensure consistent data types.

This approach is particularly useful in scenarios where anomalies might have temporal dependencies or where context around anomalous points is important for analysis.
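
The groupby and transform('max') step described in these notes can be sketched on a toy DataFrame. This is an illustration of the technique under the default column names, not the function's source; the sample data is made up.

```python
import pandas as pd

# Toy data: three windows, with anomalies in windows 0 and 2
df = pd.DataFrame({
    "window": [0, 0, 0, 1, 1, 2, 2],
    "anomaly": [0, 1, 0, 0, 0, 1, 1],
})

# Cast to integers first, as the notes describe
df["anomaly"] = df["anomaly"].astype(int)
df["window"] = df["window"].astype(int)

# The per-window maximum is 1 if any row in the window is anomalous;
# transform broadcasts that value back onto every row of the window
df["anomaly_window"] = df.groupby("window")["anomaly"].transform("max")

# Number of distinct windows that were flagged
n_flagged_windows = df.loc[df["anomaly_window"] == 1, "window"].nunique()
```

Because the flag is repeated on every row of a window, summing the new column counts flagged rows; counting distinct window identifiers gives the number of flagged windows.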

Examples

>>> # Flag windows containing anomalies
>>> flagged_df = flag_anomaly_windows(
...     df,
...     anomaly_column='is_anomaly',
...     window_column='time_window',
...     new_clm_name='window_anomaly'
... )
>>>
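>>> # Count anomalous windows (the flag is repeated on every row of a window)
>>> flagged = flagged_df[flagged_df['window_anomaly'] == 1]
>>> anomaly_windows = flagged['time_window'].nunique()
>>> print(f"Found {anomaly_windows} anomalous windows")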

src.actions.windowing.listWindowing(dfs: List[DataFrame], window: int = 6, requests: SNN2.src.decorators.decorators.c_logger.<locals>.augmented_cls | None = None, *, logger=None, write_msg=<function f_logger.<locals>.__dummy_log>, **kwargs) → Dict[str, Dict[str, Any]]

Apply windowing operations to a list of DataFrames and concatenate results.

This function processes multiple DataFrames with the same windowing configuration and concatenates the results into unified tensors. It’s useful for batch processing multiple experimental runs or data files.

Parameters:
  • dfs (List[pd.DataFrame]) – List of pandas DataFrames to be processed with windowing operations. Each DataFrame should have consistent structure and column names.

  • window (int, default=6) – Size of the sliding window (number of time steps) to apply to each DataFrame.

  • requests (Optional[DataManager], default=None) – DataManager instance containing the windowing request specifications. Despite the None default, a value is required: passing None raises an AssertionError.

  • **kwargs (dict) – Additional keyword arguments containing logger and write_msg from decorators.

Returns:

DataManager instance with concatenated windowed tensors from all input DataFrames. Each request contains the merged results.

Return type:

Dict[str, Dict[str, Any]]

Raises:

AssertionError – If requests parameter is None.

Notes

The function applies the windowing operation to each DataFrame individually using the same requests configuration, then concatenates all results along the first axis (batch dimension).

Each DataFrame is windowed in turn, so intermediate windows are built for one DataFrame at a time. The input list and the concatenated result must still fit in memory.

All DataFrames in the input list should have compatible structures to ensure successful concatenation.
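
The window-then-concatenate pattern in these notes can be sketched with plain NumPy arrays. The helper window_one below is a hypothetical stand-in for the per-DataFrame windowing call, and it ignores requests, masking, and post-operations.

```python
import numpy as np


def window_one(values, window):
    """Hypothetical stand-in: stack every length-`window` slice of one run."""
    values = np.asarray(values)
    n_windows = len(values) - window + 1
    return np.stack([values[i:i + window] for i in range(n_windows)])


# Two runs of different length but the same number of features
runs = [np.arange(10).reshape(5, 2), np.arange(8).reshape(4, 2)]

# Window each run on its own, then merge along the first (batch) axis
per_run = [window_one(run, window=3) for run in runs]
combined = np.concatenate(per_run, axis=0)
```

Concatenation along the first axis only works when every run yields windows of the same length and feature count, which is why the inputs need compatible structures.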

Examples

>>> import pandas as pd
>>> # Process multiple experiment files
>>> dataframes = [pd.read_csv(f'exp_{i}.csv') for i in range(10)]
>>> requests = DataManager()
>>> requests['measurements'] = {
...     'columns': ['sensor1', 'sensor2'],
...     'post_operation': None
... }
>>> combined_windows = listWindowing(dataframes, window=12, requests=requests)

src.actions.windowing.windowing(df: DataFrame, requests: Dict[str, Dict[str, Any]], window: int, *, logger=None, write_msg=<function f_logger.<locals>.__dummy_log>, **kwargs) → List[Tensor]

Create time series windows from DataFrame data with experiment boundary masking.

This function creates sliding windows from time series data in a DataFrame, with intelligent masking to prevent windows from spanning across different experiments or operational periods. It supports post-processing operations and handles multiple data requests simultaneously.

Parameters:
  • df (pd.DataFrame) – Input DataFrame containing time series data with a ‘second’ column indicating time progression within experiments.

  • requests (Dict[str, Dict[str, Any]]) – Dictionary of data extraction requests, where each request contains:
      - ‘columns’: List of column names to extract
      - ‘dtype’: Optional data type for conversion
      - ‘post_operation’: Optional list of operations to apply
      - ‘post_operation_args’: Arguments for post-operations
      - ‘post_operation_kwargs’: Keyword arguments for post-operations

  • window (int) – Size of the sliding window (number of time steps).

  • **kwargs (dict) – Additional keyword arguments containing logger and write_msg from decorators.

Returns:

List of TensorFlow tensors containing windowed data for each request, with experiment boundary masking applied.

Return type:

List[tf.Tensor]

Raises:

Exception – If window size is larger than the minimum experiment duration.

Notes

The function creates a boolean mask to exclude windows that would span across experiment boundaries. This ensures that each window contains data from a single experimental run.

The masking logic assumes that all experiments have the same duration, determined by the maximum value in the ‘second’ column.
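
One way such a mask could be computed is sketched below. It rests on assumptions the documentation does not confirm: that ‘second’ counts from 0 in steps of 1 within each experiment, and that a window is identified by its first row. The helper name window_start_mask is hypothetical.

```python
import numpy as np


def window_start_mask(second, window):
    """Return True for rows where a full window fits before the experiment ends.

    Hypothetical sketch; assumes `second` restarts at 0 for every experiment
    and that all experiments share the same maximum value.
    """
    second = np.asarray(second)
    last = second.max()  # shared final time step of every experiment
    if window > last + 1:
        raise ValueError("window is longer than one experiment")
    # A window starting at time s covers s .. s + window - 1
    return second <= last - window + 1


# Two experiments of five time steps each
second = np.array([0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
mask = window_start_mask(second, window=3)
```

Under these assumptions, rows at seconds 3 and 4 cannot start a window of length 3 without running into the next experiment, so they are masked out.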

Post-processing operations are applied sequentially if specified in the request configuration.
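
Sequential post-processing driven by the three request keys might look like the sketch below. The calling convention op(data, *args, **kwargs) is an assumption, as are the helper apply_post_operations and the toy operations scale and shift.

```python
from typing import Any, Dict


def apply_post_operations(data: Any, request: Dict[str, Any]) -> Any:
    """Apply each operation in order, feeding the result into the next one.

    Hypothetical sketch; assumes every operation is called as
    op(data, *args, **kwargs).
    """
    ops = request.get("post_operation") or []
    args = request.get("post_operation_args") or [()] * len(ops)
    kwargs = request.get("post_operation_kwargs") or [{}] * len(ops)
    for op, op_args, op_kwargs in zip(ops, args, kwargs):
        data = op(data, *op_args, **op_kwargs)
    return data


def scale(data, factor):
    return [x * factor for x in data]


def shift(data, offset=0):
    return [x + offset for x in data]


request = {
    "post_operation": [scale, shift],
    "post_operation_args": [(2,), ()],
    "post_operation_kwargs": [{}, {"offset": 1}],
}
result = apply_post_operations([1, 2, 3], request)
untouched = apply_post_operations([1, 2, 3], {"post_operation": None})
```

Here the data is first doubled and then shifted by one. A request whose post_operation is None leaves the data unchanged.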

Examples

>>> import numpy as np
>>> requests = {
...     'features': {
...         'columns': ['sensor1', 'sensor2'],
...         'dtype': np.float32,
...         'post_operation': None
...     }
... }
>>> windowed_tensors = windowing(time_series_df, requests, window=10)