Conversation
Force-pushed af08538 to 10d3a0b
Signed-off-by: Ben Pfaff <blp@feldera.com>
There's no need to collect data and then immediately convert it back to an iterator in these cases.

Signed-off-by: Ben Pfaff <blp@feldera.com>
No behavioral change, just simpler code.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, we've used a very low-level mechanism based on mmap to indicate marker ranges to the samply profiler. This works, but it is expensive (one system call per marker range) and inflexible (the ranges are all attributed to a single thread, and there is no way to categorize or annotate them).

This commit changes all that. It drops the former mechanism in favor of postprocessing samply's output. We can add as many marker ranges as we want, with categories and annotations and attribution to whatever thread we like. We can edit the profile metadata to indicate where it came from. And we can automatically enable recording so that it happens only when samply is running. There is the opportunity for further extension in the future.

The new markers are cheap enough that this commit adds them for every operator evaluation, as well as for other operations such as checkpoint commits and transactions.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, we've used `tarpc`, a Rust crate for RPC, to exchange data among hosts. This crate serializes everything we pass into it, but we've already serialized the data, so this wastes CPU time and memory. `tarpc` also doesn't guarantee alignment of the data at receive time, but rkyv needs data to be aligned on 16-byte boundaries. We don't actually arrange for that carefully; we're just lucky that the Rust allocator (or at least the one we use) happens to align `Vec<u8>` that way anyhow.

This commit replaces `tarpc` with a simple protocol that just writes the data we need to exchange to a socket, with a short fixed header in front of it.

We also double-copy a lot of the data we send in another way: we first partition it into a batch, and then we serialize the batch. This commit fixes that problem too.

This reduced commit time for my test case from 48 seconds to 23 seconds.

Signed-off-by: Ben Pfaff <blp@feldera.com>
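A "fixed header in front of pre-serialized data" protocol can be framed roughly as below. This is a sketch, not the actual `ExchangeHeader` layout: the magic value and the header fields are invented for illustration, and the alignment padding discussed in the commit message is omitted.

```rust
use std::io::{Cursor, Read, Write};

// Illustrative magic number, not the real one.
const MAGIC: u32 = 0x45584348; // "EXCH"

/// Writes one frame: a little-endian magic + length header, then the
/// already-serialized payload, with no re-serialization.
fn write_frame(w: &mut impl Write, payload: &[u8]) -> std::io::Result<()> {
    w.write_all(&MAGIC.to_le_bytes())?;
    w.write_all(&(payload.len() as u64).to_le_bytes())?;
    w.write_all(payload)
}

/// Reads one frame, returning the payload bytes.
fn read_frame(r: &mut impl Read) -> std::io::Result<Vec<u8>> {
    let mut magic = [0u8; 4];
    r.read_exact(&mut magic)?;
    assert_eq!(u32::from_le_bytes(magic), MAGIC, "bad magic");
    let mut len = [0u8; 8];
    r.read_exact(&mut len)?;
    let len = u64::from_le_bytes(len) as usize;
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() {
    let mut buf = Vec::new();
    write_frame(&mut buf, b"already-serialized batch").unwrap();
    let round_trip = read_frame(&mut Cursor::new(buf)).unwrap();
    assert_eq!(round_trip, b"already-serialized batch");
}
```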
Signed-off-by: Ben Pfaff <blp@feldera.com>
When scanning a table produces multiple batches, it's important to feed all of the batches into a single `StreamWriter`. Until now, the code here used a separate `StreamWriter` for each batch, but that didn't work properly. This wasn't noticed until now because the tests only produced small outputs that had only one batch.

Signed-off-by: Ben Pfaff <blp@feldera.com>
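The failure mode can be illustrated with a toy writer that mimics stream-style framing (a header when the writer is created, a footer when it finishes), in the spirit of Arrow's `StreamWriter`. The `StreamWriter` type below is a stand-in, not the real one: creating one writer per batch duplicates the framing, which a reader sees as several separate streams.

```rust
// Toy stand-in for a stream writer that frames its output with a
// header on creation and a footer on finish.
struct StreamWriter<'a> {
    out: &'a mut Vec<String>,
}

impl<'a> StreamWriter<'a> {
    fn new(out: &'a mut Vec<String>) -> Self {
        out.push("header".into());
        StreamWriter { out }
    }
    fn write(&mut self, batch: &str) {
        self.out.push(format!("batch:{batch}"));
    }
    fn finish(self) {
        self.out.push("footer".into());
    }
}

fn main() {
    // Correct: one writer shared by all batches -> one header, one footer.
    let mut stream = Vec::new();
    let mut writer = StreamWriter::new(&mut stream);
    for batch in ["a", "b", "c"] {
        writer.write(batch);
    }
    writer.finish();
    assert_eq!(stream, ["header", "batch:a", "batch:b", "batch:c", "footer"]);

    // Broken: one writer per batch repeats the framing three times.
    let mut broken = Vec::new();
    for batch in ["a", "b", "c"] {
        let mut w = StreamWriter::new(&mut broken);
        w.write(batch);
        w.finish();
    }
    assert_eq!(broken.iter().filter(|s| *s == "header").count(), 3);
}
```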
mythical-fred
left a comment
Good work. Clean protocol, good commit messages, meaningful perf improvement. One hard blocker: the two new unsafe blocks need // SAFETY: comments.
```rust
let padded_len = len.next_multiple_of(16);
let mut payload = AlignedVec::with_capacity(padded_len);
let pointer = payload.as_mut_ptr() as *mut MaybeUninit<u8>;
let mut slice = unsafe { std::slice::from_raw_parts_mut(pointer, padded_len) };
```
Missing // SAFETY: comment. Please explain: (1) why from_raw_parts_mut on MaybeUninit<u8> is valid here (capacity is padded_len, pointer is the allocation start), and (2) why passing &mut &[MaybeUninit<u8>] to read_buf is sound (the BufMut impl for &mut &[u8] advances the slice as bytes are written, which relies on the bytes being initialized after the read). Also covers the set_len(len) below — needs the same comment explaining that all len bytes are initialized by read_buf.
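As a sketch of what the requested comments might say, here is a simplified std-only version of the same pattern. `AlignedVec` and `read_buf` are replaced by a plain `Vec<u8>` and a manual copy, and the function name is illustrative, so this shows the shape of the `// SAFETY:` reasoning rather than the actual code:

```rust
use std::mem::MaybeUninit;

/// Reads `len` bytes from `src` into a fresh buffer whose allocation is
/// padded to a multiple of 16 bytes.
fn read_padded(src: &[u8], len: usize) -> Vec<u8> {
    let padded_len = len.next_multiple_of(16);
    let mut payload: Vec<u8> = Vec::with_capacity(padded_len);
    let pointer = payload.as_mut_ptr() as *mut MaybeUninit<u8>;
    // SAFETY: `payload` owns a live allocation of at least `padded_len`
    // bytes starting at `pointer`, we hold the only reference to it, and
    // `MaybeUninit<u8>` permits viewing uninitialized memory, so the
    // slice covers valid, exclusively owned memory for its lifetime.
    let slice = unsafe { std::slice::from_raw_parts_mut(pointer, padded_len) };
    for (dst, &byte) in slice.iter_mut().zip(&src[..len]) {
        dst.write(byte);
    }
    // SAFETY: the loop above initialized the first `len` bytes, and
    // `len <= padded_len <= capacity`.
    unsafe { payload.set_len(len) };
    payload
}

fn main() {
    let data = b"hello, exchange protocol";
    let payload = read_padded(data, data.len());
    assert_eq!(payload, data.to_vec());
    assert!(payload.capacity() >= data.len().next_multiple_of(16));
}
```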
```rust
/// span is dropped, it is automatically recorded.
///
/// [SamplySpan] does nothing when markers are not being captured. A span
/// will be captured only in a profile only if markers were being captured
/// profile.
///
/// `product` and `os_cpu` can optionally override the values in the
/// profile. The Firefox Profilers shows these for identification purposes
```
typo: Profilers -> Profiler
```rust
}

thread_local! {
    static QUEUE: Arc<SegQueue<Marker>> = new_queue();
```
I guess this is flushed at the end of every samply run. Is there any risk of it getting too large and using too much (or unbounded) memory?
There's a risk, if we emit markers very fast or if a profile goes on for a very long time. We could limit its size. I'll figure it out.
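One way to bound the queue, sketched with std types (the real code uses a crossbeam `SegQueue`; the cap, the `push_marker` name, and the `u32` marker payload are all illustrative): drop the oldest marker when the cap is reached, so memory stays bounded even if nothing drains the queue.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Illustrative cap; a real value would be much larger.
const MAX_MARKERS: usize = 4;

static QUEUE: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());

/// Pushes a marker, discarding the oldest one when the queue is full,
/// so memory use is bounded even if samply never drains the queue.
fn push_marker(marker: u32) {
    let mut queue = QUEUE.lock().unwrap();
    if queue.len() == MAX_MARKERS {
        queue.pop_front();
    }
    queue.push_back(marker);
}

fn main() {
    for marker in 0..10 {
        push_marker(marker);
    }
    // Only the newest MAX_MARKERS markers survive.
    let queue = QUEUE.lock().unwrap();
    assert_eq!(queue.iter().copied().collect::<Vec<_>>(), [6, 7, 8, 9]);
}
```

Dropping the oldest markers favors recent activity, which is usually what a profile viewer cares about; dropping the newest instead would be a one-line change.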
```rust
trait ExchangeService {
```

```rust
#[binrw::binrw]
#[brw(little)]
struct ExchangeHeader {
```
Can you document these fields?
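Documented header fields might look something like the sketch below. The field set is a guess for illustration (the actual `ExchangeHeader` fields are not shown in this hunk), and the manual little-endian encoding stands in for what `binrw` derives:

```rust
/// Fixed little-endian header preceding each payload on the socket.
/// NOTE: hypothetical field set, not the actual ExchangeHeader.
struct ExchangeHeader {
    /// Identifies the stream as the exchange protocol and catches
    /// framing or endianness mistakes early.
    magic: u32,
    /// Protocol version, so incompatible peers can fail cleanly.
    version: u32,
    /// Length in bytes of the payload that follows the header.
    len: u64,
}

impl ExchangeHeader {
    /// Encodes the header as 16 little-endian bytes, mimicking what a
    /// `#[brw(little)]` derive would produce for these fields.
    fn to_le_bytes(&self) -> [u8; 16] {
        let mut bytes = [0u8; 16];
        bytes[0..4].copy_from_slice(&self.magic.to_le_bytes());
        bytes[4..8].copy_from_slice(&self.version.to_le_bytes());
        bytes[8..16].copy_from_slice(&self.len.to_le_bytes());
        bytes
    }
}

fn main() {
    let header = ExchangeHeader { magic: 0x1234, version: 1, len: 42 };
    let bytes = header.to_le_bytes();
    assert_eq!(&bytes[0..4], &0x1234u32.to_le_bytes());
    assert_eq!(&bytes[8..16], &42u64.to_le_bytes());
}
```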
I tested these changes by running a pipeline in single-host and multi-host setups and verifying afterward that ad-hoc queries produced the same results.
This improved performance for a particular test transaction commit on a multihost setup from 48 seconds to 23 seconds.
Please see individual commit messages for details.