Skip to content

Implement EventResampler for Cascaded Resampling#1372

Open
malteschaaf wants to merge 5 commits intofrequenz-floss:v1.x.xfrom
malteschaaf:event_resampler
Open

Implement EventResampler for Cascaded Resampling#1372
malteschaaf wants to merge 5 commits intofrequenz-floss:v1.x.xfrom
malteschaaf:event_resampler

Conversation

@malteschaaf
Copy link
Contributor

Problem

When cascading Timer-based resamplers (e.g., 1s → 10s) with align_to=UNIX_EPOCH, samples can be lost at window boundaries due to timing synchronization issues.

Example:

  • Resampler 1 emits samples at: 1s, 2s, ..., 10s, 11s, ...
  • Resampler 2 has windows: [0-10s), [10-20s)
  • A sample from Resampler 1 arriving at exactly t=10.0s may be excluded from the [0-10s) window
  • Result: DATA LOSS

This occurs because both resamplers use independent timers aligned to the same epoch, but their tick times are not synchronized.

Solution

EventResampler uses event-driven window management instead of fixed timer intervals. Windows are emitted when a sample arrives with a timestamp >= current window_end, not on a timer schedule.

Benefits:

  • No data loss at boundaries (samples trigger window closure)
  • Seamless cascading (no synchronization issues)
  • Window alignment maintained through simple addition
  • Event-driven (no polling, resource-efficient)

Implementation

Core Components

  1. EventResampler class: Event-driven resampler inheriting from Resampler

    • Overrides resample() with event-driven loop
    • Implements _process_sample() for window state management
    • Uses _calculate_window_end() (simplified for event-driven scenario)
  2. Sample callback mechanism: Added to StreamingHelper

    • register_sample_callback() method
    • Callback invoked when sample arrives
    • Enables event-driven processing without polling
  3. Refactored _emit_window(): Extracted from Resampler.resample()

    • Shared by both Timer-based and Event-driven resamplers
    • Reduces code duplication
    • Clear separation of concerns

Design Decision: asyncio.Queue + Callbacks

After evaluating multiple approaches (polling, direct channels, event signals, task-based multiplexing), I chose asyncio.Queue with callbacks because:

  • Encapsulation: No protected-access violations. Communication through public APIs only.
  • Event-Driven: Truly async/await pattern (await queue.get()), not polling.
  • Simplicity: Minimal code, immediately understandable, no complex state tracking.
  • Type Safety: Generic Queue[Sample[Quantity]] ensures type correctness.
  • Decoupling: Components don't need to know internal implementation details.
  • Built for Purpose: asyncio.Queue is designed specifically for this use case.

Changes Made

StreamingHelper

  • Added _sample_callback attribute to store callback function
  • Added register_sample_callback() method to register async callback
  • Modified _receive_samples() to invoke callback when sample arrives

Resampler

  • Extracted _emit_window() method from resample()
  • Refactored resample() to delegate window emission to _emit_window()

EventResampler (New)

  • Event-driven resampler for cascaded scenarios
  • Uses sample queue fed by callbacks instead of timer ticks
  • Maintains window alignment through simple addition
  • Simplified _calculate_window_end() (no warm-up period needed)

Tests

  • Comprehensive test suite for EventResampler
  • Parametrized tests for both aligned and non-aligned scenarios
  • Tests verify no data loss at boundaries, proper alignment, and window management

Open Question: Task Cancellation in remove_timeseries()

Observation: When remove_timeseries() is called, the _receiving_task continues running in the background. It will:

  • Continue reading from the source
  • Continue adding samples to the buffer
  • Continue invoking registered callbacks (EventResampler)

Current behavior:

def remove_timeseries(self, source: Source) -> bool:
    try:
        del self._resamplers[source]
    except KeyError:
        return False
    return True

Impact:

  • Memory Leak: Samples accumulate in the buffer but are never consumed
  • Resource Waste: Task runs unnecessarily
  • Critical for EventResampler: Queue grows unbounded with samples from removed sources
  • Relevant for TimerResampler: Buffer eventually stabilizes due to maxlen, but still wastes resources

Question for reviewers:
Should remove_timeseries() cancel the receiving task for both TimerResampler and EventResampler? If yes, we need to either:

  1. Make remove_timeseries() async and call await helper.stop()
  2. Add a synchronous cancel_receiving() method to _StreamingHelper

This should be addressed as a separate issue for both resamplers, not just EventResampler.

Backward Compatibility

  • No breaking changes
  • Resampler API unchanged
  • StreamingHelper additions are backward compatible (callback defaults to None)
  • EventResampler is opt-in (users must explicitly use it)

Usage

from datetime import timedelta
from frequenz.sdk.timeseries import EventResampler, ResamplerConfig

# Configure for cascaded resampling
config = ResamplerConfig(
    resampling_period=timedelta(seconds=10),
    resampling_function=my_resampling_func,
)

# Use EventResampler in second+ stage
resampler = EventResampler(config)
resampler.add_timeseries("my_source", source, sink)
await resampler.resample()

Testing

  • Unit tests cover all core functionality
  • Parametrized tests verify behavior with and without align_to
  • Key test demonstrates no data loss at boundaries
  • All existing tests continue to pass

Extract window emission logic from `resample()` into dedicated `_emit_window()` method to allow code sharing between Timer-based and Event-driven resampler implementations.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Add ability for `StreamingHelper` to notify external consumers when samples arrive via a callback function. This enables event-driven resampler implementations to receive samples without polling internal buffers.

Changes:
- Added `_sample_callback` attribute to store the callback function
- Added `register_sample_callback()` method to register an async callback
- Modified `_receive_samples()` to invoke the callback when a sample is added to the buffer

This mechanism is used by `EventResampler` to implement event-driven sample processing instead of timer-based polling.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Add `EventResampler` class that uses event-driven window management instead of timer-based intervals. This solves data loss issues when cascading resamplers.

Problem: When cascading Timer-based resamplers (e.g., 1s → 10s) with `align_to=UNIX_EPOCH`, samples can be lost at window boundaries due to timing synchronization.

Solution: EventResampler opens/closes windows based on sample arrival timestamps instead of fixed intervals, ensuring no data loss at boundaries.

Changes:
- Added `EventResampler` class that inherits from `Resampler`
- Windows are emitted when a sample arrives with `timestamp >= window_end`
- Maintains window alignment through simple addition of `resampling_period`
- Uses sample callback mechanism from `StreamingHelper` for event-driven processing

`EventResampler` is optimized for cascaded resampling and should not be used directly with raw, irregular data.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Add test suite for `EventResampler` covering window initialization, boundary conditions, and alignment behavior. Tests are parametrized to verify correct behavior with and without `align_to` configuration.

Changes:
- Added tests for `EventResampler` initialization and window end calculation
- Added tests for sample processing before, at, and after window boundaries
- Added tests for correct behavior when samples cross multiple windows
- Added tests verifying window alignment is maintained through simple addition
- Added key test demonstrating no data loss at window boundaries

Tests use parametrized fixtures to cover both aligned and non-aligned window scenarios, ensuring the event-driven window management works correctly in all cases.

Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
@malteschaaf malteschaaf requested a review from a team as a code owner March 5, 2026 16:20
@malteschaaf malteschaaf requested review from daniel-zullo-frequenz and removed request for a team March 5, 2026 16:20
@github-actions github-actions bot added part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests part:data-pipeline Affects the data pipeline labels Mar 5, 2026
@malteschaaf malteschaaf requested a review from llucax March 5, 2026 16:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

part:data-pipeline Affects the data pipeline part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests

Projects

Status: To do

Development

Successfully merging this pull request may close these issues.

1 participant