Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,12 @@ config_namespace! {
/// (reading) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true

/// (reading) Whether or not to enable the caching of embedded metadata of Parquet files
/// (footer and page metadata). Enabling it can offer substantial performance improvements
/// for repeated queries over large files. By default, the cache is automatically
/// invalidated when the underlying file is modified.
pub cache_metadata: bool, default = true

// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl ParquetOptions {
binary_as_string: _, // not used for writer props
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
cache_metadata: _,
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -522,6 +523,7 @@ mod tests {
binary_as_string: defaults.binary_as_string,
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
cache_metadata: defaults.cache_metadata,
}
}

Expand Down Expand Up @@ -634,6 +636,7 @@ mod tests {
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
cache_metadata: global_options_defaults.cache_metadata,
},
column_specific_options,
key_value_metadata,
Expand Down
15 changes: 15 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ pub struct ParquetReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Properties for decryption of Parquet files that use modular encryption
pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
/// Whether or not to enable the caching of embedded metadata of this Parquet file (footer and
/// page metadata). Enabling it can offer substantial performance improvements for repeated
/// queries over large files. By default, the cache is automatically invalidated when the
/// underlying file is modified.
pub cache_metadata: Option<bool>,
}

impl Default for ParquetReadOptions<'_> {
Expand All @@ -266,6 +271,7 @@ impl Default for ParquetReadOptions<'_> {
schema: None,
file_sort_order: vec![],
file_decryption_properties: None,
cache_metadata: None,
}
}
}
Expand Down Expand Up @@ -325,6 +331,12 @@ impl<'a> ParquetReadOptions<'a> {
self.file_decryption_properties = Some(file_decryption_properties);
self
}

/// Specify whether to enable metadata caching for this Parquet file (footer and
/// page metadata). When set, this overrides the session-level
/// `cache_metadata` configuration option for this read.
pub fn cache_metadata(mut self, cache_metadata: bool) -> Self {
    self.cache_metadata = Some(cache_metadata);
    self
}
}

/// Options that control the reading of ARROW files.
Expand Down Expand Up @@ -590,6 +602,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
if let Some(file_decryption_properties) = &self.file_decryption_properties {
options.crypto.file_decryption = Some(file_decryption_properties.clone());
}
if let Some(cache_metadata) = self.cache_metadata {
options.global.cache_metadata = cache_metadata;
}
let mut file_format = ParquetFormat::new().with_options(options);

if let Some(parquet_pruning) = self.parquet_pruning {
Expand Down
18 changes: 17 additions & 1 deletion datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use datafusion_physical_plan::Accumulator;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{parse_coerce_int96_string, ParquetSource};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -435,7 +436,7 @@ impl FileFormat for ParquetFormat {

async fn create_physical_plan(
&self,
_state: &dyn Session,
state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut metadata_size_hint = None;
Expand All @@ -446,6 +447,21 @@ impl FileFormat for ParquetFormat {

let mut source = ParquetSource::new(self.options.clone());

// Use the CachedParquetFileReaderFactory when metadata caching is enabled
if self.options.global.cache_metadata {
if let Some(metadata_cache) =
state.runtime_env().cache_manager.get_file_metadata_cache()
{
let store = state
.runtime_env()
.object_store(conf.object_store_url.clone())?;
let cached_parquet_read_factory =
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
source =
source.with_parquet_file_reader_factory(cached_parquet_read_factory);
}
}

if let Some(metadata_size_hint) = metadata_size_hint {
source = source.with_metadata_size_hint(metadata_size_hint)
}
Expand Down
131 changes: 130 additions & 1 deletion datafusion/datasource-parquet/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
use crate::ParquetFileMetrics;
use bytes::Bytes;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
}))
}
}

/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
/// This reader always loads the entire metadata (including page index, unless the file is
/// encrypted), even if not required by the current query, to ensure it is always available for
/// those that need it.
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
    /// Object store from which the Parquet files are read.
    store: Arc<dyn ObjectStore>,
    /// Shared cache holding type-erased [`ParquetMetaData`] keyed by file path.
    metadata_cache: FileMetadataCache,
}

impl CachedParquetFileReaderFactory {
    /// Creates a new factory that reads Parquet files from `store` and caches
    /// their footer/page metadata in `metadata_cache`.
    pub fn new(store: Arc<dyn ObjectStore>, metadata_cache: FileMetadataCache) -> Self {
        Self {
            store,
            metadata_cache,
        }
    }
}

impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
    /// Builds a [`CachedParquetFileReader`] for the given file, wiring in the
    /// per-partition metrics and the shared metadata cache.
    fn create_reader(
        &self,
        partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        // Per-file metrics (e.g. bytes scanned) for this partition.
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            file_meta.location().as_ref(),
            metrics,
        );

        // Underlying object-store reader; supplying the known file size avoids
        // a size lookup when reading the footer.
        let object_reader = ParquetObjectReader::new(
            Arc::clone(&self.store),
            file_meta.object_meta.location.clone(),
        )
        .with_file_size(file_meta.object_meta.size);

        // Apply the optional footer size hint, if one was provided.
        let inner = match metadata_size_hint {
            Some(hint) => object_reader.with_footer_size_hint(hint),
            None => object_reader,
        };

        Ok(Box::new(CachedParquetFileReader {
            inner,
            file_metrics,
            file_meta,
            metadata_cache: Arc::clone(&self.metadata_cache),
        }))
    }
}

/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
/// updates the cache.
pub(crate) struct CachedParquetFileReader {
    /// Metrics (e.g. bytes scanned) accumulated while reading this file.
    pub file_metrics: ParquetFileMetrics,
    /// Underlying reader that performs the actual object store requests.
    pub inner: ParquetObjectReader,
    // Identifies the file; its `object_meta` (size / last-modified) is also
    // passed to the cache as the "extra" used for entry invalidation.
    file_meta: FileMeta,
    // Shared metadata cache consulted before reading from the file.
    metadata_cache: FileMetadataCache,
}

impl AsyncFileReader for CachedParquetFileReader {
    /// Reads a byte range, recording the number of bytes in the file metrics.
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    /// Reads multiple byte ranges, recording the total bytes in the file metrics.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    /// Returns the [`ParquetMetaData`] for this file, serving it from the
    /// [`FileMetadataCache`] when possible and populating the cache otherwise.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let file_meta = self.file_meta.clone();
        let metadata_cache = Arc::clone(&self.metadata_cache);

        async move {
            let object_meta = &file_meta.object_meta;

            // Fast path: lookup if the metadata is already cached. The
            // `object_meta` "extra" lets the cache reject entries for files
            // that changed since they were cached.
            if let Some(metadata) =
                metadata_cache.get_with_extra(&object_meta.location, object_meta)
            {
                if let Ok(parquet_metadata) = Arc::downcast::<ParquetMetaData>(metadata) {
                    // `Arc::downcast` already yields an owned `Arc`, so no
                    // additional clone is needed here.
                    return Ok(parquet_metadata);
                }
                // An entry of an unexpected type falls through to a fresh
                // read, which overwrites it below.
            }

            // Slow path: read the metadata from the file itself.
            let mut reader = ParquetMetaDataReader::new();
            // the page index can only be loaded with unencrypted files
            if let Some(file_decryption_properties) =
                options.and_then(|o| o.file_decryption_properties())
            {
                reader =
                    reader.with_decryption_properties(Some(file_decryption_properties));
            } else {
                reader = reader.with_page_indexes(true);
            }
            reader.try_load(&mut self.inner, object_meta.size).await?;
            let metadata = Arc::new(reader.finish()?);

            // Populate the cache so subsequent queries over the same file
            // skip the footer read.
            metadata_cache.put_with_extra(
                &object_meta.location,
                Arc::clone(&metadata) as Arc<FileMetadata>,
                object_meta,
            );
            Ok(metadata)
        }
        .boxed()
    }
}
37 changes: 37 additions & 0 deletions datafusion/execution/src/cache/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::CacheAccessor;
use datafusion_common::{Result, Statistics};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

Expand All @@ -32,6 +34,13 @@ pub type FileStatisticsCache =
pub type ListFilesCache =
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;

/// Represents generic file-embedded metadata, stored type-erased (`dyn Any`)
/// so different file formats can share the same cache.
pub type FileMetadata = dyn Any + Send + Sync;

/// Cache to store file-embedded metadata, keyed by file [`Path`]; the
/// [`ObjectMeta`] "extra" allows invalidating entries when the underlying
/// file is modified.
pub type FileMetadataCache =
    Arc<dyn CacheAccessor<Path, Arc<FileMetadata>, Extra = ObjectMeta>>;

impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
Expand All @@ -44,10 +53,17 @@ impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
}
}

impl Debug for dyn CacheAccessor<Path, Arc<FileMetadata>, Extra = ObjectMeta> {
    /// Mirrors the Debug output of the other cache accessor trait objects.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = self.name();
        let len = self.len();
        write!(f, "Cache name: {name} with length: {len}")
    }
}

/// Holds the caches used to speed up repeated scans: per-file statistics,
/// file listings, and file-embedded metadata (e.g. Parquet footers).
#[derive(Default, Debug)]
pub struct CacheManager {
    // Cache of per-file statistics; `None` when disabled.
    file_statistic_cache: Option<FileStatisticsCache>,
    // Cache of listed files per table location; `None` when disabled.
    list_files_cache: Option<ListFilesCache>,
    // Cache of file-embedded metadata (e.g. Parquet footer/page metadata).
    file_metadata_cache: Option<FileMetadataCache>,
}

impl CacheManager {
Expand All @@ -59,6 +75,13 @@ impl CacheManager {
if let Some(lc) = &config.list_files_cache {
manager.list_files_cache = Some(Arc::clone(lc))
}
if let Some(mc) = &config.file_metadata_cache {
manager.file_metadata_cache = Some(Arc::clone(mc));
} else {
manager.file_metadata_cache =
Some(Arc::new(DefaultFilesMetadataCache::default()));
}

Ok(Arc::new(manager))
}

Expand All @@ -71,6 +94,11 @@ impl CacheManager {
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
self.list_files_cache.clone()
}

/// Get the file embedded metadata cache.
pub fn get_file_metadata_cache(&self) -> Option<FileMetadataCache> {
self.file_metadata_cache.clone()
}
}

#[derive(Clone, Default)]
Expand All @@ -86,6 +114,10 @@ pub struct CacheManagerConfig {
/// location.
/// Default is disable.
pub list_files_cache: Option<ListFilesCache>,
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
/// data file (e.g., Parquet footer and page metadata).
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
pub file_metadata_cache: Option<FileMetadataCache>,
}

impl CacheManagerConfig {
Expand All @@ -101,4 +133,9 @@ impl CacheManagerConfig {
self.list_files_cache = cache;
self
}

/// Set the cache for file-embedded metadata (e.g. Parquet footer and page
/// metadata). When `None`, the [`CacheManager`] will create a
/// [`DefaultFilesMetadataCache`] instead.
pub fn with_file_metadata_cache(mut self, cache: Option<FileMetadataCache>) -> Self {
    self.file_metadata_cache = cache;
    self
}
}
Loading
Loading