diff --git a/docs/GCS-batchsink.md b/docs/GCS-batchsink.md
index c107bfa55b..033fcef623 100644
--- a/docs/GCS-batchsink.md
+++ b/docs/GCS-batchsink.md
@@ -45,6 +45,23 @@ The delimiter will be ignored if the format is anything other than 'delimited'.
 **Location:** The location where the gcs bucket will get created.
 This value is ignored if the bucket already exists.
 
+**Content Type:** The Content Type property indicates the media type of the resource.
+Defaults to 'application/octet-stream'. The following table shows the valid content types for each format.
+
+| Format type | Content type |
+|---------------|--------------------------------------------------------------------------------------------|
+| avro | application/avro, application/octet-stream |
+| csv | text/csv, application/csv, text/plain, application/octet-stream |
+| delimited | text/csv, application/csv, text/tab-separated-values, text/plain, application/octet-stream |
+| json | application/json, text/plain, application/octet-stream |
+| orc | application/octet-stream |
+| parquet | application/octet-stream |
+| tsv | text/tab-separated-values, text/plain, application/octet-stream |
+
+**Custom Content Type:** The Custom Content Type is used when Content Type is set to 'other'.
+Users can provide a specific Content-Type that is different from the options in the dropdown.
+More information about Content-Type can be found at https://cloud.google.com/storage/docs/metadata
+
 **Service Account** - service account key used for authorization
 
 * **File Path**: Path on the local file system of the service account key used for
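In practice the sink stamps the chosen Content-Type onto each object it writes. For orientation, here is a hedged, standalone sketch of how a content type is set on an existing GCS object with the google-cloud-storage client; it is the same `blob.toBuilder()...update()` call the committer later in this change uses, and the bucket and object names are placeholders:

```java
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

// Illustrative snippet only; names are placeholders, not part of this change.
public class ContentTypeExample {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    Blob blob = storage.get(BlobId.of("my-bucket", "output/part-r-00000"));
    if (blob != null) {
      // Rewrites the object's metadata; the payload itself is unchanged.
      blob.toBuilder().setContentType("text/csv").build().update();
    }
  }
}
```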
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
index 7adb406201..3b21ad4715 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
@@ -67,6 +67,7 @@ public class GCSBatchSink extends AbstractFileSink
   getFileSystemProperties(BatchSinkContext context) {
     Map<String, String> properties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
+    properties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
     properties.putAll(config.getFileSystemProperties());
     String outputFileBaseName = config.getOutputFileNameBase();
     if (outputFileBaseName == null || outputFileBaseName.isEmpty()) {
@@ -242,6 +244,23 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
     private static final String NAME_LOCATION = "location";
     private static final String NAME_FS_PROPERTIES = "fileSystemProperties";
     private static final String NAME_FILE_NAME_BASE = "outputFileNameBase";
+    private static final String NAME_CONTENT_TYPE = "contentType";
+    private static final String NAME_CUSTOM_CONTENT_TYPE = "customContentType";
+    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
+    private static final String CONTENT_TYPE_OTHER = "other";
+    private static final String CONTENT_TYPE_APPLICATION_JSON = "application/json";
+    private static final String CONTENT_TYPE_APPLICATION_AVRO = "application/avro";
+    private static final String CONTENT_TYPE_APPLICATION_CSV = "application/csv";
+    private static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
+    private static final String CONTENT_TYPE_TEXT_CSV = "text/csv";
+    private static final String CONTENT_TYPE_TEXT_TSV = "text/tab-separated-values";
+    private static final String FORMAT_AVRO = "avro";
+    private static final String FORMAT_CSV = "csv";
+    private static final String FORMAT_JSON = "json";
+    private static final String FORMAT_TSV = "tsv";
+    private static final String FORMAT_DELIMITED = "delimited";
+    private static final String FORMAT_ORC = "orc";
+    private static final String FORMAT_PARQUET = "parquet";
     private static final String SCHEME = "gs://";
 
     @Name(NAME_PATH)
@@ -280,6 +299,18 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
       "This value is ignored if the bucket already exists")
     protected String location;
 
+    @Macro
+    @Description("The Content Type property is used to indicate the media type of the resource. " +
+      "Defaults to 'application/octet-stream'.")
+    @Nullable
+    protected String contentType;
+
+    @Macro
+    @Description("The Custom Content Type is used when the value of Content Type is set to 'other'. " +
+      "Users can provide a specific Content-Type that is different from the options in the dropdown.")
+    @Nullable
+    protected String customContentType;
+
     @Name(NAME_FS_PROPERTIES)
     @Macro
     @Nullable
@@ -326,10 +357,19 @@ public void validate(FailureCollector collector) {
       }
     }
 
+    if (!containsMacro(NAME_CONTENT_TYPE) && !containsMacro(NAME_CUSTOM_CONTENT_TYPE)
+      && !Strings.isNullOrEmpty(contentType) && !contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER)
+      && !containsMacro(NAME_FORMAT)) {
+      if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+        validateContentType(collector);
+      }
+    }
+
     try {
       getSchema();
     } catch (IllegalArgumentException e) {
-      collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace());
+      collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA)
+        .withStacktrace(e.getStackTrace());
     }
 
     try {
@@ -340,6 +380,69 @@ public void validate(FailureCollector collector) {
     }
   }
 
+    // Validates the specified content type for the configured format.
+    public void validateContentType(FailureCollector failureCollector) {
+      switch (format) {
+        case FORMAT_AVRO:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_AVRO)) {
+            failureCollector.addFailure(String.format("Valid content types for avro are %s, %s.",
+              CONTENT_TYPE_APPLICATION_AVRO, DEFAULT_CONTENT_TYPE), null)
+              .withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_JSON:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_JSON)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for json are %s, %s, %s.", CONTENT_TYPE_APPLICATION_JSON,
+              CONTENT_TYPE_TEXT_PLAIN, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_CSV:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for csv are %s, %s, %s, %s.", CONTENT_TYPE_APPLICATION_CSV,
+              CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_CSV, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_DELIMITED:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for delimited are %s, %s, %s, %s, %s.", CONTENT_TYPE_TEXT_PLAIN,
+              CONTENT_TYPE_TEXT_CSV, CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_TSV, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_PARQUET:
+          if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+            failureCollector.addFailure(String.format("Valid content type for parquet is %s.", DEFAULT_CONTENT_TYPE),
+              null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_ORC:
+          if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+            failureCollector.addFailure(String.format("Valid content type for orc is %s.", DEFAULT_CONTENT_TYPE),
+              null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_TSV:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for tsv are %s, %s, %s.", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN,
+              DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+      }
+    }
+
     public String getBucket() {
       return GCSPath.from(path).getBucket();
     }
@@ -383,6 +486,30 @@ public String getLocation() {
       return location;
     }
 
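The switch above enumerates the allowed types per format and hand-builds each failure message. A possible alternative, shown only as a sketch, is to drive both the membership check and the message from one table so the two cannot drift apart; `TableDrivenContentTypeValidator` is a hypothetical name and not part of this change:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, table-driven variant of validateContentType above.
final class TableDrivenContentTypeValidator {
  private static final Map<String, List<String>> VALID = new HashMap<>();

  static {
    VALID.put("avro", Arrays.asList("application/avro", "application/octet-stream"));
    VALID.put("json", Arrays.asList("application/json", "text/plain", "application/octet-stream"));
    // Remaining formats elided; see the docs table earlier in this change.
  }

  /** Returns null when valid, otherwise an error message listing the allowed types. */
  static String validate(String format, String contentType) {
    List<String> allowed = VALID.get(format);
    if (allowed == null || allowed.stream().anyMatch(contentType::equalsIgnoreCase)) {
      return null;
    }
    return String.format("Valid content types for %s are %s.", format, String.join(", ", allowed));
  }
}
```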
+    /* Returns the effective content type. Valid content types for each format are:
+     *
+     *  avro -> application/avro, application/octet-stream
+     *  json -> application/json, text/plain, application/octet-stream
+     *  csv -> application/csv, text/csv, text/plain, application/octet-stream
+     *  delimited -> application/csv, text/csv, text/tab-separated-values, text/plain, application/octet-stream
+     *  orc -> application/octet-stream
+     *  parquet -> application/octet-stream
+     *  tsv -> text/tab-separated-values, text/plain, application/octet-stream
+     */
+    @Nullable
+    public String getContentType() {
+      if (!Strings.isNullOrEmpty(contentType)) {
+        if (contentType.equals(CONTENT_TYPE_OTHER)) {
+          if (Strings.isNullOrEmpty(customContentType)) {
+            return DEFAULT_CONTENT_TYPE;
+          }
+          return customContentType;
+        }
+        return contentType;
+      }
+      return DEFAULT_CONTENT_TYPE;
+    }
+
     public Map<String, String> getFileSystemProperties() {
       if (fileSystemProperties == null || fileSystemProperties.isEmpty()) {
         return Collections.emptyMap();
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
index 2722b1c2cc..38fcf10ad7 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
@@ -95,7 +95,6 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
     collector.getOrThrowException();
 
     Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
-
     Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());
 
     String cmekKey = context.getArguments().get(GCPUtils.CMEK_KEY);
@@ -137,7 +136,7 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
       outputProperties.putAll(RecordFilterOutputFormat.configure(validatingOutputFormat.getOutputFormatClassName(),
                                                                  config.splitField, name, schema));
       outputProperties.put(FileOutputFormat.OUTDIR, config.getOutputDir(context.getLogicalStartTime(), name));
-
+      outputProperties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
       context.addOutput(Output.of(
         config.getReferenceName() + "_" + name,
         new SinkOutputFormatProvider(RecordFilterOutputFormat.class.getName(), outputProperties)));
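Both sinks push `config.getContentType()` into the output properties, so the resolution rules in `getContentType()` decide what the committer later writes: an empty property falls back to the default, 'other' defers to the custom field (itself defaulting when empty), and anything else passes through. A small illustrative harness of those rules; the class is hypothetical, not part of the PR:

```java
// Hypothetical mirror of getContentType()'s three branches, for illustration.
public class ContentTypeResolution {
  static String resolve(String contentType, String customContentType) {
    String defaultType = "application/octet-stream";
    if (contentType == null || contentType.isEmpty()) {
      return defaultType;                      // nothing picked: default
    }
    if (contentType.equals("other")) {
      return (customContentType == null || customContentType.isEmpty())
        ? defaultType                          // 'other' without a custom value: default
        : customContentType;                   // 'other' with a value: pass it through
    }
    return contentType;                        // dropdown selection wins
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, null));                         // application/octet-stream
    System.out.println(resolve("text/csv", null));                   // text/csv
    System.out.println(resolve("other", "application/javascript"));  // application/javascript
    System.out.println(resolve("other", ""));                        // application/octet-stream
  }
}
```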
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java
new file mode 100644
index 0000000000..0d919313b5
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright © 2015-2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import com.google.cloud.storage.Blob;
+import com.google.common.annotations.VisibleForTesting;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import io.cdap.plugin.gcp.gcs.StorageClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * OutputCommitter for GCS.
+ */
+public class GCSOutputCommitter extends OutputCommitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputCommitter.class);
+  public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
+
+  private final OutputCommitter delegate;
+
+  public GCSOutputCommitter(OutputCommitter delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    delegate.setupJob(jobContext);
+  }
+
+  @Override
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    delegate.cleanupJob(jobContext);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    delegate.commitJob(jobContext);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+    delegate.abortJob(jobContext, state);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    delegate.setupTask(taskAttemptContext);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+    return delegate.needsTaskCommit(taskAttemptContext);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    /* On commit task, there seems to be some inconsistency across different hadoop implementations regarding the
+       path where the output file is stored. For some implementations it appears in the path returned by
+       FileOutputCommitter getCommittedTaskPath and for some it does not. Before commit, the files appear to be
+       consistently present in the path returned by FileOutputCommitter getTaskAttemptPath. Hence, find the output
+       file from taskAttemptPath and add metadata before the commit happens. After commit, the file would have been
+       moved out of the taskAttemptPath. */
+    try {
+      updateMetricMetaData(taskAttemptContext);
+    } catch (Exception exception) {
+      LOG.warn("Unable to record metric for task. Metric emitted for the number of affected rows may be incorrect.",
+               exception);
+    }
+
+    delegate.commitTask(taskAttemptContext);
+  }
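`GCSOutputCommitter` is a plain decorator: every lifecycle method forwards to the wrapped committer, and `commitTask` is the single hook where metadata work is injected before delegation. A minimal wiring sketch, assuming a `FileOutputCommitter` delegate as in the plugin's output formats and compilation alongside this class in the same package:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

// Sketch of the wiring pattern used later by RecordFilterOutputFormat:
// wrap whatever committer the delegate format produces, so content type and
// record-count metadata are applied just before delegate.commitTask(...).
public class CommitterWiring {
  static OutputCommitter wrap(Path outputPath, TaskAttemptContext context) throws IOException {
    OutputCommitter delegate = new FileOutputCommitter(outputPath, context);
    return new GCSOutputCommitter(delegate);
  }
}
```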
+
+  private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException {
+    if (!(delegate instanceof FileOutputCommitter)) {
+      return;
+    }
+
+    FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate;
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    // Task is not yet committed, so the file should still be available in the attempt path.
+    Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext);
+    if (configuration == null || taskAttemptPath == null) {
+      return;
+    }
+
+    // Read the record count from the configuration.
+    String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID());
+    Map<String, String> metaData = new HashMap<>();
+    metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L)));
+    StorageClient storageClient = getStorageClient(configuration);
+    // Update metadata on the output file present in the directory for this task.
+    Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
+    if (blob == null) {
+      LOG.info("Could not find a file in path {} to apply count metadata.", taskAttemptPath.toString());
+      return;
+    }
+    blob.toBuilder().setContentType(configuration.get(GCSBatchSink.CONTENT_TYPE)).setMetadata(metaData).build()
+      .update();
+  }
+
+  @VisibleForTesting
+  StorageClient getStorageClient(Configuration configuration) throws IOException {
+    String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID);
+    String serviceAccount = null;
+    boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
+      .equals(configuration.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
+    if (isServiceAccountFile) {
+      serviceAccount = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
+    } else {
+      serviceAccount = configuration.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
+                                                       GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
+    }
+    return StorageClient.create(project, serviceAccount, isServiceAccountFile);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    delegate.abortTask(taskAttemptContext);
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
+    return delegate.isCommitJobRepeatable(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return delegate.isRecoverySupported(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported() {
+    return delegate.isRecoverySupported();
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
+    delegate.recoverTask(taskContext);
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
index 5750f9cba1..af558a6d3a 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java
@@ -1,25 +1,16 @@
 package io.cdap.plugin.gcp.gcs.sink;
 
-import com.google.cloud.storage.Blob;
-import com.google.common.annotations.VisibleForTesting;
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.cdap.etl.api.validation.FormatContext;
 import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
-import io.cdap.plugin.gcp.common.GCPUtils;
-import io.cdap.plugin.gcp.gcs.StorageClient;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -29,8 +20,6 @@
  * OutputFormatProvider for GCSSink
  */
 public class GCSOutputFormatProvider implements ValidatingOutputFormat {
-
-  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
   private static final String DELEGATE_OUTPUTFORMAT_CLASSNAME = "gcssink.delegate.outputformat.classname";
   private static final String OUTPUT_FOLDER = "gcssink.metric.output.folder";
   public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
@@ -102,132 +91,6 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
     }
   }
 
-  /**
-   * OutputCommitter for GCS
-   */
-  public static class GCSOutputCommitter extends OutputCommitter {
-
-    private final OutputCommitter delegate;
-
-    public GCSOutputCommitter(OutputCommitter delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    public void setupJob(JobContext jobContext) throws IOException {
-      delegate.setupJob(jobContext);
-    }
-
-    @Override
-    public void cleanupJob(JobContext jobContext) throws IOException {
-      delegate.cleanupJob(jobContext);
-    }
-
-    @Override
-    public void commitJob(JobContext jobContext) throws IOException {
-      delegate.commitJob(jobContext);
-    }
-
-    @Override
-    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
-      delegate.abortJob(jobContext, state);
-    }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      delegate.setupTask(taskAttemptContext);
-    }
-
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
-      return delegate.needsTaskCommit(taskAttemptContext);
-    }
-
-    @Override
-    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      /*On commit task, there seems to be some inconsistency across different hadoop implementations regarding the path
-        where output file is stored. For some implementations it appears in the path returned by FileOutputCommitter
-        getCommittedTaskPath and for some it does not.Before commit, the files appear to be consistently present in path
-        returned by FileOutputCommitter getTaskAttemptPath. Hence, find the output file from taskAttemptPath and add
-        metadata before commit happens. After commit, file would have been moved out of the taskAttemptPath. */
-      try {
-        updateMetricMetaData(taskAttemptContext);
-      } catch (Exception exception) {
-        LOG.warn("Unable to record metric for task. Metric emitted for the number of affected rows may be incorrect.",
-                 exception);
-      }
-
-      delegate.commitTask(taskAttemptContext);
-    }
-
-    private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException {
-      if (!(delegate instanceof FileOutputCommitter)) {
-        return;
-      }
-
-      FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate;
-      Configuration configuration = taskAttemptContext.getConfiguration();
-      //Task is not yet committed, so should be available in attempt path
-      Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext);
-      if (configuration == null || taskAttemptPath == null) {
-        return;
-      }
-
-      //read the count from configuration
-      String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID());
-      Map<String, String> metaData = new HashMap<>();
-      metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L)));
-      StorageClient storageClient = getStorageClient(configuration);
-      //update metadata on the output file present in the directory for this task
-      Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
-      if (blob == null) {
-        LOG.info("Could not find a file in path %s to apply count metadata.", taskAttemptPath.toString());
-        return;
-      }
-      storageClient.setMetaData(blob, metaData);
-    }
-
-    @VisibleForTesting
-    StorageClient getStorageClient(Configuration configuration) throws IOException {
-      String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID);
-      String serviceAccount = null;
-      boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
-        .equals(configuration.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
-      if (isServiceAccountFile) {
-        serviceAccount = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
-      } else {
-        serviceAccount = configuration.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
-                                                         GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
-      }
-      return StorageClient.create(project, serviceAccount, isServiceAccountFile);
-    }
-
-    @Override
-    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      delegate.abortTask(taskAttemptContext);
-    }
-
-    @Override
-    public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
-      return delegate.isCommitJobRepeatable(jobContext);
-    }
-
-    @Override
-    public boolean isRecoverySupported(JobContext jobContext) throws IOException {
-      return delegate.isRecoverySupported(jobContext);
-    }
-
-    @Override
-    public boolean isRecoverySupported() {
-      return delegate.isRecoverySupported();
-    }
-
-    @Override
-    public void recoverTask(TaskAttemptContext taskContext) throws IOException {
-      delegate.recoverTask(taskContext);
-    }
-  }
-
   /**
    * RecordWriter for GCSSink
    */
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
index 43677d4ef8..f4f5d6caca 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/RecordFilterOutputFormat.java
@@ -78,7 +78,8 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return getDelegateFormat(context.getConfiguration()).getOutputCommitter(context);
+    OutputCommitter outputCommitter = getDelegateFormat(context.getConfiguration()).getOutputCommitter(context);
+    return new GCSOutputCommitter(outputCommitter);
   }
 
   private OutputFormat getDelegateFormat(Configuration hConf) throws IOException {
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSinkTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSinkTest.java
index 8c051ad551..073b00821f 100644
--- a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSinkTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSinkTest.java
@@ -16,12 +16,16 @@
 
 package io.cdap.plugin.gcp.gcs.sink;
 
+import io.cdap.cdap.etl.api.validation.CauseAttributes;
+import io.cdap.cdap.etl.api.validation.ValidationFailure;
 import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
 import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.FieldSetter;
 
+import java.util.List;
+
 public class GCSBatchSinkTest {
 
   @Test
@@ -60,4 +64,75 @@ public void testInvalidFSProperties() throws NoSuchFieldException {
     config.validate(collector);
     Assert.assertEquals(1, collector.getValidationFailures().size());
   }
+
+  @Test
+  public void testValidContentType() throws Exception {
+    GCSBatchSink.GCSBatchSinkConfig config = getConfig(null);
+    MockFailureCollector collector = new MockFailureCollector("gcssink");
+    config.validate(collector);
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("format"), "csv");
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("contentType"),
+                         "application/csv");
+    config.validate(collector);
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("format"), "tsv");
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("contentType"),
+                         "text/tab-separated-values");
+    config.validate(collector);
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("format"), "json");
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("contentType"), "other");
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("customContentType"),
+                         "application/javascript");
+    config.validate(collector);
+    Assert.assertEquals(0, collector.getValidationFailures().size());
+  }
+
+  @Test
+  public void testInvalidContentType() throws Exception {
+    GCSBatchSink.GCSBatchSinkConfig config = getConfig(null);
+    MockFailureCollector collector = new MockFailureCollector("gcssink");
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("format"), "avro");
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("contentType"),
+                         "text/plain");
+    config.validate(collector);
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("format"), "csv");
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("contentType"),
+                         "application/avro");
+    config.validate(collector);
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("format"), "json");
+    FieldSetter.setField(config, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("contentType"),
+                         "text/tab-separated-values");
+    config.validate(collector);
+    ValidationFailure failure = collector.getValidationFailures().get(0);
+    List<ValidationFailure.Cause> causes = failure.getCauses();
+    Assert.assertEquals(1, causes.size());
+    Assert.assertEquals("contentType", causes.get(0).getAttribute(CauseAttributes.STAGE_CONFIG));
+    failure = collector.getValidationFailures().get(1);
+    causes = failure.getCauses();
+    Assert.assertEquals(1, causes.size());
+    Assert.assertEquals("contentType", causes.get(0).getAttribute(CauseAttributes.STAGE_CONFIG));
+    failure = collector.getValidationFailures().get(2);
+    causes = failure.getCauses();
+    Assert.assertEquals(1, causes.size());
+    Assert.assertEquals("contentType", causes.get(0).getAttribute(CauseAttributes.STAGE_CONFIG));
+  }
 }
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java
index 63405858a0..5daf73506d 100644
--- a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java
@@ -46,9 +46,9 @@ public void testRecordWriter() throws IOException, InterruptedException {
 
   @Test
   public void testGCSOutputCommitter() throws IOException, URISyntaxException {
     FileOutputCommitter fileOutputCommitter = Mockito.mock(FileOutputCommitter.class);
-    GCSOutputFormatProvider.GCSOutputCommitter committer = new GCSOutputFormatProvider.GCSOutputCommitter(
+    GCSOutputCommitter committer = new GCSOutputCommitter(
       fileOutputCommitter);
-    GCSOutputFormatProvider.GCSOutputCommitter committerToTest = Mockito.spy(committer);
+    GCSOutputCommitter committerToTest = Mockito.spy(committer);
     JobContext mockJobContext = Mockito.mock(JobContext.class);
     JobStatus.State mockJobState = JobStatus.State.SUCCEEDED;
     TaskAttemptContext mockContext = Mockito.mock(TaskAttemptContext.class);
diff --git a/widgets/GCS-batchsink.json b/widgets/GCS-batchsink.json
index 87e9c64e3a..de9d120ed8 100644
--- a/widgets/GCS-batchsink.json
+++ b/widgets/GCS-batchsink.json
@@ -105,6 +105,29 @@
         "widget-attributes": {
           "default": "us"
         }
+      },
+      {
+        "widget-type": "select",
+        "label": "Content Type",
+        "name": "contentType",
+        "widget-attributes": {
+          "values": [
+            "application/octet-stream",
+            "application/json",
+            "application/avro",
+            "application/csv",
+            "text/plain",
+            "text/csv",
+            "text/tab-separated-values",
+            "other"
+          ],
+          "default": "application/octet-stream"
+        }
+      },
+      {
+        "widget-type": "textbox",
+        "label": "Custom Content Type",
+        "name": "customContentType"
       }
     ]
   },
@@ -166,6 +189,18 @@
           "name": "serviceAccountJSON"
         }
       ]
+    },
+    {
+      "name": "CustomContentType",
+      "condition": {
+        "expression": "contentType == 'other'"
+      },
+      "show": [
+        {
+          "type": "property",
+          "name": "customContentType"
+        }
+      ]
     }
   ],
   "jump-config": {
diff --git a/widgets/GCSMultiFiles-batchsink.json b/widgets/GCSMultiFiles-batchsink.json
index f6111bfa61..416d5c3dc9 100644
--- a/widgets/GCSMultiFiles-batchsink.json
+++ b/widgets/GCSMultiFiles-batchsink.json
@@ -150,6 +150,29 @@
         "widget-attributes": {
           "default": "us"
         }
+      },
+      {
+        "widget-type": "select",
+        "label": "Content Type",
+        "name": "contentType",
+        "widget-attributes": {
+          "values": [
+            "application/octet-stream",
+            "application/json",
+            "application/avro",
+            "application/csv",
+            "text/plain",
+            "text/csv",
+            "text/tab-separated-values",
+            "other"
+          ],
+          "default": "application/octet-stream"
+        }
+      },
+      {
+        "widget-type": "textbox",
+        "label": "Custom Content Type",
+        "name": "customContentType"
       }
     ]
   }
 }
@@ -196,6 +219,18 @@
           "name": "serviceAccountJSON"
         }
       ]
+    },
+    {
+      "name": "CustomContentType",
+      "condition": {
+        "expression": "contentType == 'other'"
+      },
+      "show": [
+        {
+          "type": "property",
+          "name": "customContentType"
+        }
+      ]
     }
   ],
   "jump-config": {