Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions crates/polars-plan/dsl-schema-hashes.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@
"HivePathProvider": "7f2a8a61152d613b4015ee4fd7907971e4dfce1e4c1afefb8808672237403327",
"IcebergColumn": "171ff56c222358389754a7ff774eec6fc958478df2317720c63b4addc8f9a4c5",
"IcebergColumnType": "e612983b0dfce78d172af2e4bb4726e3303ede09ea3c1de8ec40e12ee7922dac",
"IcebergCommitMode": "330f974fb44353b3b95c1004864b84d88a03e756120e1a14385f84be8d4b5245",
"IcebergIdentityTransformedPartitionFields": "a9ea26367a6a3a97560aa9010f711a211cabfbffb6a318cb834ceccc672d3ae1",
"IcebergPathProvider": "20732e7c4d3e6386d7e2a9675973ea4527a4e474a194c9a837a73839acbef715",
"IcebergSchema": "2341b76e5aca7780e28fcee6bd7a2650ce7a9df61e043b839dd3e74bd95efb3b",
"IcebergSinkState": "999c6e25060303d50a96d367e9f9084a78a9fc37f40a9abf92ea7a42aade89d9",
"IntDataTypeExpr": "cd66dcd9c44cdddd8864c0fe642e5fcef5263f6f142cce906011a0180e0fd161",
"IntegerType": "2e73fb811a2830b8b114dfe914512bfa6031325da9ea5513875a6e49b6ab1a58",
"InterpolationMethod": "157b72c21c66950baafe8033836c3335571d2f227dd882ba6b9c8d3e2f5928d3",
Expand Down Expand Up @@ -161,7 +163,8 @@
"SeriesColumn": "b57487b5f7afd368f85a5c22b45547d03ddd4ecd746e73d8d3ba68dc1236b84a",
"SetOperation": "88195de86227bd4aaff8cd7e1ba5c696907e036d128f380c0ed610eb6e77299a",
"SinkTarget": "0faaddc3196c89bd9dcf872bbc4304471855dff7f9d24107ef279bc06ef7cbb4",
"SinkType": "d0b7209ad6f7b18504f6454514e27e2027dfdf2c429e5f2791aec8d9cb400099",
"SinkType": "59b1af4a031de93b99cd27fc20c4e4f022f0bfb6f04f0e1571cacd3d89e5f440",
"SinkedPathsCallback": "d2b51d62ec90a46a0971315f694d4242b16b8fbbb51f9d660a511259300ee6c3",
"Slice": "a77ca4a44c184f1d4b63ee03b67d9cf751eb3597efc02b48be0bbb0d0ed16095",
"SortMultipleOptions": "51948d89a3c050f01736eb0a74de70cdfa2b6d0775299b57913ce85f6e4fcc23",
"SortOptions": "bb71e924805d71398f85a2fb7fd961bd9a742b2e9fde8f5adf12fdc0e2dc10aa",
Expand All @@ -185,7 +188,7 @@
"TrigonometricFunction": "9444fa00e47ea519496e1242418c2383101508ddd0dcec6174a6175f4e6d5371",
"UnicodeForm": "f539f29f54ef29faede48a9842191bf0c0ca7206e4f7d32ef1a54972b4a0cae5",
"UnifiedScanArgs": "2234b970de3c35d0918eb525d41ca3e995ac3343afd7f9c1b03337bda6dff93e",
"UnifiedSinkArgs": "6049272153d058150d38669187386b9fab2e376dff21418948e3c6f257b50cc9",
"UnifiedSinkArgs": "8364f31ac108b8d618f6bae43adfdcd2d548b3ba47cfa9e53c37fccbd0630f11",
"UnionArgs": "98eb7fd93d1a3a6d7cb3e5fffd16e3536efb11344e1140a8763b21ee1d16d513",
"UniqueId": "4cd0b4f653d64777df264faff1f08e1f1318915656c11642d852f60e9bf17f64",
"UniqueKeepStrategy": "76e65109633976c30388deeb78ffe892e92c6730511addcbe1156f9e7e8adfa1",
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/client/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub(super) fn assert_cloud_eligible(dsl: &DslPlan, allow_local_scans: bool) -> P
// The sink destination is passed around separately, can't check the
// eligibility here.
},
SinkType::Iceberg(_) => {},
}
},
_ => (),
Expand Down
92 changes: 92 additions & 0 deletions crates/polars-plan/src/dsl/options/iceberg_sink_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::collections::BTreeMap;

use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "python")]
use polars_utils::python_function::PythonObject;

/// `class IcebergSinkState` in Python
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "python", derive(pyo3::IntoPyObject, pyo3::FromPyObject))]
pub struct IcebergSinkState {
/// Python class module
pub py_catalog_class_module: PlSmallStr,
/// Python class qualname
pub py_catalog_class_qualname: PlSmallStr,

pub catalog_name: PlSmallStr,
pub catalog_properties: BTreeMap<PlSmallStr, PlSmallStr>,

pub table_name: PlSmallStr,
pub mode: IcebergCommitMode,
pub iceberg_storage_properties: BTreeMap<PlSmallStr, PlSmallStr>,

pub sink_uuid_str: String,

#[cfg(feature = "python")]
pub table_: Option<PythonObject>, // NoPickleOption[pyiceberg.table.Table]

#[cfg(feature = "python")]
pub commit_result_df: Option<PythonObject>, // NoPickleOption[pl.DataFrame]
}

#[derive(Copy, Clone, PartialEq, Debug, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum IcebergCommitMode {
Append,
Overwrite,
}

#[cfg(feature = "python")]
mod _python_impl {
use std::convert::Infallible;

use pyo3::exceptions::PyValueError;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::PyString;
use pyo3::{Borrowed, Bound, FromPyObject, IntoPyObject, Py, PyAny, PyErr, PyResult, Python};

use super::{IcebergCommitMode, IcebergSinkState};

impl IcebergSinkState {
pub(crate) fn into_sink_state_obj(self) -> PyResult<Py<PyAny>> {
Python::attach(|py| {
polars_utils::python_convert_registry::get_python_convert_registry()
.py_iceberg_sink_state_class()
.call(py, (), Some(&self.into_pyobject(py)?))
})
}
}

impl<'py> IntoPyObject<'py> for IcebergCommitMode {
type Target = PyString;
type Output = Bound<'py, Self::Target>;
type Error = Infallible;

fn into_pyobject(self, py: pyo3::Python<'py>) -> Result<Self::Output, Self::Error> {
match self {
Self::Append => "append",
Self::Overwrite => "overwrite",
}
.into_pyobject(py)
}
}

impl<'a, 'py> FromPyObject<'a, 'py> for IcebergCommitMode {
type Error = PyErr;

fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
Ok(match &*ob.extract::<PyBackedStr>()? {
"append" => Self::Append,
"overwrite" => Self::Overwrite,
v => {
return Err(PyValueError::new_err(format!(
"invalid iceberg commit mode: '{v}'"
)));
},
})
}
}
}
1 change: 1 addition & 0 deletions crates/polars-plan/src/dsl/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::num::NonZeroUsize;
use std::sync::Arc;

pub mod file_provider;
pub mod iceberg_sink_state;
pub mod sink;
pub use polars_config::Engine;
use polars_core::error::PolarsResult;
Expand Down
59 changes: 49 additions & 10 deletions crates/polars-plan/src/dsl/options/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use polars_core::error::PolarsResult;
use polars_core::frame::DataFrame;
use polars_core::prelude::PlHashSet;
use polars_core::schema::Schema;
use polars_error::feature_gated;
use polars_io::cloud::CloudOptions;
use polars_io::metrics::IOMetrics;
use polars_io::utils::file::Writeable;
Expand All @@ -19,6 +20,7 @@ use polars_utils::pl_str::PlSmallStr;

use super::FileWriteFormat;
use crate::dsl::file_provider::FileProviderType;
use crate::dsl::iceberg_sink_state::IcebergSinkState;
use crate::dsl::{AExpr, Expr, SpecialEq};
use crate::plans::{ExprIR, ToFieldContext};
use crate::prelude::PlanCallback;
Expand Down Expand Up @@ -227,6 +229,7 @@ pub enum SinkType {
Callback(CallbackSinkType),
File(FileSinkOptions),
Partitioned(PartitionedSinkOptions),
Iceberg(IcebergSinkState),
}

#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
Expand Down Expand Up @@ -465,7 +468,13 @@ pub struct FileSinkOptions {
pub unified_sink_args: UnifiedSinkArgs,
}

pub type SinkedPathsCallback = PlanCallback<SinkedPathsCallbackArgs, ()>;
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Debug, Hash, PartialEq)]
pub enum SinkedPathsCallback {
IcebergCommit(IcebergSinkState),
Callback(PlanCallback<SinkedPathsCallbackArgs, ()>),
}

#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
Expand All @@ -482,19 +491,48 @@ pub struct SinkedPathInfo {
}

impl SinkedPathsCallback {
pub fn call_(&self, args: SinkedPathsCallbackArgs) -> PolarsResult<()> {
pub fn call(&self, args: SinkedPathsCallbackArgs) -> PolarsResult<()> {
use PlanCallback as CB;

match self {
Self::Rust(func) => (func)(args),
Self::IcebergCommit(sink_state) => {
feature_gated!("python", {
use pyo3::Python;

Python::attach(|py| {
use pyo3::intern;
use pyo3::types::PyList;

let py_paths = PyList::empty(py);

let SinkedPathsCallbackArgs { path_info_list } = args;

for SinkedPathInfo { path } in path_info_list {
use pyo3::types::PyListMethods;

let path: &str = path.as_str();

py_paths.append(path)?;
}

sink_state.clone().into_sink_state_obj()?.call_method1(
py,
intern!(py, "commit"),
(py_paths,),
)?;

PolarsResult::Ok(())
})
})
},
Self::Callback(CB::Rust(func)) => (func)(args),
#[cfg(feature = "python")]
Self::Python(object) => pyo3::Python::attach(|py| {
Self::Callback(CB::Python(object)) => pyo3::Python::attach(|py| {
use pyo3::intern;
use pyo3::types::{PyAnyMethods, PyDict, PyList};

let SinkedPathsCallbackArgs { path_info_list } = args;

let convert_registry =
polars_utils::python_convert_registry::get_python_convert_registry();

let py_paths = PyList::empty(py);

for SinkedPathInfo { path } in path_info_list {
Expand All @@ -508,9 +546,10 @@ impl SinkedPathsCallback {
let kwargs = PyDict::new(py);
kwargs.set_item(intern!(py, "paths"), py_paths)?;

let args_dataclass = convert_registry
.py_sinked_paths_callback_args_dataclass()
.call(py, (), Some(&kwargs))?;
let args_dataclass =
polars_utils::python_convert_registry::get_python_convert_registry()
.py_sinked_paths_callback_args_dataclass()
.call(py, (), Some(&kwargs))?;

object.call1(py, (args_dataclass,))?;

Expand Down
45 changes: 45 additions & 0 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use futures::stream::FuturesUnordered;
use hive::hive_partitions_from_paths;
use polars_core::chunked_array::cast::CastOptions;
use polars_core::config::verbose;
use polars_error::feature_gated;
use polars_io::ExternalCompression;
use polars_io::pl_async::get_runtime;
use polars_utils::format_pl_smallstr;
Expand Down Expand Up @@ -1354,10 +1355,54 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
}
},
DslPlan::Sink { input, payload } => {
if let SinkType::Iceberg(state) = payload {
feature_gated!("python", {
use polars_utils::python_convert_registry::get_python_convert_registry;
use pyo3::{Python, intern};

use crate::dsl::sink::SinkedPathsCallback;

let py_sink_state = state.clone().into_sink_state_obj()?;

let reg = get_python_convert_registry();

let py_lf = (reg.to_py.dsl_plan)(input.as_ref())?;

let out = Python::attach(|py| {
py_sink_state.call_method1(
py,
intern!(py, "_attach_resolved_sink"),
(py_lf,),
)
})?;

let mut plan: Box<DslPlan> = (reg.from_py.dsl_plan)(out)?.downcast().unwrap();

let DslPlan::Sink {
input: _,
payload:
SinkType::Partitioned(PartitionedSinkOptions {
unified_sink_args, ..
}),
} = plan.as_mut()
else {
unreachable!()
};

debug_assert!(unified_sink_args.sinked_paths_callback.is_none());

unified_sink_args.sinked_paths_callback =
Some(SinkedPathsCallback::IcebergCommit(state));

return to_alp_impl(*plan, ctxt);
})
}

let input =
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;
let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
let payload = match payload {
SinkType::Iceberg(_) => unreachable!(),
SinkType::Memory => SinkTypeIR::Memory,
SinkType::Callback(f) => SinkTypeIR::Callback(f),
SinkType::File(mut options) => {
Expand Down
8 changes: 6 additions & 2 deletions crates/polars-python/src/io/sink_options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use polars::prelude::sink::SinkedPathsCallback;
use polars::prelude::sync_on_close::SyncOnCloseType;
use polars::prelude::{CloudScheme, PlanCallback, SpecialEq, UnifiedSinkArgs};
use polars_utils::python_function::PythonObject;
Expand Down Expand Up @@ -53,8 +54,11 @@ impl PySinkOptions<'_> {
maintain_order,
sync_on_close,
cloud_options: cloud_options.map(Arc::new),
sinked_paths_callback: sinked_paths_callback
.map(|x| PlanCallback::Python(SpecialEq::new(Arc::new(PythonObject(x))))),
sinked_paths_callback: sinked_paths_callback.map(|x| {
SinkedPathsCallback::Callback(PlanCallback::Python(SpecialEq::new(Arc::new(
PythonObject(x),
))))
}),
};

Ok(unified_sink_args)
Expand Down
13 changes: 13 additions & 0 deletions crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use parking_lot::Mutex;
#[cfg(feature = "pivot")]
use polars::frame::PivotColumnNaming;
use polars::io::RowIndex;
use polars::prelude::iceberg_sink_state::IcebergSinkState;
use polars::time::*;
use polars_core::prelude::*;
use polars_core::query_result::QueryResult;
Expand Down Expand Up @@ -883,6 +884,18 @@ impl PyLazyFrame {
.map_err(Into::into)
}

pub fn sink_iceberg(&self, py: Python<'_>, sink_state_obj: Py<PyAny>) -> PyResult<PyLazyFrame> {
let sink_state: IcebergSinkState = sink_state_obj.extract(py)?;
let mut ldf = { self.ldf.read().clone() };

ldf.logical_plan = DslPlan::Sink {
input: Arc::new(ldf.logical_plan),
payload: SinkType::Iceberg(sink_state),
};

Ok(ldf.into())
}

fn filter(&self, predicate: PyExpr) -> Self {
self.ldf.read().clone().filter(predicate.inner).into()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn call_sinked_paths_callback(
path_info_list: std::mem::take(&mut path_info_list.lock()),
};

sinked_paths_callback.call_(args)
sinked_paths_callback.call(args)
})
.await
.unwrap()
Expand Down
Loading
Loading