diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs
index 250538b13370..e37a368f0771 100644
--- a/datafusion/core/tests/core_integration.rs
+++ b/datafusion/core/tests/core_integration.rs
@@ -45,6 +45,9 @@ mod optimizer;
 /// Run all tests that are found in the `physical_optimizer` directory
 mod physical_optimizer;
 
+/// Run all tests that are found in the `schema_adapter` directory
+mod schema_adapter;
+
 /// Run all tests that are found in the `serde` directory
 mod serde;
 
diff --git a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs b/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs
deleted file mode 100644
index e3d53a31c549..000000000000
--- a/datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs
+++ /dev/null
@@ -1,509 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Integration test for schema adapter factory functionality
-
-use std::any::Any;
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use arrow::record_batch::RecordBatch;
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::arrow_file::ArrowSource;
-use datafusion::prelude::*;
-use datafusion_common::Result;
-use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
-use datafusion_datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory};
-use datafusion_datasource::source::DataSourceExec;
-use datafusion_datasource::PartitionedFile;
-use std::sync::Arc;
-use tempfile::TempDir;
-
-#[cfg(feature = "parquet")]
-use datafusion_datasource_parquet::ParquetSource;
-#[cfg(feature = "parquet")]
-use parquet::arrow::ArrowWriter;
-#[cfg(feature = "parquet")]
-use parquet::file::properties::WriterProperties;
-
-#[cfg(feature = "csv")]
-use datafusion_datasource_csv::CsvSource;
-
-/// A schema adapter factory that transforms column names to uppercase
-#[derive(Debug)]
-struct UppercaseAdapterFactory {}
-
-impl SchemaAdapterFactory for UppercaseAdapterFactory {
-    fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> {
-        Ok(Box::new(UppercaseAdapter {
-            input_schema: Arc::new(schema.clone()),
-        }))
-    }
-}
-
-/// Schema adapter that transforms column names to uppercase
-#[derive(Debug)]
-struct UppercaseAdapter {
-    input_schema: SchemaRef,
-}
-
-impl SchemaAdapter for UppercaseAdapter {
-    fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
-        // In a real adapter, we might transform the data too
-        // For this test, we're just passing through the batch
-        Ok(record_batch)
-    }
-
-    fn output_schema(&self) -> SchemaRef {
-        let fields = self
-            .input_schema
-            .fields()
-            .iter()
-            .map(|f| {
-                Field::new(
-                    f.name().to_uppercase().as_str(),
-                    f.data_type().clone(),
-                    f.is_nullable(),
-                )
-            })
-            .collect();
-
-        Arc::new(Schema::new(fields))
-    }
-}
-
-#[cfg(feature = "parquet")]
-#[tokio::test]
-async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
-    // Create a temporary directory for our test file
-    let tmp_dir = TempDir::new()?;
-    let file_path = tmp_dir.path().join("test.parquet");
-    let file_path_str = file_path.to_str().unwrap();
-
-    // Create test data
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-    ]));
-
-    let batch = RecordBatch::try_new(
-        schema.clone(),
-        vec![
-            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
-            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
-        ],
-    )?;
-
-    // Write test parquet file
-    let file = std::fs::File::create(file_path_str)?;
-    let props = WriterProperties::builder().build();
-    let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
-    writer.write(&batch)?;
-    writer.close()?;
-
-    // Create a session context
-    let ctx = SessionContext::new();
-
-    // Create a ParquetSource with the adapter factory
-    let source = ParquetSource::default()
-        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));
-
-    // Create a scan config
-    let config = FileScanConfigBuilder::new(
-        ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?,
-        schema.clone(),
-    )
-    .with_source(source)
-    .build();
-
-    // Create a data source executor
-    let exec = DataSourceExec::from_data_source(config);
-
-    // Collect results
-    let task_ctx = ctx.task_ctx();
-    let stream = exec.execute(0, task_ctx)?;
-    let batches = datafusion::physical_plan::common::collect(stream).await?;
-
-    // There should be one batch
-    assert_eq!(batches.len(), 1);
-
-    // Verify the schema has uppercase column names
-    let result_schema = batches[0].schema();
-    assert_eq!(result_schema.field(0).name(), "ID");
-    assert_eq!(result_schema.field(1).name(), "NAME");
-
-    Ok(())
-}
-
-#[cfg(feature = "parquet")]
-#[tokio::test]
-async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter() -> Result<()> {
-    // Create a temporary directory for our test file
-    let tmp_dir = TempDir::new()?;
-    let file_path = tmp_dir.path().join("test.parquet");
-    let file_path_str = file_path.to_str().unwrap();
-
-    // Create test data
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-    ]));
-
-    let batch = RecordBatch::try_new(
-        schema.clone(),
-        vec![
-            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
-            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
-        ],
-    )?;
-
-    // Write test parquet file
-    let file = std::fs::File::create(file_path_str)?;
-    let props = WriterProperties::builder().build();
-    let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
-    writer.write(&batch)?;
-    writer.close()?;
-
-    // Create a session context
-    let ctx = SessionContext::new();
-
-    // Create a ParquetSource with the adapter factory
-    let source = ParquetSource::default()
-        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));
-
-    // Create a scan config
-    let config = FileScanConfigBuilder::new(
-        ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?,
-        schema.clone(),
-    )
-    .with_source(source)
-    .build();
-
-    // Create a data source executor
-    let exec = DataSourceExec::from_data_source(config);
-
-    // Collect results
-    let task_ctx = ctx.task_ctx();
-    let stream = exec.execute(0, task_ctx)?;
-    let batches = datafusion::physical_plan::common::collect(stream).await?;
-
-    // There should be one batch
-    assert_eq!(batches.len(), 1);
-
-    // Verify the schema has uppercase column names
-    let result_schema = batches[0].schema();
-    assert_eq!(result_schema.field(0).name(), "ID");
-    assert_eq!(result_schema.field(1).name(), "NAME");
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
-    // This test verifies that the same schema adapter factory can be reused
-    // across different file source types. This is important for ensuring that:
-    // 1. The schema adapter factory interface works uniformly across all source types
-    // 2. The factory can be shared and cloned efficiently using Arc
-    // 3. Various data source implementations correctly implement the schema adapter factory pattern
-
-    // Create a test factory
-    let factory = Arc::new(UppercaseAdapterFactory {});
-
-    // Apply the same adapter to different source types
-    let arrow_source =
-        ArrowSource::default().with_schema_adapter_factory(factory.clone());
-
-    #[cfg(feature = "parquet")]
-    let parquet_source =
-        ParquetSource::default().with_schema_adapter_factory(factory.clone());
-
-    #[cfg(feature = "csv")]
-    let csv_source = CsvSource::default().with_schema_adapter_factory(factory.clone());
-
-    // Verify adapters were properly set
-    assert!(arrow_source.schema_adapter_factory().is_some());
-
-    #[cfg(feature = "parquet")]
-    assert!(parquet_source.schema_adapter_factory().is_some());
-
-    #[cfg(feature = "csv")]
-    assert!(csv_source.schema_adapter_factory().is_some());
-
-    Ok(())
-}
-
-// Helper function to test From<T> for Arc<dyn FileSource> implementations
-fn test_from_impl<T: Into<Arc<dyn FileSource>> + Default>(expected_file_type: &str) {
-    let source = T::default();
-    let file_source: Arc<dyn FileSource> = source.into();
-    assert_eq!(file_source.file_type(), expected_file_type);
-}
-
-#[test]
-fn test_from_implementations() {
-    // Test From implementation for various sources
-    test_from_impl::<ArrowSource>("arrow");
-
-    #[cfg(feature = "parquet")]
-    test_from_impl::<ParquetSource>("parquet");
-
-    #[cfg(feature = "csv")]
-    test_from_impl::<CsvSource>("csv");
-
-    #[cfg(feature = "json")]
-    test_from_impl::<JsonSource>("json");
-}
-
-/// A simple test schema adapter factory that doesn't modify the schema
-#[derive(Debug)]
-struct TestSchemaAdapterFactory {}
-
-impl SchemaAdapterFactory for TestSchemaAdapterFactory {
-    fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> {
-        Ok(Box::new(TestSchemaAdapter {
-            input_schema: Arc::new(schema.clone()),
-        }))
-    }
-}
-
-/// A test schema adapter that passes through data unmodified
-#[derive(Debug)]
-struct TestSchemaAdapter {
-    input_schema: SchemaRef,
-}
-
-impl SchemaAdapter for TestSchemaAdapter {
-    fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
-        // Just pass through the batch unmodified
-        Ok(record_batch)
-    }
-
-    fn output_schema(&self) -> SchemaRef {
-        self.input_schema.clone()
-    }
-}
-
-#[cfg(feature = "parquet")]
-#[test]
-fn test_schema_adapter_preservation() {
-    // Create a test schema
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-    ]));
-
-    // Create source with schema adapter factory
-    let source = ParquetSource::default();
-    let factory = Arc::new(TestSchemaAdapterFactory {});
-    let file_source = source.with_schema_adapter_factory(factory);
-
-    // Create a FileScanConfig with the source
-    let config_builder =
-        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema.clone())
-            .with_source(file_source.clone())
-            // Add a file to make it valid
-            .with_file(PartitionedFile::new("test.parquet", 100));
-
-    let config = config_builder.build();
-
-    // Verify the schema adapter factory is present in the file source
-    assert!(config.source().schema_adapter_factory().is_some());
-}
-
-/// A test source for testing schema adapters
-#[derive(Debug, Clone)]
-struct TestSource {
-    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-}
-
-impl TestSource {
-    fn new() -> Self {
-        Self {
-            schema_adapter_factory: None,
-        }
-    }
-}
-
-impl FileSource for TestSource {
-    fn file_type(&self) -> &str {
-        "test"
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn create_file_opener(
-        &self,
-        _store: Arc<dyn ObjectStore>,
-        _conf: &FileScanConfig,
-        _index: usize,
-    ) -> Arc<dyn FileOpener> {
-        unimplemented!("Not needed for this test")
-    }
-
-    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
-        Arc::new(self.clone())
-    }
-
-    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
-        Arc::new(self.clone())
-    }
-
-    fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
-        Arc::new(self.clone())
-    }
-
-    fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
-        Arc::new(self.clone())
-    }
-
-    fn metrics(&self) -> &ExecutionPlanMetricsSet {
-        unimplemented!("Not needed for this test")
-    }
-
-    fn statistics(&self) -> Result<Statistics> {
-        Ok(Statistics::default())
-    }
-
-    fn with_schema_adapter_factory(
-        &self,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> Result<Arc<dyn FileSource>> {
-        Ok(Arc::new(Self {
-            schema_adapter_factory: Some(schema_adapter_factory),
-        }))
-    }
-
-    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
-        self.schema_adapter_factory.clone()
-    }
-}
-
-/// A test schema adapter factory
-#[derive(Debug)]
-struct TestSchemaAdapterFactory {}
-
-impl SchemaAdapterFactory for TestSchemaAdapterFactory {
-    fn create(
-        &self,
-        projected_table_schema: SchemaRef,
-        _table_schema: SchemaRef,
-    ) -> Box<dyn SchemaAdapter> {
-        Box::new(TestSchemaAdapter {
-            table_schema: projected_table_schema,
-        })
-    }
-}
-
-/// A test schema adapter implementation
-#[derive(Debug)]
-struct TestSchemaAdapter {
-    table_schema: SchemaRef,
-}
-
-impl SchemaAdapter for TestSchemaAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.table_schema.field(index);
-        file_schema.fields.find(field.name()).map(|(i, _)| i)
-    }
-
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
-            if self.table_schema.fields().find(file_field.name()).is_some() {
-                projection.push(file_idx);
-            }
-        }
-
-        Ok((Arc::new(TestSchemaMapping {}), projection))
-    }
-}
-
-/// A test schema mapper implementation
-#[derive(Debug)]
-struct TestSchemaMapping {}
-
-impl SchemaMapper for TestSchemaMapping {
-    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        // For testing, just return the original batch
-        Ok(batch)
-    }
-
-    fn map_column_statistics(
-        &self,
-        stats: &[ColumnStatistics],
-    ) -> Result<Vec<ColumnStatistics>> {
-        // For testing, just return the input statistics
-        Ok(stats.to_vec())
-    }
-}
-
-#[test]
-fn test_schema_adapter() {
-    // This test verifies the functionality of the SchemaAdapter and SchemaAdapterFactory
-    // components used in DataFusion's file sources.
-    //
-    // The test specifically checks:
-    // 1. Creating and attaching a schema adapter factory to a file source
-    // 2. Creating a schema adapter using the factory
-    // 3. The schema adapter's ability to map column indices between a table schema and a file schema
-    // 4. The schema adapter's ability to create a projection that selects only the columns
-    //    from the file schema that are present in the table schema
-    //
-    // Schema adapters are used when the schema of data in files doesn't exactly match
-    // the schema expected by the query engine, allowing for field mapping and data transformation.
-
-    // Create a test schema
-    let table_schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-    ]));
-
-    // Create a file schema
-    let file_schema = Schema::new(vec![
-        Field::new("id", DataType::Int32, false),
-        Field::new("name", DataType::Utf8, true),
-        Field::new("extra", DataType::Int64, true),
-    ]);
-
-    // Create a TestSource
-    let source = TestSource::new();
-    assert!(source.schema_adapter_factory().is_none());
-
-    // Add a schema adapter factory
-    let factory = Arc::new(TestSchemaAdapterFactory {});
-    let source_with_adapter = source.with_schema_adapter_factory(factory).unwrap();
-    assert!(source_with_adapter.schema_adapter_factory().is_some());
-
-    // Create a schema adapter
-    let adapter_factory = source_with_adapter.schema_adapter_factory().unwrap();
-    let adapter =
-        adapter_factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema));
-
-    // Test mapping column index
-    assert_eq!(adapter.map_column_index(0, &file_schema), Some(0));
-    assert_eq!(adapter.map_column_index(1, &file_schema), Some(1));
-
-    // Test creating schema mapper
-    let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap();
-    assert_eq!(projection, vec![0, 1]);
-}
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index 2bfd9bd6b842..f9a46f2e240f 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -26,6 +26,7 @@ use datafusion::common::Result;
 use datafusion::datasource::listing::{ListingTable, ListingTableConfig};
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::DataFusionError;
 use datafusion_common::{ColumnStatistics, ScalarValue};
 use datafusion_datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
@@ -187,7 +188,7 @@ impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
                 .logical_file_schema
                 .field_with_name(field_name)
                 .map_err(|_| {
-                    datafusion_common::DataFusionError::Plan(format!(
+                    DataFusionError::Plan(format!(
                         "Field '{field_name}' not found in logical file schema",
                     ))
                 })?;
diff --git a/datafusion/core/tests/schema_adapter/mod.rs b/datafusion/core/tests/schema_adapter/mod.rs
new file mode 100644
index 000000000000..2f81a43f4736
--- /dev/null
+++ b/datafusion/core/tests/schema_adapter/mod.rs
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod schema_adapter_integration_tests;
diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
new file mode 100644
index 000000000000..c3c92a9028d6
--- /dev/null
+++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
@@ -0,0 +1,363 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use bytes::{BufMut, BytesMut};
+use datafusion::common::Result;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{
+    ArrowSource, CsvSource, FileSource, JsonSource, ParquetSource,
+};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::ColumnStatistics;
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::schema_adapter::{
+    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use object_store::{memory::InMemory, path::Path, ObjectStore};
+use parquet::arrow::ArrowWriter;
+
+async fn write_parquet(batch: RecordBatch, store: Arc<dyn ObjectStore>, path: &str) {
+    let mut out = BytesMut::new().writer();
+    {
+        let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.finish().unwrap();
+    }
+    let data = out.into_inner().freeze();
+    store.put(&Path::from(path), data.into()).await.unwrap();
+}
+
+/// A schema adapter factory that transforms column names to uppercase
+#[derive(Debug, PartialEq)]
+struct UppercaseAdapterFactory {}
+
+impl SchemaAdapterFactory for UppercaseAdapterFactory {
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        _table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(UppercaseAdapter {
+            table_schema: projected_table_schema,
+        })
+    }
+}
+
+/// Schema adapter that transforms column names to uppercase
+#[derive(Debug)]
+struct UppercaseAdapter {
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for UppercaseAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        let uppercase_name = field.name().to_uppercase();
+        file_schema
+            .fields()
+            .iter()
+            .position(|f| f.name().to_uppercase() == uppercase_name)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::new();
+
+        // Map each field in the table schema to the corresponding field in the file schema
+        for table_field in self.table_schema.fields() {
+            let uppercase_name = table_field.name().to_uppercase();
+            if let Some(pos) = file_schema
+                .fields()
+                .iter()
+                .position(|f| f.name().to_uppercase() == uppercase_name)
+            {
+                projection.push(pos);
+            }
+        }
+
+        let mapper = UppercaseSchemaMapper {
+            output_schema: self.output_schema(),
+            projection: projection.clone(),
+        };
+
+        Ok((Arc::new(mapper), projection))
+    }
+}
+
+impl UppercaseAdapter {
+    fn output_schema(&self) -> SchemaRef {
+        let fields: Vec<Field> = self
+            .table_schema
+            .fields()
+            .iter()
+            .map(|f| {
+                Field::new(
+                    f.name().to_uppercase().as_str(),
+                    f.data_type().clone(),
+                    f.is_nullable(),
+                )
+            })
+            .collect();
+
+        Arc::new(Schema::new(fields))
+    }
+}
+
+#[derive(Debug)]
+struct UppercaseSchemaMapper {
+    output_schema: SchemaRef,
+    projection: Vec<usize>,
+}
+
+impl SchemaMapper for UppercaseSchemaMapper {
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let columns = self
+            .projection
+            .iter()
+            .map(|&i| batch.column(i).clone())
+            .collect::<Vec<_>>();
+        Ok(RecordBatch::try_new(self.output_schema.clone(), columns)?)
+    }
+
+    fn map_column_statistics(
+        &self,
+        stats: &[ColumnStatistics],
+    ) -> Result<Vec<ColumnStatistics>> {
+        Ok(self
+            .projection
+            .iter()
+            .map(|&i| stats.get(i).cloned().unwrap_or_default())
+            .collect())
+    }
+}
+
+#[cfg(feature = "parquet")]
+#[tokio::test]
+async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
+    // Create test data
+    let batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ])),
+        vec![
+            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
+            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
+        ],
+    )?;
+
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+    let path = "test.parquet";
+    write_parquet(batch.clone(), store.clone(), path).await;
+
+    // Get the actual file size from the object store
+    let object_meta = store.head(&Path::from(path)).await?;
+    let file_size = object_meta.size;
+
+    // Create a session context and register the object store
+    let ctx = SessionContext::new();
+    ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+
+    // Create a ParquetSource with the adapter factory
+    let file_source = ParquetSource::default()
+        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
+
+    // Create a table schema with uppercase column names
+    let table_schema = Arc::new(Schema::new(vec![
+        Field::new("ID", DataType::Int32, false),
+        Field::new("NAME", DataType::Utf8, true),
+    ]));
+
+    let config = FileScanConfigBuilder::new(store_url, table_schema.clone(), file_source)
+        .with_file(PartitionedFile::new(path, file_size))
+        .build();
+
+    // Create a data source executor
+    let exec = DataSourceExec::from_data_source(config);
+
+    // Collect results
+    let task_ctx = ctx.task_ctx();
+    let stream = exec.execute(0, task_ctx)?;
+    let batches = datafusion::physical_plan::common::collect(stream).await?;
+
+    // There should be one batch
+    assert_eq!(batches.len(), 1);
+
+    // Verify the schema has the uppercase column names
+    let result_schema = batches[0].schema();
+    assert_eq!(result_schema.field(0).name(), "ID");
+    assert_eq!(result_schema.field(1).name(), "NAME");
+
+    Ok(())
+}
+
+#[cfg(feature = "parquet")]
+#[tokio::test]
+async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter(
+) -> Result<()> {
+    // Create test data
+    let batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ])),
+        vec![
+            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
+            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
+        ],
+    )?;
+
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+    let path = "test.parquet";
+    write_parquet(batch.clone(), store.clone(), path).await;
+
+    // Get the actual file size from the object store
+    let object_meta = store.head(&Path::from(path)).await?;
+    let file_size = object_meta.size;
+
+    // Create a session context and register the object store
+    let ctx = SessionContext::new();
+    ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+
+    // Create a ParquetSource with the adapter factory
+    let file_source = ParquetSource::default()
+        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
+
+    let config = FileScanConfigBuilder::new(store_url, batch.schema(), file_source)
+        .with_file(PartitionedFile::new(path, file_size))
+        .build();
+
+    // Create a data source executor
+    let exec = DataSourceExec::from_data_source(config);
+
+    // Collect results
+    let task_ctx = ctx.task_ctx();
+    let stream = exec.execute(0, task_ctx)?;
+    let batches = datafusion::physical_plan::common::collect(stream).await?;
+
+    // There should be one batch
+    assert_eq!(batches.len(), 1);
+
+    // Verify the schema has the original column names (schema adapter not applied in DataSourceExec)
+    let result_schema = batches[0].schema();
+    assert_eq!(result_schema.field(0).name(), "id");
+    assert_eq!(result_schema.field(1).name(), "name");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
+    // This test verifies that the same schema adapter factory can be reused
+    // across different file source types. This is important for ensuring that:
+    // 1. The schema adapter factory interface works uniformly across all source types
+    // 2. The factory can be shared and cloned efficiently using Arc
+    // 3. Various data source implementations correctly implement the schema adapter factory pattern
+
+    // Create a test factory
+    let factory = Arc::new(UppercaseAdapterFactory {});
+
+    // Test ArrowSource
+    {
+        let source = ArrowSource::default();
+        let source_with_adapter = source
+            .clone()
+            .with_schema_adapter_factory(factory.clone())
+            .unwrap();
+
+        let base_source: Arc<dyn FileSource> = source.into();
+        assert!(base_source.schema_adapter_factory().is_none());
+        assert!(source_with_adapter.schema_adapter_factory().is_some());
+
+        let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap();
+        assert_eq!(
+            format!("{:?}", retrieved_factory.as_ref()),
+            format!("{:?}", factory.as_ref())
+        );
+    }
+
+    // Test ParquetSource
+    #[cfg(feature = "parquet")]
+    {
+        let source = ParquetSource::default();
+        let source_with_adapter = source
+            .clone()
+            .with_schema_adapter_factory(factory.clone())
+            .unwrap();
+
+        let base_source: Arc<dyn FileSource> = source.into();
+        assert!(base_source.schema_adapter_factory().is_none());
+        assert!(source_with_adapter.schema_adapter_factory().is_some());
+
+        let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap();
+        assert_eq!(
+            format!("{:?}", retrieved_factory.as_ref()),
+            format!("{:?}", factory.as_ref())
+        );
+    }
+
+    // Test CsvSource
+    {
+        let source = CsvSource::default();
+        let source_with_adapter = source
+            .clone()
+            .with_schema_adapter_factory(factory.clone())
+            .unwrap();
+
+        let base_source: Arc<dyn FileSource> = source.into();
+        assert!(base_source.schema_adapter_factory().is_none());
+        assert!(source_with_adapter.schema_adapter_factory().is_some());
+
+        let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap();
+        assert_eq!(
+            format!("{:?}", retrieved_factory.as_ref()),
+            format!("{:?}", factory.as_ref())
+        );
+    }
+
+    // Test JsonSource
+    {
+        let source = JsonSource::default();
+        let source_with_adapter = source
+            .clone()
+            .with_schema_adapter_factory(factory.clone())
+            .unwrap();
+
+        let base_source: Arc<dyn FileSource> = source.into();
+        assert!(base_source.schema_adapter_factory().is_none());
+        assert!(source_with_adapter.schema_adapter_factory().is_some());
+
+        let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap();
+        assert_eq!(
+            format!("{:?}", retrieved_factory.as_ref()),
+            format!("{:?}", factory.as_ref())
+        );
+    }
+
+    Ok(())
+}