Commit 5d8b286
[Python] Fix WriteToBigQuery transform using CopyJob does not work with WRITE_TRUNCATE write disposition (#34247)
* Only the first copy job to a given destination should truncate, but the original code tracked observed destinations by table-id alone, so identical table-ids from different dataset-ids or project-ids were wrongly treated as already seen.
1 parent a6e2729 commit 5d8b286
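
The gist of the bug, as a minimal sketch (not Beam code; plan_dispositions and its inputs are hypothetical): deduplicating on the table id alone collapses distinct destinations, so only the first of them gets the user's WRITE_TRUNCATE.

def plan_dispositions(destinations, key_fn, user_disposition='WRITE_TRUNCATE'):
  # The first copy job per observed key keeps the user's disposition;
  # every later job for the same key falls back to WRITE_APPEND.
  observed = set()
  plan = []
  for project, dataset, table in destinations:
    key = key_fn(project, dataset, table)
    plan.append(user_disposition if key not in observed else 'WRITE_APPEND')
    observed.add(key)
  return plan

dests = [('project1', 'dataset1', 'table1'), ('project1', 'dataset2', 'table1')]
# Old key (table id only): the second, distinct destination wrongly appends.
print(plan_dispositions(dests, lambda p, d, t: t))
# -> ['WRITE_TRUNCATE', 'WRITE_APPEND']
# Fixed key ('project:dataset.table'): each distinct destination truncates once.
print(plan_dispositions(dests, lambda p, d, t: '%s:%s.%s' % (p, d, t)))
# -> ['WRITE_TRUNCATE', 'WRITE_TRUNCATE']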

File tree

2 files changed: +207 -28 lines changed

sdks/python/apache_beam/io/gcp/bigquery_file_loads.py

Lines changed: 47 additions & 28 deletions
@@ -541,45 +541,27 @@ def process_one(self, element, job_name_prefix):
     copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
         'project', str, '') or self.project
 
-    copy_job_name = '%s_%s' % (
-        job_name_prefix,
-        _bq_uuid(
-            '%s:%s.%s' % (
-                copy_from_reference.projectId,
-                copy_from_reference.datasetId,
-                copy_from_reference.tableId)))
-
     _LOGGER.info(
         "Triggering copy job from %s to %s",
         copy_from_reference,
         copy_to_reference)
-    if copy_to_reference.tableId not in self._observed_tables:
-      # When the write_disposition for a job is WRITE_TRUNCATE,
-      # multiple copy jobs to the same destination can stump on
-      # each other, truncate data, and write to the BQ table over and
-      # over.
-      # Thus, the first copy job runs with the user's write_disposition,
-      # but afterwards, all jobs must always WRITE_APPEND to the table.
-      # If they do not, subsequent copy jobs will clear out data appended
-      # by previous jobs.
-      write_disposition = self.write_disposition
-      wait_for_job = True
-      self._observed_tables.add(copy_to_reference.tableId)
-      Lineage.sinks().add(
-          'bigquery',
-          copy_to_reference.projectId,
-          copy_to_reference.datasetId,
-          copy_to_reference.tableId)
-    else:
-      wait_for_job = False
-      write_disposition = 'WRITE_APPEND'
+
+    wait_for_job, write_disposition = (
+        self._determine_write_disposition(copy_to_reference))
 
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
 
     project_id = (
         copy_to_reference.projectId
         if self.load_job_project_id is None else self.load_job_project_id)
+    copy_job_name = '%s_%s' % (
+        job_name_prefix,
+        _bq_uuid(
+            '%s:%s.%s' % (
+                copy_from_reference.projectId,
+                copy_from_reference.datasetId,
+                copy_from_reference.tableId)))
     job_reference = self.bq_wrapper._insert_copy_job(
         project_id,
         copy_job_name,
@@ -594,6 +576,43 @@ def process_one(self, element, job_name_prefix):
     self.pending_jobs.append(
         GlobalWindows.windowed_value((destination, job_reference)))
 
+  def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]:
+    """
+    Determines the write disposition for a BigQuery copy job,
+    based on the destination.
+
+    When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs
+    to the same destination can interfere with each other, truncate data, and
+    write to the BigQuery table repeatedly. To prevent this, the first copy job
+    runs with the user's specified write_disposition, but subsequent jobs must
+    always use WRITE_APPEND. This ensures that subsequent copy jobs do not
+    clear out data appended by previous jobs.
+
+    Args:
+      copy_to_reference: The reference to the destination table.
+
+    Returns:
+      A tuple containing a boolean indicating whether to wait for the job to
+      complete and the write disposition to use for the job.
+    """
+    full_table_ref = '%s:%s.%s' % (
+        copy_to_reference.projectId,
+        copy_to_reference.datasetId,
+        copy_to_reference.tableId)
+    if full_table_ref not in self._observed_tables:
+      write_disposition = self.write_disposition
+      wait_for_job = True
+      self._observed_tables.add(full_table_ref)
+      Lineage.sinks().add(
+          'bigquery',
+          copy_to_reference.projectId,
+          copy_to_reference.datasetId,
+          copy_to_reference.tableId)
+    else:
+      wait_for_job = False
+      write_disposition = 'WRITE_APPEND'
+    return wait_for_job, write_disposition
+
   def finish_bundle(self):
     for windowed_value in self.pending_jobs:
       job_ref = windowed_value.value[1]
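
The extracted helper now keys on the fully qualified table reference. A standalone sketch of the same first-writer-truncates pattern (TableRef and DispositionTracker below are illustrative stand-ins, not Beam's API):

from collections import namedtuple

TableRef = namedtuple('TableRef', 'projectId datasetId tableId')

class DispositionTracker:
  def __init__(self, user_disposition):
    self._observed = set()
    self._user_disposition = user_disposition

  def determine(self, ref):
    # Mirrors _determine_write_disposition: returns (wait_for_job,
    # write_disposition) and remembers the fully qualified destination.
    key = '%s:%s.%s' % (ref.projectId, ref.datasetId, ref.tableId)
    if key not in self._observed:
      self._observed.add(key)
      return True, self._user_disposition
    return False, 'WRITE_APPEND'

tracker = DispositionTracker('WRITE_TRUNCATE')
for ref in [TableRef('project1', 'dataset1', 'table1'),
            TableRef('project1', 'dataset2', 'table1'),  # same table id, new dataset
            TableRef('project1', 'dataset1', 'table1')]:  # repeat destination
  print(tracker.determine(ref))
# -> (True, 'WRITE_TRUNCATE'), (True, 'WRITE_TRUNCATE'), (False, 'WRITE_APPEND')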

sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py

Lines changed: 160 additions & 0 deletions
@@ -24,6 +24,8 @@
 import secrets
 import time
 import unittest
+from unittest.mock import Mock
+from unittest.mock import call
 
 import mock
 import pytest
@@ -39,6 +41,7 @@
 from apache_beam.io.gcp import bigquery
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery import BigQueryDisposition
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
@@ -820,6 +823,163 @@ def test_multiple_partition_files_write_dispositions(
     # TriggerCopyJob only processes once
     self.assertEqual(mock_call_process.call_count, 1)
 
+  @mock.patch(
+      'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper.wait_for_bq_job')
+  @mock.patch(
+      'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._insert_copy_job')
+  @mock.patch(
+      'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._start_job',
+      wraps=BigQueryWrapper._start_job)
+  def test_multiple_identical_destinations_on_write_truncate(
+      self, mock_perform_start_job, mock_insert_copy_job, mock_wait_for_bq_job):
+    """
+    Test that multiple identical destinations are handled correctly.
+    This essentially means that the `write_disposition` is set
+    to `WRITE_TRUNCATE` for the first job and `WRITE_APPEND` for the rest.
+
+    Previously this was not the case: every job was set to `WRITE_APPEND`
+    starting from the second table whose table-id matched at least one
+    previous table, even when it belonged to a different dataset.
+    """
+    def dynamic_destination_resolver(element, *side_inputs):
+      """A dynamic destination resolver that always returns the same
+      table id, but in a different dataset per input."""
+      if element['name'] == 'beam':
+        return 'project1:dataset1.table1'
+      elif element['name'] == 'flink':
+        return 'project1:dataset2.table1'
+
+      return 'project1:dataset3.table1'
+
+    job_reference = bigquery_api.JobReference()
+    job_reference.projectId = 'project1'
+    job_reference.jobId = 'job_name1'
+    result_job = mock.Mock()
+    result_job.jobReference = job_reference
+
+    mock_job = mock.Mock()
+    mock_job.status.state = 'DONE'
+    mock_job.status.errorResult = None
+    mock_job.jobReference = job_reference
+
+    bq_client = mock.Mock()
+    bq_client.jobs.Get.return_value = mock_job
+
+    bq_client.jobs.Insert.return_value = result_job
+    bq_client.tables.Delete.return_value = None
+
+    m = bigquery_tools.BigQueryWrapper(bq_client)
+    m.wait_for_bq_job = mock.Mock()
+    m.wait_for_bq_job.return_value = None
+
+    mock_jobs = [
+        Mock(jobReference=bigquery_api.JobReference(jobId=f'job_name{i}'))
+        # The order matters here: it proves that, without this fix, the
+        # jobs with different ids (`2` & `3`) would run with `WRITE_APPEND`.
+        for i in [1, 2, 1, 3, 1]
+    ]
+    mock_perform_start_job.side_effect = mock_jobs
+
+    # For now we don't care about the return value.
+    mock_insert_copy_job.return_value = None
+
+    with TestPipeline('DirectRunner') as p:
+      _ = (
+          p
+          | beam.Create([
+              {
+                  'name': 'beam', 'language': 'java'
+              },
+              {
+                  'name': 'flink', 'language': 'java'
+              },
+              {
+                  'name': 'beam', 'language': 'java'
+              },
+              {
+                  'name': 'spark', 'language': 'java'
+              },
+              {
+                  'name': 'beam', 'language': 'java'
+              },
+          ],
+                        reshuffle=False)
+          | bqfl.BigQueryBatchFileLoads(
+              dynamic_destination_resolver,
+              custom_gcs_temp_location=self._new_tempdir(),
+              test_client=bq_client,
+              validate=False,
+              temp_file_format=bigquery_tools.FileFormat.JSON,
+              max_file_size=45,
+              max_partition_size=80,
+              max_files_per_partition=3,
+              write_disposition=BigQueryDisposition.WRITE_TRUNCATE))
+
+    from apache_beam.io.gcp.internal.clients.bigquery import TableReference
+    mock_insert_copy_job.assert_has_calls(
+        [
+            call(
+                'project1',
+                mock.ANY,
+                TableReference(
+                    datasetId='dataset1',
+                    projectId='project1',
+                    tableId='job_name1'),
+                TableReference(
+                    datasetId='dataset1',
+                    projectId='project1',
+                    tableId='table1'),
+                create_disposition=None,
+                write_disposition='WRITE_TRUNCATE',
+                job_labels={'step_name': 'bigquerybatchfileloads'}),
+            call(
+                'project1',
+                mock.ANY,
+                TableReference(
+                    datasetId='dataset1',
+                    projectId='project1',
+                    tableId='job_name2'),
+                TableReference(
+                    datasetId='dataset1',
+                    projectId='project1',
+                    tableId='table1'),
+                create_disposition=None,
+                write_disposition='WRITE_APPEND',
+                job_labels={'step_name': 'bigquerybatchfileloads'}),
+            call(
+                'project1',
+                mock.ANY,
+                TableReference(
+                    datasetId='dataset2',
+                    projectId='project1',
+                    tableId='job_name1'),
+                TableReference(
+                    datasetId='dataset2',
+                    projectId='project1',
+                    tableId='table1'),
+                create_disposition=None,
+                # Previously this was `WRITE_APPEND`.
+                write_disposition='WRITE_TRUNCATE',
+                job_labels={'step_name': 'bigquerybatchfileloads'}),
+            call(
+                'project1',
+                mock.ANY,
+                TableReference(
+                    datasetId='dataset3',
+                    projectId='project1',
+                    tableId='job_name3'),
+                TableReference(
+                    datasetId='dataset3',
+                    projectId='project1',
+                    tableId='table1'),
+                create_disposition=None,
+                # Previously this was `WRITE_APPEND`.
+                write_disposition='WRITE_TRUNCATE',
+                job_labels={'step_name': 'bigquerybatchfileloads'}),
+        ],
+        any_order=True)
+    self.assertEqual(4, mock_insert_copy_job.call_count)
+
   @parameterized.expand([
       param(is_streaming=False, with_auto_sharding=False, compat_version=None),
       param(is_streaming=True, with_auto_sharding=False, compat_version=None),
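
Two unittest.mock features carry the test above: a side_effect list hands out a fresh mock job per _start_job call, and assert_has_calls(..., any_order=True) verifies the expected copy jobs regardless of interleaving. A self-contained illustration (the copier and start_job names are hypothetical, not from the Beam test):

from unittest.mock import Mock, call

copier = Mock()
# Successive calls to start_job() return the mocks in order: 1, 2, 1.
start_job = Mock(side_effect=[Mock(jobId=f'job_name{i}') for i in [1, 2, 1]])

for dataset in ['dataset1', 'dataset1', 'dataset2']:
  job = start_job()
  copier.insert_copy_job(dataset, job.jobId)

# Checks membership of the expected calls, independent of call order.
copier.insert_copy_job.assert_has_calls(
    [call('dataset1', 'job_name1'), call('dataset2', 'job_name1')],
    any_order=True)
assert copier.insert_copy_job.call_count == 3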
