Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions benchmarks/src/tpcds/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ impl RunOpt {
self.enable_piecewise_merge_join;
config.options_mut().execution.hash_join_buffering_capacity =
self.hash_join_buffering_capacity;
if std::env::var("DISABLE_MATERIALIZED_CTES").is_ok() {
config.options_mut().execution.enable_materialized_ctes = false;
}
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);
// register tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ async fn automatic_usage_example() -> Result<()> {
.with_memory_limit(5_000_000, 1.0) // 5MB, 100% utilization
.build_arc()?;

let config = SessionConfig::new();
let mut config = SessionConfig::new();
config.options_mut().execution.enable_materialized_ctes = false;
let ctx = SessionContext::new_with_config_rt(config, runtime);

// Create a simple table for demonstration
Expand Down
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,12 @@ config_namespace! {
/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true

/// Should DataFusion materialize CTEs that are referenced multiple times.
/// When enabled, CTEs referenced more than once are generally computed
/// once and cached, except for cheap CTEs and CTEs consumed below a top-level
/// limit.
pub enable_materialized_ctes: bool, default = true

/// Attempt to eliminate sorts by packing & sorting files with non-overlapping
/// statistics into the same file groups.
/// Currently experimental
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2313,7 +2313,9 @@ impl QueryPlanner for DefaultQueryPlanner {
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let planner = DefaultPhysicalPlanner::default();
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
crate::materialized_cte_planner::MaterializedCtePlanner::new(),
)]);
planner
.create_physical_plan(logical_plan, session_state)
.await
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ pub mod dataframe;
pub mod datasource;
pub mod error;
pub mod execution;
pub mod materialized_cte_planner;
pub mod physical_planner;
pub mod prelude;
pub mod scalar;
Expand Down
154 changes: 154 additions & 0 deletions datafusion/core/src/materialized_cte_planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Extension planner for materialized CTEs.
//!
//! This module provides [`MaterializedCtePlanner`] which connects the logical
//! plan nodes ([`MaterializedCteProducer`] and [`MaterializedCteReader`]) to
//! their physical execution counterparts.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_expr::logical_plan::{MaterializedCteProducer, MaterializedCteReader};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_plan::materialize::{
MaterializeExec, MaterializedCache, MaterializedScanExec, materialized_statistics,
replace_materialized_scans,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use crate::execution::context::SessionState;
use crate::physical_planner::{ExtensionPlanner, PhysicalPlanner};

/// An extension planner that handles materialized CTE logical nodes.
///
/// It translates [`MaterializedCteProducer`] nodes into [`MaterializeExec`]
/// and [`MaterializedCteReader`] nodes into [`MaterializedScanExec`].
///
/// All bookkeeping is keyed by CTE name and kept behind [`Mutex`]es because
/// `plan_extension` only receives `&self`:
///
/// * planning a producer always installs a *fresh* cache under the CTE name
///   (replacing any cache registered earlier) and records the partition
///   count of the producer's CTE input;
/// * planning a reader looks the cache up by name, creating one if none is
///   registered yet, and uses the recorded partition count, or `1` if no
///   producer with that name has been planned so far.
///
/// NOTE(review): a reader planned before its producer holds a cache that the
/// producer later replaces; the producer calls `replace_materialized_scans`
/// on its continuation, presumably to rebind such readers -- confirm against
/// that helper.
#[derive(Debug)]
pub struct MaterializedCtePlanner {
/// Map of CTE name to the cache most recently registered for that name
caches: Mutex<HashMap<String, Arc<MaterializedCache>>>,
/// Map of CTE name to the number of partitions readers should expose,
/// recorded when the producer for that name is planned
partition_counts: Mutex<HashMap<String, usize>>,
}

impl MaterializedCtePlanner {
    /// Build a planner with no registered caches and no recorded partition counts.
    pub fn new() -> Self {
        let caches = Mutex::new(HashMap::new());
        let partition_counts = Mutex::new(HashMap::new());
        Self {
            caches,
            partition_counts,
        }
    }

    /// Return the cache registered under `name`, registering a new one first
    /// when the name has no entry yet.
    fn get_or_create_cache(&self, name: &str) -> Arc<MaterializedCache> {
        let mut registry = self.caches.lock().unwrap();
        if let Some(existing) = registry.get(name) {
            return Arc::clone(existing);
        }
        let fresh = Arc::new(MaterializedCache::new(name.to_string()));
        registry.insert(name.to_string(), Arc::clone(&fresh));
        fresh
    }

    /// Register a brand-new cache under `name`, replacing any cache that was
    /// previously registered for it, and return the new cache.
    fn create_cache(&self, name: &str) -> Arc<MaterializedCache> {
        let key = name.to_string();
        let fresh = Arc::new(MaterializedCache::new(key.clone()));
        let mut registry = self.caches.lock().unwrap();
        registry.insert(key, Arc::clone(&fresh));
        fresh
    }

    /// Record how many partitions readers of the CTE `name` should expose.
    fn set_partition_count(&self, name: &str, partition_count: usize) {
        let mut counts = self.partition_counts.lock().unwrap();
        counts.insert(name.to_string(), partition_count);
    }

    /// Look up the recorded partition count for `name`; `1` when nothing has
    /// been recorded for that name.
    fn partition_count(&self, name: &str) -> usize {
        let counts = self.partition_counts.lock().unwrap();
        let recorded = match counts.get(name) {
            Some(&count) => count,
            None => 1,
        };
        recorded
    }
}

impl Default for MaterializedCtePlanner {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl ExtensionPlanner for MaterializedCtePlanner {
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
_logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
// Handle MaterializedCteProducer
if let Some(producer) = node.as_any().downcast_ref::<MaterializedCteProducer>() {
let cache = self.create_cache(&producer.name);
let cte_plan = Arc::clone(&physical_inputs[0]);
let partition_count = cte_plan.output_partitioning().partition_count();
let statistics = materialized_statistics(cte_plan.as_ref())?;
self.set_partition_count(&producer.name, partition_count);
let continuation = replace_materialized_scans(
Arc::clone(&physical_inputs[1]),
&producer.name,
&cache,
partition_count,
&statistics,
)?;
let exec = MaterializeExec::new(
producer.name.clone(),
cte_plan,
continuation,
cache,
);
return Ok(Some(Arc::new(exec)));
}

// Handle MaterializedCteReader
if let Some(reader) = node.as_any().downcast_ref::<MaterializedCteReader>() {
let cache = self.get_or_create_cache(&reader.name);
let schema = Arc::clone(reader.schema.inner());
let statistics =
Arc::new(datafusion_physical_plan::Statistics::new_unknown(&schema));
let exec = MaterializedScanExec::new(
reader.name.clone(),
schema,
cache,
self.partition_count(&reader.name),
statistics,
);
return Ok(Some(Arc::new(exec)));
}

Ok(None)
}
}
35 changes: 19 additions & 16 deletions datafusion/core/src/optimizer_rule_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,25 @@ Rule order matters. The default pipeline may change between releases.
| 7 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. |
| 8 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. |
| 9 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. |
| 10 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. |
| 11 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. |
| 12 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. |
| 13 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. |
| 14 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. |
| 15 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. |
| 16 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. |
| 17 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. |
| 18 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. |
| 19 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. |
| 20 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. |
| 21 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. |
| 22 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. |
| 23 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. |
| 24 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. |
| 25 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. |
| 10 | `common_subplan_eliminate` | Detects duplicate subplans and materializes them so they are computed once and read multiple times. |
| 11 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. |
| 12 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. |
| 13 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. |
| 14 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. |
| 15 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. |
| 16 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. |
| 17 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. |
| 18 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. |
| 19 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. |
| 20 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. |
| 21 | `inline_cte` | Inlines materialized CTEs where materialization is not beneficial (cheap, limited, or disjoint-filtered). |
| 22 | `cte_filter_pusher` | Pushes OR-combined filters from CTE readers into the materialized CTE body to reduce materialization volume. |
| 23 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. |
| 24 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. |
| 25 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. |
| 26 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. |
| 27 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. |
| 28 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. |

### Physical Optimizer Rules

Expand Down
Loading