17 changes: 17 additions & 0 deletions docs/GCS-batchsink.md
@@ -45,6 +45,23 @@ The delimiter will be ignored if the format is anything other than 'delimited'.

**Location:** The location where the GCS bucket will be created. This value is ignored if the bucket already exists.

**Content Type:** The Content Type property indicates the media type of the resource.
Defaults to 'application/octet-stream'. The following table lists the valid content types for each format.

| Format type | Content type |
|---------------|--------------------------------------------------------------------------------------------|
| avro | application/avro, application/octet-stream |
| csv | text/csv, application/csv, text/plain, application/octet-stream |
| delimited | text/csv, application/csv, text/tab-separated-values, text/plain, application/octet-stream |
| json | application/json, text/plain, application/octet-stream |
| orc | application/octet-stream |
| parquet | application/octet-stream |
| tsv | text/tab-separated-values, text/plain, application/octet-stream |

**Custom Content Type:** The Custom Content Type is used when Content Type is set to 'other'.
It lets the user provide a specific Content-Type that is not among the options in the dropdown.
More information about Content-Type can be found at https://cloud.google.com/storage/docs/metadata
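
As an illustration only (not part of the plugin documentation), the sketch below shows the resolution rule the sink applies to these two properties; the helper name resolveContentType and the value 'application/xml' are hypothetical.

// Minimal sketch of how the effective media type is resolved (assumed helper, for illustration).
static String resolveContentType(String contentType, String customContentType) {
  if (contentType == null || contentType.isEmpty()) {
    return "application/octet-stream";                        // default
  }
  if (contentType.equals("other")) {
    // 'other' defers to Custom Content Type, e.g. a hypothetical "application/xml"
    return (customContentType == null || customContentType.isEmpty())
      ? "application/octet-stream" : customContentType;
  }
  return contentType;                                          // e.g. "application/json"
}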

**Service Account** - service account key used for authorization

* **File Path**: Path on the local file system of the service account key used for
129 changes: 128 additions & 1 deletion src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
@@ -67,6 +67,7 @@ public class GCSBatchSink extends AbstractFileSink<GCSBatchSink.GCSBatchSinkConf
private static final String RECORDS_UPDATED_METRIC = "records.updated";
public static final String AVRO_NAMED_OUTPUT = "avro.mo.config.namedOutput";
public static final String COMMON_NAMED_OUTPUT = "mapreduce.output.basename";
public static final String CONTENT_TYPE = "io.cdap.gcs.batch.sink.content.type";

private final GCSBatchSinkConfig config;
private String outputPath;
@@ -125,6 +126,7 @@ public void prepareRun(BatchSinkContext context) throws Exception {
@Override
protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
Map<String, String> properties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
properties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
properties.putAll(config.getFileSystemProperties());
String outputFileBaseName = config.getOutputFileNameBase();
if (outputFileBaseName == null || outputFileBaseName.isEmpty()) {
@@ -242,6 +244,23 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
private static final String NAME_LOCATION = "location";
private static final String NAME_FS_PROPERTIES = "fileSystemProperties";
private static final String NAME_FILE_NAME_BASE = "outputFileNameBase";
private static final String NAME_CONTENT_TYPE = "contentType";
private static final String NAME_CUSTOM_CONTENT_TYPE = "customContentType";
private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
private static final String CONTENT_TYPE_OTHER = "other";
private static final String CONTENT_TYPE_APPLICATION_JSON = "application/json";
private static final String CONTENT_TYPE_APPLICATION_AVRO = "application/avro";
private static final String CONTENT_TYPE_APPLICATION_CSV = "application/csv";
private static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
private static final String CONTENT_TYPE_TEXT_CSV = "text/csv";
private static final String CONTENT_TYPE_TEXT_TSV = "text/tab-separated-values";
private static final String FORMAT_AVRO = "avro";
private static final String FORMAT_CSV = "csv";
private static final String FORMAT_JSON = "json";
private static final String FORMAT_TSV = "tsv";
private static final String FORMAT_DELIMITED = "delimited";
private static final String FORMAT_ORC = "orc";
private static final String FORMAT_PARQUET = "parquet";

private static final String SCHEME = "gs://";
@Name(NAME_PATH)
@@ -280,6 +299,18 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
"This value is ignored if the bucket already exists")
protected String location;

@Macro
@Description("The Content Type property is used to indicate the media type of the resource. " +
"Defaults to 'application/octet-stream'.")
@Nullable
protected String contentType;

@Macro
@Description("The Custom Content Type is used when the value of Content-Type is set to other. " +
"Users can provide a specific Content-Type, different from the options in the dropdown.")
@Nullable
protected String customContentType;

@Name(NAME_FS_PROPERTIES)
@Macro
@Nullable
@@ -326,10 +357,19 @@ public void validate(FailureCollector collector) {
}
}

if (!containsMacro(NAME_CONTENT_TYPE) && !containsMacro(NAME_CUSTOM_CONTENT_TYPE)
&& !Strings.isNullOrEmpty(contentType) && !contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER)
&& !containsMacro(NAME_FORMAT)) {
if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
validateContentType(collector);
}
}

try {
getSchema();
} catch (IllegalArgumentException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace());
collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA)
.withStacktrace(e.getStackTrace());
}

try {
@@ -340,6 +380,69 @@ public void validate(FailureCollector collector) {
}
}

// Validates that the specified content type is allowed for the selected format.
public void validateContentType(FailureCollector failureCollector) {
switch (format) {
case FORMAT_AVRO:
if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_AVRO)) {
failureCollector.addFailure(String.format("Valid content types for avro are %s, %s.",
CONTENT_TYPE_APPLICATION_AVRO, DEFAULT_CONTENT_TYPE), null)
.withConfigProperty(NAME_CONTENT_TYPE);
}
break;
case FORMAT_JSON:
if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_JSON)
&& !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
failureCollector.addFailure(String.format(
"Valid content types for json are %s, %s, %s.", CONTENT_TYPE_APPLICATION_JSON,
CONTENT_TYPE_TEXT_PLAIN, DEFAULT_CONTENT_TYPE), null
).withConfigProperty(NAME_CONTENT_TYPE);
}
break;
case FORMAT_CSV:
if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
&& !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
&& !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
failureCollector.addFailure(String.format(
"Valid content types for csv are %s, %s, %s, %s.", CONTENT_TYPE_APPLICATION_CSV,
CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_CSV, DEFAULT_CONTENT_TYPE), null
).withConfigProperty(NAME_CONTENT_TYPE);
}
break;
case FORMAT_DELIMITED:
if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
&& !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
&& !contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
&& !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
failureCollector.addFailure(String.format(
"Valid content types for delimited are %s, %s, %s, %s, %s.", CONTENT_TYPE_TEXT_PLAIN,
CONTENT_TYPE_TEXT_CSV, CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_TSV, DEFAULT_CONTENT_TYPE), null
).withConfigProperty(NAME_CONTENT_TYPE);
}
break;
case FORMAT_PARQUET:
if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
failureCollector.addFailure(String.format("Valid content type for parquet is %s.", DEFAULT_CONTENT_TYPE),
null).withConfigProperty(NAME_CONTENT_TYPE);
}
break;
case FORMAT_ORC:
if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
failureCollector.addFailure(String.format("Valid content type for orc is %s.", DEFAULT_CONTENT_TYPE),
null).withConfigProperty(NAME_CONTENT_TYPE);
}
break;
case FORMAT_TSV:
if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
&& !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
failureCollector.addFailure(String.format(
"Valid content types for tsv are %s, %s, %s.", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN,
DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
}
break;
}
}
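// Illustrative note (not part of this change): the switch above could equivalently be driven by data.
// The sketch below expresses the same rules as a map, using the constants defined earlier in this class
// and java.util (Arrays, Collections, HashMap, List, Map). application/octet-stream is always accepted.
// Unlike the switch, which leaves unrecognized formats unvalidated, this stricter sketch rejects them.
private static final Map<String, List<String>> VALID_CONTENT_TYPES = new HashMap<>();
static {
VALID_CONTENT_TYPES.put(FORMAT_AVRO, Arrays.asList(CONTENT_TYPE_APPLICATION_AVRO));
VALID_CONTENT_TYPES.put(FORMAT_JSON, Arrays.asList(CONTENT_TYPE_APPLICATION_JSON, CONTENT_TYPE_TEXT_PLAIN));
VALID_CONTENT_TYPES.put(FORMAT_CSV, Arrays.asList(CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_CSV,
CONTENT_TYPE_TEXT_PLAIN));
VALID_CONTENT_TYPES.put(FORMAT_DELIMITED, Arrays.asList(CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_CSV,
CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_TSV));
VALID_CONTENT_TYPES.put(FORMAT_TSV, Arrays.asList(CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN));
VALID_CONTENT_TYPES.put(FORMAT_ORC, Collections.emptyList());
VALID_CONTENT_TYPES.put(FORMAT_PARQUET, Collections.emptyList());
}

private static boolean isAllowedContentType(String format, String contentType) {
return contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)
|| VALID_CONTENT_TYPES.getOrDefault(format, Collections.emptyList()).stream()
.anyMatch(contentType::equalsIgnoreCase);
}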

public String getBucket() {
return GCSPath.from(path).getBucket();
}
@@ -383,6 +486,30 @@ public String getLocation() {
return location;
}

/* This method gets the value of content type. Valid content types for each format are:
*
* avro -> application/avro, application/octet-stream
* json -> application/json, text/plain, application/octet-stream
* csv -> application/csv, text/csv, text/plain, application/octet-stream
* delimited -> application/csv, text/csv, text/plain, text/tab-separated-values, application/octet-stream
* orc -> application/octet-stream
* parquet -> application/octet-stream
* tsv -> text/tab-separated-values, text/plain, application/octet-stream
*/
@Nullable
public String getContentType() {
if (!Strings.isNullOrEmpty(contentType)) {
if (contentType.equals(CONTENT_TYPE_OTHER)) {
if (Strings.isNullOrEmpty(customContentType)) {
return DEFAULT_CONTENT_TYPE;
}
return customContentType;
}
return contentType;
}
return DEFAULT_CONTENT_TYPE;
}

public Map<String, String> getFileSystemProperties() {
if (fileSystemProperties == null || fileSystemProperties.isEmpty()) {
return Collections.emptyMap();
@@ -95,7 +95,6 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
collector.getOrThrowException();

Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());

Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());

String cmekKey = context.getArguments().get(GCPUtils.CMEK_KEY);
@@ -137,7 +136,7 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
outputProperties.putAll(RecordFilterOutputFormat.configure(validatingOutputFormat.getOutputFormatClassName(),
config.splitField, name, schema));
outputProperties.put(FileOutputFormat.OUTDIR, config.getOutputDir(context.getLogicalStartTime(), name));

outputProperties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
context.addOutput(Output.of(
config.getReferenceName() + "_" + name,
new SinkOutputFormatProvider(RecordFilterOutputFormat.class.getName(), outputProperties)));
165 changes: 165 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java
@@ -0,0 +1,165 @@
/*
* Copyright © 2015-2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.gcs.sink;

import com.google.cloud.storage.Blob;
import com.google.common.annotations.VisibleForTesting;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.StorageClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* OutputCommitter for GCS
*/
public class GCSOutputCommitter extends OutputCommitter {

private static final Logger LOG = LoggerFactory.getLogger(GCSOutputCommitter.class);
public static final String RECORD_COUNT_FORMAT = "recordcount.%s";

private final OutputCommitter delegate;

public GCSOutputCommitter(OutputCommitter delegate) {
this.delegate = delegate;
}

@Override
public void setupJob(JobContext jobContext) throws IOException {
delegate.setupJob(jobContext);
}

@Override
public void cleanupJob(JobContext jobContext) throws IOException {
delegate.cleanupJob(jobContext);
}

@Override
public void commitJob(JobContext jobContext) throws IOException {
delegate.commitJob(jobContext);
}

@Override
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
delegate.abortJob(jobContext, state);
}

@Override
public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
delegate.setupTask(taskAttemptContext);
}

@Override
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
return delegate.needsTaskCommit(taskAttemptContext);
}

@Override
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
/* On task commit, there is some inconsistency across Hadoop implementations regarding the path where the output
file is stored. For some implementations it appears in the path returned by FileOutputCommitter#getCommittedTaskPath
and for others it does not. Before commit, the files appear consistently in the path returned by
FileOutputCommitter#getTaskAttemptPath. Hence, find the output file from the task attempt path and add the
metadata before the commit happens. After commit, the file will have been moved out of the task attempt path. */
try {
updateMetricMetaData(taskAttemptContext);
} catch (Exception exception) {
LOG.warn("Unable to record metric for task. Metric emitted for the number of affected rows may be incorrect.",
exception);
}

delegate.commitTask(taskAttemptContext);
}

private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException {
if (!(delegate instanceof FileOutputCommitter)) {
return;
}

FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate;
Configuration configuration = taskAttemptContext.getConfiguration();
//Task is not yet committed, so should be available in attempt path
Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext);
if (configuration == null || taskAttemptPath == null) {
return;
}

//read the count from configuration
String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID());
Map<String, String> metaData = new HashMap<>();
metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L)));
StorageClient storageClient = getStorageClient(configuration);
//update metadata on the output file present in the directory for this task
Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
if (blob == null) {
LOG.info("Could not find a file in path {} to apply count metadata.", taskAttemptPath.toString());
return;
}
blob.toBuilder().setContentType(configuration.get(GCSBatchSink.CONTENT_TYPE)).setMetadata(metaData).build()
.update();
}
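
// Illustrative assumption (not part of this file): the RECORD_COUNT_FORMAT key read above must be
// populated on the writer side before the task commits. The record writer that does this is not shown
// in this diff; a minimal sketch of a wrapping writer that would satisfy that contract is:
class CountingRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
private final org.apache.hadoop.mapreduce.RecordWriter<K, V> delegate;
private long count;

CountingRecordWriter(org.apache.hadoop.mapreduce.RecordWriter<K, V> delegate) {
this.delegate = delegate;
}

@Override
public void write(K key, V value) throws IOException, InterruptedException {
delegate.write(key, value);
count++;  // one record written by this task attempt
}

@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
delegate.close(context);
// record the count under the same key that updateMetricMetaData reads
context.getConfiguration().setLong(String.format(RECORD_COUNT_FORMAT, context.getTaskAttemptID()), count);
}
}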

@VisibleForTesting
StorageClient getStorageClient(Configuration configuration) throws IOException {
String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID);
String serviceAccount = null;
boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
.equals(configuration.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
if (isServiceAccountFile) {
serviceAccount = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
} else {
serviceAccount = configuration.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
}
return StorageClient.create(project, serviceAccount, isServiceAccountFile);
}

@Override
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
delegate.abortTask(taskAttemptContext);
}

@Override
public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
return delegate.isCommitJobRepeatable(jobContext);
}

@Override
public boolean isRecoverySupported(JobContext jobContext) throws IOException {
return delegate.isRecoverySupported(jobContext);
}

@Override
public boolean isRecoverySupported() {
return delegate.isRecoverySupported();
}

@Override
public void recoverTask(TaskAttemptContext taskContext) throws IOException {
delegate.recoverTask(taskContext);
}
}