
[dbsp] Increase async merger slot0 size to 15,000.#5824

Merged
ryzhyk merged 3 commits into main from increase-level0-slot on Mar 14, 2026
Conversation


@ryzhyk ryzhyk commented Mar 13, 2026

Today pipelines are tuned to generate batches of size 10K via two (configurable)
settings:

  • DEFAULT_MAX_WORKER_BATCH_SIZE=10000 - controls the number of records ingested by each worker
    from an output connector per step.
  • SPLITTER_OUTPUT_CHUNK_SIZE=10000 - controls how operators like joins and aggregates chunk
    their outputs.

Coincidentally, 10,000 is also the upper bound on the size of batches in level 0 for the async merger.
As a result, it is common for most newly added batches to have size >= 10K and land in level 1 instead
of level 0. This is a problem because we use the ingest rate at level 0 as an estimate of the overall
ingest rate of the spine. If ingest happens at levels > 0, our fuel computation is off, potentially
leading to merge backpressure and slowness in some of the spines, while others are over-optimized.
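To illustrate the boundary effect, here is a hypothetical sketch of level assignment (not the actual spine code; the per-level growth factor of 4 is an assumption, as is the `size_to_level` stand-in itself):

```rust
// Hypothetical sketch: assume level 0 holds batches of up to `max_level0`
// records, and each higher level's cap grows by an assumed factor of 4.
fn size_to_level(len: usize, max_level0: usize) -> usize {
    let mut level = 0;
    let mut cap = max_level0;
    while len > cap {
        level += 1;
        cap *= 4;
    }
    level
}

fn main() {
    // With a level-0 cap just under 10K, a typical 10K batch skips level 0...
    assert_eq!(size_to_level(10_000, 9_999), 1);
    // ...while with a 14_999 cap the same batch lands in level 0.
    assert_eq!(size_to_level(10_000, 14_999), 0);
    println!("ok");
}
```

Under these assumptions, the old threshold systematically routes the most common batch size past level 0, which is exactly what skews the ingest-rate estimate.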

This is a temporary fix that increases the level 0 size threshold to 15,000, making sure
that most new batches fit in level 0. It also makes this threshold configurable via dev
tweaks.

This won't work in several cases:

  • Skewed distributions where one of the workers consistently creates large batches.
  • Operators like FlatMap that can create large output batches from small input batches.
  • Large inputs if the user configures a custom max_worker_batch_size per connector.

We'll need a better solution for those cases, one that takes the actual ingest rate across all levels
into account in the fuel computation.

Until we have that, this commit should improve things in most cases.

Manual Test Plan

Will run merge queue tasks and test the runtime on some of our workloads.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

ryzhyk added 3 commits March 13, 2026 11:52
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Add missing region annotations for lag/lead and topk operators.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>

gz commented Mar 13, 2026

can we add a static-assert or warning so that when this is misconfigured we catch it again in the future

@blp blp (Member) left a comment

Great find.


blp commented Mar 13, 2026

can we add a static-assert or warning so that when this is misconfigured we catch it again in the future

This PR adds some assertions, are there others that are missing?


gz commented Mar 13, 2026

This PR adds some assertions, are there others that are missing?

It seems there is an implicit relationship between

pub(crate) const MAX_LEVEL0_BATCH_SIZE_RECORDS: u16 = 14_999;

and something else that chunks at 10k for input batches during a transaction. I don't see that we encode anywhere that the latter should be smaller than the former and issue a warning / abort if not. Maybe it would be better to just make

MAX_LEVEL0_BATCH_SIZE_RECORDS = max(14999, transaction-chunk-size)
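One way to encode that relationship, as a sketch: a const assertion fails the build if the constants ever drift apart. This assumes both constants are visible in one crate; the values are the ones quoted in this PR, and the constant names follow the discussion above.

```rust
// Sketch of a compile-time guard tying the two constants together.
// Values and names are the ones discussed in this PR; whether both
// constants are actually visible in one crate is an assumption.
const DEFAULT_MAX_WORKER_BATCH_SIZE: usize = 10_000;
const MAX_LEVEL0_BATCH_SIZE_RECORDS: usize = 14_999;

// `assert!` works in const context (stable since Rust 1.57), so this
// fails compilation if the chunk size ever exceeds the level-0 cap.
const _: () = assert!(
    DEFAULT_MAX_WORKER_BATCH_SIZE <= MAX_LEVEL0_BATCH_SIZE_RECORDS,
    "worker batch size must not exceed the level-0 batch size cap"
);

fn main() {
    println!("constants consistent");
}
```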


blp commented Mar 13, 2026

Good point.

Ultimately (#5825) I think that the value should be self-adjusting.

@mythical-fred mythical-fred (Collaborator) left a comment

LGTM.

 /// Given a batch size figure out which level it should reside in.
-fn size_to_level(len: usize) -> usize {
+fn size_to_level(len: usize, max_level0_batch_size_records: usize) -> usize {

Non-blocking suggestion: a small unit test for size_to_level at the boundary (e.g. assert_eq!(size_to_level(14_999, 14_999), 0), assert_eq!(size_to_level(15_000, 14_999), 1)) would lock in the intent for the next person who adjusts this threshold. Easy to add in a #[cfg(test)] block in this file.
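Such a boundary test might look like the sketch below. The `size_to_level` here is a stand-in with an assumed growth factor of 4, since the real function lives in the spine module; only the boundary assertions themselves come from the suggestion above.

```rust
// Stand-in for the real spine function; the growth factor is assumed.
fn size_to_level(len: usize, max_level0_batch_size_records: usize) -> usize {
    let mut level = 0;
    let mut cap = max_level0_batch_size_records;
    while len > cap {
        level += 1;
        cap *= 4; // assumed per-level growth factor
    }
    level
}

#[cfg(test)]
mod tests {
    use super::size_to_level;

    #[test]
    fn level0_boundary() {
        // A 14_999-record batch is the largest that still fits in level 0.
        assert_eq!(size_to_level(14_999, 14_999), 0);
        // One more record pushes the batch into level 1.
        assert_eq!(size_to_level(15_000, 14_999), 1);
    }
}

fn main() {
    assert_eq!(size_to_level(14_999, 14_999), 0);
    assert_eq!(size_to_level(15_000, 14_999), 1);
    println!("ok");
}
```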


ryzhyk commented Mar 13, 2026

Good point.

Ultimately (#5825) I think that the value should be self-adjusting.

Yes, we need a proper fix, none of these static checks will be enough.

@ryzhyk ryzhyk added this pull request to the merge queue Mar 14, 2026
Merged via the queue into main with commit ffc2f08 Mar 14, 2026
33 checks passed
@ryzhyk ryzhyk deleted the increase-level0-slot branch March 14, 2026 19:15

Labels

DBSP core (Related to the core DBSP library), performance

4 participants