Skip to content

Commit 6aa4a6b

Browse files
authored
fix(meta): vacuum dropped table cleans policy refs (#19239)
1 parent 03adf8b commit 6aa4a6b

File tree

2 files changed

+223
-1
lines changed

2 files changed

+223
-1
lines changed

src/meta/api/src/garbage_collection_api.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@ use databend_common_base::vec_ext::VecExt;
2222
use databend_common_meta_app::app_error::AppError;
2323
use databend_common_meta_app::app_error::CleanDbIdTableNamesFailed;
2424
use databend_common_meta_app::app_error::MarkDatabaseMetaAsGCInProgressFailed;
25+
use databend_common_meta_app::data_mask::MaskPolicyIdTableId;
26+
use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent;
2527
use databend_common_meta_app::principal::AutoIncrementKey;
2628
use databend_common_meta_app::principal::OwnershipObject;
2729
use databend_common_meta_app::principal::TenantOwnershipObjectIdent;
30+
use databend_common_meta_app::row_access_policy::RowAccessPolicyTableIdIdent;
31+
use databend_common_meta_app::row_access_policy::row_access_policy_table_id_ident::RowAccessPolicyIdTableId;
2832
use databend_common_meta_app::schema::AutoIncrementStorageIdent;
2933
use databend_common_meta_app::schema::DBIdTableName;
3034
use databend_common_meta_app::schema::DatabaseId;
@@ -740,7 +744,6 @@ async fn remove_data_for_dropped_table(
740744
// warn!("{}", err);
741745
// return Ok(Err(err));
742746
// }
743-
744747
txn_delete_exact(txn, table_id, seq_meta.seq);
745748

746749
// Get id -> name mapping
@@ -823,6 +826,42 @@ async fn remove_data_for_dropped_table(
823826
}
824827
}
825828

829+
let tb_meta = &seq_meta.data;
830+
831+
// Clean up policy references for the dropped table.
832+
// These records may have been orphaned if the table was dropped via DROP DATABASE.
833+
834+
// Delete masking policy references
835+
{
836+
let policy_ids: HashSet<u64> = tb_meta
837+
.column_mask_policy_columns_ids
838+
.values()
839+
.map(|policy_map| policy_map.policy_id)
840+
.collect();
841+
842+
txn.if_then.extend(policy_ids.into_iter().map(|policy_id| {
843+
txn_op_del(&MaskPolicyTableIdIdent::new_generic(
844+
tenant.clone(),
845+
MaskPolicyIdTableId {
846+
policy_id,
847+
table_id: table_id.table_id,
848+
},
849+
))
850+
}));
851+
}
852+
853+
// Delete row access policy reference
854+
if let Some(policy_map) = &tb_meta.row_access_policy_columns_ids {
855+
txn.if_then
856+
.push(txn_op_del(&RowAccessPolicyTableIdIdent::new_generic(
857+
tenant.clone(),
858+
RowAccessPolicyIdTableId {
859+
policy_id: policy_map.policy_id,
860+
table_id: table_id.table_id,
861+
},
862+
)));
863+
}
864+
826865
Ok(Ok(()))
827866
}
828867

src/query/ee/tests/it/storages/fuse/operations/vacuum.rs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,3 +1043,186 @@ async fn test_vacuum_dropped_table_clean_tag_refs() -> Result<()> {
10431043

10441044
Ok(())
10451045
}
1046+
1047+
#[tokio::test(flavor = "multi_thread")]
1048+
async fn test_vacuum_dropped_table_clean_policies() -> Result<()> {
1049+
use databend_common_meta_app::data_mask::MaskPolicyIdTableId;
1050+
use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent;
1051+
use databend_common_meta_app::row_access_policy::RowAccessPolicyTableIdIdent;
1052+
use databend_common_meta_app::row_access_policy::row_access_policy_table_id_ident::RowAccessPolicyIdTableId;
1053+
1054+
// 1. Prepare local meta service
1055+
let meta = new_local_meta().await;
1056+
let endpoints = meta.endpoints.clone();
1057+
1058+
// Modify config to use local meta store
1059+
let mut ee_setup = EESetup::new();
1060+
let config = ee_setup.config_mut();
1061+
config.meta.endpoints = endpoints.clone();
1062+
1063+
// 2. Setup test fixture by using local meta store
1064+
let fixture = TestFixture::setup_with_custom(ee_setup).await?;
1065+
1066+
// Adjust retention period to 0, so that dropped tables will be vacuumed immediately
1067+
let session = fixture.default_session();
1068+
session.get_settings().set_data_retention_time_in_days(0)?;
1069+
1070+
// Enable row access policy feature
1071+
fixture
1072+
.execute_command("set global enable_experimental_row_access_policy = 1")
1073+
.await?;
1074+
1075+
// 3. Prepare test db and table
1076+
let ctx = fixture.new_query_ctx().await?;
1077+
let db_name = "test_vacuum_clean_policies";
1078+
let tbl_name = "t";
1079+
fixture
1080+
.execute_command(format!("create database {db_name}").as_str())
1081+
.await?;
1082+
fixture
1083+
.execute_command(
1084+
format!("create table {db_name}.{tbl_name} (id int, name string, email string)")
1085+
.as_str(),
1086+
)
1087+
.await?;
1088+
1089+
// 4. Create masking policies and row access policy
1090+
fixture
1091+
.execute_command(
1092+
"CREATE MASKING POLICY email_mask AS (val STRING) RETURNS STRING -> CASE WHEN current_role() = 'admin' THEN val ELSE '***' END",
1093+
)
1094+
.await?;
1095+
1096+
fixture
1097+
.execute_command(
1098+
"CREATE MASKING POLICY name_mask AS (val STRING) RETURNS STRING -> CASE WHEN current_role() = 'admin' THEN val ELSE 'REDACTED' END",
1099+
)
1100+
.await?;
1101+
1102+
fixture
1103+
.execute_command(
1104+
"CREATE ROW ACCESS POLICY row_filter AS (id int) RETURNS boolean -> current_role() = 'admin' OR id > 0",
1105+
)
1106+
.await?;
1107+
1108+
// 5. Apply policies to table columns
1109+
fixture
1110+
.execute_command(
1111+
format!(
1112+
"ALTER TABLE {db_name}.{tbl_name} MODIFY COLUMN email SET MASKING POLICY email_mask"
1113+
)
1114+
.as_str(),
1115+
)
1116+
.await?;
1117+
1118+
fixture
1119+
.execute_command(
1120+
format!(
1121+
"ALTER TABLE {db_name}.{tbl_name} MODIFY COLUMN name SET MASKING POLICY name_mask"
1122+
)
1123+
.as_str(),
1124+
)
1125+
.await?;
1126+
1127+
fixture
1128+
.execute_command(
1129+
format!("ALTER TABLE {db_name}.{tbl_name} ADD ROW ACCESS POLICY row_filter ON (id)")
1130+
.as_str(),
1131+
)
1132+
.await?;
1133+
1134+
// 6. Get table ID and verify policy references exist
1135+
let tenant = ctx.get_tenant();
1136+
let table = ctx
1137+
.get_default_catalog()?
1138+
.get_table(&tenant, db_name, tbl_name)
1139+
.await?;
1140+
let table_id = table.get_id();
1141+
1142+
// Get policy IDs from table metadata
1143+
let table_meta = table.get_table_info().meta.clone();
1144+
1145+
// Verify masking policy references exist
1146+
assert!(
1147+
!table_meta.column_mask_policy_columns_ids.is_empty(),
1148+
"masking policy references should exist after applying policies"
1149+
);
1150+
1151+
// Verify row access policy reference exists
1152+
assert!(
1153+
table_meta.row_access_policy_columns_ids.is_some(),
1154+
"row access policy reference should exist after applying policy"
1155+
);
1156+
1157+
let mask_policy_ids: Vec<u64> = table_meta
1158+
.column_mask_policy_columns_ids
1159+
.values()
1160+
.map(|policy_map| policy_map.policy_id)
1161+
.collect();
1162+
1163+
let row_policy_id = table_meta
1164+
.row_access_policy_columns_ids
1165+
.as_ref()
1166+
.unwrap()
1167+
.policy_id;
1168+
1169+
// Verify policy references in meta store
1170+
for policy_id in &mask_policy_ids {
1171+
let mask_policy_key =
1172+
MaskPolicyTableIdIdent::new_generic(tenant.clone(), MaskPolicyIdTableId {
1173+
policy_id: *policy_id,
1174+
table_id,
1175+
});
1176+
let v = meta.get_pb(&mask_policy_key).await?;
1177+
assert!(
1178+
v.is_some(),
1179+
"masking policy reference should exist in meta store"
1180+
);
1181+
}
1182+
1183+
let row_policy_key =
1184+
RowAccessPolicyTableIdIdent::new_generic(tenant.clone(), RowAccessPolicyIdTableId {
1185+
policy_id: row_policy_id,
1186+
table_id,
1187+
});
1188+
let v = meta.get_pb(&row_policy_key).await?;
1189+
assert!(
1190+
v.is_some(),
1191+
"row access policy reference should exist in meta store"
1192+
);
1193+
1194+
// 7. Drop database (this will mark both database and table as dropped)
1195+
fixture
1196+
.execute_command(format!("drop database {db_name}").as_str())
1197+
.await?;
1198+
1199+
// 8. Vacuum dropped tables
1200+
fixture.execute_command("vacuum drop table").await?;
1201+
1202+
// 9. Ensure that policy references are cleaned up
1203+
for policy_id in &mask_policy_ids {
1204+
let mask_policy_key =
1205+
MaskPolicyTableIdIdent::new_generic(tenant.clone(), MaskPolicyIdTableId {
1206+
policy_id: *policy_id,
1207+
table_id,
1208+
});
1209+
let v = meta.get_pb(&mask_policy_key).await?;
1210+
assert!(
1211+
v.is_none(),
1212+
"masking policy reference should be cleaned up after vacuum"
1213+
);
1214+
}
1215+
1216+
let row_policy_key =
1217+
RowAccessPolicyTableIdIdent::new_generic(tenant.clone(), RowAccessPolicyIdTableId {
1218+
policy_id: row_policy_id,
1219+
table_id,
1220+
});
1221+
let v = meta.get_pb(&row_policy_key).await?;
1222+
assert!(
1223+
v.is_none(),
1224+
"row access policy reference should be cleaned up after vacuum"
1225+
);
1226+
1227+
Ok(())
1228+
}

0 commit comments

Comments
 (0)