diff --git a/src/sentry/db/deletion.py b/src/sentry/db/deletion.py index 0fb629e5af8f68..49822953867b5b 100644 --- a/src/sentry/db/deletion.py +++ b/src/sentry/db/deletion.py @@ -11,9 +11,12 @@ class BulkDeleteQuery: - def __init__(self, model, project_id=None, dtfield=None, days=None, order_by=None): + def __init__( + self, model, project_id=None, organization_id=None, dtfield=None, days=None, order_by=None + ): self.model = model self.project_id = int(project_id) if project_id else None + self.organization_id = int(organization_id) if organization_id else None self.dtfield = dtfield self.days = int(days) if days is not None else None self.order_by = order_by @@ -32,6 +35,8 @@ def execute(self, chunk_size=10000): ) if self.project_id: where.append(f"project_id = {self.project_id}") + if self.organization_id: + where.append(f"organization_id = {self.organization_id}") if where: where_clause = "where {}".format(" and ".join(where)) @@ -101,6 +106,8 @@ def iterator(self, chunk_size=100, batch_size=100000) -> Generator[tuple[int, .. 
if self.project_id: where.append(("project_id = %s", [self.project_id])) + if self.organization_id: + where.append(("organization_id = %s", [self.organization_id])) if self.order_by[0] == "-": direction = "desc" diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 7e4bc6ce989b94..448307bbb38df5 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -33,6 +33,17 @@ def get_project(value: str) -> int | None: return None +def get_organization(value: str) -> int | None: + from sentry.models.organization import Organization + + try: + if value.isdigit(): + return int(value) + return Organization.objects.get(slug=value).id + except Organization.DoesNotExist: + return None + + # We need a unique value to indicate when to stop multiprocessing queue # an identity on an object() isn't guaranteed to work between parent # and child proc @@ -104,6 +115,7 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: @click.command() @click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.") @click.option("--project", help="Limit truncation to only entries from project.") +@click.option("--organization", help="Limit truncation to only entries from organization.") @click.option( "--concurrency", type=int, @@ -127,6 +139,7 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: def cleanup( days: int, project: str | None, + organization: str | None, concurrency: int, silent: bool, model: tuple[str, ...], @@ -137,9 +150,9 @@ def cleanup( All data that is older than `--days` will be deleted. The default for this is 30 days. In the default setting all projects will be truncated - but if you have a specific project you want to limit this to this can be - done with the `--project` flag which accepts a project ID or a string - with the form `org/project` where both are slugs. 
+ but if you have a specific project or organization you want to limit this to, + this can be done with the `--project` or `--organization` flags respectively, + which accept a project ID or an `org/project` slug pair, and an organization ID or slug. """ if concurrency < 1: click.echo("Error: Minimum concurrency is 1", err=True) @@ -209,10 +222,13 @@ def is_filtered(model: type[Model]) -> bool: exported_data(is_filtered, silent) project_id = None + organization_id = None if SiloMode.get_current_mode() != SiloMode.CONTROL: if project: remove_cross_project_models(deletes) project_id = get_project_id_or_fail(project) + elif organization: + organization_id = get_organization_id_or_fail(organization) else: remove_old_nodestore_values(days) @@ -268,6 +284,36 @@ def is_filtered(model: type[Model]) -> bool: for chunk in q.iterator(chunk_size=100): task_queue.put((imp, chunk)) + task_queue.join() + + organization_deletion_query, to_delete_by_organization = prepare_deletes_by_organization( + organization, organization_id, is_filtered + ) + + if organization_deletion_query is not None and len(to_delete_by_organization): + debug_output("Running bulk deletes in DELETES_BY_ORGANIZATION") + for organization_id_for_deletion in RangeQuerySetWrapper( + organization_deletion_query.values_list("id", flat=True), + result_value_getter=lambda item: item, + ): + for model_tp, dtfield, order_by in to_delete_by_organization: + debug_output( + f"Removing {model_tp.__name__} for days={days} organization={organization_id_for_deletion}" + ) + + imp = ".".join((model_tp.__module__, model_tp.__name__)) + + q = BulkDeleteQuery( + model=model_tp, + dtfield=dtfield, + days=days, + organization_id=organization_id_for_deletion, + order_by=order_by, + ) + + for chunk in q.iterator(chunk_size=100): + task_queue.put((imp, chunk)) + task_queue.join() remove_file_blobs(is_filtered, silent) @@ -374,7 +420,7 @@ def models_which_use_deletions_code_path() -> list[tuple[type[Model], str, str]] def
remove_cross_project_models( - deletes: list[tuple[type[Model], str, str]] + deletes: list[tuple[type[Model], str, str]], ) -> list[tuple[type[Model], str, str]]: from sentry.models.artifactbundle import ArtifactBundle @@ -392,6 +438,15 @@ def get_project_id_or_fail(project: str) -> int: return project_id +def get_organization_id_or_fail(organization: str) -> int: + click.echo("Bulk NodeStore deletion not available for organization selection", err=True) + organization_id = get_organization(organization) + if organization_id is None: + click.echo("Error: Organization not found", err=True) + raise click.Abort() + return organization_id + + def remove_old_nodestore_values(days: int) -> None: from sentry import nodestore @@ -486,6 +541,37 @@ def prepare_deletes_by_project( return project_deletion_query, to_delete_by_project +def prepare_deletes_by_organization( + organization: str | None, + organization_id: int | None, + is_filtered: Callable[[type[Model]], bool], +) -> tuple[QuerySet[Any] | None, list[tuple[Any, str, str]]]: + from sentry.constants import ObjectStatus + from sentry.models.organization import Organization + from sentry.models.releasefile import ReleaseFile + + # Deletions that we run per organization. 
In some cases we can't use an index on just the date + # column, so as an alternative we use `(organization_id, <date_col>)` instead + DELETES_BY_ORGANIZATION = [ + (ReleaseFile, "date_accessed", "date_accessed"), + ] + organization_deletion_query = None + to_delete_by_organization = [] + if SiloMode.get_current_mode() != SiloMode.CONTROL: + debug_output("Preparing DELETES_BY_ORGANIZATION context") + organization_deletion_query = Organization.objects.filter(status=ObjectStatus.ACTIVE) + if organization: + organization_deletion_query = Organization.objects.filter(id=organization_id) + + for model_tp_tup in DELETES_BY_ORGANIZATION: + if is_filtered(model_tp_tup[0]): + debug_output(f">> Skipping {model_tp_tup[0].__name__}") + else: + to_delete_by_organization.append(model_tp_tup) + + return organization_deletion_query, to_delete_by_organization + + def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool) -> None: from sentry.models.file import FileBlob