From a1deef320664726c3aa6bb13156b0142a67569a8 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 25 May 2026 22:00:51 +0800 Subject: [PATCH 1/5] perf date sub --- datafusion/physical-expr/benches/binary_op.rs | 84 ++++- .../physical-expr/src/expressions/binary.rs | 320 +++++++++++++++--- 2 files changed, 350 insertions(+), 54 deletions(-) diff --git a/datafusion/physical-expr/benches/binary_op.rs b/datafusion/physical-expr/benches/binary_op.rs index 99fc40fa1c91b..f170561652070 100644 --- a/datafusion/physical-expr/benches/binary_op.rs +++ b/datafusion/physical-expr/benches/binary_op.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +use arrow::{array::StringArray, record_batch::RecordBatch}; use arrow::{ - array::BooleanArray, + array::{BooleanArray, Date32Array, Date64Array}, datatypes::{DataType, Field, Schema}, }; -use arrow::{array::StringArray, record_batch::RecordBatch}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::{Operator, and, binary_expr, col, lit, or}; use datafusion_physical_expr::{ @@ -30,6 +30,9 @@ use datafusion_physical_expr::{ use std::hint::black_box; use std::sync::Arc; +const DATE_ARRAY_LEN: usize = 8192; +const MILLIS_PER_DAY: i64 = 86_400_000; + /// Generates BooleanArrays with different true/false distributions for benchmarking. /// /// Returns a vector of tuples containing scenario name and corresponding BooleanArray. @@ -309,6 +312,81 @@ fn create_record_batch( Ok(rbs) } -criterion_group!(benches, benchmark_binary_op_in_short_circuit); +fn make_date32_batch(null_percent: f64) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date32, true), + Field::new("b", DataType::Date32, true), + ])); + + let left = Date32Array::from_iter((0..DATE_ARRAY_LEN).map(|i| { + (null_percent == 0.0 || i % (1.0 / null_percent) as usize != 0) + .then_some(18_000 + i as i32) + })); + let right = Date32Array::from_iter((0..DATE_ARRAY_LEN).map(|i| { + (null_percent == 0.0 || i % (1.0 / null_percent) as usize != 0) + .then_some(17_000 + (i % 365) as i32) + })); + + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(left), Arc::new(right)]) + .unwrap() +} + +fn make_date64_batch(null_percent: f64) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date64, true), + Field::new("b", DataType::Date64, true), + ])); + + let left = Date64Array::from_iter((0..DATE_ARRAY_LEN).map(|i| { + (null_percent == 0.0 || i % (1.0 / null_percent) as usize != 0) + .then_some((18_000 + i as i64) * MILLIS_PER_DAY) + })); + let right = Date64Array::from_iter((0..DATE_ARRAY_LEN).map(|i| { + (null_percent == 0.0 || i % (1.0 / null_percent) as usize != 0) + .then_some((17_000 + (i % 365) as i64) * MILLIS_PER_DAY) + })); + + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(left), Arc::new(right)]) + .unwrap() +} + +/// Benchmark Date32 column subtraction. +fn benchmark_date32_subtract(c: &mut Criterion) { + for (name, null_percent) in [("no_nulls", 0.0), ("20_percent_nulls", 0.2)] { + let batch = make_date32_batch(null_percent); + let expr = BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Minus, + Arc::new(Column::new("b", 1)), + ); + + c.bench_function(&format!("date32_subtract/{name}"), |b| { + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + } +} + +/// Benchmark Date64 column subtraction. +fn benchmark_date64_subtract(c: &mut Criterion) { + for (name, null_percent) in [("no_nulls", 0.0), ("20_percent_nulls", 0.2)] { + let batch = make_date64_batch(null_percent); + let expr = BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Minus, + Arc::new(Column::new("b", 1)), + ); + + c.bench_function(&format!("date64_subtract/{name}"), |b| { + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + } +} + +criterion_group!( + benches, + benchmark_binary_op_in_short_circuit, + benchmark_date32_subtract, + benchmark_date64_subtract +); criterion_main!(benches); diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 712f8f58f3180..9bec77bdf69fc 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -177,85 +177,245 @@ fn is_date_minus_date(lhs: &DataType, rhs: &DataType) -> bool { ) } -/// Computes the difference between two dates and returns the result as Int64 (days) -/// This aligns with PostgreSQL, DuckDB, and MySQL behavior where date - date returns an integer +/// Number of time units per day for each Duration variant. +const SECONDS_PER_DAY: i64 = 86_400; +const MILLIS_PER_DAY: i64 = 86_400_000; +const MICROS_PER_DAY: i64 = 86_400_000_000; +const NANOS_PER_DAY: i64 = 86_400_000_000_000; + +/// Evaluates `Date32 - Date32` or `Date64 - Date64`, returning the difference in +/// whole days as `Int64`. /// -/// Implementation: Uses Arrow's sub_wrapping to get Duration, then converts to Int64 days +/// This matches the behavior of PostgreSQL, DuckDB, and MySQL, where +/// `date - date` yields an integer day count rather than an interval. +/// +/// Internally, Arrow's `sub_wrapping` kernel produces a `Duration` result, which +/// is then converted to days by [`duration_to_days`] (array path) or +/// [`duration_scalar_to_days`] (scalar path). fn apply_date_subtraction( lhs: &ColumnarValue, rhs: &ColumnarValue, ) -> Result { use arrow::compute::kernels::numeric::sub_wrapping; - // Use Arrow's sub_wrapping to compute the Duration result + match (lhs.data_type(), rhs.data_type()) { + (DataType::Date32, DataType::Date32) => { + return subtract_date32_to_days(lhs, rhs); + } + (DataType::Date64, DataType::Date64) => { + return subtract_date64_to_days(lhs, rhs); + } + _ => {} + } + let duration_result = apply(lhs, rhs, sub_wrapping)?; - // Convert Duration to Int64 (days) match duration_result { ColumnarValue::Array(array) => { let int64_array = duration_to_days(&array)?; Ok(ColumnarValue::Array(int64_array)) } ColumnarValue::Scalar(scalar) => { - // Convert scalar Duration to Int64 days - let array = scalar.to_array_of_size(1)?; - let int64_array = duration_to_days(&array)?; - let int64_scalar = ScalarValue::try_from_array(int64_array.as_ref(), 0)?; - Ok(ColumnarValue::Scalar(int64_scalar)) + let days = duration_scalar_to_days(&scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64(days))) } } } -/// Converts a Duration array to Int64 days -/// Handles different Duration time units (Second, Millisecond, Microsecond, Nanosecond) -fn duration_to_days(array: &ArrayRef) -> Result { - use datafusion_common::cast::{ - as_duration_microsecond_array, as_duration_millisecond_array, - as_duration_nanosecond_array, as_duration_second_array, - }; +fn date32_scalar_value(scalar: &ScalarValue) -> Result> { + match scalar { + ScalarValue::Date32(value) => Ok(*value), + other => internal_err!("expected Date32 scalar, got: {other:?}"), + } +} - const SECONDS_PER_DAY: i64 = 86_400; - const MILLIS_PER_DAY: i64 = 86_400_000; - const MICROS_PER_DAY: i64 = 86_400_000_000; - const NANOS_PER_DAY: i64 = 86_400_000_000_000; +fn date64_scalar_value(scalar: &ScalarValue) -> Result> { + match scalar { + ScalarValue::Date64(value) => Ok(*value), + other => internal_err!("expected Date64 scalar, got: {other:?}"), + } +} - match array.data_type() { - DataType::Duration(TimeUnit::Second) => { - let duration_array = as_duration_second_array(array)?; - let result: Int64Array = duration_array - .iter() - .map(|v| v.map(|val| val / SECONDS_PER_DAY)) - .collect(); - Ok(Arc::new(result)) +fn as_date32_array(array: &ArrayRef) -> Result<&Date32Array> { + array.as_any().downcast_ref::().ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "expected Date32Array, got {}", + array.data_type() + )) + }) +} + +fn as_date64_array(array: &ArrayRef) -> Result<&Date64Array> { + array.as_any().downcast_ref::().ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "expected Date64Array, got {}", + array.data_type() + )) + }) +} + +fn subtract_date32_to_days( + lhs: &ColumnarValue, + rhs: &ColumnarValue, +) -> Result { + match (lhs, rhs) { + (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { + let left = as_date32_array(left)?; + let right = as_date32_array(right)?; + let result: Int64Array = + arrow::compute::binary::<_, _, _, Int64Type>(left, right, |l, r| { + i64::from(l) - i64::from(r) + })?; + Ok(ColumnarValue::Array(Arc::new(result))) } - DataType::Duration(TimeUnit::Millisecond) => { - let duration_array = as_duration_millisecond_array(array)?; - let result: Int64Array = duration_array - .iter() - .map(|v| v.map(|val| val / MILLIS_PER_DAY)) - .collect(); - Ok(Arc::new(result)) + (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { + let left = as_date32_array(left)?; + match date32_scalar_value(right)? { + Some(right) => { + let result: Int64Array = + left.unary(|left| i64::from(left) - i64::from(right)); + Ok(ColumnarValue::Array(Arc::new(result))) + } + None => Ok(ColumnarValue::Array(new_null_array( + &DataType::Int64, + left.len(), + ))), + } } - DataType::Duration(TimeUnit::Microsecond) => { - let duration_array = as_duration_microsecond_array(array)?; - let result: Int64Array = duration_array - .iter() - .map(|v| v.map(|val| val / MICROS_PER_DAY)) - .collect(); - Ok(Arc::new(result)) + (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { + let right = as_date32_array(right)?; + match date32_scalar_value(left)? { + Some(left) => { + let result: Int64Array = + right.unary(|right| i64::from(left) - i64::from(right)); + Ok(ColumnarValue::Array(Arc::new(result))) + } + None => Ok(ColumnarValue::Array(new_null_array( + &DataType::Int64, + right.len(), + ))), + } } - DataType::Duration(TimeUnit::Nanosecond) => { - let duration_array = as_duration_nanosecond_array(array)?; - let result: Int64Array = duration_array - .iter() - .map(|v| v.map(|val| val / NANOS_PER_DAY)) - .collect(); - Ok(Arc::new(result)) + (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { + let left = date32_scalar_value(left)?; + let right = date32_scalar_value(right)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64( + left.zip(right) + .map(|(left, right)| i64::from(left) - i64::from(right)), + ))) + } + } +} + +fn subtract_date64_to_days( + lhs: &ColumnarValue, + rhs: &ColumnarValue, +) -> Result { + match (lhs, rhs) { + (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { + let left = as_date64_array(left)?; + let right = as_date64_array(right)?; + let result: Int64Array = + arrow::compute::binary::<_, _, _, Int64Type>(left, right, |l, r| { + l.wrapping_sub(r) / MILLIS_PER_DAY + })?; + Ok(ColumnarValue::Array(Arc::new(result))) + } + (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { + let left = as_date64_array(left)?; + match date64_scalar_value(right)? { + Some(right) => { + let result: Int64Array = + left.unary(|left| left.wrapping_sub(right) / MILLIS_PER_DAY); + Ok(ColumnarValue::Array(Arc::new(result))) + } + None => Ok(ColumnarValue::Array(new_null_array( + &DataType::Int64, + left.len(), + ))), + } + } + (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { + let right = as_date64_array(right)?; + match date64_scalar_value(left)? { + Some(left) => { + let result: Int64Array = + right.unary(|right| left.wrapping_sub(right) / MILLIS_PER_DAY); + Ok(ColumnarValue::Array(Arc::new(result))) + } + None => Ok(ColumnarValue::Array(new_null_array( + &DataType::Int64, + right.len(), + ))), + } + } + (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { + let left = date64_scalar_value(left)?; + let right = date64_scalar_value(right)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64( + left.zip(right) + .map(|(left, right)| left.wrapping_sub(right) / MILLIS_PER_DAY), + ))) } - other => internal_err!("duration_to_days expected Duration type, got: {}", other), } } +/// Converts a `Duration` scalar to a whole-day count (truncating toward zero). +/// +/// Returns `None` if the scalar is null. +fn duration_scalar_to_days(scalar: &ScalarValue) -> Result> { + match scalar { + ScalarValue::DurationSecond(v) => Ok(v.map(|val| val / SECONDS_PER_DAY)), + ScalarValue::DurationMillisecond(v) => Ok(v.map(|val| val / MILLIS_PER_DAY)), + ScalarValue::DurationMicrosecond(v) => Ok(v.map(|val| val / MICROS_PER_DAY)), + ScalarValue::DurationNanosecond(v) => Ok(v.map(|val| val / NANOS_PER_DAY)), + other => internal_err!( + "duration_scalar_to_days expected a Duration scalar, got: {:?}", + other + ), + } +} + +/// Converts a `Duration` array to an `Int64` array of whole-day counts +/// (truncating toward zero). +/// +/// `Duration` arrays are physically identical to `Int64` arrays (same i64 +/// buffer layout), so [`cast`] to `Int64` is zero-copy. [`arrow::compute::unary`] +/// then divides the entire buffer in a single vectorized pass — the compiler +/// auto-vectorizes this loop with SIMD on supported targets — and copies the +/// null bitmap without per-element branching. +fn duration_to_days(array: &ArrayRef) -> Result { + use arrow::compute::unary; + + let units_per_day = match array.data_type() { + DataType::Duration(TimeUnit::Second) => SECONDS_PER_DAY, + DataType::Duration(TimeUnit::Millisecond) => MILLIS_PER_DAY, + DataType::Duration(TimeUnit::Microsecond) => MICROS_PER_DAY, + DataType::Duration(TimeUnit::Nanosecond) => NANOS_PER_DAY, + other => { + return internal_err!( + "duration_to_days expected a Duration array, got: {}", + other + ); + } + }; + + // Zero-copy cast: Duration and Int64 share the same physical i64 buffer. + let int64_array = cast(array.as_ref(), &DataType::Int64)?; + let int64_array = int64_array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "cast to Int64 did not produce an Int64Array".to_string(), + ) + })?; + + // Single vectorized pass; null bitmap is preserved automatically by `unary`. + let result: Int64Array = unary(int64_array, |val| val / units_per_day); + Ok(Arc::new(result)) +} + impl PhysicalExpr for BinaryExpr { fn data_type(&self, input_schema: &Schema) -> Result { BinaryTypeCoercer::new( @@ -2028,6 +2188,64 @@ mod tests { Ok(()) } + #[test] + fn date32_minus_date32_returns_int64_days() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date32, true), + Field::new("b", DataType::Date32, true), + ])); + let a = Arc::new(Date32Array::from(vec![ + Some(18_901), + Some(18_901), + None, + Some(18_900), + ])); + let b = Arc::new(Date32Array::from(vec![ + Some(18_898), + Some(18_904), + Some(18_900), + None, + ])); + + apply_arithmetic::( + schema, + vec![a, b], + Operator::Minus, + Int64Array::from(vec![Some(3), Some(-3), None, None]), + )?; + + Ok(()) + } + + #[test] + fn date64_minus_date64_returns_int64_days() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date64, true), + Field::new("b", DataType::Date64, true), + ])); + let a = Arc::new(Date64Array::from(vec![ + Some(18_901 * MILLIS_PER_DAY), + Some(18_901 * MILLIS_PER_DAY), + None, + Some(18_900 * MILLIS_PER_DAY), + ])); + let b = Arc::new(Date64Array::from(vec![ + Some(18_898 * MILLIS_PER_DAY), + Some(18_904 * MILLIS_PER_DAY), + Some(18_900 * MILLIS_PER_DAY), + None, + ])); + + apply_arithmetic::( + schema, + vec![a, b], + Operator::Minus, + Int64Array::from(vec![Some(3), Some(-3), None, None]), + )?; + + Ok(()) + } + #[test] fn minus_op_dict() -> Result<()> { let schema = Schema::new(vec![ From 5dd794e76356a80c8a13212fb6b0c9ff61fbae30 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Tue, 26 May 2026 13:52:10 +0800 Subject: [PATCH 2/5] refactor --- .../physical-expr/src/expressions/binary.rs | 235 ++++-------------- 1 file changed, 48 insertions(+), 187 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 9bec77bdf69fc..ac8f1d4da5555 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -177,156 +177,72 @@ fn is_date_minus_date(lhs: &DataType, rhs: &DataType) -> bool { ) } -/// Number of time units per day for each Duration variant. -const SECONDS_PER_DAY: i64 = 86_400; +/// Milliseconds per day, used for Date64 subtraction. const MILLIS_PER_DAY: i64 = 86_400_000; -const MICROS_PER_DAY: i64 = 86_400_000_000; -const NANOS_PER_DAY: i64 = 86_400_000_000_000; /// Evaluates `Date32 - Date32` or `Date64 - Date64`, returning the difference in /// whole days as `Int64`. /// /// This matches the behavior of PostgreSQL, DuckDB, and MySQL, where /// `date - date` yields an integer day count rather than an interval. -/// -/// Internally, Arrow's `sub_wrapping` kernel produces a `Duration` result, which -/// is then converted to days by [`duration_to_days`] (array path) or -/// [`duration_scalar_to_days`] (scalar path). fn apply_date_subtraction( lhs: &ColumnarValue, rhs: &ColumnarValue, ) -> Result { - use arrow::compute::kernels::numeric::sub_wrapping; - match (lhs.data_type(), rhs.data_type()) { - (DataType::Date32, DataType::Date32) => { - return subtract_date32_to_days(lhs, rhs); - } - (DataType::Date64, DataType::Date64) => { - return subtract_date64_to_days(lhs, rhs); - } - _ => {} - } - - let duration_result = apply(lhs, rhs, sub_wrapping)?; - - match duration_result { - ColumnarValue::Array(array) => { - let int64_array = duration_to_days(&array)?; - Ok(ColumnarValue::Array(int64_array)) - } - ColumnarValue::Scalar(scalar) => { - let days = duration_scalar_to_days(&scalar)?; - Ok(ColumnarValue::Scalar(ScalarValue::Int64(days))) - } - } -} - -fn date32_scalar_value(scalar: &ScalarValue) -> Result> { - match scalar { - ScalarValue::Date32(value) => Ok(*value), - other => internal_err!("expected Date32 scalar, got: {other:?}"), - } -} - -fn date64_scalar_value(scalar: &ScalarValue) -> Result> { - match scalar { - ScalarValue::Date64(value) => Ok(*value), - other => internal_err!("expected Date64 scalar, got: {other:?}"), + (DataType::Date32, DataType::Date32) => subtract_date_to_days::( + lhs, + rhs, + |l, r| i64::from(l) - i64::from(r), + ), + (DataType::Date64, DataType::Date64) => subtract_date_to_days::( + lhs, + rhs, + |l, r| l.wrapping_sub(r) / MILLIS_PER_DAY, + ), + (_, _) => unreachable!( + "apply_date_subtraction called with non-date types" + ), } } -fn as_date32_array(array: &ArrayRef) -> Result<&Date32Array> { - array.as_any().downcast_ref::().ok_or_else(|| { - datafusion_common::DataFusionError::Internal(format!( - "expected Date32Array, got {}", - array.data_type() - )) - }) -} - -fn as_date64_array(array: &ArrayRef) -> Result<&Date64Array> { - array.as_any().downcast_ref::().ok_or_else(|| { - datafusion_common::DataFusionError::Internal(format!( - "expected Date64Array, got {}", - array.data_type() - )) - }) -} - -fn subtract_date32_to_days( +/// Generic date subtraction: operates directly on the native primitive values +/// of `T` (i32 for Date32, i64 for Date64), applying `day_diff_fn` to produce +/// an Int64 day count. +fn subtract_date_to_days( lhs: &ColumnarValue, rhs: &ColumnarValue, -) -> Result { - match (lhs, rhs) { - (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { - let left = as_date32_array(left)?; - let right = as_date32_array(right)?; - let result: Int64Array = - arrow::compute::binary::<_, _, _, Int64Type>(left, right, |l, r| { - i64::from(l) - i64::from(r) - })?; - Ok(ColumnarValue::Array(Arc::new(result))) - } - (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { - let left = as_date32_array(left)?; - match date32_scalar_value(right)? { - Some(right) => { - let result: Int64Array = - left.unary(|left| i64::from(left) - i64::from(right)); - Ok(ColumnarValue::Array(Arc::new(result))) - } - None => Ok(ColumnarValue::Array(new_null_array( - &DataType::Int64, - left.len(), - ))), - } - } - (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { - let right = as_date32_array(right)?; - match date32_scalar_value(left)? { - Some(left) => { - let result: Int64Array = - right.unary(|right| i64::from(left) - i64::from(right)); - Ok(ColumnarValue::Array(Arc::new(result))) - } - None => Ok(ColumnarValue::Array(new_null_array( - &DataType::Int64, - right.len(), - ))), - } - } - (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { - let left = date32_scalar_value(left)?; - let right = date32_scalar_value(right)?; - Ok(ColumnarValue::Scalar(ScalarValue::Int64( - left.zip(right) - .map(|(left, right)| i64::from(left) - i64::from(right)), - ))) + day_diff_fn: impl Fn(T::Native, T::Native) -> i64, +) -> Result +where + T::Native: Copy, +{ + /// Extract the native value from a scalar by converting to a single-element + /// array and reading index 0. Returns `None` for null scalars. + fn scalar_to_native( + scalar: &ScalarValue, + ) -> Result> { + if scalar.is_null() { + return Ok(None); } + let array = scalar.to_array_of_size(1)?; + Ok(Some(array.as_primitive::

().value(0))) } -} -fn subtract_date64_to_days( - lhs: &ColumnarValue, - rhs: &ColumnarValue, -) -> Result { match (lhs, rhs) { (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { - let left = as_date64_array(left)?; - let right = as_date64_array(right)?; + let left = left.as_primitive::(); + let right = right.as_primitive::(); let result: Int64Array = - arrow::compute::binary::<_, _, _, Int64Type>(left, right, |l, r| { - l.wrapping_sub(r) / MILLIS_PER_DAY - })?; + arrow::compute::binary::<_, _, _, Int64Type>(left, right, &day_diff_fn)?; Ok(ColumnarValue::Array(Arc::new(result))) } (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { - let left = as_date64_array(left)?; - match date64_scalar_value(right)? { - Some(right) => { + let left = left.as_primitive::(); + match scalar_to_native::(right)? { + Some(right_val) => { let result: Int64Array = - left.unary(|left| left.wrapping_sub(right) / MILLIS_PER_DAY); + left.unary(|l| day_diff_fn(l, right_val)); Ok(ColumnarValue::Array(Arc::new(result))) } None => Ok(ColumnarValue::Array(new_null_array( @@ -336,11 +252,11 @@ fn subtract_date64_to_days( } } (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { - let right = as_date64_array(right)?; - match date64_scalar_value(left)? { - Some(left) => { + let right = right.as_primitive::(); + match scalar_to_native::(left)? { + Some(left_val) => { let result: Int64Array = - right.unary(|right| left.wrapping_sub(right) / MILLIS_PER_DAY); + right.unary(|r| day_diff_fn(left_val, r)); Ok(ColumnarValue::Array(Arc::new(result))) } None => Ok(ColumnarValue::Array(new_null_array( @@ -350,72 +266,17 @@ fn subtract_date64_to_days( } } (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { - let left = date64_scalar_value(left)?; - let right = date64_scalar_value(right)?; + let left_val = scalar_to_native::(left)?; + let right_val = scalar_to_native::(right)?; Ok(ColumnarValue::Scalar(ScalarValue::Int64( - left.zip(right) - .map(|(left, right)| left.wrapping_sub(right) / MILLIS_PER_DAY), + left_val + .zip(right_val) + .map(|(l, r)| day_diff_fn(l, r)), ))) } } } -/// Converts a `Duration` scalar to a whole-day count (truncating toward zero). -/// -/// Returns `None` if the scalar is null. -fn duration_scalar_to_days(scalar: &ScalarValue) -> Result> { - match scalar { - ScalarValue::DurationSecond(v) => Ok(v.map(|val| val / SECONDS_PER_DAY)), - ScalarValue::DurationMillisecond(v) => Ok(v.map(|val| val / MILLIS_PER_DAY)), - ScalarValue::DurationMicrosecond(v) => Ok(v.map(|val| val / MICROS_PER_DAY)), - ScalarValue::DurationNanosecond(v) => Ok(v.map(|val| val / NANOS_PER_DAY)), - other => internal_err!( - "duration_scalar_to_days expected a Duration scalar, got: {:?}", - other - ), - } -} - -/// Converts a `Duration` array to an `Int64` array of whole-day counts -/// (truncating toward zero). -/// -/// `Duration` arrays are physically identical to `Int64` arrays (same i64 -/// buffer layout), so [`cast`] to `Int64` is zero-copy. [`arrow::compute::unary`] -/// then divides the entire buffer in a single vectorized pass — the compiler -/// auto-vectorizes this loop with SIMD on supported targets — and copies the -/// null bitmap without per-element branching. -fn duration_to_days(array: &ArrayRef) -> Result { - use arrow::compute::unary; - - let units_per_day = match array.data_type() { - DataType::Duration(TimeUnit::Second) => SECONDS_PER_DAY, - DataType::Duration(TimeUnit::Millisecond) => MILLIS_PER_DAY, - DataType::Duration(TimeUnit::Microsecond) => MICROS_PER_DAY, - DataType::Duration(TimeUnit::Nanosecond) => NANOS_PER_DAY, - other => { - return internal_err!( - "duration_to_days expected a Duration array, got: {}", - other - ); - } - }; - - // Zero-copy cast: Duration and Int64 share the same physical i64 buffer. - let int64_array = cast(array.as_ref(), &DataType::Int64)?; - let int64_array = int64_array - .as_any() - .downcast_ref::() - .ok_or_else(|| { - datafusion_common::DataFusionError::Internal( - "cast to Int64 did not produce an Int64Array".to_string(), - ) - })?; - - // Single vectorized pass; null bitmap is preserved automatically by `unary`. - let result: Int64Array = unary(int64_array, |val| val / units_per_day); - Ok(Arc::new(result)) -} - impl PhysicalExpr for BinaryExpr { fn data_type(&self, input_schema: &Schema) -> Result { BinaryTypeCoercer::new( From 0ce4250014ab3c595dcdd0d2cc1c5eb17337b959 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 29 May 2026 09:58:14 +0800 Subject: [PATCH 3/5] fmt --- .../physical-expr/src/expressions/binary.rs | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index ac8f1d4da5555..c274a79bf5d80 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -190,19 +190,17 @@ fn apply_date_subtraction( rhs: &ColumnarValue, ) -> Result { match (lhs.data_type(), rhs.data_type()) { - (DataType::Date32, DataType::Date32) => subtract_date_to_days::( - lhs, - rhs, - |l, r| i64::from(l) - i64::from(r), - ), - (DataType::Date64, DataType::Date64) => subtract_date_to_days::( - lhs, - rhs, - |l, r| l.wrapping_sub(r) / MILLIS_PER_DAY, - ), - (_, _) => unreachable!( - "apply_date_subtraction called with non-date types" - ), + (DataType::Date32, DataType::Date32) => { + subtract_date_to_days::(lhs, rhs, |l, r| { + i64::from(l) - i64::from(r) + }) + } + (DataType::Date64, DataType::Date64) => { + subtract_date_to_days::(lhs, rhs, |l, r| { + l.wrapping_sub(r) / MILLIS_PER_DAY + }) + } + (_, _) => unreachable!("apply_date_subtraction called with non-date types"), } } @@ -241,8 +239,7 @@ where let left = left.as_primitive::(); match scalar_to_native::(right)? { Some(right_val) => { - let result: Int64Array = - left.unary(|l| day_diff_fn(l, right_val)); + let result: Int64Array = left.unary(|l| day_diff_fn(l, right_val)); Ok(ColumnarValue::Array(Arc::new(result))) } None => Ok(ColumnarValue::Array(new_null_array( @@ -255,8 +252,7 @@ where let right = right.as_primitive::(); match scalar_to_native::(left)? { Some(left_val) => { - let result: Int64Array = - right.unary(|r| day_diff_fn(left_val, r)); + let result: Int64Array = right.unary(|r| day_diff_fn(left_val, r)); Ok(ColumnarValue::Array(Arc::new(result))) } None => Ok(ColumnarValue::Array(new_null_array( @@ -269,9 +265,7 @@ where let left_val = scalar_to_native::(left)?; let right_val = scalar_to_native::(right)?; Ok(ColumnarValue::Scalar(ScalarValue::Int64( - left_val - .zip(right_val) - .map(|(l, r)| day_diff_fn(l, r)), + left_val.zip(right_val).map(|(l, r)| day_diff_fn(l, r)), ))) } } From 65839d0f707e4c27e693e61168b4b48ec4abdf59 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 1 Jun 2026 09:41:38 +0800 Subject: [PATCH 4/5] apply suggestions --- .../physical-expr/src/expressions/binary.rs | 73 ++++++++++++------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 169946f80d948..3b9a25801ec99 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -210,21 +210,28 @@ fn apply_date_subtraction( fn subtract_date_to_days( lhs: &ColumnarValue, rhs: &ColumnarValue, - day_diff_fn: impl Fn(T::Native, T::Native) -> i64, + day_diff_fn: impl Fn(i64, i64) -> i64, ) -> Result where - T::Native: Copy, + T::Native: Copy + Into, { - /// Extract the native value from a scalar by converting to a single-element - /// array and reading index 0. Returns `None` for null scalars. - fn scalar_to_native( + /// Extract the date value as `i64`. Returns `None` for null scalars. + fn date_scalar_to_i64( scalar: &ScalarValue, - ) -> Result> { - if scalar.is_null() { - return Ok(None); + ) -> Result> { + match scalar { + ScalarValue::Date32(value) if P::DATA_TYPE == DataType::Date32 => { + Ok(value.map(i64::from)) + } + ScalarValue::Date64(value) if P::DATA_TYPE == DataType::Date64 => Ok(*value), + other => { + internal_err!( + "{} date scalar expected, got: {}", + P::DATA_TYPE, + other.data_type() + ) + } } - let array = scalar.to_array_of_size(1)?; - Ok(Some(array.as_primitive::

().value(0))) } match (lhs, rhs) { @@ -232,38 +239,36 @@ where let left = left.as_primitive::(); let right = right.as_primitive::(); let result: Int64Array = - arrow::compute::binary::<_, _, _, Int64Type>(left, right, &day_diff_fn)?; + arrow::compute::binary::<_, _, _, Int64Type>(left, right, |l, r| { + day_diff_fn(l.into(), r.into()) + })?; Ok(ColumnarValue::Array(Arc::new(result))) } (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { let left = left.as_primitive::(); - match scalar_to_native::(right)? { + match date_scalar_to_i64::(right)? { Some(right_val) => { - let result: Int64Array = left.unary(|l| day_diff_fn(l, right_val)); + let result: Int64Array = + left.unary(|l| day_diff_fn(l.into(), right_val)); Ok(ColumnarValue::Array(Arc::new(result))) } - None => Ok(ColumnarValue::Array(new_null_array( - &DataType::Int64, - left.len(), - ))), + None => Ok(ColumnarValue::Scalar(ScalarValue::Int64(None))), } } (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { let right = right.as_primitive::(); - match scalar_to_native::(left)? { + match date_scalar_to_i64::(left)? { Some(left_val) => { - let result: Int64Array = right.unary(|r| day_diff_fn(left_val, r)); + let result: Int64Array = + right.unary(|r| day_diff_fn(left_val, r.into())); Ok(ColumnarValue::Array(Arc::new(result))) } - None => Ok(ColumnarValue::Array(new_null_array( - &DataType::Int64, - right.len(), - ))), + None => Ok(ColumnarValue::Scalar(ScalarValue::Int64(None))), } } (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { - let left_val = scalar_to_native::(left)?; - let right_val = scalar_to_native::(right)?; + let left_val = date_scalar_to_i64::(left)?; + let right_val = date_scalar_to_i64::(right)?; Ok(ColumnarValue::Scalar(ScalarValue::Int64( left_val.zip(right_val).map(|(l, r)| day_diff_fn(l, r)), ))) @@ -2085,6 +2090,24 @@ mod tests { Ok(()) } + #[test] + fn date32_minus_null_scalar_returns_int64_null_scalar() -> Result<()> { + let result = apply_date_subtraction( + &ColumnarValue::Array(Arc::new(Date32Array::from(vec![ + Some(18_901), + Some(18_900), + ]))), + &ColumnarValue::Scalar(ScalarValue::Date32(None)), + )?; + + assert!(matches!( + result, + ColumnarValue::Scalar(ScalarValue::Int64(None)) + )); + + Ok(()) + } + #[test] fn minus_op_dict() -> Result<()> { let schema = Schema::new(vec![ From 607cc320f258614cd777072bca218dba599b64e9 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Mon, 1 Jun 2026 10:00:42 +0800 Subject: [PATCH 5/5] lint --- datafusion/physical-expr/src/expressions/binary.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 3b9a25801ec99..6f0b60556a751 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -191,9 +191,7 @@ fn apply_date_subtraction( ) -> Result { match (lhs.data_type(), rhs.data_type()) { (DataType::Date32, DataType::Date32) => { - subtract_date_to_days::(lhs, rhs, |l, r| { - i64::from(l) - i64::from(r) - }) + subtract_date_to_days::(lhs, rhs, |l, r| l - r) } (DataType::Date64, DataType::Date64) => { subtract_date_to_days::(lhs, rhs, |l, r| {