Skip to content

Speed up multihost exchange#5819

Open
blp wants to merge 7 commits intomainfrom
speed-up-serde
Open

Speed up multihost exchange#5819
blp wants to merge 7 commits intomainfrom
speed-up-serde

Conversation

@blp
Copy link
Member

@blp blp commented Mar 13, 2026

I tested the correctness of these changes by running a pipeline in single- and multi-host setups and then seeing that, afterward, ad-hoc queries produced the same results.

This improved commit performance for a particular test transaction commit on multihost from 48 to 23 seconds.

Please see individual commit messages for details.

@blp blp force-pushed the speed-up-serde branch 5 times, most recently from af08538 to 10d3a0b Compare March 20, 2026 18:04
blp added 4 commits March 20, 2026 11:04
Signed-off-by: Ben Pfaff <blp@feldera.com>
There's no need to collect data and then immediate convert it back to
an iterator in these cases.

Signed-off-by: Ben Pfaff <blp@feldera.com>
No behavioral change, just simpler code.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, we've used a very low-level mechanism based on mmap to
indicate marker ranges to the samply profiler.  This works but it is
expensive (one system call per marker range) and inflexible (the ranges
are all attribute to a single thread and there is no means to categorize
or annotate them).

This commit changes all that.  It drops the former mechanism in favor of
postprocessing samply's output.  We can add as many marker ranges as we
want, with categories and annotations and attribution to whatever thread
we like.  We can edit the profile metadata to indicate where it came
from.  And we can automatically enable recording so that it happens only
when samply is running.  There is the opportunity for further extension
in the future.

The new markers are cheap enough that this commit adds them for every
operator evaluation, as well as for other operations such as checkpoint
commits and transactions.

Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp force-pushed the speed-up-serde branch from 10d3a0b to 4d82071 Compare March 20, 2026 18:04
blp added 3 commits March 20, 2026 17:09
Until now, we've used `tarpc`, a Rust crate for RPC, to exchange data
among hosts.  This crate serializes everything we pass into it, but we've
already serialized it, and this wastes CPU time and memory.

`tarpc` also doesn't guarantee alignment of the data at receive time, but
rkyv needs data to be aligned on 16-byte boundaries.  We actually don't
do that carefully, and we're just lucky that the Rust allocator (or at
least the one we use) happens to do that for `Vec<u8>` anyhow.

This commit replaces `tarpc` by a simple protocol that just writes the
data we need to exchange to a socket with a short fixed header in front of
it.

We also double-copy a lot of the data we send in another way: we
first partition it to a batch, and then we serialize the batch.  This
commit fixes that problem too.

This reduced commit time for my test case from 48 seconds to 23 seconds.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
When scanning a table produces multiple batches, it's important to feed
all of the batches into a single `StreamWriter`.  Until now, the code here
used a separate `StreamWriter` for each batch, but that didn't work
properly.

This wasn't noticed until now because the tests only produced small
outputs that had only one batch.

Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp force-pushed the speed-up-serde branch from e5a80a3 to 93a6046 Compare March 21, 2026 00:09
@blp blp changed the title Speed up serde Speed up multihost exchange Mar 21, 2026
@blp blp requested a review from ryzhyk March 21, 2026 00:22
@blp blp self-assigned this Mar 21, 2026
@blp blp added bug Something isn't working DBSP core Related to the core DBSP library ft Fault tolerant, distributed, and scale-out implementation performance connectors Issues related to the adapters/connectors crate storage Persistence for internal state in DBSP operators adhoc Issue related to ad hoc query processing rust Pull requests that update Rust code enterprise Issue related to Feldera Enterprise features. metrics Metrics about feldera pipelines multihost Related to multihost or distributed pipelines labels Mar 21, 2026
@blp blp marked this pull request as ready for review March 21, 2026 00:23
Copy link

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work. Clean protocol, good commit messages, meaningful perf improvement. One hard blocker: the two new unsafe blocks need // SAFETY: comments.

let padded_len = len.next_multiple_of(16);
let mut payload = AlignedVec::with_capacity(padded_len);
let pointer = payload.as_mut_ptr() as *mut MaybeUninit<u8>;
let mut slice = unsafe { std::slice::from_raw_parts_mut(pointer, padded_len) };

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing // SAFETY: comment. Please explain: (1) why from_raw_parts_mut on MaybeUninit<u8> is valid here (capacity is padded_len, pointer is the allocation start), and (2) why passing &mut &[MaybeUninit<u8>] to read_buf is sound (the BufMut impl for &mut &[u8] advances the slice as bytes are written, which relies on the bytes being initialized after the read). Also covers the set_len(len) below — needs the same comment explaining that all len bytes are initialized by read_buf.

/// span is dropped, it is automatically recorded.
///
/// [SamplySpan] does nothing when markers are not being captured. A span
/// will be captured only in a profile only if markers were being captured
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

/// profile.
///
/// `product` and `os_cpu` can optionally override the values in the
/// profile. The Firefox Profilers shows these for identification purposes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: Profilers -> Profiler

}

thread_local! {
static QUEUE: Arc<SegQueue<Marker>> = new_queue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is flushed at the end of every samply run. Is there any risk of it getting too large and using too much (or unbounded) memory?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a risk, if we emit markers very fast or if a profile goes on for a very long time. We could limit its size. I'll figure it out.

trait ExchangeService {
#[binrw::binrw]
#[brw(little)]
struct ExchangeHeader {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document these fields?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

adhoc Issue related to ad hoc query processing bug Something isn't working connectors Issues related to the adapters/connectors crate DBSP core Related to the core DBSP library enterprise Issue related to Feldera Enterprise features. ft Fault tolerant, distributed, and scale-out implementation metrics Metrics about feldera pipelines multihost Related to multihost or distributed pipelines performance rust Pull requests that update Rust code storage Persistence for internal state in DBSP operators

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants