
[dbsp] Memory pressure mechanism#5901

Merged
ryzhyk merged 5 commits into main from issue5883
Mar 24, 2026

Conversation

@ryzhyk
Contributor

@ryzhyk ryzhyk commented Mar 23, 2026

Implements: #5883

Introduce a new configuration setting that limits the amount of memory that the pipeline
is allowed to use.

Setting this property activates memory pressure monitoring and backpressure
mechanisms. The pipeline will track the amount of remaining memory and
report the memory pressure level via the memory_pressure metric as low/moderate/high/critical.

As the memory pressure increases, the system will apply increasing backpressure
to push in-memory batches to storage, preventing the pipeline from running
out of memory at the cost of some performance degradation:

  • Moderate backpressure: mergers write all merged batches to storage regardless of their size.
  • High backpressure: all batches added to spines are instantly written to storage by the
    foreground thread; in addition, any remaining in-memory batches inside spines are forced to
    storage.
  • Critical: additionally, all transient batches created during a step are written to storage.

The new configuration does not affect Bloom filters, input/output queues, and caches; it only
manages the remaining memory not used by these other users.
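For illustration, the four pressure levels and the ordering that the backpressure checks rely on could be sketched as follows. This is a sketch only: the level names and the High/Critical thresholds (10%/5% remaining, mentioned later in this thread) come from the discussion, while the `from_usage` helper, the Moderate cutoff, and all other details are assumptions, not the actual implementation.

```rust
/// Illustrative sketch of the pressure levels (not the actual Feldera code).
/// Derived `Ord` follows declaration order, so `pressure >= MemoryPressure::High`
/// is true for both High and Critical.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum MemoryPressure {
    Low = 0,
    Moderate = 1,
    High = 2,
    Critical = 3,
}

impl MemoryPressure {
    /// Map the fraction of the RSS budget currently in use to a level.
    /// High/Critical cutoffs follow the 10%/5%-remaining figures from the
    /// discussion; the Moderate cutoff is a placeholder.
    fn from_usage(rss_bytes: u64, max_rss_bytes: u64) -> Self {
        let used = rss_bytes as f64 / max_rss_bytes as f64;
        if used >= 0.95 {
            MemoryPressure::Critical
        } else if used >= 0.90 {
            MemoryPressure::High
        } else if used >= 0.75 {
            MemoryPressure::Moderate
        } else {
            MemoryPressure::Low
        }
    }
}
```

The integer discriminants match the `memory_pressure` gauge values (low=0 through critical=3) reported by the metric described below.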

The new option is called max_rss_mb, since it is related to
resources.memory_mb_min/memory_mb_max (in fact, we use memory_mb_max as the default if
max_rss_mb is not specified). This is unfortunate, and we should be using strings
written using human-readable units instead (e.g., "256GiB"). We'll have to make that transition
at some point, but it requires care to deal with various forward/backward compatibility issues.

Describe Manual Test Plan

Tested on some customer pipelines.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

@ryzhyk ryzhyk requested review from blp and gz March 23, 2026 18:58
@ryzhyk ryzhyk added the DBSP core Related to the core DBSP library label Mar 23, 2026
@mihaibudiu
Contributor

mihaibudiu commented Mar 23, 2026

Some names have units at end, some in the middle... "mb"

@ryzhyk
Contributor Author

ryzhyk commented Mar 23, 2026

Some names have units at end, some in the middle... "mb"

They should always be at the end, but I did not try to fix any of the existing settings.

## Bounding pipeline's memory footprint `max_rss_mb`
<a id="max_rss"></a>

Memory not used for input/output buffers, Bloom filters, and caches is available for temporary storage
Contributor

The page would be better written top-down, starting from this part. Basic usage should only need users to adjust max_rss, so let's start by describing that. Anything else would be advanced usage, where people can go into details.

Contributor Author

I've reorganized the docs this way.

suspend_error: Result<(), SuspendError>,
pipeline_complete: bool,
transaction_info: TransactionInfo,
memory_pressure: MemoryPressure,
Contributor

Does the epoch need to be a separate field? Could it be inside MemoryPressure? (Haven't looked at the structure yet.)

Contributor Author

They are only loosely related.

F: MetricsFormatter,
{
metrics.process_metrics(labels);

Contributor

Suggested change

);
metrics.gauge(
"memory_pressure",
"Current memory pressure level in [0..3]: low=0, moderate=1, high=2, critical=3.",
Contributor

I wonder what moderate and high do; seems like a binary decision to me.

Contributor Author

They engage different backpressure mechanisms. Moderate is designed to reduce memory without affecting performance much; high will slow things down, at least until the memory pressure has gone down.

{
warn!(
"RSS memory limit ('max_rss_mb') is not set, but a Kubernetes memory limit \
('resources.memory_mb_max' = {memory_mb_max} MB) is configured. \
Contributor

One wonders if there is even a need for both.

Contributor Author

@ryzhyk ryzhyk Mar 23, 2026

There are several reasons to keep them separate (I'd love to address them somehow):

  • memory_mb_max may be set in values.yaml instead of in the pipeline config. Can we access that setting somehow?
  • The pipeline may get a pod with memory_mb_min, not _max. Ideally we'd have a way to find out how much memory the pipeline actually has. I asked about that on Slack and you guys told me it's probably impossible.
  • We also have rclone, which runs as a separate process and can use a non-trivial amount of memory. max_rss_mb may need to account for that.

Ok(CircuitConfig {
layout: layout
.unwrap_or_else(|| Layout::new_solo(pipeline_config.global.workers as usize)),
max_rss_bytes: max_rss_mb.map(|mb| mb * 1_000_000),
Contributor

Should 1_000_000 be a named constant?

Contributor Author

I don't expect the definition of 1MB to ever change. It's probably overkill.

Contributor

@gz gz Mar 23, 2026

It's more about documenting what you're converting to in the code; e.g., if you write it as mb * ONE_MB_IN_BYTES, it's easy to read/understand.
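The suggestion could look something like the sketch below. The constant name and the helper function are illustrative only (the reviewer's `ONE_MB_IN_BYTES` spelling would work equally well); they are not from the codebase.

```rust
/// Decimal (SI) megabyte, the unit used by `max_rss_mb`.
/// (Illustrative name; not the actual constant in the codebase.)
const MB_IN_BYTES: u64 = 1_000_000;

/// Hypothetical helper showing how the named constant documents the
/// conversion at the call site.
fn max_rss_bytes(max_rss_mb: Option<u64>) -> Option<u64> {
    max_rss_mb.map(|mb| mb * MB_IN_BYTES)
}
```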

&& let Some(memory_mb_max) = &pipeline_config.global.resources.memory_mb_max
{
warn!(
"RSS memory limit ('max_rss_mb') is not set, but a Kubernetes memory limit \
Contributor

Print something if max_rss_mb is greater than the Kubernetes limit?

Contributor

Or if there is a significant difference between the two (>50%)?

Contributor Author

That's a good idea; there's no good reason for that config.

}
}

pub fn pick_insert_destination<B>(batch: &B) -> BatchLocation
Contributor

Should this be implemented in trait Batch?

/// In this case, we will initiate a new merge regardless of the number of loose batches.
fn must_relieve_memory_pressure(&self) -> bool {
if let Some(memory_pressure) = Runtime::memory_pressure() {
memory_pressure >= MemoryPressure::High
Contributor

We use High here but not Critical.

Contributor Author

High means 10% memory left. At this point we will force all batches in spines to disk.
Critical means 5% memory left. At this point we do the same, plus also push all intermediate batches to disk.
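This also answers the review question: the `>=` comparison in the snippet already covers Critical, because Critical orders above High. A minimal sketch of that ordering (assumed types, not the actual code):

```rust
// Sketch only: derived comparison follows declaration order,
// so Critical > High > Moderate > Low.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum MemoryPressure {
    Low,
    Moderate,
    High,
    Critical,
}

// Hypothetical stand-in for `must_relieve_memory_pressure`:
// `>= High` is true for both High and Critical.
fn must_relieve(p: MemoryPressure) -> bool {
    p >= MemoryPressure::High
}
```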

///
/// See [documentation on the pipeline's memory usage](https://docs.feldera.com/operations/memory)
/// for more details.
pub max_rss_mb: Option<u64>,
Contributor

If there is a chance we ever drop this (because on k8s we ultimately probably don't need it?), we may want to move it into dev tweaks?

Contributor Author

Like I said, I'd like to drop this, but today it's the most important knob to control memory pressure, so it shouldn't be in dev tweaks.

Ok(CircuitConfig {
layout: layout
.unwrap_or_else(|| Layout::new_solo(pipeline_config.global.workers as usize)),
max_rss_bytes: max_rss_mb.map(|mb| mb * 1_000_000),


Unit mismatch: memory_mb_max is used elsewhere as MiB (e.g. adhoc.rs:44 multiplies by 1024 * 1024), but here it is multiplied by 1_000_000 (SI MB). These will disagree by ~5% at large values. Pick one and be consistent — MiB is what Kubernetes / Linux think of as "megabytes".

Contributor Author

adhoc.rs is wrong in this case. I don't think it's important the way it's used there, but I'll change it to 1_000_000 there.
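The "~5%" disagreement flagged above can be checked with a quick calculation; the helper below is illustrative, not from the codebase:

```rust
/// Percentage by which a binary (MiB, 1024*1024 bytes) interpretation of a
/// "megabytes" setting exceeds the SI (MB, 1_000_000 bytes) interpretation.
/// The ratio is the same for any input value.
fn mib_vs_mb_discrepancy_pct(megabytes: u64) -> f64 {
    let mib_bytes = megabytes * 1024 * 1024; // binary interpretation (adhoc.rs)
    let mb_bytes = megabytes * 1_000_000;    // SI interpretation (this PR)
    100.0 * (mib_bytes - mb_bytes) as f64 / mb_bytes as f64
}
```

The discrepancy works out to about 4.86%, i.e., a "256 GB" budget interpreted as MiB grants roughly 12 GB more than the SI reading.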

@blp
Member

blp commented Mar 23, 2026

Once both this PR and my PR for samply improvements merge, I'll write and merge a further PR that adds a memory timeline with pressure level markers too.

Comment on lines +1639 to +1657
static MOCK_RSS_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));

fn set_mock_process_rss_bytes(bytes: u64) {
// Safety: tests serialize all updates to process env using `MOCK_RSS_LOCK`.
unsafe {
std::env::set_var("MOCK_PROCESS_RSS_BYTES", bytes.to_string());
}
}

struct MockRssVarGuard;

impl Drop for MockRssVarGuard {
fn drop(&mut self) {
// Safety: tests serialize all updates to process env using `MOCK_RSS_LOCK`.
unsafe {
std::env::remove_var("MOCK_PROCESS_RSS_BYTES");
}
}
}
Member

I think this locking reflects a misunderstanding. Changing the environment is not unsafe because of some flaw or omission or lack of synchronization in the Rust standard library. Indeed, if you look at the implementation of set_var in the Rust standard library, you will see that it uses a reader/writer lock to synchronize access. This lock just duplicates that one (so it should be removed).

The reason that changing the environment is unsafe is that other libraries that might be running in-process and might examine or change the environment will not also take the same lock that the Rust standard library uses. For example, if librdkafka examines or modifies the environment at the same time as this code (or other Rust code), then that could easily cause a segfault. There is no way to prevent that, short of making both libraries use the same synchronization.

Therefore, I would remove this lock. It does not help and it gives false hope. And if there is some non-environment way to implement this, then that's a good idea (maybe this can be #[cfg(test)]?)
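One non-environment alternative along the lines blp suggests would be a process-global atomic that the RSS probe consults before falling back to the OS. This is a sketch under assumptions: the names, the zero-means-unset convention, and the real `process_rss_bytes` measurement are all hypothetical.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Test-only RSS override; 0 means "no mock installed".
/// (Illustrative sketch; in a real crate this would likely be #[cfg(test)].)
static MOCK_RSS_BYTES: AtomicU64 = AtomicU64::new(0);

fn set_mock_process_rss_bytes(bytes: u64) {
    MOCK_RSS_BYTES.store(bytes, Ordering::Relaxed);
}

fn clear_mock_process_rss_bytes() {
    MOCK_RSS_BYTES.store(0, Ordering::Relaxed);
}

fn process_rss_bytes() -> u64 {
    let mock = MOCK_RSS_BYTES.load(Ordering::Relaxed);
    if mock != 0 {
        return mock;
    }
    // In production code this branch would query the real OS measurement
    // (e.g. /proc/self/statm on Linux); stubbed out here.
    0
}
```

Unlike the environment-variable approach, the atomic involves no `unsafe` and no interaction with non-Rust libraries, though tests that set different mock values would still need to be serialized against each other.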

Member

Maybe it's just the safety comment that's a misunderstanding. It looks like we really use the lock to prevent two tests from setting the mock RSS at once, although currently only one does that.

I would change the safety comment to say something like
// Safety: We assume that only Rust code accesses the environment.

Contributor Author

Yes, this is meant to serialize multiple tests using this mechanism. I will improve the comments.

@blp
Member

blp commented Mar 23, 2026

We could also shrink the buffer cache if we're getting a lot of memory pressure.

ryzhyk added 4 commits March 23, 2026 16:11
Introduce a new configuration setting that limits the amount of memory that the pipeline
is allowed to use.

Setting this property activates memory pressure monitoring and backpressure
mechanisms. The pipeline will track the amount of remaining memory and
report the memory pressure level via the `memory_pressure` metric as low/moderate/high/critical.

As the memory pressure increases, the system will apply increasing backpressure
to push in-memory batches to storage, preventing the pipeline from running
out of memory at the cost of some performance degradation:
- Moderate backpressure: mergers write all merged batches to storage regardless of their size.
- High backpressure: all batches added to spines are instantly written to storage by the
  foreground thread; in addition, any remaining in-memory batches inside spines are forced to
  storage.
- Critical: additionally, all transient batches created during a step are written to storage.

The new configuration does not affect Bloom filters, input/output queues, and caches; it only
manages the remaining memory not used by these other users.

The new option is called `max_rss_mb`, since it is related to
`resources.memory_mb_min`/`memory_mb_max` (in fact, we use `memory_mb_max` as the default if
`max_rss_mb` is not specified). This is unfortunate, and we should be using strings
written using human-readable units instead (e.g., "256GiB"). We'll have to make that transition
at some point, but it requires care to deal with various forward/backward compatibility issues.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
- Document the new `max_rss_mb` feature.
- Remove redundant memory troubleshooting docs in guide.md.
  Point to memory.md instead.
- Add a summary table to memory.md.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Move the section on max_rss_mb to the top, since this is the higher
order bit most users will care about.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
adhoc.rs interpreted memory_mb_max as 2^20 instead of 1M.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@ryzhyk ryzhyk added this pull request to the merge queue Mar 24, 2026
Merged via the queue into main with commit 5ec6c29 Mar 24, 2026
1 check passed
@ryzhyk ryzhyk deleted the issue5883 branch March 24, 2026 18:03

Labels

DBSP core Related to the core DBSP library


7 participants