1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions datafusion-examples/examples/parquet_encrypted_with_kms.rs
@@ -17,6 +17,7 @@

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use base64::Engine;
use datafusion::common::extensions_options;
use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions};
@@ -211,6 +212,7 @@ struct TestEncryptionFactory {}

/// `EncryptionFactory` is a DataFusion trait for types that generate
/// file encryption and decryption properties.
#[async_trait]
impl EncryptionFactory for TestEncryptionFactory {
/// Generate file encryption properties to use when writing a Parquet file.
/// The `schema` is provided so that it may be used to dynamically configure
@@ -219,7 +221,7 @@ impl EncryptionFactory for TestEncryptionFactory {
/// but other implementations may want to use this to compute an
/// AAD prefix for the file, or to allow use of external key material
/// (where key metadata is stored in a JSON file alongside Parquet files).
fn get_file_encryption_properties(
async fn get_file_encryption_properties(
&self,
options: &EncryptionFactoryOptions,
schema: &SchemaRef,
@@ -262,7 +264,7 @@
/// Generate file decryption properties to use when reading a Parquet file.
/// Rather than provide the AES keys directly for decryption, we set a `KeyRetriever`
/// that can determine the keys using the encryption metadata.
fn get_file_decryption_properties(
async fn get_file_decryption_properties(
&self,
_options: &EncryptionFactoryOptions,
_file_path: &Path,
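Note: the example above now implements the asynchronous form of the trait. As a rough sketch of why the async signature matters, here is a minimal factory that awaits a (stubbed) key-management call per file. This is not code from this PR: `StubKmsClient` is hypothetical, and the import paths and builder error conversions are assumptions to check against the actual DataFusion and parquet crates.

use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::config::EncryptionFactoryOptions;
use datafusion::error::Result;
use datafusion::execution::parquet_encryption::EncryptionFactory;
use object_store::path::Path;
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;

/// Hypothetical stand-in for an async KMS client; a real implementation
/// would issue network requests here.
#[derive(Debug, Default)]
struct StubKmsClient;

impl StubKmsClient {
    async fn data_key_for(&self, _file_path: &str) -> Result<Vec<u8>> {
        Ok(vec![0u8; 16]) // placeholder 128-bit AES key
    }
}

#[derive(Debug, Default)]
struct KmsEncryptionFactory {
    kms: StubKmsClient,
}

#[async_trait]
impl EncryptionFactory for KmsEncryptionFactory {
    async fn get_file_encryption_properties(
        &self,
        _options: &EncryptionFactoryOptions,
        _schema: &SchemaRef,
        file_path: &Path,
    ) -> Result<Option<FileEncryptionProperties>> {
        // The async signature lets the factory await the KMS call instead
        // of blocking an executor thread.
        let key = self.kms.data_key_for(file_path.as_ref()).await?;
        Ok(Some(FileEncryptionProperties::builder(key).build()?))
    }

    async fn get_file_decryption_properties(
        &self,
        _options: &EncryptionFactoryOptions,
        file_path: &Path,
    ) -> Result<Option<FileDecryptionProperties>> {
        let key = self.kms.data_key_for(file_path.as_ref()).await?;
        Ok(Some(FileDecryptionProperties::builder(key).build()?))
    }
}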
19 changes: 12 additions & 7 deletions datafusion/core/tests/parquet/encryption.rs
@@ -20,6 +20,7 @@
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
@@ -175,7 +176,9 @@ async fn round_trip_parquet_with_encryption_factory() {
// Crypto factory should have generated one key per partition file
assert_eq!(encryption_factory.encryption_keys.lock().unwrap().len(), 3);

verify_table_encrypted(tmpdir.path(), &encryption_factory).unwrap();
verify_table_encrypted(tmpdir.path(), &encryption_factory)
.await
.unwrap();

// Registering table without decryption properties should fail
let table_path = format!("file://{}/", tmpdir.path().to_str().unwrap());
@@ -255,7 +258,7 @@ async fn round_trip_parquet_with_encryption_factory() {
assert_batches_sorted_eq!(expected, &table);
}

fn verify_table_encrypted(
async fn verify_table_encrypted(
table_path: &Path,
encryption_factory: &Arc<MockEncryptionFactory>,
) -> datafusion_common::Result<()> {
@@ -267,7 +270,7 @@ fn verify_table_encrypted(
if path.is_dir() {
directories.push(path);
} else {
verify_file_encrypted(&path, encryption_factory)?;
verify_file_encrypted(&path, encryption_factory).await?;
files_visited += 1;
}
}
@@ -276,7 +279,7 @@ fn verify_table_encrypted(
Ok(())
}

fn verify_file_encrypted(
async fn verify_file_encrypted(
file_path: &Path,
encryption_factory: &Arc<MockEncryptionFactory>,
) -> datafusion_common::Result<()> {
@@ -296,7 +299,8 @@ fn verify_file_encrypted(

let object_path = object_store::path::Path::from(file_path_str);
let decryption_properties = encryption_factory
.get_file_decryption_properties(&options, &object_path)?
.get_file_decryption_properties(&options, &object_path)
.await?
.unwrap();

let reader_options =
@@ -325,8 +329,9 @@ struct MockEncryptionFactory {
pub counter: AtomicU8,
}

#[async_trait]
impl EncryptionFactory for MockEncryptionFactory {
fn get_file_encryption_properties(
async fn get_file_encryption_properties(
&self,
config: &EncryptionFactoryOptions,
_schema: &SchemaRef,
@@ -344,7 +349,7 @@ impl EncryptionFactory for MockEncryptionFactory {
Ok(Some(encryption_properties))
}

fn get_file_decryption_properties(
async fn get_file_decryption_properties(
&self,
config: &EncryptionFactoryOptions,
file_path: &object_store::path::Path,
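Note: because the trait methods are now async, tests that call a factory directly need an async context. A hedged sketch of such a call (assuming `MockEncryptionFactory` and `EncryptionFactoryOptions` implement `Default` and a tokio test runtime is available; this is not a test from the PR):

use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};

#[tokio::test]
async fn mock_factory_returns_encryption_properties() -> datafusion_common::Result<()> {
    let factory = Arc::new(MockEncryptionFactory::default());
    let options = EncryptionFactoryOptions::default();
    let schema: SchemaRef = Arc::new(Schema::empty());
    let path = object_store::path::Path::from("part-0.parquet");

    // `.await` is required now that the factory methods are async.
    let props = factory
        .get_file_encryption_properties(&options, &schema, &path)
        .await?;
    assert!(props.is_some());
    Ok(())
}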
34 changes: 20 additions & 14 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -302,7 +302,7 @@ fn clear_metadata(
}

#[cfg(feature = "parquet_encryption")]
fn get_file_decryption_properties(
async fn get_file_decryption_properties(
state: &dyn Session,
options: &TableParquetOptions,
file_path: &Path,
@@ -314,10 +314,12 @@ fn get_file_decryption_properties(
Some(factory_id) => {
let factory =
state.runtime_env().parquet_encryption_factory(factory_id)?;
factory.get_file_decryption_properties(
&options.crypto.factory_options,
file_path,
)?
factory
.get_file_decryption_properties(
&options.crypto.factory_options,
file_path,
)
.await?
}
None => None,
},
@@ -326,7 +328,7 @@
}

#[cfg(not(feature = "parquet_encryption"))]
fn get_file_decryption_properties(
async fn get_file_decryption_properties(
_state: &dyn Session,
_options: &TableParquetOptions,
_file_path: &Path,
@@ -379,7 +381,8 @@ impl FileFormat for ParquetFormat {
state,
&self.options,
&object.location,
)?;
)
.await?;
let result = DFParquetMetadata::new(store.as_ref(), object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties.as_ref())
@@ -437,7 +440,8 @@
object: &ObjectMeta,
) -> Result<Statistics> {
let file_decryption_properties =
get_file_decryption_properties(state, &self.options, &object.location)?;
get_file_decryption_properties(state, &self.options, &object.location)
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
DFParquetMetadata::new(store, object)
@@ -1119,7 +1123,7 @@ impl ParquetSink {

/// Create writer properties based upon configuration settings,
/// including partitioning and the inclusion of arrow schema metadata.
fn create_writer_props(
async fn create_writer_props(
&self,
runtime: &Arc<RuntimeEnv>,
path: &Path,
@@ -1147,7 +1151,8 @@
&parquet_opts,
schema,
path,
)?;
)
.await?;
Ok(builder.build())
}

@@ -1188,7 +1193,7 @@
}

#[cfg(feature = "parquet_encryption")]
fn set_writer_encryption_properties(
async fn set_writer_encryption_properties(
builder: WriterPropertiesBuilder,
runtime: &Arc<RuntimeEnv>,
parquet_opts: &TableParquetOptions,
@@ -1208,7 +1213,8 @@ fn set_writer_encryption_properties(
&parquet_opts.crypto.factory_options,
schema,
path,
)?;
)
.await?;
if let Some(file_encryption_properties) = file_encryption_properties {
return Ok(
builder.with_file_encryption_properties(file_encryption_properties)
@@ -1219,7 +1225,7 @@
}

#[cfg(not(feature = "parquet_encryption"))]
fn set_writer_encryption_properties(
async fn set_writer_encryption_properties(
builder: WriterPropertiesBuilder,
_runtime: &Arc<RuntimeEnv>,
_parquet_opts: &TableParquetOptions,
@@ -1269,7 +1275,7 @@ impl FileSink for ParquetSink {
};

while let Some((path, mut rx)) = file_stream_rx.recv().await {
let parquet_props = self.create_writer_props(&runtime, &path)?;
let parquet_props = self.create_writer_props(&runtime, &path).await?;
if !allow_single_file_parallelism {
let mut writer = self
.create_async_arrow_writer(
69 changes: 54 additions & 15 deletions datafusion/datasource-parquet/src/opener.rs
@@ -112,7 +112,7 @@ impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let extensions = file_meta.extensions.clone();
let file_location = file_meta.location();
let file_location = file_meta.location().clone();
let file_name = file_location.to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
@@ -152,16 +152,18 @@
let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);

let mut enable_page_index = self.enable_page_index;
let file_decryption_properties =
self.get_file_decryption_properties(file_location)?;

// For now, page index does not work with encrypted files. See:
// https://github.com/apache/arrow-rs/issues/7629
if file_decryption_properties.is_some() {
enable_page_index = false;
}
let encryption_context = self.get_encryption_context();

Ok(Box::pin(async move {
let file_decryption_properties = encryption_context
.get_file_decryption_properties(&file_location)
.await?;
// For now, page index does not work with encrypted files. See:
// https://github.com/apache/arrow-rs/issues/7629
Review comment from the contributor on lines +161 to +162: I noticed it should be possible to remove this workaround now. I've made #17352 to follow up on this, as it should be a separate change.

if file_decryption_properties.is_some() {
enable_page_index = false;
}

// Prune this file using the file level statistics and partition values.
// Since dynamic filters may have been updated since planning it is possible that we are able
// to prune files now that we couldn't prune at planning time.
@@ -508,9 +510,30 @@
}
}

#[derive(Default)]
struct EncryptionContext {
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
#[cfg(feature = "parquet_encryption")]
encryption_factory: Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
}

#[cfg(feature = "parquet_encryption")]
impl ParquetOpener {
fn get_file_decryption_properties(
impl EncryptionContext {
fn new(
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
encryption_factory: Option<(
Arc<dyn EncryptionFactory>,
EncryptionFactoryOptions,
)>,
) -> Self {
Self {
file_decryption_properties,
encryption_factory,
}
}

async fn get_file_decryption_properties(
&self,
file_location: &object_store::path::Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
@@ -520,7 +543,8 @@
}
None => match &self.encryption_factory {
Some((encryption_factory, encryption_config)) => Ok(encryption_factory
.get_file_decryption_properties(encryption_config, file_location)?
.get_file_decryption_properties(encryption_config, file_location)
.await?
.map(Arc::new)),
None => Ok(None),
},
@@ -529,12 +553,27 @@
}

#[cfg(not(feature = "parquet_encryption"))]
impl ParquetOpener {
fn get_file_decryption_properties(
impl EncryptionContext {
async fn get_file_decryption_properties(
&self,
_file_location: &object_store::path::Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
Ok(self.file_decryption_properties.clone())
Ok(None)
}
}

impl ParquetOpener {
#[cfg(feature = "parquet_encryption")]
fn get_encryption_context(&self) -> EncryptionContext {
EncryptionContext::new(
self.file_decryption_properties.clone(),
self.encryption_factory.clone(),
)
}

#[cfg(not(feature = "parquet_encryption"))]
fn get_encryption_context(&self) -> EncryptionContext {
EncryptionContext::default()
}
}

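Note: the `EncryptionContext` indirection exists because `FileOpener::open` returns a `'static` future: the decryption properties are now resolved inside that future, so the opener must clone the encryption state (and, per the change above, the file location) up front instead of borrowing `&self` across an `.await`. A simplified, self-contained sketch of the pattern with hypothetical types, not the PR's code:

use std::future::Future;
use std::pin::Pin;

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

#[derive(Clone, Default)]
struct EncryptionState {
    key: Option<Vec<u8>>,
}

impl EncryptionState {
    // Stands in for the async lookup against an encryption factory.
    async fn resolve_key(&self, _location: &str) -> Option<Vec<u8>> {
        self.key.clone()
    }
}

struct Opener {
    encryption: EncryptionState,
    location: String,
}

impl Opener {
    fn open(&self) -> BoxFuture<Option<Vec<u8>>> {
        // Clone owned state before `async move` so the returned future
        // does not borrow `self` and can satisfy the 'static bound.
        let encryption = self.encryption.clone();
        let location = self.location.clone();
        Box::pin(async move { encryption.resolve_key(&location).await })
    }
}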
1 change: 1 addition & 0 deletions datafusion/execution/Cargo.toml
@@ -44,6 +44,7 @@ parquet_encryption = [

[dependencies]
arrow = { workspace = true }
async-trait = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
6 changes: 4 additions & 2 deletions datafusion/execution/src/parquet_encryption.rs
@@ -16,6 +16,7 @@
// under the License.

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_common::error::Result;
@@ -32,17 +33,18 @@ use std::sync::Arc;
/// For example usage, see the [`parquet_encrypted_with_kms` example].
///
/// [`parquet_encrypted_with_kms` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_encrypted_with_kms.rs
#[async_trait]
pub trait EncryptionFactory: Send + Sync + std::fmt::Debug + 'static {
/// Generate file encryption properties to use when writing a Parquet file.
fn get_file_encryption_properties(
async fn get_file_encryption_properties(
&self,
config: &EncryptionFactoryOptions,
schema: &SchemaRef,
file_path: &Path,
) -> Result<Option<FileEncryptionProperties>>;

/// Generate file decryption properties to use when reading a Parquet file.
fn get_file_decryption_properties(
async fn get_file_decryption_properties(
&self,
config: &EncryptionFactoryOptions,
file_path: &Path,
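Note: to use the trait, a factory is registered with the runtime environment under an id that table options then reference (the lookup side, `runtime_env().parquet_encryption_factory(factory_id)`, appears in `file_format.rs` above). A hedged sketch of the registration side, reusing the `KmsEncryptionFactory` sketch from earlier; the `register_parquet_encryption_factory` method name and the `crypto.factory_id` option are assumptions to verify against the released API:

use std::sync::Arc;

use datafusion::prelude::SessionContext;

fn wire_up_encryption(ctx: &SessionContext) {
    // Assumed registration API; check RuntimeEnv in your DataFusion version.
    ctx.runtime_env().register_parquet_encryption_factory(
        "example_kms",
        Arc::new(KmsEncryptionFactory::default()),
    );
    // Table-level Parquet options would then set `crypto.factory_id` to
    // "example_kms" so reads and writes flow through the async factory.
}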