From 39eb39818cf3b5569ee97ae75c14645be00386d0 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Wed, 27 May 2026 10:16:49 -0400 Subject: [PATCH 1/6] . --- datafusion/optimizer/src/eliminate_join.rs | 973 +++++++++++++++++- .../optimize_projections/required_indices.rs | 28 +- datafusion/optimizer/src/utils.rs | 53 +- datafusion/sqllogictest/test_files/joins.slt | 57 +- .../sqllogictest/test_files/subquery.slt | 14 +- 5 files changed, 1055 insertions(+), 70 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_join.rs b/datafusion/optimizer/src/eliminate_join.rs index 885910c1e4182..72fce8052cee9 100644 --- a/datafusion/optimizer/src/eliminate_join.rs +++ b/datafusion/optimizer/src/eliminate_join.rs @@ -15,19 +15,75 @@ // specific language governing permissions and limitations // under the License. -//! [`EliminateJoin`] rewrites `INNER JOIN` with `true`/`null` -use crate::optimizer::ApplyOrder; +//! [`EliminateJoin`] rewrites inner joins to simpler forms to make them cheaper +//! to evaluate. +//! +//! # What it rewrites +//! +//! * An inner join can be rewritten to an empty relation if the join condition +//! is trivially false. +//! +//! * An inner join `L ⋈ R` can be rewritten to a left semi join `L ⋉ R` +//! (`LeftSemi`), which keeps the rows of L that have a match in R and outputs +//! only L's columns. The rewrite to `L ⋉ R` is valid when both of the +//! following are true: +//! +//! 1. None of R's columns are referenced above the join. +//! 2. R does not observably multiply L's rows. This holds when either the +//! join's ancestors are duplicate-insensitive (e.g., DISTINCT) or we can use +//! functional dependencies to prove that each L row matches at most one R +//! row (R is provably unique on the join keys). +//! +//! # How it works +//! +//! `rewrite_subtree` walks the plan top-down, threading two pieces of context +//! down to each join: +//! +//! * `live` — which of the join's output columns are referenced above it. It is +//! propagated top-down: each node asks its children only for the columns it +//! needs from them, so a projection or aggregate asks for just the columns its +//! expressions reference, dropping the rest (the narrowing); a join splits the +//! set across its two inputs. +//! * `duplicate_insensitive` — whether emitting each row once instead of many +//! times will not change the output. A duplicate-collapsing node (e.g., +//! DISTINCT, GROUP BY with no aggregate functions, or the existence side of a +//! semi/anti/mark join) sets it `true` for its subtree, and it propagates +//! downward until a node that makes the row count observable again (a `LIMIT`, +//! a top-N sort, ...) clears it. It is therefore fixed by the nearest such +//! node, not by the whole ancestor chain: a collapsing node shields its subtree, +//! so a duplicate-sensitive node further above does not matter. +//! +//! At each join, `rewritten_join_type` combines this context with the side's +//! functional dependencies to choose `Inner`, `LeftSemi`, or `RightSemi`. Most +//! node types just forward the context to their single child via +//! `rewrite_single_input`; nodes that alter column requirements or +//! duplicate-sensitivity (projection, aggregate, sort, ...) adjust it first. +//! +//! Joins nested inside subquery expressions are reached as well: `rewrite_subtree` +//! descends into each node's subquery plans itself (via `map_subqueries`), +//! seeding each as a fresh root. +use crate::utils::for_each_referenced_index; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::tree_node::Transformed; -use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::JoinType::Inner; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{ + DFSchema, Dependency, HashSet, NullEquality, Result, ScalarValue, +}; use datafusion_expr::{ - Expr, - logical_plan::{EmptyRelation, LogicalPlan}, + Expr, JoinType, SortExpr, + logical_plan::{ + Aggregate, Distinct, DistinctOn, EmptyRelation, Filter, Join, Limit, LogicalPlan, + Partitioning, Projection, Repartition, Sort, SubqueryAlias, + }, }; +use std::sync::Arc; + +/// The columns that are "live" at a plan node, i.e., which of its output +/// columns are referenced by an ancestor node. Represented as a set of column +/// indices, relative to the node's schema. +type LiveColumns = HashSet; -/// Eliminates joins when join condition is false. -/// Replaces joins when inner join condition is true with a cross join. +/// Rewrites an inner join to a semi join when one input only filters the other, +/// and replaces an always-false inner join with an empty relation. #[derive(Default, Debug)] pub struct EliminateJoin; @@ -42,34 +98,474 @@ impl OptimizerRule for EliminateJoin { "eliminate_join" } - fn apply_order(&self) -> Option { - Some(ApplyOrder::TopDown) - } - fn rewrite( &self, plan: LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result> { - match plan { - LogicalPlan::Join(join) if join.join_type == Inner && join.on.is_empty() => { - match join.filter { - Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) => Ok( - Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema: join.schema, - })), - ), - _ => Ok(Transformed::no(LogicalPlan::Join(join))), + let live = all_columns(plan.schema()); + rewrite_subtree(plan, live, false) + } +} + +/// Rewrites `plan` and everything below it, including joins nested inside +/// subquery expressions. +/// +/// [`rewrite_node`] handles the node itself and recurses into its plan +/// children; this wrapper additionally descends into the node's own subquery +/// expressions. Each subquery is seeded as a fresh root, since its columns are +/// independent of the enclosing plan's `live` set. +fn rewrite_subtree( + plan: LogicalPlan, + live: LiveColumns, + duplicate_insensitive: bool, +) -> Result> { + rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| { + plan.map_subqueries(|subquery| { + let live = all_columns(subquery.schema()); + rewrite_subtree(subquery, live, false) + }) + }) +} + +fn rewrite_node( + plan: LogicalPlan, + live: LiveColumns, + duplicate_insensitive: bool, +) -> Result> { + match plan { + // The only arm that rewrites a join; the rest just thread context down to one. + LogicalPlan::Join(join) => rewrite_join(join, &live, duplicate_insensitive), + LogicalPlan::Projection(Projection { + expr, + input, + schema, + .. + }) => { + // Narrows `live` to the columns the projection's expressions reference. + let child_live = live_columns_for_exprs(&expr, input.schema())?; + rewrite_single_input(input, child_live, duplicate_insensitive, |input| { + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + expr, input, schema, + )?)) + }) + } + LogicalPlan::Filter(Filter { + predicate, input, .. + }) => { + // Adds the predicate's columns to `live` (a side used only by the filter stays live). + let mut child_live = live; + extend_live_columns(&mut child_live, &predicate, input.schema())?; + rewrite_single_input(input, child_live, duplicate_insensitive, |input| { + Ok(LogicalPlan::Filter(Filter::new(predicate, input))) + }) + } + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema, + .. + }) => { + // Narrows `live` to the grouping and aggregate expressions' columns. + let mut child_live = live_columns_for_exprs(&group_expr, input.schema())?; + extend_live_columns_for_exprs(&mut child_live, &aggr_expr, input.schema())?; + + // A grouping aggregate with no aggregate functions (`GROUP BY` with + // an empty `aggr_expr`) only observes which group-key values exist, + // not how many rows produced them, so its input is duplicate- + // insensitive. + let child_duplicate_insensitive = + !group_expr.is_empty() && aggr_expr.is_empty(); + + rewrite_single_input( + input, + child_live, + child_duplicate_insensitive, + |input| { + Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema( + input, group_expr, aggr_expr, schema, + )?)) + }, + ) + } + LogicalPlan::Distinct(Distinct::All(input)) => { + // `SELECT DISTINCT *` deduplicates on every input column, so a join + // side below is part of the DISTINCT key and cannot be dropped just + // because duplicate rows are collapsed. Recurse with + // `duplicate_insensitive = false` so a join is only rewritten to a + // semi join when the removed side is provably unique on the join + // keys: uniqueness makes that side's columns functionally determined + // by the preserved join keys, hence redundant in the DISTINCT key. + rewrite_single_input(input, live, false, |input| { + Ok(LogicalPlan::Distinct(Distinct::All(input))) + }) + } + LogicalPlan::Distinct(Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + input, + schema, + })) => { + // Narrows `live` to the ON/SELECT/ORDER BY columns; stays duplicate-sensitive. + let mut child_live = live_columns_for_exprs(&on_expr, input.schema())?; + extend_live_columns_for_exprs(&mut child_live, &select_expr, input.schema())?; + if let Some(sort_expr) = &sort_expr { + extend_live_columns_for_sort_exprs( + &mut child_live, + sort_expr, + input.schema(), + )?; + } + + rewrite_single_input(input, child_live, false, |input| { + Ok(LogicalPlan::Distinct(Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + input, + schema, + }))) + }) + } + LogicalPlan::Sort(Sort { expr, input, fetch }) => { + // Adds the sort-key columns to `live`. + let mut child_live = live; + extend_live_columns_for_sort_exprs(&mut child_live, &expr, input.schema())?; + + // A `fetch` (top-N) makes the row count observable, so duplicate- + // insensitivity does not survive past it. + let child_duplicate_insensitive = duplicate_insensitive && fetch.is_none(); + rewrite_single_input( + input, + child_live, + child_duplicate_insensitive, + |input| Ok(LogicalPlan::Sort(Sort { expr, input, fetch })), + ) + } + LogicalPlan::Limit(Limit { skip, fetch, input }) => { + // LIMIT makes the row count observable, so it clears duplicate-insensitivity. + rewrite_single_input(input, live, false, |input| { + Ok(LogicalPlan::Limit(Limit { skip, fetch, input })) + }) + } + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { + // Re-aliases columns 1:1, so `live` and duplicate-sensitivity pass through unchanged. + rewrite_single_input(input, live, duplicate_insensitive, |input| { + Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new( + input, alias, + )?)) + }) + } + LogicalPlan::Repartition(Repartition { + input, + partitioning_scheme, + }) => { + // Adds any partitioning-key columns to `live`; duplicate-sensitivity is unchanged. + let mut child_live = live; + match &partitioning_scheme { + Partitioning::Hash(exprs, _) | Partitioning::DistributeBy(exprs) => { + extend_live_columns_for_exprs( + &mut child_live, + exprs, + input.schema(), + )?; } + Partitioning::RoundRobinBatch(_) => {} } - _ => Ok(Transformed::no(plan)), + rewrite_single_input(input, child_live, duplicate_insensitive, |input| { + Ok(LogicalPlan::Repartition(Repartition { + input, + partitioning_scheme, + })) + }) + } + // Conservatively treat any other plan node as a fresh root, since we are + // not sure of its semantics with respect to duplicates or live columns. + _ => plan.map_children(|child| { + let live = all_columns(child.schema()); + rewrite_subtree(child, live, false) + }), + } +} + +/// Recurses into a single-input node's child, threading `child_live` and +/// `duplicate_insensitive` down, then rebuilds the node from the (possibly +/// rewritten) child via `rebuild`. The child's `Transformed` flag is preserved, +/// so the node is reported as changed exactly when its child changed. +fn rewrite_single_input( + input: Arc, + child_live: LiveColumns, + duplicate_insensitive: bool, + rebuild: F, +) -> Result> +where + F: FnOnce(Arc) -> Result, +{ + rewrite_subtree( + Arc::unwrap_or_clone(input), + child_live, + duplicate_insensitive, + )? + .map_data(|input| rebuild(Arc::new(input))) +} + +fn rewrite_join( + join: Join, + live: &LiveColumns, + duplicate_insensitive: bool, +) -> Result> { + if join.join_type == JoinType::Inner + && join.on.is_empty() + && matches!( + join.filter.as_ref(), + Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) + ) + { + return Ok(Transformed::yes(LogicalPlan::EmptyRelation( + EmptyRelation { + produce_one_row: false, + schema: join.schema, + }, + ))); + } + + let (visible_left, visible_right) = split_join_output_columns(&join, live); + + let rewritten_join_type = + rewritten_join_type(&join, &visible_left, &visible_right, duplicate_insensitive); + + let (mut left_live, mut right_live) = match rewritten_join_type { + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { + (visible_left, LiveColumns::new()) + } + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + (LiveColumns::new(), visible_right) + } + _ => (visible_left, visible_right), + }; + + add_join_condition_columns(&join, &mut left_live, &mut right_live)?; + + let (left_duplicate_insensitive, right_duplicate_insensitive) = + child_duplicate_insensitivity(rewritten_join_type, duplicate_insensitive); + + let left = rewrite_subtree( + Arc::unwrap_or_clone(join.left), + left_live, + left_duplicate_insensitive, + )?; + let right = rewrite_subtree( + Arc::unwrap_or_clone(join.right), + right_live, + right_duplicate_insensitive, + )?; + + let changed = + left.transformed || right.transformed || rewritten_join_type != join.join_type; + let left = Arc::new(left.data); + let right = Arc::new(right.data); + + if changed { + // The join type or an input changed, so the output schema may have + // narrowed; recompute it via `try_new`. + Ok(Transformed::yes(LogicalPlan::Join(Join::try_new( + left, + right, + join.on, + join.filter, + rewritten_join_type, + join.join_constraint, + join.null_equality, + join.null_aware, + )?))) + } else { + // Nothing changed; reassemble the join reusing its existing schema rather + // than recomputing it (as `rewrite_single_input` does for other nodes). + Ok(Transformed::no(LogicalPlan::Join(Join { + left, + right, + on: join.on, + filter: join.filter, + join_type: join.join_type, + join_constraint: join.join_constraint, + schema: join.schema, + null_equality: join.null_equality, + null_aware: join.null_aware, + }))) + } +} + +/// Returns which join inputs can safely ignore duplicate rows from their own +/// descendants. For semi/anti/mark joins, duplicates from the existence side do +/// not change the result even when the parent itself is duplicate-sensitive. +fn child_duplicate_insensitivity( + join_type: JoinType, + duplicate_insensitive: bool, +) -> (bool, bool) { + match join_type { + JoinType::Inner => (duplicate_insensitive, duplicate_insensitive), + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { + (duplicate_insensitive, true) + } + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + (true, duplicate_insensitive) + } + JoinType::Left | JoinType::Right | JoinType::Full => (false, false), + } +} + +/// Rewrites an inner join to a semi join when the removed side has no +/// parent-visible columns and either the parent ignores duplicate output rows or +/// the removed side is unique on the join keys. +fn rewritten_join_type( + join: &Join, + visible_left: &LiveColumns, + visible_right: &LiveColumns, + duplicate_insensitive: bool, +) -> JoinType { + if join.join_type != JoinType::Inner || join.on.is_empty() { + return join.join_type; + } + + let can_remove_right = duplicate_insensitive + || side_unique_on_join( + join.right.schema(), + join.on.iter().map(|(_, right)| right), + join.null_equality, + ); + if visible_right.is_empty() && can_remove_right { + return JoinType::LeftSemi; + } + + let can_remove_left = duplicate_insensitive + || side_unique_on_join( + join.left.schema(), + join.on.iter().map(|(left, _)| left), + join.null_equality, + ); + if visible_left.is_empty() && can_remove_left { + return JoinType::RightSemi; + } + + JoinType::Inner +} + +fn add_join_condition_columns( + join: &Join, + left_live: &mut LiveColumns, + right_live: &mut LiveColumns, +) -> Result<()> { + for (left_expr, right_expr) in &join.on { + extend_live_columns(left_live, left_expr, join.left.schema())?; + extend_live_columns(right_live, right_expr, join.right.schema())?; + } + + if let Some(filter) = &join.filter { + extend_live_columns(left_live, filter, join.left.schema())?; + extend_live_columns(right_live, filter, join.right.schema())?; + } + + Ok(()) +} + +fn split_join_output_columns( + join: &Join, + live: &LiveColumns, +) -> (LiveColumns, LiveColumns) { + let left_len = join.left.schema().fields().len(); + match join.join_type { + JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { + split_columns_at(live, left_len) + } + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { + let left = live.iter().copied().filter(|idx| *idx < left_len).collect(); + (left, LiveColumns::new()) + } + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + let right = live.iter().copied().collect(); + (LiveColumns::new(), right) + } + } +} + +fn split_columns_at(live: &LiveColumns, left_len: usize) -> (LiveColumns, LiveColumns) { + let mut left = LiveColumns::new(); + let mut right = LiveColumns::new(); + for idx in live { + if *idx < left_len { + left.insert(*idx); + } else { + right.insert(*idx - left_len); } } + (left, right) +} + +fn side_unique_on_join<'a>( + schema: &DFSchema, + join_exprs: impl Iterator, + null_equality: NullEquality, +) -> bool { + let join_key_indices = join_exprs + .filter_map(|expr| match expr { + Expr::Alias(alias) => alias.expr.as_ref().try_as_col(), + _ => expr.try_as_col(), + }) + .filter_map(|column| schema.maybe_index_of_column(column)) + .collect::>(); + + schema.functional_dependencies().iter().any(|dependency| { + dependency.mode == Dependency::Single + && (!dependency.nullable || null_equality == NullEquality::NullEqualsNothing) + && dependency + .source_indices + .iter() + .all(|idx| join_key_indices.contains(idx)) + }) +} + +fn all_columns(schema: &DFSchema) -> LiveColumns { + (0..schema.fields().len()).collect() +} + +fn live_columns_for_exprs(exprs: &[Expr], schema: &DFSchema) -> Result { + let mut live = LiveColumns::new(); + extend_live_columns_for_exprs(&mut live, exprs, schema)?; + Ok(live) +} + +fn extend_live_columns_for_exprs( + live: &mut LiveColumns, + exprs: &[Expr], + schema: &DFSchema, +) -> Result<()> { + for expr in exprs { + extend_live_columns(live, expr, schema)?; + } + Ok(()) +} - fn supports_rewrite(&self) -> bool { - true +fn extend_live_columns_for_sort_exprs( + live: &mut LiveColumns, + exprs: &[SortExpr], + schema: &DFSchema, +) -> Result<()> { + for sort_expr in exprs { + extend_live_columns(live, &sort_expr.expr, schema)?; } + Ok(()) +} + +fn extend_live_columns( + live: &mut LiveColumns, + expr: &Expr, + schema: &DFSchema, +) -> Result<()> { + for_each_referenced_index(expr, schema, |idx| { + live.insert(idx); + }) } #[cfg(test)] @@ -77,9 +573,17 @@ mod tests { use crate::OptimizerContext; use crate::assert_optimized_plan_eq_snapshot; use crate::eliminate_join::EliminateJoin; - use datafusion_common::Result; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::{Constraint, Constraints, NullEquality, Result}; use datafusion_expr::JoinType::Inner; - use datafusion_expr::{lit, logical_plan::builder::LogicalPlanBuilder}; + use datafusion_expr::{ + Expr, col, exists, lit, + logical_plan::builder::{ + LogicalPlanBuilder, table_scan, table_source_with_constraints, + }, + out_ref_col, + }; + use datafusion_functions_aggregate::expr_fn::count; use std::sync::Arc; macro_rules! assert_optimized_plan_equal { @@ -110,4 +614,417 @@ mod tests { assert_optimized_plan_equal!(plan, @"EmptyRelation: rows=0") } + + #[test] + fn inner_to_left_semi_when_removed_side_is_unique() -> Result<()> { + let plan = left_join_right_with_constraints(primary_key_on_id())? + .project(vec![col("l.x")])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: l.x + LeftSemi Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn inner_to_right_semi_when_removed_side_is_unique() -> Result<()> { + let plan = left_with_constraints_join_right(primary_key_on_id())? + .project(vec![col("r.y")])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: r.y + RightSemi Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn inner_to_left_semi_for_duplicate_insensitive_parent() -> Result<()> { + let plan = left_join_right()? + .aggregate(vec![col("l.x")], Vec::::new())? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Aggregate: groupBy=[[l.x]], aggr=[[]] + LeftSemi Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn aggregate_can_rewrite_when_removed_side_is_unique() -> Result<()> { + let plan = left_join_right_with_constraints(primary_key_on_id())? + .aggregate(vec![col("l.x")], vec![count(col("l.id"))])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Aggregate: groupBy=[[l.x]], aggr=[[count(l.id)]] + LeftSemi Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn duplicate_insensitive_context_propagates_through_join_tree() -> Result<()> { + let left = scan("l", &test_schema(), Constraints::default())?; + let middle = scan("m", &test_schema(), Constraints::default())?; + let right = scan("r", &test_schema(), Constraints::default())?; + + let left_join_middle = LogicalPlanBuilder::from(left) + .join(middle, Inner, (vec!["l.id"], vec!["m.id"]), None)? + .build()?; + + let plan = LogicalPlanBuilder::from(left_join_middle) + .join(right, Inner, (vec!["l.id"], vec!["r.id"]), None)? + .aggregate(vec![col("l.x")], Vec::::new())? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Aggregate: groupBy=[[l.x]], aggr=[[]] + LeftSemi Join: l.id = r.id + LeftSemi Join: l.id = m.id + TableScan: l + TableScan: m + TableScan: r + ") + } + + #[test] + fn projection_does_not_rewrite_without_uniqueness() -> Result<()> { + let plan = left_join_right()?.project(vec![col("l.x")])?.build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: l.x + Inner Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn required_filter_column_prevents_duplicate_insensitive_rewrite() -> Result<()> { + let plan = left_join_right()? + .filter(col("r.y").gt(lit(10_i32)))? + .aggregate(vec![col("l.x")], Vec::::new())? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Aggregate: groupBy=[[l.x]], aggr=[[]] + Filter: r.y > Int32(10) + Inner Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn distinct_does_not_rewrite_without_uniqueness() -> Result<()> { + // `SELECT DISTINCT *` deduplicates on every join-output column, including + // the right side's. With a non-unique right side the inner join can + // multiply left rows into distinct `(l, r)` combinations, so the join + // must not be rewritten to a semi join (which would drop the right + // columns from the DISTINCT key and undercount the result). + let plan = left_join_right()? + .distinct()? + .project(vec![col("l.x")])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: l.x + Distinct: + Inner Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn distinct_rewrites_when_removed_side_is_unique() -> Result<()> { + // When the removed side is unique on the join keys, its columns are + // functionally determined by the preserved join keys and so are + // redundant in the DISTINCT key. The inner join can be rewritten to a + // semi join even under `SELECT DISTINCT *`. + let plan = left_join_right_with_constraints(primary_key_on_id())? + .distinct()? + .project(vec![col("l.x")])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: l.x + Distinct: + LeftSemi Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn correlated_subquery_outer_ref_prevents_rewrite() -> Result<()> { + // The aggregate makes the parent duplicate-insensitive, so absent any + // other use of the right side the join would collapse to a semi join. + // But the `EXISTS` subquery correlates on `r.y`, so the right side is + // still needed and the join must stay an inner join. Otherwise the + // semi join would drop `r`, orphaning the correlated `r.y` reference. + let subquery = + LogicalPlanBuilder::from(scan("s", &test_schema(), Constraints::default())?) + .filter(col("s.id").eq(out_ref_col(DataType::Int32, "r.y")))? + .project(vec![lit(1)])? + .build()?; + + let plan = left_join_right()? + .filter(exists(Arc::new(subquery)))? + .aggregate(vec![col("l.x")], Vec::::new())? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Aggregate: groupBy=[[l.x]], aggr=[[]] + Filter: EXISTS () + Subquery: + Projection: Int32(1) + Filter: s.id = outer_ref(r.y) + TableScan: s + Inner Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn inner_to_semi_inside_uncorrelated_subquery() -> Result<()> { + // A join nested inside a (not-yet-decorrelated) subquery is still + // rewritten, because `rewrite_subtree` descends into subquery plans + // itself via `map_subqueries`. Here the subquery's projection keeps + // only `l.x` and the removed side `r` is unique (PK), so the inner join + // collapses to a semi join. + let subquery = left_join_right_with_constraints(primary_key_on_id())? + .project(vec![col("l.x")])? + .build()?; + + let plan = LogicalPlanBuilder::from(scan( + "outer", + &test_schema(), + Constraints::default(), + )?) + .filter(exists(Arc::new(subquery)))? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: EXISTS () + Subquery: + Projection: l.x + LeftSemi Join: l.id = r.id + TableScan: l + TableScan: r + TableScan: outer + ") + } + + #[test] + fn inner_to_semi_inside_correlated_subquery() -> Result<()> { + // `map_subqueries` descends into correlated subqueries too, not just + // uncorrelated ones, so a join inside one is still rewritten. The + // subquery correlates on `outer.id` (via the filter), but that reference + // and the projection touch only `l`; `r` is unique (PK) and unreferenced, + // so the inner join inside the subquery collapses to a semi join. + let subquery = left_join_right_with_constraints(primary_key_on_id())? + .filter(col("l.x").eq(out_ref_col(DataType::Int32, "outer.id")))? + .project(vec![col("l.x")])? + .build()?; + + let plan = LogicalPlanBuilder::from(scan( + "outer", + &test_schema(), + Constraints::default(), + )?) + .filter(exists(Arc::new(subquery)))? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Filter: EXISTS () + Subquery: + Projection: l.x + Filter: l.x = outer_ref(outer.id) + LeftSemi Join: l.id = r.id + TableScan: l + TableScan: r + TableScan: outer + ") + } + + #[test] + fn nullable_unique_rewrites_under_null_equals_nothing() -> Result<()> { + // A `UNIQUE` (rather than `PRIMARY KEY`) constraint marks the key as + // nullable. Under the default `NullEqualsNothing` join semantics a null + // key matches nothing, so a unique side still yields at most one match + // per left row and the inner join can become a semi join. + let left = scan("l", &test_schema(), Constraints::default())?; + let right = scan("r", &test_schema(), unique_on_x())?; + let plan = LogicalPlanBuilder::from(left) + .join(right, Inner, (vec!["l.x"], vec!["r.x"]), None)? + .project(vec![col("l.id")])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: l.id + LeftSemi Join: l.x = r.x + TableScan: l + TableScan: r + ") + } + + #[test] + fn nullable_unique_does_not_rewrite_under_null_equals_null() -> Result<()> { + // With `NullEqualsNull` semantics two null keys compare equal, so a + // nullable `UNIQUE` key no longer guarantees at most one match per left + // row: several null-keyed right rows could match a null-keyed left row. + // Uniqueness on the join keys is therefore not established and the inner + // join must be preserved. + let left = scan("l", &test_schema(), Constraints::default())?; + let right = scan("r", &test_schema(), unique_on_x())?; + let plan = LogicalPlanBuilder::from(left) + .join_detailed( + right, + Inner, + (vec!["l.x"], vec!["r.x"]), + None, + NullEquality::NullEqualsNull, + )? + .project(vec![col("l.id")])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: l.id + Inner Join: l.x = r.x + TableScan: l + TableScan: r + ") + } + + #[test] + fn composite_unique_rewrites_when_join_covers_all_key_columns() -> Result<()> { + // The removed side is unique on the composite key `(id, x)`. The join + // equates both key columns, so each left row matches at most one right + // row and the inner join can become a semi join. + let left = scan("l", &test_schema(), Constraints::default())?; + let right = scan("r", &test_schema(), composite_primary_key_on_id_x())?; + let plan = LogicalPlanBuilder::from(left) + .join( + right, + Inner, + (vec!["l.id", "l.x"], vec!["r.id", "r.x"]), + None, + )? + .project(vec![col("l.y")])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: l.y + LeftSemi Join: l.id = r.id, l.x = r.x + TableScan: l + TableScan: r + ") + } + + #[test] + fn composite_unique_does_not_rewrite_when_join_misses_a_key_column() -> Result<()> { + // The removed side is unique only on the *composite* key `(id, x)`. The + // join equates `id` but not `x`, so a left row may match many right rows + // (those sharing its `id` but differing in `x`). Uniqueness on the join + // keys is not established, so the inner join must be preserved. This + // guards the requirement that the join cover *every* column of the + // unique key, not just some. + let left = scan("l", &test_schema(), Constraints::default())?; + let right = scan("r", &test_schema(), composite_primary_key_on_id_x())?; + let plan = LogicalPlanBuilder::from(left) + .join(right, Inner, (vec!["l.id"], vec!["r.id"]), None)? + .project(vec![col("l.y")])? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Projection: l.y + Inner Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + fn left_join_right() -> Result { + left_join_right_with_constraints(Constraints::default()) + } + + fn left_join_right_with_constraints( + right_constraints: Constraints, + ) -> Result { + let left = scan("l", &test_schema(), Constraints::default())?; + let right = scan("r", &test_schema(), right_constraints)?; + + LogicalPlanBuilder::from(left).join( + right, + Inner, + (vec!["l.id"], vec!["r.id"]), + None, + ) + } + + fn left_with_constraints_join_right( + left_constraints: Constraints, + ) -> Result { + let left = scan("l", &test_schema(), left_constraints)?; + let right = scan("r", &test_schema(), Constraints::default())?; + + LogicalPlanBuilder::from(left).join( + right, + Inner, + (vec!["l.id"], vec!["r.id"]), + None, + ) + } + + fn scan( + name: &str, + schema: &Schema, + constraints: Constraints, + ) -> Result { + if constraints.is_empty() { + table_scan(Some(name), schema, None)?.build() + } else { + LogicalPlanBuilder::scan( + name, + table_source_with_constraints(schema, constraints), + None, + )? + .build() + } + } + + fn test_schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("x", DataType::Int32, true), + Field::new("y", DataType::Int32, true), + ]) + } + + fn primary_key_on_id() -> Constraints { + Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]) + } + + /// A nullable unique key on column `x` (index 1). `Unique` (unlike + /// `PrimaryKey`) marks the dependency as nullable, which is what gates the + /// rewrite on the join's `null_equality`. + fn unique_on_x() -> Constraints { + Constraints::new_unverified(vec![Constraint::Unique(vec![1])]) + } + + /// A composite primary key spanning columns `id` and `x` (indices 0 and 1). + fn composite_primary_key_on_id_x() -> Constraints { + Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0, 1])]) + } } diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index 5e73a9fbeceda..33f0d48721a8b 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -17,7 +17,8 @@ //! [`RequiredIndices`] helper for OptimizeProjection -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use crate::utils::for_each_referenced_index; +use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Column, DFSchemaRef, Result}; use datafusion_expr::{Expr, LogicalPlan}; @@ -112,29 +113,8 @@ impl RequiredIndices { /// * `input_schema`: The input schema to analyze for index requirements. /// * `expr`: An expression for which we want to find necessary field indices. fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) { - // `apply` does not descend into subqueries, so recurse manually to - // handle those cases. - expr.apply(|e| { - match e { - Expr::Column(c) | Expr::OuterReferenceColumn(_, c) => { - if let Some(idx) = input_schema.maybe_index_of_column(c) { - self.indices.push(idx); - } - } - Expr::ScalarSubquery(sub) => { - self.add_exprs(input_schema, &sub.outer_ref_columns); - } - Expr::Exists(ex) => { - self.add_exprs(input_schema, &ex.subquery.outer_ref_columns); - } - Expr::InSubquery(isq) => { - self.add_exprs(input_schema, &isq.subquery.outer_ref_columns); - } - _ => {} - } - Ok(TreeNodeRecursion::Continue) - }) - .expect("traversal is infallible"); + for_each_referenced_index(expr, input_schema, |idx| self.indices.push(idx)) + .expect("traversal is infallible"); } /// Like [`Self::add_expr`], but for multiple expressions. diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index ad151d1ddb8e0..b29649e9ead49 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -24,9 +24,10 @@ use arrow::array::{Array, RecordBatch, new_null_array}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::TableReference; use datafusion_common::cast::as_boolean_array; -use datafusion_common::tree_node::{TransformedResult, TreeNode}; +use datafusion_common::tree_node::{TransformedResult, TreeNode, TreeNodeRecursion}; use datafusion_common::{Column, DFSchema, Result, ScalarValue}; use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::expr::{Exists, InSubquery, SetComparison}; use datafusion_expr::expr_rewriter::replace_col; use datafusion_expr::{ColumnarValue, Expr, logical_plan::LogicalPlan}; use datafusion_physical_expr::create_physical_expr; @@ -37,6 +38,56 @@ use std::sync::Arc; /// as it was initially placed here and then moved elsewhere. pub use datafusion_expr::expr_rewriter::NamePreserver; +/// Invokes `f` with the index, within `schema`, of every column referenced by +/// `expr` — including columns reached through a correlated subquery's outer +/// references. Columns absent from `schema` are skipped. +/// +/// A subquery's own plan is intentionally not traversed: its internal columns +/// index into its own schema, not `schema`; only the outer (correlated) columns +/// it references from `schema` are relevant. The comparison expression of an +/// `IN`/set-comparison subquery is reached by the normal expression walk. +/// +/// This is the shared primitive behind the top-down "which of a node's output +/// columns does an ancestor still need" analyses, namely +/// [`OptimizeProjections`](crate::optimize_projections::OptimizeProjections) +/// and [`EliminateJoin`](crate::eliminate_join::EliminateJoin). The two keep +/// their own required-index containers (an ordered set vs. a hash set), so this +/// reports indices through a callback rather than populating a shared type. +pub(crate) fn for_each_referenced_index( + expr: &Expr, + schema: &DFSchema, + mut f: impl FnMut(usize), +) -> Result<()> { + visit_referenced_indices(expr, schema, &mut f) +} + +fn visit_referenced_indices( + expr: &Expr, + schema: &DFSchema, + f: &mut dyn FnMut(usize), +) -> Result<()> { + expr.apply(|expr| { + match expr { + Expr::Column(column) | Expr::OuterReferenceColumn(_, column) => { + if let Some(idx) = schema.maybe_index_of_column(column) { + f(idx); + } + } + Expr::Exists(Exists { subquery, .. }) + | Expr::InSubquery(InSubquery { subquery, .. }) + | Expr::SetComparison(SetComparison { subquery, .. }) + | Expr::ScalarSubquery(subquery) => { + for outer in &subquery.outer_ref_columns { + visit_referenced_indices(outer, schema, f)?; + } + } + _ => {} + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(()) +} + /// Returns true if `expr` contains all columns in `schema_cols` pub(crate) fn has_all_column_refs( expr: &Expr, diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index e0be63fe71525..9966a607d9971 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1333,19 +1333,57 @@ inner join join_t2 on join_t1.t1_id = join_t2.t2_id ---- logical_plan 01)Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[]] -02)--Projection: join_t1.t1_id -03)----Inner Join: join_t1.t1_id = join_t2.t2_id -04)------TableScan: join_t1 projection=[t1_id] -05)------TableScan: join_t2 projection=[t2_id] +02)--LeftSemi Join: join_t1.t1_id = join_t2.t2_id +03)----TableScan: join_t1 projection=[t1_id] +04)----TableScan: join_t2 projection=[t2_id] physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[t1_id@0 as t1_id], aggr=[] 02)--RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 03)----AggregateExec: mode=Partial, gby=[t1_id@0 as t1_id], aggr=[] -04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, t2_id@0)], projection=[t1_id@0] +04)------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] 05)--------DataSourceExec: partitions=1, partition_sizes=[1] 06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 07)----------DataSourceExec: partitions=1, partition_sizes=[1] +statement ok +set datafusion.explain.logical_plan_only = true; + +# A single `count(DISTINCT col)` over a join whose other side is used only as an +# existence filter can be rewritten to a semi join. +query TT +EXPLAIN +select join_t1.t1_id, count(distinct join_t1.t1_int) +from join_t1 +inner join join_t2 on join_t1.t1_id = join_t2.t2_id +group by join_t1.t1_id +---- +logical_plan +01)Projection: join_t1.t1_id, count(alias1) AS count(DISTINCT join_t1.t1_int) +02)--Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[count(alias1)]] +03)----Aggregate: groupBy=[[join_t1.t1_id, join_t1.t1_int AS alias1]], aggr=[[]] +04)------LeftSemi Join: join_t1.t1_id = join_t2.t2_id +05)--------TableScan: join_t1 projection=[t1_id, t1_int] +06)--------TableScan: join_t2 projection=[t2_id] + +# A similar query with two DISTINCT aggregates is currently not rewritten +# TODO: https://github.com/apache/datafusion/issues/22644 +query TT +EXPLAIN +select join_t1.t1_id, count(distinct join_t1.t1_int), count(distinct join_t1.t1_name) +from join_t1 +inner join join_t2 on join_t1.t1_id = join_t2.t2_id +group by join_t1.t1_id +---- +logical_plan +01)Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[count(DISTINCT join_t1.t1_int), count(DISTINCT join_t1.t1_name)]] +02)--Projection: join_t1.t1_id, join_t1.t1_name, join_t1.t1_int +03)----Inner Join: join_t1.t1_id = join_t2.t2_id +04)------TableScan: join_t1 projection=[t1_id, t1_name, t1_int] +05)------TableScan: join_t2 projection=[t2_id] + +statement ok +set datafusion.explain.logical_plan_only = false; + # Join on struct query TT explain select join_t3.s3, join_t4.s4 @@ -1411,10 +1449,9 @@ logical_plan 01)Projection: count(alias1) AS count(DISTINCT join_t1.t1_id) 02)--Aggregate: groupBy=[[]], aggr=[[count(alias1)]] 03)----Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]] -04)------Projection: join_t1.t1_id -05)--------Inner Join: join_t1.t1_id = join_t2.t2_id -06)----------TableScan: join_t1 projection=[t1_id] -07)----------TableScan: join_t2 projection=[t2_id] +04)------LeftSemi Join: join_t1.t1_id = join_t2.t2_id +05)--------TableScan: join_t1 projection=[t1_id] +06)--------TableScan: join_t2 projection=[t2_id] physical_plan 01)ProjectionExec: expr=[count(alias1)@0 as count(DISTINCT join_t1.t1_id)] 02)--AggregateExec: mode=Final, gby=[], aggr=[count(alias1)] @@ -1423,7 +1460,7 @@ physical_plan 05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[] 06)----------RepartitionExec: partitioning=Hash([alias1@0], 2), input_partitions=2 07)------------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[] -08)--------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, t2_id@0)], projection=[t1_id@0] +08)--------------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] 09)----------------DataSourceExec: partitions=1, partition_sizes=[1] 10)----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 11)------------------DataSourceExec: partitions=1, partition_sizes=[1] diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index dd195b0ff4871..745718ee32fc6 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -338,13 +338,13 @@ where c_acctbal < ( logical_plan 01)Sort: customer.c_custkey ASC NULLS LAST 02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.sum(orders.o_totalprice) +03)----LeftSemi Join: customer.c_custkey = __scalar_sq_1.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_1.sum(orders.o_totalprice) 04)------TableScan: customer projection=[c_custkey, c_acctbal] 05)------SubqueryAlias: __scalar_sq_1 06)--------Projection: sum(orders.o_totalprice), orders.o_custkey 07)----------Aggregate: groupBy=[[orders.o_custkey]], aggr=[[sum(orders.o_totalprice)]] 08)------------Projection: orders.o_custkey, orders.o_totalprice -09)--------------Inner Join: orders.o_orderkey = __scalar_sq_2.l_orderkey Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.price +09)--------------LeftSemi Join: orders.o_orderkey = __scalar_sq_2.l_orderkey Filter: CAST(orders.o_totalprice AS Decimal128(25, 2)) < __scalar_sq_2.price 10)----------------TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice] 11)----------------SubqueryAlias: __scalar_sq_2 12)------------------Projection: sum(lineitem.l_extendedprice) AS price, lineitem.l_orderkey @@ -555,7 +555,7 @@ logical_plan 02)--TableScan: t0 projection=[t0_id, t0_name] 03)--SubqueryAlias: __correlated_sq_2 04)----Projection: t1.t1_name -05)------Inner Join: t1.t1_id = t2.t2_id +05)------LeftSemi Join: t1.t1_id = t2.t2_id 06)--------TableScan: t1 projection=[t1_id, t1_name] 07)--------TableScan: t2 projection=[t2_id] @@ -568,7 +568,7 @@ logical_plan 02)--TableScan: t0 projection=[t0_id, t0_name] 03)--SubqueryAlias: __correlated_sq_1 04)----Projection: t2.t2_name -05)------Inner Join: t1.t1_id = t2.t2_id +05)------RightSemi Join: t1.t1_id = t2.t2_id 06)--------TableScan: t1 projection=[t1_id] 07)--------SubqueryAlias: t2 08)----------TableScan: t2 projection=[t2_id, t2_name] @@ -1675,7 +1675,7 @@ where c_acctbal < ( logical_plan 01)Sort: customer.c_custkey ASC NULLS LAST 02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +03)----LeftSemi Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) 04)------TableScan: customer projection=[c_custkey, c_acctbal] 05)------SubqueryAlias: __scalar_sq_2 06)--------Projection: sum(orders.o_totalprice), orders.o_custkey @@ -1701,7 +1701,7 @@ where c_acctbal < ( logical_plan 01)Sort: customer.c_custkey ASC NULLS LAST 02)--Projection: customer.c_custkey -03)----Inner Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) +03)----LeftSemi Join: customer.c_custkey = __scalar_sq_2.o_custkey Filter: CAST(customer.c_acctbal AS Decimal128(25, 2)) < __scalar_sq_2.sum(orders.o_totalprice) 04)------TableScan: customer projection=[c_custkey, c_acctbal] 05)------SubqueryAlias: __scalar_sq_2 06)--------Projection: sum(orders.o_totalprice), orders.o_custkey @@ -1746,7 +1746,7 @@ WHERE e1.salary > ( ---- logical_plan 01)Projection: e1.employee_name, e1.salary -02)--Inner Join: e1.dept_id = __scalar_sq_1.dept_id Filter: CAST(e1.salary AS Decimal128(38, 14)) > __scalar_sq_1.avg(e2.salary) +02)--LeftSemi Join: e1.dept_id = __scalar_sq_1.dept_id Filter: CAST(e1.salary AS Decimal128(38, 14)) > __scalar_sq_1.avg(e2.salary) 03)----SubqueryAlias: e1 04)------TableScan: employees projection=[employee_name, dept_id, salary] 05)----SubqueryAlias: __scalar_sq_1 From 0210a4c9573e0e3c5459726c4e5caaf248934b4f Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 30 May 2026 14:14:19 -0400 Subject: [PATCH 2/6] Adjust DISTINCT handling --- datafusion/optimizer/src/eliminate_join.rs | 56 +++++++++++++++------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_join.rs b/datafusion/optimizer/src/eliminate_join.rs index 72fce8052cee9..f9ad8ad809d0d 100644 --- a/datafusion/optimizer/src/eliminate_join.rs +++ b/datafusion/optimizer/src/eliminate_join.rs @@ -190,14 +190,14 @@ fn rewrite_node( ) } LogicalPlan::Distinct(Distinct::All(input)) => { - // `SELECT DISTINCT *` deduplicates on every input column, so a join - // side below is part of the DISTINCT key and cannot be dropped just - // because duplicate rows are collapsed. Recurse with - // `duplicate_insensitive = false` so a join is only rewritten to a - // semi join when the removed side is provably unique on the join - // keys: uniqueness makes that side's columns functionally determined - // by the preserved join keys, hence redundant in the DISTINCT key. - rewrite_single_input(input, live, false, |input| { + // `SELECT DISTINCT *` is exactly a no-aggregate `GROUP BY` over every + // input column, so the input is duplicate-insensitive — but every + // column is part of the dedup key. Marking all columns live keeps any + // join side whose columns reach the DISTINCT (dropping them would + // change the distinct count); only a side projected away below the + // DISTINCT falls outside the key and can collapse to a semi join. + let child_live = all_columns(input.schema()); + rewrite_single_input(input, child_live, true, |input| { Ok(LogicalPlan::Distinct(Distinct::All(input))) }) } @@ -208,7 +208,10 @@ fn rewrite_node( input, schema, })) => { - // Narrows `live` to the ON/SELECT/ORDER BY columns; stays duplicate-sensitive. + // `DISTINCT ON (on) select [ORDER BY sort]` is a no-aggregate + // `GROUP BY` on the columns it reads, so its input is duplicate- + // insensitive; the live columns are exactly those of the + // ON/SELECT/ORDER BY expressions. let mut child_live = live_columns_for_exprs(&on_expr, input.schema())?; extend_live_columns_for_exprs(&mut child_live, &select_expr, input.schema())?; if let Some(sort_expr) = &sort_expr { @@ -219,7 +222,7 @@ fn rewrite_node( )?; } - rewrite_single_input(input, child_live, false, |input| { + rewrite_single_input(input, child_live, true, |input| { Ok(LogicalPlan::Distinct(Distinct::On(DistinctOn { on_expr, select_expr, @@ -725,7 +728,7 @@ mod tests { } #[test] - fn distinct_does_not_rewrite_without_uniqueness() -> Result<()> { + fn distinct_star_keeps_unreferenced_side() -> Result<()> { // `SELECT DISTINCT *` deduplicates on every join-output column, including // the right side's. With a non-unique right side the inner join can // multiply left rows into distinct `(l, r)` combinations, so the join @@ -746,11 +749,12 @@ mod tests { } #[test] - fn distinct_rewrites_when_removed_side_is_unique() -> Result<()> { - // When the removed side is unique on the join keys, its columns are - // functionally determined by the preserved join keys and so are - // redundant in the DISTINCT key. The inner join can be rewritten to a - // semi join even under `SELECT DISTINCT *`. + fn distinct_star_keeps_side_even_when_unique() -> Result<()> { + // `SELECT DISTINCT *` keys on every column, including the right side's, so + // the right side stays even when it is unique on the join keys — its + // columns are part of the DISTINCT key regardless. This matches a + // no-aggregate `GROUP BY` over all columns; dropping a redundant unique + // key would be a separate, functional-dependency-based simplification. let plan = left_join_right_with_constraints(primary_key_on_id())? .distinct()? .project(vec![col("l.x")])? @@ -759,6 +763,26 @@ mod tests { assert_optimized_plan_equal!(plan, @r" Projection: l.x Distinct: + Inner Join: l.id = r.id + TableScan: l + TableScan: r + ") + } + + #[test] + fn distinct_drops_unreferenced_side_when_projected() -> Result<()> { + // `SELECT DISTINCT l.x` projects the right side away below the DISTINCT, + // leaving it outside the dedup key. Like a no-aggregate `GROUP BY l.x`, + // the DISTINCT makes the input duplicate-insensitive, so the inner join + // collapses to a semi join even though the right side is not unique. + let plan = left_join_right()? + .project(vec![col("l.x")])? + .distinct()? + .build()?; + + assert_optimized_plan_equal!(plan, @r" + Distinct: + Projection: l.x LeftSemi Join: l.id = r.id TableScan: l TableScan: r From ccb8e1dc783882f45b20fa7f184fd8d2b71f369f Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sat, 30 May 2026 14:55:34 -0400 Subject: [PATCH 3/6] Update TPC-H fixtures --- datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part | 4 ++-- datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part index e471c2c23d2e9..b7d926b3aaa53 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part @@ -65,7 +65,7 @@ limit 10; logical_plan 01)Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST, fetch=10 02)--Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment -03)----Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.min(partsupp.ps_supplycost) +03)----LeftSemi Join: part.p_partkey = __scalar_sq_1.ps_partkey, partsupp.ps_supplycost = __scalar_sq_1.min(partsupp.ps_supplycost) 04)------Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, partsupp.ps_supplycost, nation.n_name 05)--------Inner Join: nation.n_regionkey = region.r_regionkey 06)----------Projection: part.p_partkey, part.p_mfgr, supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, partsupp.ps_supplycost, nation.n_name, nation.n_regionkey @@ -101,7 +101,7 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=10 02)--SortExec: TopK(fetch=10), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], preserve_partitioning=[true] -03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)], projection=[s_acctbal@5, s_name@2, n_name@8, p_partkey@0, p_mfgr@1, s_address@3, s_phone@4, s_comment@6] +03)----HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(p_partkey@0, ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)], projection=[s_acctbal@5, s_name@2, n_name@8, p_partkey@0, p_mfgr@1, s_address@3, s_phone@4, s_comment@6] 04)------RepartitionExec: partitioning=Hash([p_partkey@0, ps_supplycost@7], 4), input_partitions=4 05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(n_regionkey@9, r_regionkey@0)], projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4, s_acctbal@5, s_comment@6, ps_supplycost@7, n_name@8] 06)----------RepartitionExec: partitioning=Hash([n_regionkey@9], 4), input_partitions=4 diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part index 76876160e2bb3..814f7844a9611 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q20.slt.part @@ -67,7 +67,7 @@ logical_plan 09)--------------TableScan: nation projection=[n_nationkey, n_name], partial_filters=[nation.n_name = Utf8View("CANADA")] 10)------SubqueryAlias: __correlated_sq_2 11)--------Projection: partsupp.ps_suppkey -12)----------Inner Join: partsupp.ps_partkey = __scalar_sq_3.l_partkey, partsupp.ps_suppkey = __scalar_sq_3.l_suppkey Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_3.Float64(0.5) * sum(lineitem.l_quantity) +12)----------LeftSemi Join: partsupp.ps_partkey = __scalar_sq_3.l_partkey, partsupp.ps_suppkey = __scalar_sq_3.l_suppkey Filter: CAST(partsupp.ps_availqty AS Float64) > __scalar_sq_3.Float64(0.5) * sum(lineitem.l_quantity) 13)------------LeftSemi Join: partsupp.ps_partkey = __correlated_sq_1.p_partkey 14)--------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty] 15)--------------SubqueryAlias: __correlated_sq_1 @@ -93,7 +93,7 @@ physical_plan 10)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 11)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], file_type=csv, has_header=false 12)------RepartitionExec: partitioning=Hash([ps_suppkey@0], 4), input_partitions=4 -13)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_partkey@0, l_partkey@1), (ps_suppkey@1, l_suppkey@2)], filter=CAST(ps_availqty@0 AS Float64) > Float64(0.5) * sum(lineitem.l_quantity)@1, projection=[ps_suppkey@1] +13)--------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(ps_partkey@0, l_partkey@1), (ps_suppkey@1, l_suppkey@2)], filter=CAST(ps_availqty@0 AS Float64) > Float64(0.5) * sum(lineitem.l_quantity)@1, projection=[ps_suppkey@1] 14)----------RepartitionExec: partitioning=Hash([ps_partkey@0, ps_suppkey@1], 4), input_partitions=4 15)------------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(ps_partkey@0, p_partkey@0)] 16)--------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4 From 32aa91774e45d3870f09e4f021ff1f14ba04cc16 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sun, 31 May 2026 10:06:52 -0400 Subject: [PATCH 4/6] Code cleanup --- datafusion/optimizer/src/eliminate_join.rs | 108 +++++++++------------ 1 file changed, 46 insertions(+), 62 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_join.rs b/datafusion/optimizer/src/eliminate_join.rs index f9ad8ad809d0d..25a8f123d286f 100644 --- a/datafusion/optimizer/src/eliminate_join.rs +++ b/datafusion/optimizer/src/eliminate_join.rs @@ -16,9 +16,7 @@ // under the License. //! [`EliminateJoin`] rewrites inner joins to simpler forms to make them cheaper -//! to evaluate. -//! -//! # What it rewrites +//! to evaluate. We implement two distinct rewrites: //! //! * An inner join can be rewritten to an empty relation if the join condition //! is trivially false. @@ -34,7 +32,7 @@ //! functional dependencies to prove that each L row matches at most one R //! row (R is provably unique on the join keys). //! -//! # How it works +//! # Overview //! //! `rewrite_subtree` walks the plan top-down, threading two pieces of context //! down to each join: @@ -58,10 +56,6 @@ //! node types just forward the context to their single child via //! `rewrite_single_input`; nodes that alter column requirements or //! duplicate-sensitivity (projection, aggregate, sort, ...) adjust it first. -//! -//! Joins nested inside subquery expressions are reached as well: `rewrite_subtree` -//! descends into each node's subquery plans itself (via `map_subqueries`), -//! seeding each as a fresh root. use crate::utils::for_each_referenced_index; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::tree_node::{Transformed, TreeNode}; @@ -69,7 +63,7 @@ use datafusion_common::{ DFSchema, Dependency, HashSet, NullEquality, Result, ScalarValue, }; use datafusion_expr::{ - Expr, JoinType, SortExpr, + Expr, JoinType, logical_plan::{ Aggregate, Distinct, DistinctOn, EmptyRelation, Filter, Join, Limit, LogicalPlan, Partitioning, Projection, Repartition, Sort, SubqueryAlias, @@ -143,7 +137,7 @@ fn rewrite_node( .. }) => { // Narrows `live` to the columns the projection's expressions reference. - let child_live = live_columns_for_exprs(&expr, input.schema())?; + let child_live = live_columns(&expr, input.schema())?; rewrite_single_input(input, child_live, duplicate_insensitive, |input| { Ok(LogicalPlan::Projection(Projection::try_new_with_schema( expr, input, schema, @@ -155,7 +149,7 @@ fn rewrite_node( }) => { // Adds the predicate's columns to `live` (a side used only by the filter stays live). let mut child_live = live; - extend_live_columns(&mut child_live, &predicate, input.schema())?; + extend_live_columns(&mut child_live, [&predicate], input.schema())?; rewrite_single_input(input, child_live, duplicate_insensitive, |input| { Ok(LogicalPlan::Filter(Filter::new(predicate, input))) }) @@ -168,8 +162,8 @@ fn rewrite_node( .. }) => { // Narrows `live` to the grouping and aggregate expressions' columns. - let mut child_live = live_columns_for_exprs(&group_expr, input.schema())?; - extend_live_columns_for_exprs(&mut child_live, &aggr_expr, input.schema())?; + let child_live = + live_columns(group_expr.iter().chain(&aggr_expr), input.schema())?; // A grouping aggregate with no aggregate functions (`GROUP BY` with // an empty `aggr_expr`) only observes which group-key values exist, @@ -190,12 +184,9 @@ fn rewrite_node( ) } LogicalPlan::Distinct(Distinct::All(input)) => { - // `SELECT DISTINCT *` is exactly a no-aggregate `GROUP BY` over every - // input column, so the input is duplicate-insensitive — but every - // column is part of the dedup key. Marking all columns live keeps any - // join side whose columns reach the DISTINCT (dropping them would - // change the distinct count); only a side projected away below the - // DISTINCT falls outside the key and can collapse to a semi join. + // `SELECT DISTINCT *` is equivalent to a no-aggregate `GROUP BY` + // over every input column, so the input is duplicate-insensitive, + // but every column is part of the dedup key. let child_live = all_columns(input.schema()); rewrite_single_input(input, child_live, true, |input| { Ok(LogicalPlan::Distinct(Distinct::All(input))) @@ -212,12 +203,12 @@ fn rewrite_node( // `GROUP BY` on the columns it reads, so its input is duplicate- // insensitive; the live columns are exactly those of the // ON/SELECT/ORDER BY expressions. - let mut child_live = live_columns_for_exprs(&on_expr, input.schema())?; - extend_live_columns_for_exprs(&mut child_live, &select_expr, input.schema())?; + let mut child_live = + live_columns(on_expr.iter().chain(&select_expr), input.schema())?; if let Some(sort_expr) = &sort_expr { - extend_live_columns_for_sort_exprs( + extend_live_columns( &mut child_live, - sort_expr, + sort_expr.iter().map(|s| &s.expr), input.schema(), )?; } @@ -235,7 +226,11 @@ fn rewrite_node( LogicalPlan::Sort(Sort { expr, input, fetch }) => { // Adds the sort-key columns to `live`. let mut child_live = live; - extend_live_columns_for_sort_exprs(&mut child_live, &expr, input.schema())?; + extend_live_columns( + &mut child_live, + expr.iter().map(|s| &s.expr), + input.schema(), + )?; // A `fetch` (top-N) makes the row count observable, so duplicate- // insensitivity does not survive past it. @@ -269,11 +264,7 @@ fn rewrite_node( let mut child_live = live; match &partitioning_scheme { Partitioning::Hash(exprs, _) | Partitioning::DistributeBy(exprs) => { - extend_live_columns_for_exprs( - &mut child_live, - exprs, - input.schema(), - )?; + extend_live_columns(&mut child_live, exprs, input.schema())?; } Partitioning::RoundRobinBatch(_) => {} } @@ -385,7 +376,7 @@ fn rewrite_join( )?))) } else { // Nothing changed; reassemble the join reusing its existing schema rather - // than recomputing it (as `rewrite_single_input` does for other nodes). + // than recomputing it. Ok(Transformed::no(LogicalPlan::Join(Join { left, right, @@ -460,14 +451,20 @@ fn add_join_condition_columns( left_live: &mut LiveColumns, right_live: &mut LiveColumns, ) -> Result<()> { - for (left_expr, right_expr) in &join.on { - extend_live_columns(left_live, left_expr, join.left.schema())?; - extend_live_columns(right_live, right_expr, join.right.schema())?; - } + extend_live_columns( + left_live, + join.on.iter().map(|(l, _)| l), + join.left.schema(), + )?; + extend_live_columns( + right_live, + join.on.iter().map(|(_, r)| r), + join.right.schema(), + )?; if let Some(filter) = &join.filter { - extend_live_columns(left_live, filter, join.left.schema())?; - extend_live_columns(right_live, filter, join.right.schema())?; + extend_live_columns(left_live, [filter], join.left.schema())?; + extend_live_columns(right_live, [filter], join.right.schema())?; } Ok(()) @@ -533,44 +530,31 @@ fn all_columns(schema: &DFSchema) -> LiveColumns { (0..schema.fields().len()).collect() } -fn live_columns_for_exprs(exprs: &[Expr], schema: &DFSchema) -> Result { +/// The columns of `schema` referenced by any of `exprs`. +fn live_columns<'a>( + exprs: impl IntoIterator, + schema: &DFSchema, +) -> Result { let mut live = LiveColumns::new(); - extend_live_columns_for_exprs(&mut live, exprs, schema)?; + extend_live_columns(&mut live, exprs, schema)?; Ok(live) } -fn extend_live_columns_for_exprs( +/// Inserts into `live` the index, within `schema`, of every column referenced +/// by any of `exprs`. +fn extend_live_columns<'a>( live: &mut LiveColumns, - exprs: &[Expr], + exprs: impl IntoIterator, schema: &DFSchema, ) -> Result<()> { for expr in exprs { - extend_live_columns(live, expr, schema)?; - } - Ok(()) -} - -fn extend_live_columns_for_sort_exprs( - live: &mut LiveColumns, - exprs: &[SortExpr], - schema: &DFSchema, -) -> Result<()> { - for sort_expr in exprs { - extend_live_columns(live, &sort_expr.expr, schema)?; + for_each_referenced_index(expr, schema, |idx| { + live.insert(idx); + })?; } Ok(()) } -fn extend_live_columns( - live: &mut LiveColumns, - expr: &Expr, - schema: &DFSchema, -) -> Result<()> { - for_each_referenced_index(expr, schema, |idx| { - live.insert(idx); - }) -} - #[cfg(test)] mod tests { use crate::OptimizerContext; From 2563c3224adc2ea14ea3e1fa3124af0088dbbdf8 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sun, 31 May 2026 10:16:02 -0400 Subject: [PATCH 5/6] Code cleanup --- datafusion/optimizer/src/eliminate_join.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_join.rs b/datafusion/optimizer/src/eliminate_join.rs index 25a8f123d286f..ae72d20b9e9bf 100644 --- a/datafusion/optimizer/src/eliminate_join.rs +++ b/datafusion/optimizer/src/eliminate_join.rs @@ -342,18 +342,18 @@ fn rewrite_join( add_join_condition_columns(&join, &mut left_live, &mut right_live)?; - let (left_duplicate_insensitive, right_duplicate_insensitive) = + let (left_dup_insensitive, right_dup_insensitive) = child_duplicate_insensitivity(rewritten_join_type, duplicate_insensitive); let left = rewrite_subtree( Arc::unwrap_or_clone(join.left), left_live, - left_duplicate_insensitive, + left_dup_insensitive, )?; let right = rewrite_subtree( Arc::unwrap_or_clone(join.right), right_live, - right_duplicate_insensitive, + right_dup_insensitive, )?; let changed = @@ -479,13 +479,13 @@ fn split_join_output_columns( JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { split_columns_at(live, left_len) } + // A semi/anti/mark join outputs only the surviving side's columns, with + // the same index space, so `live` passes straight through to that side. JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { - let left = live.iter().copied().filter(|idx| *idx < left_len).collect(); - (left, LiveColumns::new()) + (live.clone(), LiveColumns::new()) } JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { - let right = live.iter().copied().collect(); - (LiveColumns::new(), right) + (LiveColumns::new(), live.clone()) } } } From a061e2b06b6d2875357e590a48bc50153f940537 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sun, 31 May 2026 10:32:48 -0400 Subject: [PATCH 6/6] Test improvements --- datafusion/optimizer/src/eliminate_join.rs | 35 +++++++--------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_join.rs b/datafusion/optimizer/src/eliminate_join.rs index ae72d20b9e9bf..cb21ce08f4d64 100644 --- a/datafusion/optimizer/src/eliminate_join.rs +++ b/datafusion/optimizer/src/eliminate_join.rs @@ -645,14 +645,18 @@ mod tests { } #[test] - fn aggregate_can_rewrite_when_removed_side_is_unique() -> Result<()> { - let plan = left_join_right_with_constraints(primary_key_on_id())? + fn aggregate_with_aggregates_is_not_duplicate_insensitive() -> Result<()> { + // A `GROUP BY` *with* aggregate functions observes how many rows fall in + // each group, so its input is not duplicate-insensitive. With a non-unique + // right side the join must stay an inner join: collapsing it to a semi + // join would drop matching duplicates and undercount `count(l.id)`. + let plan = left_join_right()? .aggregate(vec![col("l.x")], vec![count(col("l.id"))])? .build()?; assert_optimized_plan_equal!(plan, @r" Aggregate: groupBy=[[l.x]], aggr=[[count(l.id)]] - LeftSemi Join: l.id = r.id + Inner Join: l.id = r.id TableScan: l TableScan: r ") @@ -717,7 +721,9 @@ mod tests { // the right side's. With a non-unique right side the inner join can // multiply left rows into distinct `(l, r)` combinations, so the join // must not be rewritten to a semi join (which would drop the right - // columns from the DISTINCT key and undercount the result). + // columns from the DISTINCT key and undercount the result). This holds + // even when the right side is unique on the join keys: its columns are + // part of the DISTINCT key regardless. let plan = left_join_right()? .distinct()? .project(vec![col("l.x")])? @@ -732,27 +738,6 @@ mod tests { ") } - #[test] - fn distinct_star_keeps_side_even_when_unique() -> Result<()> { - // `SELECT DISTINCT *` keys on every column, including the right side's, so - // the right side stays even when it is unique on the join keys — its - // columns are part of the DISTINCT key regardless. This matches a - // no-aggregate `GROUP BY` over all columns; dropping a redundant unique - // key would be a separate, functional-dependency-based simplification. - let plan = left_join_right_with_constraints(primary_key_on_id())? - .distinct()? - .project(vec![col("l.x")])? - .build()?; - - assert_optimized_plan_equal!(plan, @r" - Projection: l.x - Distinct: - Inner Join: l.id = r.id - TableScan: l - TableScan: r - ") - } - #[test] fn distinct_drops_unreferenced_side_when_projected() -> Result<()> { // `SELECT DISTINCT l.x` projects the right side away below the DISTINCT,