[dbsp] Decouple foreground and background workers.#5840
Conversation
Make the benchmark more realistic by running it with the storage backend enabled. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
What happens to profile information for background threads? How is that associated with workers? |
We don't have profiles for background threads. Merger-related metrics show up as part of Z-1 and Accumulator operators (which contain spines inside). These metrics don't change in any way with this PR. |
How about the cache measurements? These are assigned to foreground and background. |
Does this fix #2117? |
That still holds as well |
I think it does, although it doesn't yet eliminate all the special cases in the code. |
mihaibudiu
left a comment
I am approving, but this is not a definitive review.
thread_local! {
    // Reference to the `Runtime` that manages this worker thread or `None`
    // if the current thread is not running in a multithreaded runtime.
    /// Reference to the `Runtime` that manages this worker thread or `None`
is this still possible?
Yes; there are, for example, connector threads that run outside a runtime.
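The pattern under discussion can be sketched as a thread-local slot that is `Some` only on threads a runtime manages. This is a minimal stand-alone sketch, not the real dbsp code: `Runtime` here is a hypothetical stand-in type, and `current_runtime` is an illustrative accessor name.

```rust
use std::cell::RefCell;

// Hypothetical stand-in for dbsp's `Runtime` handle.
#[derive(Clone)]
struct Runtime;

thread_local! {
    /// `Some` inside a thread managed by a `Runtime`; `None` on threads
    /// started outside any runtime (e.g. connector threads).
    static RUNTIME: RefCell<Option<Runtime>> = RefCell::new(None);
}

fn current_runtime() -> Option<Runtime> {
    RUNTIME.with(|rt| rt.borrow().clone())
}

fn main() {
    // A plain thread sees no runtime until one registers itself.
    assert!(current_runtime().is_none());
    RUNTIME.with(|rt| *rt.borrow_mut() = Some(Runtime));
    assert!(current_runtime().is_some());
}
```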
gz
left a comment
I expected it to be more complicated; I guess the fact that it isn't is a positive sign for our code.
/// The number of merger threads.
///
/// The default is twice the number of worker threads.
Changing the default has the potential to break things. We should ultimately increase it, but maybe we should start with the same default and roll it out gradually?
I agree it's a breaking change. What would a gradual rollout look like though?
Increase it for a few users where we know it will matter, then change the default once that looks fine? Maybe too cautious, but if it saves us some hours of trouble, it may be worth it.
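The rollout plan above amounts to a backward-compatible default that can be overridden per deployment. A minimal sketch, with a hypothetical `merger_threads` helper (the real config knob lives elsewhere in dbsp):

```rust
/// Hypothetical helper: pick the number of merger (background) threads.
/// `None` keeps the backward-compatible default of one merger thread
/// per worker; the plan is to flip the default to a 2x ratio later.
fn merger_threads(configured: Option<usize>, workers: usize) -> usize {
    configured.unwrap_or(workers) // future default: workers * 2
}

fn main() {
    assert_eq!(merger_threads(None, 4), 4); // compatible default
    assert_eq!(merger_threads(Some(8), 4), 8); // opted-in 2x ratio
}
```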
set_current_thread_type(ThreadType::Background);
RUNTIME.with(|rt| *rt.borrow_mut() = Some(runtime_clone.clone()));
})
.thread_stack_size(6 * 1024 * 1024)
Is this a constant we use somewhere else? Should it be `const ...`?
I copied this from our other tokio runtimes. I'm not sure how we chose this size; I think there were some cases where the default was too small.
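Naming the magic number as suggested might look like the following. This is a sketch, not the project's code: `THREAD_STACK_SIZE` is a hypothetical constant name, and `std::thread::Builder::stack_size` stands in for tokio's analogous `thread_stack_size` builder method so the example has no external dependencies.

```rust
/// Hypothetical shared constant for spawned-thread stack size.
/// 6 MiB mirrors the value copied between the project's tokio
/// runtime builders; the platform default was reportedly too
/// small in some cases.
const THREAD_STACK_SIZE: usize = 6 * 1024 * 1024;

fn main() {
    // std's thread builder takes the same kind of byte count that
    // tokio's `thread_stack_size` does.
    let handle = std::thread::Builder::new()
        .stack_size(THREAD_STACK_SIZE)
        .spawn(|| ())
        .unwrap();
    handle.join().unwrap();
}
```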
crates/dbsp/src/circuit/runtime.rs
Outdated
if let Some(rt) = Runtime::runtime() {
    match current_thread_type() {
        None => {
            let buffer_cache = NO_RUNTIME_CACHE.clone();
This thing is a bit of a ticking time bomb; I did the same for slabs. We should just get rid of this no-runtime code ideally. Let's file an issue.
With this PR, we no longer have threads that use spines outside the runtime. But this cache is also used to buffer updates by output connector threads, which run inside the runtime but are neither fg nor bg threads. It's not clear how to pick the cache size or the number of caches for this purpose.
how are the output connector threads using the buffer cache :O? can we add a comment about it somewhere next to NO_RUNTIME_CACHE
how are the output connector threads using the buffer cache :O?
some of them maintain output buffers, which are implemented as spines.
can we add a comment about it somewhere next to NO_RUNTIME_CACHE
ok!
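The comment being requested next to `NO_RUNTIME_CACHE` could look roughly like this. Everything here is a sketch under stated assumptions: `BufferCache` is a stand-in type, and `OnceLock` replaces whatever lazy-init mechanism the real code uses.

```rust
use std::sync::{Arc, Mutex, OnceLock};

// Hypothetical stand-in for the real buffer cache type.
struct BufferCache;

/// Fallback cache for threads with no fg/bg worker identity.
///
/// Despite the name, this is not only for code running outside a
/// `Runtime`: output connector threads run *inside* the runtime but
/// are neither foreground nor background workers, and they buffer
/// updates in spines backed by this cache.
static NO_RUNTIME_CACHE: OnceLock<Arc<Mutex<BufferCache>>> = OnceLock::new();

fn no_runtime_cache() -> Arc<Mutex<BufferCache>> {
    NO_RUNTIME_CACHE
        .get_or_init(|| Arc::new(Mutex::new(BufferCache)))
        .clone()
}

fn main() {
    // Every caller gets a handle to the same shared cache.
    assert!(Arc::ptr_eq(&no_runtime_cache(), &no_runtime_cache()));
}
```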
d580f68 to 4f46b3c
blp
left a comment
I thought switching to Tokio for this would be awful but it's not too bad.
Using Tokio will make it easier if we decide we want to do async I/O later.
StorageOptions {
    min_storage_bytes: None,
    min_step_storage_bytes: None,
    ..StorageOptions::default()
},
I think these are all the defaults, so I'd just write StorageOptions::default().
min_storage_bytes: None
I don't think this one is the default.
Really? The default for StorageOptions comes from #[derive(Default)] and we also use #[serde(default)] on the struct, so regardless of where the StorageOptions is getting defaulted, any Option field is going to be None.
(But it's not important.)
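The point about `#[derive(Default)]` can be checked with a tiny stand-alone model. `StorageOptions` here is a simplified stand-in with only the two fields under discussion, not the real struct: a derived `Default` makes every `Option` field `None`, so spelling them out is redundant.

```rust
// Simplified stand-in for the real StorageOptions struct.
#[derive(Default, Debug, PartialEq)]
struct StorageOptions {
    min_storage_bytes: Option<u64>,
    min_step_storage_bytes: Option<u64>,
}

fn main() {
    // Explicitly writing the None fields produces the same value
    // as the derived default.
    let explicit = StorageOptions {
        min_storage_bytes: None,
        min_step_storage_bytes: None,
        ..StorageOptions::default()
    };
    assert_eq!(explicit, StorageOptions::default());
}
```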
// Map CPU IDs to core IDs for foreground and background workers.
//
// Returns a pair of vectors of core IDs, one for foreground workers and one for background workers.
Suggested change:
/// Map CPU IDs to core IDs for foreground and background workers.
///
/// Returns a pair of vectors of core IDs, one for foreground workers and one for background workers.
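The doc-commented helper's shape can be sketched as follows. The name `split_cores` and the take/skip partitioning policy are illustrative assumptions, not the real implementation.

```rust
/// Sketch: split a list of available core IDs into one set for
/// foreground workers and one for background (merger) workers.
fn split_cores(cores: &[usize], workers: usize) -> (Vec<usize>, Vec<usize>) {
    // First `workers` cores go to foreground workers, the rest to
    // background mergers.
    let fg = cores.iter().take(workers).copied().collect();
    let bg = cores.iter().skip(workers).copied().collect();
    (fg, bg)
}

fn main() {
    let (fg, bg) = split_cores(&[0, 1, 2, 3, 4, 5], 2);
    assert_eq!(fg, vec![0, 1]);
    assert_eq!(bg, vec![2, 3, 4, 5]);
}
```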
To date, we assigned a single background merger thread to each foreground worker. This may not be sufficient for some workloads, or for some phases of a workload, leading to merge backpressure and slow key lookups due to a large number of unmerged batches.

This commit decouples foreground and background worker counts, making it possible to spread merging across any number of background worker threads (we set the bg/fg worker ratio to 2 by default).

Design:
- We associate a separate tokio runtime dedicated to merging with a DBSP runtime.
- For every spine created by a foreground thread, we start a separate tokio task per spine level. The task runs in an infinite loop, making progress with merges at its level. As before, we use fuel to control the amount of CPU time spent at each level. The task yields the CPU after using up its fuel (at this point tokio should put it at the end of the scheduling queue and schedule other tasks). The task blocks when there is no outstanding merge at its level.
- As a side effect of the new design, we can no longer run useful circuits without a DBSP runtime. All tests and tutorials that used `RootCircuit::build` have been upgraded to use `Runtime::init_circuit(1, ...)`.
- For backward compatibility, we create a tokio runtime with the number of threads equal to the number of worker threads by default. We will increase the ratio to two after initial user testing. The number of workers is configurable via a new dev tweak.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
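The fuel-limited merger loop described in the design can be modeled with a synchronous skeleton. This is illustrative only: the real code runs one tokio task per spine level and yields to the scheduler (e.g. via `tokio::task::yield_now().await`) where this sketch merely counts slices; merges are modeled as plain work counters.

```rust
/// Run all pending merges at one spine level, `fuel_per_slice` work
/// units at a time. Returns the number of fuel slices consumed,
/// i.e. how often the real task would yield to the scheduler.
fn run_level(mut pending: Vec<u64>, fuel_per_slice: u64) -> u64 {
    let mut slices = 0;
    while !pending.is_empty() {
        // Spend up to one slice of fuel on the front merge.
        let step = fuel_per_slice.min(pending[0]);
        pending[0] -= step;
        if pending[0] == 0 {
            pending.remove(0); // merge complete
        }
        slices += 1; // the real tokio task yields the CPU here
    }
    slices
}

fn main() {
    // Two merges of 10 units each, 4 fuel per slice:
    // each merge takes slices of 4 + 4 + 2, so 6 slices total.
    assert_eq!(run_level(vec![10, 10], 4), 6);
    // No pending merges: the real task would block instead of spin.
    assert_eq!(run_level(vec![], 4), 0);
}
```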
Describe Manual Test Plan
Observed improved performance on ingest-heavy customer workloads.
Checklist
Added a test for background merge panic detection. Other existing tests should cover the new functionality.
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes