From e16b52154e3dc740da538dc90b19f69044d25767 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 15 Jul 2025 12:46:41 +1200 Subject: [PATCH 01/17] Add encryption factory API for more flexible encryption configuration --- Cargo.lock | 16 + datafusion-examples/Cargo.toml | 4 +- .../examples/parquet_encrypted_with_kms.rs | 289 ++++++++++++++++++ datafusion/common/src/config.rs | 79 +++++ datafusion/common/src/encryption.rs | 29 +- .../common/src/file_options/parquet_writer.rs | 7 +- datafusion/core/Cargo.toml | 2 +- .../core/src/execution/context/parquet.rs | 14 + datafusion/core/tests/memory_limit/mod.rs | 4 + datafusion/core/tests/parquet/encryption.rs | 197 +++++++++++- datafusion/core/tests/parquet/mod.rs | 1 + datafusion/datasource-parquet/Cargo.toml | 1 + .../datasource-parquet/src/file_format.rs | 166 ++++++++-- datafusion/datasource-parquet/src/opener.rs | 56 +++- datafusion/datasource-parquet/src/source.rs | 42 ++- datafusion/execution/Cargo.toml | 6 + datafusion/execution/src/lib.rs | 2 + .../execution/src/parquet_encryption.rs | 126 ++++++++ datafusion/execution/src/runtime_env.rs | 40 +++ 19 files changed, 1012 insertions(+), 69 deletions(-) create mode 100644 datafusion-examples/examples/parquet_encrypted_with_kms.rs create mode 100644 datafusion/execution/src/parquet_encryption.rs diff --git a/Cargo.lock b/Cargo.lock index cb13a55b56da..34ef755521b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2187,6 +2187,8 @@ dependencies = [ "mimalloc", "nix", "object_store", + "parquet", + "parquet-key-management", "prost", "tempfile", "test-utils", @@ -2212,6 +2214,7 @@ dependencies = [ "log", "object_store", "parking_lot", + "parquet", "rand 0.9.1", "tempfile", "url", @@ -4503,6 +4506,19 @@ dependencies = [ "zstd", ] +[[package]] +name = "parquet-key-management" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea4e4d465462ac90306e7551f8d7d959f05a658c445bf55dd6b87578ad31a0d8" +dependencies = [ + "base64 0.22.1", + "parquet", + "ring", + "serde", + "serde_json", +] + [[package]] name = "parse-display" version = "0.9.1" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index b31708a5c1cc..06b6ec5a8b72 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -61,7 +61,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } # note only use main datafusion crate for examples -datafusion = { workspace = true, default-features = true } +datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] } datafusion-ffi = { workspace = true } datafusion-proto = { workspace = true } env_logger = { workspace = true } @@ -69,6 +69,8 @@ futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } +parquet = { workspace = true, features = ["encryption"] } +parquet-key-management = { version = "0.3", features = ["_test_utils"] } prost = { workspace = true } tempfile = { workspace = true } test-utils = { path = "../test-utils" } diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs new file mode 100644 index 000000000000..0d3413d65112 --- /dev/null +++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs @@ -0,0 +1,289 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow_schema::SchemaRef; +use datafusion::common::{extensions_options, DataFusionError}; +use datafusion::config::TableParquetOptions; +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::error::Result; +use datafusion::execution::parquet_encryption::EncryptionFactory; +use datafusion::prelude::SessionContext; +use futures::StreamExt; +use object_store::path::Path; +use parquet::encryption::decrypt::FileDecryptionProperties; +use parquet::encryption::encrypt::FileEncryptionProperties; +use parquet_key_management::crypto_factory::{ + CryptoFactory, DecryptionConfiguration, EncryptionConfiguration, +}; +use parquet_key_management::kms::KmsConnectionConfig; +use parquet_key_management::test_kms::TestKmsClientFactory; +use std::collections::HashSet; +use std::fmt::Formatter; +use std::sync::Arc; +use tempfile::TempDir; + +/// This example demonstrates reading and writing Parquet files that +/// are encrypted using Parquet Modular Encryption, and uses the +/// parquet-key-management crate to integrate with a Key Management Server (KMS). + +const ENCRYPTION_FACTORY_ID: &'static str = "example.memory_kms_encryption"; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + + // Register an `EncryptionFactory` implementation to be used for Parquet encryption + // in the session context. + // This example uses an in-memory test KMS from the `parquet_key_management` crate with + // a custom `KmsEncryptionFactory` wrapper type to integrate with DataFusion. + // `EncryptionFactory` instances are registered with a name to identify them so + // they can be later referenced in configuration options, and it's possible to register + // multiple different factories to handle different ways of encrypting Parquet. + let crypto_factory = CryptoFactory::new(TestKmsClientFactory::with_default_keys()); + let encryption_factory = KmsEncryptionFactory { crypto_factory }; + ctx.register_parquet_encryption_factory( + ENCRYPTION_FACTORY_ID, + Arc::new(encryption_factory), + ); + + // Register some simple test data + let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100])); + let c: ArrayRef = Arc::new(Int32Array::from(vec![2, 20, 20, 200])); + let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)])?; + ctx.register_batch("test_data", batch)?; + + { + // Write and read with the programmatic API + let tmpdir = TempDir::new()?; + write_encrypted(&ctx, &tmpdir).await?; + let file_path = std::fs::read_dir(&tmpdir)?.next().unwrap()?.path(); + read_encrypted(&ctx, &file_path).await?; + } + + { + // Write and read with the SQL API + let tmpdir = TempDir::new()?; + write_encrypted_with_sql(&ctx, &tmpdir).await?; + let file_path = std::fs::read_dir(&tmpdir)?.next().unwrap()?.path(); + read_encrypted_with_sql(&ctx, &file_path).await?; + } + + Ok(()) +} + +/// Write an encrypted Parquet file +async fn write_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { + let df = ctx.table("test_data").await?; + + let mut parquet_options = TableParquetOptions::new(); + // We specify that we want to use Parquet encryption by setting the identifier of the + // encryption factory to use and providing the factory specific configuration. + // Our encryption factory requires specifying the master key identifier to + // use for encryption, and we can optionally configure which columns are encrypted. + let mut encryption_config = KmsEncryptionConfig::default(); + encryption_config.key_id = "kf".to_owned(); + encryption_config.encrypted_columns = "b,c".to_owned(); + parquet_options + .crypto + .configure_factory(ENCRYPTION_FACTORY_ID, &encryption_config); + + df.write_parquet( + tmpdir.path().to_str().unwrap(), + DataFrameWriteOptions::new(), + Some(parquet_options), + ) + .await?; + + println!("Encrypted Parquet written to {:?}", tmpdir.path()); + Ok(()) +} + +/// Read from an encrypted Parquet file +async fn read_encrypted(ctx: &SessionContext, file_path: &std::path::Path) -> Result<()> { + let mut parquet_options = TableParquetOptions::new(); + // Specify the encryption factory to use for decrypting Parquet. + // In this example, we don't require any additional configuration options when reading + // as key identifiers are stored in the key metadata. + parquet_options + .crypto + .configure_factory(ENCRYPTION_FACTORY_ID, &KmsEncryptionConfig::default()); + + let file_format = ParquetFormat::default().with_options(parquet_options); + let listing_options = ListingOptions::new(Arc::new(file_format)); + + let table_path = format!("file://{}", file_path.to_str().unwrap()); + + ctx.register_listing_table( + "encrypted_parquet_table", + &table_path, + listing_options.clone(), + None, + None, + ) + .await?; + + let mut batch_stream = ctx + .table("encrypted_parquet_table") + .await? + .execute_stream() + .await?; + println!("Reading encrypted Parquet as a RecordBatch stream"); + while let Some(batch) = batch_stream.next().await { + let batch = batch?; + println!("Read batch with {} rows", batch.num_rows()); + } + + println!("Finished reading"); + Ok(()) +} + +/// Write an encrypted Parquet file using only SQL syntax with string configuration +async fn write_encrypted_with_sql(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { + let output_path = tmpdir.path().to_str().unwrap(); + let query = format!( + "COPY test_data \ + TO '{output_path}' \ + STORED AS parquet + OPTIONS (\ + 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}', \ + 'format.crypto.factory_options.key_id' 'kf', \ + 'format.crypto.factory_options.encrypted_columns' 'b,c' \ + )" + ); + let _ = ctx.sql(&query).await?.collect().await?; + + println!("Encrypted Parquet written to {:?}", tmpdir.path()); + Ok(()) +} + +/// Read from an encrypted Parquet file using only the SQL API and string based configuration +async fn read_encrypted_with_sql( + ctx: &SessionContext, + file_path: &std::path::Path, +) -> Result<()> { + let file_path = file_path.to_str().unwrap(); + let ddl = format!( + "CREATE EXTERNAL TABLE encrypted_parquet_table_2 \ + STORED AS PARQUET LOCATION '{file_path}' OPTIONS (\ + 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}' \ + )" + ); + ctx.sql(&ddl).await?; + let df = ctx.sql("SELECT * FROM encrypted_parquet_table_2").await?; + let mut batch_stream = df.execute_stream().await?; + + println!("Reading encrypted Parquet as a RecordBatch stream"); + while let Some(batch) = batch_stream.next().await { + let batch = batch?; + println!("Read batch with {} rows", batch.num_rows()); + } + println!("Finished reading"); + Ok(()) +} + +// Options used to configure our example encryption factory +extensions_options! { + struct KmsEncryptionConfig { + /// Identifier of the encryption key to use + pub key_id: String, default = "".to_owned() + /// Comma separated list of columns to encrypt + pub encrypted_columns: String, default = "".to_owned() + } +} + +/// Wrapper type around `CryptoFactory` to allow implementing the `EncryptionFactory` trait +struct KmsEncryptionFactory { + crypto_factory: CryptoFactory, +} + +impl std::fmt::Debug for KmsEncryptionFactory { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("KmsEncryptionFactory") + .finish_non_exhaustive() + } +} + +/// `EncryptionFactory` is a trait defined by DataFusion that allows generating +/// file encryption and decryption properties. +impl EncryptionFactory for KmsEncryptionFactory { + type Options = KmsEncryptionConfig; + + /// Generate file encryption properties to use when writing a Parquet file. + /// The `FileSinkConfig` is provided so that the schema may be used to dynamically configure + /// per-column encryption keys. + /// Because `FileSinkConfig` can represent multiple output files, we also provide a + /// single file path so that external key material may be used (where key metadata is + /// stored in a JSON file alongside Parquet files). + fn get_file_encryption_properties( + &self, + config: &KmsEncryptionConfig, + schema: &SchemaRef, + _file_path: &Path, + ) -> Result> { + if config.key_id.is_empty() { + return Err(DataFusionError::Configuration( + "Key id for encryption is not set".to_owned(), + )); + }; + // Configure encryption key to use + let mut encryption_config_builder = + EncryptionConfiguration::builder(config.key_id.clone()); + + // Set up per-column encryption. + let encrypted_columns: HashSet<&str> = + config.encrypted_columns.split(",").collect(); + if !encrypted_columns.is_empty() { + let encrypted_columns: Vec = schema + .fields + .iter() + .filter(|f| encrypted_columns.contains(f.name().as_str())) + .map(|f| f.name().clone()) + .collect(); + encryption_config_builder = encryption_config_builder + .add_column_key(config.key_id.clone(), encrypted_columns); + } + let encryption_config = encryption_config_builder.build()?; + + // The KMS connection could be configured from the options if needed, + // but this example just uses the default options. + let kms_config = Arc::new(KmsConnectionConfig::default()); + + Ok(Some(self.crypto_factory.file_encryption_properties( + kms_config, + &encryption_config, + )?)) + } + + /// Generate file decryption properties to use when reading a Parquet file. + /// The `file_path` needs to be known to support encryption factories that use external key material. + fn get_file_decryption_properties( + &self, + _config: &KmsEncryptionConfig, + _file_path: &Path, + ) -> Result> { + let decryption_config = DecryptionConfiguration::builder().build(); + let kms_config = Arc::new(KmsConnectionConfig::default()); + Ok(Some(self.crypto_factory.file_decryption_properties( + kms_config, + decryption_config, + )?)) + } +} diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 31159d4a8588..e598d44fe2a8 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -194,6 +194,7 @@ macro_rules! config_namespace { } } } + config_namespace! { /// Options related to catalog and directory scanning /// @@ -673,6 +674,31 @@ config_namespace! { /// Optional file encryption properties pub file_encryption: Option, default = None + + /// Identifier for the encryption factory to use to create file encryption and decryption properties. + /// Encryption factories can be registered in a session with + /// [`SessionConfig::register_parquet_encryption_factory`]. + pub factory_id: Option, default = None + + /// Any encryption factory specific options + pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default() + } +} + +impl ParquetEncryptionOptions { + /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration + pub fn configure_factory( + &mut self, + factory_id: &str, + config: &impl ExtensionOptions, + ) { + self.factory_id = Some(factory_id.to_owned()); + self.factory_options.options.clear(); + for entry in config.entries() { + if let Some(value) = entry.value { + self.factory_options.options.insert(entry.key, value); + } + } } } @@ -2370,6 +2396,29 @@ impl From<&FileDecryptionProperties> for ConfigFileDecryptionProperties { } } +/// Holds implementation-specific options for an encryption factory +#[derive(Clone, Debug, Default, PartialEq)] +pub struct EncryptionFactoryOptions { + pub options: HashMap, +} + +impl ConfigField for EncryptionFactoryOptions { + fn visit(&self, v: &mut V, key: &str, _description: &'static str) { + for (option_key, option_value) in &self.options { + v.some( + &format!("{key}.{option_key}"), + option_value, + "Encryption factory specific option", + ); + } + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + self.options.insert(key.to_owned(), value.to_owned()); + Ok(()) + } +} + config_namespace! { /// Options controlling CSV format pub struct CsvOptions { @@ -2809,6 +2858,36 @@ mod tests { ); } + #[cfg(feature = "parquet_encryption")] + #[test] + fn parquet_encryption_factory_config() { + let mut parquet_options = crate::config::TableParquetOptions::default(); + + assert_eq!(parquet_options.crypto.factory_id, None); + assert_eq!(parquet_options.crypto.factory_options.options.len(), 0); + + let mut input_config = TestExtensionConfig::default(); + input_config + .properties + .insert("key1".to_string(), "value 1".to_string()); + input_config + .properties + .insert("key2".to_string(), "value 2".to_string()); + + parquet_options + .crypto + .configure_factory("example_factory", &input_config); + + assert_eq!( + parquet_options.crypto.factory_id, + Some("example_factory".to_string()) + ); + let factory_options = &parquet_options.crypto.factory_options.options; + assert_eq!(factory_options.len(), 2); + assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string())); + assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string())); + } + #[cfg(feature = "parquet")] #[test] fn parquet_table_options_config_entry() { diff --git a/datafusion/common/src/encryption.rs b/datafusion/common/src/encryption.rs index 5d50d4a9efd3..5dd603a08112 100644 --- a/datafusion/common/src/encryption.rs +++ b/datafusion/common/src/encryption.rs @@ -28,24 +28,7 @@ pub struct FileDecryptionProperties; #[cfg(not(feature = "parquet_encryption"))] pub struct FileEncryptionProperties; -#[cfg(feature = "parquet")] -use crate::config::ParquetEncryptionOptions; pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties}; -#[cfg(feature = "parquet")] -use parquet::file::properties::WriterPropertiesBuilder; - -#[cfg(feature = "parquet")] -pub fn add_crypto_to_writer_properties( - #[allow(unused)] crypto: &ParquetEncryptionOptions, - #[allow(unused_mut)] mut builder: WriterPropertiesBuilder, -) -> WriterPropertiesBuilder { - #[cfg(feature = "parquet_encryption")] - if let Some(file_encryption_properties) = &crypto.file_encryption { - builder = builder - .with_file_encryption_properties(file_encryption_properties.clone().into()); - } - builder -} #[cfg(feature = "parquet_encryption")] pub fn map_encryption_to_config_encryption( @@ -63,14 +46,14 @@ pub fn map_encryption_to_config_encryption( #[cfg(feature = "parquet_encryption")] pub fn map_config_decryption_to_decryption( - decryption: Option<&ConfigFileDecryptionProperties>, -) -> Option { - decryption.map(|fd| fd.clone().into()) + decryption: &ConfigFileDecryptionProperties, +) -> FileDecryptionProperties { + decryption.clone().into() } #[cfg(not(feature = "parquet_encryption"))] pub fn map_config_decryption_to_decryption( - _decryption: Option<&ConfigFileDecryptionProperties>, -) -> Option { - None + _decryption: &ConfigFileDecryptionProperties, +) -> FileDecryptionProperties { + FileDecryptionProperties {} } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index cde0ea129979..eb3a5e6ad831 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -27,7 +27,6 @@ use crate::{ use arrow::datatypes::Schema; // TODO: handle once deprecated -use crate::encryption::add_crypto_to_writer_properties; #[allow(deprecated)] use parquet::{ arrow::ARROW_SCHEMA_META_KEY, @@ -96,13 +95,11 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { global, column_specific_options, key_value_metadata, - crypto, + crypto: _, } = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; - builder = add_crypto_to_writer_properties(crypto, builder); - // check that the arrow schema is present in the kv_metadata, if configured to do so if !global.skip_arrow_metadata && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY) @@ -640,6 +637,8 @@ mod tests { crypto: ParquetEncryptionOptions { file_encryption: fep, file_decryption: None, + factory_id: None, + factory_options: Default::default(), }, } } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index c4455e271c84..2d0af2fba296 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -68,7 +68,7 @@ encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] math_expressions = ["datafusion-functions/math_expressions"] -parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"] +parquet = ["datafusion-common/parquet", "datafusion-execution/parquet", "dep:parquet", "datafusion-datasource-parquet"] parquet_encryption = [ "dep:parquet", "parquet/encryption", diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 731f7e59ecfa..6ba708686a57 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -22,6 +22,8 @@ use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext}; use datafusion_datasource_parquet::plan_to_parquet; use datafusion_common::TableReference; +#[cfg(feature = "parquet_encryption")] +use datafusion_execution::parquet_encryption::DynEncryptionFactory; use parquet::file::properties::WriterProperties; impl SessionContext { @@ -94,6 +96,18 @@ impl SessionContext { ) -> Result<()> { plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await } + + /// Registers a Parquet [`DynEncryptionFactory`] with an associated unique identifier. + /// If an encryption factory with the same identifier was already registered, it is replaced and returned. + #[cfg(feature = "parquet_encryption")] + pub fn register_parquet_encryption_factory( + &self, + id: &str, + encryption_factory: Arc, + ) -> Option> { + self.runtime_env() + .register_parquet_encryption_factory(id, encryption_factory) + } } #[cfg(test)] diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 2b262d4326cc..bc0ff4e54efd 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -567,6 +567,10 @@ async fn setup_context( disk_manager: Arc::new(disk_manager), cache_manager: runtime.cache_manager.clone(), object_store_registry: runtime.object_store_registry.clone(), + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry: runtime + .parquet_encryption_factory_registry + .clone(), }); let config = SessionConfig::new() diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index 8e90b9aaa955..b28f7cc1e241 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -15,27 +15,27 @@ // specific language governing permissions and limitations // under the License. -//! non trivial integration testing for parquet predicate pushdown -//! -//! Testing hints: If you run this test with --nocapture it will tell you where -//! the generated parquet file went. You can then test it and try out various queries -//! datafusion-cli like: -//! -//! ```sql -//! create external table data stored as parquet location 'data.parquet'; -//! select * from data limit 10; -//! ``` +//! Tests for reading and writing Parquet files that use Parquet modular encryption +use arrow::array::{ArrayRef, Int32Array, StringArray}; use arrow::record_batch::RecordBatch; +use arrow_schema::{DataType, SchemaRef}; +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::datasource::listing::ListingOptions; use datafusion::prelude::{ParquetReadOptions, SessionContext}; -use std::fs::File; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - +use datafusion_common::config::{EncryptionFactoryOptions, TableParquetOptions}; +use datafusion_common::{assert_batches_sorted_eq, DataFusionError}; +use datafusion_datasource_parquet::ParquetFormat; +use datafusion_execution::parquet_encryption::DynEncryptionFactory; use parquet::arrow::ArrowWriter; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::file::properties::WriterProperties; +use std::collections::HashMap; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::{Arc, Mutex}; use tempfile::TempDir; async fn read_parquet_test_data<'a, T: Into>( @@ -128,3 +128,172 @@ async fn round_trip_encryption() { assert_eq!(num_rows_written, num_rows_read); } + +#[tokio::test] +async fn round_trip_parquet_with_encryption_factory() { + let ctx = SessionContext::new(); + let encryption_factory = Arc::new(MockEncryptionFactory::default()); + ctx.register_parquet_encryption_factory( + "test_encryption_factory", + Arc::clone(&encryption_factory) as Arc, + ); + + let tmpdir = TempDir::new().unwrap(); + + // Register some simple test data + let strings: ArrayRef = + Arc::new(StringArray::from(vec!["a", "b", "c", "a", "b", "c"])); + let x1: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 11, 100, 101, 111])); + let x2: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])); + let batch = + RecordBatch::try_from_iter(vec![("string", strings), ("x1", x1), ("x2", x2)]) + .unwrap(); + let test_data_schema = batch.schema(); + ctx.register_batch("test_data", batch).unwrap(); + let df = ctx.table("test_data").await.unwrap(); + + // Write encrypted Parquet, partitioned by string column into separate files + let mut parquet_options = TableParquetOptions::new(); + parquet_options.crypto.factory_id = Some("test_encryption_factory".to_string()); + parquet_options + .crypto + .factory_options + .options + .insert("test_key".to_string(), "test value".to_string()); + + let df_write_options = + DataFrameWriteOptions::default().with_partition_by(vec!["string".to_string()]); + df.write_parquet( + tmpdir.path().to_str().unwrap(), + df_write_options, + Some(parquet_options.clone()), + ) + .await + .unwrap(); + + // Crypto factory should have generated one key per partition file + assert_eq!(encryption_factory.encryption_keys.lock().unwrap().len(), 3); + + // Registering table without decryption properties should fail + let table_path = format!("file://{}/", tmpdir.path().to_str().unwrap()); + let without_decryption_register = ctx + .register_listing_table( + "parquet_missing_decryption", + &table_path, + ListingOptions::new(Arc::new(ParquetFormat::default())), + None, + None, + ) + .await; + assert!(matches!( + without_decryption_register.unwrap_err(), + DataFusionError::ParquetError(_) + )); + + // Registering table succeeds if schema is provided + ctx.register_listing_table( + "parquet_missing_decryption", + &table_path, + ListingOptions::new(Arc::new(ParquetFormat::default())), + Some(test_data_schema), + None, + ) + .await + .unwrap(); + + // But trying to read from the table should fail + let without_decryption_read = ctx + .table("parquet_missing_decryption") + .await + .unwrap() + .collect() + .await; + assert!(matches!( + without_decryption_read.unwrap_err(), + DataFusionError::ParquetError(_) + )); + + // Register table with encryption factory specified + let listing_options = ListingOptions::new(Arc::new( + ParquetFormat::default().with_options(parquet_options), + )) + .with_table_partition_cols(vec![("string".to_string(), DataType::Utf8)]); + ctx.register_listing_table( + "parquet_with_decryption", + &table_path, + listing_options, + None, + None, + ) + .await + .unwrap(); + + // Can read correct data when encryption factory has been specified + let table = ctx + .table("parquet_with_decryption") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected = vec![ + "+-----+----+--------+", + "| x1 | x2 | string |", + "+-----+----+--------+", + "| 1 | 1 | a |", + "| 100 | 4 | a |", + "| 10 | 2 | b |", + "| 101 | 5 | b |", + "| 11 | 3 | c |", + "| 111 | 6 | c |", + "+-----+----+--------+", + ]; + assert_batches_sorted_eq!(expected, &table); +} + +/// Encryption factory implementation for use in tests, +/// which generates encryption keys in a sequence +#[derive(Debug, Default)] +struct MockEncryptionFactory { + pub encryption_keys: Mutex>>, + pub counter: AtomicU8, +} + +impl DynEncryptionFactory for MockEncryptionFactory { + fn get_file_encryption_properties( + &self, + config: &EncryptionFactoryOptions, + _schema: &SchemaRef, + file_path: &object_store::path::Path, + ) -> datafusion_common::Result> { + assert_eq!( + config.options.get("test_key"), + Some(&"test value".to_string()) + ); + let file_idx = self.counter.fetch_add(1, Ordering::Relaxed); + let key = vec![file_idx, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; + let mut keys = self.encryption_keys.lock().unwrap(); + keys.insert(file_path.clone(), key.clone()); + let encryption_properties = FileEncryptionProperties::builder(key).build()?; + Ok(Some(encryption_properties)) + } + + fn get_file_decryption_properties( + &self, + config: &EncryptionFactoryOptions, + file_path: &object_store::path::Path, + ) -> datafusion_common::Result> { + assert_eq!( + config.options.get("test_key"), + Some(&"test value".to_string()) + ); + let keys = self.encryption_keys.lock().unwrap(); + let key = keys.get(file_path).ok_or_else(|| { + DataFusionError::Execution(format!("No key for file {:?}", file_path)) + })?; + let decryption_properties = + FileDecryptionProperties::builder(key.clone()).build()?; + Ok(Some(decryption_properties)) + } +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 94d6d152a384..090c6cf95599 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -43,6 +43,7 @@ use std::sync::Arc; use tempfile::NamedTempFile; mod custom_reader; +#[cfg(feature = "parquet_encryption")] mod encryption; mod external_access_plan; mod file_statistics; diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml index 8a75a445c8ff..6bccd76b60fc 100644 --- a/datafusion/datasource-parquet/Cargo.toml +++ b/datafusion/datasource-parquet/Cargo.toml @@ -71,5 +71,6 @@ path = "src/mod.rs" parquet_encryption = [ "parquet/encryption", "datafusion-common/parquet_encryption", + "datafusion-execution/parquet_encryption", "dep:hex", ] diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 43b0886193e7..ae60ebef4961 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -39,9 +39,9 @@ use datafusion_datasource::write::demux::DemuxedStreamReceiver; use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; -use datafusion_common::encryption::{ - map_config_decryption_to_decryption, FileDecryptionProperties, -}; +#[cfg(feature = "parquet_encryption")] +use datafusion_common::encryption::map_config_decryption_to_decryption; +use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -67,6 +67,7 @@ use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; use datafusion_datasource::source::DataSourceExec; +use datafusion_execution::runtime_env::RuntimeEnv; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::debug; @@ -304,24 +305,60 @@ fn clear_metadata( } async fn fetch_schema_with_location( + state: &dyn Session, store: &dyn ObjectStore, + options: &TableParquetOptions, file: &ObjectMeta, metadata_size_hint: Option, - file_decryption_properties: Option<&FileDecryptionProperties>, coerce_int96: Option, ) -> Result<(Path, Schema)> { + let file_decryption_properties = + get_file_decryption_properties(state, options, &file.location)?; let loc_path = file.location.clone(); let schema = fetch_schema( store, file, metadata_size_hint, - file_decryption_properties, + file_decryption_properties.as_ref(), coerce_int96, ) .await?; Ok((loc_path, schema)) } +#[cfg(feature = "parquet_encryption")] +fn get_file_decryption_properties( + state: &dyn Session, + options: &TableParquetOptions, + file_path: &Path, +) -> Result> { + let file_decryption_properties: Option = + match &options.crypto.file_decryption { + Some(cfd) => Some(map_config_decryption_to_decryption(cfd)), + None => match &options.crypto.factory_id { + Some(factory_id) => { + let factory = + state.runtime_env().parquet_encryption_factory(factory_id)?; + factory.get_file_decryption_properties( + &options.crypto.factory_options, + file_path, + )? + } + None => None, + }, + }; + Ok(file_decryption_properties) +} + +#[cfg(not(feature = "parquet_encryption"))] +fn get_file_decryption_properties( + _state: &dyn Session, + _options: &TableParquetOptions, + _file_path: &Path, +) -> Result> { + Ok(None) +} + #[async_trait] impl FileFormat for ParquetFormat { fn as_any(&self) -> &dyn Any { @@ -357,18 +394,15 @@ impl FileFormat for ParquetFormat { Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?), None => None, }; - let file_decryption_properties: Option = - map_config_decryption_to_decryption( - self.options.crypto.file_decryption.as_ref(), - ); let mut schemas: Vec<_> = futures::stream::iter(objects) .map(|object| { fetch_schema_with_location( + state, store.as_ref(), + &self.options, object, self.metadata_size_hint(), - file_decryption_properties.as_ref(), coerce_int96, ) }) @@ -413,15 +447,13 @@ impl FileFormat for ParquetFormat { async fn infer_stats( &self, - _state: &dyn Session, + state: &dyn Session, store: &Arc, table_schema: SchemaRef, object: &ObjectMeta, ) -> Result { - let file_decryption_properties: Option = - map_config_decryption_to_decryption( - self.options.crypto.file_decryption.as_ref(), - ); + let file_decryption_properties = + get_file_decryption_properties(state, &self.options, &object.location)?; let stats = fetch_statistics( store.as_ref(), table_schema, @@ -435,7 +467,7 @@ impl FileFormat for ParquetFormat { async fn create_physical_plan( &self, - _state: &dyn Session, + state: &dyn Session, conf: FileScanConfig, ) -> Result> { let mut metadata_size_hint = None; @@ -449,6 +481,9 @@ impl FileFormat for ParquetFormat { if let Some(metadata_size_hint) = metadata_size_hint { source = source.with_metadata_size_hint(metadata_size_hint) } + + source = self.set_source_encryption_factory(source, state)?; + // Apply schema adapter factory before building the new config let file_source = source.apply_schema_adapter(&conf)?; @@ -479,6 +514,40 @@ impl FileFormat for ParquetFormat { } } +#[cfg(feature = "parquet_encryption")] +impl ParquetFormat { + fn set_source_encryption_factory( + &self, + source: ParquetSource, + state: &dyn Session, + ) -> Result { + if let Some(encryption_factory_id) = &self.options.crypto.factory_id { + Ok(source.with_encryption_factory( + state + .runtime_env() + .parquet_encryption_factory(encryption_factory_id)?, + )) + } else { + Ok(source) + } + } +} + +#[cfg(not(feature = "parquet_encryption"))] +impl ParquetFormat { + fn set_source_encryption_factory( + &self, + source: ParquetSource, + _state: &dyn Session, + ) -> Result { + if let Some(encryption_factory_id) = &self.options.crypto.factory_id { + Err(DataFusionError::Configuration(format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled"))) + } else { + Ok(source) + } + } +} + /// Apply necessary schema type coercions to make file schema match table schema. /// /// This function performs two main types of transformations in a single pass: @@ -1233,7 +1302,11 @@ impl ParquetSink { /// Create writer properties based upon configuration settings, /// including partitioning and the inclusion of arrow schema metadata. - fn create_writer_props(&self) -> Result { + fn create_writer_props( + &self, + runtime: &Arc, + path: &Path, + ) -> Result { let schema = if self.parquet_options.global.allow_single_file_parallelism { // If parallelizing writes, we may be also be doing hive style partitioning // into multiple files which impacts the schema per file. @@ -1250,7 +1323,15 @@ impl ParquetSink { parquet_opts.arrow_schema(schema); } - Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build()) + let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?; + builder = set_writer_encryption_properties( + builder, + runtime, + &parquet_opts, + schema, + path, + )?; + Ok(builder.build()) } /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore @@ -1289,6 +1370,48 @@ impl ParquetSink { } } +#[cfg(feature = "parquet_encryption")] +fn set_writer_encryption_properties( + builder: WriterPropertiesBuilder, + runtime: &Arc, + parquet_opts: &TableParquetOptions, + schema: &Arc, + path: &Path, +) -> Result { + if let Some(file_encryption_properties) = &parquet_opts.crypto.file_encryption { + // Encryption properties have been specified directly + return Ok(builder + .with_file_encryption_properties(file_encryption_properties.clone().into())); + } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() { + // Encryption properties will be generated by an encryption factory + let encryption_factory = + runtime.parquet_encryption_factory(encryption_factory_id)?; + let file_encryption_properties = encryption_factory + .get_file_encryption_properties( + &parquet_opts.crypto.factory_options, + schema, + path, + )?; + if let Some(file_encryption_properties) = file_encryption_properties { + return Ok( + builder.with_file_encryption_properties(file_encryption_properties) + ); + } + } + Ok(builder) +} + +#[cfg(not(feature = "parquet_encryption"))] +fn set_writer_encryption_properties( + builder: WriterPropertiesBuilder, + _runtime: &Arc, + _parquet_opts: &TableParquetOptions, + _schema: &Arc, + _path: &Path, +) -> Result { + Ok(builder) +} + #[async_trait] impl FileSink for ParquetSink { fn config(&self) -> &FileSinkConfig { @@ -1306,7 +1429,9 @@ impl FileSink for ParquetSink { let mut allow_single_file_parallelism = parquet_opts.global.allow_single_file_parallelism; - if parquet_opts.crypto.file_encryption.is_some() { + if parquet_opts.crypto.file_encryption.is_some() + || parquet_opts.crypto.factory_id.is_some() + { // For now, arrow-rs does not support parallel writes with encryption // See https://github.com/apache/arrow-rs/issues/7359 allow_single_file_parallelism = false; @@ -1316,7 +1441,7 @@ impl FileSink for ParquetSink { std::result::Result<(Path, FileMetaData), DataFusionError>, > = JoinSet::new(); - let parquet_props = self.create_writer_props()?; + let runtime = context.runtime_env(); let parallel_options = ParallelParquetWriterOptions { max_parallel_row_groups: parquet_opts .global @@ -1327,6 +1452,7 @@ impl FileSink for ParquetSink { }; while let Some((path, mut rx)) = file_stream_rx.recv().await { + let parquet_props = self.create_writer_props(&runtime, &path)?; if !allow_single_file_parallelism { let mut writer = self .create_async_arrow_writer( diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index daed52e05950..f9f0a3feb416 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -43,6 +43,10 @@ use datafusion_physical_expr_common::physical_expr::{ use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate}; +#[cfg(feature = "parquet_encryption")] +use datafusion_common::config::EncryptionFactoryOptions; +#[cfg(feature = "parquet_encryption")] +use datafusion_execution::parquet_encryption::DynEncryptionFactory; use futures::{StreamExt, TryStreamExt}; use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; @@ -93,13 +97,18 @@ pub(super) struct ParquetOpener { pub coerce_int96: Option, /// Optional parquet FileDecryptionProperties pub file_decryption_properties: Option>, + /// Optional factory to create file decryption properties dynamically + #[cfg(feature = "parquet_encryption")] + pub encryption_factory: + Option<(Arc, EncryptionFactoryOptions)>, } impl FileOpener for ParquetOpener { fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result { let file_range = file_meta.range.clone(); let extensions = file_meta.extensions.clone(); - let file_name = file_meta.location().to_string(); + let file_location = file_meta.location().clone(); + let file_name = file_location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); @@ -134,7 +143,8 @@ impl FileOpener for ParquetOpener { .global_counter("num_predicate_creation_errors"); let mut enable_page_index = self.enable_page_index; - let file_decryption_properties = self.file_decryption_properties.clone(); + let file_decryption_properties = + self.get_file_decryption_properties(&file_location)?; // For now, page index does not work with encrypted files. See: // https://github.com/apache/arrow-rs/issues/7629 @@ -402,6 +412,38 @@ impl FileOpener for ParquetOpener { } } +#[cfg(feature = "parquet_encryption")] +impl ParquetOpener { + fn get_file_decryption_properties( + &self, + file_location: &object_store::path::Path, + ) -> Result>> { + // Creating props is delayed until here so that the file url/path is available, + // and we can handle errors from the encryption factory. + match &self.file_decryption_properties { + Some(file_decryption_properties) => { + Ok(Some(Arc::clone(file_decryption_properties))) + } + None => match &self.encryption_factory { + Some((encryption_factory, encryption_config)) => Ok(encryption_factory + .get_file_decryption_properties(encryption_config, file_location)? + .map(Arc::new)), + None => Ok(None), + }, + } + } +} + +#[cfg(not(feature = "parquet_encryption"))] +impl ParquetOpener { + fn get_file_decryption_properties( + &self, + _file_location: &object_store::path::Path, + ) -> Result>> { + Ok(self.file_decryption_properties.clone()) + } +} + /// Return the initial [`ParquetAccessPlan`] /// /// If the user has supplied one as an extension, use that @@ -633,6 +675,8 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; @@ -718,6 +762,8 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; @@ -819,6 +865,8 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; let make_meta = || FileMeta { @@ -930,6 +978,8 @@ mod test { enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, file_decryption_properties: None, + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; @@ -1042,6 +1092,8 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, } }; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index f3cb143bc619..c9f9114af694 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -27,6 +27,8 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; use datafusion_common::config::ConfigOptions; +#[cfg(feature = "parquet_encryption")] +use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::schema_adapter::{ @@ -50,6 +52,8 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; use datafusion_common::encryption::map_config_decryption_to_decryption; +#[cfg(feature = "parquet_encryption")] +use datafusion_execution::parquet_encryption::DynEncryptionFactory; use itertools::Itertools; use object_store::ObjectStore; @@ -281,6 +285,8 @@ pub struct ParquetSource { /// Optional hint for the size of the parquet metadata pub(crate) metadata_size_hint: Option, pub(crate) projected_statistics: Option, + #[cfg(feature = "parquet_encryption")] + pub(crate) encryption_factory: Option>, } impl ParquetSource { @@ -319,6 +325,16 @@ impl ParquetSource { conf } + /// Set the encryption factory to use to generate file decryption properties + #[cfg(feature = "parquet_encryption")] + pub fn with_encryption_factory( + mut self, + encryption_factory: Arc, + ) -> Self { + self.encryption_factory = Some(encryption_factory); + self + } + /// Options passed to the parquet reader for this scan pub fn table_parquet_options(&self) -> &TableParquetOptions { &self.table_parquet_options @@ -430,6 +446,19 @@ impl ParquetSource { Ok(file_source) } } + + #[cfg(feature = "parquet_encryption")] + fn get_encryption_factory_with_config( + &self, + ) -> Option<(Arc, EncryptionFactoryOptions)> { + match &self.encryption_factory { + None => None, + Some(factory) => Some(( + Arc::clone(factory), + self.table_parquet_options.crypto.factory_options.clone(), + )), + } + } } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit @@ -477,10 +506,13 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); - let file_decryption_properties = map_config_decryption_to_decryption( - self.table_parquet_options().crypto.file_decryption.as_ref(), - ) - .map(Arc::new); + let file_decryption_properties = self + .table_parquet_options() + .crypto + .file_decryption + .as_ref() + .map(map_config_decryption_to_decryption) + .map(Arc::new); let coerce_int96 = self .table_parquet_options @@ -510,6 +542,8 @@ impl FileSource for ParquetSource { schema_adapter_factory, coerce_int96, file_decryption_properties, + #[cfg(feature = "parquet_encryption")] + encryption_factory: self.get_encryption_factory_with_config(), }) } diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 5988d3a33660..a63434ef4910 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -37,6 +37,11 @@ workspace = true [lib] name = "datafusion_execution" +[features] +parquet_encryption = [ + "parquet/encryption", +] + [dependencies] arrow = { workspace = true } dashmap = { workspace = true } @@ -45,6 +50,7 @@ datafusion-expr = { workspace = true } futures = { workspace = true } log = { workspace = true } object_store = { workspace = true, features = ["fs"] } +parquet = { workspace = true, optional = true } parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs index 6a0a4b6322ee..e971e838a6e5 100644 --- a/datafusion/execution/src/lib.rs +++ b/datafusion/execution/src/lib.rs @@ -31,6 +31,8 @@ pub mod config; pub mod disk_manager; pub mod memory_pool; pub mod object_store; +#[cfg(feature = "parquet_encryption")] +pub mod parquet_encryption; pub mod runtime_env; mod stream; mod task; diff --git a/datafusion/execution/src/parquet_encryption.rs b/datafusion/execution/src/parquet_encryption.rs new file mode 100644 index 000000000000..528c159cbcd7 --- /dev/null +++ b/datafusion/execution/src/parquet_encryption.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::SchemaRef; +use dashmap::DashMap; +use datafusion_common::config::{EncryptionFactoryOptions, ExtensionOptions}; +use datafusion_common::error::Result; +use datafusion_common::DataFusionError; +use object_store::path::Path; +use parquet::encryption::decrypt::FileDecryptionProperties; +use parquet::encryption::encrypt::FileEncryptionProperties; +use std::sync::Arc; + +/// Trait for types that generate file encryption and decryption properties to +/// write and read encrypted Parquet files. +/// This allows flexibility in how encryption keys are managed, for example, to +/// integrate with a user's key management service (KMS). +pub trait EncryptionFactory: Send + Sync + std::fmt::Debug + 'static { + /// The type to hold configuration options for this factory + type Options: ExtensionOptions + Default; + + /// Generate file encryption properties to use when writing a Parquet file. + fn get_file_encryption_properties( + &self, + config: &Self::Options, + schema: &SchemaRef, + file_path: &Path, + ) -> Result>; + + /// Generate file decryption properties to use when reading a Parquet file. + fn get_file_decryption_properties( + &self, + config: &Self::Options, + file_path: &Path, + ) -> Result>; +} + +/// Dyn-compatible version of the [`EncryptionFactory`] trait +pub trait DynEncryptionFactory: Send + Sync + std::fmt::Debug + 'static { + /// Generate file encryption properties to use when writing a Parquet file. + fn get_file_encryption_properties( + &self, + config: &EncryptionFactoryOptions, + schema: &SchemaRef, + file_path: &Path, + ) -> Result>; + + /// Generate file decryption properties to use when reading a Parquet file. + fn get_file_decryption_properties( + &self, + config: &EncryptionFactoryOptions, + file_path: &Path, + ) -> Result>; +} + +impl DynEncryptionFactory for T { + fn get_file_encryption_properties( + &self, + config: &EncryptionFactoryOptions, + schema: &SchemaRef, + file_path: &Path, + ) -> Result> { + let mut options = T::Options::default(); + for (key, value) in &config.options { + options.set(key, value)?; + } + self.get_file_encryption_properties(&options, schema, file_path) + } + + fn get_file_decryption_properties( + &self, + config: &EncryptionFactoryOptions, + file_path: &Path, + ) -> Result> { + let mut options = T::Options::default(); + for (key, value) in &config.options { + options.set(key, value)?; + } + self.get_file_decryption_properties(&options, file_path) + } +} + +/// Stores [`EncryptionFactory`] implementations that can be retrieved by a unique string identifier +#[derive(Clone, Debug, Default)] +pub struct EncryptionFactoryRegistry { + factories: DashMap>, +} + +impl EncryptionFactoryRegistry { + /// Register a [`DynEncryptionFactory`] with an associated identifier that can be later + /// used to configure encryption when reading or writing Parquet. + /// If an encryption factory with the same identifier was already registered, it is replaced and returned. + pub fn register_factory( + &self, + id: &str, + factory: Arc, + ) -> Option> { + self.factories.insert(id.to_owned(), factory) + } + + /// Retrieve a [`DynEncryptionFactory`] by its identifier + pub fn get_factory(&self, id: &str) -> Result> { + self.factories + .get(id) + .map(|f| Arc::clone(f.value())) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "No Parquet encryption factory found for id '{id}'" + )) + }) + } +} diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index b086430a4ef7..520a9975d7f4 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -29,6 +29,8 @@ use crate::{ }; use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; +#[cfg(feature = "parquet_encryption")] +use crate::parquet_encryption::{DynEncryptionFactory, EncryptionFactoryRegistry}; use datafusion_common::{config::ConfigEntry, Result}; use object_store::ObjectStore; use std::path::PathBuf; @@ -78,6 +80,9 @@ pub struct RuntimeEnv { pub cache_manager: Arc, /// Object Store Registry pub object_store_registry: Arc, + /// Parquet encryption factory registry + #[cfg(feature = "parquet_encryption")] + pub parquet_encryption_factory_registry: Arc, } impl Debug for RuntimeEnv { @@ -154,6 +159,28 @@ impl RuntimeEnv { pub fn object_store(&self, url: impl AsRef) -> Result> { self.object_store_registry.get_store(url.as_ref()) } + + /// Register a [`DynEncryptionFactory`] with an associated identifier that can be later + /// used to configure encryption when reading or writing Parquet. + /// If an encryption factory with the same identifier was already registered, it is replaced and returned. + #[cfg(feature = "parquet_encryption")] + pub fn register_parquet_encryption_factory( + &self, + id: &str, + encryption_factory: Arc, + ) -> Option> { + self.parquet_encryption_factory_registry + .register_factory(id, encryption_factory) + } + + /// Retrieve a [`DynEncryptionFactory`] by its identifier + #[cfg(feature = "parquet_encryption")] + pub fn parquet_encryption_factory( + &self, + id: &str, + ) -> Result> { + self.parquet_encryption_factory_registry.get_factory(id) + } } impl Default for RuntimeEnv { @@ -185,6 +212,9 @@ pub struct RuntimeEnvBuilder { pub cache_manager: CacheManagerConfig, /// ObjectStoreRegistry to get object store based on url pub object_store_registry: Arc, + /// Parquet encryption factory registry + #[cfg(feature = "parquet_encryption")] + pub parquet_encryption_factory_registry: Arc, } impl Default for RuntimeEnvBuilder { @@ -202,6 +232,8 @@ impl RuntimeEnvBuilder { memory_pool: Default::default(), cache_manager: Default::default(), object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()), + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry: Default::default(), } } @@ -270,6 +302,8 @@ impl RuntimeEnvBuilder { memory_pool, cache_manager, object_store_registry, + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry, } = self; let memory_pool = memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default())); @@ -284,6 +318,8 @@ impl RuntimeEnvBuilder { }, cache_manager: CacheManager::try_new(&cache_manager)?, object_store_registry, + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry, }) } @@ -310,6 +346,10 @@ impl RuntimeEnvBuilder { memory_pool: Some(Arc::clone(&runtime_env.memory_pool)), cache_manager: cache_config, object_store_registry: Arc::clone(&runtime_env.object_store_registry), + #[cfg(feature = "parquet_encryption")] + parquet_encryption_factory_registry: Arc::clone( + &runtime_env.parquet_encryption_factory_registry, + ), } } From 430437ad1fd999d63d1bd9aa20bdf266f4b8dca7 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 15 Jul 2025 13:14:35 +1200 Subject: [PATCH 02/17] Remove extra DynEncryptionFactory trait --- .../examples/parquet_encrypted_with_kms.rs | 9 ++- datafusion/common/src/config.rs | 11 ++++ .../core/src/execution/context/parquet.rs | 8 +-- datafusion/core/tests/parquet/encryption.rs | 6 +- datafusion/datasource-parquet/src/opener.rs | 4 +- datafusion/datasource-parquet/src/source.rs | 8 +-- .../execution/src/parquet_encryption.rs | 65 +++---------------- datafusion/execution/src/runtime_env.rs | 12 ++-- 8 files changed, 44 insertions(+), 79 deletions(-) diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs index 0d3413d65112..7c46f0d631a1 100644 --- a/datafusion-examples/examples/parquet_encrypted_with_kms.rs +++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs @@ -18,7 +18,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use arrow_schema::SchemaRef; use datafusion::common::{extensions_options, DataFusionError}; -use datafusion::config::TableParquetOptions; +use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions}; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; @@ -224,8 +224,6 @@ impl std::fmt::Debug for KmsEncryptionFactory { /// `EncryptionFactory` is a trait defined by DataFusion that allows generating /// file encryption and decryption properties. impl EncryptionFactory for KmsEncryptionFactory { - type Options = KmsEncryptionConfig; - /// Generate file encryption properties to use when writing a Parquet file. /// The `FileSinkConfig` is provided so that the schema may be used to dynamically configure /// per-column encryption keys. @@ -234,10 +232,11 @@ impl EncryptionFactory for KmsEncryptionFactory { /// stored in a JSON file alongside Parquet files). fn get_file_encryption_properties( &self, - config: &KmsEncryptionConfig, + options: &EncryptionFactoryOptions, schema: &SchemaRef, _file_path: &Path, ) -> Result> { + let config: KmsEncryptionConfig = options.to_extension_options()?; if config.key_id.is_empty() { return Err(DataFusionError::Configuration( "Key id for encryption is not set".to_owned(), @@ -276,7 +275,7 @@ impl EncryptionFactory for KmsEncryptionFactory { /// The `file_path` needs to be known to support encryption factories that use external key material. fn get_file_decryption_properties( &self, - _config: &KmsEncryptionConfig, + _options: &EncryptionFactoryOptions, _file_path: &Path, ) -> Result> { let decryption_config = DecryptionConfiguration::builder().build(); diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e598d44fe2a8..2ee40af1b4e6 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -2419,6 +2419,17 @@ impl ConfigField for EncryptionFactoryOptions { } } +impl EncryptionFactoryOptions { + /// Convert these encryption factory options to an [`ExtensionOptions`] instance. + pub fn to_extension_options(&self) -> Result { + let mut options = T::default(); + for (key, value) in &self.options { + options.set(key, value)?; + } + Ok(options) + } +} + config_namespace! { /// Options controlling CSV format pub struct CsvOptions { diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 6ba708686a57..0a8fee8980a9 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -23,7 +23,7 @@ use datafusion_datasource_parquet::plan_to_parquet; use datafusion_common::TableReference; #[cfg(feature = "parquet_encryption")] -use datafusion_execution::parquet_encryption::DynEncryptionFactory; +use datafusion_execution::parquet_encryption::EncryptionFactory; use parquet::file::properties::WriterProperties; impl SessionContext { @@ -97,14 +97,14 @@ impl SessionContext { plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await } - /// Registers a Parquet [`DynEncryptionFactory`] with an associated unique identifier. + /// Registers a Parquet [`EncryptionFactory`] with an associated unique identifier. /// If an encryption factory with the same identifier was already registered, it is replaced and returned. #[cfg(feature = "parquet_encryption")] pub fn register_parquet_encryption_factory( &self, id: &str, - encryption_factory: Arc, - ) -> Option> { + encryption_factory: Arc, + ) -> Option> { self.runtime_env() .register_parquet_encryption_factory(id, encryption_factory) } diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index b28f7cc1e241..73148a9a4e77 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -26,7 +26,7 @@ use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_common::config::{EncryptionFactoryOptions, TableParquetOptions}; use datafusion_common::{assert_batches_sorted_eq, DataFusionError}; use datafusion_datasource_parquet::ParquetFormat; -use datafusion_execution::parquet_encryption::DynEncryptionFactory; +use datafusion_execution::parquet_encryption::EncryptionFactory; use parquet::arrow::ArrowWriter; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; @@ -135,7 +135,7 @@ async fn round_trip_parquet_with_encryption_factory() { let encryption_factory = Arc::new(MockEncryptionFactory::default()); ctx.register_parquet_encryption_factory( "test_encryption_factory", - Arc::clone(&encryption_factory) as Arc, + Arc::clone(&encryption_factory) as Arc, ); let tmpdir = TempDir::new().unwrap(); @@ -260,7 +260,7 @@ struct MockEncryptionFactory { pub counter: AtomicU8, } -impl DynEncryptionFactory for MockEncryptionFactory { +impl EncryptionFactory for MockEncryptionFactory { fn get_file_encryption_properties( &self, config: &EncryptionFactoryOptions, diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f9f0a3feb416..b296ebfc9cb4 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -46,7 +46,7 @@ use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate}; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] -use datafusion_execution::parquet_encryption::DynEncryptionFactory; +use datafusion_execution::parquet_encryption::EncryptionFactory; use futures::{StreamExt, TryStreamExt}; use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; @@ -100,7 +100,7 @@ pub(super) struct ParquetOpener { /// Optional factory to create file decryption properties dynamically #[cfg(feature = "parquet_encryption")] pub encryption_factory: - Option<(Arc, EncryptionFactoryOptions)>, + Option<(Arc, EncryptionFactoryOptions)>, } impl FileOpener for ParquetOpener { diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index c9f9114af694..20e8eb9abc92 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -53,7 +53,7 @@ use datafusion_physical_plan::DisplayFormatType; use datafusion_common::encryption::map_config_decryption_to_decryption; #[cfg(feature = "parquet_encryption")] -use datafusion_execution::parquet_encryption::DynEncryptionFactory; +use datafusion_execution::parquet_encryption::EncryptionFactory; use itertools::Itertools; use object_store::ObjectStore; @@ -286,7 +286,7 @@ pub struct ParquetSource { pub(crate) metadata_size_hint: Option, pub(crate) projected_statistics: Option, #[cfg(feature = "parquet_encryption")] - pub(crate) encryption_factory: Option>, + pub(crate) encryption_factory: Option>, } impl ParquetSource { @@ -329,7 +329,7 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] pub fn with_encryption_factory( mut self, - encryption_factory: Arc, + encryption_factory: Arc, ) -> Self { self.encryption_factory = Some(encryption_factory); self @@ -450,7 +450,7 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] fn get_encryption_factory_with_config( &self, - ) -> Option<(Arc, EncryptionFactoryOptions)> { + ) -> Option<(Arc, EncryptionFactoryOptions)> { match &self.encryption_factory { None => None, Some(factory) => Some(( diff --git a/datafusion/execution/src/parquet_encryption.rs b/datafusion/execution/src/parquet_encryption.rs index 528c159cbcd7..13a18390d02a 100644 --- a/datafusion/execution/src/parquet_encryption.rs +++ b/datafusion/execution/src/parquet_encryption.rs @@ -17,7 +17,7 @@ use arrow::datatypes::SchemaRef; use dashmap::DashMap; -use datafusion_common::config::{EncryptionFactoryOptions, ExtensionOptions}; +use datafusion_common::config::EncryptionFactoryOptions; use datafusion_common::error::Result; use datafusion_common::DataFusionError; use object_store::path::Path; @@ -29,28 +29,10 @@ use std::sync::Arc; /// write and read encrypted Parquet files. /// This allows flexibility in how encryption keys are managed, for example, to /// integrate with a user's key management service (KMS). +/// For example usage, see the [`parquet_encrypted_with_kms` example]. +/// +/// [`parquet_encrypted_with_kms` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_encrypted_with_kms.rs pub trait EncryptionFactory: Send + Sync + std::fmt::Debug + 'static { - /// The type to hold configuration options for this factory - type Options: ExtensionOptions + Default; - - /// Generate file encryption properties to use when writing a Parquet file. - fn get_file_encryption_properties( - &self, - config: &Self::Options, - schema: &SchemaRef, - file_path: &Path, - ) -> Result>; - - /// Generate file decryption properties to use when reading a Parquet file. - fn get_file_decryption_properties( - &self, - config: &Self::Options, - file_path: &Path, - ) -> Result>; -} - -/// Dyn-compatible version of the [`EncryptionFactory`] trait -pub trait DynEncryptionFactory: Send + Sync + std::fmt::Debug + 'static { /// Generate file encryption properties to use when writing a Parquet file. fn get_file_encryption_properties( &self, @@ -67,53 +49,26 @@ pub trait DynEncryptionFactory: Send + Sync + std::fmt::Debug + 'static { ) -> Result>; } -impl DynEncryptionFactory for T { - fn get_file_encryption_properties( - &self, - config: &EncryptionFactoryOptions, - schema: &SchemaRef, - file_path: &Path, - ) -> Result> { - let mut options = T::Options::default(); - for (key, value) in &config.options { - options.set(key, value)?; - } - self.get_file_encryption_properties(&options, schema, file_path) - } - - fn get_file_decryption_properties( - &self, - config: &EncryptionFactoryOptions, - file_path: &Path, - ) -> Result> { - let mut options = T::Options::default(); - for (key, value) in &config.options { - options.set(key, value)?; - } - self.get_file_decryption_properties(&options, file_path) - } -} - /// Stores [`EncryptionFactory`] implementations that can be retrieved by a unique string identifier #[derive(Clone, Debug, Default)] pub struct EncryptionFactoryRegistry { - factories: DashMap>, + factories: DashMap>, } impl EncryptionFactoryRegistry { - /// Register a [`DynEncryptionFactory`] with an associated identifier that can be later + /// Register an [`EncryptionFactory`] with an associated identifier that can be later /// used to configure encryption when reading or writing Parquet. /// If an encryption factory with the same identifier was already registered, it is replaced and returned. pub fn register_factory( &self, id: &str, - factory: Arc, - ) -> Option> { + factory: Arc, + ) -> Option> { self.factories.insert(id.to_owned(), factory) } - /// Retrieve a [`DynEncryptionFactory`] by its identifier - pub fn get_factory(&self, id: &str) -> Result> { + /// Retrieve an [`EncryptionFactory`] by its identifier + pub fn get_factory(&self, id: &str) -> Result> { self.factories .get(id) .map(|f| Arc::clone(f.value())) diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 520a9975d7f4..2a30fcc2a751 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -30,7 +30,7 @@ use crate::{ use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; #[cfg(feature = "parquet_encryption")] -use crate::parquet_encryption::{DynEncryptionFactory, EncryptionFactoryRegistry}; +use crate::parquet_encryption::{EncryptionFactory, EncryptionFactoryRegistry}; use datafusion_common::{config::ConfigEntry, Result}; use object_store::ObjectStore; use std::path::PathBuf; @@ -160,25 +160,25 @@ impl RuntimeEnv { self.object_store_registry.get_store(url.as_ref()) } - /// Register a [`DynEncryptionFactory`] with an associated identifier that can be later + /// Register an [`EncryptionFactory`] with an associated identifier that can be later /// used to configure encryption when reading or writing Parquet. /// If an encryption factory with the same identifier was already registered, it is replaced and returned. #[cfg(feature = "parquet_encryption")] pub fn register_parquet_encryption_factory( &self, id: &str, - encryption_factory: Arc, - ) -> Option> { + encryption_factory: Arc, + ) -> Option> { self.parquet_encryption_factory_registry .register_factory(id, encryption_factory) } - /// Retrieve a [`DynEncryptionFactory`] by its identifier + /// Retrieve an [`EncryptionFactory`] by its identifier #[cfg(feature = "parquet_encryption")] pub fn parquet_encryption_factory( &self, id: &str, - ) -> Result> { + ) -> Result> { self.parquet_encryption_factory_registry.get_factory(id) } } From 80eaaea90baefff8d9531f1e2f1425d14f639f93 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 15 Jul 2025 14:08:29 +1200 Subject: [PATCH 03/17] Tidy up example --- datafusion-examples/Cargo.toml | 2 +- .../examples/parquet_encrypted_with_kms.rs | 57 +++++++++---------- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 06b6ec5a8b72..20c5b849962e 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -61,7 +61,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } # note only use main datafusion crate for examples -datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] } +datafusion = { workspace = true, default-features = true } datafusion-ffi = { workspace = true } datafusion-proto = { workspace = true } env_logger = { workspace = true } diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs index 7c46f0d631a1..6e372f377507 100644 --- a/datafusion-examples/examples/parquet_encrypted_with_kms.rs +++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs @@ -42,6 +42,12 @@ use tempfile::TempDir; /// This example demonstrates reading and writing Parquet files that /// are encrypted using Parquet Modular Encryption, and uses the /// parquet-key-management crate to integrate with a Key Management Server (KMS). +/// +/// Compared to the `parquet_encrypted` example, where AES keys +/// are specified directly, this example uses an `EncryptionFactory` so that +/// encryption keys can be dynamically generated per file, +/// and the encryption key metadata stored in files can be used to determine +/// the decryption keys when reading. const ENCRYPTION_FACTORY_ID: &'static str = "example.memory_kms_encryption"; @@ -73,24 +79,24 @@ async fn main() -> Result<()> { { // Write and read with the programmatic API let tmpdir = TempDir::new()?; - write_encrypted(&ctx, &tmpdir).await?; - let file_path = std::fs::read_dir(&tmpdir)?.next().unwrap()?.path(); - read_encrypted(&ctx, &file_path).await?; + let table_path = format!("{}/", tmpdir.path().to_str().unwrap()); + write_encrypted(&ctx, &table_path).await?; + read_encrypted(&ctx, &table_path).await?; } { // Write and read with the SQL API let tmpdir = TempDir::new()?; - write_encrypted_with_sql(&ctx, &tmpdir).await?; - let file_path = std::fs::read_dir(&tmpdir)?.next().unwrap()?.path(); - read_encrypted_with_sql(&ctx, &file_path).await?; + let table_path = format!("{}/", tmpdir.path().to_str().unwrap()); + write_encrypted_with_sql(&ctx, &table_path).await?; + read_encrypted_with_sql(&ctx, &table_path).await?; } Ok(()) } /// Write an encrypted Parquet file -async fn write_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { +async fn write_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { let df = ctx.table("test_data").await?; let mut parquet_options = TableParquetOptions::new(); @@ -106,22 +112,22 @@ async fn write_encrypted(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { .configure_factory(ENCRYPTION_FACTORY_ID, &encryption_config); df.write_parquet( - tmpdir.path().to_str().unwrap(), + table_path, DataFrameWriteOptions::new(), Some(parquet_options), ) .await?; - println!("Encrypted Parquet written to {:?}", tmpdir.path()); + println!("Encrypted Parquet written to {table_path}"); Ok(()) } /// Read from an encrypted Parquet file -async fn read_encrypted(ctx: &SessionContext, file_path: &std::path::Path) -> Result<()> { +async fn read_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { let mut parquet_options = TableParquetOptions::new(); // Specify the encryption factory to use for decrypting Parquet. // In this example, we don't require any additional configuration options when reading - // as key identifiers are stored in the key metadata. + // as master key identifiers are stored in the key metadata within Parquet files. parquet_options .crypto .configure_factory(ENCRYPTION_FACTORY_ID, &KmsEncryptionConfig::default()); @@ -129,8 +135,6 @@ async fn read_encrypted(ctx: &SessionContext, file_path: &std::path::Path) -> Re let file_format = ParquetFormat::default().with_options(parquet_options); let listing_options = ListingOptions::new(Arc::new(file_format)); - let table_path = format!("file://{}", file_path.to_str().unwrap()); - ctx.register_listing_table( "encrypted_parquet_table", &table_path, @@ -156,11 +160,10 @@ async fn read_encrypted(ctx: &SessionContext, file_path: &std::path::Path) -> Re } /// Write an encrypted Parquet file using only SQL syntax with string configuration -async fn write_encrypted_with_sql(ctx: &SessionContext, tmpdir: &TempDir) -> Result<()> { - let output_path = tmpdir.path().to_str().unwrap(); +async fn write_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> { let query = format!( "COPY test_data \ - TO '{output_path}' \ + TO '{table_path}' \ STORED AS parquet OPTIONS (\ 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}', \ @@ -170,19 +173,15 @@ async fn write_encrypted_with_sql(ctx: &SessionContext, tmpdir: &TempDir) -> Res ); let _ = ctx.sql(&query).await?.collect().await?; - println!("Encrypted Parquet written to {:?}", tmpdir.path()); + println!("Encrypted Parquet written to {table_path}"); Ok(()) } /// Read from an encrypted Parquet file using only the SQL API and string based configuration -async fn read_encrypted_with_sql( - ctx: &SessionContext, - file_path: &std::path::Path, -) -> Result<()> { - let file_path = file_path.to_str().unwrap(); +async fn read_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> { let ddl = format!( "CREATE EXTERNAL TABLE encrypted_parquet_table_2 \ - STORED AS PARQUET LOCATION '{file_path}' OPTIONS (\ + STORED AS PARQUET LOCATION '{table_path}' OPTIONS (\ 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}' \ )" ); @@ -221,15 +220,15 @@ impl std::fmt::Debug for KmsEncryptionFactory { } } -/// `EncryptionFactory` is a trait defined by DataFusion that allows generating +/// `EncryptionFactory` is DataFusion trait for types that generate /// file encryption and decryption properties. impl EncryptionFactory for KmsEncryptionFactory { /// Generate file encryption properties to use when writing a Parquet file. - /// The `FileSinkConfig` is provided so that the schema may be used to dynamically configure + /// The `schema` is provided so that it may be used to dynamically configure /// per-column encryption keys. - /// Because `FileSinkConfig` can represent multiple output files, we also provide a - /// single file path so that external key material may be used (where key metadata is - /// stored in a JSON file alongside Parquet files). + /// The file path is also provided, so that it may be used to set an + /// AAD prefix for the file, or to allow use of external key material + /// (where key metadata is stored in a JSON file alongside Parquet files). fn get_file_encryption_properties( &self, options: &EncryptionFactoryOptions, @@ -265,6 +264,7 @@ impl EncryptionFactory for KmsEncryptionFactory { // but this example just uses the default options. let kms_config = Arc::new(KmsConnectionConfig::default()); + // Use the `CryptoFactory` to generate file encryption properties Ok(Some(self.crypto_factory.file_encryption_properties( kms_config, &encryption_config, @@ -272,7 +272,6 @@ impl EncryptionFactory for KmsEncryptionFactory { } /// Generate file decryption properties to use when reading a Parquet file. - /// The `file_path` needs to be known to support encryption factories that use external key material. fn get_file_decryption_properties( &self, _options: &EncryptionFactoryOptions, From b42f21ddf7dd01991e576dc34472db010dc23012 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 15 Jul 2025 14:37:42 +1200 Subject: [PATCH 04/17] Tidy ups --- datafusion/common/src/config.rs | 2 +- datafusion/core/Cargo.toml | 2 +- datafusion/core/tests/parquet/encryption.rs | 1 - datafusion/datasource-parquet/src/file_format.rs | 3 ++- datafusion/datasource-parquet/src/opener.rs | 6 ++---- 5 files changed, 6 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2ee40af1b4e6..f6bde63ebd40 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -677,7 +677,7 @@ config_namespace! { /// Identifier for the encryption factory to use to create file encryption and decryption properties. /// Encryption factories can be registered in a session with - /// [`SessionConfig::register_parquet_encryption_factory`]. + /// [`SessionContext::register_parquet_encryption_factory`]. pub factory_id: Option, default = None /// Any encryption factory specific options diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 2d0af2fba296..c4455e271c84 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -68,7 +68,7 @@ encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] math_expressions = ["datafusion-functions/math_expressions"] -parquet = ["datafusion-common/parquet", "datafusion-execution/parquet", "dep:parquet", "datafusion-datasource-parquet"] +parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"] parquet_encryption = [ "dep:parquet", "parquet/encryption", diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index 73148a9a4e77..389c92b59974 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -74,7 +74,6 @@ pub fn write_batches( Ok(num_rows) } -#[cfg(feature = "parquet_encryption")] #[tokio::test] async fn round_trip_encryption() { let ctx: SessionContext = SessionContext::new(); diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index ae60ebef4961..a7869be99187 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -541,7 +541,8 @@ impl ParquetFormat { _state: &dyn Session, ) -> Result { if let Some(encryption_factory_id) = &self.options.crypto.factory_id { - Err(DataFusionError::Configuration(format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled"))) + Err(DataFusionError::Configuration( + format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled"))) } else { Ok(source) } diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index b296ebfc9cb4..d1abde1d95ed 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -107,7 +107,7 @@ impl FileOpener for ParquetOpener { fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result { let file_range = file_meta.range.clone(); let extensions = file_meta.extensions.clone(); - let file_location = file_meta.location().clone(); + let file_location = file_meta.location(); let file_name = file_location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); @@ -144,7 +144,7 @@ impl FileOpener for ParquetOpener { let mut enable_page_index = self.enable_page_index; let file_decryption_properties = - self.get_file_decryption_properties(&file_location)?; + self.get_file_decryption_properties(file_location)?; // For now, page index does not work with encrypted files. See: // https://github.com/apache/arrow-rs/issues/7629 @@ -418,8 +418,6 @@ impl ParquetOpener { &self, file_location: &object_store::path::Path, ) -> Result>> { - // Creating props is delayed until here so that the file url/path is available, - // and we can handle errors from the encryption factory. match &self.file_decryption_properties { Some(file_decryption_properties) => { Ok(Some(Arc::clone(file_decryption_properties))) From f081a1ad10da807d07f821807c98c04ff1769c7f Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 15 Jul 2025 15:07:18 +1200 Subject: [PATCH 05/17] Fix "unnecessary qualification" errors in example --- Cargo.lock | 1 - datafusion-examples/Cargo.toml | 1 - datafusion-examples/examples/parquet_encrypted_with_kms.rs | 5 +++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 34ef755521b3..cc7a6f9cb96c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2187,7 +2187,6 @@ dependencies = [ "mimalloc", "nix", "object_store", - "parquet", "parquet-key-management", "prost", "tempfile", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 20c5b849962e..cf3aaa68fb9b 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -69,7 +69,6 @@ futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } -parquet = { workspace = true, features = ["encryption"] } parquet-key-management = { version = "0.3", features = ["_test_utils"] } prost = { workspace = true } tempfile = { workspace = true } diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs index 6e372f377507..e2bd03abd08c 100644 --- a/datafusion-examples/examples/parquet_encrypted_with_kms.rs +++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs @@ -24,11 +24,12 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::error::Result; use datafusion::execution::parquet_encryption::EncryptionFactory; +use datafusion::parquet::encryption::{ + decrypt::FileDecryptionProperties, encrypt::FileEncryptionProperties, +}; use datafusion::prelude::SessionContext; use futures::StreamExt; use object_store::path::Path; -use parquet::encryption::decrypt::FileDecryptionProperties; -use parquet::encryption::encrypt::FileEncryptionProperties; use parquet_key_management::crypto_factory::{ CryptoFactory, DecryptionConfiguration, EncryptionConfiguration, }; From 681001c4729c665bccabda5c014ac8f3daa3278f Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 15 Jul 2025 15:25:25 +1200 Subject: [PATCH 06/17] Fix toml format with taplo --- datafusion-examples/Cargo.toml | 2 +- datafusion/execution/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index cf3aaa68fb9b..b51c08a5ba89 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -69,7 +69,7 @@ futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } -parquet-key-management = { version = "0.3", features = ["_test_utils"] } +parquet-key-management = { version = "0.3", features = ["_test_utils"] } prost = { workspace = true } tempfile = { workspace = true } test-utils = { path = "../test-utils" } diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index a63434ef4910..f6d02615e39a 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -50,8 +50,8 @@ datafusion-expr = { workspace = true } futures = { workspace = true } log = { workspace = true } object_store = { workspace = true, features = ["fs"] } -parquet = { workspace = true, optional = true } parking_lot = { workspace = true } +parquet = { workspace = true, optional = true } rand = { workspace = true } tempfile = { workspace = true } url = { workspace = true } From 99abeb2eb17032fdc636ec2bda6dc0c0bb706f6f Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 15 Jul 2025 15:32:52 +1200 Subject: [PATCH 07/17] Fix broken link in docs --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index f6bde63ebd40..926231ba4ae2 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -677,7 +677,7 @@ config_namespace! { /// Identifier for the encryption factory to use to create file encryption and decryption properties. /// Encryption factories can be registered in a session with - /// [`SessionContext::register_parquet_encryption_factory`]. + /// `SessionContext::register_parquet_encryption_factory`. pub factory_id: Option, default = None /// Any encryption factory specific options From 2f3201f86fe3dfa23c1f0502cb1af86c3ee07ddd Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Tue, 15 Jul 2025 15:38:03 +1200 Subject: [PATCH 08/17] Clippy fixes --- .../examples/parquet_encrypted_with_kms.rs | 12 ++++++------ datafusion/core/tests/parquet/encryption.rs | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs index e2bd03abd08c..32686d54d4c0 100644 --- a/datafusion-examples/examples/parquet_encrypted_with_kms.rs +++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs @@ -40,6 +40,8 @@ use std::fmt::Formatter; use std::sync::Arc; use tempfile::TempDir; +const ENCRYPTION_FACTORY_ID: &str = "example.memory_kms_encryption"; + /// This example demonstrates reading and writing Parquet files that /// are encrypted using Parquet Modular Encryption, and uses the /// parquet-key-management crate to integrate with a Key Management Server (KMS). @@ -49,9 +51,6 @@ use tempfile::TempDir; /// encryption keys can be dynamically generated per file, /// and the encryption key metadata stored in files can be used to determine /// the decryption keys when reading. - -const ENCRYPTION_FACTORY_ID: &'static str = "example.memory_kms_encryption"; - #[tokio::main] async fn main() -> Result<()> { let ctx = SessionContext::new(); @@ -105,9 +104,10 @@ async fn write_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { // encryption factory to use and providing the factory specific configuration. // Our encryption factory requires specifying the master key identifier to // use for encryption, and we can optionally configure which columns are encrypted. - let mut encryption_config = KmsEncryptionConfig::default(); - encryption_config.key_id = "kf".to_owned(); - encryption_config.encrypted_columns = "b,c".to_owned(); + let encryption_config = KmsEncryptionConfig { + key_id: "kf".to_owned(), + encrypted_columns: "b,c".to_owned(), + }; parquet_options .crypto .configure_factory(ENCRYPTION_FACTORY_ID, &encryption_config); diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index 389c92b59974..27c5ad992122 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -236,7 +236,7 @@ async fn round_trip_parquet_with_encryption_factory() { .await .unwrap(); - let expected = vec![ + let expected = [ "+-----+----+--------+", "| x1 | x2 | string |", "+-----+----+--------+", @@ -289,7 +289,7 @@ impl EncryptionFactory for MockEncryptionFactory { ); let keys = self.encryption_keys.lock().unwrap(); let key = keys.get(file_path).ok_or_else(|| { - DataFusionError::Execution(format!("No key for file {:?}", file_path)) + DataFusionError::Execution(format!("No key for file {file_path:?}")) })?; let decryption_properties = FileDecryptionProperties::builder(key.clone()).build()?; From 0498d8b3723eb4eba4c0aebcbd7b89fd724998ed Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Wed, 16 Jul 2025 12:15:50 +1200 Subject: [PATCH 09/17] Update examples README --- datafusion-examples/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index ef5687c8677f..00d0e1dbadee 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -67,6 +67,7 @@ cargo run --example dataframe - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates - [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries - [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion +- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using envelope encryption with a key management server - [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. From b491e86d2c9dafaed72154a3516b7560b81d38aa Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Fri, 18 Jul 2025 13:25:49 +1200 Subject: [PATCH 10/17] Add extra validation of table encryption --- datafusion/core/tests/parquet/encryption.rs | 56 +++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index 27c5ad992122..5a9f0a791812 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -27,9 +27,11 @@ use datafusion_common::config::{EncryptionFactoryOptions, TableParquetOptions}; use datafusion_common::{assert_batches_sorted_eq, DataFusionError}; use datafusion_datasource_parquet::ParquetFormat; use datafusion_execution::parquet_encryption::EncryptionFactory; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::ArrowWriter; use parquet::encryption::decrypt::FileDecryptionProperties; use parquet::encryption::encrypt::FileEncryptionProperties; +use parquet::file::column_crypto_metadata::ColumnCryptoMetaData; use parquet::file::properties::WriterProperties; use std::collections::HashMap; use std::fs::File; @@ -173,6 +175,8 @@ async fn round_trip_parquet_with_encryption_factory() { // Crypto factory should have generated one key per partition file assert_eq!(encryption_factory.encryption_keys.lock().unwrap().len(), 3); + verify_table_encrypted(tmpdir.path(), &encryption_factory).unwrap(); + // Registering table without decryption properties should fail let table_path = format!("file://{}/", tmpdir.path().to_str().unwrap()); let without_decryption_register = ctx @@ -251,6 +255,58 @@ async fn round_trip_parquet_with_encryption_factory() { assert_batches_sorted_eq!(expected, &table); } +fn verify_table_encrypted( + table_path: &Path, + encryption_factory: &Arc, +) -> datafusion_common::Result<()> { + let mut directories = vec![table_path.to_path_buf()]; + let mut files_visited = 0; + while let Some(directory) = directories.pop() { + for entry in std::fs::read_dir(&directory)? { + let path = entry?.path(); + if path.is_dir() { + directories.push(path); + } else { + verify_file_encrypted(&path, encryption_factory)?; + files_visited += 1; + } + } + } + assert!(files_visited > 0); + Ok(()) +} + +fn verify_file_encrypted( + file_path: &Path, + encryption_factory: &Arc, +) -> datafusion_common::Result<()> { + let mut options = EncryptionFactoryOptions::default(); + options + .options + .insert("test_key".to_string(), "test value".to_string()); + let object_path = object_store::path::Path::from(file_path.to_str().unwrap()); + let decryption_properties = encryption_factory + .get_file_decryption_properties(&options, &object_path)? + .unwrap(); + + let reader_options = + ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); + let file = File::open(&file_path)?; + let reader_metadata = ArrowReaderMetadata::load(&file, reader_options)?; + let metadata = reader_metadata.metadata(); + assert!(metadata.num_row_groups() > 0); + for row_group in metadata.row_groups() { + assert!(row_group.num_columns() > 0); + for col in row_group.columns() { + assert!(matches!( + col.crypto_metadata(), + Some(ColumnCryptoMetaData::EncryptionWithFooterKey) + )); + } + } + Ok(()) +} + /// Encryption factory implementation for use in tests, /// which generates encryption keys in a sequence #[derive(Debug, Default)] From 5755dee71d430b5575dccf020cbaf8e2d4dcb8de Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Fri, 18 Jul 2025 13:36:42 +1200 Subject: [PATCH 11/17] clippy fix --- datafusion/core/tests/parquet/encryption.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index 5a9f0a791812..4ea8bc4b6424 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -291,7 +291,7 @@ fn verify_file_encrypted( let reader_options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties); - let file = File::open(&file_path)?; + let file = File::open(file_path)?; let reader_metadata = ArrowReaderMetadata::load(&file, reader_options)?; let metadata = reader_metadata.metadata(); assert!(metadata.num_row_groups() > 0); From 11ed64a92b9283b519f6759a303c38921bc0be0c Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 4 Aug 2025 12:28:03 +1200 Subject: [PATCH 12/17] Remove register_parquet_encryption_factory from SessionContext --- .../examples/parquet_encrypted_with_kms.rs | 2 +- datafusion/core/src/execution/context/parquet.rs | 14 -------------- datafusion/core/tests/parquet/encryption.rs | 2 +- 3 files changed, 2 insertions(+), 16 deletions(-) diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs index 32686d54d4c0..723e2a5e881e 100644 --- a/datafusion-examples/examples/parquet_encrypted_with_kms.rs +++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs @@ -64,7 +64,7 @@ async fn main() -> Result<()> { // multiple different factories to handle different ways of encrypting Parquet. let crypto_factory = CryptoFactory::new(TestKmsClientFactory::with_default_keys()); let encryption_factory = KmsEncryptionFactory { crypto_factory }; - ctx.register_parquet_encryption_factory( + ctx.runtime_env().register_parquet_encryption_factory( ENCRYPTION_FACTORY_ID, Arc::new(encryption_factory), ); diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 0a8fee8980a9..731f7e59ecfa 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -22,8 +22,6 @@ use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext}; use datafusion_datasource_parquet::plan_to_parquet; use datafusion_common::TableReference; -#[cfg(feature = "parquet_encryption")] -use datafusion_execution::parquet_encryption::EncryptionFactory; use parquet::file::properties::WriterProperties; impl SessionContext { @@ -96,18 +94,6 @@ impl SessionContext { ) -> Result<()> { plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await } - - /// Registers a Parquet [`EncryptionFactory`] with an associated unique identifier. - /// If an encryption factory with the same identifier was already registered, it is replaced and returned. - #[cfg(feature = "parquet_encryption")] - pub fn register_parquet_encryption_factory( - &self, - id: &str, - encryption_factory: Arc, - ) -> Option> { - self.runtime_env() - .register_parquet_encryption_factory(id, encryption_factory) - } } #[cfg(test)] diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs index 4ea8bc4b6424..a71a4f1ea24f 100644 --- a/datafusion/core/tests/parquet/encryption.rs +++ b/datafusion/core/tests/parquet/encryption.rs @@ -134,7 +134,7 @@ async fn round_trip_encryption() { async fn round_trip_parquet_with_encryption_factory() { let ctx = SessionContext::new(); let encryption_factory = Arc::new(MockEncryptionFactory::default()); - ctx.register_parquet_encryption_factory( + ctx.runtime_env().register_parquet_encryption_factory( "test_encryption_factory", Arc::clone(&encryption_factory) as Arc, ); From 8aafaf37d4ec9efad4b5e57eed42019d16332ebc Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 4 Aug 2025 14:33:13 +1200 Subject: [PATCH 13/17] Remove new dependency from example --- Cargo.lock | 16 +- datafusion-examples/Cargo.toml | 3 +- .../examples/parquet_encrypted_with_kms.rs | 177 ++++++++++-------- 3 files changed, 99 insertions(+), 97 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 791f72fff243..b4821525ebe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2196,6 +2196,7 @@ dependencies = [ "arrow-flight", "arrow-schema", "async-trait", + "base64 0.22.1", "bytes", "dashmap", "datafusion", @@ -2207,8 +2208,8 @@ dependencies = [ "mimalloc", "nix", "object_store", - "parquet-key-management", "prost", + "rand 0.9.2", "serde_json", "tempfile", "test-utils", @@ -4559,19 +4560,6 @@ dependencies = [ "zstd", ] -[[package]] -name = "parquet-key-management" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4e4d465462ac90306e7551f8d7d959f05a658c445bf55dd6b87578ad31a0d8" -dependencies = [ - "base64 0.22.1", - "parquet", - "ring", - "serde", - "serde_json", -] - [[package]] name = "parse-display" version = "0.9.1" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index e06283ed23cb..0e903dba38c1 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -73,7 +73,6 @@ futures = { workspace = true } log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } -parquet-key-management = { version = "0.3", features = ["_test_utils"] } prost = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } @@ -84,6 +83,8 @@ tracing = { version = "0.1" } tracing-subscriber = { version = "0.3" } url = { workspace = true } uuid = "1.17" +base64 = "0.22.1" +rand = { workspace = true } [target.'cfg(not(target_os = "windows"))'.dev-dependencies] nix = { version = "0.30.1", features = ["fs"] } diff --git a/datafusion-examples/examples/parquet_encrypted_with_kms.rs b/datafusion-examples/examples/parquet_encrypted_with_kms.rs index 723e2a5e881e..d30608ce7a1c 100644 --- a/datafusion-examples/examples/parquet_encrypted_with_kms.rs +++ b/datafusion-examples/examples/parquet_encrypted_with_kms.rs @@ -17,53 +17,51 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use arrow_schema::SchemaRef; -use datafusion::common::{extensions_options, DataFusionError}; +use base64::Engine; +use datafusion::common::extensions_options; use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions}; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; use datafusion::error::Result; use datafusion::execution::parquet_encryption::EncryptionFactory; +use datafusion::parquet::encryption::decrypt::KeyRetriever; use datafusion::parquet::encryption::{ decrypt::FileDecryptionProperties, encrypt::FileEncryptionProperties, }; use datafusion::prelude::SessionContext; use futures::StreamExt; use object_store::path::Path; -use parquet_key_management::crypto_factory::{ - CryptoFactory, DecryptionConfiguration, EncryptionConfiguration, -}; -use parquet_key_management::kms::KmsConnectionConfig; -use parquet_key_management::test_kms::TestKmsClientFactory; +use rand::rand_core::{OsRng, TryRngCore}; use std::collections::HashSet; -use std::fmt::Formatter; use std::sync::Arc; use tempfile::TempDir; -const ENCRYPTION_FACTORY_ID: &str = "example.memory_kms_encryption"; +const ENCRYPTION_FACTORY_ID: &str = "example.mock_kms_encryption"; /// This example demonstrates reading and writing Parquet files that -/// are encrypted using Parquet Modular Encryption, and uses the -/// parquet-key-management crate to integrate with a Key Management Server (KMS). +/// are encrypted using Parquet Modular Encryption. /// /// Compared to the `parquet_encrypted` example, where AES keys -/// are specified directly, this example uses an `EncryptionFactory` so that -/// encryption keys can be dynamically generated per file, -/// and the encryption key metadata stored in files can be used to determine -/// the decryption keys when reading. +/// are specified directly, this example implements an `EncryptionFactory` that +/// generates encryption keys dynamically per file. +/// Encryption key metadata is stored inline in the Parquet files and is used to determine +/// the decryption keys when reading the files. +/// +/// In this example, encryption keys are simply stored base64 encoded in the Parquet metadata, +/// which is not a secure way to store encryption keys. +/// For production use, it is recommended to use a key-management service (KMS) to encrypt +/// data encryption keys. #[tokio::main] async fn main() -> Result<()> { let ctx = SessionContext::new(); // Register an `EncryptionFactory` implementation to be used for Parquet encryption - // in the session context. - // This example uses an in-memory test KMS from the `parquet_key_management` crate with - // a custom `KmsEncryptionFactory` wrapper type to integrate with DataFusion. + // in the runtime environment. // `EncryptionFactory` instances are registered with a name to identify them so // they can be later referenced in configuration options, and it's possible to register // multiple different factories to handle different ways of encrypting Parquet. - let crypto_factory = CryptoFactory::new(TestKmsClientFactory::with_default_keys()); - let encryption_factory = KmsEncryptionFactory { crypto_factory }; + let encryption_factory = TestEncryptionFactory::default(); ctx.runtime_env().register_parquet_encryption_factory( ENCRYPTION_FACTORY_ID, Arc::new(encryption_factory), @@ -77,7 +75,7 @@ async fn main() -> Result<()> { ctx.register_batch("test_data", batch)?; { - // Write and read with the programmatic API + // Write and read encrypted Parquet with the programmatic API let tmpdir = TempDir::new()?; let table_path = format!("{}/", tmpdir.path().to_str().unwrap()); write_encrypted(&ctx, &table_path).await?; @@ -85,7 +83,7 @@ async fn main() -> Result<()> { } { - // Write and read with the SQL API + // Write and read encrypted Parquet with the SQL API let tmpdir = TempDir::new()?; let table_path = format!("{}/", tmpdir.path().to_str().unwrap()); write_encrypted_with_sql(&ctx, &table_path).await?; @@ -101,11 +99,9 @@ async fn write_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { let mut parquet_options = TableParquetOptions::new(); // We specify that we want to use Parquet encryption by setting the identifier of the - // encryption factory to use and providing the factory specific configuration. - // Our encryption factory requires specifying the master key identifier to - // use for encryption, and we can optionally configure which columns are encrypted. - let encryption_config = KmsEncryptionConfig { - key_id: "kf".to_owned(), + // encryption factory to use and providing the factory-specific configuration. + // Our encryption factory only requires specifying the columns to encrypt. + let encryption_config = EncryptionConfig { encrypted_columns: "b,c".to_owned(), }; parquet_options @@ -128,10 +124,10 @@ async fn read_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { let mut parquet_options = TableParquetOptions::new(); // Specify the encryption factory to use for decrypting Parquet. // In this example, we don't require any additional configuration options when reading - // as master key identifiers are stored in the key metadata within Parquet files. + // as we only need the key metadata from the Parquet files to determine the decryption keys. parquet_options .crypto - .configure_factory(ENCRYPTION_FACTORY_ID, &KmsEncryptionConfig::default()); + .configure_factory(ENCRYPTION_FACTORY_ID, &EncryptionConfig::default()); let file_format = ParquetFormat::default().with_options(parquet_options); let listing_options = ListingOptions::new(Arc::new(file_format)); @@ -168,7 +164,6 @@ async fn write_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Res STORED AS parquet OPTIONS (\ 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}', \ - 'format.crypto.factory_options.key_id' 'kf', \ 'format.crypto.factory_options.encrypted_columns' 'b,c' \ )" ); @@ -178,7 +173,7 @@ async fn write_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Res Ok(()) } -/// Read from an encrypted Parquet file using only the SQL API and string based configuration +/// Read from an encrypted Parquet file using only the SQL API and string-based configuration async fn read_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> { let ddl = format!( "CREATE EXTERNAL TABLE encrypted_parquet_table_2 \ @@ -201,33 +196,27 @@ async fn read_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Resu // Options used to configure our example encryption factory extensions_options! { - struct KmsEncryptionConfig { - /// Identifier of the encryption key to use - pub key_id: String, default = "".to_owned() - /// Comma separated list of columns to encrypt + struct EncryptionConfig { + /// Comma-separated list of columns to encrypt pub encrypted_columns: String, default = "".to_owned() } } -/// Wrapper type around `CryptoFactory` to allow implementing the `EncryptionFactory` trait -struct KmsEncryptionFactory { - crypto_factory: CryptoFactory, -} +/// Mock implementation of an `EncryptionFactory` that stores encryption keys +/// base64 encoded in the Parquet encryption metadata. +/// For production use, integrating with a key-management service to encrypt +/// data encryption keys is recommended. +#[derive(Default, Debug)] +struct TestEncryptionFactory {} -impl std::fmt::Debug for KmsEncryptionFactory { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("KmsEncryptionFactory") - .finish_non_exhaustive() - } -} - -/// `EncryptionFactory` is DataFusion trait for types that generate +/// `EncryptionFactory` is a DataFusion trait for types that generate /// file encryption and decryption properties. -impl EncryptionFactory for KmsEncryptionFactory { +impl EncryptionFactory for TestEncryptionFactory { /// Generate file encryption properties to use when writing a Parquet file. /// The `schema` is provided so that it may be used to dynamically configure /// per-column encryption keys. - /// The file path is also provided, so that it may be used to set an + /// The file path is also available. We don't use the path in this example, + /// but other implementations may want to use this to compute an /// AAD prefix for the file, or to allow use of external key material /// (where key metadata is stored in a JSON file alongside Parquet files). fn get_file_encryption_properties( @@ -236,53 +225,77 @@ impl EncryptionFactory for KmsEncryptionFactory { schema: &SchemaRef, _file_path: &Path, ) -> Result> { - let config: KmsEncryptionConfig = options.to_extension_options()?; - if config.key_id.is_empty() { - return Err(DataFusionError::Configuration( - "Key id for encryption is not set".to_owned(), - )); - }; - // Configure encryption key to use - let mut encryption_config_builder = - EncryptionConfiguration::builder(config.key_id.clone()); - - // Set up per-column encryption. + let config: EncryptionConfig = options.to_extension_options()?; + + // Generate a random encryption key for this file. + let mut key = vec![0u8; 16]; + OsRng.try_fill_bytes(&mut key).unwrap(); + + // Generate the key metadata that allows retrieving the key when reading the file. + let key_metadata = wrap_key(&key); + + let mut builder = FileEncryptionProperties::builder(key.to_vec()) + .with_footer_key_metadata(key_metadata.clone()); + let encrypted_columns: HashSet<&str> = config.encrypted_columns.split(",").collect(); if !encrypted_columns.is_empty() { - let encrypted_columns: Vec = schema - .fields - .iter() - .filter(|f| encrypted_columns.contains(f.name().as_str())) - .map(|f| f.name().clone()) - .collect(); - encryption_config_builder = encryption_config_builder - .add_column_key(config.key_id.clone(), encrypted_columns); + // Set up per-column encryption. + for field in schema.fields().iter() { + if encrypted_columns.contains(field.name().as_str()) { + // Here we re-use the same key for all encrypted columns, + // but new keys could also be generated per column. + builder = builder.with_column_key_and_metadata( + field.name().as_str(), + key.clone(), + key_metadata.clone(), + ); + } + } } - let encryption_config = encryption_config_builder.build()?; - // The KMS connection could be configured from the options if needed, - // but this example just uses the default options. - let kms_config = Arc::new(KmsConnectionConfig::default()); + let encryption_properties = builder.build()?; - // Use the `CryptoFactory` to generate file encryption properties - Ok(Some(self.crypto_factory.file_encryption_properties( - kms_config, - &encryption_config, - )?)) + Ok(Some(encryption_properties)) } /// Generate file decryption properties to use when reading a Parquet file. + /// Rather than provide the AES keys directly for decryption, we set a `KeyRetriever` + /// that can determine the keys using the encryption metadata. fn get_file_decryption_properties( &self, _options: &EncryptionFactoryOptions, _file_path: &Path, ) -> Result> { - let decryption_config = DecryptionConfiguration::builder().build(); - let kms_config = Arc::new(KmsConnectionConfig::default()); - Ok(Some(self.crypto_factory.file_decryption_properties( - kms_config, - decryption_config, - )?)) + let decryption_properties = + FileDecryptionProperties::with_key_retriever(Arc::new(TestKeyRetriever {})) + .build()?; + Ok(Some(decryption_properties)) + } +} + +/// Mock implementation of encrypting a key that simply base64 encodes the key. +/// Note that this is not a secure way to store encryption keys, +/// and for production use keys should be encrypted with a KMS. +fn wrap_key(key: &[u8]) -> Vec { + base64::prelude::BASE64_STANDARD + .encode(key) + .as_bytes() + .to_vec() +} + +struct TestKeyRetriever {} + +impl KeyRetriever for TestKeyRetriever { + /// Get a data encryption key using the metadata stored in the Parquet file. + fn retrieve_key( + &self, + key_metadata: &[u8], + ) -> datafusion::parquet::errors::Result> { + let key_metadata = std::str::from_utf8(key_metadata)?; + let key = base64::prelude::BASE64_STANDARD + .decode(key_metadata) + .unwrap(); + Ok(key) } } From 7c806ee675b52593f447a7b21455abef0ca29224 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 4 Aug 2025 14:47:11 +1200 Subject: [PATCH 14/17] Update examples readme --- datafusion-examples/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 131db8273921..75a53bc568eb 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -68,7 +68,7 @@ cargo run --example dataframe - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates - [`parquet_embedded_index.rs`](examples/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries - [`parquet_encrypted.rs`](examples/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion -- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using envelope encryption with a key management server +- [`parquet_encrypted_with_kms.rs`](examples/parquet_encrypted_with_kms.rs): Read and write encrypted Parquet files using an encryption factory - [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. From a2f841dc64f8d282652982bee2946cf879d1a7ec Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 4 Aug 2025 15:01:32 +1200 Subject: [PATCH 15/17] Run taplo format --- datafusion-examples/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 0e903dba38c1..2f82c5ff6919 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -65,6 +65,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } # note only use main datafusion crate for examples +base64 = "0.22.1" datafusion = { workspace = true, default-features = true } datafusion-ffi = { workspace = true } datafusion-proto = { workspace = true } @@ -74,6 +75,7 @@ log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } prost = { workspace = true } +rand = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } test-utils = { path = "../test-utils" } @@ -83,8 +85,6 @@ tracing = { version = "0.1" } tracing-subscriber = { version = "0.3" } url = { workspace = true } uuid = "1.17" -base64 = "0.22.1" -rand = { workspace = true } [target.'cfg(not(target_os = "windows"))'.dev-dependencies] nix = { version = "0.30.1", features = ["fs"] } From 47c4a3a3c8e4aea90a575e02113339174ed009a9 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Thu, 7 Aug 2025 11:33:50 +1200 Subject: [PATCH 16/17] Fix outdated method reference in comment --- datafusion/common/src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 37e20b199b31..2f98f562bced 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -692,8 +692,8 @@ config_namespace! { pub file_encryption: Option, default = None /// Identifier for the encryption factory to use to create file encryption and decryption properties. - /// Encryption factories can be registered in a session with - /// `SessionContext::register_parquet_encryption_factory`. + /// Encryption factories can be registered in the runtime environment with + /// `RuntimeEnv::register_parquet_encryption_factory`. pub factory_id: Option, default = None /// Any encryption factory specific options From 9c1c4120cc5d347f2a2e688bf8c0c0818019e799 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Thu, 7 Aug 2025 11:50:59 +1200 Subject: [PATCH 17/17] Extra comment --- datafusion/common/src/file_options/parquet_writer.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index afc7c38adcfd..b529644658ae 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -89,6 +89,8 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. /// /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. + /// Note that any encryption options are ignored as building the `FileEncryptionProperties` + /// might require other inputs besides the [`TableParquetOptions`]. fn try_from(table_parquet_options: &TableParquetOptions) -> Result { // Table options include kv_metadata and col-specific options let TableParquetOptions {