diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 0fc75043bf333..4b2b31febef2a 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -123,7 +123,6 @@ required-features = ["test_utils"] [[bench]] harness = false name = "aggregate_vectorized" -required-features = ["test_utils"] [[bench]] harness = false diff --git a/datafusion/physical-plan/benches/aggregate_vectorized.rs b/datafusion/physical-plan/benches/aggregate_vectorized.rs index 48ca76d80d2d3..488647d5f8315 100644 --- a/datafusion/physical-plan/benches/aggregate_vectorized.rs +++ b/datafusion/physical-plan/benches/aggregate_vectorized.rs @@ -21,7 +21,6 @@ use arrow::util::bench_util::{ create_primitive_array, create_string_view_array_with_len, create_string_view_array_with_max_len, }; -use arrow::util::test_util::seedable_rng; use arrow_schema::DataType; use criterion::measurement::WallTime; use criterion::{ @@ -30,7 +29,9 @@ use criterion::{ use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn; use datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; use datafusion_physical_plan::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; +use rand::SeedableRng; use rand::distr::{Bernoulli, Distribution}; +use rand::rngs::StdRng; use std::hint::black_box; use std::sync::Arc; @@ -128,7 +129,7 @@ fn bytes_bench( input, "0.75 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.75).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -141,7 +142,7 @@ fn bytes_bench( input, "0.5 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.5).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -154,7 +155,7 @@ fn bytes_bench( input, "0.25 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.25).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -236,7 +237,7 @@ fn bench_single_primitive( &input, "0.75 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.75).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -249,7 +250,7 @@ fn bench_single_primitive( &input, "0.5 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.5).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -262,7 +263,7 @@ fn bench_single_primitive( &input, "0.25 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.25).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index e94e4547e1a75..abc3aba88ad48 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -145,7 +145,11 @@ impl ByteViewGroupValueBuilder { } } - fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) { + fn vectorized_append_inner( + &mut self, + array: &ArrayRef, + rows: &[usize], + ) -> Result<()> { let arr = array.as_byte_view::(); let null_count = array.null_count(); let num_rows = array.len(); @@ -166,8 +170,50 @@ impl ByteViewGroupValueBuilder { Nulls::None => { self.nulls.append_n(rows.len(), false); - for &row in rows { - self.do_append_val_inner(arr, row); + if arr.data_buffers().is_empty() { + // Fast path: all strings are inline (≤12 bytes). + // The input array's u128 views are already in the correct format; + // copy them directly instead of going through value() → make_view(). + self.views.extend(rows.iter().map(|&row| arr.views()[row])); + } else { + // Slow path: some strings are non-inline (>12 bytes). + // Read views directly to avoid array.value(row) overhead and + // reuse the source view's prefix instead of recomputing it via make_view. + self.views.try_reserve(rows.len()).map_err(|e| { + datafusion_common::exec_datafusion_err!( + "failed to reserve {0} views: {e}", + rows.len() + ) + })?; + for &row in rows { + let view = arr.views()[row]; + let len = view as u32; + if len <= 12 { + // This row happens to be inline; copy view directly. + self.views.push(view); + } else { + let src = ByteView::from(view); + // ensure_in_progress_big_enough must be called before computing + // new_buffer_index / new_offset — it may flush in_progress to completed. + self.ensure_in_progress_big_enough(len as usize); + let new_buffer_index = self.completed.len() as u32; + let new_offset = self.in_progress.len() as u32; + let src_buf = &arr.data_buffers()[src.buffer_index as usize]; + self.in_progress.extend_from_slice( + &src_buf[src.offset as usize + ..(src.offset + src.length) as usize], + ); + // Reuse prefix from the source view — avoids re-reading first 4 bytes. + let new_view = ByteView { + length: src.length, + prefix: src.prefix, + buffer_index: new_buffer_index, + offset: new_offset, + } + .as_u128(); + self.views.push(new_view); + } + } } } @@ -177,6 +223,7 @@ impl ByteViewGroupValueBuilder { self.views.resize(new_len, 0); } } + Ok(()) } fn do_append_val_inner(&mut self, array: &GenericByteViewArray, row: usize) @@ -548,8 +595,7 @@ impl GroupColumn for ByteViewGroupValueBuilder { } fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> { - self.vectorized_append_inner(array, rows); - Ok(()) + self.vectorized_append_inner(array, rows) } fn len(&self) -> usize {