diff --git a/datafusion/physical-expr/benches/binary_op.rs b/datafusion/physical-expr/benches/binary_op.rs index 99fc40fa1c91b..f170561652070 100644 --- a/datafusion/physical-expr/benches/binary_op.rs +++ b/datafusion/physical-expr/benches/binary_op.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +use arrow::{array::StringArray, record_batch::RecordBatch}; use arrow::{ - array::BooleanArray, + array::{BooleanArray, Date32Array, Date64Array}, datatypes::{DataType, Field, Schema}, }; -use arrow::{array::StringArray, record_batch::RecordBatch}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::{Operator, and, binary_expr, col, lit, or}; use datafusion_physical_expr::{ @@ -30,6 +30,9 @@ use datafusion_physical_expr::{ use std::hint::black_box; use std::sync::Arc; +const DATE_ARRAY_LEN: usize = 8192; +const MILLIS_PER_DAY: i64 = 86_400_000; + /// Generates BooleanArrays with different true/false distributions for benchmarking. /// /// Returns a vector of tuples containing scenario name and corresponding BooleanArray. @@ -309,6 +312,81 @@ fn create_record_batch( Ok(rbs) } -criterion_group!(benches, benchmark_binary_op_in_short_circuit); +fn make_date32_batch(null_percent: f64) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date32, true), + Field::new("b", DataType::Date32, true), + ])); + + let left = Date32Array::from_iter((0..DATE_ARRAY_LEN).map(|i| { + (null_percent == 0.0 || i % (1.0 / null_percent) as usize != 0) + .then_some(18_000 + i as i32) + })); + let right = Date32Array::from_iter((0..DATE_ARRAY_LEN).map(|i| { + (null_percent == 0.0 || i % (1.0 / null_percent) as usize != 0) + .then_some(17_000 + (i % 365) as i32) + })); + + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(left), Arc::new(right)]) + .unwrap() +} + +fn make_date64_batch(null_percent: f64) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date64, true), + Field::new("b", DataType::Date64, true), + ])); + + let left = Date64Array::from_iter((0..DATE_ARRAY_LEN).map(|i| { + (null_percent == 0.0 || i % (1.0 / null_percent) as usize != 0) + .then_some((18_000 + i as i64) * MILLIS_PER_DAY) + })); + let right = Date64Array::from_iter((0..DATE_ARRAY_LEN).map(|i| { + (null_percent == 0.0 || i % (1.0 / null_percent) as usize != 0) + .then_some((17_000 + (i % 365) as i64) * MILLIS_PER_DAY) + })); + + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(left), Arc::new(right)]) + .unwrap() +} + +/// Benchmark Date32 column subtraction. +fn benchmark_date32_subtract(c: &mut Criterion) { + for (name, null_percent) in [("no_nulls", 0.0), ("20_percent_nulls", 0.2)] { + let batch = make_date32_batch(null_percent); + let expr = BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Minus, + Arc::new(Column::new("b", 1)), + ); + + c.bench_function(&format!("date32_subtract/{name}"), |b| { + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + } +} + +/// Benchmark Date64 column subtraction. +fn benchmark_date64_subtract(c: &mut Criterion) { + for (name, null_percent) in [("no_nulls", 0.0), ("20_percent_nulls", 0.2)] { + let batch = make_date64_batch(null_percent); + let expr = BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Minus, + Arc::new(Column::new("b", 1)), + ); + + c.bench_function(&format!("date64_subtract/{name}"), |b| { + b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap())) + }); + } +} + +criterion_group!( + benches, + benchmark_binary_op_in_short_circuit, + benchmark_date32_subtract, + benchmark_date64_subtract +); criterion_main!(benches); diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 8be783985e2b4..6f0b60556a751 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -177,82 +177,100 @@ fn is_date_minus_date(lhs: &DataType, rhs: &DataType) -> bool { ) } -/// Computes the difference between two dates and returns the result as Int64 (days) -/// This aligns with PostgreSQL, DuckDB, and MySQL behavior where date - date returns an integer +/// Milliseconds per day, used for Date64 subtraction. +const MILLIS_PER_DAY: i64 = 86_400_000; + +/// Evaluates `Date32 - Date32` or `Date64 - Date64`, returning the difference in +/// whole days as `Int64`. /// -/// Implementation: Uses Arrow's sub_wrapping to get Duration, then converts to Int64 days +/// This matches the behavior of PostgreSQL, DuckDB, and MySQL, where +/// `date - date` yields an integer day count rather than an interval. fn apply_date_subtraction( lhs: &ColumnarValue, rhs: &ColumnarValue, ) -> Result { - use arrow::compute::kernels::numeric::sub_wrapping; - - // Use Arrow's sub_wrapping to compute the Duration result - let duration_result = apply(lhs, rhs, sub_wrapping)?; - - // Convert Duration to Int64 (days) - match duration_result { - ColumnarValue::Array(array) => { - let int64_array = duration_to_days(&array)?; - Ok(ColumnarValue::Array(int64_array)) + match (lhs.data_type(), rhs.data_type()) { + (DataType::Date32, DataType::Date32) => { + subtract_date_to_days::(lhs, rhs, |l, r| l - r) } - ColumnarValue::Scalar(scalar) => { - // Convert scalar Duration to Int64 days - let array = scalar.to_array_of_size(1)?; - let int64_array = duration_to_days(&array)?; - let int64_scalar = ScalarValue::try_from_array(int64_array.as_ref(), 0)?; - Ok(ColumnarValue::Scalar(int64_scalar)) + (DataType::Date64, DataType::Date64) => { + subtract_date_to_days::(lhs, rhs, |l, r| { + l.wrapping_sub(r) / MILLIS_PER_DAY + }) } + (_, _) => unreachable!("apply_date_subtraction called with non-date types"), } } -/// Converts a Duration array to Int64 days -/// Handles different Duration time units (Second, Millisecond, Microsecond, Nanosecond) -fn duration_to_days(array: &ArrayRef) -> Result { - use datafusion_common::cast::{ - as_duration_microsecond_array, as_duration_millisecond_array, - as_duration_nanosecond_array, as_duration_second_array, - }; +/// Generic date subtraction: operates directly on the native primitive values +/// of `T` (i32 for Date32, i64 for Date64), applying `day_diff_fn` to produce +/// an Int64 day count. +fn subtract_date_to_days( + lhs: &ColumnarValue, + rhs: &ColumnarValue, + day_diff_fn: impl Fn(i64, i64) -> i64, +) -> Result +where + T::Native: Copy + Into, +{ + /// Extract the date value as `i64`. Returns `None` for null scalars. + fn date_scalar_to_i64( + scalar: &ScalarValue, + ) -> Result> { + match scalar { + ScalarValue::Date32(value) if P::DATA_TYPE == DataType::Date32 => { + Ok(value.map(i64::from)) + } + ScalarValue::Date64(value) if P::DATA_TYPE == DataType::Date64 => Ok(*value), + other => { + internal_err!( + "{} date scalar expected, got: {}", + P::DATA_TYPE, + other.data_type() + ) + } + } + } - const SECONDS_PER_DAY: i64 = 86_400; - const MILLIS_PER_DAY: i64 = 86_400_000; - const MICROS_PER_DAY: i64 = 86_400_000_000; - const NANOS_PER_DAY: i64 = 86_400_000_000_000; - - match array.data_type() { - DataType::Duration(TimeUnit::Second) => { - let duration_array = as_duration_second_array(array)?; - let result: Int64Array = duration_array - .iter() - .map(|v| v.map(|val| val / SECONDS_PER_DAY)) - .collect(); - Ok(Arc::new(result)) + match (lhs, rhs) { + (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { + let left = left.as_primitive::(); + let right = right.as_primitive::(); + let result: Int64Array = + arrow::compute::binary::<_, _, _, Int64Type>(left, right, |l, r| { + day_diff_fn(l.into(), r.into()) + })?; + Ok(ColumnarValue::Array(Arc::new(result))) } - DataType::Duration(TimeUnit::Millisecond) => { - let duration_array = as_duration_millisecond_array(array)?; - let result: Int64Array = duration_array - .iter() - .map(|v| v.map(|val| val / MILLIS_PER_DAY)) - .collect(); - Ok(Arc::new(result)) + (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { + let left = left.as_primitive::(); + match date_scalar_to_i64::(right)? { + Some(right_val) => { + let result: Int64Array = + left.unary(|l| day_diff_fn(l.into(), right_val)); + Ok(ColumnarValue::Array(Arc::new(result))) + } + None => Ok(ColumnarValue::Scalar(ScalarValue::Int64(None))), + } } - DataType::Duration(TimeUnit::Microsecond) => { - let duration_array = as_duration_microsecond_array(array)?; - let result: Int64Array = duration_array - .iter() - .map(|v| v.map(|val| val / MICROS_PER_DAY)) - .collect(); - Ok(Arc::new(result)) + (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { + let right = right.as_primitive::(); + match date_scalar_to_i64::(left)? { + Some(left_val) => { + let result: Int64Array = + right.unary(|r| day_diff_fn(left_val, r.into())); + Ok(ColumnarValue::Array(Arc::new(result))) + } + None => Ok(ColumnarValue::Scalar(ScalarValue::Int64(None))), + } } - DataType::Duration(TimeUnit::Nanosecond) => { - let duration_array = as_duration_nanosecond_array(array)?; - let result: Int64Array = duration_array - .iter() - .map(|v| v.map(|val| val / NANOS_PER_DAY)) - .collect(); - Ok(Arc::new(result)) + (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { + let left_val = date_scalar_to_i64::(left)?; + let right_val = date_scalar_to_i64::(right)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64( + left_val.zip(right_val).map(|(l, r)| day_diff_fn(l, r)), + ))) } - other => internal_err!("duration_to_days expected Duration type, got: {}", other), } } @@ -2012,6 +2030,82 @@ mod tests { Ok(()) } + #[test] + fn date32_minus_date32_returns_int64_days() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date32, true), + Field::new("b", DataType::Date32, true), + ])); + let a = Arc::new(Date32Array::from(vec![ + Some(18_901), + Some(18_901), + None, + Some(18_900), + ])); + let b = Arc::new(Date32Array::from(vec![ + Some(18_898), + Some(18_904), + Some(18_900), + None, + ])); + + apply_arithmetic::( + schema, + vec![a, b], + Operator::Minus, + Int64Array::from(vec![Some(3), Some(-3), None, None]), + )?; + + Ok(()) + } + + #[test] + fn date64_minus_date64_returns_int64_days() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Date64, true), + Field::new("b", DataType::Date64, true), + ])); + let a = Arc::new(Date64Array::from(vec![ + Some(18_901 * MILLIS_PER_DAY), + Some(18_901 * MILLIS_PER_DAY), + None, + Some(18_900 * MILLIS_PER_DAY), + ])); + let b = Arc::new(Date64Array::from(vec![ + Some(18_898 * MILLIS_PER_DAY), + Some(18_904 * MILLIS_PER_DAY), + Some(18_900 * MILLIS_PER_DAY), + None, + ])); + + apply_arithmetic::( + schema, + vec![a, b], + Operator::Minus, + Int64Array::from(vec![Some(3), Some(-3), None, None]), + )?; + + Ok(()) + } + + #[test] + fn date32_minus_null_scalar_returns_int64_null_scalar() -> Result<()> { + let result = apply_date_subtraction( + &ColumnarValue::Array(Arc::new(Date32Array::from(vec![ + Some(18_901), + Some(18_900), + ]))), + &ColumnarValue::Scalar(ScalarValue::Date32(None)), + )?; + + assert!(matches!( + result, + ColumnarValue::Scalar(ScalarValue::Int64(None)) + )); + + Ok(()) + } + #[test] fn minus_op_dict() -> Result<()> { let schema = Schema::new(vec![