Conversation
Some names have units at the end, some in the middle... "mb"

They should always be at the end, but I did not try to fix any of the existing settings.
| ## Bounding pipeline's memory footprint `max_rss_mb`
| <a id="max_rss"></a>
| Memory not used for input/output buffers, Bloom filters, and caches is available for temporary storage
The page would be better written top-down, starting from this part. Basic usage should only need users to adjust max_rss, so let's start by describing that. Anything else would be advanced usage, where people can go into details.
I've reorganized the docs this way.
| suspend_error: Result<(), SuspendError>,
| pipeline_complete: bool,
| transaction_info: TransactionInfo,
| memory_pressure: MemoryPressure,
Does the epoch need to be a separate field? Could it be inside of `MemoryPressure`? (I haven't looked at the structure yet.)
They are only loosely related.
| F: MetricsFormatter,
| {
|     metrics.process_metrics(labels);
|     );
|     metrics.gauge(
|         "memory_pressure",
|         "Current memory pressure level in [0..3]: low=0, moderate=1, high=2, critical=3.",
I wonder what moderate and high do; seems like a binary decision to me.

They engage different backpressure mechanisms. Moderate is designed to reduce memory without affecting performance much. High will slow things down, at least until the memory pressure has gone down.
| {
|     warn!(
|         "RSS memory limit ('max_rss_mb') is not set, but a Kubernetes memory limit \
|         ('resources.memory_mb_max' = {memory_mb_max} MB) is configured. \
One wonders if there is even need for both.

There are several reasons to keep them separate (I'd love to address them somehow):

- `memory_mb_max` may be set in values.yaml instead of in the pipeline config. Can we access that setting somehow?
- The pipeline may get a pod with `memory_mb_min`, not `_max`. Ideally we'd have a way to find out how much memory the pipeline actually has. I asked about that on slack and you guys told me it's probably impossible.
- We also have rclone, which runs as a separate process and can use a non-trivial amount of memory. `max_rss_mb` may need to account for that.
| Ok(CircuitConfig {
|     layout: layout
|         .unwrap_or_else(|| Layout::new_solo(pipeline_config.global.workers as usize)),
|     max_rss_bytes: max_rss_mb.map(|mb| mb * 1_000_000),
Should `1_000_000` be a named constant?

I don't expect the definition of 1 MB to ever change. It's probably overkill.

It's more about documenting what you're converting to in the code, e.g. if you write it as `mb * ONE_MB_IN_BYTES`, it's easy to read/understand.
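As a concrete illustration of the suggestion above, a named constant makes the conversion self-documenting. This is a sketch; `BYTES_PER_MB` and the standalone `max_rss_bytes` helper are illustrative names, not the codebase's:

```rust
// Illustrative sketch: name the conversion factor instead of writing a bare
// `1_000_000` at the use site. `BYTES_PER_MB` is an assumed name.
const BYTES_PER_MB: u64 = 1_000_000;

/// Convert the configured `max_rss_mb` value (if any) to bytes.
fn max_rss_bytes(max_rss_mb: Option<u64>) -> Option<u64> {
    // `mb * BYTES_PER_MB` reads unambiguously, unlike a bare literal.
    max_rss_mb.map(|mb| mb * BYTES_PER_MB)
}

fn main() {
    assert_eq!(max_rss_bytes(Some(8000)), Some(8_000_000_000));
    assert_eq!(max_rss_bytes(None), None);
}
```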
| && let Some(memory_mb_max) = &pipeline_config.global.resources.memory_mb_max
| {
|     warn!(
|         "RSS memory limit ('max_rss_mb') is not set, but a Kubernetes memory limit \
Print something if `max_rss_mb` is greater than the Kubernetes limit?

Or if there is a significant difference between the two (>50%)?

That's a good idea; there's no good reason for such a config.
| }
| }
| pub fn pick_insert_destination<B>(batch: &B) -> BatchLocation
Should this be implemented in trait `Batch`?
| /// In this case, we will initiate a new merge regardless of the number of loose batches.
| fn must_relieve_memory_pressure(&self) -> bool {
|     if let Some(memory_pressure) = Runtime::memory_pressure() {
|         memory_pressure >= MemoryPressure::High
We use `High` here but not `Critical`.

High means 10% memory left. At this point we will force all batches in spines to disk. Critical means 5% memory left. At this point we do the same, plus also push all intermediate batches to disk.
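The thresholds quoted in this thread could be sketched roughly as follows. Only the 10% (High) and 5% (Critical) figures come from the discussion; the 20% Moderate cutoff and all names here are assumptions for illustration:

```rust
// Sketch of mapping remaining memory to a pressure level, under the
// thresholds quoted above (High = 10% left, Critical = 5% left).
// The Moderate cutoff (20%) is an assumed value, not from the codebase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum MemoryPressure {
    Low,
    Moderate,
    High,
    Critical,
}

fn pressure_level(rss_bytes: u64, max_rss_bytes: u64) -> MemoryPressure {
    let remaining = max_rss_bytes.saturating_sub(rss_bytes);
    // Fraction of the memory budget still available.
    let frac = remaining as f64 / max_rss_bytes as f64;
    if frac <= 0.05 {
        MemoryPressure::Critical
    } else if frac <= 0.10 {
        MemoryPressure::High
    } else if frac <= 0.20 {
        MemoryPressure::Moderate
    } else {
        MemoryPressure::Low
    }
}

fn main() {
    assert_eq!(pressure_level(500, 1000), MemoryPressure::Low);
    assert_eq!(pressure_level(920, 1000), MemoryPressure::High);
    assert_eq!(pressure_level(960, 1000), MemoryPressure::Critical);
}
```

Because the enum derives `Ord`, checks like `memory_pressure >= MemoryPressure::High` in the quoted diff work naturally.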
| ///
| /// See [documentation on the pipeline's memory usage](https://docs.feldera.com/operations/memory)
| /// for more details.
| pub max_rss_mb: Option<u64>,
If there is a chance we ever drop this (because on k8s we ultimately probably don't need it?), we may want to move it to dev tweaks?

Like I said, I'd like to drop this, but today it's the most important knob to control memory pressure, so it shouldn't be in dev tweaks.
| Ok(CircuitConfig {
|     layout: layout
|         .unwrap_or_else(|| Layout::new_solo(pipeline_config.global.workers as usize)),
|     max_rss_bytes: max_rss_mb.map(|mb| mb * 1_000_000),
Unit mismatch: memory_mb_max is used elsewhere as MiB (e.g. adhoc.rs:44 multiplies by 1024 * 1024), but here it is multiplied by 1_000_000 (SI MB). These will disagree by ~5% at large values. Pick one and be consistent — MiB is what Kubernetes / Linux think of as "megabytes".
adhoc.rs is wrong in this case. I don't think it's important the way it's used there, but I'll change it to 1_000_000 there.
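A quick check of the discrepancy discussed above: for the same configured value, SI megabytes (`* 1_000_000`) and binary mebibytes (`* 1024 * 1024`) disagree by roughly 4.9%, which grows in absolute terms at large limits:

```rust
// Demonstrate the MB-vs-MiB mismatch for a hypothetical 8000 MB limit.
fn main() {
    let mb = 8000u64;
    let si = mb * 1_000_000; // SI megabytes: 8_000_000_000 bytes
    let binary = mb * 1024 * 1024; // mebibytes: 8_388_608_000 bytes
    // Relative difference between the two interpretations.
    let diff_pct = (binary - si) as f64 / si as f64 * 100.0;
    assert!(diff_pct > 4.8 && diff_pct < 4.9);
    println!("difference: {diff_pct:.2}%"); // ~4.86%
}
```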
Once both this PR and my PR for samply improvements merge, I'll write and merge a further PR that adds a memory timeline with pressure level markers too.
| static MOCK_RSS_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
|
| fn set_mock_process_rss_bytes(bytes: u64) {
|     // Safety: tests serialize all updates to process env using `MOCK_RSS_LOCK`.
|     unsafe {
|         std::env::set_var("MOCK_PROCESS_RSS_BYTES", bytes.to_string());
|     }
| }
|
| struct MockRssVarGuard;
|
| impl Drop for MockRssVarGuard {
|     fn drop(&mut self) {
|         // Safety: tests serialize all updates to process env using `MOCK_RSS_LOCK`.
|         unsafe {
|             std::env::remove_var("MOCK_PROCESS_RSS_BYTES");
|         }
|     }
| }
I think this locking reflects a misunderstanding. Changing the environment is not unsafe because of some flaw or omission or lack of synchronization in the Rust standard library. Indeed, if you look at the implementation of set_var in the Rust standard library, you will see that it uses a reader/writer lock to synchronize access. This lock just duplicates that one (so it should be removed).
The reason that changing the environment is unsafe is that other libraries that might be running in-process and might examine or change the environment will not also take the same lock that the Rust standard library uses. For example, if librdkafka examines or modifies the environment at the same time as this code (or other Rust code), then that could easily cause a segfault. There is no way to prevent that, short of making both libraries use the same synchronization.
Therefore, I would remove this lock. It does not help and it gives false hope. And if there is some non-environment way to implement this, then that's a good idea (maybe this can be #[cfg(test)]?)
Maybe it's just the safety comment that's a misunderstanding. It looks like we really use the lock to prevent two tests from setting the mock RSS at once, although currently only one does that.
I would change the safety comment to say something like
// Safety: We assume that only Rust code accesses the environment.
yes, this is meant to serialize multiple tests using this mechanism. I will improve the comments.
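One possible non-environment implementation, along the lines suggested above: route the RSS reading through a test-only atomic instead of an environment variable, which needs no `unsafe` and no extra lock. All names here are hypothetical, not the codebase's:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical alternative to the env-var mock: tests store a fake RSS value
// in an atomic. 0 means "no mock installed". In the real codebase this could
// live behind `#[cfg(test)]`.
static MOCK_RSS_BYTES: AtomicU64 = AtomicU64::new(0);

fn set_mock_process_rss_bytes(bytes: u64) {
    MOCK_RSS_BYTES.store(bytes, Ordering::Relaxed);
}

fn process_rss_bytes() -> u64 {
    match MOCK_RSS_BYTES.load(Ordering::Relaxed) {
        // Stand-in for the real measurement (e.g. reading /proc/self/status).
        0 => 12_345,
        mocked => mocked,
    }
}

fn main() {
    assert_eq!(process_rss_bytes(), 12_345); // no mock installed yet
    set_mock_process_rss_bytes(1_000_000);
    assert_eq!(process_rss_bytes(), 1_000_000); // mocked value wins
}
```

This sidesteps the env-access safety question entirely, since no other library observes the atomic.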
We could also shrink the buffer cache if we're getting a lot of memory pressure.
Introduce a new configuration setting that limits the amount of memory that the pipeline is allowed to use.

Setting this property activates memory pressure monitoring and backpressure mechanisms. The pipeline will track the amount of remaining memory and report the memory pressure level via the `memory_pressure` metric as low/moderate/high/critical. As the memory pressure increases, the system will apply increasing backpressure to push in-memory batches to storage, preventing the pipeline from running out of memory at the cost of some performance degradation:

- Moderate: mergers write all merged batches to storage regardless of their size.
- High: all batches added to spines are instantly written to storage by the foreground thread; in addition, any remaining in-memory batches inside spines are forced to storage.
- Critical: additionally, all transient batches created during a step are written to storage.

The new configuration does not affect Bloom filters, input/output queues, and caches; it only manages the remaining memory not used by these other users.

The new option is called `max_rss_mb`, since it is related to `resources.memory_mb_min`/`memory_mb_max` (in fact, we use `memory_mb_max` as the default if `max_rss_mb` is not specified). This is unfortunate, and we should be using strings written with human-readable units instead (e.g., "256GiB"). We'll have to make that transition at some point, but it requires care to deal with various forward/backward compatibility issues.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
- Document the new `max_rss_mb` feature.
- Remove redundant memory troubleshooting docs in guide.md. Point to memory.md instead.
- Add a summary table to memory.md.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Move the section on `max_rss_mb` to the top, since this is the higher-order bit most users will care about.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>

adhoc.rs interpreted `memory_mb_max` as 2^20 instead of 1M.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>

Signed-off-by: feldera-bot <feldera-bot@feldera.com>
Implements: #5883
Manual Test Plan

Tested on some customer pipelines.