diff --git a/crates/dbsp/Cargo.toml b/crates/dbsp/Cargo.toml index 71e28e6853e..76aa287a552 100644 --- a/crates/dbsp/Cargo.toml +++ b/crates/dbsp/Cargo.toml @@ -137,6 +137,14 @@ harness = false name = "cursor_list" harness = false +[[bench]] +name = "list_merger" +harness = false + +[[bench]] +name = "input_map_ingest" +harness = false + [[example]] name = "orgchart" diff --git a/crates/dbsp/benches/input_map_ingest.rs b/crates/dbsp/benches/input_map_ingest.rs new file mode 100644 index 00000000000..f4e0ca456ee --- /dev/null +++ b/crates/dbsp/benches/input_map_ingest.rs @@ -0,0 +1,163 @@ +use anyhow::{Context, Result, anyhow}; +use crossbeam::channel::{Sender, bounded}; +use dbsp::{ + Runtime, + mimalloc::MiMalloc, + operator::{MapHandle, Update}, + utils::{Tup2, Tup5}, +}; +use rand::{Rng, RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use std::{ + thread::{self, JoinHandle}, + time::{Duration, Instant}, +}; + +const BATCH_SIZE: usize = 10_000; +const PROGRESS_EVERY_BATCHES: usize = 100; +const WORKERS: usize = 8; +const DATAGEN_THREADS: usize = 8; +const TOTAL_RECORDS: u64 = 200_000_000; +const KEY_SPACE: u64 = 100_000_000; +const SEED: u64 = 0; +const MAX_IN_FLIGHT_BATCHES: usize = 64; + +type Value = Tup5; +type BatchRecord = Tup2>; +type Batch = Vec; + +#[global_allocator] +static ALLOC: MiMalloc = MiMalloc; + +fn main() -> Result<()> { + validate_constants()?; + + let total_batches = (TOTAL_RECORDS / BATCH_SIZE as u64) as usize; + println!( + "Running input_map_ingest benchmark with {} workers, {} datagen threads, {} records ({} batches of {})", + WORKERS, DATAGEN_THREADS, TOTAL_RECORDS, total_batches, BATCH_SIZE + ); + + let (mut dbsp, mut input_handle) = Runtime::init_circuit(WORKERS, |circuit| { + let (stream, handle): (_, MapHandle) = + circuit.add_input_map::(|v, u| *v = *u); + + stream.inspect(|_| {}); + Ok(handle) + }) + .context("failed to initialize DBSP circuit")?; + + let (sender, receiver) = bounded::(MAX_IN_FLIGHT_BATCHES); + let datagen_handles = spawn_datagen_threads(total_batches, sender); + + let mut total_step_duration = Duration::ZERO; + let benchmark_start = Instant::now(); + for batch_idx in 0..total_batches { + let mut batch = receiver + .recv() + .map_err(|_| anyhow!("datagen channel closed before sending all batches"))?; + input_handle.append(&mut batch); + + let step_start = Instant::now(); + dbsp.transaction().context("DBSP transaction/step failed")?; + total_step_duration += step_start.elapsed(); + + let processed_batches = batch_idx + 1; + if processed_batches % PROGRESS_EVERY_BATCHES == 0 || processed_batches == total_batches { + let processed_records = processed_batches as u64 * BATCH_SIZE as u64; + let elapsed = benchmark_start.elapsed().as_secs_f64(); + let percent = (processed_batches as f64 / total_batches as f64) * 100.0; + println!( + "progress: {processed_batches}/{total_batches} batches ({processed_records}/{TOTAL_RECORDS} records, {percent:.2}%), wall_clock_secs={elapsed:.3}" + ); + } + } + + for handle in datagen_handles { + handle + .join() + .map_err(|_| anyhow!("datagen thread panicked"))? + .context("datagen thread failed")?; + } + + dbsp.kill() + .map_err(|_| anyhow!("failed to stop DBSP runtime"))?; + + let wall_clock = benchmark_start.elapsed(); + let step_seconds = total_step_duration.as_secs_f64(); + let throughput = TOTAL_RECORDS as f64 / step_seconds; + + println!("total_step_duration_secs={step_seconds}"); + println!("throughput_records_per_sec={throughput}"); + println!("wall_clock_secs={}", wall_clock.as_secs_f64()); + + Ok(()) +} + +fn validate_constants() -> Result<()> { + if WORKERS == 0 { + return Err(anyhow!("WORKERS must be > 0")); + } + if DATAGEN_THREADS == 0 { + return Err(anyhow!("DATAGEN_THREADS must be > 0")); + } + if KEY_SPACE == 0 { + return Err(anyhow!("KEY_SPACE must be > 0")); + } + if TOTAL_RECORDS == 0 || !TOTAL_RECORDS.is_multiple_of(BATCH_SIZE as u64) { + return Err(anyhow!( + "TOTAL_RECORDS must be > 0 and divisible by {BATCH_SIZE}" + )); + } + if MAX_IN_FLIGHT_BATCHES == 0 { + return Err(anyhow!("MAX_IN_FLIGHT_BATCHES must be > 0")); + } + Ok(()) +} + +fn spawn_datagen_threads( + total_batches: usize, + sender: Sender, +) -> Vec>> { + let mut handles = Vec::with_capacity(DATAGEN_THREADS); + let batches_per_thread = total_batches / DATAGEN_THREADS; + let extra_batches = total_batches % DATAGEN_THREADS; + + for thread_id in 0..DATAGEN_THREADS { + let thread_sender = sender.clone(); + let batches_for_thread = batches_per_thread + usize::from(thread_id < extra_batches); + let thread_seed = SEED.wrapping_add((thread_id as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15)); + + handles.push(thread::spawn(move || { + datagen_thread(thread_sender, batches_for_thread, KEY_SPACE, thread_seed) + })); + } + + drop(sender); + handles +} + +fn datagen_thread(sender: Sender, batches: usize, key_space: u64, seed: u64) -> Result<()> { + let mut rng = ChaCha8Rng::seed_from_u64(seed); + + for _ in 0..batches { + let mut batch = Vec::with_capacity(BATCH_SIZE); + for _ in 0..BATCH_SIZE { + let key = rng.gen_range(0..key_space); + let value = Tup5( + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + ); + batch.push(Tup2(key, Update::Insert(value))); + } + + sender + .send(batch) + .map_err(|_| anyhow!("receiver dropped while datagen was sending batches"))?; + } + + Ok(()) +} diff --git a/crates/dbsp/benches/list_merger.rs b/crates/dbsp/benches/list_merger.rs new file mode 100644 index 00000000000..0aeb4772418 --- /dev/null +++ b/crates/dbsp/benches/list_merger.rs @@ -0,0 +1,208 @@ +//! Benchmark for `ListMerger` over `OrdIndexedZSet` batches. +//! +//! Generates 100M `(key, value, +1)` records split across `N` input batches and +//! merges them into one batch using `ListMerger`. +//! +//! Runs both in-memory and file-backed modes. +//! +//! Run with: `cargo bench -p dbsp --bench list_merger` + +use dbsp::circuit::{CircuitConfig, CircuitStorageConfig, DevTweaks, Layout, Mode}; +use dbsp::{ + OrdIndexedZSet, Runtime, ZWeight, + trace::{Batch as DynBatch, BatchLocation, BatchReader as DynBatchReader, Builder, ListMerger}, + typed_batch::BatchReader as TypedBatchReader, + utils::{Tup2, Tup10}, +}; +use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOptions}; +use rand::{Rng, RngCore, SeedableRng}; +use rand_xoshiro::Xoshiro256StarStar; +use std::hint::black_box; +use std::sync::{Arc, Mutex}; +use std::time::Instant; +use tempfile::tempdir; + +const SEED: [u8; 32] = [ + 0x7f, 0xc3, 0x59, 0x18, 0x45, 0x19, 0xc0, 0xaa, 0xd2, 0xec, 0x31, 0x26, 0xbb, 0x74, 0x2f, 0x8b, + 0x11, 0x7d, 0x0c, 0xe4, 0x64, 0xbf, 0x72, 0x17, 0x46, 0x28, 0x46, 0x42, 0xb2, 0x4b, 0x72, 0x18, +]; + +const NUM_RECORDS: usize = 20_000_000; +const BATCH_COUNTS: &[usize] = &[1, 8, 32, 64]; +const KEY_RANGES: &[u64] = &[100, 100_000_000]; + +#[derive(Clone)] +struct BenchResult { + num_batches: usize, + key_range: u64, + m_records_per_sec: f64, +} + +type Value = Tup10; + +fn records_in_batch(batch_index: usize, num_batches: usize) -> usize { + let base = NUM_RECORDS / num_batches; + let remainder = NUM_RECORDS % num_batches; + base + usize::from(batch_index < remainder) +} + +fn generate_batches(num_batches: usize, key_range: u64) -> Vec> { + let mut rng = Xoshiro256StarStar::from_seed(SEED); + + (0..num_batches) + .map(|batch_index| { + let n = records_in_batch(batch_index, num_batches); + let tuples: Vec, ZWeight>> = (0..n) + .map(|_| { + let key = rng.gen_range(0..key_range); + let value = Tup10( + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + rng.next_u64(), + ); + Tup2(Tup2(key, value), 1) + }) + .collect(); + OrdIndexedZSet::from_tuples((), tuples) + }) + .collect() +} + +fn merge_with_list_merger( + batches: Vec>, +) -> ( + as TypedBatchReader>::Inner, + usize, + BatchLocation, +) { + type InnerBatch = as TypedBatchReader>::Inner; + let mut inner_batches: Vec = batches + .into_iter() + .map(|batch| batch.into_inner()) + .collect(); + let factories = inner_batches[0].factories(); + let builder = + ::Builder::for_merge(&factories, inner_batches.iter(), None); + + let output: InnerBatch = ListMerger::merge( + &factories, + builder, + inner_batches + .iter_mut() + .map(|batch| batch.consuming_cursor(None, None)) + .collect(), + ); + let output_len = output.len(); + let actual_location = output.location(); + (output, output_len, actual_location) +} + +fn bench(generate_on_storage: bool) { + let temp = tempdir().expect("failed to create temp directory"); + let config = CircuitConfig { + layout: Layout::new_solo(1), + mode: Mode::Ephemeral, + pin_cpus: Vec::new(), + storage: Some( + CircuitStorageConfig::for_config( + StorageConfig { + path: temp.path().to_string_lossy().into_owned(), + cache: StorageCacheConfig::default(), + }, + StorageOptions { + min_storage_bytes: Some(0), + min_step_storage_bytes: if generate_on_storage { Some(0) } else { None }, + ..StorageOptions::default() + }, + ) + .expect("failed to configure POSIX storage"), + ), + dev_tweaks: DevTweaks::default(), + }; + + let results: Arc>> = Arc::new(Mutex::new(Vec::new())); + let results_clone = Arc::clone(&results); + + let handle = Runtime::run(config, move |_parker| { + for &num_batches in BATCH_COUNTS { + assert!(num_batches > 0 && num_batches <= 64); + for &key_range in KEY_RANGES { + println!( + "\nMerging {NUM_RECORDS} records across {num_batches} batches (key_range={key_range})..." + ); + let batches = generate_batches(num_batches, key_range); + let storage_batches = batches + .iter() + .filter(|batch| batch.location() == BatchLocation::Storage) + .count(); + println!(" Input batches on storage: {storage_batches}/{num_batches}"); + + let start = Instant::now(); + let (output, output_len, actual_location) = merge_with_list_merger(batches); + let elapsed = start.elapsed(); + black_box(output); + black_box(output_len); + + let m_records_per_sec = NUM_RECORDS as f64 / elapsed.as_secs_f64() / 1_000_000.0; + println!( + " merged in {:?} ({:.1} M input records/s), output_location={:?}", + elapsed, + m_records_per_sec, + actual_location + ); + + results_clone.lock().unwrap().push(BenchResult { + num_batches, + key_range, + m_records_per_sec, + }); + } + } + }) + .expect("failed to start DBSP runtime"); + + handle.kill().expect("failed to kill runtime"); + + let results = results.lock().unwrap().clone(); + let storage_label = if generate_on_storage { + "file-backed (min_step_storage_bytes=Some(0))" + } else { + "in-memory (min_step_storage_bytes=None)" + }; + println!("\nSummary ({storage_label}) – M input records/s"); + println!("\n┌─────────────┬────────────────────┬──────────────────────────┐"); + println!("│ # Batches │ key range: 100 │ key range: 100000000 │"); + println!("├─────────────┼────────────────────┼──────────────────────────┤"); + for &num_batches in BATCH_COUNTS { + let throughput_100 = results + .iter() + .find(|r| r.num_batches == num_batches && r.key_range == 100) + .map(|r| r.m_records_per_sec) + .unwrap_or(0.0); + let throughput_100m = results + .iter() + .find(|r| r.num_batches == num_batches && r.key_range == 100_000_000) + .map(|r| r.m_records_per_sec) + .unwrap_or(0.0); + println!( + "│ {:>11} │ {:>18.1} │ {:>24.1} │", + num_batches, throughput_100, throughput_100m + ); + } + println!("└─────────────┴────────────────────┴──────────────────────────┘"); +} + +fn main() { + println!("Running ListMerger benchmark with in-memory batches..."); + bench(false); + + println!("\nRunning ListMerger benchmark with file-backed batches..."); + bench(true); +} diff --git a/crates/dbsp/src/trace/spine_async/list_merger.rs b/crates/dbsp/src/trace/spine_async/list_merger.rs index 46a1e058262..7e5aaf6093f 100644 --- a/crates/dbsp/src/trace/spine_async/list_merger.rs +++ b/crates/dbsp/src/trace/spine_async/list_merger.rs @@ -1,4 +1,4 @@ -use std::{cmp::Ordering, sync::Arc}; +use std::sync::Arc; use ouroboros::self_referencing; @@ -8,8 +8,9 @@ use crate::{ time::Timestamp, trace::{ Batch, BatchFactories, BatchReaderFactories, Builder, Filter, GroupFilter, MergeCursor, - SpineSnapshot, spine_async::index_set::IndexSet, + SpineSnapshot, }, + utils::binary_heap::BinaryHeap, }; pub struct ArcListMerger(ArcMergerInner) @@ -81,10 +82,25 @@ where B: Batch, { cursors: Vec, + + /// Indexes of cursors that hold the current minimum key and their positions in the key heap. + current_key: Vec<(usize, usize)>, + + /// A binary heap containing the indexes of cursors sorted by key. + key_heap: Vec, + + /// Indexes of cursors that hold the current minimum value and their positions in the value heap. + current_val: Vec<(usize, usize)>, + + /// A binary heap containing the indexes of cursors sorted by value. + val_heap: Vec, + any_values: bool, has_mut: Vec, tmp_weight: Box, time_diffs: Option, B::R>>>, + + scratch: Vec, } impl ListMerger @@ -107,14 +123,138 @@ where let time_diffs = factories.time_diffs_factory().map(|f| f.default_box()); let has_mut = cursors.iter().map(|c| c.has_mut()).collect(); + let num_cursors = cursors.len(); - ListMerger { + let mut merger = ListMerger { cursors, + current_key: Vec::new(), + key_heap: Vec::new(), + current_val: Vec::new(), + val_heap: Vec::new(), any_values: false, has_mut, tmp_weight: factories.weight_factory().default_box(), time_diffs, + scratch: Vec::with_capacity(num_cursors), + }; + + merger.init_key_heap(); + + merger + } + + /// Called once when initializing the merger to build the initial key heap. + fn init_key_heap(&mut self) { + self.key_heap.clear(); + + let cmp = |a: &usize, b: &usize| self.cursors[*b].key().cmp(self.cursors[*a].key()); + for (index, cursor) in self.cursors.iter().enumerate() { + if cursor.key_valid() { + self.key_heap.push(index); + } + } + + let heap = BinaryHeap::from_vec(std::mem::take(&mut self.key_heap), cmp); + self.key_heap = heap.into_vec(); + self.update_key_heap(); + } + + /// Adjust the ordering of elements in the key heap after advancing cursors in `current_key`; update `current_key` list. + /// + /// Assumes that only the cursors in `current_key` were advanced forward; other cursors must be in the same position + /// as when the heap was last updated. + /// + /// Update the position of cursors in `current_key` by either sifting them down or removing them from the heap if the + /// cursor is exhausted. Determines the new set of cursors with minimum keys and updates `current_key` accordingly. + fn update_key_heap(&mut self) { + let cmp = |a: &usize, b: &usize| self.cursors[*b].key().cmp(self.cursors[*a].key()); + + let mut heap = + unsafe { BinaryHeap::from_vec_unchecked(std::mem::take(&mut self.key_heap), cmp) }; + + // This is a subtle part: we iterate over the indexes previously returned by peek_all in reverse order, which + // guarantees the modifying or removing the elements does not affect the positions of the remaining min elements, + // which guarantees that this loop leaves the heap in a valid state. + for (index, pos) in self.current_key.iter().rev() { + if unsafe { self.cursors.get_unchecked(*index).key_valid() } { + unsafe { heap.sift_down(*pos) }; + } else { + heap.remove(*pos); + } } + + // Compute new current_key. + self.current_key.clear(); + heap.peek_all( + |pos, &index| { + self.current_key.push((index, pos)); + }, + &mut self.scratch, + ); + + self.key_heap = heap.into_vec(); + + self.init_val_heap(); + } + + /// Called every time the current set of current_keys is updated to initialize the value heap. + fn init_val_heap(&mut self) { + self.val_heap.clear(); + + let cmp = |a: &usize, b: &usize| self.cursors[*b].val().cmp(self.cursors[*a].val()); + + for &(index, _pos) in self.current_key.iter() { + debug_assert!(self.cursors[index].key_valid()); + // TODO: can we debug_assert cursors[index].val_valid() here instead? + // Well-behaved cursors should not expose keys without values. + if self.cursors[index].val_valid() { + self.val_heap.push(index); + } + } + + let heap = BinaryHeap::from_vec(std::mem::take(&mut self.val_heap), cmp); + + self.current_val.clear(); + heap.peek_all( + |pos, &index| { + self.current_val.push((index, pos)); + }, + &mut self.scratch, + ); + + self.val_heap = heap.into_vec(); + } + + /// Adjust the ordering of elements in the value heap after advancing cursors in `current_val`; update `current_val` list. + /// + /// Assumes that only the cursors in `current_val` were advanced forward; other cursors must be in the same position + /// as when the heap was last updated. + /// + /// Update the position of cursors in `current_val` by either sifting them down or removing them from the heap if the + /// cursor is exhausted. Determines the new set of cursors with minimum values and updates `current_val` accordingly. + fn update_val_heap(&mut self) { + let cmp = |a: &usize, b: &usize| self.cursors[*b].val().cmp(self.cursors[*a].val()); + + let mut heap = + unsafe { BinaryHeap::from_vec_unchecked(std::mem::take(&mut self.val_heap), cmp) }; + + for (index, pos) in self.current_val.iter().rev() { + if unsafe { self.cursors.get_unchecked(*index).val_valid() } { + unsafe { heap.sift_down(*pos) }; + } else { + heap.remove(*pos); + } + } + + self.current_val.clear(); + heap.peek_all( + |pos, &index| { + self.current_val.push((index, pos)); + }, + &mut self.scratch, + ); + + self.val_heap = heap.into_vec(); } /// Perform `fuel` amount of work. @@ -122,15 +262,6 @@ where /// When the function returns and fuel > 0, the batches should be guaranteed to be fully merged. pub fn work(&mut self, builder: &mut B::Builder, frontier: &B::Time, fuel: &mut isize) { assert!(self.cursors.len() <= 64); - let mut remaining_cursors = self - .cursors - .iter() - .enumerate() - .filter_map(|(index, cursor)| cursor.key_valid().then_some(index)) - .collect::(); - if remaining_cursors.is_empty() { - return; - } let advance_func = |t: &mut DynDataTyped| t.join_assign(frontier); @@ -141,49 +272,18 @@ where }; // As long as there are multiple cursors... - while remaining_cursors.is_long() && *fuel > 0 { - // Find the indexes of the cursors with minimum keys, among the - // remaining cursors. - let orig_min_keys = find_min_indexes( - remaining_cursors - .into_iter() - .map(|index| (index, self.cursors[index].key())), - ); - - // If we're resuming after stopping due to running out of fuel in a - // previous call, then we might have exhausted the values in some of - // the keys, so drop them. We still need them in `orig_min_keys` so - // we can advance the key for all of them later. - let mut min_keys = if self.any_values { - orig_min_keys - .into_iter() - .filter(|index| self.cursors[*index].val_valid()) - .collect::() - } else { - orig_min_keys - }; - + while self.key_heap.len() > 1 && *fuel > 0 { // As long as there is more than one cursor with minimum keys... - while min_keys.is_long() { - // ...Find the indexes of the cursors with minimum values, among - // those with minimum keys, and copy their time-diff pairs and - // value into the output. - let min_vals = find_min_indexes( - min_keys - .into_iter() - .map(|index| (index, self.cursors[index].val())), - ); - self.any_values = - self.copy_times(builder, time_map_func, min_vals, fuel) || self.any_values; + while self.val_heap.len() > 1 { + self.any_values = self.copy_times(builder, time_map_func, fuel) || self.any_values; // Then go on to the next value in each cursor, dropping the keys // for which we've exhausted the values. - for index in min_vals { - self.cursors[index].step_val(); - if !self.cursors[index].val_valid() { - min_keys.remove(index); - } + for (index, _pos) in self.current_val.iter() { + self.cursors[*index].step_val(); } + self.update_val_heap(); + if *fuel <= 0 { return; } @@ -191,23 +291,27 @@ where // If there's exactly one cursor left with minimum key, copy its // values into the output. - if let Some(index) = min_keys.first() { + if let Some(&index) = self.val_heap.first() { + debug_assert_eq!(self.current_val.len(), 1); loop { self.any_values = - self.copy_times(builder, time_map_func, min_keys, fuel) || self.any_values; + self.copy_times(builder, time_map_func, fuel) || self.any_values; self.cursors[index].step_val(); - if *fuel <= 0 { - return; - } if !self.cursors[index].val_valid() { + self.val_heap.clear(); + self.current_val.clear(); break; } + + if *fuel <= 0 { + return; + } } } // If we wrote any values for these minimum keys, write the key. if self.any_values { - let index = orig_min_keys.first().unwrap(); + let index = self.current_key.first().unwrap().0; if self.has_mut[index] { builder.push_key_mut(self.cursors[index].key_mut()); } else { @@ -218,26 +322,22 @@ where // Advance each minimum-key cursor, dropping the cursors for which // we've exhausted the data. - for index in orig_min_keys { - self.cursors[index].step_key(); - if !self.cursors[index].key_valid() { - remaining_cursors.remove(index); - } + for (index, _pos) in self.current_key.iter() { + self.cursors[*index].step_key(); } + self.update_key_heap(); } // If there is a cursor left (there's either one or none), copy it // directly to the output. - if let Some(index) = remaining_cursors.first() { + if let Some(&(index, _pos)) = self.current_key.first() { while *fuel > 0 { - loop { + debug_assert_eq!(self.current_key.len(), 1); + debug_assert_eq!(self.current_val.len(), 1); + while self.cursors[index].val_valid() { self.any_values = - self.copy_times(builder, time_map_func, remaining_cursors, fuel) - || self.any_values; + self.copy_times(builder, time_map_func, fuel) || self.any_values; self.cursors[index].step_val(); - if !self.cursors[index].val_valid() { - break; - } if *fuel <= 0 { return; } @@ -256,6 +356,8 @@ where } self.cursors[index].step_key(); if !self.cursors[index].key_valid() { + self.current_key.clear(); + self.key_heap.clear(); break; } } @@ -266,7 +368,6 @@ where &mut self, builder: &mut B::Builder, map_func: Option<&dyn Fn(&mut DynDataTyped)>, - indexes: IndexSet, fuel: &mut isize, ) -> bool { // If this is a timed batch, we must consolidate the (time, weight) array; otherwise we @@ -274,8 +375,8 @@ where if let Some(time_diffs) = &mut self.time_diffs { if let Some(map_func) = map_func { time_diffs.clear(); - for i in indexes { - self.cursors[i].map_times(&mut |time, w| { + for (i, _pos) in self.current_val.iter() { + self.cursors[*i].map_times(&mut |time, w| { let mut time: B::Time = time.clone(); map_func(&mut time); @@ -289,10 +390,10 @@ where for (time, diff) in time_diffs.dyn_iter().map(|td| td.split()) { builder.push_time_diff(time, diff); } - } else if indexes.is_long() { + } else if self.current_val.len() > 1 { time_diffs.clear(); - for i in indexes { - self.cursors[i].map_times(&mut |time, w| { + for (i, _pos) in self.current_val.iter() { + self.cursors[*i].map_times(&mut |time, w| { time_diffs.push_refs((time, w)); }); } @@ -304,17 +405,17 @@ where builder.push_time_diff(time, diff); } } else { - debug_assert_eq!(indexes.len(), 1); - for i in indexes { - self.cursors[i].map_times(&mut |time, w| { + debug_assert_eq!(self.current_val.len(), 1); + for (i, _pos) in self.current_val.iter() { + self.cursors[*i].map_times(&mut |time, w| { builder.push_time_diff(time, w); }); } } } else { self.tmp_weight.set_zero(); - for i in indexes { - self.cursors[i].map_times(&mut |_time, weight| { + for (i, _pos) in self.current_val.iter() { + self.cursors[*i].map_times(&mut |_time, weight| { self.tmp_weight.add_assign(weight); }); } @@ -324,7 +425,7 @@ where builder.push_time_diff_mut(&mut B::Time::default(), &mut self.tmp_weight); } - let index = indexes.first().unwrap(); + let index = self.current_val.first().unwrap().0; if self.has_mut[index] { builder.push_val_mut(self.cursors[index].val_mut()); } else { @@ -335,28 +436,6 @@ where } } -fn find_min_indexes(mut iterator: impl Iterator) -> IndexSet -where - Item: Ord, -{ - let (min_index, mut min_value) = iterator.next().unwrap(); - let mut min_indexes = IndexSet::for_index(min_index); - - for (index, value) in iterator { - match value.cmp(&min_value) { - Ordering::Less => { - min_value = value; - min_indexes = IndexSet::for_index(index); - } - Ordering::Equal => { - min_indexes.add(index); - } - Ordering::Greater => (), - } - } - min_indexes -} - #[cfg(test)] mod test { use std::fmt::{Debug, Formatter}; diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/BaseRustCodeGenerator.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/BaseRustCodeGenerator.java index abb5c909940..6572b925db9 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/BaseRustCodeGenerator.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/backend/rust/BaseRustCodeGenerator.java @@ -100,7 +100,7 @@ public void addDependency(String crate) { static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; #[allow(non_upper_case_globals)] #[export_name = "malloc_conf"] - pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\\0";"""; + pub static malloc_conf: &[u8] = b"background_thread:true,metadata_thp:auto,dirty_decay_ms:30000,muzzy_decay_ms:30000\\0";"""; public static final String STANDARD_PREAMBLE = """ use dbsp::{