diff --git a/crates/polars-plan/dsl-schema-hashes.json b/crates/polars-plan/dsl-schema-hashes.json index 72fc9ff54c2b..4a6889c9cabd 100644 --- a/crates/polars-plan/dsl-schema-hashes.json +++ b/crates/polars-plan/dsl-schema-hashes.json @@ -77,9 +77,11 @@ "HivePathProvider": "7f2a8a61152d613b4015ee4fd7907971e4dfce1e4c1afefb8808672237403327", "IcebergColumn": "171ff56c222358389754a7ff774eec6fc958478df2317720c63b4addc8f9a4c5", "IcebergColumnType": "e612983b0dfce78d172af2e4bb4726e3303ede09ea3c1de8ec40e12ee7922dac", + "IcebergCommitMode": "330f974fb44353b3b95c1004864b84d88a03e756120e1a14385f84be8d4b5245", "IcebergIdentityTransformedPartitionFields": "a9ea26367a6a3a97560aa9010f711a211cabfbffb6a318cb834ceccc672d3ae1", "IcebergPathProvider": "20732e7c4d3e6386d7e2a9675973ea4527a4e474a194c9a837a73839acbef715", "IcebergSchema": "2341b76e5aca7780e28fcee6bd7a2650ce7a9df61e043b839dd3e74bd95efb3b", + "IcebergSinkState": "999c6e25060303d50a96d367e9f9084a78a9fc37f40a9abf92ea7a42aade89d9", "IntDataTypeExpr": "cd66dcd9c44cdddd8864c0fe642e5fcef5263f6f142cce906011a0180e0fd161", "IntegerType": "2e73fb811a2830b8b114dfe914512bfa6031325da9ea5513875a6e49b6ab1a58", "InterpolationMethod": "157b72c21c66950baafe8033836c3335571d2f227dd882ba6b9c8d3e2f5928d3", @@ -161,7 +163,8 @@ "SeriesColumn": "b57487b5f7afd368f85a5c22b45547d03ddd4ecd746e73d8d3ba68dc1236b84a", "SetOperation": "88195de86227bd4aaff8cd7e1ba5c696907e036d128f380c0ed610eb6e77299a", "SinkTarget": "0faaddc3196c89bd9dcf872bbc4304471855dff7f9d24107ef279bc06ef7cbb4", - "SinkType": "d0b7209ad6f7b18504f6454514e27e2027dfdf2c429e5f2791aec8d9cb400099", + "SinkType": "59b1af4a031de93b99cd27fc20c4e4f022f0bfb6f04f0e1571cacd3d89e5f440", + "SinkedPathsCallback": "d2b51d62ec90a46a0971315f694d4242b16b8fbbb51f9d660a511259300ee6c3", "Slice": "a77ca4a44c184f1d4b63ee03b67d9cf751eb3597efc02b48be0bbb0d0ed16095", "SortMultipleOptions": "51948d89a3c050f01736eb0a74de70cdfa2b6d0775299b57913ce85f6e4fcc23", "SortOptions": "bb71e924805d71398f85a2fb7fd961bd9a742b2e9fde8f5adf12fdc0e2dc10aa", @@ -185,7 +188,7 @@ "TrigonometricFunction": "9444fa00e47ea519496e1242418c2383101508ddd0dcec6174a6175f4e6d5371", "UnicodeForm": "f539f29f54ef29faede48a9842191bf0c0ca7206e4f7d32ef1a54972b4a0cae5", "UnifiedScanArgs": "2234b970de3c35d0918eb525d41ca3e995ac3343afd7f9c1b03337bda6dff93e", - "UnifiedSinkArgs": "6049272153d058150d38669187386b9fab2e376dff21418948e3c6f257b50cc9", + "UnifiedSinkArgs": "8364f31ac108b8d618f6bae43adfdcd2d548b3ba47cfa9e53c37fccbd0630f11", "UnionArgs": "98eb7fd93d1a3a6d7cb3e5fffd16e3536efb11344e1140a8763b21ee1d16d513", "UniqueId": "4cd0b4f653d64777df264faff1f08e1f1318915656c11642d852f60e9bf17f64", "UniqueKeepStrategy": "76e65109633976c30388deeb78ffe892e92c6730511addcbe1156f9e7e8adfa1", diff --git a/crates/polars-plan/src/client/check.rs b/crates/polars-plan/src/client/check.rs index 1bd0f4a1451b..2f1edb0357ae 100644 --- a/crates/polars-plan/src/client/check.rs +++ b/crates/polars-plan/src/client/check.rs @@ -55,6 +55,7 @@ pub(super) fn assert_cloud_eligible(dsl: &DslPlan, allow_local_scans: bool) -> P // The sink destination is passed around separately, can't check the // eligibility here. }, + SinkType::Iceberg(_) => {}, } }, _ => (), diff --git a/crates/polars-plan/src/dsl/options/iceberg_sink_state.rs b/crates/polars-plan/src/dsl/options/iceberg_sink_state.rs new file mode 100644 index 000000000000..ce357e487c13 --- /dev/null +++ b/crates/polars-plan/src/dsl/options/iceberg_sink_state.rs @@ -0,0 +1,92 @@ +use std::collections::BTreeMap; + +use polars_utils::pl_str::PlSmallStr; +#[cfg(feature = "python")] +use polars_utils::python_function::PythonObject; + +/// `class IcebergSinkState` in Python +#[derive(Clone, PartialEq, Debug, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "python", derive(pyo3::IntoPyObject, pyo3::FromPyObject))] +pub struct IcebergSinkState { + /// Python class module + pub py_catalog_class_module: PlSmallStr, + /// Python class qualname + pub py_catalog_class_qualname: PlSmallStr, + + pub catalog_name: PlSmallStr, + pub catalog_properties: BTreeMap, + + pub table_name: PlSmallStr, + pub mode: IcebergCommitMode, + pub iceberg_storage_properties: BTreeMap, + + pub sink_uuid_str: String, + + #[cfg(feature = "python")] + pub table_: Option, // NoPickleOption[pyiceberg.table.Table] + + #[cfg(feature = "python")] + pub commit_result_df: Option, // NoPickleOption[pl.DataFrame] +} + +#[derive(Copy, Clone, PartialEq, Debug, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))] +pub enum IcebergCommitMode { + Append, + Overwrite, +} + +#[cfg(feature = "python")] +mod _python_impl { + use std::convert::Infallible; + + use pyo3::exceptions::PyValueError; + use pyo3::pybacked::PyBackedStr; + use pyo3::types::PyString; + use pyo3::{Borrowed, Bound, FromPyObject, IntoPyObject, Py, PyAny, PyErr, PyResult, Python}; + + use super::{IcebergCommitMode, IcebergSinkState}; + + impl IcebergSinkState { + pub(crate) fn into_sink_state_obj(self) -> PyResult> { + Python::attach(|py| { + polars_utils::python_convert_registry::get_python_convert_registry() + .py_iceberg_sink_state_class() + .call(py, (), Some(&self.into_pyobject(py)?)) + }) + } + } + + impl<'py> IntoPyObject<'py> for IcebergCommitMode { + type Target = PyString; + type Output = Bound<'py, Self::Target>; + type Error = Infallible; + + fn into_pyobject(self, py: pyo3::Python<'py>) -> Result { + match self { + Self::Append => "append", + Self::Overwrite => "overwrite", + } + .into_pyobject(py) + } + } + + impl<'a, 'py> FromPyObject<'a, 'py> for IcebergCommitMode { + type Error = PyErr; + + fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult { + Ok(match &*ob.extract::()? { + "append" => Self::Append, + "overwrite" => Self::Overwrite, + v => { + return Err(PyValueError::new_err(format!( + "invalid iceberg commit mode: '{v}'" + ))); + }, + }) + } + } +} diff --git a/crates/polars-plan/src/dsl/options/mod.rs b/crates/polars-plan/src/dsl/options/mod.rs index bc854693965e..312d01e1869f 100644 --- a/crates/polars-plan/src/dsl/options/mod.rs +++ b/crates/polars-plan/src/dsl/options/mod.rs @@ -4,6 +4,7 @@ use std::num::NonZeroUsize; use std::sync::Arc; pub mod file_provider; +pub mod iceberg_sink_state; pub mod sink; pub use polars_config::Engine; use polars_core::error::PolarsResult; diff --git a/crates/polars-plan/src/dsl/options/sink.rs b/crates/polars-plan/src/dsl/options/sink.rs index 1779da29dbf4..268cb7b8b0ed 100644 --- a/crates/polars-plan/src/dsl/options/sink.rs +++ b/crates/polars-plan/src/dsl/options/sink.rs @@ -8,6 +8,7 @@ use polars_core::error::PolarsResult; use polars_core::frame::DataFrame; use polars_core::prelude::PlHashSet; use polars_core::schema::Schema; +use polars_error::feature_gated; use polars_io::cloud::CloudOptions; use polars_io::metrics::IOMetrics; use polars_io::utils::file::Writeable; @@ -19,6 +20,7 @@ use polars_utils::pl_str::PlSmallStr; use super::FileWriteFormat; use crate::dsl::file_provider::FileProviderType; +use crate::dsl::iceberg_sink_state::IcebergSinkState; use crate::dsl::{AExpr, Expr, SpecialEq}; use crate::plans::{ExprIR, ToFieldContext}; use crate::prelude::PlanCallback; @@ -227,6 +229,7 @@ pub enum SinkType { Callback(CallbackSinkType), File(FileSinkOptions), Partitioned(PartitionedSinkOptions), + Iceberg(IcebergSinkState), } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] @@ -465,7 +468,13 @@ pub struct FileSinkOptions { pub unified_sink_args: UnifiedSinkArgs, } -pub type SinkedPathsCallback = PlanCallback; +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))] +#[derive(Clone, Debug, Hash, PartialEq)] +pub enum SinkedPathsCallback { + IcebergCommit(IcebergSinkState), + Callback(PlanCallback), +} #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))] @@ -482,19 +491,48 @@ pub struct SinkedPathInfo { } impl SinkedPathsCallback { - pub fn call_(&self, args: SinkedPathsCallbackArgs) -> PolarsResult<()> { + pub fn call(&self, args: SinkedPathsCallbackArgs) -> PolarsResult<()> { + use PlanCallback as CB; + match self { - Self::Rust(func) => (func)(args), + Self::IcebergCommit(sink_state) => { + feature_gated!("python", { + use pyo3::Python; + + Python::attach(|py| { + use pyo3::intern; + use pyo3::types::PyList; + + let py_paths = PyList::empty(py); + + let SinkedPathsCallbackArgs { path_info_list } = args; + + for SinkedPathInfo { path } in path_info_list { + use pyo3::types::PyListMethods; + + let path: &str = path.as_str(); + + py_paths.append(path)?; + } + + sink_state.clone().into_sink_state_obj()?.call_method1( + py, + intern!(py, "commit"), + (py_paths,), + )?; + + PolarsResult::Ok(()) + }) + }) + }, + Self::Callback(CB::Rust(func)) => (func)(args), #[cfg(feature = "python")] - Self::Python(object) => pyo3::Python::attach(|py| { + Self::Callback(CB::Python(object)) => pyo3::Python::attach(|py| { use pyo3::intern; use pyo3::types::{PyAnyMethods, PyDict, PyList}; let SinkedPathsCallbackArgs { path_info_list } = args; - let convert_registry = - polars_utils::python_convert_registry::get_python_convert_registry(); - let py_paths = PyList::empty(py); for SinkedPathInfo { path } in path_info_list { @@ -508,9 +546,10 @@ impl SinkedPathsCallback { let kwargs = PyDict::new(py); kwargs.set_item(intern!(py, "paths"), py_paths)?; - let args_dataclass = convert_registry - .py_sinked_paths_callback_args_dataclass() - .call(py, (), Some(&kwargs))?; + let args_dataclass = + polars_utils::python_convert_registry::get_python_convert_registry() + .py_sinked_paths_callback_args_dataclass() + .call(py, (), Some(&kwargs))?; object.call1(py, (args_dataclass,))?; diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs index b8091633f64e..6eb47ca9fa7c 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs @@ -5,6 +5,7 @@ use futures::stream::FuturesUnordered; use hive::hive_partitions_from_paths; use polars_core::chunked_array::cast::CastOptions; use polars_core::config::verbose; +use polars_error::feature_gated; use polars_io::ExternalCompression; use polars_io::pl_async::get_runtime; use polars_utils::format_pl_smallstr; @@ -1354,10 +1355,54 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult } }, DslPlan::Sink { input, payload } => { + if let SinkType::Iceberg(state) = payload { + feature_gated!("python", { + use polars_utils::python_convert_registry::get_python_convert_registry; + use pyo3::{Python, intern}; + + use crate::dsl::sink::SinkedPathsCallback; + + let py_sink_state = state.clone().into_sink_state_obj()?; + + let reg = get_python_convert_registry(); + + let py_lf = (reg.to_py.dsl_plan)(input.as_ref())?; + + let out = Python::attach(|py| { + py_sink_state.call_method1( + py, + intern!(py, "_attach_resolved_sink"), + (py_lf,), + ) + })?; + + let mut plan: Box = (reg.from_py.dsl_plan)(out)?.downcast().unwrap(); + + let DslPlan::Sink { + input: _, + payload: + SinkType::Partitioned(PartitionedSinkOptions { + unified_sink_args, .. + }), + } = plan.as_mut() + else { + unreachable!() + }; + + debug_assert!(unified_sink_args.sinked_paths_callback.is_none()); + + unified_sink_args.sinked_paths_callback = + Some(SinkedPathsCallback::IcebergCommit(state)); + + return to_alp_impl(*plan, ctxt); + }) + } + let input = to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?; let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena); let payload = match payload { + SinkType::Iceberg(_) => unreachable!(), SinkType::Memory => SinkTypeIR::Memory, SinkType::Callback(f) => SinkTypeIR::Callback(f), SinkType::File(mut options) => { diff --git a/crates/polars-python/src/io/sink_options.rs b/crates/polars-python/src/io/sink_options.rs index c144dfd28633..515ba15b0ebb 100644 --- a/crates/polars-python/src/io/sink_options.rs +++ b/crates/polars-python/src/io/sink_options.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use polars::prelude::sink::SinkedPathsCallback; use polars::prelude::sync_on_close::SyncOnCloseType; use polars::prelude::{CloudScheme, PlanCallback, SpecialEq, UnifiedSinkArgs}; use polars_utils::python_function::PythonObject; @@ -53,8 +54,11 @@ impl PySinkOptions<'_> { maintain_order, sync_on_close, cloud_options: cloud_options.map(Arc::new), - sinked_paths_callback: sinked_paths_callback - .map(|x| PlanCallback::Python(SpecialEq::new(Arc::new(PythonObject(x))))), + sinked_paths_callback: sinked_paths_callback.map(|x| { + SinkedPathsCallback::Callback(PlanCallback::Python(SpecialEq::new(Arc::new( + PythonObject(x), + )))) + }), }; Ok(unified_sink_args) diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index 3b6fb9bd8337..4b64bfc494f6 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -8,6 +8,7 @@ use parking_lot::Mutex; #[cfg(feature = "pivot")] use polars::frame::PivotColumnNaming; use polars::io::RowIndex; +use polars::prelude::iceberg_sink_state::IcebergSinkState; use polars::time::*; use polars_core::prelude::*; use polars_core::query_result::QueryResult; @@ -883,6 +884,18 @@ impl PyLazyFrame { .map_err(Into::into) } + pub fn sink_iceberg(&self, py: Python<'_>, sink_state_obj: Py) -> PyResult { + let sink_state: IcebergSinkState = sink_state_obj.extract(py)?; + let mut ldf = { self.ldf.read().clone() }; + + ldf.logical_plan = DslPlan::Sink { + input: Arc::new(ldf.logical_plan), + payload: SinkType::Iceberg(sink_state), + }; + + Ok(ldf.into()) + } + fn filter(&self, predicate: PyExpr) -> Self { self.ldf.read().clone().filter(predicate.inner).into() } diff --git a/crates/polars-stream/src/nodes/io_sinks/components/sinked_path_info_list.rs b/crates/polars-stream/src/nodes/io_sinks/components/sinked_path_info_list.rs index c6860eead4e7..cdf24bc44d8b 100644 --- a/crates/polars-stream/src/nodes/io_sinks/components/sinked_path_info_list.rs +++ b/crates/polars-stream/src/nodes/io_sinks/components/sinked_path_info_list.rs @@ -23,7 +23,7 @@ pub async fn call_sinked_paths_callback( path_info_list: std::mem::take(&mut path_info_list.lock()), }; - sinked_paths_callback.call_(args) + sinked_paths_callback.call(args) }) .await .unwrap() diff --git a/crates/polars-utils/src/pl_str.rs b/crates/polars-utils/src/pl_str.rs index 65e9a0d153c2..17c81a0846e3 100644 --- a/crates/polars-utils/src/pl_str.rs +++ b/crates/polars-utils/src/pl_str.rs @@ -314,3 +314,42 @@ pub fn unique_column_name() -> PlSmallStr { let idx = COUNTER.fetch_add(1); format_pl_smallstr!("_POLARS_TMP_{idx}") } + +#[cfg(feature = "python")] +mod _python_impl { + use std::convert::Infallible; + + use pyo3::pybacked::PyBackedStr; + use pyo3::types::PyString; + use pyo3::{Borrowed, Bound, FromPyObject, IntoPyObject, PyAny, PyErr, PyResult}; + + use super::PlSmallStr; + + impl<'py> IntoPyObject<'py> for PlSmallStr { + type Target = PyString; + type Output = Bound<'py, Self::Target>; + type Error = Infallible; + + fn into_pyobject(self, py: pyo3::Python<'py>) -> Result { + self.as_str().into_pyobject(py) + } + } + + impl<'py> IntoPyObject<'py> for &PlSmallStr { + type Target = PyString; + type Output = Bound<'py, Self::Target>; + type Error = Infallible; + + fn into_pyobject(self, py: pyo3::Python<'py>) -> Result { + self.as_str().into_pyobject(py) + } + } + + impl<'a, 'py> FromPyObject<'a, 'py> for PlSmallStr { + type Error = PyErr; + + fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult { + Ok((&*ob.extract::()?).into()) + } + } +} diff --git a/crates/polars-utils/src/python_convert_registry.rs b/crates/polars-utils/src/python_convert_registry.rs index b951647e2a9f..f0468057a69c 100644 --- a/crates/polars-utils/src/python_convert_registry.rs +++ b/crates/polars-utils/src/python_convert_registry.rs @@ -78,6 +78,20 @@ impl PythonConvertRegistry { &CLS } + + pub fn py_iceberg_sink_state_class(&self) -> &'static Py { + static CLS: LazyLock> = LazyLock::new(|| { + Python::attach(|py| { + py.import("polars.io.iceberg._sink") + .unwrap() + .getattr("IcebergSinkState") + .unwrap() + .unbind() + }) + }); + + &CLS + } } static PYTHON_CONVERT_REGISTRY: LazyLock>> = diff --git a/crates/polars-utils/src/python_function.rs b/crates/polars-utils/src/python_function.rs index 4863d74b4c2d..ed6a5f398ffe 100644 --- a/crates/polars-utils/src/python_function.rs +++ b/crates/polars-utils/src/python_function.rs @@ -1,3 +1,4 @@ +use pyo3::BoundObject; use pyo3::prelude::*; #[cfg(feature = "serde")] pub use serde_wrap::{ @@ -26,6 +27,12 @@ impl std::ops::DerefMut for PythonObject { } } +impl std::hash::Hash for PythonObject { + fn hash(&self, state: &mut H) { + usize::hash(&(self.0.as_ptr() as _), state) + } +} + impl Clone for PythonObject { fn clone(&self) -> Self { Python::attach(|py| Self(self.0.clone_ref(py))) @@ -38,6 +45,14 @@ impl From> for PythonObject { } } +impl<'a, 'py> FromPyObject<'a, 'py> for PythonObject { + type Error = PyErr; + + fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult { + Ok(PythonObject(ob.into_bound().unbind())) + } +} + impl<'py> pyo3::conversion::IntoPyObject<'py> for PythonObject { type Target = PyAny; type Output = Bound<'py, Self::Target>; diff --git a/py-polars/src/polars/_plr.pyi b/py-polars/src/polars/_plr.pyi index dd1a470a9207..e053f94ea9bc 100644 --- a/py-polars/src/polars/_plr.pyi +++ b/py-polars/src/polars/_plr.pyi @@ -4,6 +4,7 @@ from typing import Any, Literal, TypeAlias, overload from numpy.typing import NDArray from polars._typing import ArrowSchemaExportable +from polars.io.iceberg._sink import IcebergSinkState from polars.io.scan_options._options import ScanOptions # This file mirrors all the definitions made in the polars-python Rust API. @@ -1016,6 +1017,7 @@ class PyLazyFrame: maintain_order: bool, chunk_size: int | None, ) -> PyLazyFrame: ... + def sink_iceberg(self, sink_state_obj: IcebergSinkState) -> PyLazyFrame: ... def filter(self, predicate: PyExpr) -> PyLazyFrame: ... def remove(self, predicate: PyExpr) -> PyLazyFrame: ... def select(self, exprs: Sequence[PyExpr]) -> PyLazyFrame: ... diff --git a/py-polars/src/polars/io/iceberg/_sink.py b/py-polars/src/polars/io/iceberg/_sink.py index 9becb599ebb6..614cbba461b3 100644 --- a/py-polars/src/polars/io/iceberg/_sink.py +++ b/py-polars/src/polars/io/iceberg/_sink.py @@ -1,19 +1,18 @@ from __future__ import annotations import contextlib +import importlib import importlib.util +import sys from dataclasses import dataclass from time import perf_counter from typing import TYPE_CHECKING, ClassVar, Literal from polars._utils.logging import eprint -from polars.io._expand_paths import _expand_paths +from polars._utils.wrap import wrap_ldf from polars.io.cloud._utils import NoPickleOption from polars.io.iceberg._dataset import ( IcebergCatalogConfig, - IcebergCatalogTableDescriptor, - IcebergTableSerializer, - IcebergTableWrap, _convert_iceberg_to_object_store_storage_options, ) from polars.io.iceberg._utils import _normalize_windows_iceberg_file_uri @@ -27,109 +26,106 @@ import pyiceberg.table import polars as pl + from polars._plr import PyLazyFrame from polars._typing import StorageOptionsDict - from polars.io.iceberg._dataset import SerializedTableState -def _ensure_table_has_catalog(table: pyiceberg.table.Table) -> None: - from pyiceberg.catalog.noop import NoopCatalog - - if isinstance(table.catalog, NoopCatalog): - msg = ( - "cannot sink to static Iceberg table: " - f"{type(table) = }, {type(table.catalog) = }" - ) - raise TypeError(msg) - - -class IcebergSinkTableSerializer(IcebergTableSerializer): - @staticmethod - def serialize_table(table: pyiceberg.table.Table) -> SerializedTableState: - _ensure_table_has_catalog(table) - - return IcebergCatalogTableDescriptor( - table_identifier=table.name(), - catalog_config=IcebergCatalogConfig.from_catalog(table.catalog), - ) +@dataclass(kw_only=True) +class IcebergSinkState: + py_catalog_class_module: str + py_catalog_class_qualname: str + catalog_name: str + catalog_properties: dict[str, str] -# Passed to `pipe_with_schema` to defer sink resolution logic until IR resolution. -@dataclass(kw_only=True) -class AttachSink: - sink_state: IcebergSinkState + table_name: str + mode: Literal["append", "overwrite"] + iceberg_storage_properties: StorageOptionsDict - def __call__( - self, - lf: pl.LazyFrame, - schema: pl.Schema, # noqa: ARG002 - ) -> pl.LazyFrame: - return self.sink_state._attach_sink_impl(lf) + sink_uuid_str: str + table_: NoPickleOption[pyiceberg.table.Table] + commit_result_df: NoPickleOption[pl.DataFrame] -class IcebergSinkState: - def __init__( - self, + @staticmethod + def new( target: str | pyiceberg.table.Table, *, - catalog: pyiceberg.catalog.Catalog | IcebergCatalogConfig | None = None, mode: Literal["append", "overwrite"] = "append", + catalog: pyiceberg.catalog.Catalog | IcebergCatalogConfig | None = None, storage_options: StorageOptionsDict | None = None, - ) -> None: - table: pyiceberg.table.Table | None = None - - if importlib.util.find_spec("pyiceberg.table") is not None: - from pyiceberg.table import Table - - if isinstance(target, Table): - table = target - - table_descriptor: IcebergCatalogTableDescriptor | None = None - - if isinstance(target, str): - catalog_config = ( + ) -> IcebergSinkState: + catalog_config = ( + ( IcebergCatalogConfig._from_api_parameter_or_environment_default( catalog, fn_name="sink_iceberg", ) ) - - table_descriptor = IcebergCatalogTableDescriptor( - table_identifier=target, - catalog_config=catalog_config, + if isinstance(target, str) + else ( + IcebergCatalogConfig( + class_=type(target.catalog), + name=target.catalog.name, + properties=target.catalog.properties, + ) ) + ) - if table is not None: - _ensure_table_has_catalog(table) + from pyiceberg.catalog.noop import NoopCatalog - self.table = IcebergTableWrap( - table_=NoPickleOption(table), - table_descriptor_=table_descriptor, - serializer=IcebergSinkTableSerializer(), - iceberg_storage_properties=storage_options, + if catalog_config.class_ is NoopCatalog: + msg = ( + "cannot sink to static Iceberg table: " + f"{type(target) = }, {getattr(target, 'catalog', None) = }" + ) + raise TypeError(msg) + + return IcebergSinkState( + py_catalog_class_module=catalog_config.class_.__module__, + py_catalog_class_qualname=catalog_config.class_.__qualname__, + catalog_name=catalog_config.name, + catalog_properties=catalog_config.properties, + table_name=target if isinstance(target, str) else ".".join(target.name()), + mode=mode, + iceberg_storage_properties=storage_options or {}, + sink_uuid_str=gen_uuid_v7().hex(), + table_=NoPickleOption(target if not isinstance(target, str) else None), + commit_result_df=NoPickleOption(), ) - self.mode = mode - self.sink_uuid_str = gen_uuid_v7().hex() - self._output_base_path: str | None = None - def _get_converted_storage_options(self) -> dict[str, str] | None: - return ( - _convert_iceberg_to_object_store_storage_options( - self.table.iceberg_storage_properties + def table(self) -> pyiceberg.table.Table: + if self.table_.get() is None: + module = importlib.import_module(self.py_catalog_class_module) + qualname_split = self.py_catalog_class_qualname.split(".") + + catalog_class: type[pyiceberg.catalog.Catalog] = getattr( + module, qualname_split[0] ) - if self.table.iceberg_storage_properties is not None - else None + + for part in qualname_split[1:]: + catalog_class = getattr(catalog_class, part) + + catalog = catalog_class(self.catalog_name, **self.catalog_properties) + self.table_.set(catalog.load_table(self.table_name)) + + return self.table_.get() # type: ignore[return-value] + + def _get_converted_storage_options(self) -> dict[str, str]: + return _convert_iceberg_to_object_store_storage_options( + self.iceberg_storage_properties ) def attach_sink(self, lf: pl.LazyFrame) -> pl.LazyFrame: - return lf.pipe_with_schema(AttachSink(sink_state=self)) + return wrap_ldf(lf._ldf.sink_iceberg(self)) - def _attach_sink_impl(self, lf: pl.LazyFrame) -> pl.LazyFrame: + def _attach_resolved_sink(self, plf: PyLazyFrame) -> PyLazyFrame: from pyiceberg.table import TableProperties from pyiceberg.utils.properties import property_as_bool, property_as_int import polars as pl - table = self.table.get() + table = self.table() table_metadata = table.metadata table_properties = table_metadata.properties @@ -156,7 +152,9 @@ def _attach_sink_impl(self, lf: pl.LazyFrame) -> pl.LazyFrame: msg = f"sink to Iceberg table with '{TableProperties.OBJECT_STORE_ENABLED}'" raise NotImplementedError(msg) - arrow_schema = self.table.arrow_schema() + from pyiceberg.io.pyarrow import schema_to_pyarrow + + arrow_schema = schema_to_pyarrow(table.schema()) approximate_bytes_per_file = 2 * 1024 * 1024 * 1024 @@ -169,18 +167,22 @@ def _attach_sink_impl(self, lf: pl.LazyFrame) -> pl.LazyFrame: estimated_compression_ratio * v, (1 << 64) - 1 ) - return lf.sink_parquet( - pl.PartitionBy( - _normalize_windows_iceberg_file_uri(self.output_base_path()), - file_path_provider=PlIcebergPathProviderConfig(), - approximate_bytes_per_file=approximate_bytes_per_file, - ), - arrow_schema=arrow_schema, - storage_options=self._get_converted_storage_options(), - lazy=True, + return ( + wrap_ldf(plf) + .sink_parquet( + pl.PartitionBy( + _normalize_windows_iceberg_file_uri(self.output_base_path()), + file_path_provider=PlIcebergPathProviderConfig(), + approximate_bytes_per_file=approximate_bytes_per_file, + ), + arrow_schema=arrow_schema, + storage_options=self._get_converted_storage_options(), + lazy=True, + ) + ._ldf ) - def commit(self) -> pl.DataFrame: + def commit(self, data_file_paths: list[str]) -> pl.DataFrame: import polars as pl import polars._utils.logging @@ -190,39 +192,16 @@ def commit(self) -> pl.DataFrame: if verbose: eprint(f"IcebergSinkState[commit]: mode: '{self.mode}'") - table = self.table.get() - - if verbose: - eprint("IcebergSinkState[commit]: begin path expansion") - - start_instant = perf_counter() - - output_base_path = self.output_base_path() - - data_file_paths_q = _expand_paths( - _normalize_windows_iceberg_file_uri(output_base_path), - storage_options=self._get_converted_storage_options(), - ) - - if output_base_path.startswith("file://") and not output_base_path.startswith( - "file:///" - ): - data_file_paths_q = data_file_paths_q.with_columns( - pl.col("path").str.replace(r"^file:///", "file://") - ) - - data_file_paths = data_file_paths_q.collect().to_series().to_list() - - if verbose: - elapsed = perf_counter() - start_instant - n_files = len(data_file_paths) - eprint( - f"IcebergSinkState[commit]: finish path expansion ({elapsed:.3f}s): " - f"{n_files = }" - ) + table = self.table() original_metadata_location = table.metadata_location + if sys.platform == "win32": + data_file_paths = [ + (f"file://{p[8:]}" if p.startswith("file:///") else p) + for p in data_file_paths + ] + with table.transaction() as tx: if self.mode == "overwrite": from pyiceberg.expressions import AlwaysTrue @@ -257,10 +236,12 @@ def commit(self) -> pl.DataFrame: assert new_metadata_location != original_metadata_location - out_df = pl.DataFrame( - {"metadata_path": new_metadata_location}, - schema={"metadata_path": pl.String}, - height=1, + self.commit_result_df.set( + pl.DataFrame( + {"metadata_path": new_metadata_location}, + schema={"metadata_path": pl.String}, + height=1, + ) ) if verbose: @@ -270,26 +251,22 @@ def commit(self) -> pl.DataFrame: f"IcebergSinkState[commit]: finished, total elapsed time: {total_elapsed:.3f}s" ) - return out_df + return self.commit_result_df.get() # type: ignore[return-value] def output_base_path(self) -> str: - if self._output_base_path is None: - from pyiceberg.table import TableProperties - - table = self.table.get() - table_metadata = table.metadata - table_properties = table_metadata.properties + from pyiceberg.table import TableProperties - output_base_path = ( - path.rstrip("/") - if (path := table_properties.get(TableProperties.WRITE_DATA_PATH)) - else f"{table_metadata.location.rstrip('/')}/data" - ) + table = self.table() + table_metadata = table.metadata + table_properties = table_metadata.properties - output_base_path = f"{output_base_path}/{self.sink_uuid_str}/" - self._output_base_path = output_base_path + output_base_path = ( + path.rstrip("/") + if (path := table_properties.get(TableProperties.WRITE_DATA_PATH)) + else f"{table_metadata.location.rstrip('/')}/data" + ) - return self._output_base_path + return f"{output_base_path}/{self.sink_uuid_str}/" class PlIcebergPathProviderConfig(_InternalPlPathProviderConfig): diff --git a/py-polars/src/polars/lazyframe/frame.py b/py-polars/src/polars/lazyframe/frame.py index a1cba3db60cf..79723b5eb296 100644 --- a/py-polars/src/polars/lazyframe/frame.py +++ b/py-polars/src/polars/lazyframe/frame.py @@ -3287,16 +3287,16 @@ def sink_iceberg( """ from polars.io.iceberg._sink import IcebergSinkState - sink_state = IcebergSinkState( - target=target, - catalog=catalog, + sink_state = IcebergSinkState.new( + target, mode=mode, + catalog=catalog, storage_options=storage_options, ) sink_state.attach_sink(self).collect(engine="streaming") - return sink_state.commit() + return sink_state.commit_result_df.get() # type: ignore[return-value] @overload def sink_ipc( diff --git a/py-polars/tests/unit/io/test_iceberg.py b/py-polars/tests/unit/io/test_iceberg.py index b704ccec045b..b79768f163eb 100644 --- a/py-polars/tests/unit/io/test_iceberg.py +++ b/py-polars/tests/unit/io/test_iceberg.py @@ -492,7 +492,7 @@ def test_sink_iceberg_raises_on_static_table(tmp_path: Path) -> None: ) with err_cx: - IcebergSinkState(StaticTable.from_metadata(tbl.metadata_location)) + IcebergSinkState.new(StaticTable.from_metadata(tbl.metadata_location)) with err_cx: pl.LazyFrame({"a": 1}).sink_iceberg( @@ -514,25 +514,29 @@ def test_sink_iceberg_pickle(tmp_path: Path) -> None: schema=IcebergSchema(NestedField(1, "a", LongType())), ) - sink_state = IcebergSinkState(tbl) + sink_state = IcebergSinkState.new(tbl) sink_q = sink_state.attach_sink(pl.LazyFrame({"a": 1})) sink_q.collect() - new_md_path = sink_state.commit().item(0, "metadata_path") + rdf = sink_state.commit_result_df.get() + assert rdf is not None + new_md_path = rdf.item(0, "metadata_path") + + tbl = sink_state.table() assert_frame_equal(pl.scan_iceberg(tbl).collect(), pl.DataFrame({"a": 1})) assert new_md_path == tbl.metadata_location - sink_state = IcebergSinkState(tbl) + sink_state = IcebergSinkState.new(tbl) sink_q = sink_state.attach_sink(pl.LazyFrame({"a": 2})) sink_q = pickle.loads(pickle.dumps(sink_q)) sink_q.collect() - new_md_path = ( - pickle.loads(pickle.dumps(sink_state)).commit().item(0, "metadata_path") - ) - assert new_md_path != tbl.metadata_location + new_tbl = catalog.load_table(tbl.name()) + new_md_path = new_tbl.metadata_location - tbl = catalog.load_table(tbl.name()) + assert new_tbl.metadata_location != tbl.metadata_location + + tbl = new_tbl assert_frame_equal( pl.scan_iceberg(tbl).collect(), @@ -543,39 +547,10 @@ def test_sink_iceberg_pickle(tmp_path: Path) -> None: @pytest.mark.write_disk -def test_sink_iceberg_simulate_from_multiple_workers(tmp_path: Path) -> None: - tbl, catalog = new_iceberg_table( - tmp_path, - schema=IcebergSchema(NestedField(1, "a", LongType())), - ) - - sink_state = IcebergSinkState(tbl) - sink_q = sink_state.attach_sink(pl.LazyFrame({"a": 1})) - pl.collect_all( - [ - pickle.loads(pickle.dumps(sink_q)), - pickle.loads(pickle.dumps(sink_q)), - pickle.loads(pickle.dumps(sink_q)), - sink_state.attach_sink(pl.LazyFrame({"a": 2})), - sink_state.attach_sink(pl.LazyFrame({"a": 3})), - ] - ) - - new_md_path = sink_state.commit().item(0, "metadata_path") - - tbl = catalog.load_table(tbl.name()) - - assert tbl.metadata_location == new_md_path - - assert_frame_equal( - pl.scan_iceberg(tbl).collect().sort("a"), - pl.DataFrame({"a": [1, 1, 1, 2, 3]}), - ) - - -@pytest.mark.write_disk -def test_sink_iceberg_attach_sink_deferred_to_ir_resolution(tmp_path: Path) -> None: - sink_state = IcebergSinkState( +def test_sink_iceberg__attach_resolved_sink_deferred_to_ir_resolution( + tmp_path: Path, +) -> None: + sink_state = IcebergSinkState.new( "x.x", catalog=SqlCatalog( "default",