4 changes: 4 additions & 0 deletions docs/BigQueryMultiTable-batchsink.md
@@ -27,6 +27,10 @@ It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
must have permission in this project to create buckets.

**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given,
it will default to the configured Project ID.

**Dataset:** Dataset the tables belong to. A dataset is contained within a specific project.
Datasets are top-level containers that are used to organize and control access to tables and views.
If the dataset does not exist, it will be created.
27 changes: 25 additions & 2 deletions docs/BigQueryTable-batchsink.md
@@ -28,6 +28,10 @@ It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
must have permission in this project to create buckets.

**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given,
it will default to the configured Project ID.
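
To make the fallback concrete, here is a minimal, self-contained sketch (not the plugin's actual code) of how a sink can resolve the dataset project and build the fully qualified `project:dataset.table` reference that this change passes to the BigQuery output configuration; the helper names and sample values are hypothetical.

```java
// Hypothetical helper illustrating the Dataset Project fallback described above.
final class DatasetProjectSketch {

  // If no Dataset Project is configured, fall back to the job's Project ID.
  static String resolveDatasetProject(String datasetProject, String jobProject) {
    return (datasetProject == null || datasetProject.isEmpty()) ? jobProject : datasetProject;
  }

  // The sink addresses the destination table as "project:dataset.table".
  static String qualifiedTable(String datasetProject, String dataset, String table) {
    return String.format("%s:%s.%s", datasetProject, dataset, table);
  }

  public static void main(String[] args) {
    // Sample values only; in the plugin these come from the stage properties.
    String resolved = resolveDatasetProject(null, "my-job-project");
    System.out.println(qualifiedTable(resolved, "my_dataset", "my_table"));
    // prints: my-job-project:my_dataset.my_table
  }
}
```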

**Dataset**: Dataset the table belongs to. A dataset is contained within a specific project.
Datasets are top-level containers that are used to organize and control access to tables and views.

@@ -66,10 +70,29 @@ the update operation will be performed only in the partitions meeting the criteria
**Location:** The location where the BigQuery dataset will be created. This value is ignored
if the dataset or temporary bucket already exists.

**Create Partitioned Table**: Whether to create the BigQuery table with time partitioning. This value
**Create Partitioned Table [DEPRECATED]**: Whether to create the BigQuery table with time partitioning. This value
is ignored if the table already exists.
* When this is set to true, the table will be created with time partitioning.
* When this is set to false, the table will be created without time partitioning.
* When this is set to false, the value of the Partitioning Type property will be used.
* [DEPRECATED] Use Partitioning Type instead.

**Partitioning Type**: Specifies the partitioning type. Can be either Integer, Time, or None. Defaults to Time.
This value is ignored if the table already exists.
* When this is set to Time, the table will be created with time partitioning.
* When this is set to Integer, the table will be created with integer range partitioning.
* When this is set to None, the table will be created without partitioning.

**Range Start**: For integer partitioning, specifies the start of the range. Only used when the table does not
already exist and the partitioning type is set to Integer.
* The start value is inclusive.

**Range End**: For integer partitioning, specifies the end of the range. Only used when the table does not
already exist and the partitioning type is set to Integer.
* The end value is exclusive.

**Range Interval**: For integer partitioning, specifies the partition interval. Only used when the table does not
already exist and the partitioning type is set to Integer.
* The interval value must be a positive integer (a worked example is sketched below).
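
As a worked example of these three properties: with Range Start 0, Range End 100, and Range Interval 10, BigQuery creates ten partitions [0, 10), [10, 20), ..., [90, 100), and values outside the range land in a catch-all partition. The sketch below builds the corresponding `RangePartitioning` spec using the same BigQuery API model classes this change uses in the output committer; the column name and range values are placeholders.

```java
import com.google.api.services.bigquery.model.RangePartitioning;
import com.google.api.services.bigquery.model.RangePartitioning.Range;

// Minimal sketch of the integer range partitioning spec the committer builds;
// the column name and range values below are placeholders.
public class RangePartitioningSketch {
  public static void main(String[] args) {
    Range range = new Range()
        .setStart(0L)       // Range Start is inclusive
        .setEnd(100L)       // Range End is exclusive
        .setInterval(10L);  // Range Interval must be a positive integer

    RangePartitioning partitioning = new RangePartitioning()
        .setField("customer_id") // the Partition Field; must be an INT64 column
        .setRange(range);

    // With these values the table gets partitions [0,10), [10,20), ..., [90,100).
    System.out.println(partitioning);
  }
}
```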

**Partition Field**: Partitioning column for the BigQuery table. This should be left empty if the
BigQuery table is an ingestion-time partitioned table.
@@ -0,0 +1,110 @@
{
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.2.0",
"scope": "SYSTEM"
},
"description": "Data Pipeline Application",
"name": "example-csv-file-to-bigquery-with-range-partition",
"config": {
"resources": {
"memoryMB": 2048,
"virtualCores": 1
},
"driverResources": {
"memoryMB": 2048,
"virtualCores": 1
},
"connections": [
{
"from": "File",
"to": "BigQuery"
}
],
"comments": [],
"postActions": [],
"properties": {},
"processTimingEnabled": true,
"stageLoggingEnabled": false,
"stages": [
{
"name": "File",
"plugin": {
"name": "File",
"type": "batchsource",
"label": "File",
"artifact": {
"name": "core-plugins",
"version": "2.5.0-SNAPSHOT",
"scope": "USER"
},
"properties": {
"format": "delimited",
"skipHeader": "true",
"filenameOnly": "false",
"recursive": "false",
"ignoreNonExistingFolders": "false",
"schema": "${out_schema}",
"referenceName": "file",
"delimiter": ";",
"path": "${path_to_csv_example}"
}
},
"outputSchema": [
{
"name": "etlSchemaBody",
"schema": "${out_schema}"
}
]
},
{
"name": "BigQuery",
"plugin": {
"name": "BigQueryTable",
"type": "batchsink",
"label": "BigQuery",
"artifact": {
"name": "google-cloud",
"version": "0.14.5",
"scope": "USER"
},
"properties": {
"project": "auto-detect",
"serviceFilePath": "auto-detect",
"operation": "insert",
"truncateTable": "false",
"allowSchemaRelaxation": "false",
"location": "US",
"createPartitionedTable": "false",
"partitioningType": "INTEGER",
"rangeStart": "${range_start}",
"partitionFilterRequired": "false",
"dataset": "${bqsink_dataset}",
"table": "${bqsink_table}",
"referenceName": "BigQuerySink",
"partitionByField": "${partitionfield}",
"rangeEnd": "${range_end}",
"rangeInterval": "${range_interval}"
}
},
"outputSchema": [
{
"name": "etlSchemaBody",
"schema": ""
}
],
"inputSchema": [
{
"name": "File",
"schema": "${out_schema}"
}
]
}
],
"schedule": "0 * * * *",
"engine": "spark",
"numOfRecordsPreview": 100,
"description": "Data Pipeline Application",
"maxConcurrentRuns": 1
}
}
2 changes: 1 addition & 1 deletion pom.xml
@@ -77,7 +77,7 @@
<flogger.system.backend.version>0.3.1</flogger.system.backend.version>
<gcs.connector.version>hadoop2-2.0.0</gcs.connector.version>
<google.cloud.bigtable.version>1.11.0</google.cloud.bigtable.version>
<google.cloud.bigquery.version>1.92.0</google.cloud.bigquery.version>
<google.cloud.bigquery.version>1.100.0</google.cloud.bigquery.version>
<google.cloud.pubsub.version>1.92.0</google.cloud.pubsub.version>
<google.cloud.spanner.version>1.37.0</google.cloud.spanner.version>
<google.cloud.speech.version>1.20.0</google.cloud.speech.version>
@@ -102,7 +102,8 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
Credentials credentials = serviceAccountFilePath == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccountFilePath);
String project = config.getProject();
bigQuery = GCPUtils.getBigQuery(project, credentials);
String datasetProjectId = config.getDatasetProject();
bigQuery = GCPUtils.getBigQuery(datasetProjectId, credentials);
String cmekKey = context.getArguments().get(GCPUtils.CMEK_KEY);
baseConfiguration = getBaseConfiguration(cmekKey);
String bucket = configureBucket();
@@ -461,7 +462,8 @@ private Configuration getOutputConfiguration(String bucket,

BigQueryOutputConfiguration.configure(
configuration,
String.format("%s.%s", getConfig().getDataset(), tableName),
String.format("%s:%s.%s", getConfig().getDatasetProject(), getConfig().getDataset(),
tableName),
outputTableSchema,
temporaryGcsPath,
BigQueryFileFormat.AVRO,
@@ -15,14 +15,17 @@
*/
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.JobInfo;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPConfig;
import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig;

import java.util.Set;
@@ -37,12 +40,20 @@ public abstract class AbstractBigQuerySinkConfig extends GCPReferenceSinkConfig
ImmutableSet.of(Schema.Type.INT, Schema.Type.LONG, Schema.Type.STRING, Schema.Type.FLOAT, Schema.Type.DOUBLE,
Schema.Type.BOOLEAN, Schema.Type.BYTES, Schema.Type.ARRAY, Schema.Type.RECORD);

public static final String DATASET_PROJECT_ID = "datasetProject";
public static final String NAME_DATASET = "dataset";
public static final String NAME_BUCKET = "bucket";
public static final String NAME_TRUNCATE_TABLE = "truncateTable";
public static final String NAME_LOCATION = "location";
private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";

@Name(DATASET_PROJECT_ID)
@Macro
@Nullable
@Description("The project in which the dataset is located/should be created."
+ " Defaults to the project specified in the Project Id property.")
private String datasetProject;

@Name(NAME_DATASET)
@Macro
@Description("The dataset to write to. A dataset is contained within a specific project. "
@@ -98,6 +109,14 @@ public String getDataset() {
return dataset;
}

@Nullable
public String getDatasetProject() {
if (GCPConfig.AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
return ServiceOptions.getDefaultProjectId();
}
return Strings.isNullOrEmpty(datasetProject) ? getProject() : datasetProject;
}

@Nullable
public String getGcsChunkSize() {
return gcsChunkSize;
@@ -30,6 +30,8 @@
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.RangePartitioning;
import com.google.api.services.bigquery.model.RangePartitioning.Range;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
@@ -150,8 +152,9 @@ public void commitJob(JobContext jobContext) throws IOException {

allowSchemaRelaxation = conf.getBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION, false);
LOG.debug("Allow schema relaxation: '{}'", allowSchemaRelaxation);
boolean createPartitionedTable = conf.getBoolean(BigQueryConstants.CONFIG_CREATE_PARTITIONED_TABLE, false);
LOG.debug("Create Partitioned Table: '{}'", createPartitionedTable);
PartitionType partitionType = conf.getEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, PartitionType.NONE);
LOG.debug("Create Partitioned Table type: '{}'", partitionType);
Range range = partitionType == PartitionType.INTEGER ? createRangeForIntegerPartitioning(conf) : null;
String partitionByField = conf.get(BigQueryConstants.CONFIG_PARTITION_BY_FIELD, null);
LOG.debug("Partition Field: '{}'", partitionByField);
boolean requirePartitionFilter = conf.getBoolean(BigQueryConstants.CONFIG_REQUIRE_PARTITION_FILTER, false);
@@ -178,7 +181,7 @@
JobId jobId = JobId.of(jobIdString);
try {
importFromGcs(destProjectId, destTable, destSchema.orElse(null), kmsKeyName, outputFileFormat,
writeDisposition, sourceUris, createPartitionedTable, partitionByField,
writeDisposition, sourceUris, partitionType, range, partitionByField,
requirePartitionFilter, clusteringOrderList, tableExists, jobIdString);
if (temporaryTableReference != null) {
operationAction(destTable, kmsKeyName, jobId);
@@ -202,7 +205,7 @@ public void abortJob(JobContext context, JobStatus.State state) throws IOException {
*/
private void importFromGcs(String projectId, TableReference tableRef, @Nullable TableSchema schema,
@Nullable String kmsKeyName, BigQueryFileFormat sourceFormat, String writeDisposition,
List<String> gcsPaths, boolean createPartitionedTable,
List<String> gcsPaths, PartitionType partitionType, @Nullable Range range,
@Nullable String partitionByField, boolean requirePartitionFilter,
List<String> clusteringOrderList, boolean tableExists, String jobId)
throws IOException, InterruptedException {
@@ -228,15 +231,27 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
loadConfig.setSourceUris(gcsPaths);
loadConfig.setWriteDisposition(writeDisposition);
loadConfig.setUseAvroLogicalTypes(true);
if (!tableExists && createPartitionedTable) {
TimePartitioning timePartitioning = new TimePartitioning();
timePartitioning.setType("DAY");
if (partitionByField != null) {
timePartitioning.setField(partitionByField);
if (!tableExists) {
switch (partitionType) {
case TIME:
TimePartitioning timePartitioning = createTimePartitioning(partitionByField,
requirePartitionFilter);
loadConfig.setTimePartitioning(timePartitioning);
break;
case INTEGER:
RangePartitioning rangePartitioning = createRangePartitioning(partitionByField,
range);
if (requirePartitionFilter) {
createTableWithRangePartitionAndRequirePartitionFilter(tableRef, schema,
rangePartitioning);
} else {
loadConfig.setRangePartitioning(rangePartitioning);
}
break;
case NONE:
break;
}
timePartitioning.setRequirePartitionFilter(requirePartitionFilter);
loadConfig.setTimePartitioning(timePartitioning);
if (!clusteringOrderList.isEmpty()) {
if (PartitionType.NONE != partitionType && !clusteringOrderList.isEmpty()) {
Clustering clustering = new Clustering();
clustering.setFields(clusteringOrderList);
loadConfig.setClustering(clustering);
@@ -524,5 +539,50 @@ private static String formatPartitionFilter(String partitionFilter) {
}
return String.join(" ", queryWords);
}

private Range createRangeForIntegerPartitioning(Configuration conf) {
long rangeStart = conf.getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_START, 0);
long rangeEnd = conf.getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_END, 0);
long rangeInterval = conf
.getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_INTERVAL, 0);
Range range = new Range();
range.setStart(rangeStart);
range.setEnd(rangeEnd);
range.setInterval(rangeInterval);
return range;
}

private TimePartitioning createTimePartitioning(
@Nullable String partitionByField, boolean requirePartitionFilter) {
TimePartitioning timePartitioning = new TimePartitioning();
timePartitioning.setType("DAY");
if (partitionByField != null) {
timePartitioning.setField(partitionByField);
}
timePartitioning.setRequirePartitionFilter(requirePartitionFilter);
return timePartitioning;
}

private void createTableWithRangePartitionAndRequirePartitionFilter(TableReference tableRef,
@Nullable TableSchema schema, RangePartitioning rangePartitioning) throws IOException {
Table table = new Table();
table.setSchema(schema);
table.setTableReference(tableRef);
table.setRequirePartitionFilter(true);
table.setRangePartitioning(rangePartitioning);
bigQueryHelper.getRawBigquery().tables()
.insert(tableRef.getProjectId(), tableRef.getDatasetId(), table)
.execute();
}

private RangePartitioning createRangePartitioning(@Nullable String partitionByField,
@Nullable Range range) {
RangePartitioning rangePartitioning = new RangePartitioning();
rangePartitioning.setRange(range);
if (partitionByField != null) {
rangePartitioning.setField(partitionByField);
}
return rangePartitioning;
}
}
}