20 changes: 16 additions & 4 deletions .github/workflows/test.yaml
@@ -79,10 +79,6 @@ jobs:
           path: ~/.cargo
           key: cargo-cache-${{ steps.rust-toolchain.outputs.cachekey }}-${{ hashFiles('Cargo.lock') }}
 
-      - name: Check Formatting
-        if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
-        run: cargo fmt -- --check
-
       - name: Run Clippy
         if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
         run: cargo clippy --all-targets --all-features -- -D clippy::all -D warnings -A clippy::redundant_closure
@@ -125,3 +121,19 @@ jobs:
           cd examples/tpch
           uv run --no-project python convert_data_to_parquet.py
           uv run --no-project pytest _tests.py
+
+  nightly-fmt:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v5
+
+      - name: Setup Rust Toolchain
+        uses: dtolnay/rust-toolchain@stable
+        id: rust-toolchain
+        with:
+          toolchain: "nightly"
+          components: clippy,rustfmt
+
+      - name: Check Formatting
+        run: cargo +nightly fmt -- --check
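Note: the two rustfmt options added below in rustfmt.toml (`group_imports`, `imports_granularity`) are unstable and only honored by nightly rustfmt, which is why the formatting check moves out of the stable test matrix into this dedicated nightly job, and why the local entry points below switch to `cargo +nightly fmt`. Clippy and the test suite stay on stable.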
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -33,7 +33,7 @@ repos:
       - id: rust-fmt
         name: Rust fmt
         description: Run cargo fmt on files included in the commit. rustfmt should be installed before-hand.
-        entry: cargo fmt --all --
+        entry: cargo +nightly fmt --all --
         pass_filenames: true
         types: [file, rust]
         language: system
2 changes: 1 addition & 1 deletion ci/scripts/rust_fmt.sh
@@ -18,4 +18,4 @@
 # under the License.
 
 set -ex
-cargo fmt --all -- --check
+cargo +nightly fmt --all -- --check
19 changes: 19 additions & 0 deletions rustfmt.toml
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+group_imports = "StdExternalCrate"
+imports_granularity = "Module"
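These two options account for every import reshuffle in the src/ diffs below. A minimal sketch of their effect (hypothetical import paths, not lines from this PR): `group_imports = "StdExternalCrate"` splits `use` items into three blank-line-separated groups — standard library, external crates, then `crate`/`super`/`self` — and `imports_granularity = "Module"` rewrites nested braces so each module path gets exactly one `use` statement.

```rust
// Before formatting (hypothetical, as a contributor might write it):
//
//     use crate::utils::wait_for_future;
//     use pyo3::{exceptions::PyValueError, prelude::*};
//     use std::sync::Arc;
//     use std::any::Any;
//
// After `cargo +nightly fmt` with the options above:

use std::any::Any; // standard library group first
use std::sync::Arc;

use pyo3::exceptions::PyValueError; // external crates second; nested braces
use pyo3::prelude::*; // are split into one `use` per module path

use crate::utils::wait_for_future; // crate-local imports last
```

The reordered import blocks in src/catalog.rs and src/context.rs below follow exactly this shape.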
25 changes: 13 additions & 12 deletions src/catalog.rs
@@ -15,25 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::dataset::Dataset;
-use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
-use crate::table::PyTable;
-use crate::utils::{validate_pycapsule, wait_for_future};
+use std::any::Any;
+use std::collections::HashSet;
+use std::sync::Arc;
 
 use async_trait::async_trait;
-use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
-use datafusion::common::DataFusionError;
-use datafusion::{
-    catalog::{CatalogProvider, SchemaProvider},
-    datasource::TableProvider,
+use datafusion::catalog::{
+    CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
 };
+use datafusion::common::DataFusionError;
+use datafusion::datasource::TableProvider;
 use datafusion_ffi::schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider};
 use pyo3::exceptions::PyKeyError;
 use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
 use pyo3::IntoPyObjectExt;
+
+use crate::dataset::Dataset;
+use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
+use crate::table::PyTable;
+use crate::utils::{validate_pycapsule, wait_for_future};
 
 #[pyclass(frozen, name = "RawCatalog", module = "datafusion.catalog", subclass)]
 #[derive(Clone)]
4 changes: 2 additions & 2 deletions src/common/data_type.rs
@@ -19,8 +19,8 @@ use datafusion::arrow::array::Array;
 use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
 use datafusion::common::ScalarValue;
 use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
-use pyo3::exceptions::PyNotImplementedError;
-use pyo3::{exceptions::PyValueError, prelude::*};
+use pyo3::exceptions::{PyNotImplementedError, PyValueError};
+use pyo3::prelude::*;
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
 pub struct PyScalarValue(pub ScalarValue);
13 changes: 6 additions & 7 deletions src/common/schema.rs
@@ -15,26 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::any::Any;
+use std::borrow::Cow;
 use std::fmt::{self, Display, Formatter};
 use std::sync::Arc;
-use std::{any::Any, borrow::Cow};
 
 use arrow::datatypes::Schema;
 use arrow::pyarrow::PyArrowType;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::common::Constraints;
 use datafusion::datasource::TableType;
+use datafusion::logical_expr::utils::split_conjunction;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableSource};
+use parking_lot::RwLock;
 use pyo3::prelude::*;
 
-use datafusion::logical_expr::utils::split_conjunction;
-
+use super::data_type::DataTypeMap;
+use super::function::SqlFunction;
 use crate::sql::logical::PyLogicalPlan;
 
-use super::{data_type::DataTypeMap, function::SqlFunction};
-
-use parking_lot::RwLock;
-
 #[pyclass(name = "SqlSchema", module = "datafusion.common", subclass, frozen)]
 #[derive(Debug, Clone)]
 pub struct SqlSchema {
5 changes: 2 additions & 3 deletions src/config.rs
@@ -17,14 +17,13 @@
 
 use std::sync::Arc;
 
+use datafusion::config::ConfigOptions;
+use parking_lot::RwLock;
 use pyo3::prelude::*;
 use pyo3::types::*;
 
-use datafusion::config::ConfigOptions;
-
 use crate::errors::PyDataFusionResult;
 use crate::utils::py_obj_to_scalar_value;
-use parking_lot::RwLock;
 #[pyclass(name = "Config", module = "datafusion", subclass, frozen)]
 #[derive(Clone)]
 pub(crate) struct PyConfig {
53 changes: 25 additions & 28 deletions src/context.rs
@@ -23,56 +23,53 @@ use std::sync::Arc;
 use arrow::array::RecordBatchReader;
 use arrow::ffi_stream::ArrowArrayStreamReader;
 use arrow::pyarrow::FromPyArrow;
-use datafusion::execution::session_state::SessionStateBuilder;
-use object_store::ObjectStore;
-use url::Url;
-use uuid::Uuid;
-
-use pyo3::exceptions::{PyKeyError, PyValueError};
-use pyo3::prelude::*;
-
-use crate::catalog::{PyCatalog, RustWrappedPyCatalogProvider};
-use crate::dataframe::PyDataFrame;
-use crate::dataset::Dataset;
-use crate::errors::{py_datafusion_err, PyDataFusionResult};
-use crate::expr::sort_expr::PySortExpr;
-use crate::physical_plan::PyExecutionPlan;
-use crate::record_batch::PyRecordBatchStream;
-use crate::sql::exceptions::py_value_err;
-use crate::sql::logical::PyLogicalPlan;
-use crate::store::StorageContexts;
-use crate::table::PyTable;
-use crate::udaf::PyAggregateUDF;
-use crate::udf::PyScalarUDF;
-use crate::udtf::PyTableFunction;
-use crate::udwf::PyWindowUDF;
-use crate::utils::{get_global_ctx, spawn_future, validate_pycapsule, wait_for_future};
 use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::catalog::CatalogProvider;
-use datafusion::common::TableReference;
-use datafusion::common::{exec_err, ScalarValue};
+use datafusion::common::{exec_err, ScalarValue, TableReference};
 use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
-use datafusion::datasource::MemTable;
-use datafusion::datasource::TableProvider;
+use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::execution::context::{
     DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext,
 };
 use datafusion::execution::disk_manager::DiskManagerMode;
 use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
 use datafusion::execution::options::ReadOptions;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::prelude::{
     AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions,
 };
 use datafusion_ffi::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
+use object_store::ObjectStore;
+use pyo3::exceptions::{PyKeyError, PyValueError};
+use pyo3::prelude::*;
 use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
 use pyo3::IntoPyObjectExt;
+use url::Url;
+use uuid::Uuid;
+
+use crate::catalog::{PyCatalog, RustWrappedPyCatalogProvider};
+use crate::dataframe::PyDataFrame;
+use crate::dataset::Dataset;
+use crate::errors::{py_datafusion_err, PyDataFusionResult};
+use crate::expr::sort_expr::PySortExpr;
+use crate::physical_plan::PyExecutionPlan;
+use crate::record_batch::PyRecordBatchStream;
+use crate::sql::exceptions::py_value_err;
+use crate::sql::logical::PyLogicalPlan;
+use crate::store::StorageContexts;
+use crate::table::PyTable;
+use crate::udaf::PyAggregateUDF;
+use crate::udf::PyScalarUDF;
+use crate::udtf::PyTableFunction;
+use crate::udwf::PyWindowUDF;
+use crate::utils::{get_global_ctx, spawn_future, validate_pycapsule, wait_for_future};
 
 /// Configuration options for a SessionContext
 #[pyclass(frozen, name = "SessionConfig", module = "datafusion", subclass)]
14 changes: 5 additions & 9 deletions src/dataframe.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use cstr::cstr;
 use std::collections::HashMap;
 use std::ffi::{CStr, CString};
 use std::sync::Arc;
@@ -26,6 +25,7 @@ use arrow::error::ArrowError;
 use arrow::ffi::FFI_ArrowSchema;
 use arrow::ffi_stream::FFI_ArrowArrayStream;
 use arrow::pyarrow::FromPyArrow;
+use cstr::cstr;
 use datafusion::arrow::datatypes::{Schema, SchemaRef};
 use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
 use datafusion::arrow::util::pretty;
@@ -40,27 +40,23 @@ use datafusion::logical_expr::SortExpr;
 use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
 use datafusion::prelude::*;
 use futures::{StreamExt, TryStreamExt};
+use parking_lot::Mutex;
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
 use pyo3::PyErr;
 
-use crate::errors::{py_datafusion_err, PyDataFusionError};
-use crate::expr::sort_expr::to_sort_expressions;
+use crate::errors::{py_datafusion_err, PyDataFusionError, PyDataFusionResult};
+use crate::expr::sort_expr::{to_sort_expressions, PySortExpr};
+use crate::expr::PyExpr;
 use crate::physical_plan::PyExecutionPlan;
 use crate::record_batch::{poll_next_batch, PyRecordBatchStream};
 use crate::sql::logical::PyLogicalPlan;
 use crate::table::{PyTable, TempViewTable};
 use crate::utils::{
     is_ipython_env, py_obj_to_scalar_value, spawn_future, validate_pycapsule, wait_for_future,
 };
-use crate::{
-    errors::PyDataFusionResult,
-    expr::{sort_expr::PySortExpr, PyExpr},
-};
-
-use parking_lot::Mutex;
 
 /// File-level static CStr for the Arrow array stream capsule name.
 static ARROW_ARRAY_STREAM_NAME: &CStr = cstr!("arrow_array_stream");
17 changes: 7 additions & 10 deletions src/dataset.rs
@@ -15,25 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::catalog::Session;
-use pyo3::exceptions::PyValueError;
-/// Implements a Datafusion TableProvider that delegates to a PyArrow Dataset
-/// This allows us to use PyArrow Datasets as Datafusion tables while pushing down projections and filters
-use pyo3::prelude::*;
-use pyo3::types::PyType;
-
 use std::any::Any;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::pyarrow::PyArrowType;
+use datafusion::catalog::Session;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::{DataFusionError, Result as DFResult};
-use datafusion::logical_expr::Expr;
-use datafusion::logical_expr::TableProviderFilterPushDown;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
+use pyo3::exceptions::PyValueError;
+/// Implements a Datafusion TableProvider that delegates to a PyArrow Dataset
+/// This allows us to use PyArrow Datasets as Datafusion tables while pushing down projections and filters
+use pyo3::prelude::*;
+use pyo3::types::PyType;
 
 use crate::dataset_exec::DatasetExec;
 use crate::pyarrow_filter_expression::PyArrowFilterExpression;
17 changes: 7 additions & 10 deletions src/dataset_exec.rs
@@ -15,32 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
-/// Implements a Datafusion physical ExecutionPlan that delegates to a PyArrow Dataset
-/// This actually performs the projection, filtering and scanning of a Dataset
-use pyo3::prelude::*;
-use pyo3::types::{PyDict, PyIterator, PyList};
-
 use std::any::Any;
 use std::sync::Arc;
 
-use futures::{stream, TryStreamExt};
-
 use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::ArrowError;
-use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::utils::conjunction;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
     SendableRecordBatchStream, Statistics,
 };
+use futures::{stream, TryStreamExt};
+/// Implements a Datafusion physical ExecutionPlan that delegates to a PyArrow Dataset
+/// This actually performs the projection, filtering and scanning of a Dataset
+use pyo3::prelude::*;
+use pyo3::types::{PyDict, PyIterator, PyList};
 
 use crate::errors::PyDataFusionResult;
 use crate::pyarrow_filter_expression::PyArrowFilterExpression;
3 changes: 2 additions & 1 deletion src/errors.rs
@@ -22,7 +22,8 @@ use std::fmt::Debug;
 use datafusion::arrow::error::ArrowError;
 use datafusion::error::DataFusionError as InnerDataFusionError;
 use prost::EncodeError;
-use pyo3::{exceptions::PyException, PyErr};
+use pyo3::exceptions::PyException;
+use pyo3::PyErr;
 
 pub type PyDataFusionResult<T> = std::result::Result<T, PyDataFusionError>;