diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java
index 8ee552c98930..844b44f91b79 100644
--- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java
@@ -26,6 +26,7 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.procedure.Procedure;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.transaction.IsolationLevel;
 
@@ -51,6 +52,7 @@ public class LakehouseConnector
     private final LakehouseSessionProperties sessionProperties;
     private final LakehouseTableProperties tableProperties;
     private final IcebergMaterializedViewProperties materializedViewProperties;
+    private final Set<Procedure> procedures;
 
     @Inject
     public LakehouseConnector(
@@ -62,7 +64,8 @@ public LakehouseConnector(
             LakehouseNodePartitioningProvider nodePartitioningProvider,
             LakehouseSessionProperties sessionProperties,
             LakehouseTableProperties tableProperties,
-            IcebergMaterializedViewProperties materializedViewProperties)
+            IcebergMaterializedViewProperties materializedViewProperties,
+            Set<Procedure> procedures)
     {
         this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -73,6 +76,7 @@ public LakehouseConnector(
         this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null");
         this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
         this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null");
+        this.procedures = requireNonNull(procedures, "procedures is null");
     }
 
     @Override
@@ -148,6 +152,12 @@ public List<PropertyMetadata<?>> getMaterializedViewProperties()
         return materializedViewProperties.getMaterializedViewProperties();
     }
 
+    @Override
+    public Set<Procedure> getProcedures()
+    {
+        return procedures;
+    }
+
     @Override
     public void shutdown()
     {
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java
index 13e97b466d58..61e65b5c63ad 100644
--- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java
@@ -58,6 +58,7 @@
 import io.trino.plugin.hive.line.SimpleTextFileWriterFactory;
 import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
 import io.trino.plugin.hive.metastore.HiveMetastoreModule;
+import io.trino.plugin.hive.metastore.glue.GlueCache;
 import io.trino.plugin.hive.orc.OrcFileWriterFactory;
 import io.trino.plugin.hive.orc.OrcPageSourceFactory;
 import io.trino.plugin.hive.parquet.ParquetFileWriterFactory;
@@ -67,6 +68,7 @@
 import java.util.Optional;
 
 import static com.google.inject.multibindings.Multibinder.newSetBinder;
+import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
 import static org.weakref.jmx.guice.ExportBinder.newExporter;
@@ -138,5 +140,8 @@ protected void setup(Binder binder)
         binder.install(new HiveExecutorModule());
 
         install(new ParquetEncryptionModule());
+
+        newOptionalBinder(binder, GlueCache.class);
+        newOptionalBinder(binder, DirectoryLister.class);
     }
 }
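A note on the two newOptionalBinder calls above: declaring an OptionalBinder without ever calling setBinding() makes Optional<GlueCache> and Optional<DirectoryLister> injectable as Optional.empty(), so procedure classes shared with the Hive connector can still be constructed when the Lakehouse connector runs without Glue or directory-listing caching configured. A minimal, self-contained sketch of the Guice behavior being relied on; the MetastoreCache and CacheFlusher names are invented stand-ins, not part of this patch:

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Inject;
    import com.google.inject.multibindings.OptionalBinder;

    import java.util.Optional;

    public class OptionalBinderSketch
    {
        // Illustrative stand-in for GlueCache/DirectoryLister
        interface MetastoreCache
        {
            void invalidateAll();
        }

        // A consumer in the style of the flush procedures: it constructs
        // whether or not a cache implementation was bound.
        static class CacheFlusher
        {
            private final Optional<MetastoreCache> cache;

            @Inject
            CacheFlusher(Optional<MetastoreCache> cache)
            {
                this.cache = cache;
            }

            void flush()
            {
                cache.ifPresent(MetastoreCache::invalidateAll);
            }
        }

        public static void main(String[] args)
        {
            // Declaring the optional binder without setBinding() yields Optional.empty()
            CacheFlusher flusher = Guice.createInjector(new AbstractModule()
            {
                @Override
                protected void configure()
                {
                    OptionalBinder.newOptionalBinder(binder(), MetastoreCache.class);
                }
            }).getInstance(CacheFlusher.class);

            flusher.flush(); // no-op, since no cache implementation was bound
        }
    }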
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java
index 6050c8fa54f0..22199288bab7 100644
--- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java
@@ -16,15 +16,33 @@
 import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
 import io.airlift.configuration.AbstractConfigurationAwareModule;
 import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
+import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
+import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure;
+import io.trino.plugin.deltalake.procedure.RegisterTableProcedure;
+import io.trino.plugin.deltalake.procedure.UnregisterTableProcedure;
+import io.trino.plugin.deltalake.procedure.VacuumProcedure;
 import io.trino.plugin.hive.HideDeltaLakeTables;
 import io.trino.plugin.hive.SortingFileWriterConfig;
 import io.trino.plugin.hive.orc.OrcReaderConfig;
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.plugin.hive.procedure.CreateEmptyPartitionProcedure;
+import io.trino.plugin.hive.procedure.DropStatsProcedure;
+import io.trino.plugin.hive.procedure.RegisterPartitionProcedure;
+import io.trino.plugin.hive.procedure.SyncPartitionMetadataProcedure;
+import io.trino.plugin.hive.procedure.UnregisterPartitionProcedure;
+import io.trino.plugin.iceberg.procedure.RollbackToSnapshotProcedure;
+import io.trino.plugin.lakehouse.procedures.LakehouseDropStatsProcedure;
+import io.trino.plugin.lakehouse.procedures.LakehouseFlushMetadataCacheProcedure;
+import io.trino.plugin.lakehouse.procedures.LakehouseRegisterTableProcedure;
+import io.trino.plugin.lakehouse.procedures.LakehouseUnregisterTableProcedure;
+import io.trino.spi.procedure.Procedure;
 
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static org.weakref.jmx.guice.ExportBinder.newExporter;
 
@@ -53,6 +71,33 @@ protected void setup(Binder binder)
         binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
         newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
 
+        Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
+        // DeltaLake procedures
+        procedures.addBinding().toProvider(VacuumProcedure.class).in(Scopes.SINGLETON);
+        // Hive procedures
+        procedures.addBinding().toProvider(CreateEmptyPartitionProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(RegisterPartitionProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(UnregisterPartitionProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(SyncPartitionMetadataProcedure.class).in(Scopes.SINGLETON);
+        // Iceberg procedures
+        procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
+        // Mixed procedures
+        binder.bind(DropExtendedStatsProcedure.class).in(Scopes.SINGLETON);
+        binder.bind(DropStatsProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(LakehouseDropStatsProcedure.class).in(Scopes.SINGLETON);
+
+        binder.bind(RegisterTableProcedure.class).in(Scopes.SINGLETON);
+        binder.bind(io.trino.plugin.iceberg.procedure.RegisterTableProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(LakehouseRegisterTableProcedure.class).in(Scopes.SINGLETON);
+
+        binder.bind(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
+        binder.bind(io.trino.plugin.iceberg.procedure.UnregisterTableProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(LakehouseUnregisterTableProcedure.class).in(Scopes.SINGLETON);
+
+        binder.bind(FlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);
+        binder.bind(io.trino.plugin.hive.procedure.FlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(LakehouseFlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);
+
         binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
     }
 }
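All four Lakehouse* providers registered above follow the same SPI pattern: unreflect a public method into a MethodHandle once in a static initializer, bind the receiver with bindTo(this), and let the engine supply the declared arguments positionally. A stripped-down, self-contained sketch of that pattern; the hello procedure and its names are invented for illustration, not part of this patch:

    import com.google.common.collect.ImmutableList;
    import io.trino.spi.procedure.Procedure;

    import java.lang.invoke.MethodHandle;

    import static io.trino.spi.type.VarcharType.VARCHAR;
    import static java.lang.invoke.MethodHandles.lookup;

    public class HelloProcedure
    {
        private static final MethodHandle HELLO;

        static {
            try {
                // Reflect the public method once; a ReflectiveOperationException here
                // means a signature mismatch, which is a programming error.
                HELLO = lookup().unreflect(HelloProcedure.class.getMethod("hello", String.class));
            }
            catch (ReflectiveOperationException e) {
                throw new AssertionError(e);
            }
        }

        public void hello(String name)
        {
            System.out.println("Hello, " + name);
        }

        public Procedure create()
        {
            // bindTo(this) fixes the receiver, so the engine only binds NAME
            return new Procedure(
                    "system",
                    "hello",
                    ImmutableList.of(new Procedure.Argument("NAME", VARCHAR)),
                    HELLO.bindTo(this));
        }
    }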
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java
new file mode 100644
index 000000000000..628130041d03
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse.procedures;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
+import io.trino.plugin.hive.procedure.DropStatsProcedure;
+import io.trino.plugin.lakehouse.TableType;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.procedure.Procedure;
+import io.trino.spi.type.ArrayType;
+
+import java.lang.invoke.MethodHandle;
+import java.util.List;
+
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A procedure that drops statistics.
+ * <p>
+ * It delegates to the appropriate underlying procedure based on the table type.
+ * Currently, it supports the Delta Lake and Hive table types.
+ */
+public class LakehouseDropStatsProcedure
+        implements Provider<Procedure>
+{
+    private static final MethodHandle DROP_STATS;
+
+    private static final String SYSTEM_SCHEMA = "system";
+    private static final String PROCEDURE_NAME = "drop_stats";
+
+    private static final String TABLE_TYPE = "TABLE_TYPE";
+    private static final String SCHEMA_NAME = "SCHEMA_NAME";
+    private static final String TABLE_NAME = "TABLE_NAME";
+    private static final String PARTITION_VALUES = "PARTITION_VALUES";
+
+    static {
+        try {
+            DROP_STATS = lookup().unreflect(LakehouseDropStatsProcedure.class.getMethod(
+                    "dropStats", ConnectorSession.class, ConnectorAccessControl.class, String.class, String.class, String.class, List.class));
+        }
+        catch (ReflectiveOperationException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    private final DropExtendedStatsProcedure deltaLakeDropStatsProcedure;
+    private final DropStatsProcedure hiveDropStatsProcedure;
+
+    @Inject
+    public LakehouseDropStatsProcedure(
+            DropExtendedStatsProcedure deltaLakeDropStatsProcedure,
+            DropStatsProcedure hiveDropStatsProcedure)
+    {
+        this.deltaLakeDropStatsProcedure = requireNonNull(deltaLakeDropStatsProcedure, "deltaLakeDropStatsProcedure is null");
+        this.hiveDropStatsProcedure = requireNonNull(hiveDropStatsProcedure, "hiveDropStatsProcedure is null");
+    }
+
+    @Override
+    public Procedure get()
+    {
+        return new Procedure(
+                SYSTEM_SCHEMA,
+                PROCEDURE_NAME,
+                ImmutableList.of(
+                        new Procedure.Argument(TABLE_TYPE, VARCHAR),
+                        new Procedure.Argument(SCHEMA_NAME, VARCHAR),
+                        new Procedure.Argument(TABLE_NAME, VARCHAR),
+                        new Procedure.Argument(PARTITION_VALUES, new ArrayType(new ArrayType(VARCHAR)), false, null)),
+                DROP_STATS.bindTo(this));
+    }
+
+    public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl, String tableType, String schema, String table, List<?> partitionValues)
+    {
+        if (TableType.DELTA.name().equals(tableType)) {
+            if (partitionValues != null) {
+                throw new IllegalArgumentException("Partition values are not supported for Delta Lake procedure");
+            }
+            deltaLakeDropStatsProcedure.dropStats(session, accessControl, schema, table);
+        }
+        else if (TableType.HIVE.name().equals(tableType)) {
+            hiveDropStatsProcedure.dropStats(session, accessControl, schema, table, partitionValues);
+        }
+        else {
+            throw new IllegalArgumentException("Unsupported table type: " + tableType);
+        }
+    }
+}
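Because PARTITION_VALUES is declared with required = false and a null default, Delta Lake callers simply omit it, while Hive callers pass it positionally or by name; Trino's named-argument CALL syntax keeps the table-type dispatch explicit. Hypothetical invocations, with schema and table names invented:

    CALL lakehouse.system.drop_stats('DELTA', 'my_schema', 'my_table');

    CALL lakehouse.system.drop_stats(
        table_type => 'HIVE',
        schema_name => 'my_schema',
        table_name => 'my_table',
        partition_values => ARRAY[ARRAY['p1']]);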
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseFlushMetadataCacheProcedure.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseFlushMetadataCacheProcedure.java
new file mode 100644
index 000000000000..3c31062ae86a
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseFlushMetadataCacheProcedure.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse.procedures;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure;
+import io.trino.plugin.lakehouse.TableType;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.procedure.Procedure;
+import io.trino.spi.type.ArrayType;
+
+import java.lang.invoke.MethodHandle;
+import java.util.List;
+
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A procedure that flushes the metadata cache for a table or a specific partition of a table.
+ * <p>
+ * It delegates to the appropriate underlying procedure based on the table type.
+ * Currently, it supports the Delta Lake and Hive table types.
+ */
+public class LakehouseFlushMetadataCacheProcedure
+        implements Provider<Procedure>
+{
+    private static final MethodHandle FLUSH_METADATA_CACHE;
+
+    private static final String SYSTEM_SCHEMA = "system";
+    private static final String PROCEDURE_NAME = "flush_metadata_cache";
+
+    private static final String TABLE_TYPE = "TABLE_TYPE";
+    private static final String SCHEMA_NAME = "SCHEMA_NAME";
+    private static final String TABLE_NAME = "TABLE_NAME";
+    private static final String PARAM_PARTITION_COLUMNS = "PARTITION_COLUMNS";
+    private static final String PARAM_PARTITION_VALUES = "PARTITION_VALUES";
+
+    static {
+        try {
+            FLUSH_METADATA_CACHE = lookup().unreflect(LakehouseFlushMetadataCacheProcedure.class.getMethod(
+                    "flushMetadataCache", ConnectorSession.class, String.class, String.class, String.class, List.class, List.class));
+        }
+        catch (ReflectiveOperationException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    private final FlushMetadataCacheProcedure deltaLakeFlushMetadataCacheProcedure;
+    private final io.trino.plugin.hive.procedure.FlushMetadataCacheProcedure hiveFlushMetadataCacheProcedure;
+
+    @Inject
+    public LakehouseFlushMetadataCacheProcedure(
+            FlushMetadataCacheProcedure deltaLakeFlushMetadataCacheProcedure,
+            io.trino.plugin.hive.procedure.FlushMetadataCacheProcedure hiveFlushMetadataCacheProcedure)
+    {
+        this.deltaLakeFlushMetadataCacheProcedure = requireNonNull(deltaLakeFlushMetadataCacheProcedure, "deltaLakeFlushMetadataCacheProcedure is null");
+        this.hiveFlushMetadataCacheProcedure = requireNonNull(hiveFlushMetadataCacheProcedure, "hiveFlushMetadataCacheProcedure is null");
+    }
+
+    @Override
+    public Procedure get()
+    {
+        return new Procedure(
+                SYSTEM_SCHEMA,
+                PROCEDURE_NAME,
+                ImmutableList.of(
+                        new Procedure.Argument(TABLE_TYPE, VARCHAR),
+                        new Procedure.Argument(SCHEMA_NAME, VARCHAR),
+                        new Procedure.Argument(TABLE_NAME, VARCHAR),
+                        new Procedure.Argument(PARAM_PARTITION_COLUMNS, new ArrayType(VARCHAR), false, null),
+                        new Procedure.Argument(PARAM_PARTITION_VALUES, new ArrayType(VARCHAR), false, null)),
+                FLUSH_METADATA_CACHE.bindTo(this));
+    }
+
+    public void flushMetadataCache(ConnectorSession session, String tableType, String schema, String table, List<String> partitionColumns, List<String> partitionValues)
+    {
+        if (TableType.DELTA.name().equals(tableType)) {
+            if (partitionColumns != null && !partitionColumns.isEmpty()) {
+                throw new IllegalArgumentException("Partition columns are not supported for Delta Lake tables");
+            }
+            if (partitionValues != null && !partitionValues.isEmpty()) {
+                throw new IllegalArgumentException("Partition values are not supported for Delta Lake tables");
+            }
+            deltaLakeFlushMetadataCacheProcedure.flushMetadataCache(schema, table);
+        }
+        else if (TableType.HIVE.name().equals(tableType)) {
+            hiveFlushMetadataCacheProcedure.flushMetadataCache(session, schema, table, partitionColumns, partitionValues);
+        }
+        else {
+            throw new IllegalArgumentException("Unsupported table type: " + tableType);
+        }
+    }
+}
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseRegisterTableProcedure.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseRegisterTableProcedure.java
new file mode 100644
index 000000000000..605fea341aaa
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseRegisterTableProcedure.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse.procedures;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.deltalake.procedure.RegisterTableProcedure;
+import io.trino.plugin.lakehouse.TableType;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.procedure.Procedure;
+
+import java.lang.invoke.MethodHandle;
+
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A procedure that registers a table in the metastore.
+ * <p>
+ * It delegates to the appropriate underlying procedure based on the table type.
+ * Currently, it supports the Delta Lake and Iceberg table types.
+ */
+public class LakehouseRegisterTableProcedure
+        implements Provider<Procedure>
+{
+    private static final MethodHandle REGISTER_TABLE;
+
+    private static final String SYSTEM_SCHEMA = "system";
+    private static final String PROCEDURE_NAME = "register_table";
+
+    private static final String TABLE_TYPE = "TABLE_TYPE";
+    private static final String SCHEMA_NAME = "SCHEMA_NAME";
+    private static final String TABLE_NAME = "TABLE_NAME";
+    private static final String TABLE_LOCATION = "TABLE_LOCATION";
+    private static final String METADATA_FILE_NAME = "METADATA_FILE_NAME";
+
+    static {
+        try {
+            REGISTER_TABLE = lookup().unreflect(LakehouseRegisterTableProcedure.class.getMethod(
+                    "registerTable", ConnectorAccessControl.class, ConnectorSession.class, String.class, String.class, String.class, String.class, String.class));
+        }
+        catch (ReflectiveOperationException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    private final RegisterTableProcedure deltaLakeRegisterTableProcedure;
+    private final io.trino.plugin.iceberg.procedure.RegisterTableProcedure icebergRegisterTableProcedure;
+
+    @Inject
+    public LakehouseRegisterTableProcedure(
+            RegisterTableProcedure deltaLakeRegisterTableProcedure,
+            io.trino.plugin.iceberg.procedure.RegisterTableProcedure icebergRegisterTableProcedure)
+    {
+        this.deltaLakeRegisterTableProcedure = requireNonNull(deltaLakeRegisterTableProcedure, "deltaLakeRegisterTableProcedure is null");
+        this.icebergRegisterTableProcedure = requireNonNull(icebergRegisterTableProcedure, "icebergRegisterTableProcedure is null");
+    }
+
+    @Override
+    public Procedure get()
+    {
+        return new Procedure(
+                SYSTEM_SCHEMA,
+                PROCEDURE_NAME,
+                ImmutableList.of(
+                        new Procedure.Argument(TABLE_TYPE, VARCHAR),
+                        new Procedure.Argument(SCHEMA_NAME, VARCHAR),
+                        new Procedure.Argument(TABLE_NAME, VARCHAR),
+                        new Procedure.Argument(TABLE_LOCATION, VARCHAR),
+                        new Procedure.Argument(METADATA_FILE_NAME, VARCHAR, false, null)),
+                REGISTER_TABLE.bindTo(this));
+    }
+
+    public void registerTable(ConnectorAccessControl accessControl, ConnectorSession session, String tableType, String schema, String table, String tableLocation, String metadataFileName)
+    {
+        if (TableType.DELTA.name().equals(tableType)) {
+            if (metadataFileName != null) {
+                throw new IllegalArgumentException("Metadata file name value is not supported for Delta Lake procedure");
+            }
+            deltaLakeRegisterTableProcedure.registerTable(accessControl, session, schema, table, tableLocation);
+        }
+        else if (TableType.ICEBERG.name().equals(tableType)) {
+            icebergRegisterTableProcedure.registerTable(accessControl, session, schema, table, tableLocation, metadataFileName);
+        }
+        else {
+            throw new IllegalArgumentException("Unsupported table type: " + tableType);
+        }
+    }
+}
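The optional METADATA_FILE_NAME argument only applies to Iceberg, where it pins the exact metadata JSON file to register; the Delta Lake branch rejects it. A hypothetical Iceberg invocation, with all names and the location invented:

    CALL lakehouse.system.register_table(
        'ICEBERG',
        'my_schema',
        'my_table',
        's3://my-bucket/my_table',
        '00003-example-uuid.metadata.json');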
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseUnregisterTableProcedure.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseUnregisterTableProcedure.java
new file mode 100644
index 000000000000..0a3e4234d22f
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseUnregisterTableProcedure.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse.procedures;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.deltalake.procedure.UnregisterTableProcedure;
+import io.trino.plugin.lakehouse.TableType;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.procedure.Procedure;
+
+import java.lang.invoke.MethodHandle;
+
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.invoke.MethodHandles.lookup;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A procedure that unregisters a table from the metastore.
+ * <p>
+ * It delegates to the appropriate underlying procedure based on the table type.
+ * Currently, it supports the Delta Lake and Iceberg table types.
+ */
+public class LakehouseUnregisterTableProcedure
+        implements Provider<Procedure>
+{
+    private static final MethodHandle UNREGISTER_TABLE;
+
+    private static final String SYSTEM_SCHEMA = "system";
+    private static final String PROCEDURE_NAME = "unregister_table";
+
+    private static final String TABLE_TYPE = "TABLE_TYPE";
+    private static final String SCHEMA_NAME = "SCHEMA_NAME";
+    private static final String TABLE_NAME = "TABLE_NAME";
+
+    static {
+        try {
+            UNREGISTER_TABLE = lookup().unreflect(LakehouseUnregisterTableProcedure.class.getMethod(
+                    "unregisterTable", ConnectorAccessControl.class, ConnectorSession.class, String.class, String.class, String.class));
+        }
+        catch (ReflectiveOperationException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    private final UnregisterTableProcedure deltaLakeUnregisterTableProcedure;
+    private final io.trino.plugin.iceberg.procedure.UnregisterTableProcedure icebergUnregisterTableProcedure;
+
+    @Inject
+    public LakehouseUnregisterTableProcedure(
+            UnregisterTableProcedure deltaLakeUnregisterTableProcedure,
+            io.trino.plugin.iceberg.procedure.UnregisterTableProcedure icebergUnregisterTableProcedure)
+    {
+        this.deltaLakeUnregisterTableProcedure = requireNonNull(deltaLakeUnregisterTableProcedure, "deltaLakeUnregisterTableProcedure is null");
+        this.icebergUnregisterTableProcedure = requireNonNull(icebergUnregisterTableProcedure, "icebergUnregisterTableProcedure is null");
+    }
+
+    @Override
+    public Procedure get()
+    {
+        return new Procedure(
+                SYSTEM_SCHEMA,
+                PROCEDURE_NAME,
+                ImmutableList.of(
+                        new Procedure.Argument(TABLE_TYPE, VARCHAR),
+                        new Procedure.Argument(SCHEMA_NAME, VARCHAR),
+                        new Procedure.Argument(TABLE_NAME, VARCHAR)),
+                UNREGISTER_TABLE.bindTo(this));
+    }
+
+    public void unregisterTable(ConnectorAccessControl accessControl, ConnectorSession session, String tableType, String schema, String table)
+    {
+        if (TableType.DELTA.name().equals(tableType)) {
+            deltaLakeUnregisterTableProcedure.unregisterTable(accessControl, session, schema, table);
+        }
+        else if (TableType.ICEBERG.name().equals(tableType)) {
+            icebergUnregisterTableProcedure.unregisterTable(accessControl, session, schema, table);
+        }
+        else {
+            throw new IllegalArgumentException("Unsupported table type: " + tableType);
+        }
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java
index d5faf3eb11a1..3d8ee28e9382 100644
--- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java
@@ -62,6 +62,9 @@ protected QueryRunner createQueryRunner()
                 .addLakehouseProperty("s3.endpoint", hiveMinio.getMinio().getMinioAddress())
                 .addLakehouseProperty("s3.path-style-access", "true")
                 .addLakehouseProperty("s3.streaming.part-size", "5MB")
+                .addLakehouseProperty("hive.allow-register-partition-procedure", "true")
+                .addLakehouseProperty("delta.register-table-procedure.enabled", "true")
+                .addLakehouseProperty("iceberg.register-table-procedure.enabled", "true")
                 .build();
     }
 
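The three properties added above opt in to procedures that the standalone Hive, Delta Lake, and Iceberg connectors disable by default as a safety measure. A hedged sketch of the equivalent catalog configuration in a deployment (file path assumed):

    # etc/catalog/lakehouse.properties (assumed path)
    hive.allow-register-partition-procedure=true
    delta.register-table-procedure.enabled=true
    iceberg.register-table-procedure.enabled=true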
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
index 955bdd2da1ee..bee95da05969 100644
--- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
@@ -16,6 +16,8 @@
 import org.junit.jupiter.api.Test;
 
 import static io.trino.plugin.lakehouse.TableType.DELTA;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -59,4 +61,27 @@ public void testShowCreateTable()
                 type = 'DELTA'
                 )\\E""");
     }
+
+    @Test
+    void testProcedures()
+    {
+        String tableName = "table_for_procedures_" + randomNameSuffix();
+
+        assertUpdate(format("CREATE TABLE %s AS SELECT 2 AS age", tableName), 1);
+
+        assertThat(query(format("CALL lakehouse.system.vacuum(CURRENT_SCHEMA, '%s', '8.00d')", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
+                .failure().hasMessage("Failed checking table location s3://bucket/table");
+
+        assertThat(query(format("CALL lakehouse.system.unregister_table('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.flush_metadata_cache('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
+                .succeeds().returnsEmptyResult();
+    }
 }
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
index 1745312c5c41..641d76530675 100644
--- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
@@ -18,6 +18,8 @@
 import org.junit.jupiter.api.Test;
 
 import static io.trino.plugin.lakehouse.TableType.HIVE;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestLakehouseHiveConnectorSmokeTest
@@ -66,4 +68,36 @@ comment varchar(152)
                 type = 'HIVE'
                 )""");
     }
+
+    @Test
+    void testProcedures()
+    {
+        String tableName = "table_for_procedures_" + randomNameSuffix();
+
+        assertUpdate(format("CREATE TABLE %s (" +
+                "  dummy_col bigint," +
+                "  part varchar)" +
+                "WITH (" +
+                "  format = 'ORC', " +
+                "  partitioned_by = ARRAY[ 'part' ] " +
+                ")", tableName));
+
+        assertThat(query(format("CALL lakehouse.system.create_empty_partition(CURRENT_SCHEMA, '%s', ARRAY['part'], ARRAY['empty'])", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.register_partition(CURRENT_SCHEMA, '%s', ARRAY['part'], ARRAY['p1'])", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.unregister_partition(CURRENT_SCHEMA, '%s', ARRAY['part'], ARRAY['p1'])", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.sync_partition_metadata(CURRENT_SCHEMA, '%s', 'FULL')", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.drop_stats('HIVE', CURRENT_SCHEMA, '%s', ARRAY[ARRAY['p1']])", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.flush_metadata_cache('HIVE', CURRENT_SCHEMA, '%s', ARRAY['part'])", tableName)))
+                .failure().hasMessage("Cannot flush, metastore cache is not enabled");
+    }
 }
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
index 42bfaa102b22..75d0318b8d33 100644
--- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
@@ -16,6 +16,8 @@
 import org.junit.jupiter.api.Test;
 
 import static io.trino.plugin.lakehouse.TableType.ICEBERG;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestLakehouseIcebergConnectorSmokeTest
@@ -44,4 +46,21 @@ public void testShowCreateTable()
                 type = 'ICEBERG'
                 )\\E""");
     }
+
+    @Test
+    void testProcedures()
+    {
+        String tableName = "table_for_procedures_" + randomNameSuffix();
+
+        assertUpdate(format("CREATE TABLE %s AS SELECT 2 AS age", tableName), 1);
+
+        assertThat(query(format("CALL lakehouse.system.rollback_to_snapshot(CURRENT_SCHEMA, '%s', 1234)", tableName)))
+                .failure().hasMessage("Cannot roll back to unknown snapshot id: 1234");
+
+        assertThat(query(format("CALL lakehouse.system.register_table('ICEBERG', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
+                .failure().hasMessage("Failed checking table location: s3://bucket/table");
+
+        assertThat(query(format("CALL lakehouse.system.unregister_table('ICEBERG', CURRENT_SCHEMA, '%s')", tableName)))
+                .succeeds().returnsEmptyResult();
+    }
 }