[dbsp] Increase async merger slot0 size to 15,000. #5824
Conversation
Today pipelines are tuned to generate batches of size 10K via two (configurable) settings:

- DEFAULT_MAX_WORKER_BATCH_SIZE=10000 - controls the number of records ingested by each worker from an input connector per step.
- SPLITTER_OUTPUT_CHUNK_SIZE=10000 - controls how operators like joins and aggregates chunk their outputs.

Coincidentally, 10,000 is also the upper bound on the size of batches in level 0 for the async merger. This is an unfortunate coincidence, which makes it common for most newly added batches to have size >=10K and land in level 1 instead of level 0. This is a problem because we use the ingest rate at level 0 as an estimate of the overall ingest rate of the spine. If ingest happens at levels >0, our fuel computation is off, potentially leading to merge backpressure and slowness in some of the spines, while others are overoptimized.

This is a temporary fix that increases the level 0 size threshold to 15,000, making sure that most new batches fit in level 0. It also makes this threshold configurable via dev tweaks.

This won't work in several cases:

- Skewed distributions, where one of the workers will consistently create large batches.
- Operators like FlatMap that can create large output batches from small input batches.
- Large inputs, if the user configures a custom max_worker_batch_size per connector.

We'll need a better solution for those cases, taking the actual ingest rate across all levels into account in the fuel computation. Until we have that, this commit should improve things in most cases.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
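The description notes that a proper fix needs the actual ingest rate across all levels, not just level 0. One hedged sketch of such an estimate (hypothetical names and weighting, not the actual dbsp code) might weight each record by the merge work implied by the level it lands at:

```rust
// Hypothetical sketch, NOT the dbsp implementation: a record that lands
// directly at level L skipped L merge steps on the way in, so we weight
// it by (L + 1) to approximate the merge work it generates.
fn estimate_ingest_rate(records_ingested_per_level: &[u64]) -> f64 {
    records_ingested_per_level
        .iter()
        .enumerate()
        .map(|(level, &records)| records as f64 * (level + 1) as f64)
        .sum()
}

fn main() {
    // 100 records landed at level 0, 50 at level 1.
    let rate = estimate_ingest_rate(&[100, 50]);
    println!("weighted ingest rate: {rate}"); // 100*1 + 50*2 = 200
}
```

With a level-0-only estimate, the 50 records at level 1 would simply be invisible to fuel accounting; the weighted sum makes them count.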
Add missing region annotations for lag/lead and topk operators. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Can we add a static assert or a warning, so that when this is misconfigured we catch it again in the future?
This PR adds some assertions; are there others that are missing?
It seems there is an implicit relationship between pub(crate) const MAX_LEVEL0_BATCH_SIZE_RECORDS: u16 = 14_999; and something else that chunks at 10k for input batches during a transaction. I don't see that we encode anywhere that the latter should be smaller than the former and issue a warning / abort if not. Maybe better would be to just make MAX_LEVEL0_BATCH_SIZE_RECORDS = max(14999, transaction-chunk-size).
Good point. Ultimately (#5825) I think that the value should be self-adjusting.
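The static check requested above could be expressed as a Rust compile-time assertion tying the two constants together. A minimal sketch, with assumed stand-in values (the real constants and their types live elsewhere in the dbsp crate):

```rust
// Assumed stand-ins for illustration; the actual constants, values,
// and visibility in the dbsp crate may differ.
const TRANSACTION_CHUNK_SIZE: usize = 10_000;
const MAX_LEVEL0_BATCH_SIZE_RECORDS: usize = 14_999;

// Compile-time check: the build fails if the chunk size ever grows past
// the level 0 threshold, so the mismatch cannot reappear silently.
const _: () = assert!(
    TRANSACTION_CHUNK_SIZE <= MAX_LEVEL0_BATCH_SIZE_RECORDS,
    "input chunks must fit in level 0 of the async merger"
);

fn main() {
    println!("constants are consistent");
}
```

Because the check runs at compile time, a misconfiguration is caught before any pipeline runs, unlike a runtime warning.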
```diff
  /// Given a batch size figure out which level it should reside in.
- fn size_to_level(len: usize) -> usize {
+ fn size_to_level(len: usize, max_level0_batch_size_records: usize) -> usize {
```
Non-blocking suggestion: a small unit test for size_to_level at the boundary (e.g. assert_eq!(size_to_level(14_999, 14_999), 0), assert_eq!(size_to_level(15_000, 14_999), 1)) would lock in the intent for the next person who adjusts this threshold. Easy to add in a #[cfg(test)] block in this file.
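The suggested boundary test could look like the following. The size_to_level body here is a guessed doubling-capacity sketch (the real dbsp implementation may compute levels differently), but the assertions pin down the intended threshold semantics:

```rust
/// Guessed sketch: level 0 holds batches up to the configured threshold,
/// and each deeper level doubles the allowed batch size. The real dbsp
/// implementation may differ; only the boundary behavior below is the point.
fn size_to_level(len: usize, max_level0_batch_size_records: usize) -> usize {
    let mut level = 0;
    let mut capacity = max_level0_batch_size_records;
    while len > capacity {
        level += 1;
        capacity = capacity.saturating_mul(2);
    }
    level
}

fn main() {
    // A batch exactly at the threshold stays in level 0 ...
    assert_eq!(size_to_level(14_999, 14_999), 0);
    // ... while one record past it spills into level 1.
    assert_eq!(size_to_level(15_000, 14_999), 1);
    println!("boundary checks passed");
}
```

In the codebase this would sit in a #[cfg(test)] module next to size_to_level rather than in main, as the review comment suggests.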
Yes, we need a proper fix; none of these static checks will be enough.
Describe Manual Test Plan
Will run merge queue tasks and test the runtime on some of our workloads.
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components: