@@ -18,11 +18,13 @@

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.google.gson.Gson;
@@ -77,9 +79,12 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;

/**
* Tests reading from GCS (Google Cloud Storage) and writing to GCS from within a Dataproc cluster.
*
 * TODO: PLUGIN-553: the GCSMultiFiles sink plugin's 'format' property does not work when set as a macro.
*/
public class GCSTest extends DataprocETLTestBase {

@@ -90,6 +95,7 @@ public class GCSTest extends DataprocETLTestBase {
private static final String GCS_MOVE_PLUGIN_NAME = "GCSMove";
private static final String GCS_COPY_PLUGIN_NAME = "GCSCopy";
private static final String SINK_PLUGIN_NAME = "GCS";
private static final String MULTI_SINK_PLUGIN_NAME = "GCSMultiFiles";
private static final String SOURCE_PLUGIN_NAME = "GCSFile";
private static final Schema ALL_DT_SCHEMA = Schema.recordOf(
"record",
@@ -118,6 +124,8 @@ public class GCSTest extends DataprocETLTestBase {

private static Storage storage;
private List<String> markedForDeleteBuckets;
private static final String CSV_CONTENT_TYPE = "text/csv";
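// each "multisink.<split value>" runtime argument carries the output schema for records routed to that split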
private static final String MULTISINK_RUNTIME_ARG = "multisink.%s";

@BeforeClass
public static void testClassSetup() throws IOException {
@@ -187,6 +195,10 @@ private String createPath(Bucket bucket, String blobName) {
return String.format("gs://%s/%s", bucket.getName(), blobName);
}

private void insertData(Bucket bucket, String inputPath, String... data) {
bucket.create(inputPath, String.join("\n", data).getBytes(StandardCharsets.UTF_8));
}

@Test
public void testGCSCopy() throws Exception {
String prefix = "cdap-gcs-cp-test";
@@ -707,7 +719,7 @@ public void testGcsSourceFormats() throws Exception {
String line2 = "2,Terry Perez,[email protected]";
String line3 = "3,Jack Ferguson,[email protected]";
String inputPath = "input";
bucket.create(inputPath, String.join("\n", Arrays.asList(line1, line2, line3)).getBytes(StandardCharsets.UTF_8));
insertData(bucket, inputPath, line1, line2, line3);

String suffix = UUID.randomUUID().toString();
/*
@@ -761,7 +773,8 @@ public void testGcsSourceFormats() throws Exception {
id,first,last,email,address,city,state,zip
1,Marilyn,Hawkins,[email protected],238 Melvin Way,Palo Alto,CA,94302
*/
ETLStage sink = new ETLStage("sink", createSinkPlugin("csv", createPath(bucket, "output"), schema));
ETLStage sink = new ETLStage("sink", createSinkPlugin("csv", createPath(bucket, "output"),
schema, CSV_CONTENT_TYPE));
pipelineConfig = ETLBatchConfig.builder().addStage(sink);
for (String format : formats) {
String path = String.format("%s/%s", createPath(bucket, OUTPUT_BLOB_NAME), format);
@@ -776,6 +789,7 @@

Map<String, Integer> lineCounts = new HashMap<>();
List<String> results = getResultBlobsContent(bucket, "output");
List<String> resultBlobsContentType = getResultBlobsContentType(bucket, "output");
for (String result : results) {
for (String line : result.split("\n")) {
lineCounts.putIfAbsent(line, 0);
@@ -787,6 +801,71 @@ public void testGcsSourceFormats() throws Exception {
expected.put(line2, formats.size());
expected.put(line3, formats.size());
Assert.assertEquals(expected, lineCounts);
Assert.assertEquals(CSV_CONTENT_TYPE, resultBlobsContentType.get(0));
}

@Test
public void testMultiSinkContentType() throws Exception {
String bucketName = "cask-gcs-multisink-" + UUID.randomUUID().toString();
Bucket bucket = createBucket(bucketName);
Contributor:
Would be good to delete the bucket at the end of the test. As a best practice, all resources created in a test should be deleted once the test assertions complete.

Contributor Author:
We are already doing that: if you follow the createBucket(bucketName) method, the bucket name is marked for deletion after the test is done. We also ran the test and verified the bucket is deleted when it finishes.

Contributor:
Ah, I see.

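For reference, a minimal sketch of the cleanup pattern the author describes, using a JUnit @After hook (org.junit.After); apart from the storage client and the markedForDeleteBuckets field visible in this diff, the method names are assumptions:

private Bucket createBucket(String name) {
  Bucket bucket = storage.create(BucketInfo.of(name));
  // mark the bucket so the teardown hook removes it once the test finishes
  markedForDeleteBuckets.add(name);
  return bucket;
}

@After
public void deleteMarkedBuckets() {
  for (String name : markedForDeleteBuckets) {
    Bucket bucket = storage.get(name);
    if (bucket != null) {
      // GCS refuses to delete a non-empty bucket, so delete its blobs first
      for (Blob blob : bucket.list().iterateAll()) {
        blob.delete();
      }
      bucket.delete();
    }
  }
}
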
Schema schema = Schema.recordOf("customer",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Schema.Field.of("email", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Schema.Field.of("departament", Schema.nullableOf(Schema.of(Schema.Type.STRING))));

Schema outputSchema = Schema.recordOf("output.schema",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Schema.Field.of("email", Schema.nullableOf(Schema.of(Schema.Type.STRING))));

String line1 = "1,Marilyn Hawkins,[email protected],DepartmentA";
String line2 = "2,Terry Perez,[email protected],DepartmentB";
String line3 = "3,Jack Ferguson,[email protected],DepartmentA";
String inputPath = "input";
insertData(bucket, inputPath, line1, line2, line3);

Map<String, String> inputSourceConfig = new HashMap<>();
inputSourceConfig.put("schema", schema.toString());
inputSourceConfig.put("format", "${sourceFormat}");
inputSourceConfig.put("referenceName", "source_" + UUID.randomUUID().toString());
inputSourceConfig.put("project", getProjectId());
inputSourceConfig.put("path", createPath(bucket, inputPath));
Contributor:
Could you make it a macro?

Contributor Author:
In GCSMultiBatchSink, if the format is a macro (and therefore null), this always fails with a NullPointerException. Please check line 75 of src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java. That is why we cannot implement it right now.

Contributor:
I see. We should still include it as a macro and have that bug fixed. Could you please file a corresponding Jira?

Contributor Author:
Done. Jira ticket for the NullPointerException in GCSMultiBatchSink when the format is a macro: https://cdap.atlassian.net/browse/PLUGIN-553

Contributor:
Thanks

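Until that is fixed, a minimal sketch of the configure-time guard PLUGIN-553 implies; the method and surrounding names are assumptions, not the plugin's actual source:

private void validateFormat(@Nullable String format) {
  if (format == null) {
    // 'format' is a macro and resolves only at runtime, so there is nothing
    // to validate at configure time; returning here avoids the NPE above
    return;
  }
  // ...existing configure-time validation of the resolved format value...
}
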
ETLStage source = new ETLStage("source",
new ETLPlugin(SOURCE_PLUGIN_NAME,
BatchSource.PLUGIN_TYPE,
inputSourceConfig,
GOOGLE_CLOUD_ARTIFACT));

ETLBatchConfig.Builder pipelineConfig = ETLBatchConfig.builder().addStage(source);

String path = createPath(bucket, OUTPUT_BLOB_NAME);
ETLStage sink = new ETLStage("multsink", createMultiSinkPlugin("csv"));
pipelineConfig.addStage(sink).addConnection(source.getName(), sink.getName());

AppRequest<ETLBatchConfig> appRequest = getBatchAppRequestV2(pipelineConfig.build());
ApplicationId appId = TEST_NAMESPACE.app("GCSMultiSinkContentType");
ApplicationManager appManager = deployApplication(appId, appRequest);

String multisink1 = String.format(MULTISINK_RUNTIME_ARG, "DepartmentA");
String multisink2 = String.format(MULTISINK_RUNTIME_ARG, "DepartmentB");
Map<String, String> args = new HashMap<>();
args.put(multisink1, outputSchema.toString());
args.put(multisink2, outputSchema.toString());
args.put("sourceFormat", "csv");
args.put("multiSinkPath", path);
args.put("multiSinkProjectId", getProjectId());
args.put("multiSinkSchema", schema.toString());
args.put("multiSinkSplitField", "departament");
args.put("contentType", CSV_CONTENT_TYPE);
startWorkFlow(appManager, ProgramRunStatus.COMPLETED, args);

List<String> multisinkContentType1 = getResultBlobsContentType(bucket, OUTPUT_BLOB_NAME + "/DepartmentA");
List<String> multisinkContentType2 = getResultBlobsContentType(bucket, OUTPUT_BLOB_NAME + "/DepartmentB");
Assert.assertEquals(CSV_CONTENT_TYPE, multisinkContentType1.get(0));
Assert.assertEquals(CSV_CONTENT_TYPE, multisinkContentType2.get(0));
}

private ETLStage createSourceStage(String format, String path, String regex, Schema schema) {
@@ -803,14 +882,32 @@ private ETLStage createSourceStage(String format, String path, String regex, Schema schema) {
}

private ETLPlugin createSinkPlugin(String format, String path, Schema schema) {
return new ETLPlugin(SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE,
ImmutableMap.of(
"path", path,
"format", format,
"project", getProjectId(),
"referenceName", format,
"schema", schema.toString()),
GOOGLE_CLOUD_ARTIFACT);
return createSinkPlugin(format, path, schema, null);
}

private ETLPlugin createSinkPlugin(String format, String path, Schema schema, @Nullable String contentType) {
ImmutableMap.Builder<String, String> propertyBuilder = new ImmutableMap.Builder<String, String>()
.put("path", path)
.put("format", format)
.put("project", getProjectId())
.put("referenceName", format)
.put("schema", schema.toString());
if (!Strings.isNullOrEmpty(contentType)) {
propertyBuilder.put("contentType", contentType);
}
return new ETLPlugin(SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, propertyBuilder.build(), GOOGLE_CLOUD_ARTIFACT);
}

private ETLPlugin createMultiSinkPlugin(String sinkFormat) {
Map<String, String> map = new HashMap<>();
map.put("path", "${multiSinkPath}");
map.put("format", sinkFormat);
map.put("project", "${multiSinkProjectId}");
map.put("schema", "${multiSinkSchema}");
map.put("referenceName", "gcs-multi-input");
map.put("splitField", "${multiSinkSplitField}");
map.put("contentType", "${contentType}");
return new ETLPlugin(MULTI_SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, map, GOOGLE_CLOUD_ARTIFACT);
}

static class DataTypesRecord {
@@ -881,4 +978,19 @@ private static String blobContentToString(Blob blob) {
return null;
}

/**
 * Reads the content type of each output file under the given path, skipping the _SUCCESS marker and directory placeholders.
*/
private List<String> getResultBlobsContentType(Bucket bucket, String path) {
String successFile = path + "/_SUCCESS";
assertExists(bucket, successFile);

return StreamSupport.stream(bucket.list().iterateAll().spliterator(), false)
.filter(blob -> blob.getName().startsWith(path + "/")
&& !successFile.equals(blob.getName()) && !blob.getName().endsWith("/"))
.map(BlobInfo::getContentType)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

}