From 1452333cf0933d4d8da032af68bc5a3a05c62483 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Tue, 29 Jul 2025 19:03:35 +0100 Subject: [PATCH 1/2] feat: Cache Parquet metadata --- datafusion/common/src/config.rs | 6 + .../common/src/file_options/parquet_writer.rs | 3 + .../src/datasource/file_format/options.rs | 15 ++ .../datasource-parquet/src/file_format.rs | 18 ++- datafusion/datasource-parquet/src/reader.rs | 131 +++++++++++++++++- .../execution/src/cache/cache_manager.rs | 37 +++++ datafusion/execution/src/cache/cache_unit.rs | 121 +++++++++++++++- datafusion/execution/src/runtime_env.rs | 1 + .../proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 18 +++ .../proto-common/src/generated/prost.rs | 3 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 3 + .../proto/src/logical_plan/file_formats.rs | 2 + .../test_files/information_schema.slt | 2 + .../sqllogictest/test_files/parquet.slt | 119 ++++++++++++++++ docs/source/user-guide/configs.md | 1 + 18 files changed, 480 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 5796edc283e0..3bcb0839b5a2 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -549,6 +549,12 @@ config_namespace! { /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true + /// (reading) Whether or not to enable the caching of embedded metadata of Parquet files + /// (footer and page metadata). Enabling it can offer substantial performance improvements + /// for repeated queries over large files. By default, the cache is automatically + /// invalidated when the underlying file is modified. + pub cache_metadata: bool, default = false + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index cde0ea129979..9ea2b6af82ec 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -245,6 +245,7 @@ impl ParquetOptions { binary_as_string: _, // not used for writer props coerce_int96: _, // not used for writer props skip_arrow_metadata: _, + cache_metadata: _, } = self; let mut builder = WriterProperties::builder() @@ -522,6 +523,7 @@ mod tests { binary_as_string: defaults.binary_as_string, skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, + cache_metadata: defaults.cache_metadata, } } @@ -634,6 +636,7 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + cache_metadata: global_options_defaults.cache_metadata, }, column_specific_options, key_value_metadata, diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 02b792823a82..459e92a7a976 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -254,6 +254,11 @@ pub struct ParquetReadOptions<'a> { pub file_sort_order: Vec>, /// Properties for decryption of Parquet files that use modular encryption pub file_decryption_properties: Option, + /// Whether or not to enable the caching of embedded metadata of this Parquet file (footer and + /// page metadata). Enabling it can offer substantial performance improvements for repeated + /// queries over large files. By default, the cache is automatically invalidated when the + /// underlying file is modified. + pub cache_metadata: Option, } impl Default for ParquetReadOptions<'_> { @@ -266,6 +271,7 @@ impl Default for ParquetReadOptions<'_> { schema: None, file_sort_order: vec![], file_decryption_properties: None, + cache_metadata: None, } } } @@ -325,6 +331,12 @@ impl<'a> ParquetReadOptions<'a> { self.file_decryption_properties = Some(file_decryption_properties); self } + + /// Specify whether to enable or not metadata caching + pub fn cache_metadata(mut self, cache_metadata: bool) -> Self { + self.cache_metadata = Some(cache_metadata); + self + } } /// Options that control the reading of ARROW files. @@ -590,6 +602,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { if let Some(file_decryption_properties) = &self.file_decryption_properties { options.crypto.file_decryption = Some(file_decryption_properties.clone()); } + if let Some(cache_metadata) = self.cache_metadata { + options.global.cache_metadata = cache_metadata; + } let mut file_format = ParquetFormat::new().with_options(options); if let Some(parquet_pruning) = self.parquet_pruning { diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 43b0886193e7..7210cc09a0b3 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -63,6 +63,7 @@ use datafusion_physical_plan::Accumulator; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; +use crate::reader::CachedParquetFileReaderFactory; use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; @@ -435,7 +436,7 @@ impl FileFormat for ParquetFormat { async fn create_physical_plan( &self, - _state: &dyn Session, + state: &dyn Session, conf: FileScanConfig, ) -> Result> { let mut metadata_size_hint = None; @@ -446,6 +447,21 @@ impl FileFormat for ParquetFormat { let mut source = ParquetSource::new(self.options.clone()); + // Use the CachedParquetFileReaderFactory when metadata caching is enabled + if self.options.global.cache_metadata { + if let Some(metadata_cache) = + state.runtime_env().cache_manager.get_file_metadata_cache() + { + let store = state + .runtime_env() + .object_store(conf.object_store_url.clone())?; + let cached_parquet_read_factory = + Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache)); + source = + source.with_parquet_file_reader_factory(cached_parquet_read_factory); + } + } + if let Some(metadata_size_hint) = metadata_size_hint { source = source.with_metadata_size_hint(metadata_size_hint) } diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 27ec843c1991..bd9a3bc1e804 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -21,12 +21,14 @@ use crate::ParquetFileMetrics; use bytes::Bytes; use datafusion_datasource::file_meta::FileMeta; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::future::BoxFuture; +use futures::FutureExt; use object_store::ObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; -use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { })) } } + +/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page +/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data. +/// This reader always loads the entire metadata (including page index, unless the file is +/// encrypted), even if not required by the current query, to ensure it is always available for +/// those that need it. +#[derive(Debug)] +pub struct CachedParquetFileReaderFactory { + store: Arc, + metadata_cache: FileMetadataCache, +} + +impl CachedParquetFileReaderFactory { + pub fn new(store: Arc, metadata_cache: FileMetadataCache) -> Self { + Self { + store, + metadata_cache, + } + } +} + +impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> datafusion_common::Result> { + let file_metrics = ParquetFileMetrics::new( + partition_index, + file_meta.location().as_ref(), + metrics, + ); + let store = Arc::clone(&self.store); + + let mut inner = + ParquetObjectReader::new(store, file_meta.object_meta.location.clone()) + .with_file_size(file_meta.object_meta.size); + + if let Some(hint) = metadata_size_hint { + inner = inner.with_footer_size_hint(hint) + }; + + Ok(Box::new(CachedParquetFileReader { + inner, + file_metrics, + file_meta, + metadata_cache: Arc::clone(&self.metadata_cache), + })) + } +} + +/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata +/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then +/// updates the cache. +pub(crate) struct CachedParquetFileReader { + pub file_metrics: ParquetFileMetrics, + pub inner: ParquetObjectReader, + file_meta: FileMeta, + metadata_cache: FileMetadataCache, +} + +impl AsyncFileReader for CachedParquetFileReader { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + let bytes_scanned = range.end - range.start; + self.file_metrics.bytes_scanned.add(bytes_scanned as usize); + self.inner.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> + where + Self: Send, + { + let total: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + self.file_metrics.bytes_scanned.add(total as usize); + self.inner.get_byte_ranges(ranges) + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, parquet::errors::Result>> { + let file_meta = self.file_meta.clone(); + let metadata_cache = Arc::clone(&self.metadata_cache); + + async move { + let object_meta = &file_meta.object_meta; + + // lookup if the metadata is already cached + if let Some(metadata) = + metadata_cache.get_with_extra(&object_meta.location, object_meta) + { + if let Ok(parquet_metadata) = Arc::downcast::(metadata) { + return Ok(Arc::clone(&parquet_metadata)); + } + } + + let mut reader = ParquetMetaDataReader::new(); + // the page index can only be loaded with unencrypted files + if let Some(file_decryption_properties) = + options.and_then(|o| o.file_decryption_properties()) + { + reader = + reader.with_decryption_properties(Some(file_decryption_properties)); + } else { + reader = reader.with_page_indexes(true); + } + reader.try_load(&mut self.inner, object_meta.size).await?; + let metadata = Arc::new(reader.finish()?); + + metadata_cache.put_with_extra( + &object_meta.location, + Arc::clone(&metadata) as Arc, + object_meta, + ); + Ok(metadata) + } + .boxed() + } +} diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index c2403e34c665..8f86a593e893 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +use crate::cache::cache_unit::DefaultFilesMetadataCache; use crate::cache::CacheAccessor; use datafusion_common::{Result, Statistics}; use object_store::path::Path; use object_store::ObjectMeta; +use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -32,6 +34,13 @@ pub type FileStatisticsCache = pub type ListFilesCache = Arc>, Extra = ObjectMeta>>; +/// Represents generic file-embedded metadata. +pub type FileMetadata = dyn Any + Send + Sync; + +/// Cache to store file-embedded metadata. +pub type FileMetadataCache = + Arc, Extra = ObjectMeta>>; + impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Cache name: {} with length: {}", self.name(), self.len()) @@ -44,10 +53,17 @@ impl Debug for dyn CacheAccessor>, Extra = ObjectMeta> } } +impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Cache name: {} with length: {}", self.name(), self.len()) + } +} + #[derive(Default, Debug)] pub struct CacheManager { file_statistic_cache: Option, list_files_cache: Option, + file_metadata_cache: Option, } impl CacheManager { @@ -59,6 +75,13 @@ impl CacheManager { if let Some(lc) = &config.list_files_cache { manager.list_files_cache = Some(Arc::clone(lc)) } + if let Some(mc) = &config.file_metadata_cache { + manager.file_metadata_cache = Some(Arc::clone(mc)); + } else { + manager.file_metadata_cache = + Some(Arc::new(DefaultFilesMetadataCache::default())); + } + Ok(Arc::new(manager)) } @@ -71,6 +94,11 @@ impl CacheManager { pub fn get_list_files_cache(&self) -> Option { self.list_files_cache.clone() } + + /// Get the file embedded metadata cache. + pub fn get_file_metadata_cache(&self) -> Option { + self.file_metadata_cache.clone() + } } #[derive(Clone, Default)] @@ -86,6 +114,10 @@ pub struct CacheManagerConfig { /// location. /// Default is disable. pub list_files_cache: Option, + /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a + /// data file (e.g., Parquet footer and page metadata). + /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. + pub file_metadata_cache: Option, } impl CacheManagerConfig { @@ -101,4 +133,9 @@ impl CacheManagerConfig { self.list_files_cache = cache; self } + + pub fn with_file_metadata_cache(mut self, cache: Option) -> Self { + self.file_metadata_cache = cache; + self + } } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index a9291659a3ef..38d49b306832 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -17,6 +17,7 @@ use std::sync::Arc; +use crate::cache::cache_manager::FileMetadata; use crate::cache::CacheAccessor; use datafusion_common::Statistics; @@ -157,9 +158,79 @@ impl CacheAccessor>> for DefaultListFilesCache { } } +/// Collected file embedded metadata cache. +/// The metadata for some file is invalided when the file size or last modification time have been +/// changed. +#[derive(Default)] +pub struct DefaultFilesMetadataCache { + metadata: DashMap)>, +} + +impl CacheAccessor> for DefaultFilesMetadataCache { + type Extra = ObjectMeta; + + fn get(&self, _k: &Path) -> Option> { + panic!("get in DefaultFilesMetadataCache is not supported, please use get_with_extra") + } + + fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option> { + self.metadata + .get(k) + .map(|s| { + let (extra, metadata) = s.value(); + if extra.size != e.size || extra.last_modified != e.last_modified { + None + } else { + Some(Arc::clone(metadata)) + } + }) + .unwrap_or(None) + } + + fn put(&self, _key: &Path, _value: Arc) -> Option> { + panic!("put in DefaultFilesMetadataCache is not supported, please use put_with_extra") + } + + fn put_with_extra( + &self, + key: &Path, + value: Arc, + e: &Self::Extra, + ) -> Option> { + self.metadata + .insert(key.clone(), (e.clone(), value)) + .map(|x| x.1) + } + + fn remove(&mut self, k: &Path) -> Option> { + self.metadata.remove(k).map(|x| x.1 .1) + } + + fn contains_key(&self, k: &Path) -> bool { + self.metadata.contains_key(k) + } + + fn len(&self) -> usize { + self.metadata.len() + } + + fn clear(&self) { + self.metadata.clear() + } + + fn name(&self) -> String { + "DefaultFilesMetadataCache".to_string() + } +} + #[cfg(test)] mod tests { - use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache}; + use std::sync::Arc; + + use crate::cache::cache_manager::FileMetadata; + use crate::cache::cache_unit::{ + DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache, + }; use crate::cache::CacheAccessor; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use chrono::DateTime; @@ -232,4 +303,52 @@ mod tests { meta.clone() ); } + + #[test] + fn test_file_metadata_cache() { + let object_meta = ObjectMeta { + location: Path::from("test"), + last_modified: DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00") + .unwrap() + .into(), + size: 1024, + e_tag: None, + version: None, + }; + let metadata: Arc = Arc::new("retrieved_metadata".to_owned()); + + let cache = DefaultFilesMetadataCache::default(); + assert!(cache + .get_with_extra(&object_meta.location, &object_meta) + .is_none()); + + cache.put_with_extra(&object_meta.location, metadata, &object_meta); + assert!(cache + .get_with_extra(&object_meta.location, &object_meta) + .is_some()); + + // file size changed + let mut object_meta2 = object_meta.clone(); + object_meta2.size = 2048; + assert!(cache + .get_with_extra(&object_meta2.location, &object_meta2) + .is_none()); + + // file last_modified changed + let mut object_meta2 = object_meta.clone(); + object_meta2.last_modified = + DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00") + .unwrap() + .into(); + assert!(cache + .get_with_extra(&object_meta2.location, &object_meta2) + .is_none()); + + // different file + let mut object_meta2 = object_meta; + object_meta2.location = Path::from("test2"); + assert!(cache + .get_with_extra(&object_meta2.location, &object_meta2) + .is_none()); + } } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index b086430a4ef7..4c75a53e9fba 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -299,6 +299,7 @@ impl RuntimeEnvBuilder { .cache_manager .get_file_statistic_cache(), list_files_cache: runtime_env.cache_manager.get_list_files_cache(), + file_metadata_cache: runtime_env.cache_manager.get_file_metadata_cache(), }; Self { diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 8cb272605899..ffdc29e429e8 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -504,6 +504,7 @@ message ParquetOptions { bool schema_force_view_types = 28; // default = false bool binary_as_string = 29; // default = false bool skip_arrow_metadata = 30; // default = false + bool cache_metadata = 33; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 0823e150268d..98df86a21f53 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -988,6 +988,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v), }).unwrap_or(None), skip_arrow_metadata: value.skip_arrow_metadata, + cache_metadata: value.cache_metadata, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index f35fd1594695..89e85b0dc8f1 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5066,6 +5066,9 @@ impl serde::Serialize for ParquetOptions { if self.skip_arrow_metadata { len += 1; } + if self.cache_metadata { + len += 1; + } if self.dictionary_page_size_limit != 0 { len += 1; } @@ -5168,6 +5171,9 @@ impl serde::Serialize for ParquetOptions { if self.skip_arrow_metadata { struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?; } + if self.cache_metadata { + struct_ser.serialize_field("cacheMetadata", &self.cache_metadata)?; + } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5314,6 +5320,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "binaryAsString", "skip_arrow_metadata", "skipArrowMetadata", + "cache_metadata", + "cacheMetadata", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5362,6 +5370,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { SchemaForceViewTypes, BinaryAsString, SkipArrowMetadata, + CacheMetadata, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5414,6 +5423,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString), "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata), + "cacheMetadata" | "cache_metadata" => Ok(GeneratedField::CacheMetadata), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5464,6 +5474,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut schema_force_view_types__ = None; let mut binary_as_string__ = None; let mut skip_arrow_metadata__ = None; + let mut cache_metadata__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -5585,6 +5596,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } skip_arrow_metadata__ = Some(map_.next_value()?); } + GeneratedField::CacheMetadata => { + if cache_metadata__.is_some() { + return Err(serde::de::Error::duplicate_field("cacheMetadata")); + } + cache_metadata__ = Some(map_.next_value()?); + } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -5700,6 +5717,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { schema_force_view_types: schema_force_view_types__.unwrap_or_default(), binary_as_string: binary_as_string__.unwrap_or_default(), skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(), + cache_metadata: cache_metadata__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index ac4a9ea4be69..6ed32d7de053 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -764,6 +764,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "30")] pub skip_arrow_metadata: bool, + /// default = false + #[prost(bool, tag = "33")] + pub cache_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index b6cbe5759cfc..0bd6f09bb3d1 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -836,6 +836,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { binary_as_string: value.binary_as_string, skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), + cache_metadata: value.cache_metadata, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index ac4a9ea4be69..6ed32d7de053 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -764,6 +764,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "30")] pub skip_arrow_metadata: bool, + /// default = false + #[prost(bool, tag = "33")] + pub cache_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 620442c79e72..1e0d76bc672b 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -414,6 +414,7 @@ impl TableParquetOptionsProto { coerce_int96_opt: global_options.global.coerce_int96.map(|compression| { parquet_options::CoerceInt96Opt::CoerceInt96(compression) }), + cache_metadata: global_options.global.cache_metadata, }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -513,6 +514,7 @@ impl From<&ParquetOptionsProto> for ParquetOptions { coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt { parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(), }), + cache_metadata: proto.cache_metadata, } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 86dfbd7c8496..50d2c78cbe7e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -231,6 +231,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true datafusion.execution.parquet.bloom_filter_on_write false +datafusion.execution.parquet.cache_metadata false datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) @@ -344,6 +345,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files +datafusion.execution.parquet.cache_metadata false (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 51e40e3e685d..0beb2e8f5d20 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -750,3 +750,122 @@ drop table int96_from_spark; statement ok set datafusion.execution.parquet.coerce_int96 = ns; + + +### Tests for metadata caching + +# Create temporary data +query I +COPY ( + SELECT 'k-' || i as k, i as v + FROM generate_series(1, 20000) t(i) + ORDER BY k +) +TO 'test_files/scratch/parquet/cache_metadata.parquet' +OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048); +---- +20000 + +# Enable the cache +statement ok +set datafusion.execution.parquet.cache_metadata = true; + +statement ok +CREATE EXTERNAL TABLE t +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet/cache_metadata.parquet'; + +query TI +select * from t where k = 'k-1000' or k = 'k-9999' order by k +---- +k-1000 1000 +k-9999 9999 + +query IT +select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) order by v +---- +1 k-1 +2 k-2 +9999 k-9999 +10000 k-10000 + +# Updating the file should invalidate the cache. Otherwise, the following queries would fail +# (e.g., with "Arrow: Parquet argument error: External: incomplete frame"). +query I +COPY ( + SELECT 'k-' || i as k, 20000 - i as v + FROM generate_series(1, 20000) t(i) + ORDER BY k +) +TO 'test_files/scratch/parquet/cache_metadata.parquet' +OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048); +---- +20000 + +query TI +select * from t where k = 'k-1000' or k = 'k-9999' order by k +---- +k-1000 19000 +k-9999 10001 + +query IT +select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) order by v +---- +1 k-19999 +2 k-19998 +9999 k-10001 +10000 k-10000 + +statement ok +DROP TABLE t; + +# Partitioned files should be independently cached. Otherwise, the following queries might fail. +statement ok +COPY ( + SELECT i % 10 as part, 'k-' || i as k, i as v + FROM generate_series(0, 9) t(i) + ORDER BY k +) +TO 'test_files/scratch/parquet/cache_metadata_partitioned.parquet' +PARTITIONED BY (part); + +statement ok +CREATE EXTERNAL TABLE t +STORED AS PARQUET +PARTITIONED BY (part) +LOCATION 'test_files/scratch/parquet/cache_metadata_partitioned.parquet'; + +query TTI +select part, k, v from t where k = 'k-0' +---- +0 k-0 0 + +query TTI +select part, k, v from t where k = 'k-5' +---- +5 k-5 5 + +query TTI +select part, k, v from t where k = 'k-9' +---- +9 k-9 9 + +query TTI +select part, k, v from t order by k +---- +0 k-0 0 +1 k-1 1 +2 k-2 2 +3 k-3 3 +4 k-4 4 +5 k-5 5 +6 k-6 6 +7 k-7 7 +8 k-8 8 +9 k-9 9 + +statement ok +DROP TABLE t; + +statement ok +set datafusion.execution.parquet.cache_metadata = false; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 96b7ee672bdb..86eee5b68b9a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -60,6 +60,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | +| datafusion.execution.parquet.cache_metadata | false | (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | From 5a824b1259d3b0a2eac52ed64e43d4506489fab2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 30 Jul 2025 16:35:41 -0400 Subject: [PATCH 2/2] Default cache_metadata to on --- datafusion/common/src/config.rs | 2 +- datafusion/sqllogictest/test_files/information_schema.slt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 3bcb0839b5a2..46a5fbea33f2 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -553,7 +553,7 @@ config_namespace! { /// (footer and page metadata). Enabling it can offer substantial performance improvements /// for repeated queries over large files. By default, the cache is automatically /// invalidated when the underlying file is modified. - pub cache_metadata: bool, default = false + pub cache_metadata: bool, default = true // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 50d2c78cbe7e..165d3430f269 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -231,7 +231,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true datafusion.execution.parquet.bloom_filter_on_write false -datafusion.execution.parquet.cache_metadata false +datafusion.execution.parquet.cache_metadata true datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) @@ -345,7 +345,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files -datafusion.execution.parquet.cache_metadata false (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. +datafusion.execution.parquet.cache_metadata true (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting.