
Commit c55ca25

[Python] Fix WriteToBigQuery transform using CopyJob not working with WRITE_TRUNCATE write disposition (#34247)
* With WRITE_TRUNCATE, only the first copy job per destination may truncate the table, but the deduplication originally keyed on the table-id alone, so identical table-ids under a different dataset-id or project-id were wrongly treated as already seen (the sketch below illustrates the difference).
1 parent 9039608 commit c55ca25
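
For context, here is a minimal, self-contained sketch (not Beam code; the destinations list and the dispositions helper are purely illustrative) of why keying the set of observed destinations on the table-id alone collapses distinct destinations, and why keying on the full project:dataset.table reference, as this commit does, keeps them apart.

# Minimal sketch (not Beam code): why keying only on tableId is wrong.
# The destinations below differ only by dataset, exactly the case in #34247.
destinations = [
    'project1:dataset1.table1',
    'project1:dataset2.table1',
    'project1:dataset3.table1',
]

def dispositions(keys):
  """Return the write disposition each copy job would get, given how the
  already-seen set is keyed."""
  seen, out = set(), []
  for key, dest in zip(keys, destinations):
    out.append((dest, 'WRITE_TRUNCATE' if key not in seen else 'WRITE_APPEND'))
    seen.add(key)
  return out

# Buggy behaviour: key by tableId only -> every destination shares 'table1'.
print(dispositions([d.split('.')[-1] for d in destinations]))
# [('project1:dataset1.table1', 'WRITE_TRUNCATE'),
#  ('project1:dataset2.table1', 'WRITE_APPEND'),   # wrongly never truncated
#  ('project1:dataset3.table1', 'WRITE_APPEND')]   # wrongly never truncated

# Fixed behaviour: key by the full 'project:dataset.table' reference,
# so every distinct destination gets WRITE_TRUNCATE on its first job.
print(dispositions(destinations))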

File tree

2 files changed, +208 -28 lines changed


sdks/python/apache_beam/io/gcp/bigquery_file_loads.py

Lines changed: 47 additions & 28 deletions
@@ -541,45 +541,27 @@ def process_one(self, element, job_name_prefix):
     copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
         'project', str, '') or self.project

-    copy_job_name = '%s_%s' % (
-        job_name_prefix,
-        _bq_uuid(
-            '%s:%s.%s' % (
-                copy_from_reference.projectId,
-                copy_from_reference.datasetId,
-                copy_from_reference.tableId)))
-
     _LOGGER.info(
         "Triggering copy job from %s to %s",
         copy_from_reference,
         copy_to_reference)
-    if copy_to_reference.tableId not in self._observed_tables:
-      # When the write_disposition for a job is WRITE_TRUNCATE,
-      # multiple copy jobs to the same destination can stump on
-      # each other, truncate data, and write to the BQ table over and
-      # over.
-      # Thus, the first copy job runs with the user's write_disposition,
-      # but afterwards, all jobs must always WRITE_APPEND to the table.
-      # If they do not, subsequent copy jobs will clear out data appended
-      # by previous jobs.
-      write_disposition = self.write_disposition
-      wait_for_job = True
-      self._observed_tables.add(copy_to_reference.tableId)
-      Lineage.sinks().add(
-          'bigquery',
-          copy_to_reference.projectId,
-          copy_to_reference.datasetId,
-          copy_to_reference.tableId)
-    else:
-      wait_for_job = False
-      write_disposition = 'WRITE_APPEND'
+
+    wait_for_job, write_disposition = (
+        self._determine_write_disposition(copy_to_reference))

     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)

     project_id = (
         copy_to_reference.projectId
         if self.load_job_project_id is None else self.load_job_project_id)
+    copy_job_name = '%s_%s' % (
+        job_name_prefix,
+        _bq_uuid(
+            '%s:%s.%s' % (
+                copy_from_reference.projectId,
+                copy_from_reference.datasetId,
+                copy_from_reference.tableId)))
     job_reference = self.bq_wrapper._insert_copy_job(
         project_id,
         copy_job_name,
@@ -594,6 +576,43 @@ def process_one(self, element, job_name_prefix):
     self.pending_jobs.append(
         GlobalWindows.windowed_value((destination, job_reference)))

+  def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]:
+    """
+    Determines the write disposition for a BigQuery copy job,
+    based on destination.
+
+    When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs
+    to the same destination can interfere with each other, truncate data, and
+    write to the BigQuery table repeatedly. To prevent this, the first copy job
+    runs with the user's specified write_disposition, but subsequent jobs must
+    always use WRITE_APPEND. This ensures that subsequent copy jobs do not
+    clear out data appended by previous jobs.
+
+    Args:
+      copy_to_reference: The reference to the destination table.
+
+    Returns:
+      A tuple containing a boolean indicating whether to wait for the job to
+      complete and the write disposition to use for the job.
+    """
+    full_table_ref = '%s:%s.%s' % (
+        copy_to_reference.projectId,
+        copy_to_reference.datasetId,
+        copy_to_reference.tableId)
+    if full_table_ref not in self._observed_tables:
+      write_disposition = self.write_disposition
+      wait_for_job = True
+      self._observed_tables.add(full_table_ref)
+      Lineage.sinks().add(
+          'bigquery',
+          copy_to_reference.projectId,
+          copy_to_reference.datasetId,
+          copy_to_reference.tableId)
+    else:
+      wait_for_job = False
+      write_disposition = 'WRITE_APPEND'
+    return wait_for_job, write_disposition
+
   def finish_bundle(self):
     for windowed_value in self.pending_jobs:
       job_ref = windowed_value.value[1]
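
To make the new helper's contract concrete, the following rough sketch exercises _determine_write_disposition in isolation. It is untested and full of assumptions: the DoFn name TriggerCopyJobs and the module-level Lineage name are inferred from the surrounding module, __init__ is bypassed, and lineage reporting is stubbed out, so treat everything outside the diff above as illustrative rather than the transform's intended usage.

# Rough sketch only: exercises _determine_write_disposition in isolation.
# Assumes the DoFn is named TriggerCopyJobs and that Lineage is a
# module-level name in bigquery_file_loads; both are inferred, not confirmed.
from unittest import mock

from apache_beam.io.gcp import bigquery_file_loads as bqfl
from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api

dofn = bqfl.TriggerCopyJobs.__new__(bqfl.TriggerCopyJobs)  # skip __init__ for the sketch
dofn._observed_tables = set()
dofn.write_disposition = 'WRITE_TRUNCATE'  # the user's disposition

def ref(dataset):
  # Same tableId everywhere; only the dataset differs.
  return bigquery_api.TableReference(
      projectId='project1', datasetId=dataset, tableId='table1')

with mock.patch.object(bqfl, 'Lineage'):  # lineage reporting is not the point here
  print(dofn._determine_write_disposition(ref('dataset1')))
  # (True, 'WRITE_TRUNCATE')  -- first job for this destination
  print(dofn._determine_write_disposition(ref('dataset2')))
  # (True, 'WRITE_TRUNCATE')  -- same tableId, different dataset: still a first job
  print(dofn._determine_write_disposition(ref('dataset1')))
  # (False, 'WRITE_APPEND')   -- destination already observed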

sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py

Lines changed: 161 additions & 0 deletions
@@ -24,6 +24,8 @@
 import secrets
 import time
 import unittest
+from unittest.mock import Mock
+from unittest.mock import call

 import mock
 import pytest
@@ -39,6 +41,7 @@
 from apache_beam.io.gcp import bigquery
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery import BigQueryDisposition
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
@@ -820,6 +823,164 @@ def test_multiple_partition_files_write_dispositions(
     # TriggerCopyJob only processes once
     self.assertEqual(mock_call_process.call_count, 1)

+  @mock.patch(
+      'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper.wait_for_bq_job')
+  @mock.patch(
+      'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._insert_copy_job')
+  @mock.patch(
+      'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._start_job',
+      wraps=BigQueryWrapper._start_job)
+  def test_multiple_identical_destinations_on_write_truncate(
+      self, mock_perform_start_job, mock_insert_copy_job, mock_wait_for_bq_job):
+    """
+    Test that multiple identical table names,
+    but under different datasets are handled correctly.
+    This essentially means that the `write_disposition` is set
+    to `WRITE_TRUNCATE` for the first job and `WRITE_APPEND` for the rest.
+
+    Previously this was not the case and all jobs were set to `WRITE_APPEND`
+    from the 2nd table that was named identically with at least
+    one previous table - but from different dataset.
+    """
+    def dynamic_destination_resolver(element, *side_inputs):
+      """A dynamic destination resolver that returns a destination strictly the
+      same table, but different dataset."""
+      if element['name'] == 'beam':
+        return 'project1:dataset1.table1'
+      elif element['name'] == 'flink':
+        return 'project1:dataset2.table1'
+
+      return 'project1:dataset3.table1'
+
+    job_reference = bigquery_api.JobReference()
+    job_reference.projectId = 'project1'
+    job_reference.jobId = 'job_name1'
+    result_job = mock.Mock()
+    result_job.jobReference = job_reference
+
+    mock_job = mock.Mock()
+    mock_job.status.state = 'DONE'
+    mock_job.status.errorResult = None
+    mock_job.jobReference = job_reference
+
+    bq_client = mock.Mock()
+    bq_client.jobs.Get.return_value = mock_job
+
+    bq_client.jobs.Insert.return_value = result_job
+    bq_client.tables.Delete.return_value = None
+
+    m = bigquery_tools.BigQueryWrapper(bq_client)
+    m.wait_for_bq_job = mock.Mock()
+    m.wait_for_bq_job.return_value = None
+
+    mock_jobs = [
+        Mock(jobReference=bigquery_api.JobReference(jobId=f'job_name{i}'))
+        # Order matters in a sense to prove that jobs with different ids
+        # (`2` & `3`) are run with `WRITE_APPEND` without this current fix.
+        for i in [1, 2, 1, 3, 1]
+    ]
+    mock_perform_start_job.side_effect = mock_jobs
+
+    # For now we don't care about the return value.
+    mock_insert_copy_job.return_value = None
+
+    with TestPipeline('DirectRunner') as p:
+      _ = (
+          p
+          | beam.Create([
+              {
+                  'name': 'beam', 'language': 'java'
+              },
+              {
+                  'name': 'flink', 'language': 'java'
+              },
+              {
+                  'name': 'beam', 'language': 'java'
+              },
+              {
+                  'name': 'spark', 'language': 'java'
+              },
+              {
+                  'name': 'beam', 'language': 'java'
+              },
+          ],
+                        reshuffle=False)
+          | bqfl.BigQueryBatchFileLoads(
+              dynamic_destination_resolver,
+              custom_gcs_temp_location=self._new_tempdir(),
+              test_client=bq_client,
+              validate=False,
+              temp_file_format=bigquery_tools.FileFormat.JSON,
+              max_file_size=45,
+              max_partition_size=80,
+              max_files_per_partition=3,
+              write_disposition=BigQueryDisposition.WRITE_TRUNCATE))
+
+    from apache_beam.io.gcp.internal.clients.bigquery import TableReference
+    mock_insert_copy_job.assert_has_calls(
+        [
+            call(
+                'project1',
+                mock.ANY,
+                TableReference(
+                    datasetId='dataset1',
+                    projectId='project1',
+                    tableId='job_name1'),
+                TableReference(
+                    datasetId='dataset1',
+                    projectId='project1',
+                    tableId='table1'),
+                create_disposition=None,
+                write_disposition='WRITE_TRUNCATE',
+                job_labels={'step_name': 'bigquerybatchfileloads'}),
+            call(
+                'project1',
+                mock.ANY,
+                TableReference(
+                    datasetId='dataset1',
+                    projectId='project1',
+                    tableId='job_name2'),
+                TableReference(
+                    datasetId='dataset1',
+                    projectId='project1',
+                    tableId='table1'),
+                create_disposition=None,
+                write_disposition='WRITE_APPEND',
+                job_labels={'step_name': 'bigquerybatchfileloads'}),
+            call(
+                'project1',
+                mock.ANY,
+                TableReference(
+                    datasetId='dataset2',
+                    projectId='project1',
+                    tableId='job_name1'),
+                TableReference(
+                    datasetId='dataset2',
+                    projectId='project1',
+                    tableId='table1'),
+                create_disposition=None,
+                # Previously this was `WRITE_APPEND`.
+                write_disposition='WRITE_TRUNCATE',
+                job_labels={'step_name': 'bigquerybatchfileloads'}),
+            call(
+                'project1',
+                mock.ANY,
+                TableReference(
+                    datasetId='dataset3',
+                    projectId='project1',
+                    tableId='job_name3'),
+                TableReference(
+                    datasetId='dataset3',
+                    projectId='project1',
+                    tableId='table1'),
+                create_disposition=None,
+                # Previously this was `WRITE_APPEND`.
+                write_disposition='WRITE_TRUNCATE',
+                job_labels={'step_name': 'bigquerybatchfileloads'}),
+        ],
+        any_order=True)
+    self.assertEqual(4, mock_insert_copy_job.call_count)
+
   @parameterized.expand([
       param(is_streaming=False, with_auto_sharding=False, compat_version=None),
       param(is_streaming=True, with_auto_sharding=False, compat_version=None),
