
[dbsp] Decouple foreground and background workers.#5840

Merged
ryzhyk merged 2 commits into main from async_merger
Mar 17, 2026

Conversation

@ryzhyk
Contributor

@ryzhyk ryzhyk commented Mar 17, 2026

To date, we assigned a single background merger thread to each
foreground worker. This may not be sufficient for some workloads,
or for some phases of a workload, leading to merge backpressure and
slow key lookups due to a large number of unmerged batches.

This commit decouples foreground and background worker counts,
making it possible to spread merging across any number of background
worker threads (we set the bg/fg worker ratio to 2 by default).

Design:

  • We associate a separate tokio runtime dedicated to merging with
    a DBSP runtime.
  • For every spine created by a foreground thread, we start a
    separate tokio task per spine level. The task runs in an infinite
    loop, making progress on merges at its level. As before, we use
    fuel to control the amount of CPU time spent at each level.
    The task yields the CPU after using up its fuel (at this point tokio
    should put it at the end of the scheduling queue and schedule other
    tasks). The task blocks when there is no outstanding merge at
    its level.
  • As a side effect of the new design, we can no longer run useful
    circuits without a DBSP runtime. All tests and tutorials that used
    RootCircuit::build have been upgraded to use
    Runtime::init_circuit(1,...).
  • For backward compatibility, by default we create a tokio runtime with
    the number of threads equal to the number of worker threads.
    We will increase the ratio to two after initial user testing.
    The number of merger threads is configurable via a new dev tweak.
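The fuel-based loop described above can be sketched as a minimal synchronous model (illustrative only: `Merge`, `run_level`, and `FUEL_PER_QUANTUM` are made-up names, not the DBSP API; in the real implementation the loop is a tokio task and yielding is `tokio::task::yield_now().await`):

```rust
/// Fuel budget a level may spend before yielding (illustrative value).
const FUEL_PER_QUANTUM: isize = 4;

/// A pending merge at one spine level, with `remaining` units of work.
struct Merge {
    remaining: isize,
}

impl Merge {
    /// Advance the merge by at most `fuel` units; return the unspent fuel.
    fn work(&mut self, fuel: isize) -> isize {
        let spent = fuel.min(self.remaining);
        self.remaining -= spent;
        fuel - spent
    }

    fn done(&self) -> bool {
        self.remaining == 0
    }
}

/// Drive one merge to completion, "yielding" whenever a quantum's fuel
/// runs out before the merge is done. Returns the number of yields; in
/// the tokio version each yield sends the task to the back of the
/// scheduler's queue so tasks for other levels can run.
fn run_level(merge: &mut Merge) -> usize {
    let mut yields = 0;
    while !merge.done() {
        let leftover = merge.work(FUEL_PER_QUANTUM);
        if leftover == 0 && !merge.done() {
            yields += 1; // tokio::task::yield_now().await in the real code
        }
    }
    yields
}

fn main() {
    // A 10-unit merge with 4 fuel per quantum yields twice before finishing.
    assert_eq!(run_level(&mut Merge { remaining: 10 }), 2);
}
```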

Describe Manual Test Plan

Observed improved performance on ingest-heavy customer workloads.

Checklist

  • Unit tests added/updated

Added a test for background merge panic detection. Other existing tests should cover the new functionality.

  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

Make the benchmark more realistic by running it with the storage backend
enabled.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk requested review from blp and gz March 17, 2026 00:42
@ryzhyk ryzhyk added DBSP core Related to the core DBSP library performance labels Mar 17, 2026
@mihaibudiu
Contributor

What happens to profile information for background threads? How is that associated with workers?

@ryzhyk
Contributor Author

ryzhyk commented Mar 17, 2026

What happens to profile information for background threads? How is that associated with workers?

We don't have profiles for background threads. Merger-related metrics show up as part of Z-1 and Accumulator operators (which contain spines inside). These metrics don't change in any way with this PR.

@mihaibudiu
Contributor

How about the cache measurements? These are assigned to foreground and background.

@mihaibudiu
Contributor

Does this fix #2117?

@ryzhyk
Contributor Author

ryzhyk commented Mar 17, 2026

How about the cache measurements? These are assigned to foreground and background.

That still holds as well

@ryzhyk
Contributor Author

ryzhyk commented Mar 17, 2026

Does this fix #2117?

I think it does, although it doesn't yet eliminate all the special cases in the code.

Contributor

@mihaibudiu mihaibudiu left a comment

I am approving, but this is not a definitive review.

thread_local! {
// Reference to the `Runtime` that manages this worker thread or `None`
// if the current thread is not running in a multithreaded runtime.
/// Reference to the `Runtime` that manages this worker thread or `None`
Contributor

is this still possible?

Contributor Author

Yes; for example, there are connector threads that run outside a runtime.

Contributor

@gz gz left a comment

I expected it to be more complicated; I guess the fact that it isn't is a positive note for our code.


/// The number of merger threads.
///
/// The default is twice the number of worker threads.
Contributor

Changing the default has the potential to break things. We should ultimately increase it, but maybe we start with the same default and roll it out gradually?

Contributor Author

I agree it's a breaking change. What would a gradual rollout look like though?

Contributor

Increase it for a few users where we know it will matter; afterwards, change the default once it's deemed fine?

Maybe too cautious, but if it saves us some hours of trouble, it may be worth it.

Contributor Author

ok, we can do that.

set_current_thread_type(ThreadType::Background);
RUNTIME.with(|rt| *rt.borrow_mut() = Some(runtime_clone.clone()));
})
.thread_stack_size(6 * 1024 * 1024)
Contributor

Is this a constant we use somewhere else? Should it be a `const ...`?

Contributor Author

I copied this from our other tokio runtimes. I'm not sure how we chose this size; I think there were some cases where the default was too small.
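One way to address the `const` suggestion (hypothetical constant name, not in the current code) would be to hoist the magic number into a shared constant that every runtime builder references:

```rust
// Hypothetical shared constant for the tokio worker-thread stack size;
// the code currently inlines `6 * 1024 * 1024` at each runtime builder.
const TOKIO_THREAD_STACK_SIZE: usize = 6 * 1024 * 1024; // 6 MiB

fn main() {
    // Would be used as: builder.thread_stack_size(TOKIO_THREAD_STACK_SIZE)
    assert_eq!(TOKIO_THREAD_STACK_SIZE, 6_291_456);
}
```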

if let Some(rt) = Runtime::runtime() {
match current_thread_type() {
None => {
let buffer_cache = NO_RUNTIME_CACHE.clone();
Contributor

@gz gz Mar 17, 2026

This thing is a bit of a ticking time bomb; I did the same for slabs. We should ideally just get rid of this no-runtime code. Let's file an issue.

Contributor

We already have one: #2117.

Contributor Author

With this PR, we no longer have threads that use spines outside the runtime. But this cache is also used by output connector threads, which run inside the runtime, but are neither fg nor bg threads, to buffer updates. It's not clear how to pick cache size or the number of caches for this purpose.

Contributor

@gz gz Mar 17, 2026

How are the output connector threads using the buffer cache :O? Can we add a comment about it somewhere next to NO_RUNTIME_CACHE?

Contributor Author

@ryzhyk ryzhyk Mar 17, 2026

how are the output connector threads using the buffer cache :O?

Some of them maintain output buffers, which are implemented as spines.

can we add a comment about it somewhere next to NO_RUNTIME_CACHE

ok!

@ryzhyk ryzhyk force-pushed the async_merger branch 3 times, most recently from d580f68 to 4f46b3c on March 17, 2026 07:12
Member

@blp blp left a comment

I thought switching to Tokio for this would be awful but it's not too bad.

Using Tokio will make it easier if we decide we want to do async I/O later.

Comment on lines +48 to +52
StorageOptions {
min_storage_bytes: None,
min_step_storage_bytes: None,
..StorageOptions::default()
},
Member

I think these are all the defaults, so I'd just write StorageOptions::default().

Contributor Author

min_storage_bytes: None

I don't think this one is the default.

Member

Really? The default for StorageOptions comes from #[derive(Default)] and we also use #[serde(default)] on the struct, so regardless of where the StorageOptions is getting defaulted, any Option field is going to be None.

(But it's not important.)
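The `#[derive(Default)]` behavior under discussion can be demonstrated with a stand-in struct (illustrative; not the real `StorageOptions`): every `Option` field of a derived default is `None`, so spelling the fields out is redundant.

```rust
// Stand-in struct showing that derive(Default) sets Option fields to None.
#[derive(Default, Debug, PartialEq)]
struct Opts {
    min_storage_bytes: Option<u64>,
    min_step_storage_bytes: Option<u64>,
}

fn main() {
    let explicit = Opts {
        min_storage_bytes: None,
        min_step_storage_bytes: None,
    };
    // Writing out the Nones adds nothing over the derived default.
    assert_eq!(explicit, Opts::default());
}
```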

Comment on lines +301 to +303
// Map CPU IDs to core IDs for foreground and background workers.
//
// Returns a pair of vectors of core IDs, one for foreground workers and one for background workers.
Member

Suggested change
// Map CPU IDs to core IDs for foreground and background workers.
//
// Returns a pair of vectors of core IDs, one for foreground workers and one for background workers.
/// Map CPU IDs to core IDs for foreground and background workers.
///
/// Returns a pair of vectors of core IDs, one for foreground workers and one for background workers.

@ryzhyk ryzhyk added this pull request to the merge queue Mar 17, 2026
Merged via the queue into main with commit e0a5624 Mar 17, 2026
30 of 36 checks passed
@ryzhyk ryzhyk deleted the async_merger branch March 17, 2026 23:32