feat(native): Support basic writing to iceberg table #26338
Conversation
Reviewer's Guide

This PR integrates basic Iceberg table write support into the Prestissimo native engine by enhancing the connector to translate Presto Iceberg protocol handles into Velox insert handles, consolidating file compression mapping in the shared connector, exposing necessary constants for testing, and adding a full suite of Java integration tests to validate unpartitioned writes, metadata queries, selects, TPCH-driven inserts, and create-table scenarios.

Sequence diagram for Iceberg table insertion via the Prestissimo native connector:

```mermaid
sequenceDiagram
    participant Presto as Presto Engine
    participant Connector as IcebergPrestoToVeloxConnector
    participant Velox as Velox Engine
    Presto->>Connector: Request to insert into Iceberg table
    Connector->>Connector: Translate Presto protocol handle to Velox insert handle
    Connector->>Velox: Pass VeloxInsertTableHandle for write
    Velox->>Velox: Write data to Iceberg table (basic, unpartitioned)
    Velox-->>Connector: Return write result
    Connector-->>Presto: Return insert status
```

Class diagram for the updated IcebergPrestoToVeloxConnector and related handle translation:

```mermaid
classDiagram
Class diagram for updated IcebergPrestoToVeloxConnector and related handle translationclassDiagram
class IcebergPrestoToVeloxConnector {
+createConnectorProtocol()
+toVeloxInsertTableHandle(createHandle, typeParser)
+toVeloxInsertTableHandle(insertHandle, typeParser)
-toHiveColumns(inputColumns, typeParser)
}
class PrestoToVeloxConnector {
<<abstract>>
+~PrestoToVeloxConnector()
}
class IcebergOutputTableHandle {
+inputColumns
+outputPath
+fileFormat
+compressionCodec
}
class IcebergInsertTableHandle {
+inputColumns
+outputPath
+fileFormat
+compressionCodec
}
class HiveColumnHandle {}
class LocationHandle {
+TableType
}
class IcebergInsertTableHandle_velox {
+inputColumns
+locationHandle
+fileFormat
+compressionKind
}
IcebergPrestoToVeloxConnector --|> PrestoToVeloxConnector
IcebergPrestoToVeloxConnector --> IcebergOutputTableHandle
IcebergPrestoToVeloxConnector --> IcebergInsertTableHandle
IcebergPrestoToVeloxConnector --> HiveColumnHandle
IcebergPrestoToVeloxConnector --> LocationHandle
IcebergPrestoToVeloxConnector --> IcebergInsertTableHandle_velox
Class diagram for the consolidated file compression mapping function:

```mermaid
classDiagram
    class PrestoToVeloxConnector {
        +toFileCompressionKind(hiveCompressionCodec)
    }
    class HiveCompressionCodec {
        +SNAPPY
        +GZIP
        +LZ4
        +ZSTD
        +NONE
    }
    class CompressionKind {
        +CompressionKind_SNAPPY
        +CompressionKind_GZIP
        +CompressionKind_LZ4
        +CompressionKind_ZSTD
        +CompressionKind_NONE
    }
    PrestoToVeloxConnector --> HiveCompressionCodec
    PrestoToVeloxConnector --> CompressionKind
```
Hey there - I've reviewed your changes - here's some feedback:
- The two toVeloxInsertTableHandle overloads share almost identical logic—consider extracting the common handle‐creation steps into a helper function to reduce duplication and ensure consistency.
- The path suffix "{}/data" is hardcoded in multiple places—pull that string into a named constant or small helper to avoid magic literals and make future path changes easier.
- Since this PR only adds basic insertion (without partition transforms, metrics, or sort order), add a TODO or issue reference in the code to track implementing those missing Iceberg features later.
## Individual Comments
### Comment 1
<location> `presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestUnpartitionedWrite.java:526` </location>
<code_context>
+ }
+
+ @Test
+ public void testInsertWithNumericEdgeCases()
+ {
+ String tableName = "test_numeric_edges";
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for decimal edge cases (max/min precision, scale, negative, zero, null, overflow).
Tests for decimal columns should include cases for max/min precision and scale, negative values, zero, nulls, and potential overflow/underflow to ensure robust coverage.
Suggested implementation:
```java
assertUpdate(String.format("CREATE TABLE %s (" +
"id INTEGER, " +
"int_val INTEGER, " +
"bigint_val BIGINT, " +
"double_val DOUBLE, " +
"real_val REAL, " +
"decimal_val DECIMAL(38, 18)" +
") WITH (format = 'PARQUET')", tableName));
```
```java
// Insert max precision/scale
assertUpdate(String.format("INSERT INTO %s VALUES (1, 123, 123456789012345, 123.456, 123.456, 99999999999999999999999999999999999999.999999999999999999)", tableName));
// Insert min precision/scale
assertUpdate(String.format("INSERT INTO %s VALUES (2, 0, 0, 0.0, 0.0, 0.000000000000000001)", tableName));
// Insert negative value
assertUpdate(String.format("INSERT INTO %s VALUES (3, -1, -100, -1.23, -1.23, -12345.678901234567890123)", tableName));
// Insert zero
assertUpdate(String.format("INSERT INTO %s VALUES (4, 0, 0, 0.0, 0.0, 0.0)", tableName));
// Insert null
assertUpdate(String.format("INSERT INTO %s VALUES (5, null, null, null, null, null)", tableName));
// Insert value that would overflow DECIMAL(38, 18)
try {
assertUpdate(String.format("INSERT INTO %s VALUES (6, 0, 0, 0.0, 0.0, 100000000000000000000000000000000000000.000000000000000000)", tableName));
fail("Expected overflow exception for decimal_val");
}
catch (Exception e) {
// Expected: overflow
}
// Verify inserted values
assertQuery(String.format("SELECT decimal_val FROM %s WHERE id = 1", tableName),
"VALUES (CAST('99999999999999999999999999999999999999.999999999999999999' AS DECIMAL(38,18)))");
assertQuery(String.format("SELECT decimal_val FROM %s WHERE id = 2", tableName),
"VALUES (CAST('0.000000000000000001' AS DECIMAL(38,18)))");
assertQuery(String.format("SELECT decimal_val FROM %s WHERE id = 3", tableName),
"VALUES (CAST('-12345.678901234567890123' AS DECIMAL(38,18)))");
assertQuery(String.format("SELECT decimal_val FROM %s WHERE id = 4", tableName),
"VALUES (CAST('0.0' AS DECIMAL(38,18)))");
assertQuery(String.format("SELECT decimal_val FROM %s WHERE id = 5", tableName),
"VALUES (CAST(NULL AS DECIMAL(38,18)))");
```
</issue_to_address>
### Comment 2
<location> `presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestUnpartitionedWrite.java:712` </location>
<code_context>
+ }
+
+ @Test
+ public void testMaximumLengthVarchar()
+ {
+ String tableName = "long_varchar";
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for inserting and reading empty VARCHAR and VARBINARY values.
Empty string and VARBINARY cases are not currently tested and should be included to ensure correct handling of these edge cases.
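One possible shape for such a test; the table name, the `X''` empty-varbinary literal, and the assertions below are an illustrative sketch, not code from the PR:
```java
@Test
public void testEmptyVarcharAndVarbinary()
{
    String tableName = "test_empty_varchar_varbinary";
    try {
        assertUpdate(String.format("CREATE TABLE %s (" +
                "id INTEGER, " +
                "varchar_val VARCHAR, " +
                "varbinary_val VARBINARY" +
                ") WITH (format = 'PARQUET')", tableName));
        // Empty (not NULL) string and binary values
        assertUpdate(String.format("INSERT INTO %s VALUES (1, '', X'')", tableName), 1);
        // NULL row for contrast
        assertUpdate(String.format("INSERT INTO %s VALUES (2, NULL, NULL)", tableName), 1);
        // Empty values must round-trip as empty, not be coerced to NULL
        assertQuery(String.format("SELECT length(varchar_val), length(varbinary_val) FROM %s WHERE id = 1", tableName),
                "VALUES (BIGINT '0', BIGINT '0')");
        assertQuery(String.format("SELECT COUNT(*) FROM %s WHERE varchar_val IS NULL", tableName),
                "VALUES (BIGINT '1')");
    }
    finally {
        assertUpdate(String.format("DROP TABLE IF EXISTS %s", tableName));
    }
}
```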
</issue_to_address>
### Comment 3
<location> `presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestUnpartitionedWrite.java:272` </location>
<code_context>
+ }
+
+ @Test
+ public void testInsertIntoSelect()
+ {
+ String sourceTable = "source";
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for mismatched column types in INSERT INTO ... SELECT.
Please add a test that checks for errors when the source and target tables have mismatched column types or counts.
Suggested implementation:
```java
@Test
public void testInsertIntoSelect()
{
String sourceTable = "source";
String targetTable = "target";
try {
assertUpdate(String.format("CREATE TABLE %s (" +
"id INTEGER, " +
"name VARCHAR, " +
"value DOUBLE" +
") WITH (format = 'PARQUET')", sourceTable));
assertUpdate(String.format("INSERT INTO %s VALUES " +
```
```java
}
finally {
assertUpdate(String.format("DROP TABLE IF EXISTS %s", tableName));
}
}
@Test
public void testInsertIntoSelectWithMismatchedColumns()
{
String sourceTable = "source_mismatch";
String targetTable = "target_mismatch";
try {
// Create source table with two columns
assertUpdate(String.format("CREATE TABLE %s (" +
"id INTEGER, " +
"name VARCHAR" +
") WITH (format = 'PARQUET')", sourceTable));
// Create target table with three columns, last column type mismatched
assertUpdate(String.format("CREATE TABLE %s (" +
"id INTEGER, " +
"name VARCHAR, " +
"value DOUBLE" +
") WITH (format = 'PARQUET')", targetTable));
// Insert some data into source
assertUpdate(String.format("INSERT INTO %s VALUES (1, 'first'), (2, 'second')", sourceTable));
// Try to insert from source to target, expect failure due to column count mismatch
assertQueryFails(
String.format("INSERT INTO %s SELECT * FROM %s", targetTable, sourceTable),
".*Number of columns.*does not match.*");
// Now create another source table with three columns but mismatched types
String sourceTableTypeMismatch = "source_type_mismatch";
assertUpdate(String.format("CREATE TABLE %s (" +
"id VARCHAR, " + // should be INTEGER
"name VARCHAR, " +
"value DOUBLE" +
") WITH (format = 'PARQUET')", sourceTableTypeMismatch));
assertUpdate(String.format("INSERT INTO %s VALUES ('one', 'first', 1.0)", sourceTableTypeMismatch));
// Try to insert from source with type mismatch to target, expect failure
assertQueryFails(
String.format("INSERT INTO %s SELECT * FROM %s", targetTable, sourceTableTypeMismatch),
".*column.*type.*mismatch.*");
}
finally {
assertUpdate(String.format("DROP TABLE IF EXISTS %s", sourceTable));
assertUpdate(String.format("DROP TABLE IF EXISTS %s", targetTable));
assertUpdate(String.format("DROP TABLE IF EXISTS source_type_mismatch"));
}
}
```
</issue_to_address>
### Comment 4
<location> `presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestUnpartitionedWrite.java:496` </location>
<code_context>
+ }
+
+ @Test
+ public void testInsertWithSpecialCharacters()
+ {
+ String tableName = "special_chars";
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for escaping and reading back special characters in column names.
Testing columns with special characters in their names will help verify correct handling in both DDL and DML operations.
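A sketch of what such a test might look like; the quoted identifiers below are illustrative, and part of what the test would establish is which special characters the Iceberg connector actually accepts in column names:
```java
@Test
public void testSpecialCharactersInColumnNames()
{
    String tableName = "special_char_columns";
    try {
        // Quoted identifiers containing a space, dots, and mixed case
        assertUpdate(String.format("CREATE TABLE %s (" +
                "\"col with space\" INTEGER, " +
                "\"col.with.dots\" VARCHAR, " +
                "\"MixedCase\" DOUBLE" +
                ") WITH (format = 'PARQUET')", tableName));
        assertUpdate(String.format("INSERT INTO %s VALUES (1, 'a', 1.5)", tableName), 1);
        // Read back through the same quoted identifiers
        assertQuery(String.format("SELECT \"col with space\", \"col.with.dots\", \"MixedCase\" FROM %s", tableName),
                "VALUES (1, 'a', 1.5)");
    }
    finally {
        assertUpdate(String.format("DROP TABLE IF EXISTS %s", tableName));
    }
}
```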
</issue_to_address>
### Comment 5
<location> `presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestUnpartitionedWrite.java:641` </location>
<code_context>
+ }
+
+ @Test
+ public void testDateTimeEdgeCases()
+ {
+ String tableName = "datetime_edges";
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for inserting and reading NULL date/timestamp values.
Please include a test case for NULL date/timestamp values to verify correct handling.
Suggested implementation:
```java
assertUpdate(String.format("INSERT INTO %s VALUES (1, DATE '1970-01-01', TIMESTAMP '1970-01-01 00:00:00')", tableName), 1);
assertUpdate(String.format("INSERT INTO %s VALUES (2, DATE '2024-02-29', TIMESTAMP '2024-02-29 23:59:59')", tableName), 1);
// Insert a row with NULL date and timestamp values
assertUpdate(String.format("INSERT INTO %s VALUES (3, NULL, NULL)", tableName), 1);
```
```java
// Verify the inserted rows, including the NULLs
assertQuery(
String.format("SELECT id, date_val, timestamp_val FROM %s ORDER BY id", tableName),
"VALUES " +
"(1, DATE '1970-01-01', TIMESTAMP '1970-01-01 00:00:00'), " +
"(2, DATE '2024-02-29', TIMESTAMP '2024-02-29 23:59:59'), " +
"(3, NULL, NULL)"
);
}
```
</issue_to_address>
### Comment 6
<location> `presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestCreateTable.java:95` </location>
<code_context>
+ }
+
+ @Test
+ public void testComplexTypes()
+ {
+ String tableName = "complex_types";
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for deeply nested complex types (e.g., array of map of row).
Please add a test case for a deeply nested type like ARRAY(MAP(VARCHAR, ROW(...))) to verify correct handling of complex nesting.
Suggested implementation:
```java
assertUpdate(String.format("CREATE TABLE %s (" +
"id INTEGER, " +
"int_array ARRAY(INTEGER), " +
"string_map MAP(VARCHAR, INTEGER), " +
"person ROW(name VARCHAR, age INTEGER), " +
"deep_nested ARRAY(MAP(VARCHAR, ROW(address VARCHAR, zip INTEGER)))" +
") WITH (format = 'PARQUET')", tableName));
```
```java
assertQuery(String.format("SELECT COUNT(*) FROM %s", tableName), "VALUES (BIGINT '0')");
// Insert a row with deeply nested complex types
assertUpdate(String.format(
"INSERT INTO %s (id, int_array, string_map, person, deep_nested) VALUES " +
"(1, ARRAY[1, 2, 3], MAP(ARRAY['a', 'b'], ARRAY[10, 20]), ROW('Alice', 30), " +
"ARRAY[MAP(ARRAY['home'], ARRAY[ROW('123 Main St', 12345)]), MAP(ARRAY['work'], ARRAY[ROW('456 Office Rd', 67890)])])",
tableName), 1);
// Verify the deeply nested value
assertQuery(
String.format("SELECT deep_nested FROM %s WHERE id = 1", tableName),
"VALUES (" +
"ARRAY[" +
"MAP(ARRAY['home'], ARRAY[ROW('123 Main St', 12345)])," +
"MAP(ARRAY['work'], ARRAY[ROW('456 Office Rd', 67890)])" +
"])"
);
```
</issue_to_address>
### Comment 7
<location> `presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestSelect.java:155` </location>
<code_context>
+ }
+
+ @Test
+ public void testSelectWithNullValues()
+ {
+ String tableName = "select_nulls";
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for IS NOT DISTINCT FROM and COALESCE with NULLs.
Include test cases using IS NOT DISTINCT FROM and COALESCE to ensure proper handling of NULLs in Iceberg tables.
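A minimal sketch of such a test; the table name and values are illustrative:
```java
@Test
public void testNullPredicatesAndCoalesce()
{
    String tableName = "select_null_predicates";
    try {
        assertUpdate(String.format("CREATE TABLE %s (" +
                "id INTEGER, " +
                "val INTEGER" +
                ") WITH (format = 'PARQUET')", tableName));
        assertUpdate(String.format("INSERT INTO %s VALUES (1, 10), (2, NULL), (3, NULL)", tableName), 3);
        // IS NOT DISTINCT FROM treats two NULLs as equal
        assertQuery(String.format("SELECT COUNT(*) FROM %s WHERE val IS NOT DISTINCT FROM NULL", tableName),
                "VALUES (BIGINT '2')");
        // COALESCE substitutes a default for NULL
        assertQuery(String.format("SELECT id, COALESCE(val, -1) FROM %s ORDER BY id", tableName),
                "VALUES (1, 10), (2, -1), (3, -1)");
    }
    finally {
        assertUpdate(String.format("DROP TABLE IF EXISTS %s", tableName));
    }
}
```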
</issue_to_address>
@PingLiuPing Mostly looks good. The first commit needs to come with tests. Would it be possible to move TestUnpartitionedWrite and TestInsertFromTpch from the second commit to the first commit?
```cpp
velox::common::CompressionKind toFileCompressionKind(
```
Move this function behind toHiveTableHandle() to keep it in the same order as the .h file.
Thanks.
```java
public class TestInsertFromTpch
        extends AbstractTestQueryFramework
{
    private static final String TPCH_SCHEMA = "iceberg.tpch";
```
nit: add an empty line below
Thanks.
") WITH (format = 'PARQUET')", targetTable)); | ||
|
||
long rowCount = (Long) computeActual(String.format("SELECT COUNT(*) FROM %s", sourceTable)).getOnlyValue(); | ||
assertUpdate(String.format("INSERT INTO %s SELECT * FROM %s", targetTable, sourceTable), rowCount); |
It would be nice if we can do multiple rounds of insertion tests on the nation or region tables.
Yes, good idea.
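As a rough sketch of what multiple rounds could look like, reusing the TPCH_SCHEMA constant defined in this class (the table name is illustrative, and CTAS ... WITH NO DATA is assumed to be supported here):
```java
@Test
public void testRepeatedInsertFromNation()
{
    String targetTable = "nation_multi_insert";
    try {
        // Empty target with the same schema as nation
        assertUpdate(String.format("CREATE TABLE %s AS SELECT * FROM %s.nation WITH NO DATA", targetTable, TPCH_SCHEMA), 0);
        long rowCount = (Long) computeActual(String.format("SELECT COUNT(*) FROM %s.nation", TPCH_SCHEMA)).getOnlyValue();
        // Several rounds of insertion; each round appends a full copy of nation
        for (int round = 1; round <= 3; round++) {
            assertUpdate(String.format("INSERT INTO %s SELECT * FROM %s.nation", targetTable, TPCH_SCHEMA), rowCount);
            assertQuery(String.format("SELECT COUNT(*) FROM %s", targetTable),
                    String.format("VALUES (BIGINT '%d')", round * rowCount));
        }
    }
    finally {
        assertUpdate(String.format("DROP TABLE IF EXISTS %s", targetTable));
    }
}
```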
") WITH (format = 'PARQUET')", targetTable)); | ||
|
||
long rowCount = (Long) computeActual(String.format("SELECT COUNT(*) FROM %s WHERE acctbal > 0", sourceTable)).getOnlyValue(); | ||
assertUpdate(String.format("INSERT INTO %s " + |
Add a test case where the filter evaluates to false and 0 rows are inserted?
Yes, I missed this edge case.
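A sketch of that edge case, meant to slot into the surrounding test with the sourceTable and targetTable already defined there:
```java
// A filter that evaluates to false for every row: the INSERT should succeed and write 0 rows
long countBefore = (Long) computeActual(String.format("SELECT COUNT(*) FROM %s", targetTable)).getOnlyValue();
assertUpdate(String.format("INSERT INTO %s SELECT * FROM %s WHERE 1 = 0", targetTable, sourceTable), 0);
// Row count of the target table is unchanged
assertQuery(String.format("SELECT COUNT(*) FROM %s", targetTable),
        String.format("VALUES (BIGINT '%d')", countBefore));
```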
Force-pushed from f567fc4 to 59722e8.
@yingsu00 Thanks for the review comments. I have fixed them and rearranged the commits. The macOS build error is an issue in the Prestissimo macOS build script; I have submitted a PR to fix it, so the CI should now pass. I have rebased the code.
Force-pushed from 59722e8 to 61dca67.
Force-pushed from 61dca67 to b7fcda3.
@PingLiuPing Sorry for a few late comments, but you can address them in follow-up PRs.
```cpp
velox::connector::hive::iceberg::IcebergInsertTableHandle>(
    inputColumns,
    std::make_shared<velox::connector::hive::LocationHandle>(
        fmt::format("{}/data", icebergInsertTableHandle->outputPath),
```
fmt::format("{}/data", icebergInsertTableHandle->outputPath) seems repeated at multiple places in the code. Abstract an inline function for this and reuse.
```cpp
    const VeloxExprConverter& exprConverter,
    const TypeParser& typeParser);

velox::common::CompressionKind toFileCompressionKind(
```
If you are separating out a HiveToPrestoVeloxConnector file, then this can be moved there as well.
@PingLiuPing: We have been adding native tests to the presto-native-tests module so that presto-native-execution remains lightweight and has only the basic tests that Meta and we care about.
Would you be able to add some tests to presto-native-tests, and also enhance the framework so that we can use IcebergQueryRunner in it instead of always Hive, along with the sidecar, as that is a more realistic setup? https://github.com/prestodb/presto/tree/master/presto-native-tests/src/test/java/com/facebook/presto/nativetests
Description
Velox PR: facebookincubator/velox#14723
This commit adds support for basic Iceberg table insertion in Prestissimo (native execution) and provides comprehensive test coverage. All tests use the native query runner to verify Prestissimo's Iceberg functionality.
Basic insertion does not include partition transforms, metrics, or sort order.
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.