|
| 1 | +// Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +// or more contributor license agreements. See the NOTICE file |
| 3 | +// distributed with this work for additional information |
| 4 | +// regarding copyright ownership. The ASF licenses this file |
| 5 | +// to you under the Apache License, Version 2.0 (the |
| 6 | +// "License"); you may not use this file except in compliance |
| 7 | +// with the License. You may obtain a copy of the License at |
| 8 | +// |
| 9 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +// |
| 11 | +// Unless required by applicable law or agreed to in writing, |
| 12 | +// software distributed under the License is distributed on an |
| 13 | +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +// KIND, either express or implied. See the License for the |
| 15 | +// specific language governing permissions and limitations |
| 16 | +// under the License. |
| 17 | + |
| 18 | +use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; |
| 19 | +use arrow_schema::SchemaRef; |
| 20 | +use base64::Engine; |
| 21 | +use datafusion::common::extensions_options; |
| 22 | +use datafusion::config::{EncryptionFactoryOptions, TableParquetOptions}; |
| 23 | +use datafusion::dataframe::DataFrameWriteOptions; |
| 24 | +use datafusion::datasource::file_format::parquet::ParquetFormat; |
| 25 | +use datafusion::datasource::listing::ListingOptions; |
| 26 | +use datafusion::error::Result; |
| 27 | +use datafusion::execution::parquet_encryption::EncryptionFactory; |
| 28 | +use datafusion::parquet::encryption::decrypt::KeyRetriever; |
| 29 | +use datafusion::parquet::encryption::{ |
| 30 | + decrypt::FileDecryptionProperties, encrypt::FileEncryptionProperties, |
| 31 | +}; |
| 32 | +use datafusion::prelude::SessionContext; |
| 33 | +use futures::StreamExt; |
| 34 | +use object_store::path::Path; |
| 35 | +use rand::rand_core::{OsRng, TryRngCore}; |
| 36 | +use std::collections::HashSet; |
| 37 | +use std::sync::Arc; |
| 38 | +use tempfile::TempDir; |
| 39 | + |
/// Identifier under which our `EncryptionFactory` is registered in the runtime
/// environment; referenced later from configuration options
/// (`format.crypto.factory_id`).
const ENCRYPTION_FACTORY_ID: &str = "example.mock_kms_encryption";
| 41 | + |
| 42 | +/// This example demonstrates reading and writing Parquet files that |
| 43 | +/// are encrypted using Parquet Modular Encryption. |
| 44 | +/// |
| 45 | +/// Compared to the `parquet_encrypted` example, where AES keys |
| 46 | +/// are specified directly, this example implements an `EncryptionFactory` that |
| 47 | +/// generates encryption keys dynamically per file. |
| 48 | +/// Encryption key metadata is stored inline in the Parquet files and is used to determine |
| 49 | +/// the decryption keys when reading the files. |
| 50 | +/// |
| 51 | +/// In this example, encryption keys are simply stored base64 encoded in the Parquet metadata, |
| 52 | +/// which is not a secure way to store encryption keys. |
| 53 | +/// For production use, it is recommended to use a key-management service (KMS) to encrypt |
| 54 | +/// data encryption keys. |
| 55 | +#[tokio::main] |
| 56 | +async fn main() -> Result<()> { |
| 57 | + let ctx = SessionContext::new(); |
| 58 | + |
| 59 | + // Register an `EncryptionFactory` implementation to be used for Parquet encryption |
| 60 | + // in the runtime environment. |
| 61 | + // `EncryptionFactory` instances are registered with a name to identify them so |
| 62 | + // they can be later referenced in configuration options, and it's possible to register |
| 63 | + // multiple different factories to handle different ways of encrypting Parquet. |
| 64 | + let encryption_factory = TestEncryptionFactory::default(); |
| 65 | + ctx.runtime_env().register_parquet_encryption_factory( |
| 66 | + ENCRYPTION_FACTORY_ID, |
| 67 | + Arc::new(encryption_factory), |
| 68 | + ); |
| 69 | + |
| 70 | + // Register some simple test data |
| 71 | + let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])); |
| 72 | + let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 10, 10, 100])); |
| 73 | + let c: ArrayRef = Arc::new(Int32Array::from(vec![2, 20, 20, 200])); |
| 74 | + let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)])?; |
| 75 | + ctx.register_batch("test_data", batch)?; |
| 76 | + |
| 77 | + { |
| 78 | + // Write and read encrypted Parquet with the programmatic API |
| 79 | + let tmpdir = TempDir::new()?; |
| 80 | + let table_path = format!("{}/", tmpdir.path().to_str().unwrap()); |
| 81 | + write_encrypted(&ctx, &table_path).await?; |
| 82 | + read_encrypted(&ctx, &table_path).await?; |
| 83 | + } |
| 84 | + |
| 85 | + { |
| 86 | + // Write and read encrypted Parquet with the SQL API |
| 87 | + let tmpdir = TempDir::new()?; |
| 88 | + let table_path = format!("{}/", tmpdir.path().to_str().unwrap()); |
| 89 | + write_encrypted_with_sql(&ctx, &table_path).await?; |
| 90 | + read_encrypted_with_sql(&ctx, &table_path).await?; |
| 91 | + } |
| 92 | + |
| 93 | + Ok(()) |
| 94 | +} |
| 95 | + |
| 96 | +/// Write an encrypted Parquet file |
| 97 | +async fn write_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { |
| 98 | + let df = ctx.table("test_data").await?; |
| 99 | + |
| 100 | + let mut parquet_options = TableParquetOptions::new(); |
| 101 | + // We specify that we want to use Parquet encryption by setting the identifier of the |
| 102 | + // encryption factory to use and providing the factory-specific configuration. |
| 103 | + // Our encryption factory only requires specifying the columns to encrypt. |
| 104 | + let encryption_config = EncryptionConfig { |
| 105 | + encrypted_columns: "b,c".to_owned(), |
| 106 | + }; |
| 107 | + parquet_options |
| 108 | + .crypto |
| 109 | + .configure_factory(ENCRYPTION_FACTORY_ID, &encryption_config); |
| 110 | + |
| 111 | + df.write_parquet( |
| 112 | + table_path, |
| 113 | + DataFrameWriteOptions::new(), |
| 114 | + Some(parquet_options), |
| 115 | + ) |
| 116 | + .await?; |
| 117 | + |
| 118 | + println!("Encrypted Parquet written to {table_path}"); |
| 119 | + Ok(()) |
| 120 | +} |
| 121 | + |
| 122 | +/// Read from an encrypted Parquet file |
| 123 | +async fn read_encrypted(ctx: &SessionContext, table_path: &str) -> Result<()> { |
| 124 | + let mut parquet_options = TableParquetOptions::new(); |
| 125 | + // Specify the encryption factory to use for decrypting Parquet. |
| 126 | + // In this example, we don't require any additional configuration options when reading |
| 127 | + // as we only need the key metadata from the Parquet files to determine the decryption keys. |
| 128 | + parquet_options |
| 129 | + .crypto |
| 130 | + .configure_factory(ENCRYPTION_FACTORY_ID, &EncryptionConfig::default()); |
| 131 | + |
| 132 | + let file_format = ParquetFormat::default().with_options(parquet_options); |
| 133 | + let listing_options = ListingOptions::new(Arc::new(file_format)); |
| 134 | + |
| 135 | + ctx.register_listing_table( |
| 136 | + "encrypted_parquet_table", |
| 137 | + &table_path, |
| 138 | + listing_options.clone(), |
| 139 | + None, |
| 140 | + None, |
| 141 | + ) |
| 142 | + .await?; |
| 143 | + |
| 144 | + let mut batch_stream = ctx |
| 145 | + .table("encrypted_parquet_table") |
| 146 | + .await? |
| 147 | + .execute_stream() |
| 148 | + .await?; |
| 149 | + println!("Reading encrypted Parquet as a RecordBatch stream"); |
| 150 | + while let Some(batch) = batch_stream.next().await { |
| 151 | + let batch = batch?; |
| 152 | + println!("Read batch with {} rows", batch.num_rows()); |
| 153 | + } |
| 154 | + |
| 155 | + println!("Finished reading"); |
| 156 | + Ok(()) |
| 157 | +} |
| 158 | + |
| 159 | +/// Write an encrypted Parquet file using only SQL syntax with string configuration |
| 160 | +async fn write_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> { |
| 161 | + let query = format!( |
| 162 | + "COPY test_data \ |
| 163 | + TO '{table_path}' \ |
| 164 | + STORED AS parquet |
| 165 | + OPTIONS (\ |
| 166 | + 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}', \ |
| 167 | + 'format.crypto.factory_options.encrypted_columns' 'b,c' \ |
| 168 | + )" |
| 169 | + ); |
| 170 | + let _ = ctx.sql(&query).await?.collect().await?; |
| 171 | + |
| 172 | + println!("Encrypted Parquet written to {table_path}"); |
| 173 | + Ok(()) |
| 174 | +} |
| 175 | + |
| 176 | +/// Read from an encrypted Parquet file using only the SQL API and string-based configuration |
| 177 | +async fn read_encrypted_with_sql(ctx: &SessionContext, table_path: &str) -> Result<()> { |
| 178 | + let ddl = format!( |
| 179 | + "CREATE EXTERNAL TABLE encrypted_parquet_table_2 \ |
| 180 | + STORED AS PARQUET LOCATION '{table_path}' OPTIONS (\ |
| 181 | + 'format.crypto.factory_id' '{ENCRYPTION_FACTORY_ID}' \ |
| 182 | + )" |
| 183 | + ); |
| 184 | + ctx.sql(&ddl).await?; |
| 185 | + let df = ctx.sql("SELECT * FROM encrypted_parquet_table_2").await?; |
| 186 | + let mut batch_stream = df.execute_stream().await?; |
| 187 | + |
| 188 | + println!("Reading encrypted Parquet as a RecordBatch stream"); |
| 189 | + while let Some(batch) = batch_stream.next().await { |
| 190 | + let batch = batch?; |
| 191 | + println!("Read batch with {} rows", batch.num_rows()); |
| 192 | + } |
| 193 | + println!("Finished reading"); |
| 194 | + Ok(()) |
| 195 | +} |
| 196 | + |
// Options used to configure our example encryption factory.
// The `extensions_options!` macro generates the boilerplate that exposes
// these fields as DataFusion string-keyed configuration options
// (set via 'format.crypto.factory_options.<field>' in SQL, or via
// `configure_factory` programmatically).
extensions_options! {
    struct EncryptionConfig {
        /// Comma-separated list of columns to encrypt
        pub encrypted_columns: String, default = "".to_owned()
    }
}
| 204 | + |
/// Mock implementation of an `EncryptionFactory` that stores encryption keys
/// base64 encoded in the Parquet encryption metadata.
/// The factory itself is stateless; all per-file state lives in the key
/// metadata written into each Parquet file.
/// For production use, integrating with a key-management service to encrypt
/// data encryption keys is recommended.
#[derive(Default, Debug)]
struct TestEncryptionFactory {}
| 211 | + |
| 212 | +/// `EncryptionFactory` is a DataFusion trait for types that generate |
| 213 | +/// file encryption and decryption properties. |
| 214 | +impl EncryptionFactory for TestEncryptionFactory { |
| 215 | + /// Generate file encryption properties to use when writing a Parquet file. |
| 216 | + /// The `schema` is provided so that it may be used to dynamically configure |
| 217 | + /// per-column encryption keys. |
| 218 | + /// The file path is also available. We don't use the path in this example, |
| 219 | + /// but other implementations may want to use this to compute an |
| 220 | + /// AAD prefix for the file, or to allow use of external key material |
| 221 | + /// (where key metadata is stored in a JSON file alongside Parquet files). |
| 222 | + fn get_file_encryption_properties( |
| 223 | + &self, |
| 224 | + options: &EncryptionFactoryOptions, |
| 225 | + schema: &SchemaRef, |
| 226 | + _file_path: &Path, |
| 227 | + ) -> Result<Option<FileEncryptionProperties>> { |
| 228 | + let config: EncryptionConfig = options.to_extension_options()?; |
| 229 | + |
| 230 | + // Generate a random encryption key for this file. |
| 231 | + let mut key = vec![0u8; 16]; |
| 232 | + OsRng.try_fill_bytes(&mut key).unwrap(); |
| 233 | + |
| 234 | + // Generate the key metadata that allows retrieving the key when reading the file. |
| 235 | + let key_metadata = wrap_key(&key); |
| 236 | + |
| 237 | + let mut builder = FileEncryptionProperties::builder(key.to_vec()) |
| 238 | + .with_footer_key_metadata(key_metadata.clone()); |
| 239 | + |
| 240 | + let encrypted_columns: HashSet<&str> = |
| 241 | + config.encrypted_columns.split(",").collect(); |
| 242 | + if !encrypted_columns.is_empty() { |
| 243 | + // Set up per-column encryption. |
| 244 | + for field in schema.fields().iter() { |
| 245 | + if encrypted_columns.contains(field.name().as_str()) { |
| 246 | + // Here we re-use the same key for all encrypted columns, |
| 247 | + // but new keys could also be generated per column. |
| 248 | + builder = builder.with_column_key_and_metadata( |
| 249 | + field.name().as_str(), |
| 250 | + key.clone(), |
| 251 | + key_metadata.clone(), |
| 252 | + ); |
| 253 | + } |
| 254 | + } |
| 255 | + } |
| 256 | + |
| 257 | + let encryption_properties = builder.build()?; |
| 258 | + |
| 259 | + Ok(Some(encryption_properties)) |
| 260 | + } |
| 261 | + |
| 262 | + /// Generate file decryption properties to use when reading a Parquet file. |
| 263 | + /// Rather than provide the AES keys directly for decryption, we set a `KeyRetriever` |
| 264 | + /// that can determine the keys using the encryption metadata. |
| 265 | + fn get_file_decryption_properties( |
| 266 | + &self, |
| 267 | + _options: &EncryptionFactoryOptions, |
| 268 | + _file_path: &Path, |
| 269 | + ) -> Result<Option<FileDecryptionProperties>> { |
| 270 | + let decryption_properties = |
| 271 | + FileDecryptionProperties::with_key_retriever(Arc::new(TestKeyRetriever {})) |
| 272 | + .build()?; |
| 273 | + Ok(Some(decryption_properties)) |
| 274 | + } |
| 275 | +} |
| 276 | + |
| 277 | +/// Mock implementation of encrypting a key that simply base64 encodes the key. |
| 278 | +/// Note that this is not a secure way to store encryption keys, |
| 279 | +/// and for production use keys should be encrypted with a KMS. |
| 280 | +fn wrap_key(key: &[u8]) -> Vec<u8> { |
| 281 | + base64::prelude::BASE64_STANDARD |
| 282 | + .encode(key) |
| 283 | + .as_bytes() |
| 284 | + .to_vec() |
| 285 | +} |
| 286 | + |
| 287 | +struct TestKeyRetriever {} |
| 288 | + |
| 289 | +impl KeyRetriever for TestKeyRetriever { |
| 290 | + /// Get a data encryption key using the metadata stored in the Parquet file. |
| 291 | + fn retrieve_key( |
| 292 | + &self, |
| 293 | + key_metadata: &[u8], |
| 294 | + ) -> datafusion::parquet::errors::Result<Vec<u8>> { |
| 295 | + let key_metadata = std::str::from_utf8(key_metadata)?; |
| 296 | + let key = base64::prelude::BASE64_STANDARD |
| 297 | + .decode(key_metadata) |
| 298 | + .unwrap(); |
| 299 | + Ok(key) |
| 300 | + } |
| 301 | +} |
0 commit comments