Implement EventResampler for Cascaded Resampling#1372
Open
malteschaaf wants to merge 5 commits intofrequenz-floss:v1.x.xfrom
Open
Implement EventResampler for Cascaded Resampling#1372malteschaaf wants to merge 5 commits intofrequenz-floss:v1.x.xfrom
malteschaaf wants to merge 5 commits intofrequenz-floss:v1.x.xfrom
Conversation
Extract window emission logic from `resample()` into dedicated `_emit_window()` method to allow code sharing between Timer-based and Event-driven resampler implementations. Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Add ability for `StreamingHelper` to notify external consumers when samples arrive via a callback function. This enables event-driven resampler implementations to receive samples without polling internal buffers. Changes: - Added `_sample_callback` attribute to store the callback function - Added `register_sample_callback()` method to register an async callback - Modified `_receive_samples()` to invoke the callback when a sample is added to the buffer This mechanism is used by `EventResampler` to implement event-driven sample processing instead of timer-based polling. Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Add `EventResampler` class that uses event-driven window management instead of timer-based intervals. This solves data loss issues when cascading resamplers. Problem: When cascading Timer-based resamplers (e.g., 1s → 10s) with `align_to=UNIX_EPOCH`, samples can be lost at window boundaries due to timing synchronization. Solution: EventResampler opens/closes windows based on sample arrival timestamps instead of fixed intervals, ensuring no data loss at boundaries. Changes: - Added `EventResampler` class that inherits from `Resampler` - Windows are emitted when a sample arrives with `timestamp >= window_end` - Maintains window alignment through simple addition of `resampling_period` - Uses sample callback mechanism from `StreamingHelper` for event-driven processing `EventResampler` is optimized for cascaded resampling and should not be used directly with raw, irregular data. Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Add test suite for `EventResampler` covering window initialization, boundary conditions, and alignment behavior. Tests are parametrized to verify correct behavior with and without `align_to` configuration. Changes: - Added tests for `EventResampler` initialization and window end calculation - Added tests for sample processing before, at, and after window boundaries - Added tests for correct behavior when samples cross multiple windows - Added tests verifying window alignment is maintained through simple addition - Added key test demonstrating no data loss at window boundaries Tests use parametrized fixtures to cover both aligned and non-aligned window scenarios, ensuring the event-driven window management works correctly in all cases. Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
Signed-off-by: Malte Schaaf <malte.schaaf@frequenz.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When cascading Timer-based resamplers (e.g., 1s → 10s) with
align_to=UNIX_EPOCH, samples can be lost at window boundaries due to timing synchronization issues.Example:
This occurs because both resamplers use independent timers aligned to the same epoch, but their tick times are not synchronized.
Solution
EventResampleruses event-driven window management instead of fixed timer intervals. Windows are emitted when a sample arrives with a timestamp >= current window_end, not on a timer schedule.Benefits:
Implementation
Core Components
EventResampler class: Event-driven resampler inheriting from
Resamplerresample()with event-driven loop_process_sample()for window state management_calculate_window_end()(simplified for event-driven scenario)Sample callback mechanism: Added to
StreamingHelperregister_sample_callback()methodRefactored
_emit_window(): Extracted fromResampler.resample()Design Decision: asyncio.Queue + Callbacks
After evaluating multiple approaches (polling, direct channels, event signals, task-based multiplexing), I chose
asyncio.Queuewith callbacks because:await queue.get()), not polling.Queue[Sample[Quantity]]ensures type correctness.asyncio.Queueis designed specifically for this use case.Changes Made
StreamingHelper
_sample_callbackattribute to store callback functionregister_sample_callback()method to register async callback_receive_samples()to invoke callback when sample arrivesResampler
_emit_window()method fromresample()resample()to delegate window emission to_emit_window()EventResampler (New)
_calculate_window_end()(no warm-up period needed)Tests
Open Question: Task Cancellation in
remove_timeseries()Observation: When
remove_timeseries()is called, the_receiving_taskcontinues running in the background. It will:Current behavior:
Impact:
Question for reviewers:
Should
remove_timeseries()cancel the receiving task for bothTimerResamplerandEventResampler? If yes, we need to either:remove_timeseries()async and callawait helper.stop()cancel_receiving()method to_StreamingHelperThis should be addressed as a separate issue for both resamplers, not just EventResampler.
Backward Compatibility
ResamplerAPI unchangedStreamingHelperadditions are backward compatible (callback defaults to None)EventResampleris opt-in (users must explicitly use it)Usage
Testing
align_to