diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 4327184058..5cb06d1d37 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -138,7 +138,7 @@ pub struct ArrowReader { impl ArrowReader { /// Take a stream of FileScanTasks and reads all the files. /// Returns a stream of Arrow RecordBatches containing the data from the files - pub async fn read(self, tasks: FileScanTaskStream) -> Result { + pub fn read(self, tasks: FileScanTaskStream) -> Result { let file_io = self.file_io.clone(); let batch_size = self.batch_size; let concurrency_limit_data_files = self.concurrency_limit_data_files; @@ -1751,7 +1751,6 @@ message schema { let result = reader .read(tasks) - .await .unwrap() .try_collect::>() .await diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index e987de859f..3d14b3cce4 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -437,10 +437,7 @@ impl TableScan { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); } - arrow_reader_builder - .build() - .read(self.plan_files().await?) - .await + arrow_reader_builder.build().read(self.plan_files().await?) } /// Returns a reference to the column names of the table scan. @@ -1332,14 +1329,12 @@ pub mod tests { let batch_stream = reader .clone() .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) - .await .unwrap(); let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) - .await .unwrap(); let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();