diff --git a/Cargo.lock b/Cargo.lock index 9a9a8a5c473..c3100f28f2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3818,6 +3818,7 @@ dependencies = [ "reqwest 0.12.24", "rkyv", "rmp-serde", + "roaring", "seq-macro", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index b367a6120b5..7c52f1c2416 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ enum-map = "2.7.3" erased-serde = "0.3.31" fake = "2.10" fastbloom = "0.14.0" +roaring = "0.11.3" fdlimit = "0.3.0" feldera-buffer-cache = { version = "0.271.0", path = "crates/buffer-cache" } feldera-cloud1-client = "0.1.2" diff --git a/crates/dbsp/Cargo.toml b/crates/dbsp/Cargo.toml index 4d1e7d2dad4..2ae9372ce22 100644 --- a/crates/dbsp/Cargo.toml +++ b/crates/dbsp/Cargo.toml @@ -83,6 +83,7 @@ tracing = { workspace = true } snap = { workspace = true } enum-map = { workspace = true } fastbloom = { workspace = true } +roaring = { workspace = true } core_affinity = { workspace = true } indexmap = { workspace = true } feldera-storage = { workspace = true } diff --git a/crates/dbsp/src/circuit/dbsp_handle.rs b/crates/dbsp/src/circuit/dbsp_handle.rs index 9acba9bfb9f..6e8659b4d26 100644 --- a/crates/dbsp/src/circuit/dbsp_handle.rs +++ b/crates/dbsp/src/circuit/dbsp_handle.rs @@ -8,7 +8,7 @@ use crate::operator::dynamic::balance::{ MIN_ABSOLUTE_IMPROVEMENT_THRESHOLD, MIN_RELATIVE_IMPROVEMENT_THRESHOLD, PartitioningPolicy, }; use crate::storage::backend::StorageError; -use crate::storage::file::BLOOM_FILTER_FALSE_POSITIVE_RATE; +use crate::storage::file::{BLOOM_FILTER_FALSE_POSITIVE_RATE, BatchKeyFilterKind}; use crate::trace::MergerType; use crate::trace::spine_async::MAX_LEVEL0_BATCH_SIZE_RECORDS; use crate::{ @@ -415,6 +415,13 @@ pub struct DevTweaks { /// Values outside the valid range, such as 0.0, disable Bloom filters. pub bloom_false_positive_rate: f64, + /// Per-batch key filter implementation to use for storage batches. + /// + /// `roaring_u32` uses an exact roaring bitmap for key types that can be + /// mapped exactly into a `u32` filter domain, and falls back to the + /// existing Bloom filter path for other key types. + pub batch_key_filter: BatchKeyFilterKind, + /// Maximum batch size in records for level 0 merges. pub max_level0_batch_size_records: u16, @@ -437,6 +444,7 @@ impl Default for DevTweaks { stack_overflow_backtrace: false, splitter_chunk_size_records: 10_000, bloom_false_positive_rate: BLOOM_FILTER_FALSE_POSITIVE_RATE, + batch_key_filter: BatchKeyFilterKind::Bloom, balancer_min_relative_improvement_threshold: MIN_RELATIVE_IMPROVEMENT_THRESHOLD, balancer_min_absolute_improvement_threshold: MIN_ABSOLUTE_IMPROVEMENT_THRESHOLD, balancer_balance_tax: BALANCE_TAX, diff --git a/crates/dbsp/src/dynamic/data.rs b/crates/dbsp/src/dynamic/data.rs index ce1645a1c27..7be19b830fc 100644 --- a/crates/dbsp/src/dynamic/data.rs +++ b/crates/dbsp/src/dynamic/data.rs @@ -12,6 +12,7 @@ use crate::{ rkyv::SerializeDyn, }, hash::default_hash, + utils::RoaringU32Key, }; /// Defines the minimal set of operations that must be supported by @@ -19,7 +20,16 @@ use crate::{ /// /// This trait is object safe and can be invoked via dynamic dispatch. pub trait Data: - Comparable + Clonable + SerializeDyn + DeserializableDyn + Send + Sync + Debug + AsAny + SizeOf + Comparable + + Clonable + + SerializeDyn + + DeserializableDyn + + Send + + Sync + + Debug + + AsAny + + SizeOf + + RoaringU32Key { /// Compute a hash of the object using default hasher and seed. fn default_hash(&self) -> u64; diff --git a/crates/dbsp/src/storage/file.rs b/crates/dbsp/src/storage/file.rs index d60bed1f238..1db921003e3 100644 --- a/crates/dbsp/src/storage/file.rs +++ b/crates/dbsp/src/storage/file.rs @@ -36,8 +36,10 @@ //! value and for sequential reads. It should be possible to disable indexing //! by data value for workloads that don't require it. //! -//! Layer files support approximate set membership query in `~O(1)` time using -//! [a filter block](format::FilterBlock). +//! Layer files support cheap key-membership tests using a per-batch filter +//! block. The default filter is Bloom-based; key types that can be mapped +//! exactly into a `u32` filter domain can alternatively use an exact roaring +//! bitmap filter. //! //! Layer files should support 1 TB data size. //! @@ -87,6 +89,7 @@ use std::cell::RefCell; use std::fmt::Debug; use std::{any::Any, sync::Arc}; +mod filter; pub mod format; mod item; pub mod reader; @@ -97,6 +100,7 @@ use crate::{ dynamic::{DataTrait, Erase, Factory, WithFactory}, storage::file::item::RefTup2Factory, }; +pub use filter::{BatchKeyFilter, BatchKeyFilterKind, BatchKeyFilterProbe, TrackingRoaringBitmap}; pub use item::{ArchivedItem, Item, ItemFactory, WithItemFactory}; const BLOOM_FILTER_SEED: u128 = 42; @@ -112,6 +116,9 @@ where K: DataTrait + ?Sized, A: DataTrait + ?Sized, { + key_type_name: &'static str, + supports_roaring_u32: bool, + /// Factory for creating instances of `K`. pub key_factory: &'static dyn Factory, @@ -132,6 +139,8 @@ where { fn clone(&self) -> Self { Self { + key_type_name: self.key_type_name, + supports_roaring_u32: self.supports_roaring_u32, key_factory: self.key_factory, item_factory: self.item_factory, keys_factory: self.keys_factory, @@ -153,6 +162,8 @@ where AType: DBData + Erase, { Self { + key_type_name: std::any::type_name::(), + supports_roaring_u32: KType::can_use_roaring_u32(), key_factory: WithFactory::::FACTORY, item_factory: as WithItemFactory>::ITEM_FACTORY, keys_factory: WithFactory::>::FACTORY, @@ -171,6 +182,8 @@ where item_factory: Arc::new(self.item_factory), keys_factory: Arc::new(self.keys_factory), auxes_factory: Arc::new(self.auxes_factory), + key_type_name: self.key_type_name, + supports_roaring_u32: self.supports_roaring_u32, } } } @@ -189,6 +202,8 @@ pub struct AnyFactories { item_factory: Arc, keys_factory: Arc, auxes_factory: Arc, + key_type_name: &'static str, + supports_roaring_u32: bool, } impl Debug for AnyFactories { @@ -249,12 +264,22 @@ impl AnyFactories { A: DataTrait + ?Sized, { Factories { + key_type_name: self.key_type_name, + supports_roaring_u32: self.supports_roaring_u32, key_factory: self.key_factory(), item_factory: self.item_factory(), keys_factory: self.keys_factory(), auxes_factory: self.auxes_factory(), } } + + pub(crate) fn key_type_name(&self) -> &'static str { + self.key_type_name + } + + pub(crate) fn supports_roaring_u32(&self) -> bool { + self.supports_roaring_u32 + } } /// Trait for data that can be serialized and deserialized with [`rkyv`]. diff --git a/crates/dbsp/src/storage/file/filter.rs b/crates/dbsp/src/storage/file/filter.rs new file mode 100644 index 00000000000..e4ffd682b18 --- /dev/null +++ b/crates/dbsp/src/storage/file/filter.rs @@ -0,0 +1,270 @@ +use super::{AnyFactories, BLOOM_FILTER_SEED}; +use crate::{ + dynamic::DataTrait, + storage::tracking_bloom_filter::{BloomFilterStats, TrackingBloomFilter}, +}; +use fastbloom::BloomFilter; +use roaring::RoaringBitmap; +use serde::{Deserialize, Serialize}; +use std::{ + io, + mem::size_of_val, + sync::atomic::{AtomicUsize, Ordering}, +}; + +/// Configures which per-batch key filter implementation should be used. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum BatchKeyFilterKind { + /// Use the existing Bloom filter path. + #[default] + Bloom, + + /// Use an exact roaring bitmap when the batch key type can be mapped + /// injectively into a `u32` filter domain. + /// + /// Unsupported key types fall back to the existing Bloom filter behavior. + RoaringU32, +} + +/// Result of probing a batch key filter. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum BatchKeyFilterProbe { + /// The key is definitely not present. + Absent, + + /// The key may be present. + MaybePresent, + + /// The key is definitely present. + Present, +} + +impl BatchKeyFilterProbe { + /// Returns true if the filter result requires checking the batch contents. + pub(crate) fn may_contain(self) -> bool { + !matches!(self, Self::Absent) + } +} + +/// In-memory representation of the per-batch key filter. +#[derive(Debug)] +pub enum BatchKeyFilter { + /// Probabilistic Bloom filter over key hashes. + Bloom(TrackingBloomFilter), + + /// Exact roaring bitmap for key types that support exact `u32` encoding. + RoaringU32(TrackingRoaringBitmap), +} + +impl BatchKeyFilter { + pub(crate) fn new( + requested_kind: BatchKeyFilterKind, + estimated_keys: usize, + key_factories: &AnyFactories, + bloom_false_positive_rate: Option, + ) -> Option { + match requested_kind { + BatchKeyFilterKind::Bloom => bloom_false_positive_rate.map(|rate| { + Self::Bloom(TrackingBloomFilter::new( + BloomFilter::with_false_pos(rate) + .seed(&BLOOM_FILTER_SEED) + .expected_items({ + // `.max(64)` works around a fastbloom bug that hangs when the + // expected number of items is zero (see + // ). + estimated_keys.max(64) + }), + )) + }), + BatchKeyFilterKind::RoaringU32 if key_factories.supports_roaring_u32() => { + Some(Self::RoaringU32(TrackingRoaringBitmap::default())) + } + BatchKeyFilterKind::RoaringU32 => bloom_false_positive_rate.map(|rate| { + Self::Bloom(TrackingBloomFilter::new( + BloomFilter::with_false_pos(rate) + .seed(&BLOOM_FILTER_SEED) + .expected_items(estimated_keys.max(64)), + )) + }), + } + } + + pub(crate) fn kind(&self) -> BatchKeyFilterKind { + match self { + Self::Bloom(_) => BatchKeyFilterKind::Bloom, + Self::RoaringU32(_) => BatchKeyFilterKind::RoaringU32, + } + } + + pub(crate) fn from_bloom_parts(num_hashes: u32, data: Vec) -> Self { + Self::Bloom(TrackingBloomFilter::new( + BloomFilter::from_vec(data) + .seed(&BLOOM_FILTER_SEED) + .hashes(num_hashes), + )) + } + + pub(crate) fn from_roaring_bytes(data: &[u8]) -> io::Result { + TrackingRoaringBitmap::deserialize_from(data).map(Self::RoaringU32) + } + + pub(crate) fn is_exact(&self) -> bool { + matches!(self, Self::RoaringU32(_)) + } + + pub(crate) fn stats(&self) -> BloomFilterStats { + match self { + Self::Bloom(filter) => filter.stats(), + Self::RoaringU32(filter) => filter.stats(), + } + } + + pub(crate) fn insert_key(&mut self, key: &K) + where + K: DataTrait + ?Sized, + { + match self { + Self::Bloom(filter) => { + filter.insert_hash(key.default_hash()); + } + Self::RoaringU32(filter) => { + let key = key.as_roaring_u32(); + debug_assert!( + key.is_some(), + "roaring-u32 filter was selected for an unsupported key type" + ); + if let Some(key) = key { + filter.insert(key); + } + } + } + } + + pub(crate) fn probe_key(&self, key: &K, hash: Option) -> BatchKeyFilterProbe + where + K: DataTrait + ?Sized, + { + match self { + Self::Bloom(filter) => { + if filter.contains_hash(hash.unwrap_or_else(|| key.default_hash())) { + BatchKeyFilterProbe::MaybePresent + } else { + BatchKeyFilterProbe::Absent + } + } + Self::RoaringU32(filter) => match key.as_roaring_u32() { + Some(key) => { + if filter.contains(key) { + BatchKeyFilterProbe::Present + } else { + BatchKeyFilterProbe::Absent + } + } + None => BatchKeyFilterProbe::MaybePresent, + }, + } + } + + /// Hash-only membership checks are only meaningful for Bloom filters. + /// + /// Exact filters need the original key value, so they conservatively report + /// "maybe present" here. + pub(crate) fn maybe_contains_hash(&self, hash: u64) -> bool { + match self { + Self::Bloom(filter) => filter.contains_hash(hash), + Self::RoaringU32(_) => true, + } + } +} + +/// Roaring bitmap wrapper that tracks hit/miss counts during membership probes. +#[derive(Debug, Default)] +pub struct TrackingRoaringBitmap { + bitmap: RoaringBitmap, + hits: AtomicUsize, + misses: AtomicUsize, +} + +impl TrackingRoaringBitmap { + pub(crate) fn new(bitmap: RoaringBitmap) -> Self { + Self { + bitmap, + hits: AtomicUsize::new(0), + misses: AtomicUsize::new(0), + } + } + + pub(crate) fn insert(&mut self, value: u32) { + self.bitmap.insert(value); + } + + pub(crate) fn contains(&self, value: u32) -> bool { + let is_hit = self.bitmap.contains(value); + if is_hit { + self.hits.fetch_add(1, Ordering::Release); + } else { + self.misses.fetch_add(1, Ordering::Release); + } + is_hit + } + + pub(crate) fn stats(&self) -> BloomFilterStats { + BloomFilterStats { + size_byte: size_of_val(&self.bitmap) + self.bitmap.serialized_size(), + hits: self.hits.load(Ordering::Acquire), + misses: self.misses.load(Ordering::Acquire), + } + } + + pub(crate) fn serialized_size(&self) -> usize { + self.bitmap.serialized_size() + } + + pub(crate) fn serialize_into(&self, writer: W) -> io::Result<()> { + self.bitmap.serialize_into(writer) + } + + pub(crate) fn deserialize_from(reader: R) -> io::Result { + Ok(Self::new(RoaringBitmap::deserialize_from(reader)?)) + } +} + +#[cfg(test)] +mod tests { + use super::{BatchKeyFilterProbe, TrackingRoaringBitmap}; + use crate::storage::tracking_bloom_filter::BloomFilterStats; + + #[test] + fn tracking_roaring_bitmap_stats() { + let mut filter = TrackingRoaringBitmap::default(); + filter.insert(1); + filter.insert(3); + + assert!(filter.contains(1)); + assert!(!filter.contains(2)); + assert_eq!( + filter.stats(), + BloomFilterStats { + size_byte: std::mem::size_of::() + filter.serialized_size(), + hits: 1, + misses: 1, + } + ); + } + + #[test] + fn exact_probe_semantics() { + let mut filter = TrackingRoaringBitmap::default(); + filter.insert(7); + + assert!(matches!( + if filter.contains(7) { + BatchKeyFilterProbe::Present + } else { + BatchKeyFilterProbe::Absent + }, + BatchKeyFilterProbe::Present + )); + } +} diff --git a/crates/dbsp/src/storage/file/format.rs b/crates/dbsp/src/storage/file/format.rs index 42443dfb302..1118fce08f4 100644 --- a/crates/dbsp/src/storage/file/format.rs +++ b/crates/dbsp/src/storage/file/format.rs @@ -75,12 +75,10 @@ //! //! Decompressing a compressed block yields the regular index or data block //! format starting with a [`BlockHeader`]. -use crate::storage::tracking_bloom_filter::TrackingBloomFilter; -use crate::storage::{buffer_cache::FBuf, file::BLOOM_FILTER_SEED}; +use crate::storage::buffer_cache::FBuf; use binrw::{BinRead, BinResult, BinWrite, Error as BinError, binrw, binwrite}; #[cfg(doc)] use crc32c; -use fastbloom::BloomFilter; use num_derive::FromPrimitive; use num_traits::FromPrimitive; use size_of::SizeOf; @@ -107,8 +105,11 @@ pub const INDEX_BLOCK_MAGIC: [u8; 4] = *b"LFIB"; /// Magic number for the file trailer block. pub const FILE_TRAILER_BLOCK_MAGIC: [u8; 4] = *b"LFFT"; -/// Magic number for filter blocks. -pub const FILTER_BLOCK_MAGIC: [u8; 4] = *b"LFFB"; +/// Magic number for Bloom filter blocks. +pub const BLOOM_FILTER_BLOCK_MAGIC: [u8; 4] = *b"LFFB"; + +/// Magic number for roaring bitmap filter blocks. +pub const ROARING_FILTER_BLOCK_MAGIC: [u8; 4] = *b"LFFR"; /// 8-byte header at the beginning of each block. /// @@ -163,13 +164,13 @@ pub struct FileTrailer { #[br(count = n_columns)] pub columns: Vec, - /// File offset in bytes of the [FilterBlock]. + /// File offset in bytes of the filter block. /// /// This is 0 if there is no filter block, or if the filter block size is /// bigger than `i32::MAX`. pub filter_offset: u64, - /// Size in bytes of the [FilterBlock]. + /// Size in bytes of the filter block. /// /// This is 0 if there is no filter block, or if the filter block size is /// bigger than `i32::MAX`. @@ -197,7 +198,7 @@ pub struct FileTrailer { /// future expansion. pub incompatible_features: u64, - /// File offset in bytes of the [FilterBlock]. + /// File offset in bytes of the filter block. /// /// This is 0 if there is no filter block, or if the filter block size is /// less than `i32::MAX`. If this is nonzero, then @@ -205,7 +206,7 @@ pub struct FileTrailer { /// [FileTrailer::compatible_features]. pub filter_offset64: u64, - /// Size in bytes of the [FilterBlock]. + /// Size in bytes of the filter block. /// /// This is 0 if there is no filter block, or if the filter block size is /// less than `i32::MAX`. If this is nonzero, then @@ -537,12 +538,15 @@ impl Compression { /// /// The Bloom filter contains a member for each key in column 0. #[binrw] -pub struct FilterBlock { +pub struct BloomFilterBlock { /// Block header with "LFFB" magic. - #[brw(assert(header.magic == FILTER_BLOCK_MAGIC, "filter block has bad magic"))] + #[brw(assert( + header.magic == BLOOM_FILTER_BLOCK_MAGIC, + "bloom filter block has bad magic" + ))] pub header: BlockHeader, - /// [BloomFilter::num_hashes]. + /// Number of hashes used by the Bloom filter. pub num_hashes: u32, /// Number of elements in `data`. @@ -554,24 +558,17 @@ pub struct FilterBlock { pub data: Vec, } -impl From for TrackingBloomFilter { - fn from(block: FilterBlock) -> Self { - TrackingBloomFilter::new( - BloomFilter::from_vec(block.data) - .seed(&BLOOM_FILTER_SEED) - .hashes(block.num_hashes), - ) - } -} - /// A block representing a Bloom filter (with data by reference). #[binwrite] -pub struct FilterBlockRef<'a> { +pub struct BloomFilterBlockRef<'a> { /// Block header with "LFFB" magic. - #[bw(assert(header.magic == FILTER_BLOCK_MAGIC, "filter block has bad magic"))] + #[bw(assert( + header.magic == BLOOM_FILTER_BLOCK_MAGIC, + "bloom filter block has bad magic" + ))] pub header: BlockHeader, - /// [BloomFilter::num_hashes]. + /// Number of hashes used by the Bloom filter. pub num_hashes: u32, /// Number of elements in `data`. @@ -582,12 +579,39 @@ pub struct FilterBlockRef<'a> { pub data: &'a [u64], } -impl<'a> From<&'a TrackingBloomFilter> for FilterBlockRef<'a> { - fn from(value: &'a TrackingBloomFilter) -> Self { - FilterBlockRef { - header: BlockHeader::new(&FILTER_BLOCK_MAGIC), - num_hashes: value.num_hashes(), - data: value.as_slice(), - } - } +/// A block representing a roaring bitmap filter. +#[binrw] +pub struct RoaringBitmapFilterBlock { + /// Block header with "LFFR" magic. + #[brw(assert( + header.magic == ROARING_FILTER_BLOCK_MAGIC, + "roaring filter block has bad magic" + ))] + pub header: BlockHeader, + + /// Number of bytes in `data`. + #[bw(try_calc(u64::try_from(data.len())))] + pub len: u64, + + /// Serialized roaring bitmap contents. + #[br(count = len)] + pub data: Vec, +} + +/// A block representing a roaring bitmap filter (with data by reference). +#[binwrite] +pub struct RoaringBitmapFilterBlockRef<'a> { + /// Block header with "LFFR" magic. + #[bw(assert( + header.magic == ROARING_FILTER_BLOCK_MAGIC, + "roaring filter block has bad magic" + ))] + pub header: BlockHeader, + + /// Number of bytes in `data`. + #[bw(try_calc(u64::try_from(data.len())))] + pub len: u64, + + /// Serialized roaring bitmap contents. + pub data: &'a [u8], } diff --git a/crates/dbsp/src/storage/file/reader.rs b/crates/dbsp/src/storage/file/reader.rs index 386535ccad9..c3817d90752 100644 --- a/crates/dbsp/src/storage/file/reader.rs +++ b/crates/dbsp/src/storage/file/reader.rs @@ -3,11 +3,13 @@ //! [`Reader`] is the top-level interface for reading layer files. use super::format::{Compression, FileTrailer}; -use super::{AnyFactories, Deserializer, Factories}; +use super::{ + AnyFactories, BatchKeyFilter, BatchKeyFilterKind, BatchKeyFilterProbe, Deserializer, Factories, +}; use crate::dynamic::{DynVec, WeightTrait}; use crate::storage::buffer_cache::CacheAccess; -use crate::storage::file::format::FilterBlock; -use crate::storage::tracking_bloom_filter::{BloomFilterStats, TrackingBloomFilter}; +use crate::storage::file::format::{BloomFilterBlock, RoaringBitmapFilterBlock}; +use crate::storage::tracking_bloom_filter::BloomFilterStats; use crate::storage::{ backend::StorageError, buffer_cache::{BufferCache, FBuf}, @@ -140,6 +142,29 @@ pub enum CorruptionError { inner: String, }, + /// Filter block has an unknown magic number. + #[error("Filter block ({location}) has unknown magic {magic:?}")] + UnknownFilterBlockMagic { + /// Block location. + location: BlockLocation, + + /// Unexpected magic number. + magic: [u8; 4], + }, + + /// Filter block contents are malformed. + #[error("Invalid {kind} filter block ({location}): {inner}")] + InvalidFilterEncoding { + /// Block location. + location: BlockLocation, + + /// Filter kind being decoded. + kind: &'static str, + + /// Underlying decode error. + inner: String, + }, + /// Array overflows block bounds. #[error( "{count}-element array of {each}-byte elements starting at offset {offset} within block overflows {block_size}-byte block" @@ -1299,16 +1324,57 @@ struct Column { n_rows: u64, } -impl FilterBlock { - fn new(file_handle: &dyn FileReader, location: BlockLocation) -> Result { - let block = file_handle.read_block(location)?; - Self::read_le(&mut io::Cursor::new(block.as_slice())).map_err(|e| { - Error::Corruption(CorruptionError::Binrw { - location, - block_type: "filter", - inner: e.to_string(), - }) +fn parse_filter_block BinRead = ()>>( + block: &FBuf, + location: BlockLocation, + block_type: &'static str, +) -> Result { + T::read_le(&mut io::Cursor::new(block.as_slice())).map_err(|e| { + Error::Corruption(CorruptionError::Binrw { + location, + block_type, + inner: e.to_string(), }) + }) +} + +fn read_filter_block( + file_handle: &dyn FileReader, + location: BlockLocation, +) -> Result { + let block = file_handle.read_block(location)?; + if block.len() < 8 { + return Err(Error::Corruption(CorruptionError::InvalidFilterEncoding { + location, + kind: "unknown", + inner: format!("block too short: {} bytes", block.len()), + })); + } + let mut magic = [0u8; 4]; + magic.copy_from_slice(&block[4..8]); + + match magic { + crate::storage::file::format::BLOOM_FILTER_BLOCK_MAGIC => { + let block: BloomFilterBlock = parse_filter_block(&block, location, "bloom filter")?; + Ok(BatchKeyFilter::from_bloom_parts( + block.num_hashes, + block.data, + )) + } + crate::storage::file::format::ROARING_FILTER_BLOCK_MAGIC => { + let block: RoaringBitmapFilterBlock = + parse_filter_block(&block, location, "roaring filter")?; + BatchKeyFilter::from_roaring_bytes(&block.data).map_err(|e| { + Error::Corruption(CorruptionError::InvalidFilterEncoding { + location, + kind: "roaring", + inner: e.to_string(), + }) + }) + } + magic => Err(Error::Corruption( + CorruptionError::UnknownFilterBlockMagic { location, magic }, + )), } } @@ -1498,7 +1564,7 @@ where #[derive(Debug)] pub struct Reader { file: ImmutableFileRef, - bloom_filter: Option, + key_filter: Option, columns: Vec, /// `fn() -> T` is `Send` and `Sync` regardless of `T`. See @@ -1526,7 +1592,7 @@ where factories: &[&AnyFactories], cache: fn() -> Arc, file: Arc, - bloom_filter: Option, + key_filter: Option, ) -> Result { let file_size = file.get_size()?; if file_size < 512 || (file_size % 512) != 0 { @@ -1595,27 +1661,26 @@ where } } - fn read_filter_block( + fn read_filter_block_at( file_handle: &dyn FileReader, offset: u64, size: usize, - ) -> Result { - Ok(FilterBlock::new( + ) -> Result { + read_filter_block( file_handle, BlockLocation::new(offset, size).map_err(|error: InvalidBlockLocation| { Error::Corruption(CorruptionError::InvalidFilterLocation(error)) })?, - )? - .into()) + ) } - let bloom_filter = match bloom_filter { - Some(bloom_filter) => Some(bloom_filter), - None if file_trailer.has_filter64() => Some(read_filter_block( + let key_filter = match key_filter { + Some(key_filter) => Some(key_filter), + None if file_trailer.has_filter64() => Some(read_filter_block_at( &*file, file_trailer.filter_offset64, file_trailer.filter_size64 as usize, )?), - None if file_trailer.filter_offset != 0 => Some(read_filter_block( + None if file_trailer.filter_offset != 0 => Some(read_filter_block_at( &*file, file_trailer.filter_offset, file_trailer.filter_size as usize, @@ -1632,7 +1697,7 @@ where file_trailer.version, ), columns, - bloom_filter, + key_filter, _phantom: PhantomData, }) } @@ -1679,17 +1744,22 @@ where Ok(self.file.file_handle.get_size()?) } - /// Returns statistics of the Bloom filter, including its size in bytes. + /// Returns statistics of the batch key filter, including its size in bytes. /// - /// If the file doesn't have a Bloom filter, returns a default of zeros. + /// If the file doesn't have a filter, returns a default of zeros. pub fn filter_stats(&self) -> BloomFilterStats { - if let Some(bloom_filter) = &self.bloom_filter { - bloom_filter.stats() + if let Some(key_filter) = &self.key_filter { + key_filter.stats() } else { BloomFilterStats::default() } } + /// Returns the kind of batch key filter loaded for this file, if any. + pub fn filter_kind(&self) -> Option { + self.key_filter.as_ref().map(BatchKeyFilter::kind) + } + /// Evict this file from the cache. #[cfg(test)] pub fn evict(&self) { @@ -1714,11 +1784,23 @@ where A: DataTrait + ?Sized, (&'static K, &'static A, N): ColumnSpec, { - /// Asks the bloom filter of the reader if we have the key. + /// Probes the batch key filter using the original key value. + /// + /// Exact filters can return [`BatchKeyFilterProbe::Present`] or + /// [`BatchKeyFilterProbe::Absent`]. + pub fn probe_key_filter(&self, key: &K, hash: Option) -> BatchKeyFilterProbe { + self.key_filter + .as_ref() + .map_or(BatchKeyFilterProbe::MaybePresent, |filter| { + filter.probe_key(key, hash) + }) + } + + /// Asks the batch key filter whether the hashed key may be present. pub fn maybe_contains_key(&self, hash: u64) -> bool { - self.bloom_filter + self.key_filter .as_ref() - .is_none_or(|b| b.contains_hash(hash)) + .is_none_or(|filter| filter.maybe_contains_hash(hash)) } /// Returns a [`RowGroup`] for all of the rows in column 0. @@ -3043,7 +3125,7 @@ where } } -/// A `DynVec`, possibly filtered by a Bloom filter. +/// A `DynVec`, possibly filtered by a batch key filter. struct FilteredKeys<'b, K> where K: ?Sized, @@ -3051,17 +3133,17 @@ where /// Sorted array to keys to retrieve. queried_keys: &'b DynVec, - /// Indexes into `queried_keys` of the keys that pass the Bloom filter. If - /// this is `None`, then enough of the keys passed the Bloom filter that we + /// Indexes into `queried_keys` of the keys that pass the filter. If this + /// is `None`, then enough of the keys passed an approximate filter that we /// just take all of them. - bloom_keys: Option>, + filtered_keys: Option>, } impl<'b, K> FilteredKeys<'b, K> where K: DataTrait + ?Sized, { - /// Returns `keys`, filtered using `reader.maybe_contains_key()`. + /// Returns `keys`, filtered using the reader's batch key filter. fn new<'a, A, N>(reader: &'a Reader<(&'static K, &'static A, N)>, keys: &'b DynVec) -> Self where A: DataTrait + ?Sized, @@ -3069,34 +3151,35 @@ where { debug_assert!(keys.is_sorted_by(&|a, b| a.cmp(b))); - // Pass keys into the Bloom filter until 1/300th of them pass the Bloom - // filter. Empirically, this seems to good enough for the common case - // where the data passed into a "distinct" operator is actually distinct - // but we get some false positives from the Bloom filter. Because the - // keys that go into a "distinct" operator are often large, we don't - // want to pass all of them into the Bloom filter if we're going to have - // to deserialize them anyhow later. - let mut bloom_keys = SmallVec::<[_; 50]>::new(); + let exact_filter = reader + .key_filter + .as_ref() + .is_some_and(BatchKeyFilter::is_exact); + + // Pass keys into the filter until 1/300th of them pass an approximate + // filter. Exact filters don't have false positives, so they are always + // worth applying to the full key set. + let mut filtered_keys = SmallVec::<[_; 50]>::new(); for (index, key) in keys.dyn_iter().enumerate() { - if reader.maybe_contains_key(key.default_hash()) { - bloom_keys.push(index); - if bloom_keys.len() >= keys.len() / 300 { + if reader.probe_key_filter(key, None).may_contain() { + filtered_keys.push(index); + if !exact_filter && filtered_keys.len() >= keys.len() / 300 { return Self { queried_keys: keys, - bloom_keys: None, + filtered_keys: None, }; } } } Self { queried_keys: keys, - bloom_keys: Some(bloom_keys.into_vec()), + filtered_keys: Some(filtered_keys.into_vec()), } } fn len(&self) -> usize { - match &self.bloom_keys { - Some(bloom_keys) => bloom_keys.len(), + match &self.filtered_keys { + Some(filtered_keys) => filtered_keys.len(), None => self.queried_keys.len(), } } @@ -3113,8 +3196,8 @@ where type Output = K; fn index(&self, index: usize) -> &Self::Output { - match &self.bloom_keys { - Some(bloom_keys) => &self.queried_keys[bloom_keys[index]], + match &self.filtered_keys { + Some(filtered_keys) => &self.queried_keys[filtered_keys[index]], None => &self.queried_keys[index], } } diff --git a/crates/dbsp/src/storage/file/test.rs b/crates/dbsp/src/storage/file/test.rs index d7bdc8a1387..92d57612571 100644 --- a/crates/dbsp/src/storage/file/test.rs +++ b/crates/dbsp/src/storage/file/test.rs @@ -1,13 +1,16 @@ -use std::{marker::PhantomData, sync::Arc}; +use std::{io::Cursor, marker::PhantomData, sync::Arc}; use crate::{ DBWeight, - dynamic::{Data, DynWeight, Factory, LeanVec, Vector, WithFactory}, + dynamic::{Data, DataTrait, DynWeight, Factory, LeanVec, Vector, WithFactory}, storage::{ - backend::StorageBackend, + backend::{BlockLocation, StorageBackend}, buffer_cache::BufferCache, file::{ - format::Compression, + BatchKeyFilterKind, BatchKeyFilterProbe, + format::{ + BLOOM_FILTER_BLOCK_MAGIC, Compression, FileTrailer, ROARING_FILTER_BLOCK_MAGIC, + }, reader::{BulkRows, Reader}, }, }, @@ -15,7 +18,7 @@ use crate::{ BatchReaderFactories, Builder, VecIndexedWSetFactories, VecWSetFactories, ord::vec::{indexed_wset_batch::VecIndexedWSetBuilder, wset_batch::VecWSetBuilder}, }, - utils::test::init_test_logger, + utils::{Tup1, test::init_test_logger}, }; use super::{ @@ -28,6 +31,7 @@ use crate::{ DBData, dynamic::{DynData, Erase}, }; +use binrw::BinRead; use feldera_types::config::{StorageConfig, StorageOptions}; use rand::{Rng, seq::SliceRandom, thread_rng}; use tempfile::tempdir; @@ -683,6 +687,41 @@ fn test_bloom( } } +fn filter_block_magic(reader: &Reader<(&'static K, &'static A, N)>) -> Option<[u8; 4]> +where + K: DataTrait + ?Sized, + A: DataTrait + ?Sized, + (&'static K, &'static A, N): ColumnSpec, +{ + let file_size = reader.byte_size().unwrap() as usize; + let trailer_block = reader + .file_handle() + .read_block(BlockLocation::new((file_size - 512) as u64, 512).unwrap()) + .unwrap(); + let trailer = FileTrailer::read_le(&mut Cursor::new(trailer_block.as_slice())).unwrap(); + let offset = if trailer.has_filter64() { + trailer.filter_offset64 + } else { + trailer.filter_offset + }; + let size = if trailer.has_filter64() { + trailer.filter_size64 as usize + } else { + trailer.filter_size as usize + }; + if offset == 0 { + return None; + } + + let filter_block = reader + .file_handle() + .read_block(BlockLocation::new(offset, size).unwrap()) + .unwrap(); + let mut magic = [0u8; 4]; + magic.copy_from_slice(&filter_block[4..8]); + Some(magic) +} + fn test_two_columns(parameters: Parameters) where T: TwoColumns, @@ -926,7 +965,7 @@ where let reader = if reopen { println!("closing writer and reopening as reader"); let path = writer.path().clone(); - let (_file_handle, _bloom_filter) = writer.close().unwrap(); + let (_file_handle, _key_filter) = writer.close().unwrap(); Reader::open( &[&factories.any_factories()], test_buffer_cache, @@ -981,7 +1020,7 @@ fn test_one_column_zset( let reader = if reopen { println!("closing writer and reopening as reader"); let path = writer.path().clone(); - let (_file_handle, _bloom_filter) = writer.close().unwrap(); + let (_file_handle, _key_filter) = writer.close().unwrap(); Reader::open( &[&factories.any_factories()], test_buffer_cache, @@ -999,6 +1038,213 @@ fn test_one_column_zset( } } +#[test] +fn test_bloom_filter_roundtrip_and_block_kind() { + init_test_logger(); + + for reopen in [false, true] { + let factories = Factories::::new::(); + let tempdir = tempdir().unwrap(); + let storage_backend = ::new( + &StorageConfig { + path: tempdir.path().to_string_lossy().to_string(), + cache: Default::default(), + }, + &StorageOptions::default(), + ) + .unwrap(); + + let mut writer = Writer1::new( + &factories, + test_buffer_cache, + &*storage_backend, + Parameters::default().with_filter_kind(BatchKeyFilterKind::Bloom), + 3, + ) + .unwrap(); + for key in [1i64, 3, 7] { + writer.write0((&key, &())).unwrap(); + } + + let reader = if reopen { + let path = writer.path().clone(); + let (_file_handle, _key_filter) = writer.close().unwrap(); + Reader::open( + &[&factories.any_factories()], + test_buffer_cache, + &*storage_backend, + &path, + ) + .unwrap() + } else { + writer.into_reader().unwrap() + }; + + assert_eq!(reader.filter_kind(), Some(BatchKeyFilterKind::Bloom)); + for key in [1i64, 3, 7] { + assert!(reader.maybe_contains_key(key.default_hash())); + assert!(reader.probe_key_filter(key.erase(), None).may_contain()); + } + assert_eq!(filter_block_magic(&reader), Some(BLOOM_FILTER_BLOCK_MAGIC)); + } +} + +#[test] +fn test_roaring_u32_filter_roundtrip_exact_and_block_kind() { + init_test_logger(); + + for reopen in [false, true] { + let factories = Factories::::new::(); + let tempdir = tempdir().unwrap(); + let storage_backend = ::new( + &StorageConfig { + path: tempdir.path().to_string_lossy().to_string(), + cache: Default::default(), + }, + &StorageOptions::default(), + ) + .unwrap(); + + let mut writer = Writer1::new( + &factories, + test_buffer_cache, + &*storage_backend, + Parameters::default().with_filter_kind(BatchKeyFilterKind::RoaringU32), + 3, + ) + .unwrap(); + for key in [1u32, 3, 7] { + writer.write0((&key, &())).unwrap(); + } + + let reader = if reopen { + let path = writer.path().clone(); + let (_file_handle, _key_filter) = writer.close().unwrap(); + Reader::open( + &[&factories.any_factories()], + test_buffer_cache, + &*storage_backend, + &path, + ) + .unwrap() + } else { + writer.into_reader().unwrap() + }; + + assert_eq!(reader.filter_kind(), Some(BatchKeyFilterKind::RoaringU32)); + for key in [1u32, 3, 7] { + assert_eq!( + reader.probe_key_filter(key.erase(), None), + BatchKeyFilterProbe::Present + ); + } + for key in [0u32, 2, 9] { + assert_eq!( + reader.probe_key_filter(key.erase(), None), + BatchKeyFilterProbe::Absent + ); + } + assert_eq!( + filter_block_magic(&reader), + Some(ROARING_FILTER_BLOCK_MAGIC) + ); + } +} + +#[test] +fn test_roaring_tup1_i32_filter_roundtrip_exact_and_block_kind() { + init_test_logger(); + + for reopen in [false, true] { + let factories = Factories::::new::, ()>(); + let tempdir = tempdir().unwrap(); + let storage_backend = ::new( + &StorageConfig { + path: tempdir.path().to_string_lossy().to_string(), + cache: Default::default(), + }, + &StorageOptions::default(), + ) + .unwrap(); + + let mut writer = Writer1::new( + &factories, + test_buffer_cache, + &*storage_backend, + Parameters::default().with_filter_kind(BatchKeyFilterKind::RoaringU32), + 3, + ) + .unwrap(); + for key in [Tup1(-7i32), Tup1(1), Tup1(3)] { + writer.write0((&key, &())).unwrap(); + } + + let reader = if reopen { + let path = writer.path().clone(); + let (_file_handle, _key_filter) = writer.close().unwrap(); + Reader::open( + &[&factories.any_factories()], + test_buffer_cache, + &*storage_backend, + &path, + ) + .unwrap() + } else { + writer.into_reader().unwrap() + }; + + assert_eq!(reader.filter_kind(), Some(BatchKeyFilterKind::RoaringU32)); + for key in [Tup1(-7i32), Tup1(1), Tup1(3)] { + assert_eq!( + reader.probe_key_filter(key.erase(), None), + BatchKeyFilterProbe::Present + ); + } + for key in [Tup1(-8i32), Tup1(0), Tup1(9)] { + assert_eq!( + reader.probe_key_filter(key.erase(), None), + BatchKeyFilterProbe::Absent + ); + } + assert_eq!( + filter_block_magic(&reader), + Some(ROARING_FILTER_BLOCK_MAGIC) + ); + } +} + +#[test] +fn test_roaring_request_falls_back_to_bloom_for_non_u32_keys() { + init_test_logger(); + + let factories = Factories::::new::(); + let tempdir = tempdir().unwrap(); + let storage_backend = ::new( + &StorageConfig { + path: tempdir.path().to_string_lossy().to_string(), + cache: Default::default(), + }, + &StorageOptions::default(), + ) + .unwrap(); + + let mut writer = Writer1::new( + &factories, + test_buffer_cache, + &*storage_backend, + Parameters::default().with_filter_kind(BatchKeyFilterKind::RoaringU32), + 2, + ) + .unwrap(); + for key in [5i64, 8] { + writer.write0((&key, &())).unwrap(); + } + + let reader = writer.into_reader().unwrap(); + assert_eq!(reader.filter_kind(), Some(BatchKeyFilterKind::Bloom)); + assert_eq!(filter_block_magic(&reader), Some(BLOOM_FILTER_BLOCK_MAGIC)); +} + fn test_i64_helper(parameters: Parameters) { init_test_logger(); test_one_column( diff --git a/crates/dbsp/src/storage/file/writer.rs b/crates/dbsp/src/storage/file/writer.rs index db5a9e62dfa..145a1b3f4a6 100644 --- a/crates/dbsp/src/storage/file/writer.rs +++ b/crates/dbsp/src/storage/file/writer.rs @@ -8,11 +8,11 @@ use crate::storage::{ backend::{BlockLocation, FileReader, FileWriter, StorageBackend, StorageError}, buffer_cache::{BufferCache, FBuf, FBufSerializer, LimitExceeded}, file::{ - BLOOM_FILTER_SEED, format::{ - BlockHeader, COMPATIBLE_FEATURE_FILTER64, DATA_BLOCK_MAGIC, DataBlockHeader, - FILE_TRAILER_BLOCK_MAGIC, FileTrailer, FileTrailerColumn, FilterBlockRef, FixedLen, - INDEX_BLOCK_MAGIC, IndexBlockHeader, NodeType, VERSION_NUMBER, Varint, + BlockHeader, BloomFilterBlockRef, COMPATIBLE_FEATURE_FILTER64, DATA_BLOCK_MAGIC, + DataBlockHeader, FILE_TRAILER_BLOCK_MAGIC, FileTrailer, FileTrailerColumn, FixedLen, + INDEX_BLOCK_MAGIC, IndexBlockHeader, NodeType, ROARING_FILTER_BLOCK_MAGIC, + RoaringBitmapFilterBlockRef, VERSION_NUMBER, Varint, }, reader::TreeNode, with_serializer, @@ -25,7 +25,6 @@ use binrw::{ use crc32c::crc32c; #[cfg(debug_assertions)] use dyn_clone::clone_box; -use fastbloom::BloomFilter; use feldera_buffer_cache::CacheEntry; use feldera_storage::StoragePath; use snap::raw::{Encoder, max_compress_len}; @@ -41,8 +40,7 @@ use std::{ use tracing::info; use super::format::Compression; -use super::{AnyFactories, Factories, reader::Reader}; -use crate::storage::tracking_bloom_filter::TrackingBloomFilter; +use super::{AnyFactories, BatchKeyFilter, BatchKeyFilterKind, Factories, reader::Reader}; use crate::{ Runtime, dynamic::{DataTrait, DeserializeDyn, SerializeDyn}, @@ -139,6 +137,11 @@ pub struct Parameters { /// How to compress input and data blocks in the output file. pub compression: Option, + + /// Optional override for the per-batch key filter implementation. + /// + /// If unset, the writer uses the runtime `DevTweaks` selection. + pub filter_kind: Option, } impl Parameters { @@ -168,6 +171,15 @@ impl Parameters { ..self } } + + /// Returns these parameters with the batch key filter implementation + /// overridden. + pub fn with_filter_kind(self, filter_kind: BatchKeyFilterKind) -> Self { + Self { + filter_kind: Some(filter_kind), + ..self + } + } } impl Default for Parameters { @@ -179,6 +191,7 @@ impl Default for Parameters { #[cfg(test)] max_branch: usize::MAX, compression: Some(Compression::Snappy), + filter_kind: None, } } } @@ -1095,12 +1108,52 @@ impl BlockWriter { struct Writer { cache: fn() -> Arc, writer: BlockWriter, - bloom_filter: Option, + key_filter: Option, cws: Vec, finished_columns: Vec, } impl Writer { + fn filter_kind(parameters: &Parameters) -> BatchKeyFilterKind { + parameters + .filter_kind + .unwrap_or_else(|| Runtime::with_dev_tweaks(|dev_tweaks| dev_tweaks.batch_key_filter)) + } + + fn log_filter_selection( + requested_filter_kind: BatchKeyFilterKind, + key_type_name: &'static str, + key_filter: Option<&BatchKeyFilter>, + ) { + match key_filter.map(BatchKeyFilter::kind) { + Some(BatchKeyFilterKind::RoaringU32) => { + static ONCE: Once = Once::new(); + ONCE.call_once(|| { + info!("Using exact roaring_u32 batch key filter for key type {key_type_name}"); + }); + } + Some(BatchKeyFilterKind::Bloom) + if requested_filter_kind == BatchKeyFilterKind::RoaringU32 => + { + static ONCE: Once = Once::new(); + ONCE.call_once(|| { + info!( + "Requested roaring_u32 batch key filter for key type {key_type_name}; falling back to Bloom filter" + ); + }); + } + None if requested_filter_kind == BatchKeyFilterKind::RoaringU32 => { + static ONCE: Once = Once::new(); + ONCE.call_once(|| { + info!( + "Requested roaring_u32 batch key filter for key type {key_type_name}, but Bloom filters are disabled; batch key filtering disabled" + ); + }); + } + _ => {} + } + } + fn bloom_false_positive_rate() -> Option { let rate = Runtime::with_dev_tweaks(|dev_tweaks| dev_tweaks.bloom_false_positive_rate); let rate = (rate > 0.0 && rate < 1.0).then_some(rate); @@ -1126,18 +1179,23 @@ impl Writer { ) -> Result { assert_eq!(factories.len(), n_columns); - let bloom_filter = Self::bloom_false_positive_rate().map(|bloom_false_positive_rate| { - TrackingBloomFilter::new( - BloomFilter::with_false_pos(bloom_false_positive_rate) - .seed(&BLOOM_FILTER_SEED) - .expected_items({ - // `.max(64)` works around a fastbloom bug that hangs when the - // expected number of items is zero (see - // https://github.com/tomtomwombat/fastbloom/issues/17). - estimated_keys.max(64) - }), - ) - }); + let requested_filter_kind = Self::filter_kind(¶meters); + let bloom_false_positive_rate = match requested_filter_kind { + BatchKeyFilterKind::Bloom => Self::bloom_false_positive_rate(), + BatchKeyFilterKind::RoaringU32 if factories[0].supports_roaring_u32() => None, + BatchKeyFilterKind::RoaringU32 => Self::bloom_false_positive_rate(), + }; + let key_filter = BatchKeyFilter::new( + requested_filter_kind, + estimated_keys, + factories[0], + bloom_false_positive_rate, + ); + Self::log_filter_selection( + requested_filter_kind, + factories[0].key_type_name(), + key_filter.as_ref(), + ); let parameters = Arc::new(parameters); let cws = factories .iter() @@ -1148,7 +1206,7 @@ impl Writer { let writer = Self { cache, writer: BlockWriter::new(cache(), storage_backend.create_with_prefix(&worker.into())?), - bloom_filter, + key_filter, cws, finished_columns, }; @@ -1169,9 +1227,8 @@ impl Writer { }; if column == 0 { - // Add `key` to bloom filter. - if let Some(bloom_filter) = &mut self.bloom_filter { - bloom_filter.insert_hash(item.0.default_hash()); + if let Some(key_filter) = &mut self.key_filter { + key_filter.insert_key(item.0); } } @@ -1195,24 +1252,44 @@ impl Writer { Ok(()) } - pub fn close( - mut self, - ) -> Result<(Arc, Option), StorageError> { + pub fn close(mut self) -> Result<(Arc, Option), StorageError> { debug_assert_eq!(self.cws.len(), self.finished_columns.len()); - // Write the Bloom filter. - let filter_location = if let Some(bloom_filter) = &self.bloom_filter { - let filter_block = FilterBlockRef::from(bloom_filter); - // std::mem::size_of::() should be an - // upper bound: in-memory struct size + bloom payload bytes. - let estimated_block_size = (std::mem::size_of::() - + std::mem::size_of_val(filter_block.data)) - // our binrw min block size is 512 so we round it up to avoid another - // reallocation - .next_multiple_of(512); - self.writer - .write_block(filter_block.into_block(estimated_block_size), None)? - .1 + // Write the batch key filter. + let filter_location = if let Some(key_filter) = &self.key_filter { + match key_filter { + BatchKeyFilter::Bloom(filter) => { + let filter_block = BloomFilterBlockRef { + header: BlockHeader::new( + &crate::storage::file::format::BLOOM_FILTER_BLOCK_MAGIC, + ), + num_hashes: filter.num_hashes(), + data: filter.as_slice(), + }; + let estimated_block_size = (std::mem::size_of::() + + std::mem::size_of_val(filter_block.data)) + .next_multiple_of(512); + self.writer + .write_block(filter_block.into_block(estimated_block_size), None)? + .1 + } + BatchKeyFilter::RoaringU32(filter) => { + let mut data = Vec::with_capacity(filter.serialized_size()); + filter + .serialize_into(&mut data) + .map_err(|_| StorageError::BloomFilter)?; + let filter_block = RoaringBitmapFilterBlockRef { + header: BlockHeader::new(&ROARING_FILTER_BLOCK_MAGIC), + data: &data, + }; + let estimated_block_size = (std::mem::size_of::() + + data.len()) + .next_multiple_of(512); + self.writer + .write_block(filter_block.into_block(estimated_block_size), None)? + .1 + } + } } else { BlockLocation { offset: 0, size: 0 } }; @@ -1249,7 +1326,7 @@ impl Writer { self.writer .insert_cache_entry(location, Arc::new(file_trailer)); - Ok((self.writer.complete()?, self.bloom_filter)) + Ok((self.writer.complete()?, self.key_filter)) } pub fn n_columns(&self) -> usize { @@ -1368,9 +1445,7 @@ where /// Finishes writing the layer file and returns the writer passed to /// [`new`](Self::new). - pub fn close( - mut self, - ) -> Result<(Arc, Option), StorageError> { + pub fn close(mut self) -> Result<(Arc, Option), StorageError> { self.inner.finish_column::(0)?; self.inner.close() } @@ -1392,9 +1467,9 @@ where let any_factories = self.factories.any_factories(); let cache = self.inner.cache; - let (file_handle, bloom_filter) = self.close()?; + let (file_handle, key_filter) = self.close()?; - Reader::new(&[&any_factories], cache, file_handle, bloom_filter) + Reader::new(&[&any_factories], cache, file_handle, key_filter) } } @@ -1545,9 +1620,7 @@ where /// /// This function will panic if [`write1`](Self::write1) has been called /// without a subsequent call to [`write0`](Self::write0). - pub fn close( - mut self, - ) -> Result<(Arc, Option), StorageError> { + pub fn close(mut self) -> Result<(Arc, Option), StorageError> { self.inner.finish_column::(0)?; self.inner.finish_column::(1)?; self.inner.close() @@ -1574,12 +1647,12 @@ where let any_factories0 = self.factories0.any_factories(); let any_factories1 = self.factories1.any_factories(); let cache = self.inner.cache; - let (file_handle, bloom_filter) = self.close()?; + let (file_handle, key_filter) = self.close()?; Reader::new( &[&any_factories0, &any_factories1], cache, file_handle, - bloom_filter, + key_filter, ) } } diff --git a/crates/dbsp/src/trace.rs b/crates/dbsp/src/trace.rs index 687e84f73a9..ebdb108da9a 100644 --- a/crates/dbsp/src/trace.rs +++ b/crates/dbsp/src/trace.rs @@ -34,7 +34,7 @@ use crate::trace::cursor::{ DefaultPushCursor, FilteredMergeCursor, FilteredMergeCursorWithSnapshot, PushCursor, UnfilteredMergeCursor, }; -use crate::utils::IsNone; +use crate::utils::{IsNone, RoaringU32Key}; use crate::{dynamic::ArchivedDBData, storage::buffer_cache::FBuf}; use cursor::CursorFactory; use enum_map::Enum; @@ -102,6 +102,7 @@ pub trait DBData: + Debug + ArchivedDBData + IsNone + + RoaringU32Key + 'static { } @@ -119,6 +120,7 @@ impl DBData for T where + Debug + ArchivedDBData + IsNone + + RoaringU32Key + 'static { } @@ -467,8 +469,8 @@ where /// the implementation need not attempt to cache the return value. fn approximate_byte_size(&self) -> usize; - /// Statistics of the Bloom filter used by [Cursor::seek_key_exact]. - /// The Bloom filter (kept in memory) is used there to quickly check + /// Statistics of the batch key filter used by [Cursor::seek_key_exact]. + /// The filter (kept in memory) is used there to quickly check /// whether a key might be present in the batch, before doing a /// binary tree lookup within the batch to be exactly sure. /// The statistics include for example the size in bytes and the hit rate. @@ -493,8 +495,8 @@ where self.len() == 0 } - /// A method that returns either true (possibly in the batch) or false - /// (definitely not in the batch). + /// A hash-based filter probe that returns either true (possibly in the + /// batch) or false (definitely not in the batch). fn maybe_contains_key(&self, _hash: u64) -> bool { true } diff --git a/crates/dbsp/src/trace/ord/file/indexed_wset_batch.rs b/crates/dbsp/src/trace/ord/file/indexed_wset_batch.rs index 15be0616b87..fd3721cba37 100644 --- a/crates/dbsp/src/trace/ord/file/indexed_wset_batch.rs +++ b/crates/dbsp/src/trace/ord/file/indexed_wset_batch.rs @@ -750,8 +750,7 @@ where } fn seek_key_exact(&mut self, key: &K, hash: Option) -> bool { - let hash = hash.unwrap_or_else(|| key.default_hash()); - if !self.wset.maybe_contains_key(hash) { + if !self.wset.file.probe_key_filter(key, hash).may_contain() { return false; } self.seek_key(key); diff --git a/crates/dbsp/src/trace/ord/file/key_batch.rs b/crates/dbsp/src/trace/ord/file/key_batch.rs index 646bb11f64f..5ea2cb57dc6 100644 --- a/crates/dbsp/src/trace/ord/file/key_batch.rs +++ b/crates/dbsp/src/trace/ord/file/key_batch.rs @@ -535,8 +535,7 @@ where } fn seek_key_exact(&mut self, key: &K, hash: Option) -> bool { - let hash = hash.unwrap_or_else(|| key.default_hash()); - if !self.batch.maybe_contains_key(hash) { + if !self.batch.file.probe_key_filter(key, hash).may_contain() { return false; } self.seek_key(key); diff --git a/crates/dbsp/src/trace/ord/file/val_batch.rs b/crates/dbsp/src/trace/ord/file/val_batch.rs index 11e5b1652b4..25b24410c5d 100644 --- a/crates/dbsp/src/trace/ord/file/val_batch.rs +++ b/crates/dbsp/src/trace/ord/file/val_batch.rs @@ -572,8 +572,7 @@ where } fn seek_key_exact(&mut self, key: &K, hash: Option) -> bool { - let hash = hash.unwrap_or_else(|| key.default_hash()); - if !self.batch.maybe_contains_key(hash) { + if !self.batch.file.probe_key_filter(key, hash).may_contain() { return false; } self.seek_key(key); diff --git a/crates/dbsp/src/trace/ord/file/wset_batch.rs b/crates/dbsp/src/trace/ord/file/wset_batch.rs index fc5ba5bcb11..5ee92ece3bb 100644 --- a/crates/dbsp/src/trace/ord/file/wset_batch.rs +++ b/crates/dbsp/src/trace/ord/file/wset_batch.rs @@ -669,8 +669,7 @@ where } fn seek_key_exact(&mut self, key: &K, hash: Option) -> bool { - let hash = hash.unwrap_or_else(|| key.default_hash()); - if !self.wset.maybe_contains_key(hash) { + if !self.wset.file.probe_key_filter(key, hash).may_contain() { return false; } self.seek_key(key); diff --git a/crates/dbsp/src/trace/test.rs b/crates/dbsp/src/trace/test.rs index 7abe4c1c623..4bd56689737 100644 --- a/crates/dbsp/src/trace/test.rs +++ b/crates/dbsp/src/trace/test.rs @@ -12,13 +12,14 @@ use size_of::SizeOf; use crate::{ DynZWeight, Runtime, ZWeight, algebra::{ - IndexedZSet, OrdIndexedZSet, OrdIndexedZSetFactories, OrdZSet, OrdZSetFactories, ZBatch, - ZSet, + AddByRef, IndexedZSet, OrdIndexedZSet, OrdIndexedZSetFactories, OrdZSet, OrdZSetFactories, + ZBatch, ZSet, }, circuit::{CircuitConfig, mkconfig}, dynamic::{DowncastTrait, DynData, DynUnit, DynWeightedPairs, Erase, LeanVec, pair::DynPair}, + storage::{buffer_cache::CacheStats, file::BatchKeyFilterKind}, trace::{ - Batch, BatchReader, BatchReaderFactories, Builder, FileIndexedWSetFactories, + Batch, BatchReader, BatchReaderFactories, Builder, Cursor, FileIndexedWSetFactories, FileWSetFactories, GroupFilter, Spine, Trace, cursor::CursorPair, ord::{ @@ -31,7 +32,7 @@ use crate::{ assert_trace_eq, test_batch_sampling, test_trace_sampling, }, }, - utils::{Tup2, Tup3, Tup4}, + utils::{Tup1, Tup2, Tup3, Tup4}, }; use super::Filter; @@ -747,6 +748,13 @@ where F: FnOnce() + Clone + Send + 'static, { let (_temp_dir, config) = mkconfig(); + run_in_circuit_with_storage_config(config, f); +} + +fn run_in_circuit_with_storage_config(config: CircuitConfig, f: F) +where + F: FnOnce() + Clone + Send + 'static, +{ let count = Arc::new(AtomicUsize::new(0)); Runtime::init_circuit(config, { let count = count.clone(); @@ -762,6 +770,102 @@ where assert_eq!(count.load(Ordering::Relaxed), 1); } +fn total_cache_accesses(stats: CacheStats) -> u64 { + stats + .0 + .iter() + .map(|(_, accesses)| accesses.iter().map(|(_, counts)| counts.count).sum::()) + .sum() +} + +fn build_file_wset_u32(keys: &[u32]) -> FileWSet { + let factories = >::new::(); + let mut builder = + as Batch>::Builder::with_capacity(&factories, keys.len(), 0); + + for key in keys { + let weight: ZWeight = 1; + builder.push_time_diff(&(), weight.erase()); + builder.push_key(key.erase()); + } + + builder.done() +} + +fn build_file_wset_tup1_i32(keys: &[i32]) -> FileWSet { + let factories = >::new::, (), ZWeight>(); + let mut builder = + as Batch>::Builder::with_capacity(&factories, keys.len(), 0); + + for key in keys { + let weight: ZWeight = 1; + builder.push_time_diff(&(), weight.erase()); + builder.push_key(Tup1(*key).erase()); + } + + builder.done() +} + +#[test] +fn test_file_wset_roaring_seek_key_exact_skips_absent_reads() { + let (_temp_dir, mut config) = mkconfig(); + config.dev_tweaks.batch_key_filter = BatchKeyFilterKind::RoaringU32; + + run_in_circuit_with_storage_config(config, move || { + let batch = build_file_wset_u32(&[1, 3, 7]); + let mut cursor = batch.cursor(); + let before = total_cache_accesses(batch.cache_stats()); + + let missing = 2u32; + assert!(!cursor.seek_key_exact(missing.erase(), None)); + assert_eq!(total_cache_accesses(batch.cache_stats()), before); + + let present = 3u32; + assert!(cursor.seek_key_exact(present.erase(), None)); + }); +} + +#[test] +fn test_file_wset_tup1_i32_roaring_seek_key_exact_skips_absent_reads() { + let (_temp_dir, mut config) = mkconfig(); + config.dev_tweaks.batch_key_filter = BatchKeyFilterKind::RoaringU32; + + run_in_circuit_with_storage_config(config, move || { + let batch = build_file_wset_tup1_i32(&[-7, 1, 3]); + let mut cursor = batch.cursor(); + let before = total_cache_accesses(batch.cache_stats()); + + let missing = Tup1(2i32); + assert!(!cursor.seek_key_exact(missing.erase(), None)); + assert_eq!(total_cache_accesses(batch.cache_stats()), before); + + let present = Tup1(3i32); + assert!(cursor.seek_key_exact(present.erase(), None)); + }); +} + +#[test] +fn test_file_wset_roaring_filter_rebuilt_after_merge() { + let (_temp_dir, mut config) = mkconfig(); + config.dev_tweaks.batch_key_filter = BatchKeyFilterKind::RoaringU32; + + run_in_circuit_with_storage_config(config, move || { + let lhs = build_file_wset_u32(&[1, 5]); + let rhs = build_file_wset_u32(&[3, 7]); + let merged = lhs.add_by_ref(&rhs); + + let mut cursor = merged.cursor(); + let before = total_cache_accesses(merged.cache_stats()); + + let missing = 4u32; + assert!(!cursor.seek_key_exact(missing.erase(), None)); + assert_eq!(total_cache_accesses(merged.cache_stats()), before); + + let present = 7u32; + assert!(cursor.seek_key_exact(present.erase(), None)); + }); +} + proptest! { #![proptest_config(ProptestConfig::with_cases(1000))] diff --git a/crates/dbsp/src/utils.rs b/crates/dbsp/src/utils.rs index dbb79a5c8f8..5e9d35e58f9 100644 --- a/crates/dbsp/src/utils.rs +++ b/crates/dbsp/src/utils.rs @@ -5,6 +5,7 @@ pub(crate) mod binary_heap; mod consolidation; mod graph; mod is_none; +mod roaring_u32_key; mod sort; pub mod tuple; @@ -30,6 +31,7 @@ pub use consolidation::{ pub use graph::components; pub use is_none::IsNone; +pub use roaring_u32_key::RoaringU32Key; #[allow(unused_imports)] pub use dot::{DotEdgeAttributes, DotNodeAttributes}; diff --git a/crates/dbsp/src/utils/roaring_u32_key.rs b/crates/dbsp/src/utils/roaring_u32_key.rs new file mode 100644 index 00000000000..248c27f5961 --- /dev/null +++ b/crates/dbsp/src/utils/roaring_u32_key.rs @@ -0,0 +1,162 @@ +//! Trait for key types that can be mapped exactly into a `u32` domain for +//! roaring-bitmap membership tests. + +use crate::dynamic::{BSet, LeanVec}; +use crate::time::UnitTimestamp; +use std::collections::BTreeMap; +use std::rc::Rc; +use std::sync::Arc; +use uuid::Uuid; + +pub trait RoaringU32Key { + #[inline] + fn as_roaring_u32(&self) -> Option { + None + } + + #[inline] + fn can_use_roaring_u32() -> bool + where + Self: Sized, + { + false + } +} + +#[macro_export] +macro_rules! never_roaring_u32 { + ($($ty:ty),* $(,)?) => { + $( + impl $crate::utils::RoaringU32Key for $ty {} + )* + }; +} + +never_roaring_u32!( + (), + bool, + char, + i8, + i16, + i64, + i128, + u8, + u16, + u64, + u128, + f32, + f64, + usize, + isize, + String, + UnitTimestamp, + Uuid +); + +impl RoaringU32Key for u32 { + #[inline] + fn as_roaring_u32(&self) -> Option { + Some(*self) + } + + #[inline] + fn can_use_roaring_u32() -> bool + where + Self: Sized, + { + true + } +} + +impl RoaringU32Key for i32 { + #[inline] + fn as_roaring_u32(&self) -> Option { + // Preserve the full `i32` bit pattern, which keeps membership exact. + Some(*self as u32) + } + + #[inline] + fn can_use_roaring_u32() -> bool + where + Self: Sized, + { + true + } +} + +impl RoaringU32Key for Option {} + +#[macro_export] +macro_rules! never_roaring_u32_1 { + ($($wrapper:ident),* $(,)?) => { + $( + impl $crate::utils::RoaringU32Key for $wrapper {} + )* + }; +} + +never_roaring_u32_1!(Vec, LeanVec, BSet); + +#[macro_export] +macro_rules! delegate_roaring_u32 { + ($($wrapper:ident),* $(,)?) => { + $( + impl $crate::utils::RoaringU32Key for $wrapper { + #[inline] + fn as_roaring_u32(&self) -> Option { + self.as_ref().as_roaring_u32() + } + + #[inline] + fn can_use_roaring_u32() -> bool + where + Self: Sized, + { + T::can_use_roaring_u32() + } + } + )* + }; +} + +delegate_roaring_u32!(Box, Rc, Arc); + +#[macro_export] +macro_rules! never_roaring_u32_tuples { + ($($name:ident),+) => { + impl<$($name),+> RoaringU32Key for ($($name,)+) {} + }; +} + +never_roaring_u32_tuples!(A); +never_roaring_u32_tuples!(A, B); +never_roaring_u32_tuples!(A, B, C); +never_roaring_u32_tuples!(A, B, C, D); +never_roaring_u32_tuples!(A, B, C, D, E); +never_roaring_u32_tuples!(A, B, C, D, E, F); + +impl RoaringU32Key for BTreeMap {} + +#[cfg(test)] +mod test { + use super::RoaringU32Key; + use crate::utils::Tup1; + + #[test] + fn supported_roaring_u32_keys() { + assert!(u32::can_use_roaring_u32()); + assert_eq!(7u32.as_roaring_u32(), Some(7)); + + assert!(i32::can_use_roaring_u32()); + assert_eq!((-7i32).as_roaring_u32(), Some((-7i32) as u32)); + + assert!(Tup1::::can_use_roaring_u32()); + assert_eq!(Tup1(-7i32).as_roaring_u32(), Some((-7i32) as u32)); + } + + #[test] + fn unsupported_roaring_u32_keys() { + assert!(!String::can_use_roaring_u32()); + assert_eq!("feldera".to_string().as_roaring_u32(), None); + } +} diff --git a/crates/feldera-macros/src/lib.rs b/crates/feldera-macros/src/lib.rs index 0461482f405..9dd711912d1 100644 --- a/crates/feldera-macros/src/lib.rs +++ b/crates/feldera-macros/src/lib.rs @@ -1,4 +1,4 @@ -//! Procedural macros for Feldera tuple types and `IsNone`. +//! Procedural macros for Feldera tuple types and utility traits. //! //! The `declare_tuple!` macro decides which layout to use based on tuple size //! and the active storage format rules. @@ -51,6 +51,8 @@ pub fn derive_not_none(item: TokenStream) -> TokenStream { inner } } + + impl #impl_generics ::dbsp::utils::RoaringU32Key for #ident #ty_generics #where_clause {} }; TokenStream::from(expanded) diff --git a/crates/feldera-macros/src/tuples.rs b/crates/feldera-macros/src/tuples.rs index 99929c60bab..0afb5e2f0f3 100644 --- a/crates/feldera-macros/src/tuples.rs +++ b/crates/feldera-macros/src/tuples.rs @@ -247,6 +247,30 @@ pub(super) fn declare_tuple_impl(tuple: TupleDef) -> TokenStream2 { } }; + let roaring_u32_key_impl = if num_elements == 1 { + let inner = &elements[0]; + quote! { + impl<#inner: ::dbsp::utils::RoaringU32Key> ::dbsp::utils::RoaringU32Key for #name<#inner> { + #[inline] + fn as_roaring_u32(&self) -> Option { + self.0.as_roaring_u32() + } + + #[inline] + fn can_use_roaring_u32() -> bool + where + Self: Sized, + { + <#inner as ::dbsp::utils::RoaringU32Key>::can_use_roaring_u32() + } + } + } + } else { + quote! { + impl<#(#generics),*> ::dbsp::utils::RoaringU32Key for #name<#(#generics),*> {} + } + }; + let sparse_get_methods = fields .iter() .enumerate() @@ -972,6 +996,7 @@ pub(super) fn declare_tuple_impl(tuple: TupleDef) -> TokenStream2 { #copy_impl #checkpoint_impl #not_an_option + #roaring_u32_key_impl }); expanded diff --git a/crates/nexmark/src/queries/q9.rs b/crates/nexmark/src/queries/q9.rs index 9aa8b12abfc..e3a0bfe382d 100644 --- a/crates/nexmark/src/queries/q9.rs +++ b/crates/nexmark/src/queries/q9.rs @@ -43,6 +43,7 @@ pub struct Q9Output( ); dbsp::never_none!(Q9Output); +dbsp::never_roaring_u32!(Q9Output); type Q9Stream = Stream>; diff --git a/crates/storage-test-compat/src/bin/golden-writer.rs b/crates/storage-test-compat/src/bin/golden-writer.rs index ab3b6f1368b..773ce6d9ca6 100644 --- a/crates/storage-test-compat/src/bin/golden-writer.rs +++ b/crates/storage-test-compat/src/bin/golden-writer.rs @@ -111,7 +111,7 @@ where } let tmp_path = writer.path().clone(); - let (_file_handle, _bloom_filter) = writer.close()?; + let (_file_handle, _key_filter) = writer.close()?; let content = storage_backend.read(&tmp_path)?; storage_backend.write(&output_storage_path, (*content).clone())?; storage_backend.delete(&tmp_path)?; diff --git a/crates/storage/src/error.rs b/crates/storage/src/error.rs index f45874795fe..c6bffca985c 100644 --- a/crates/storage/src/error.rs +++ b/crates/storage/src/error.rs @@ -37,8 +37,8 @@ pub enum StorageError { /// Cannot perform operation because storage is not enabled. #[error("Cannot perform operation because storage is not enabled.")] StorageDisabled, - /// Error while creating a bloom filter. - #[error("Failed to serialize/deserialize bloom filter.")] + /// Error while creating a batch key filter. + #[error("Failed to serialize/deserialize batch key filter.")] BloomFilter, /// Path is not valid in storage.