diff --git a/src/sentry/profiles/data_export.py b/src/sentry/profiles/data_export.py
index 811e857da40da4..b8f2a93e6ffb3f 100644
--- a/src/sentry/profiles/data_export.py
+++ b/src/sentry/profiles/data_export.py
@@ -14,6 +14,7 @@ def export_profiles_data(
     organization_id: int,
     gcp_project_id: str,
     destination_bucket: str,
+    destination_prefix: str,
     blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT,
     source_bucket: str = EXPORT_JOB_SOURCE_BUCKET,
     pubsub_topic_name: str | None = None,
@@ -43,6 +44,7 @@ def export_profiles_data(
         source_bucket=source_bucket,
         source_prefix=f"{organization_id}",
         destination_bucket=destination_bucket,
+        destination_prefix=destination_prefix,
         notification_topic=pubsub_topic_name,
         job_description="Profiles EU Compliance Export",
         job_duration=blob_export_job_duration,
diff --git a/src/sentry/replays/data_export.py b/src/sentry/replays/data_export.py
index a7cca6b571d31a..0e900d358994ad 100644
--- a/src/sentry/replays/data_export.py
+++ b/src/sentry/replays/data_export.py
@@ -139,6 +139,7 @@ def create_transfer_job[T](
     source_bucket: str,
     source_prefix: str,
     destination_bucket: str,
+    destination_prefix: str,
     job_description: str,
     job_duration: timedelta,
     do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
@@ -164,6 +165,7 @@ def create_transfer_job[T](
    :param destination_bucket:
    :param job_duration: The amount of time the job should take to complete. Longer runs put
        less pressure on our buckets.
+   :param destination_prefix:
    :param notification_topic: Specifying a topic will enable automatic run retries on failure.
    :param do_create_transfer_job: Injected function which creates the transfer-job.
    :param get_current_datetime: Injected function which computes the current datetime.
@@ -177,7 +179,7 @@
         status=storage_transfer_v1.TransferJob.Status.ENABLED,
         transfer_spec=TransferSpec(
             gcs_data_source=GcsData(bucket_name=source_bucket, path=source_prefix),
-            gcs_data_sink=GcsData(bucket_name=destination_bucket),
+            gcs_data_sink=GcsData(bucket_name=destination_bucket, path=destination_prefix),
         ),
         schedule=Schedule(
             schedule_start_date=date_pb2.Date(
@@ -570,6 +572,7 @@ def export_replay_blob_data[T](
     project_id: int,
     gcp_project_id: str,
     destination_bucket: str,
+    destination_prefix: str,
     job_duration: timedelta,
     do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
     pubsub_topic_name: str | None = None,
@@ -589,6 +592,7 @@
         source_bucket=source_bucket,
         source_prefix=f"{retention_days}/{project_id}",
         destination_bucket=destination_bucket,
+        destination_prefix=destination_prefix,
         notification_topic=pubsub_topic_name,
         job_description="Session Replay EU Compliance Export",
         job_duration=job_duration,
@@ -600,6 +604,7 @@ def export_replay_data(
     organization_id: int,
     gcp_project_id: str,
     destination_bucket: str,
+    destination_prefix: str,
     blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT,
     database_rows_per_page: int = EXPORT_QUERY_ROWS_PER_PAGE,
     database_pages_per_task: int = EXPORT_QUERY_PAGES_PER_TASK,
@@ -647,6 +652,7 @@ def export_replay_data(
             project_id=project.id,
             gcp_project_id=gcp_project_id,
             destination_bucket=destination_bucket,
+            destination_prefix=destination_prefix,
             pubsub_topic_name=pubsub_topic_name,
             source_bucket=source_bucket,
             job_duration=blob_export_job_duration,
diff --git a/tests/sentry/replays/test_data_export.py b/tests/sentry/replays/test_data_export.py
index c39b714e21e0a4..1ad1b4da801857 100644
--- a/tests/sentry/replays/test_data_export.py
+++ b/tests/sentry/replays/test_data_export.py
@@ -52,6 +52,7 @@ def test_replay_data_export(default_organization, default_project, replay_store)
         organization_id=default_organization.id,
         gcp_project_id="1",
         destination_bucket="destination",
+        destination_prefix="destination_prefix/",
         database_rows_per_page=1,
     )
     assert create_job.called
@@ -82,6 +83,7 @@ def test_replay_data_export_invalid_organization(default_project, replay_store)
         organization_id=1,
         gcp_project_id="1",
         destination_bucket="destination",
+        destination_prefix="destination_prefix/",
         database_rows_per_page=1,
     )
     assert not create_job.called
@@ -107,6 +109,7 @@ def test_replay_data_export_no_replay_projects(  # type: ignore[no-untyped-def]
         organization_id=default_organization.id,
         gcp_project_id="1",
         destination_bucket="destination",
+        destination_prefix="destination_prefix/",
         database_rows_per_page=1,
     )
     assert not create_job.called
@@ -133,6 +136,7 @@ def test_replay_data_export_no_replay_data(  # type: ignore[no-untyped-def]
         organization_id=default_organization.id,
         gcp_project_id="1",
         destination_bucket="destination",
+        destination_prefix="destination_prefix/",
         database_rows_per_page=1,
     )
 
diff --git a/tests/sentry/replays/unit/test_data_export.py b/tests/sentry/replays/unit/test_data_export.py
index 5c769e3aabf163..cfc6f78d766fc0 100644
--- a/tests/sentry/replays/unit/test_data_export.py
+++ b/tests/sentry/replays/unit/test_data_export.py
@@ -37,6 +37,7 @@ def test_export_blob_data() -> None:
         source_bucket=bucket_name,
         source_prefix=bucket_prefix,
         destination_bucket="b",
+        destination_prefix="destination_prefix/",
         notification_topic=pubsub_topic,
         job_description=job_description,
         job_duration=job_duration,
@@ -52,7 +53,7 @@ def test_export_blob_data() -> None:
         status=storage_transfer_v1.TransferJob.Status.ENABLED,
         transfer_spec=TransferSpec(
             gcs_data_source=GcsData(bucket_name=bucket_name, path=bucket_prefix),
-            gcs_data_sink=GcsData(bucket_name="b"),
+            gcs_data_sink=GcsData(bucket_name="b", path="destination_prefix/"),
         ),
         schedule=Schedule(
             schedule_start_date=date_pb2.Date(
@@ -97,7 +98,9 @@ def test_retry_export_blob_data() -> None:
 
 def test_export_replay_blob_data() -> None:
     jobs = []
-    export_replay_blob_data(1, "1", "test", timedelta(days=1), lambda job: jobs.append(job))
+    export_replay_blob_data(
+        1, "1", "test", "dest_prefix/", timedelta(days=1), lambda job: jobs.append(job)
+    )
 
     # Assert a job is created for each retention-period.
     assert len(jobs) == 3
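
Usage note (illustrative, not part of the change): a minimal sketch of how a caller might exercise the new `destination_prefix` parameter end to end. The organization id, GCP project, bucket name, and prefix values below are made-up, and the keyword-only call style simply mirrors the tests above; real call sites may differ.

```python
from datetime import timedelta

from sentry.replays.data_export import export_replay_data

# Hypothetical invocation: export one organization's replay blobs into a
# dedicated "folder" of the destination bucket rather than its root.
export_replay_data(
    organization_id=42,                        # assumed organization id
    gcp_project_id="my-gcp-project",           # assumed GCP project
    destination_bucket="customer-eu-exports",  # assumed sink bucket
    # New parameter from this diff: forwarded into the transfer job's
    # GcsData(bucket_name=..., path=...) sink, so exported objects are
    # written under this object-key prefix inside the sink bucket.
    destination_prefix="org-42/replays/",
    blob_export_job_duration=timedelta(days=1),
)
```

Since GCS paths are plain object-key prefixes, the trailing `/` used here (and in the tests) keeps the prefix behaving like a directory rather than matching sibling keys that merely start with the same string.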