diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json index 7ce5591525f..2031943fc38 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json @@ -728,6 +728,22 @@ "isInput" : false, "comment" : "" } ] +}, { + "classname" : "org.apache.spark.sql.catalyst.plans.logical.SetViewProperties", + "tableDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedTableTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : null, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : true, + "setCurrentDatabaseIfMissing" : false, + "comment" : "" + } ], + "opType" : "ALTERTABLE_PROPERTIES", + "queryDescs" : [ ], + "uriDescs" : [ ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable", "tableDescs" : [ { @@ -808,6 +824,22 @@ "opType" : "ALTERTABLE_PROPERTIES", "queryDescs" : [ ], "uriDescs" : [ ] +}, { + "classname" : "org.apache.spark.sql.catalyst.plans.logical.UnsetViewProperties", + "tableDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedTableTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : null, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : true, + "setCurrentDatabaseIfMissing" : false, + "comment" : "" + } ], + "opType" : "ALTERTABLE_PROPERTIES", + "queryDescs" : [ ], + "uriDescs" : [ ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.UpdateTable", "tableDescs" : [ { @@ -2077,6 +2109,42 @@ "opType" : "QUERY", "queryDescs" : [ ], "uriDescs" : [ ] +}, { + "classname" : "org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView", + "tableDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedIdentifierTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : null, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : false, + "setCurrentDatabaseIfMissing" : false, + "comment" : "" + } ], + "opType" : "CREATEVIEW", + "queryDescs" : [ { + "fieldName" : "query", + "fieldExtractor" : "LogicalPlanQueryExtractor", + "comment" : "" + } ], + "uriDescs" : [ ] +}, { + "classname" : "org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView", + "tableDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedIdentifierTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : null, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : false, + "setCurrentDatabaseIfMissing" : false, + "comment" : "" + } ], + "opType" : "DROPVIEW", + "queryDescs" : [ ], + "uriDescs" : [ ] }, { "classname" : "org.apache.spark.sql.hudi.command.AlterHoodieTableAddColumnsCommand", "tableDescs" : [ { diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala index c036660691c..3770d12c1ed 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala @@ -140,8 +140,13 @@ class ResolvedTableTableExtractor extends TableExtractor { val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal) val identifier = invokeAs[AnyRef](v1, "identifier") val maybeTable = lookupExtractor[IdentifierTableExtractor].apply(spark, identifier) - val maybeOwner = TableExtractor.getOwner(v1) - maybeTable.map(_.copy(catalog = catalog, owner = maybeOwner)) + // Iceberg show create table $viewName use ResolvedV2View + if (!v1.getClass.getName + .equals("org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View")) { + val maybeOwner = TableExtractor.getOwner(v1) + maybeTable.map(_.copy(catalog = catalog, owner = maybeOwner)) + } + maybeTable } } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala index 2cb3930fb20..497fb146abc 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.plugin.spark.authz.gen import org.apache.kyuubi.plugin.spark.authz.OperationType +import org.apache.kyuubi.plugin.spark.authz.OperationType.{CREATEVIEW, DROPVIEW} import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._ import org.apache.kyuubi.plugin.spark.authz.serde._ @@ -113,6 +114,31 @@ object IcebergCommands extends CommandSpecs[TableCommandSpec] { AddPartitionFiled.copy(cmd) } + val CreateIcebergView = { + val cmd = "org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView" + val tableDesc = TableDesc( + "child", + classOf[ResolvedIdentifierTableExtractor], + isInput = false) + val queryDesc = QueryDesc("query") + TableCommandSpec( + cmd, + Seq(tableDesc), + CREATEVIEW, + queryDescs = Seq(queryDesc)) + } + + val DropIcebergView = { + val cmd = "org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView" + val tableDesc = TableDesc( + "child", + classOf[ResolvedIdentifierTableExtractor]) + TableCommandSpec( + cmd, + Seq(tableDesc), + DROPVIEW) + } + override def specs: Seq[TableCommandSpec] = Seq( CallProcedure, DeleteFromIcebergTable, @@ -128,6 +154,8 @@ object IcebergCommands extends CommandSpecs[TableCommandSpec] { CreateOrReplaceTag, DropBranch, DropTag, + CreateIcebergView, + DropIcebergView, MergeIntoIcebergTable.copy(classname = "org.apache.spark.sql.catalyst.plans.logical.UnresolvedMergeIntoIcebergTable")) } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala index 3c08ad4ab6f..472bcf7bba4 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala @@ -674,6 +674,20 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { TableCommandSpec(cmd, Nil, ADD, uriDescs = Seq(uriDesc)) } + val UnsetViewProperties = { + val cmd = "org.apache.spark.sql.catalyst.plans.logical.UnsetViewProperties" + val tableDesc = + TableDesc("child", classOf[ResolvedTableTableExtractor], isInput = true) + TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES) + } + + val SetViewProperties = { + val cmd = "org.apache.spark.sql.catalyst.plans.logical.SetViewProperties" + val tableDesc = + TableDesc("child", classOf[ResolvedTableTableExtractor], isInput = true) + TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES) + } + override def specs: Seq[TableCommandSpec] = Seq( AddArchivesCommand, AddArchivesCommand.copy(classname = "org.apache.spark.sql.execution.command.AddFilesCommand"), @@ -764,5 +778,7 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { ShowTablePropertiesV2, TruncateTable, TruncateTableV2, + SetViewProperties, + UnsetViewProperties, UpdateTable) } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala index 16a8beb22c4..f1794cc72a4 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala @@ -66,6 +66,7 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite spark.conf.set(s"spark.sql.catalog.$catalogV2.uri", getMysqlJdbcUrl) spark.conf.set(s"spark.sql.catalog.$catalogV2.jdbc.user", getMysqlUsername) spark.conf.set(s"spark.sql.catalog.$catalogV2.jdbc.password", getMysqlPassword) + spark.conf.set(s"spark.sql.catalog.$catalogV2.jdbc.schema-version", "V1") spark.conf.set( s"spark.sql.catalog.$catalogV2.warehouse", Utils.createTempDir("iceberg-hadoop").toString) @@ -336,6 +337,10 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite existedSnapshots(0) } + private def getSnapshots(table: String): Array[Row] = { + sql(s"SELECT * FROM $table.snapshots ORDER BY committed_at ASC").collect() + } + test("CALL rollback_to_snapshot") { val tableName = "table_rollback_to_snapshot" val table = s"$catalogV2.$namespace1.$tableName" @@ -689,4 +694,261 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite } } + test("View Operation Permission Check For Iceberg") { + val sourceTable = s"$catalogV2.$namespace1.source_table" + withCleanTmpResources(Seq((sourceTable, "table"))) { + val baseView = s"$catalogV2.$namespace1.base_view" + val viewWithProperties = s"$catalogV2.$namespace1.view_with_properties" + doAs( + admin, + sql( + s"CREATE TABLE $sourceTable" + + s"(id int NOT NULL, name string, city string) USING iceberg")) + val createViewSql = + s""" + |CREATE VIEW IF NOT EXISTS $baseView AS + |SELECT * FROM $sourceTable + |""".stripMargin + val createViewWithPropertiesSql = + s""" + |CREATE VIEW IF NOT EXISTS $viewWithProperties + |TBLPROPERTIES('key1'='val1', 'key2'='val2') + |AS SELECT * FROM $sourceTable + |""".stripMargin + val selectViewSql = s"SELECT * FROM $baseView" + val showCreateViewSql = s"SHOW CREATE TABLE $baseView" + val displayBaseViewDetailSql = s"DESCRIBE $baseView" + val replaceBaseViewSql = + s""" + |CREATE OR REPLACE VIEW $baseView (id COMMENT 'id') + |AS SELECT id FROM $sourceTable + |""".stripMargin + val setViewProperties = + s""" + |ALTER VIEW $viewWithProperties + |SET TBLPROPERTIES('key3'='val3') + |""".stripMargin + val removeViewProperties = + s""" + |ALTER VIEW $viewWithProperties + |UNSET TBLPROPERTIES('key1', 'key2') + |""".stripMargin + val dropBaseViewSql = s"DROP VIEW $baseView" + + // test create base view + interceptEndsWith[AccessControlException] { + doAs(someone, sql(createViewSql)) + }(s"user [someone] does not have [select] privilege on [$namespace1/source_table/city]") + doAs(admin, sql(createViewSql)) + + // test create view with properties + interceptEndsWith[AccessControlException] { + doAs(someone, sql(createViewWithPropertiesSql)) + }(s"user [someone] does not have [select] privilege on [$namespace1/source_table/city]") + doAs(admin, sql(createViewWithPropertiesSql)) + + // test select view + interceptEndsWith[AccessControlException] { + doAs(someone, sql(selectViewSql).show) + }(s"user [someone] does not have [select] privilege on [$namespace1/source_table/city]") + doAs(admin, sql(selectViewSql)) + + // test show create table viewName + interceptEndsWith[AccessControlException] { + doAs(someone, sql(showCreateViewSql)) + }(s"user [someone] does not have [select] privilege on [$namespace1/base_view]") + doAs(admin, sql(showCreateViewSql)) + + // test describe view + interceptEndsWith[AccessControlException] { + doAs(someone, sql(displayBaseViewDetailSql)) + }(s"user [someone] does not have [select] privilege on [$namespace1/base_view]") + doAs(admin, sql(displayBaseViewDetailSql)) + + // test replace view + interceptEndsWith[AccessControlException] { + doAs(someone, sql(replaceBaseViewSql)) + }(s"user [someone] does not have [select] privilege on [$namespace1/source_table/id]") + doAs(admin, sql(replaceBaseViewSql)) + + // test setting or remove view propertis + interceptEndsWith[AccessControlException] { + doAs(someone, sql(setViewProperties)) + }(s"user [someone] does not have [alter] privilege on [$namespace1/view_with_properties]") + doAs(admin, sql(setViewProperties)) + + interceptEndsWith[AccessControlException] { + doAs(someone, sql(removeViewProperties)) + }(s"user [someone] does not have [alter] privilege on [$namespace1/view_with_properties]") + doAs(admin, sql(removeViewProperties)) + + // test drop view + interceptEndsWith[AccessControlException] { + doAs(someone, sql(dropBaseViewSql)) + }(s"user [someone] does not have [drop] privilege on [iceberg_ns/base_view]") + doAs(admin, sql(dropBaseViewSql)) + } + } + + test("test producers for iceberg") { + val tableName = "table_snapshot" + val table = s"$catalogV2.$namespace1.$tableName" + val wapTable = s"$catalogV2.$namespace1.wap_table" + val sparkTable = s"$catalogV2.$namespace1.spark_table" + withCleanTmpResources(Seq( + (table, "table"), + (wapTable, "table"), + (sparkTable, "table"))) { + prepareExampleIcebergTable(table, 3) + val snapshots = getSnapshots(table) + + // test cherryPick snapshot, first rollback + val rollbackSnapshotId = snapshots(0).getAs[Long]("snapshot_id") + val cherrypickSnapshotId = snapshots(1).getAs[Long]("snapshot_id") + doAs( + admin, + sql( + s"CALL $catalogV2.system.rollback_to_snapshot(table => '$table', snapshot_id => $rollbackSnapshotId)")) + val cherryPickSnapshotSql = + s"CALL $catalogV2.system.cherrypick_snapshot(table => '$table', snapshot_id => $cherrypickSnapshotId)" + interceptEndsWith[AccessControlException](doAs(someone, sql(cherryPickSnapshotSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(cherryPickSnapshotSql)) + + // test publish_changes + val wapId = "test_wap_id" + doAs( + admin, + sql( + s""" + |CREATE TABLE $wapTable + |(id int, name string) + |using iceberg + |TBLPROPERTIES('write.wap.enabled'='true') + |""".stripMargin)) + doAs( + admin, { + spark.conf.set("spark.wap.id", wapId) + sql(s"INSERT INTO $wapTable VALUES (100, 'wap_test')") + }) + val publishChangesSql = + s""" + |CALL $catalogV2.system.publish_changes + |(table => '$wapTable', wap_id => '$wapId') + |""".stripMargin + interceptEndsWith[AccessControlException](doAs(someone, sql(publishChangesSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/wap_table]") + doAs(admin, sql(publishChangesSql)) + + // test fast_forward + doAs(admin, sql(s"ALTER TABLE $table CREATE branch test")) + val fastForwardSql = + s"CALL $catalogV2.system.fast_forward(table => '$table', branch => 'main', to => 'test')" + interceptEndsWith[AccessControlException](doAs(someone, sql(fastForwardSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(fastForwardSql)) + + // test expire_snapshots + val expireSnapshotsSql = + s"CALL $catalogV2.system.expire_snapshots(table => '$table', older_than => TIMESTAMP '2999-12-31 00:00:00.000')" + interceptEndsWith[AccessControlException](doAs(someone, sql(expireSnapshotsSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(expireSnapshotsSql)) + + // test remove_orphan_files + val removeOrphanFilesSql = + s"CALL $catalogV2.system.remove_orphan_files(table => '$table', older_than => TIMESTAMP '2000-01-01 00:00:00.000')" + interceptEndsWith[AccessControlException](doAs(someone, sql(removeOrphanFilesSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(removeOrphanFilesSql)) + + // test rewrite_data_files + val rewriteDataFilesSql = s"CALL $catalogV2.system.rewrite_data_files(table => '$table')" + interceptEndsWith[AccessControlException](doAs(someone, sql(rewriteDataFilesSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(rewriteDataFilesSql)) + + // test rewrite_manifests + val rewriteManifestsSql = s"CALL $catalogV2.system.rewrite_manifests(table => '$table')" + interceptEndsWith[AccessControlException](doAs(someone, sql(rewriteManifestsSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(rewriteManifestsSql)) + + // test rewrite_position_delete_files + val rewritePosDeleteFilesSql = + s"CALL $catalogV2.system.rewrite_position_delete_files(table => '$table')" + interceptEndsWith[AccessControlException](doAs(someone, sql(rewritePosDeleteFilesSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(rewritePosDeleteFilesSql)) + + // test migration of snapshot + val snapshotSql = s"CALL $catalogV2.system.snapshot('$table','$catalogV2.$namespace1.snap')" + interceptEndsWith[AccessControlException](doAs(someone, sql(snapshotSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + + // test migration of migrate + doAs( + admin, + sql( + s""" + |CREATE TABLE $sparkTable + |(id int, name string) + |""".stripMargin)) + val migrateSql = s"CALL $catalogV2.system.migrate('$namespace1.spark_table')" + interceptEndsWith[AccessControlException](doAs(someone, sql(migrateSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/spark_table]") + + // test migration of add_files + val addFilesSql = + s""" + |CALL $catalogV2.system.add_files( + |table => '$wapTable', + |source_table => '$table' + |) + |""".stripMargin + interceptEndsWith[AccessControlException](doAs(someone, sql(addFilesSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/wap_table]") + + // test register_table + val registerTableSql = + s"CALL $catalogV2.system.register_table(table => '$table', metadata_file => 'dummy-location')" + interceptEndsWith[AccessControlException](doAs(someone, sql(registerTableSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + + // test ancestors_of + val ancestorsOfSql = s"CALL $catalogV2.system.ancestors_of(table => '$table')" + interceptEndsWith[AccessControlException](doAs(someone, sql(ancestorsOfSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(ancestorsOfSql)) + + // test create_change_log_view + val changeLogViewName = s"${tableName}_changelog_view" + val createChangeLogViewSql = + s"CALL $catalogV2.system.create_changelog_view(table => '$table'," + + s"options => map('start-timestamp','1678335750489','end-timestamp', '1678992105265')," + + s"changelog_view => '$changeLogViewName')" + interceptEndsWith[AccessControlException](doAs(someone, sql(createChangeLogViewSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(createChangeLogViewSql)) + } + } + + // TODO: Enable after upgrading Iceberg 1.10 + ignore("currently unsupported producers for iceberg") { + val tableName = "table_snapshot" + val table = s"$catalogV2.$namespace1.$tableName" + withCleanTmpResources(Seq((table, "table"))) { + // test compute_table_stats + val computeTableStatsSql = s"CALL $catalogV2.system.compute_table_stats(table => '$table')" + interceptEndsWith[AccessControlException](doAs(someone, sql(computeTableStatsSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(computeTableStatsSql)) + + // test rewrite_table_stats + val rewriteTableStatsSql = s"CALL $catalogV2.system.rewrite_table_stats(table => '$table')" + interceptEndsWith[AccessControlException](doAs(someone, sql(rewriteTableStatsSql)))( + s"user [someone] does not have [alter] privilege on [$namespace1/$tableName]") + doAs(admin, sql(rewriteTableStatsSql)) + } + } }