diff --git a/docs/BigQueryMultiTable-batchsink.md b/docs/BigQueryMultiTable-batchsink.md
index b3635b8365..4261890479 100644
--- a/docs/BigQueryMultiTable-batchsink.md
+++ b/docs/BigQueryMultiTable-batchsink.md
@@ -27,6 +27,10 @@ It can be found on the Dashboard in the Google Cloud Platform Console. This is t
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
must have permission in this project to create buckets.
+**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not
+in the same project that the BigQuery job will run in. If no value is given,
+it will default to the configured Project ID.
+
**Dataset:** Dataset the tables belongs to. A dataset is contained within a specific project.
Datasets are top-level containers that are used to organize and control access to tables and views.
If dataset does not exist, it will be created.
diff --git a/docs/BigQueryTable-batchsink.md b/docs/BigQueryTable-batchsink.md
index 5db30a9c50..bb62c46852 100644
--- a/docs/BigQueryTable-batchsink.md
+++ b/docs/BigQueryTable-batchsink.md
@@ -28,6 +28,10 @@ It can be found on the Dashboard in the Google Cloud Platform Console. This is t
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
must have permission in this project to create buckets.
+**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not
+in the same project that the BigQuery job will run in. If no value is given,
+it will default to the configured Project ID.
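+
+For example, if the pipeline runs in a project named `my-pipeline-project` but the dataset lives in a
+project named `my-data-project`, set Dataset Project to `my-data-project` and make sure the service
+account running the job has the required BigQuery permissions in that project.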
+
**Dataset**: Dataset the table belongs to. A dataset is contained within a specific project.
Datasets are top-level containers that are used to organize and control access to tables and views.
@@ -66,10 +70,29 @@ the update operation will be performed only in the partitions meeting the criter
**Location:** The location where the big query dataset will get created. This value is ignored
if the dataset or temporary bucket already exist.
-**Create Partitioned Table**: Whether to create the BigQuery table with time partitioning. This value
+**Create Partitioned Table [DEPRECATED]**: Whether to create the BigQuery table with time partitioning. This value
is ignored if the table already exists.
* When this is set to true, table will be created with time partitioning.
-* When this is set to false, table will be created without time partitioning.
+* When this is set to false, the value of the Partitioning Type property will be used.
+* [DEPRECATED] Use the Partitioning Type property instead.
+
+**Partitioning Type**: Specifies the partitioning type. Can be Time, Integer, or None. Defaults to Time.
+This value is ignored if the table already exists.
+* When this is set to Time, the table will be created with time partitioning.
+* When this is set to Integer, the table will be created with integer range partitioning.
+* When this is set to None, the table will be created without partitioning.
+
+**Range Start**: For integer range partitioning, specifies the start of the range. Only used when the table
+does not already exist and Partitioning Type is set to Integer.
+* The start value is inclusive.
+
+**Range End**: For integer range partitioning, specifies the end of the range. Only used when the table
+does not already exist and Partitioning Type is set to Integer.
+* The end value is exclusive.
+
+**Range Interval**: For integer range partitioning, specifies the partition interval. Only used when the table
+does not already exist and Partitioning Type is set to Integer.
+* The interval value must be a positive integer (see the example below).
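+
+For example, with Range Start 0, Range End 100, and Range Interval 10, rows whose partition column value
+falls in [0, 10) go into the first partition, rows in [10, 20) into the second, and so on through [90, 100);
+values outside the range are placed in BigQuery's special UNPARTITIONED partition.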
**Partition Field**: Partitioning column for the BigQuery table. This should be left empty if the
BigQuery table is an ingestion-time partitioned table.
diff --git a/examples/example-csv-file-to-bigquery-with-range-partition-cdap-data-pipeline.json b/examples/example-csv-file-to-bigquery-with-range-partition-cdap-data-pipeline.json
new file mode 100644
index 0000000000..740fb40f40
--- /dev/null
+++ b/examples/example-csv-file-to-bigquery-with-range-partition-cdap-data-pipeline.json
@@ -0,0 +1,110 @@
+{
+ "artifact": {
+ "name": "cdap-data-pipeline",
+ "version": "6.2.0",
+ "scope": "SYSTEM"
+ },
+ "description": "Data Pipeline Application",
+ "name": "example-csv-file-to-bigquery-with-range-partition",
+ "config": {
+ "resources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "driverResources": {
+ "memoryMB": 2048,
+ "virtualCores": 1
+ },
+ "connections": [
+ {
+ "from": "File",
+ "to": "BigQuery"
+ }
+ ],
+ "comments": [],
+ "postActions": [],
+ "properties": {},
+ "processTimingEnabled": true,
+ "stageLoggingEnabled": false,
+ "stages": [
+ {
+ "name": "File",
+ "plugin": {
+ "name": "File",
+ "type": "batchsource",
+ "label": "File",
+ "artifact": {
+ "name": "core-plugins",
+ "version": "2.5.0-SNAPSHOT",
+ "scope": "USER"
+ },
+ "properties": {
+ "format": "delimited",
+ "skipHeader": "true",
+ "filenameOnly": "false",
+ "recursive": "false",
+ "ignoreNonExistingFolders": "false",
+ "schema": "${out_schema}",
+ "referenceName": "file",
+ "delimiter": ";",
+ "path": "${path_to_csv_example}"
+ }
+ },
+ "outputSchema": [
+ {
+ "name": "etlSchemaBody",
+ "schema": "${out_schema}"
+ }
+ ]
+ },
+ {
+ "name": "BigQuery",
+ "plugin": {
+ "name": "BigQueryTable",
+ "type": "batchsink",
+ "label": "BigQuery",
+ "artifact": {
+ "name": "google-cloud",
+ "version": "0.14.5",
+ "scope": "USER"
+ },
+ "properties": {
+ "project": "auto-detect",
+ "serviceFilePath": "auto-detect",
+ "operation": "insert",
+ "truncateTable": "false",
+ "allowSchemaRelaxation": "false",
+ "location": "US",
+ "createPartitionedTable": "false",
+ "partitioningType": "INTEGER",
+ "rangeStart": "${range_start}",
+ "partitionFilterRequired": "false",
+ "dataset": "${bqsink_dataset}",
+ "table": "${bqsink_table}",
+ "referenceName": "BigQuerySink",
+ "partitionByField": "${partitionfield}",
+ "rangeEnd": "${range_end}",
+ "rangeInterval": "${range_interval}"
+ }
+ },
+ "outputSchema": [
+ {
+ "name": "etlSchemaBody",
+ "schema": ""
+ }
+ ],
+ "inputSchema": [
+ {
+ "name": "File",
+ "schema": "${out_schema}"
+ }
+ ]
+ }
+ ],
+ "schedule": "0 * * * *",
+ "engine": "spark",
+ "numOfRecordsPreview": 100,
+ "description": "Data Pipeline Application",
+ "maxConcurrentRuns": 1
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index acae1e2ae1..40d70e1fc7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
0.3.1
hadoop2-2.0.0
1.11.0
- 1.92.0
+ 1.100.0
1.92.0
1.37.0
1.20.0
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
index 36b3cf98d4..46cdec7bc8 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
@@ -102,7 +102,8 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
Credentials credentials = serviceAccountFilePath == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccountFilePath);
String project = config.getProject();
- bigQuery = GCPUtils.getBigQuery(project, credentials);
+ String datasetProjectId = config.getDatasetProject();
+ bigQuery = GCPUtils.getBigQuery(datasetProjectId, credentials);
String cmekKey = context.getArguments().get(GCPUtils.CMEK_KEY);
baseConfiguration = getBaseConfiguration(cmekKey);
String bucket = configureBucket();
@@ -461,7 +462,8 @@ private Configuration getOutputConfiguration(String bucket,
BigQueryOutputConfiguration.configure(
configuration,
- String.format("%s.%s", getConfig().getDataset(), tableName),
+ String.format("%s:%s.%s", getConfig().getDatasetProject(), getConfig().getDataset(),
+ tableName),
outputTableSchema,
temporaryGcsPath,
BigQueryFileFormat.AVRO,
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java
index f84053a809..7efff83b4d 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java
@@ -15,7 +15,9 @@
*/
package io.cdap.plugin.gcp.bigquery.sink;
+import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.JobInfo;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
@@ -23,6 +25,7 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
+import io.cdap.plugin.gcp.common.GCPConfig;
import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig;
import java.util.Set;
@@ -37,12 +40,20 @@ public abstract class AbstractBigQuerySinkConfig extends GCPReferenceSinkConfig
ImmutableSet.of(Schema.Type.INT, Schema.Type.LONG, Schema.Type.STRING, Schema.Type.FLOAT, Schema.Type.DOUBLE,
Schema.Type.BOOLEAN, Schema.Type.BYTES, Schema.Type.ARRAY, Schema.Type.RECORD);
+ public static final String DATASET_PROJECT_ID = "datasetProject";
public static final String NAME_DATASET = "dataset";
public static final String NAME_BUCKET = "bucket";
public static final String NAME_TRUNCATE_TABLE = "truncateTable";
public static final String NAME_LOCATION = "location";
private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";
+ @Name(DATASET_PROJECT_ID)
+ @Macro
+ @Nullable
+ @Description("The project in which the dataset is located/should be created."
+ + " Defaults to the project specified in the Project Id property.")
+ private String datasetProject;
+
@Name(NAME_DATASET)
@Macro
@Description("The dataset to write to. A dataset is contained within a specific project. "
@@ -98,6 +109,14 @@ public String getDataset() {
return dataset;
}
+ @Nullable
+ public String getDatasetProject() {
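+    // "auto-detect" resolves to the environment's default project
+    // (from credentials, gcloud config, or the metadata server).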
+ if (GCPConfig.AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
+ return ServiceOptions.getDefaultProjectId();
+ }
+ return Strings.isNullOrEmpty(datasetProject) ? getProject() : datasetProject;
+ }
+
@Nullable
public String getGcsChunkSize() {
return gcsChunkSize;
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
index a6be6f20b4..4b00f583a2 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
@@ -30,6 +30,8 @@
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.RangePartitioning;
+import com.google.api.services.bigquery.model.RangePartitioning.Range;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
@@ -150,8 +152,9 @@ public void commitJob(JobContext jobContext) throws IOException {
allowSchemaRelaxation = conf.getBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION, false);
LOG.debug("Allow schema relaxation: '{}'", allowSchemaRelaxation);
- boolean createPartitionedTable = conf.getBoolean(BigQueryConstants.CONFIG_CREATE_PARTITIONED_TABLE, false);
- LOG.debug("Create Partitioned Table: '{}'", createPartitionedTable);
+ PartitionType partitionType = conf.getEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, PartitionType.NONE);
+    LOG.debug("Partitioning type: '{}'", partitionType);
+ Range range = partitionType == PartitionType.INTEGER ? createRangeForIntegerPartitioning(conf) : null;
String partitionByField = conf.get(BigQueryConstants.CONFIG_PARTITION_BY_FIELD, null);
LOG.debug("Partition Field: '{}'", partitionByField);
boolean requirePartitionFilter = conf.getBoolean(BigQueryConstants.CONFIG_REQUIRE_PARTITION_FILTER, false);
@@ -178,7 +181,7 @@ public void commitJob(JobContext jobContext) throws IOException {
JobId jobId = JobId.of(jobIdString);
try {
importFromGcs(destProjectId, destTable, destSchema.orElse(null), kmsKeyName, outputFileFormat,
- writeDisposition, sourceUris, createPartitionedTable, partitionByField,
+ writeDisposition, sourceUris, partitionType, range, partitionByField,
requirePartitionFilter, clusteringOrderList, tableExists, jobIdString);
if (temporaryTableReference != null) {
operationAction(destTable, kmsKeyName, jobId);
@@ -202,7 +205,7 @@ public void abortJob(JobContext context, JobStatus.State state) throws IOExcepti
*/
private void importFromGcs(String projectId, TableReference tableRef, @Nullable TableSchema schema,
@Nullable String kmsKeyName, BigQueryFileFormat sourceFormat, String writeDisposition,
-                             List<String> gcsPaths, boolean createPartitionedTable,
+                             List<String> gcsPaths, PartitionType partitionType, @Nullable Range range,
@Nullable String partitionByField, boolean requirePartitionFilter,
List<String> clusteringOrderList, boolean tableExists, String jobId)
throws IOException, InterruptedException {
@@ -228,15 +231,27 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
loadConfig.setSourceUris(gcsPaths);
loadConfig.setWriteDisposition(writeDisposition);
loadConfig.setUseAvroLogicalTypes(true);
- if (!tableExists && createPartitionedTable) {
- TimePartitioning timePartitioning = new TimePartitioning();
- timePartitioning.setType("DAY");
- if (partitionByField != null) {
- timePartitioning.setField(partitionByField);
+ if (!tableExists) {
+ switch (partitionType) {
+ case TIME:
+ TimePartitioning timePartitioning = createTimePartitioning(partitionByField,
+ requirePartitionFilter);
+ loadConfig.setTimePartitioning(timePartitioning);
+ break;
+ case INTEGER:
+ RangePartitioning rangePartitioning = createRangePartitioning(partitionByField,
+ range);
+ if (requirePartitionFilter) {
+ createTableWithRangePartitionAndRequirePartitionFilter(tableRef, schema,
+ rangePartitioning);
+ } else {
+ loadConfig.setRangePartitioning(rangePartitioning);
+ }
+ break;
+ case NONE:
+ break;
}
- timePartitioning.setRequirePartitionFilter(requirePartitionFilter);
- loadConfig.setTimePartitioning(timePartitioning);
- if (!clusteringOrderList.isEmpty()) {
+ if (PartitionType.NONE != partitionType && !clusteringOrderList.isEmpty()) {
Clustering clustering = new Clustering();
clustering.setFields(clusteringOrderList);
loadConfig.setClustering(clustering);
@@ -524,5 +539,50 @@ private static String formatPartitionFilter(String partitionFilter) {
}
return String.join(" ", queryWords);
}
+
+ private Range createRangeForIntegerPartitioning(Configuration conf) {
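+    // Reads the range bounds and interval that BigQuerySink copied into the Hadoop configuration;
+    // missing values default to 0.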
+ long rangeStart = conf.getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_START, 0);
+ long rangeEnd = conf.getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_END, 0);
+ long rangeInterval = conf
+ .getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_INTERVAL, 0);
+ Range range = new Range();
+ range.setStart(rangeStart);
+ range.setEnd(rangeEnd);
+ range.setInterval(rangeInterval);
+ return range;
+ }
+
+ private TimePartitioning createTimePartitioning(
+ @Nullable String partitionByField, boolean requirePartitionFilter) {
+ TimePartitioning timePartitioning = new TimePartitioning();
+ timePartitioning.setType("DAY");
+ if (partitionByField != null) {
+ timePartitioning.setField(partitionByField);
+ }
+ timePartitioning.setRequirePartitionFilter(requirePartitionFilter);
+ return timePartitioning;
+ }
+
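+  /**
+   * Pre-creates the range-partitioned table so that the table-level require-partition-filter flag
+   * can be set; the load job's RangePartitioning configuration does not carry that flag.
+   */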
+ private void createTableWithRangePartitionAndRequirePartitionFilter(TableReference tableRef,
+ @Nullable TableSchema schema, RangePartitioning rangePartitioning) throws IOException {
+ Table table = new Table();
+ table.setSchema(schema);
+ table.setTableReference(tableRef);
+ table.setRequirePartitionFilter(true);
+ table.setRangePartitioning(rangePartitioning);
+ bigQueryHelper.getRawBigquery().tables()
+ .insert(tableRef.getProjectId(), tableRef.getDatasetId(), table)
+ .execute();
+ }
+
+ private RangePartitioning createRangePartitioning(@Nullable String partitionByField,
+ @Nullable Range range) {
+ RangePartitioning rangePartitioning = new RangePartitioning();
+ rangePartitioning.setRange(range);
+ if (partitionByField != null) {
+ rangePartitioning.setField(partitionByField);
+ }
+ return rangePartitioning;
+ }
}
}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
index a68e6aea4e..5893af4acc 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
@@ -141,8 +141,6 @@ public void transform(StructuredRecord input,
* Sets addition configuration for the AbstractBigQuerySink's Hadoop configuration
*/
private void configureBigQuerySink() {
- baseConfiguration.setBoolean(BigQueryConstants.CONFIG_CREATE_PARTITIONED_TABLE,
- getConfig().shouldCreatePartitionedTable());
if (config.getPartitionByField() != null) {
baseConfiguration.set(BigQueryConstants.CONFIG_PARTITION_BY_FIELD, getConfig().getPartitionByField());
}
@@ -161,6 +159,21 @@ private void configureBigQuerySink() {
if (config.getPartitionFilter() != null) {
baseConfiguration.set(BigQueryConstants.CONFIG_PARTITION_FILTER, getConfig().getPartitionFilter());
}
+
+ PartitionType partitioningType = getConfig().getPartitioningType();
+ baseConfiguration.setEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, partitioningType);
+
+ if (config.getRangeStart() != null) {
+ baseConfiguration.setLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_START, config.getRangeStart());
+ }
+
+ if (config.getRangeEnd() != null) {
+ baseConfiguration.setLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_END, config.getRangeEnd());
+ }
+
+ if (config.getRangeInterval() != null) {
+ baseConfiguration.setLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_INTERVAL, config.getRangeInterval());
+ }
}
/**
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java
index f48ddc8012..038ff8bca1 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java
@@ -17,6 +17,7 @@
package io.cdap.plugin.gcp.bigquery.sink;
import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TimePartitioning;
@@ -26,11 +27,12 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.data.schema.Schema.LogicalType;
+import io.cdap.cdap.api.data.schema.Schema.Type;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -45,6 +47,7 @@
* configuring the BigQuerySink plugin.
*/
public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
+
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySinkConfig.class);
private static final String WHERE = "WHERE";
public static final Set<Schema.Type> SUPPORTED_CLUSTERING_TYPES =
@@ -58,6 +61,10 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
public static final String NAME_CLUSTERING_ORDER = "clusteringOrder";
public static final String NAME_OPERATION = "operation";
public static final String PARTITION_FILTER = "partitionFilter";
+ public static final String NAME_PARTITIONING_TYPE = "partitioningType";
+ public static final String NAME_RANGE_START = "rangeStart";
+ public static final String NAME_RANGE_END = "rangeEnd";
+ public static final String NAME_RANGE_INTERVAL = "rangeInterval";
public static final int MAX_NUMBER_OF_COLUMNS = 4;
@@ -73,13 +80,46 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
@Nullable
@Description("The schema of the data to write. If provided, must be compatible with the table schema.")
private String schema;
@Macro
@Nullable
- @Description("Whether to create the BigQuery table with time partitioning. This value is ignored if the table " +
- "already exists.")
+  @Description("DEPRECATED. Whether to create the BigQuery table with time partitioning. "
+    + "This value is ignored if the table already exists. When this is set to false, "
+    + "the value of the Partitioning Type property will be used. Use the 'Partitioning Type' property instead.")
protected Boolean createPartitionedTable;
+ @Name(NAME_PARTITIONING_TYPE)
+ @Macro
+ @Nullable
+  @Description("Specifies the partitioning type. Can be Time, Integer, or None. "
+    + "Ignored if the table already exists.")
+ protected String partitioningType;
+
+ @Name(NAME_RANGE_START)
+ @Macro
+ @Nullable
+  @Description("Start value for range partitioning. The start value is inclusive. "
+    + "Ignored if the table already exists.")
+ protected Long rangeStart;
+
+ @Name(NAME_RANGE_END)
+ @Macro
+ @Nullable
+  @Description("End value for range partitioning. The end value is exclusive. "
+    + "Ignored if the table already exists.")
+ protected Long rangeEnd;
+
+ @Name(NAME_RANGE_INTERVAL)
+ @Macro
+ @Nullable
+  @Description("Interval value for range partitioning. The interval value must be a positive integer. "
+    + "Ignored if the table already exists.")
+ protected Long rangeInterval;
+
@Name(NAME_PARTITION_BY_FIELD)
@Macro
@Nullable
@@ -129,12 +169,17 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
protected String partitionFilter;
public BigQuerySinkConfig(String referenceName, String dataset, String table,
- @Nullable String bucket, @Nullable String schema) {
+ @Nullable String bucket, @Nullable String schema, @Nullable String partitioningType,
+ @Nullable Long rangeStart, @Nullable Long rangeEnd, @Nullable Long rangeInterval) {
this.referenceName = referenceName;
this.dataset = dataset;
this.table = table;
this.bucket = bucket;
this.schema = schema;
+ this.partitioningType = partitioningType;
+ this.rangeStart = rangeStart;
+ this.rangeEnd = rangeEnd;
+ this.rangeInterval = rangeInterval;
}
public String getTable() {
@@ -142,7 +187,7 @@ public String getTable() {
}
public boolean shouldCreatePartitionedTable() {
- return createPartitionedTable == null ? false : createPartitionedTable;
+ return getPartitioningType() != PartitionType.NONE;
}
@Nullable
@@ -160,7 +205,8 @@ public String getClusteringOrder() {
}
public Operation getOperation() {
- return Strings.isNullOrEmpty(operation) ? Operation.INSERT : Operation.valueOf(operation.toUpperCase());
+ return Strings.isNullOrEmpty(operation) ? Operation.INSERT
+ : Operation.valueOf(operation.toUpperCase());
}
@Nullable
@@ -183,7 +229,30 @@ public String getPartitionFilter() {
if (partitionFilter.toUpperCase().startsWith(WHERE)) {
partitionFilter = partitionFilter.substring(WHERE.length());
}
- return partitionFilter;
+ return partitionFilter;
+ }
+
+ @Nullable
+ public Long getRangeStart() {
+ return rangeStart;
+ }
+
+ @Nullable
+ public Long getRangeEnd() {
+ return rangeEnd;
+ }
+
+ @Nullable
+ public Long getRangeInterval() {
+ return rangeInterval;
+ }
+
+ public PartitionType getPartitioningType() {
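+    // The deprecated createPartitionedTable flag takes precedence and maps to time partitioning
+    // for backwards compatibility with existing pipelines.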
+ if (createPartitionedTable != null && createPartitionedTable) {
+ return PartitionType.TIME;
+ }
+ return Strings.isNullOrEmpty(partitioningType) ? PartitionType.TIME
+ : PartitionType.valueOf(partitioningType.toUpperCase());
}
/**
@@ -197,7 +266,8 @@ public Schema getSchema(FailureCollector collector) {
try {
return Schema.parseJson(schema);
} catch (IOException e) {
- collector.addFailure("Invalid schema: " + e.getMessage(), null).withConfigProperty(NAME_SCHEMA);
+ collector.addFailure("Invalid schema: " + e.getMessage(), null)
+ .withConfigProperty(NAME_SCHEMA);
}
// if there was an error that was added, it will throw an exception, otherwise, this statement will not be executed
throw collector.getOrThrowException();
@@ -256,7 +326,7 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect
if (tryGetProject() == null) {
return;
}
- String project = getProject();
+ String project = getDatasetProject();
String dataset = getDataset();
String tableName = getTable();
String serviceAccountPath = getServiceAccountFilePath();
@@ -269,62 +339,156 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect
if (table != null) {
StandardTableDefinition tableDefinition = table.getDefinition();
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
- if (timePartitioning == null && createPartitionedTable != null && createPartitionedTable) {
- LOG.warn(String.format("The plugin is configured to auto-create a partitioned table, but table '%s' already " +
- "exists without partitioning. Please verify the partitioning configuration.",
- table.getTableId().getTable()));
- }
- if (timePartitioning != null && timePartitioning.getField() != null
- && !timePartitioning.getField().equals(partitionByField)) {
- collector.addFailure(String.format("Destination table '%s' is partitioned by column '%s'.",
- table.getTableId().getTable(),
- timePartitioning.getField()),
- String.format("Set the partition field to '%s'.", timePartitioning.getField()))
- .withConfigProperty(NAME_PARTITION_BY_FIELD);
+ RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
+ if (timePartitioning == null && rangePartitioning == null && shouldCreatePartitionedTable()) {
+ LOG.warn(String.format(
+ "The plugin is configured to auto-create a partitioned table, but table '%s' already " +
+ "exists without partitioning. Please verify the partitioning configuration.",
+ table.getTableId().getTable()));
+ } else if (timePartitioning != null) {
+ validateTimePartitionTableWithInputConfiguration(table, timePartitioning, collector);
+ } else if (rangePartitioning != null) {
+ validateRangePartitionTableWithInputConfiguration(table, rangePartitioning, collector);
}
validateColumnForPartition(partitionByField, schema, collector);
return;
}
- if (createPartitionedTable == null || !createPartitionedTable) {
- return;
+    if (shouldCreatePartitionedTable()) {
+      validateColumnForPartition(partitionByField, schema, collector);
+ }
+ }
+
+ private void validateTimePartitionTableWithInputConfiguration(Table table,
+ TimePartitioning timePartitioning, FailureCollector collector) {
+ PartitionType partitioningType = getPartitioningType();
+ if (partitioningType == PartitionType.TIME && timePartitioning.getField() != null
+ && !timePartitioning.getField()
+ .equals(partitionByField)) {
+ collector.addFailure(String.format("Destination table '%s' is partitioned by column '%s'.",
+ table.getTableId().getTable(),
+ timePartitioning.getField()),
+ String.format("Set the partition field to '%s'.", timePartitioning.getField()))
+ .withConfigProperty(NAME_PARTITION_BY_FIELD);
+ } else if (partitioningType != PartitionType.TIME) {
+      LOG.warn(String.format("The plugin is configured to %s, but table '%s' already " +
+          "exists with Time partitioning. Please verify the partitioning configuration.",
+        partitioningType == PartitionType.INTEGER ? "auto-create an Integer-partitioned table"
+          : "auto-create a table without partitioning",
+        table.getTableId().getTable()));
+ }
+ }
+
+ private void validateRangePartitionTableWithInputConfiguration(Table table,
+ RangePartitioning rangePartitioning, FailureCollector collector) {
+ PartitionType partitioningType = getPartitioningType();
+ if (partitioningType != PartitionType.INTEGER) {
+      LOG.warn(String.format("The plugin is configured to %s, but table '%s' already " +
+          "exists with Integer partitioning. Please verify the partitioning configuration.",
+        partitioningType == PartitionType.TIME ? "auto-create a Time-partitioned table"
+          : "auto-create a table without partitioning",
+        table.getTableId().getTable()));
+ } else if (rangePartitioning.getField() != null && !rangePartitioning.getField()
+ .equals(partitionByField)) {
+ collector.addFailure(String.format("Destination table '%s' is partitioned by column '%s'.",
+ table.getTableId().getTable(),
+ rangePartitioning.getField()),
+ String.format("Set the partition field to '%s'.", rangePartitioning.getField()))
+ .withConfigProperty(NAME_PARTITION_BY_FIELD);
}
- validateColumnForPartition(partitionByField, schema, collector);
}
private void validateColumnForPartition(@Nullable String columnName, @Nullable Schema schema,
- FailureCollector collector) {
+ FailureCollector collector) {
if (columnName == null || schema == null) {
return;
}
Schema.Field field = schema.getField(columnName);
if (field == null) {
- collector.addFailure(String.format("Partition column '%s' must be present in the schema.", columnName),
- "Change the Partition column to be one of the schema fields.")
- .withConfigProperty(NAME_PARTITION_BY_FIELD);
+ collector.addFailure(
+ String.format("Partition column '%s' must be present in the schema.", columnName),
+ "Change the Partition column to be one of the schema fields.")
+ .withConfigProperty(NAME_PARTITION_BY_FIELD);
return;
}
Schema fieldSchema = field.getSchema();
fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
+ PartitionType partitioningType = getPartitioningType();
+ if (partitioningType == PartitionType.TIME) {
+ validateTimePartitioningColumn(columnName, collector, fieldSchema);
+ } else if (partitioningType == PartitionType.INTEGER) {
+ validateIntegerPartitioningColumn(columnName, collector, fieldSchema);
+ validateIntegerPartitioningRange(getRangeStart(), getRangeEnd(), getRangeInterval(),
+ collector);
+ }
+ }
+
+ private void validateIntegerPartitioningColumn(String columnName,
+ FailureCollector collector, Schema fieldSchema) {
+ if (fieldSchema.getType() != Type.INT && fieldSchema.getType() != Type.LONG) {
+ collector.addFailure(
+ String.format("Partition column '%s' is of invalid type '%s'.", columnName,
+ fieldSchema.getDisplayName()),
+        "Partition column must be of type int or long.").withConfigProperty(NAME_PARTITION_BY_FIELD)
+ .withOutputSchemaField(columnName).withInputSchemaField(columnName);
+ }
+ }
+
+ private void validateTimePartitioningColumn(String columnName,
+ FailureCollector collector, Schema fieldSchema) {
Schema.LogicalType logicalType = fieldSchema.getLogicalType();
- if (logicalType != Schema.LogicalType.DATE && logicalType != Schema.LogicalType.TIMESTAMP_MICROS
- && logicalType != Schema.LogicalType.TIMESTAMP_MILLIS) {
+ if (logicalType != LogicalType.DATE && logicalType != LogicalType.TIMESTAMP_MICROS
+ && logicalType != LogicalType.TIMESTAMP_MILLIS) {
collector.addFailure(
- String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()),
- "Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
- .withOutputSchemaField(columnName).withInputSchemaField(columnName);
+ String.format("Partition column '%s' is of invalid type '%s'.", columnName,
+ fieldSchema.getDisplayName()),
+ "Partition column must be a date or timestamp.")
+ .withConfigProperty(NAME_PARTITION_BY_FIELD)
+ .withOutputSchemaField(columnName).withInputSchemaField(columnName);
+ }
+ }
+
+ private void validateIntegerPartitioningRange(Long rangeStart, Long rangeEnd, Long rangeInterval,
+ FailureCollector collector) {
+ if (!containsMacro(NAME_RANGE_START) && rangeStart == null) {
+ collector.addFailure("Range Start is not defined.",
+ "For Integer Partitioning, Range Start must be defined.")
+ .withConfigProperty(NAME_RANGE_START);
+ }
+ if (!containsMacro(NAME_RANGE_END) && rangeEnd == null) {
+ collector.addFailure("Range End is not defined.",
+ "For Integer Partitioning, Range End must be defined.")
+ .withConfigProperty(NAME_RANGE_END);
+ }
+
+ if (!containsMacro(NAME_RANGE_INTERVAL)) {
+ if (rangeInterval == null) {
+ collector
+ .addFailure(
+ "Range Interval is not defined.",
+ "For Integer Partitioning, Range Interval must be defined.")
+ .withConfigProperty(NAME_RANGE_INTERVAL);
+ } else if (rangeInterval <= 0) {
+ collector
+ .addFailure(
+ "Range Interval is not a positive number.",
+ "Range interval must be a valid positive integer.")
+ .withConfigProperty(NAME_RANGE_INTERVAL);
+ }
}
}
private void validateClusteringOrder(@Nullable Schema schema, FailureCollector collector) {
- if (!shouldCreatePartitionedTable() || Strings.isNullOrEmpty(clusteringOrder) || schema == null) {
+ if (!shouldCreatePartitionedTable() || Strings.isNullOrEmpty(clusteringOrder)
+ || schema == null) {
return;
}
List<String> columnsNames = Arrays.stream(clusteringOrder.split(",")).map(String::trim)
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
if (columnsNames.size() > MAX_NUMBER_OF_COLUMNS) {
- collector.addFailure(String.format("Found '%d' number of clustering fields.", columnsNames.size()),
- String.format("Expected at most '%d' clustering fields.", MAX_NUMBER_OF_COLUMNS))
- .withConfigProperty(NAME_CLUSTERING_ORDER);
+ collector
+ .addFailure(String.format("Found '%d' number of clustering fields.", columnsNames.size()),
+ String.format("Expected at most '%d' clustering fields.", MAX_NUMBER_OF_COLUMNS))
+ .withConfigProperty(NAME_CLUSTERING_ORDER);
return;
}
@@ -441,8 +605,11 @@ private boolean isSupportedLogicalType(Schema.LogicalType logicalType) {
* Returns true if bigquery table can be connected to or schema is not a macro.
*/
boolean shouldConnect() {
- return !containsMacro(BigQuerySinkConfig.NAME_DATASET) && !containsMacro(BigQuerySinkConfig.NAME_TABLE) &&
- !containsMacro(BigQuerySinkConfig.NAME_SERVICE_ACCOUNT_FILE_PATH) &&
- !containsMacro(BigQuerySinkConfig.NAME_PROJECT) && !containsMacro(BigQuerySinkConfig.NAME_SCHEMA);
+ return !containsMacro(BigQuerySinkConfig.NAME_DATASET) &&
+ !containsMacro(BigQuerySinkConfig.NAME_TABLE) &&
+ !containsMacro(BigQuerySinkConfig.NAME_SERVICE_ACCOUNT_FILE_PATH) &&
+ !containsMacro(BigQuerySinkConfig.NAME_PROJECT) &&
+ !containsMacro(BigQuerySinkConfig.DATASET_PROJECT_ID) &&
+ !containsMacro(BigQuerySinkConfig.NAME_SCHEMA);
}
}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/PartitionType.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/PartitionType.java
new file mode 100644
index 0000000000..d916e3015c
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/PartitionType.java
@@ -0,0 +1,10 @@
+package io.cdap.plugin.gcp.bigquery.sink;
+
+/**
+ * The type of partition
+ */
+public enum PartitionType {
+ TIME,
+ INTEGER,
+ NONE
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java
index ae0014e4b8..27d1926555 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java
@@ -22,7 +22,6 @@ public interface BigQueryConstants {
String CONFIG_ALLOW_SCHEMA_RELAXATION = "cdap.bq.sink.allow.schema.relaxation";
String CONFIG_DESTINATION_TABLE_EXISTS = "cdap.bq.sink.destination.table.exists";
- String CONFIG_CREATE_PARTITIONED_TABLE = "cdap.bq.sink.create.partitioned.table";
String CONFIG_PARTITION_BY_FIELD = "cdap.bq.sink.partition.by.field";
String CONFIG_REQUIRE_PARTITION_FILTER = "cdap.bq.sink.require.partition.filter";
String CONFIG_PARTITION_FROM_DATE = "cdap.bq.source.partition.from.date";
@@ -36,4 +35,8 @@ public interface BigQueryConstants {
String CONFIG_PARTITION_FILTER = "cdap.bq.sink.partition.filter";
String CONFIG_FILTER = "cdap.bq.source.filter";
String CONFIG_JOB_ID = "cdap.bq.sink.job.id";
+ String CONFIG_PARTITION_TYPE = "cdap.bq.sink.partition.type";
+ String CONFIG_PARTITION_INTEGER_RANGE_START = "cdap.bq.sink.partition.integer.range.start";
+ String CONFIG_PARTITION_INTEGER_RANGE_END = "cdap.bq.sink.partition.integer.range.end";
+ String CONFIG_PARTITION_INTEGER_RANGE_INTERVAL = "cdap.bq.sink.partition.integer.range.interval";
}
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQuerySinkTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQuerySinkTest.java
index e486766522..84cd94083f 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/BigQuerySinkTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/BigQuerySinkTest.java
@@ -61,7 +61,8 @@ public void testBigQuerySinkConfig() {
Schema.Field.of("timestamp",
Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));
- BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString());
+ BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket",
+ schema.toString(), "INTEGER", 0L, 100L, 10L);
MockFailureCollector collector = new MockFailureCollector("bqsink");
config.validate(collector);
Assert.assertEquals(0, collector.getValidationFailures().size());
@@ -72,7 +73,8 @@ public void testBigQuerySinkInvalidConfig() {
Schema invalidSchema = Schema.recordOf("record",
Schema.Field.of("id", Schema.of(Schema.Type.LONG)));
- BigQuerySinkConfig config = new BigQuerySinkConfig("reference!!", "ds", "tb", "buck3t$$", invalidSchema.toString());
+ BigQuerySinkConfig config = new BigQuerySinkConfig("reference!!", "ds", "tb", "buck3t$$",
+ invalidSchema.toString(), "INTEGER", 0L, 100L, 10L);
MockFailureCollector collector = new MockFailureCollector("bqsink");
config.validate(collector);
List<ValidationFailure> failures = collector.getValidationFailures();
@@ -126,7 +128,8 @@ private static BigQuerySink getSinkToTest(Job mockJob) throws NoSuchFieldExcepti
Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)));
BigQuerySinkConfig config =
- new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", schema.toString());
+ new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", schema.toString(), null, null, null,
+ null);
BigQuery bigQueryMock = mock(BigQuery.class);
BigQuerySink sink = new BigQuerySink(config);
setBigQuery(sink, bigQueryMock);
diff --git a/widgets/BigQueryMultiTable-batchsink.json b/widgets/BigQueryMultiTable-batchsink.json
index 0137b31f7e..00fcff08d5 100644
--- a/widgets/BigQueryMultiTable-batchsink.json
+++ b/widgets/BigQueryMultiTable-batchsink.json
@@ -23,6 +23,14 @@
"default": "auto-detect"
}
},
+ {
+ "widget-type": "textbox",
+ "label": "Dataset Project Id",
+ "name": "datasetProject",
+ "widget-attributes": {
+ "placeholder": "The project in which the dataset is located/should be created. Defaults to the project specified in the Project Id property."
+ }
+ },
{
"widget-type": "textbox",
"label": "Dataset",
diff --git a/widgets/BigQueryTable-batchsink.json b/widgets/BigQueryTable-batchsink.json
index af48f9cbab..b9085f464b 100644
--- a/widgets/BigQueryTable-batchsink.json
+++ b/widgets/BigQueryTable-batchsink.json
@@ -23,6 +23,14 @@
"default": "auto-detect"
}
},
+ {
+ "widget-type": "textbox",
+ "label": "Dataset Project Id",
+ "name": "datasetProject",
+ "widget-attributes": {
+ "placeholder": "The project in which the dataset is located/should be created. Defaults to the project specified in the Project Id property."
+ }
+ },
{
"widget-type": "textbox",
"label": "Dataset",
@@ -181,6 +189,47 @@
"default": "false"
}
},
+ {
+ "widget-type": "radio-group",
+        "label": "Partitioning Type",
+ "name": "partitioningType",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "TIME",
+ "options": [
+ {
+ "id": "TIME",
+ "label": "Time"
+ },
+ {
+ "id": "INTEGER",
+ "label": "Integer"
+ },
+ {
+ "id": "NONE",
+ "label": "None"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "Number",
+ "label": "Range Start",
+ "name": "rangeStart",
+ "widget-attributes": {
+ "default": "0"
+ }
+ },
+ {
+ "widget-type": "Number",
+ "label": "Range End",
+ "name": "rangeEnd"
+ },
+ {
+ "widget-type": "Number",
+ "label": "Range Interval",
+ "name": "rangeInterval"
+ },
{
"widget-type": "textbox",
"label": "Partition Field",
@@ -233,6 +282,52 @@
}
}
],
+ "filters": [
+ {
+ "name": "PartitioningIntegerFieldsFilter",
+ "condition": {
+ "expression": "partitioningType == 'INTEGER'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "rangeStart"
+ },
+ {
+ "type": "property",
+ "name": "rangeEnd"
+ },
+ {
+ "type": "property",
+ "name": "rangeInterval"
+          }
+ ]
+ },
+ {
+ "name": "PartitionFieldFilter",
+ "condition": {
+ "expression": "createPartitionedTable == true || partitioningType == 'INTEGER' || partitioningType == 'TIME'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "partitionByField"
+ },
+ {
+ "type": "property",
+ "name": "partitionFilterRequired"
+ },
+ {
+ "type": "property",
+ "name": "clusteringOrder"
+ }
+ ]
+ }
+ ],
"jump-config": {
"datasets": [
{