Commit ccf56b7
ref(data_export): add path param to gcs_data_sink (#101993)
This allows us to specify a prefix under the destination bucket when transferring data from another bucket, so the same bucket can be shared across several event types, with the right prefix for each.
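For context (not part of the diff itself): the new parameter ultimately flows into the TransferSpec handed to GCP's Storage Transfer Service, where the sink's GcsData now carries a path just like the source's. A minimal sketch of the resulting sink configuration, with hypothetical bucket and prefix names:

from google.cloud.storage_transfer_v1.types import GcsData, TransferSpec

# Hypothetical bucket/prefix names. The Storage Transfer API treats
# "path" as an object-name prefix, and a non-empty path must end with
# "/" -- which matches the "destination_prefix/" values in the tests below.
transfer_spec = TransferSpec(
    gcs_data_source=GcsData(bucket_name="source-bucket", path="42/"),
    gcs_data_sink=GcsData(bucket_name="shared-eu-exports", path="replays/"),
)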
Parent: 165d192

File tree

4 files changed (+18, −3 lines)

src/sentry/profiles/data_export.py

Lines changed: 2 additions & 0 deletions
@@ -14,6 +14,7 @@ def export_profiles_data(
     organization_id: int,
     gcp_project_id: str,
     destination_bucket: str,
+    destination_prefix: str,
     blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT,
     source_bucket: str = EXPORT_JOB_SOURCE_BUCKET,
     pubsub_topic_name: str | None = None,
@@ -43,6 +44,7 @@ def export_profiles_data(
         source_bucket=source_bucket,
         source_prefix=f"{organization_id}",
         destination_bucket=destination_bucket,
+        destination_prefix=destination_prefix,
         notification_topic=pubsub_topic_name,
         job_description="Profiles EU Compliance Export",
         job_duration=blob_export_job_duration,

src/sentry/replays/data_export.py

Lines changed: 7 additions & 1 deletion
@@ -139,6 +139,7 @@ def create_transfer_job[T](
     source_bucket: str,
     source_prefix: str,
     destination_bucket: str,
+    destination_prefix: str,
     job_description: str,
     job_duration: timedelta,
     do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
@@ -164,6 +165,7 @@ def create_transfer_job[T](
     :param destination_bucket:
     :param job_duration: The amount of time the job should take to complete. Longer runs put less
         pressure on our buckets.
+    :param destination_prefix:
     :param notification_topic: Specifying a topic will enable automatic run retries on failure.
     :param do_create_transfer_job: Injected function which creates the transfer-job.
     :param get_current_datetime: Injected function which computes the current datetime.
@@ -177,7 +179,7 @@ def create_transfer_job[T](
         status=storage_transfer_v1.TransferJob.Status.ENABLED,
         transfer_spec=TransferSpec(
             gcs_data_source=GcsData(bucket_name=source_bucket, path=source_prefix),
-            gcs_data_sink=GcsData(bucket_name=destination_bucket),
+            gcs_data_sink=GcsData(bucket_name=destination_bucket, path=destination_prefix),
         ),
         schedule=Schedule(
             schedule_start_date=date_pb2.Date(
@@ -570,6 +572,7 @@ def export_replay_blob_data[T](
     project_id: int,
     gcp_project_id: str,
     destination_bucket: str,
+    destination_prefix: str,
     job_duration: timedelta,
     do_create_transfer_job: Callable[[CreateTransferJobRequest], T],
     pubsub_topic_name: str | None = None,
@@ -589,6 +592,7 @@ def export_replay_blob_data[T](
         source_bucket=source_bucket,
         source_prefix=f"{retention_days}/{project_id}",
         destination_bucket=destination_bucket,
+        destination_prefix=destination_prefix,
         notification_topic=pubsub_topic_name,
         job_description="Session Replay EU Compliance Export",
         job_duration=job_duration,
@@ -600,6 +604,7 @@ def export_replay_data(
     organization_id: int,
     gcp_project_id: str,
     destination_bucket: str,
+    destination_prefix: str,
     blob_export_job_duration: timedelta = EXPORT_JOB_DURATION_DEFAULT,
     database_rows_per_page: int = EXPORT_QUERY_ROWS_PER_PAGE,
     database_pages_per_task: int = EXPORT_QUERY_PAGES_PER_TASK,
@@ -647,6 +652,7 @@ def export_replay_data(
         project_id=project.id,
         gcp_project_id=gcp_project_id,
         destination_bucket=destination_bucket,
+        destination_prefix=destination_prefix,
         pubsub_topic_name=pubsub_topic_name,
         source_bucket=source_bucket,
         job_duration=blob_export_job_duration,
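With the parameter threaded through both exporters, callers can fan several event types into one compliance bucket, separated only by prefix, which is the use case the commit message describes. A hypothetical sketch; the organization ID, GCP project, bucket, and prefixes are placeholders:

from sentry.profiles.data_export import export_profiles_data
from sentry.replays.data_export import export_replay_data

# Same destination bucket for both event types; only the prefix differs.
export_replay_data(
    organization_id=42,
    gcp_project_id="my-gcp-project",
    destination_bucket="eu-compliance-exports",
    destination_prefix="replays/",
)
export_profiles_data(
    organization_id=42,
    gcp_project_id="my-gcp-project",
    destination_bucket="eu-compliance-exports",
    destination_prefix="profiles/",
)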

tests/sentry/replays/test_data_export.py

Lines changed: 4 additions & 0 deletions
@@ -52,6 +52,7 @@ def test_replay_data_export(default_organization, default_project, replay_store)
         organization_id=default_organization.id,
         gcp_project_id="1",
         destination_bucket="destination",
+        destination_prefix="destination_prefix/",
         database_rows_per_page=1,
     )
     assert create_job.called
@@ -82,6 +83,7 @@ def test_replay_data_export_invalid_organization(default_project, replay_store)
         organization_id=1,
         gcp_project_id="1",
         destination_bucket="destination",
+        destination_prefix="destination_prefix/",
         database_rows_per_page=1,
     )
     assert not create_job.called
@@ -107,6 +109,7 @@ def test_replay_data_export_no_replay_projects( # type: ignore[no-untyped-def]
         organization_id=default_organization.id,
         gcp_project_id="1",
         destination_bucket="destination",
+        destination_prefix="destination_prefix/",
         database_rows_per_page=1,
     )
     assert not create_job.called
@@ -133,6 +136,7 @@ def test_replay_data_export_no_replay_data( # type: ignore[no-untyped-def]
         organization_id=default_organization.id,
         gcp_project_id="1",
         destination_bucket="destination",
+        destination_prefix="destination_prefix/",
         database_rows_per_page=1,
     )
 

tests/sentry/replays/unit/test_data_export.py

Lines changed: 5 additions & 2 deletions
@@ -37,6 +37,7 @@ def test_export_blob_data() -> None:
         source_bucket=bucket_name,
         source_prefix=bucket_prefix,
         destination_bucket="b",
+        destination_prefix="destination_prefix/",
         notification_topic=pubsub_topic,
         job_description=job_description,
         job_duration=job_duration,
@@ -52,7 +53,7 @@ def test_export_blob_data() -> None:
         status=storage_transfer_v1.TransferJob.Status.ENABLED,
         transfer_spec=TransferSpec(
             gcs_data_source=GcsData(bucket_name=bucket_name, path=bucket_prefix),
-            gcs_data_sink=GcsData(bucket_name="b"),
+            gcs_data_sink=GcsData(bucket_name="b", path="destination_prefix/"),
         ),
         schedule=Schedule(
             schedule_start_date=date_pb2.Date(
@@ -97,7 +98,9 @@ def test_retry_export_blob_data() -> None:
 
 def test_export_replay_blob_data() -> None:
     jobs = []
-    export_replay_blob_data(1, "1", "test", timedelta(days=1), lambda job: jobs.append(job))
+    export_replay_blob_data(
+        1, "1", "test", "dest_prefix/", timedelta(days=1), lambda job: jobs.append(job)
+    )
 
     # Assert a job is created for each retention-period.
     assert len(jobs) == 3
