Skip to content
41 changes: 26 additions & 15 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;

use async_trait::async_trait;
use datafusion_common::heap_size::{DFHeapSize, DFHeapSizeCtx};
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
Expand Down Expand Up @@ -546,15 +547,17 @@ impl TableFunctionImpl for MetadataCacheFunc {
for (path, entry) in cached_entries {
path_arr.push(path.to_string());
file_modified_arr
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.object_meta.size);
e_tag_arr.push(entry.object_meta.e_tag);
version_arr.push(entry.object_meta.version);
.push(Some(entry.value.meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.value.meta.size);
e_tag_arr.push(entry.value.meta.e_tag);
version_arr.push(entry.value.meta.version);
metadata_size_bytes.push(entry.size_bytes as u64);
hits_arr.push(entry.hits as u64);

let mut extra = entry
.extra
.value
.file_metadata
.extra_info()
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>();
Expand Down Expand Up @@ -667,14 +670,22 @@ impl TableFunctionImpl for StatisticsCacheFunc {
table_arr
.push(path.table.map_or_else(|| "".to_string(), |t| t.to_string()));
file_modified_arr
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.object_meta.size);
e_tag_arr.push(entry.object_meta.e_tag);
version_arr.push(entry.object_meta.version);
num_rows_arr.push(entry.num_rows.to_string());
num_columns_arr.push(entry.num_columns as u64);
table_size_bytes_arr.push(entry.table_size_bytes.to_string());
statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64);
.push(Some(entry.value.meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.value.meta.size);
e_tag_arr.push(entry.value.meta.e_tag);
version_arr.push(entry.value.meta.version);
num_rows_arr.push(entry.value.statistics.num_rows.to_string());
num_columns_arr
.push(entry.value.statistics.column_statistics.len() as u64);
table_size_bytes_arr
.push(entry.value.statistics.total_byte_size.to_string());
statistics_size_bytes_arr.push(
entry
.value
.statistics
.heap_size(&mut DFHeapSizeCtx::default())
as u64,
);
}
}

Expand Down Expand Up @@ -827,14 +838,14 @@ impl TableFunctionImpl for ListFilesCacheFunc {
.map(|t| t.duration_since(now).as_millis() as i64),
);

for meta in entry.metas.files.iter() {
for meta in entry.value.files.iter() {
file_path_arr.push(meta.location.to_string());
file_modified_arr.push(meta.last_modified.timestamp_millis());
file_size_bytes_arr.push(meta.size);
etag_arr.push(meta.e_tag.clone());
version_arr.push(meta.version.clone());
}
current_offset += entry.metas.files.len() as i32;
current_offset += entry.value.files.len() as i32;
offsets.push(current_offset);
}
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,10 @@ mod tests {
use std::time::Duration;

use super::*;
use datafusion::execution::cache::default_cache::DefaultCache;
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig},
execution::cache::cache_manager::CacheManagerConfig,
prelude::{ParquetReadOptions, col, lit, split_part},
};
use insta::assert_snapshot;
Expand Down Expand Up @@ -700,7 +701,7 @@ mod tests {

#[tokio::test]
async fn test_list_files_cache() -> Result<(), DataFusionError> {
let list_files_cache = Arc::new(DefaultListFilesCache::new(
let list_files_cache = Arc::new(DefaultCache::new_with_ttl(
1024,
Some(Duration::from_secs(1)),
));
Expand Down
6 changes: 3 additions & 3 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics,
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_manager::TableScopedPath;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
Expand Down Expand Up @@ -186,7 +186,7 @@ pub struct ListingTable {
/// The SQL definition for this table, if any
definition: Option<String>,
/// Cache for collected file statistics
collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
collected_statistics: Option<Arc<FileStatisticsCache>>,
/// Constraints applied to this table
constraints: Constraints,
/// Column default expressions for columns that are not physically present in the data files
Expand Down Expand Up @@ -259,7 +259,7 @@ impl ListingTable {
/// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
/// multiple times in the same session.
///
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
pub fn with_cache(mut self, cache: Option<Arc<FileStatisticsCache>>) -> Self {
self.collected_statistics = cache;
self
}
Expand Down
14 changes: 9 additions & 5 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ mod tests {
datasource::file_format::csv::CsvFormat, execution::context::SessionContext,
test_util::parquet_test_data,
};
use datafusion_execution::cache::CacheAccessor;
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache;
use datafusion_execution::cache::cache_manager::{
CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use glob::Pattern;
Expand All @@ -242,6 +242,8 @@ mod tests {

use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{DFSchema, TableReference};
use datafusion_execution::cache::Cache;
use datafusion_execution::cache::default_cache::DefaultCache;
use datafusion_expr::registry::ExtensionTypeRegistryRef;

#[tokio::test]
Expand Down Expand Up @@ -484,7 +486,8 @@ mod tests {
.to_string();

// Test with collect_statistics enabled
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
let file_statistics_cache =
Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT));
let cache_config = CacheManagerConfig::default()
.with_file_statistics_cache(Some(file_statistics_cache.clone()));
let runtime = RuntimeEnvBuilder::new()
Expand Down Expand Up @@ -514,7 +517,8 @@ mod tests {
);

// Test with collect_statistics disabled
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
let file_statistics_cache =
Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT));
let cache_config = CacheManagerConfig::default()
.with_file_statistics_cache(Some(file_statistics_cache.clone()));
let runtime = RuntimeEnvBuilder::new()
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ use datafusion_common::{
};
pub use datafusion_execution::TaskContext;
use datafusion_execution::cache::cache_manager::{
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_TTL,
DEFAULT_METADATA_CACHE_LIMIT,
DEFAULT_FILE_STATISTICS_MEMORY_LIMIT, DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
DEFAULT_LIST_FILES_CACHE_TTL, DEFAULT_METADATA_CACHE_LIMIT,
};
pub use datafusion_execution::config::SessionConfig;
use datafusion_execution::disk_manager::{
Expand All @@ -103,7 +103,6 @@ use datafusion_session::SessionStore;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_execution::cache::file_statistics_cache::DEFAULT_FILE_STATISTICS_MEMORY_LIMIT;
use object_store::ObjectStore;
use parking_lot::RwLock;
use url::Url;
Expand Down
21 changes: 10 additions & 11 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_common::DFSchema;
use datafusion_common::stats::Precision;
use datafusion_execution::cache::DefaultListFilesCache;
use datafusion_execution::cache::cache_manager::{
CacheManagerConfig, FileStatisticsCache,
CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, FileStatisticsCache, ListFilesCache,
};
use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache;
use datafusion_execution::cache::default_cache::DefaultCache;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{Expr, col, lit};
Expand Down Expand Up @@ -238,7 +238,7 @@ async fn list_files_with_session_level_cache() {

async fn get_listing_table(
table_path: &ListingTableUrl,
static_cache: Option<Arc<dyn FileStatisticsCache>>,
static_cache: Option<Arc<FileStatisticsCache>>,
opt: &ListingOptions,
) -> ListingTable {
let schema = opt
Expand All @@ -256,14 +256,13 @@ async fn get_listing_table(
.with_cache(static_cache)
}

fn get_cache_runtime_state() -> (
Arc<DefaultFileStatisticsCache>,
Arc<DefaultListFilesCache>,
SessionState,
) {
fn get_cache_runtime_state()
-> (Arc<FileStatisticsCache>, Arc<ListFilesCache>, SessionState) {
let cache_config = CacheManagerConfig::default();
let file_static_cache = Arc::new(DefaultFileStatisticsCache::default());
let list_file_cache = Arc::new(DefaultListFilesCache::default());
let file_static_cache =
Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT));
let list_file_cache =
Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT));

let cache_config = cache_config
.with_file_statistics_cache(Some(file_static_cache.clone()))
Expand Down
17 changes: 11 additions & 6 deletions datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use std::time::Duration;
use datafusion::execution::context::SessionContext;
use datafusion::execution::context::TaskContext;
use datafusion::prelude::SessionConfig;
use datafusion_execution::cache::DefaultListFilesCache;
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache;
use datafusion_execution::cache::cache_manager::{
CacheManagerConfig, DEFAULT_FILE_STATISTICS_MEMORY_LIMIT,
DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
};
use datafusion_execution::cache::default_cache::DefaultCache;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_plan::common::collect;

Expand Down Expand Up @@ -260,7 +262,8 @@ async fn test_test_metadata_cache_limit() {

#[tokio::test]
async fn test_list_files_cache_limit() {
let list_files_cache = Arc::new(DefaultListFilesCache::default());
let list_files_cache =
Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT));

let rt = RuntimeEnvBuilder::new()
.with_cache_manager(
Expand Down Expand Up @@ -303,7 +306,8 @@ async fn test_list_files_cache_limit() {

#[tokio::test]
async fn test_list_files_cache_ttl() {
let list_files_cache = Arc::new(DefaultListFilesCache::default());
let list_files_cache =
Arc::new(DefaultCache::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT));

let rt = RuntimeEnvBuilder::new()
.with_cache_manager(
Expand Down Expand Up @@ -347,7 +351,8 @@ async fn test_list_files_cache_ttl() {

#[tokio::test]
async fn test_file_statistics_cache_limit() {
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
let file_statistics_cache =
Arc::new(DefaultCache::new(DEFAULT_FILE_STATISTICS_MEMORY_LIMIT));

let rt = RuntimeEnvBuilder::new()
.with_cache_manager(
Expand Down
4 changes: 2 additions & 2 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ pub async fn fetch_parquet_metadata(
object_meta: &ObjectMeta,
size_hint: Option<usize>,
decryption_properties: Option<&FileDecryptionProperties>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
file_metadata_cache: Option<Arc<FileMetadataCache>>,
) -> Result<Arc<ParquetMetaData>> {
let decryption_properties = decryption_properties.cloned().map(Arc::new);
DFParquetMetadata::new(store, object_meta)
Expand All @@ -650,7 +650,7 @@ pub async fn fetch_statistics(
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
decryption_properties: Option<&FileDecryptionProperties>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
file_metadata_cache: Option<Arc<FileMetadataCache>>,
) -> Result<Statistics> {
let decryption_properties = decryption_properties.cloned().map(Arc::new);
DFParquetMetadata::new(store, file)
Expand Down
7 changes: 3 additions & 4 deletions datafusion/datasource-parquet/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{
ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
ColumnStatistics, DataFusionError, HashMap, Result, ScalarValue, Statistics,
};
use datafusion_execution::cache::cache_manager::{
CachedFileMetadataEntry, FileMetadata, FileMetadataCache,
Expand All @@ -48,7 +48,6 @@ use parquet::file::metadata::{
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Minimum fraction of row groups that must report NDV statistics for the
Expand All @@ -69,7 +68,7 @@ pub struct DFParquetMetadata<'a> {
object_meta: &'a ObjectMeta,
metadata_size_hint: Option<usize>,
decryption_properties: Option<Arc<FileDecryptionProperties>>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
file_metadata_cache: Option<Arc<FileMetadataCache>>,
/// timeunit to coerce INT96 timestamps to
pub coerce_int96: Option<TimeUnit>,
/// Optional timezone applied to INT96-coerced timestamps.
Expand Down Expand Up @@ -107,7 +106,7 @@ impl<'a> DFParquetMetadata<'a> {
/// set file metadata cache
pub fn with_file_metadata_cache(
mut self,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
file_metadata_cache: Option<Arc<FileMetadataCache>>,
) -> Self {
self.file_metadata_cache = file_metadata_cache;
self
Expand Down
10 changes: 5 additions & 5 deletions datafusion/datasource-parquet/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use crate::ParquetFileMetrics;
use crate::metadata::DFParquetMetadata;
use bytes::Bytes;
use datafusion_common::HashMap;
use datafusion_datasource::PartitionedFile;
use datafusion_execution::cache::cache_manager::FileMetadata;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
Expand All @@ -32,7 +33,6 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -182,13 +182,13 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
metadata_cache: Arc<dyn FileMetadataCache>,
metadata_cache: Arc<FileMetadataCache>,
}

impl CachedParquetFileReaderFactory {
pub fn new(
store: Arc<dyn ObjectStore>,
metadata_cache: Arc<dyn FileMetadataCache>,
metadata_cache: Arc<FileMetadataCache>,
) -> Self {
Self {
store,
Expand Down Expand Up @@ -241,7 +241,7 @@ pub struct CachedParquetFileReader {
store: Arc<dyn ObjectStore>,
pub inner: ParquetObjectReader,
partitioned_file: PartitionedFile,
metadata_cache: Arc<dyn FileMetadataCache>,
metadata_cache: Arc<FileMetadataCache>,
metadata_size_hint: Option<usize>,
}

Expand All @@ -251,7 +251,7 @@ impl CachedParquetFileReader {
store: Arc<dyn ObjectStore>,
inner: ParquetObjectReader,
partitioned_file: PartitionedFile,
metadata_cache: Arc<dyn FileMetadataCache>,
metadata_cache: Arc<FileMetadataCache>,
metadata_size_hint: Option<usize>,
) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use std::sync::Arc;

use datafusion_common::{DataFusionError, Result, TableReference};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::CachedFileList;
use datafusion_execution::cache::cache_manager::TableScopedPath;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_session::Session;

Expand Down
Loading
Loading