Conversation
This is a very cheap filter that can speed up ingest significantly. The idea is to track the min/max key for every batch. For seek_key_exact, if the value we're seeking is not inside the range, we skip the batch.

Some ingest-heavy benchmarks:

just bloom filter:

╭────────────────────────┬───────────┬──────────┬───────╮
│ Metric                 │ Value     │ Lower    │ Upper │
├────────────────────────┼───────────┼──────────┼───────┤
│ Throughput (records/s) │ 2695496   │ -        │ -     │
│ Memory                 │ 14.28 GiB │ 1.78 GiB │ -     │
│ Storage                │ 79.45 GiB │ 110 B    │ -     │
│ Uptime [ms]            │ 302742    │ -        │ -     │
│ State Amplification    │ 0.43      │ -        │ -     │
╰────────────────────────┴───────────┴──────────┴───────╯

range+bloom filter:

╭────────────────────────┬────────────┬──────────┬───────╮
│ Metric                 │ Value      │ Lower    │ Upper │
├────────────────────────┼────────────┼──────────┼───────┤
│ Throughput (records/s) │ 4035088    │ -        │ -     │
│ Memory                 │ 23.4 GiB   │ 2.42 GiB │ -     │
│ Storage                │ 112.51 GiB │ 110 B    │ -     │
│ Uptime [ms]            │ 303292     │ -        │ -     │
│ State Amplification    │ 0.41       │ -        │ -     │
╰────────────────────────┴────────────┴──────────┴───────╯

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
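The check itself is tiny. A minimal sketch of the idea, where `KeyRange` and `contains` are illustrative names, not the actual feldera types:

```rust
// Minimal sketch of the min/max batch filter described above.
// KeyRange and contains() are illustrative names, not the real implementation.
#[derive(Debug)]
struct KeyRange<K: Ord> {
    min: K,
    max: K,
}

impl<K: Ord> KeyRange<K> {
    /// A point lookup can skip the batch entirely when this returns false.
    fn contains(&self, key: &K) -> bool {
        self.min <= *key && *key <= self.max
    }
}

fn main() {
    let range = KeyRange { min: 10u64, max: 99 };
    assert!(range.contains(&42)); // inside [10, 99]: must consult the batch
    assert!(!range.contains(&7)); // outside [10, 99]: skip without any I/O
}
```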
let mut max = key_factory.default_box();

match root.read::<K, A>(&self.file)? {
    TreeBlock::Data(data_block) => unsafe {
I think this should be a trait.

More importantly, this needs some eyes from @blp to tell me if it's doing the right thing.

Getting the range should be a trait.

It could be a method on TreeBlock, but I don't see the value of defining a trait.
            )
            .unwrap_storage(),
            weight: factories.weight_factory().default_box(),
            key_range: None,
What does None mean? Empty or unknown?

Both; I'm not sure if it can ever happen, but the APIs are written in such a way that it's possible.
}

/// Extends the upper bound when keys arrive in sorted order.
pub(crate) fn extend_to(&mut self, max: &K) {
I hope this is called only for the last value in a batch.
}

fn push_key(&mut self, key: &K) {
    if let Some(range) = &mut self.key_range {
So it is called for every element added? How do you know that this is larger than the min? Could you have a debug_assert for that?

There is an assert in extend_to; maybe it can be done in a less naive way. I'll have a look.

Writer1::write0() asserts that the keys it's called with are in order.
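Under that invariant (keys are pushed in sorted order), extend_to only ever needs to bump the upper bound. A sketch of the debug_assert discussed above, simplified to a plain Ord + Clone key rather than the boxed dynamic keys the file layer uses:

```rust
// Sketch of extend_to with the sorted-order debug_assert discussed above.
// Simplified: plain Ord + Clone keys instead of boxed dynamic keys.
struct KeyRange<K: Ord + Clone> {
    min: K,
    max: K,
}

impl<K: Ord + Clone> KeyRange<K> {
    /// Extends the upper bound; callers must supply keys in sorted order.
    fn extend_to(&mut self, max: &K) {
        debug_assert!(*max >= self.max, "keys must arrive in sorted order");
        self.max = max.clone();
    }
}

fn main() {
    let mut range = KeyRange { min: 1u64, max: 3 };
    range.extend_to(&7); // ok: 7 >= 3
    assert_eq!(range.max, 7);
    assert_eq!(range.min, 1); // the lower bound never moves
}
```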
fn seek_key_exact(&mut self, key: &K, hash: Option<u64>) -> bool {
    let hash = hash.unwrap_or_else(|| key.default_hash());
-   if !self.wset.maybe_contains_key(hash) {
+   if !self.wset.filters.maybe_contains_key(key, hash) {
Is the default_hash() still computed someplace?
mythical-fred left a comment:

Two blockers: (1) two unsafe blocks in reader.rs are missing // SAFETY: comments (see inline), and (2) docs.feldera.com/docs/operations/metrics.md should be updated with entries for the four new range filter metrics (range_filter_size_bytes, range_filter_hits_count, range_filter_misses_count, range_filter_hit_rate_percent). Looking at past PRs (e.g., the transaction metrics PR), the pattern is a manual edit to metrics.md alongside the code change.
let mut max = key_factory.default_box();

match root.read::<K, A>(&self.file)? {
    TreeBlock::Data(data_block) => unsafe {
Missing // SAFETY: comment. key_range() is a safe function, so each unsafe {} block inside it needs to document the invariant. For this DataBlock arm: why is calling data_block.key(0, ...) and data_block.key(n_values() - 1, ...) sound? (Bounds validity, factory type match.)

Fair point from @blp: if the convention in this file is to propagate unsafe up to the public function signature (rather than wrapping it in a safe function with // SAFETY:), making key_range() an unsafe fn with a /// # Safety doc comment works equally well. Either approach satisfies the requirement; I just want the safety contract stated somewhere.
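For reference, the two conventions under discussion look like this (toy example, not the reader.rs code):

```rust
// Toy illustration of the two conventions discussed above; not reader.rs code.

// Option 1: keep the function safe and justify each unsafe block with a
// SAFETY comment stating the invariant that makes it sound.
fn first_byte(bytes: &[u8]) -> Option<u8> {
    if bytes.is_empty() {
        return None;
    }
    // SAFETY: the slice is non-empty (checked above), so index 0 is in bounds.
    Some(unsafe { *bytes.get_unchecked(0) })
}

// Option 2: propagate the unsafety to the signature and document the
// caller's obligations in a `# Safety` section.
/// Returns the first byte without a bounds check.
///
/// # Safety
///
/// `bytes` must be non-empty.
unsafe fn first_byte_unchecked(bytes: &[u8]) -> u8 {
    unsafe { *bytes.get_unchecked(0) }
}
```

Either way the safety contract ends up written down; the difference is whether the function discharges it internally or pushes it onto callers.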
        data_block.key(&factories, 0, min.as_mut());
        data_block.key(&factories, data_block.n_values() - 1, max.as_mut());
    },
    TreeBlock::Index(index_block) => unsafe {
Missing // SAFETY: comment. For the IndexBlock arm: why is index_block.get_bound(0, ...) and get_bound(n_children()*2-1, ...) sound? (Bounds validity, factory type match.)
We add stats for the range filter. This led to some refactoring: since we now have two filters (with another one on the way), we consolidate the stats into a single struct that can be re-used across filters. It also revealed a performance issue with the current filter stats: because this function is extremely hot, wrapping the hit and miss atomics in CachePadded led to a 25% increase for the ingest benchmark.

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
Some benchmarking revealed that this optimization alone does not help much for, e.g., Delta Lake connectors, because keys get ingested mostly at random (maybe some Z-ordering or liquid clustering stuff would help, but who knows). It helps for connectors that ingest in semi-linear order (e.g., datagen; maybe Postgres/Kafka as well).
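A toy model of why arrival order matters (illustrative code, not feldera APIs; the interleaved order stands in for random arrival):

```rust
// Toy model of the observation above: sorted ingest gives each batch a
// narrow, disjoint key range, so a point lookup can skip almost every batch;
// out-of-order ingest (interleaved here, as a stand-in for random arrival)
// makes every batch span nearly the whole key space, so nothing is skippable.

fn batch_ranges(keys: &[u64], batch: usize) -> Vec<(u64, u64)> {
    keys.chunks(batch)
        .map(|c| (*c.iter().min().unwrap(), *c.iter().max().unwrap()))
        .collect()
}

fn skippable(ranges: &[(u64, u64)], key: u64) -> usize {
    // A batch can be skipped for a point lookup when the key is outside [lo, hi].
    ranges.iter().filter(|&&(lo, hi)| key < lo || key > hi).count()
}

fn main() {
    // 1000 distinct keys of the form r * 1000 + q, r in 0..10, q in 0..100.
    let sorted: Vec<u64> = (0..1000u64).map(|i| (i / 100) * 1000 + i % 100).collect();
    // Same keys, but cycling through the ten "partitions" on every insert.
    let interleaved: Vec<u64> = (0..1000u64).map(|i| (i % 10) * 1000 + i / 10).collect();

    let query = 5000; // one of the ingested keys

    // Sorted ingest: 9 of the 10 batches can be skipped for this lookup.
    assert_eq!(skippable(&batch_ranges(&sorted, 100), query), 9);
    // Interleaved ingest: every batch spans roughly [10k, 9000+10k], so none can.
    assert_eq!(skippable(&batch_ranges(&interleaved, 100), query), 0);
}
```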
let mut max = key_factory.default_box();

match root.read::<K, A>(&self.file)? {
    TreeBlock::Data(data_block) => unsafe {
///
/// The bounds are loaded from the root node when first requested and can
/// then be cached by higher-level batch types.
pub fn key_range(&self) -> Result<Option<(Box<K>, Box<K>)>, Error> {
I think we've been making the public functions in the reader unsafe if they are unsafe internally, because these functions don't mask the unsafety; they are as unsafe as the functions they call. (The unsafety is because rkyv deserialization is unsafe.)
}

#[test]
fn one_column_key_range() {
It's good to have a test.
I think it would be better to add to the existing tests, too. The new addition would be analogous to test_bloom(), which is also a check that only applies to the first column in a file. I'd expect that it would use expected0 to get the expected first and last key and then call key_range and verify that the results are the same.
if let Some(range) = &mut self.key_range {
    range.extend_to(key);
} else {
    self.key_range = Some(KeyRange::from_refs(key, key));
}
This clones every key we write, which will be expensive for large keys. It's the easy approach, but it's not necessary; we have at least two ways to avoid it:
- The Writer could recover the key range from what it wrote, which is still in memory in Writer1::close and Writer2::close, since it writes the top-level index or data block as the last thing it does there, and then return it from Writer1::close and Writer2::close along with the bloom filter, or from Writer1::into_reader or Writer2::into_reader.
- Read it back in Reader::new() since it's probably still in the cache (we just wrote it).
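The first option might look roughly like this (a toy sketch with illustrative names, not the real Writer1):

```rust
// Toy sketch of the first suggestion above: derive the key range once at
// close() from data the writer still buffers, instead of cloning every key
// into the range as it is pushed. Names are illustrative, not the real Writer1.
#[derive(Default)]
struct ToyWriter {
    block: Vec<String>, // stand-in for the last block, still in memory at close
}

impl ToyWriter {
    fn push(&mut self, key: &str) {
        // No per-key range bookkeeping here.
        self.block.push(key.to_owned());
    }

    /// Recovers the key range from the buffered block: two clones total,
    /// rather than one clone per key written.
    fn close(self) -> Option<(String, String)> {
        let first = self.block.first()?.clone();
        let last = self.block.last()?.clone();
        Some((first, last))
    }
}
```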
impl Add for FilterStats {
    type Output = Self;

    fn add(mut self, rhs: Self) -> Self::Output {
        self.add_assign(rhs);
        self
    }
}

impl AddAssign for FilterStats {
    fn add_assign(&mut self, rhs: Self) {
        self.size_byte += rhs.size_byte;
        self.hits += rhs.hits;
        self.misses += rhs.misses;
    }
}
I noticed the other day that we use derive_more. I think we could just write #[derive(Add, Sum)] to get these. It's a matter of taste whether you like that, though. (I just noticed this; I think we could use it elsewhere and don't.)
hits: CachePadded<AtomicUsize>,
misses: CachePadded<AtomicUsize>,
The padding makes this structure big, probably 384 bytes?
Do you expect hits and misses to be accessed by different CPUs? If not, they could go in the same CachePadded.
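If both counters are indeed bumped by the same CPU, sharing one padded slot halves the footprint. A self-contained sketch, where `Padded` stands in for crossbeam_utils::CachePadded (whose alignment is 64 or 128 bytes depending on the architecture):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Stand-in for crossbeam_utils::CachePadded so the sketch is self-contained.
#[repr(align(128))]
struct Padded<T>(T);

// Both counters share one padded slot: 128 bytes total instead of 256,
// which is fine when the same CPU updates hits and misses.
struct Counters {
    hits: AtomicUsize,
    misses: AtomicUsize,
}

struct FilterCounters {
    inner: Padded<Counters>,
}

impl FilterCounters {
    fn new() -> Self {
        Self {
            inner: Padded(Counters {
                hits: AtomicUsize::new(0),
                misses: AtomicUsize::new(0),
            }),
        }
    }

    fn record(&self, hit: bool) {
        let c = &self.inner.0;
        if hit {
            c.hits.fetch_add(1, Ordering::Relaxed);
        } else {
            c.misses.fetch_add(1, Ordering::Relaxed);
        }
    }
}
```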
Describe Manual Test Plan
Tested manually with a few pipelines.
Checklist
Describe Incompatible Changes
No incompatible changes.