37 changes: 20 additions & 17 deletions src/main/java/org/antvoice/beam/BigQueryWriter.java
@@ -2,35 +2,38 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.protobuf.ByteString;
import org.antvoice.beam.entities.BigQueryRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap;
public class BigQueryWriter extends PTransform<PCollection<BigQueryRow>, PDone> {

public class BigQueryWriter extends PTransform<PCollection<AbstractMap.SimpleImmutableEntry<String, String>>, PDone> {
private String _project;
private SerializableFunction<String, TableRow> _formatter;

private SerializableFunction<AbstractMap.SimpleImmutableEntry<String, String>, TableRow> _formatter;

public BigQueryWriter(SerializableFunction<AbstractMap.SimpleImmutableEntry<String, String>, TableRow> formatter) {
public BigQueryWriter(String project, SerializableFunction<String, TableRow> formatter) {
_project = project;
_formatter = formatter;
}

private static class DestinationComputer extends DynamicDestinations<AbstractMap.SimpleImmutableEntry<String, String>, String>{
private static class DestinationComputer
extends DynamicDestinations<BigQueryRow, String>{

private String _project;

public DestinationComputer(String project) {
_project = project;
}

@Override
public String getDestination(ValueInSingleWindow<AbstractMap.SimpleImmutableEntry<String, String>> element) {
return element.getValue().getKey();
public String getDestination(ValueInSingleWindow<BigQueryRow> element) {
return _project + ":" + element.getValue().getDataset() + "." + element.getValue().getTable() ;
}

@Override
@@ -45,12 +48,12 @@ public TableSchema getSchema(String destination) {
}

@Override
public PDone expand(PCollection<AbstractMap.SimpleImmutableEntry<String, String>> input) {
input.apply(BigQueryIO.<AbstractMap.SimpleImmutableEntry<String, String>>write()
.to(new DestinationComputer())
public PDone expand(PCollection<BigQueryRow> input) {
input.apply(BigQueryIO.<BigQueryRow>write()
.to(new DestinationComputer(_project))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFormatFunction(_formatter));
.withFormatFunction(row -> _formatter.apply(row.getRow())));

return PDone.in(input.getPipeline());
}
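Editor's note: the collapsed hunk above (@@ -45,12 +48,12 @@) hides the rest of DestinationComputer. A minimal sketch of what those overrides presumably look like, assuming the "project:dataset.table" string returned by getDestination is used directly as the table spec and no schema is supplied (consistent with CREATE_NEVER):

    // Editor's sketch, not part of the PR: the collapsed part of DestinationComputer.
    @Override
    public TableDestination getTable(String destination) {
        // The destination string is already a full "project:dataset.table" spec;
        // no table description is needed.
        return new TableDestination(destination, null);
    }

    @Override
    public TableSchema getSchema(String destination) {
        // CREATE_NEVER means the tables must already exist, so no schema is required.
        return null;
    }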
43 changes: 43 additions & 0 deletions src/main/java/org/antvoice/beam/GoogleStorageWriter.java
@@ -0,0 +1,43 @@
package org.antvoice.beam;

import org.antvoice.beam.entities.BigQueryRow;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

public class GoogleStorageWriter
extends PTransform<PCollection<BigQueryRow>, PDone> {

private String _dumpLocation;
private long _currentTimeMillis;

public GoogleStorageWriter(String dumpLocation, long currentTimeMillis) {
_dumpLocation = dumpLocation;
this._currentTimeMillis = currentTimeMillis;
}

@Override
public PDone expand(PCollection<BigQueryRow> input) {
input.apply(
FileIO
.<String, BigQueryRow>writeDynamic()
.by((SerializableFunction<BigQueryRow, String>)
row -> String.format("%s/%s/", row.getDataset(), row.getTable()))
.via(Contextful.fn((SerializableFunction<BigQueryRow, String>) BigQueryRow::getRow),
TextIO.sink())
.to(_dumpLocation)
.withNaming(partition -> FileIO.Write.defaultNaming(partition, ""))
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(10));

return PDone.in(input.getPipeline());
}
}
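
Editor's usage sketch (not part of the PR): applying the new transform to a small in-memory collection. With the .by(...) key above, files land under <dumpLocation>/<dataset>/<table>/ using FileIO's default naming and 10 shards per window; the bucket, dataset, table and row below are hypothetical.

    package org.antvoice.beam;

    // Editor's sketch only; all values are examples, not part of the PR.
    import org.antvoice.beam.entities.BigQueryRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;

    public class GoogleStorageWriterExample {
        public static void main(String[] args) {
            Pipeline p = Pipeline.create();
            p.apply(Create.of(new BigQueryRow("my_dataset", "my_table", "{\"id\":1}")))
                    // Expected output location: gs://my-dump-bucket/dump/my_dataset/my_table/*
                    .apply("WriteGS", new GoogleStorageWriter("gs://my-dump-bucket/dump", System.currentTimeMillis()));
            p.run().waitUntilFinish();
        }
    }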
51 changes: 13 additions & 38 deletions src/main/java/org/antvoice/beam/PubsubMessageProcessor.java
@@ -1,6 +1,7 @@
package org.antvoice.beam;

import com.google.protobuf.ByteString;
import org.antvoice.beam.entities.BigQueryRow;
import org.antvoice.beam.helper.Zip;
import org.antvoice.beam.metrics.CounterProvider;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
@@ -10,29 +11,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.AbstractMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;

public class PubsubMessageProcessor extends PTransform<PCollection<PubsubMessage>, PCollection<AbstractMap.SimpleImmutableEntry<String, String>>> {
public class PubsubMessageProcessor
extends PTransform<PCollection<PubsubMessage>, PCollection<BigQueryRow>> {

private static final Logger LOG = LoggerFactory.getLogger(PubsubMessageProcessor.class);
private String _project;

public PubsubMessageProcessor(String project) {
_project = project;
public PubsubMessageProcessor() {
}

public static class ExtractMessagesFn extends DoFn<PubsubMessage, AbstractMap.SimpleImmutableEntry<String, String>> {
private String _project;
public static class ExtractMessagesFn extends DoFn<PubsubMessage, BigQueryRow> {

private final CounterProvider _counterProvider = new CounterProvider();
public ExtractMessagesFn(String project) {
_project = project;
public ExtractMessagesFn() {
}

@ProcessElement
@@ -50,7 +44,8 @@ public void processElement(ProcessContext c) {

String dataset = metadata.get("dataset");
String table = metadata.get("table");
_counterProvider.getCounter("PubsubMessageProcessor", dataset + "." + table).inc(c.element().getPayload().length);
_counterProvider.getCounter("PubsubMessageProcessor", dataset + "." + table)
.inc(c.element().getPayload().length);

String message;
if(metadata.containsKey("compression")){
@@ -61,7 +56,7 @@ public void processElement(ProcessContext c) {
}

try {
message = UnzipMessage(c);
message = Zip.Unzip(c.element().getPayload());
} catch (IOException e) {
LOG.error("Cannot uncompress gzip message", e);
return;
@@ -75,33 +70,13 @@ public void processElement(ProcessContext c) {
}
}

AbstractMap.SimpleImmutableEntry row = new AbstractMap.SimpleImmutableEntry<>(_project + ":" + dataset + "." + table, message);
c.output(row);
}

private String UnzipMessage(ProcessContext c) throws IOException {
String message;ByteArrayInputStream bytein = new ByteArrayInputStream(c.element().getPayload());
GZIPInputStream gzip = new GZIPInputStream(bytein);
ByteArrayOutputStream byteout = new ByteArrayOutputStream();

int res = 0;
byte buf[] = new byte[1024];
while (res >= 0) {
res = gzip.read(buf, 0, buf.length);
if (res > 0) {
byteout.write(buf, 0, res);
}
}

byte uncompressed[] = byteout.toByteArray();
message = new String(uncompressed, "UTF-8");
return message;
c.output(new BigQueryRow(dataset, table, message));
}
}

@Override
public PCollection<AbstractMap.SimpleImmutableEntry<String, String>> expand(PCollection<PubsubMessage> input) {
return input.apply(ParDo.of(new ExtractMessagesFn(_project)));
public PCollection<BigQueryRow> expand(PCollection<PubsubMessage> input) {
return input.apply(ParDo.of(new ExtractMessagesFn()));
}
}
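
Editor's note: the new org.antvoice.beam.helper.Zip class is referenced above but not included in this diff. A plausible reconstruction, assuming it simply factors out the inline UnzipMessage logic that this change deletes from PubsubMessageProcessor:

    // Editor's reconstruction, not shown in this PR: mirrors the removed UnzipMessage method.
    package org.antvoice.beam.helper;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPInputStream;

    public final class Zip {
        public static String Unzip(byte[] payload) throws IOException {
            try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(payload));
                 ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                byte[] buf = new byte[1024];
                int read;
                // Drain the gzip stream into the buffer until EOF.
                while ((read = gzip.read(buf, 0, buf.length)) != -1) {
                    out.write(buf, 0, read);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            }
        }
    }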

13 changes: 11 additions & 2 deletions src/main/java/org/antvoice/beam/StreamerOptions.java
@@ -3,7 +3,6 @@
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface StreamerOptions extends GcpOptions {
@@ -15,7 +14,7 @@ public interface StreamerOptions extends GcpOptions {
String getSubscription();
void setSubscription(String value);

@Description("Serializtion type. Uses uncompressed JSON as a default. No other value at the moment")
@Description("Serialization type. Uses uncompressed JSON as a default. No other value at the moment")
String getFormat();
void setFormat(String value);

@@ -28,4 +27,14 @@ public interface StreamerOptions extends GcpOptions {
@Default.Boolean(false)
Boolean getAttached();
void setAttached(Boolean value);

@Description("Used to redirect the data to Google Storage instead of BigQuery")
@Default.Boolean(false)
Boolean getDumpGoogleStorage();
void setDumpGoogleStorage(Boolean isDumpGoogleStorage);

@Description("The GS directory where the data will be dumped (they will dump in gs://dumpLocation/dataset/table/*")
@Default.String("")
String getDumpLocation();
void setDumpLocation(String gsLocation);
}
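
Editor's note: Beam's PipelineOptionsFactory derives flag names from these getters, so the new behaviour would be enabled roughly like this on the job's command line (the bucket path is only an illustration):

    --dumpGoogleStorage=true --dumpLocation=gs://my-dump-bucket/dump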
24 changes: 18 additions & 6 deletions src/main/java/org/antvoice/beam/StreamerRunner.java
@@ -17,11 +17,13 @@
*/
package org.antvoice.beam;

import org.antvoice.beam.entities.BigQueryRow;
import org.antvoice.beam.formatter.FormatterFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,19 +39,28 @@
* --runner=DataflowRunner
*/
public class StreamerRunner {
private static final Logger LOG = LoggerFactory.getLogger(StreamerRunner.class);

public static void main(String[] args) {
public static void main(String[] args) throws Exception {
StreamerOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(StreamerOptions.class);
Pipeline p = Pipeline.create(options);
long currentTimeMillis = System.currentTimeMillis();

p.apply("ReadLines", new PubsubReader(options.getTopic(), options.getSubscription()))
.apply("ExtractMessages", new PubsubMessageProcessor(options.getProject()))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
.apply("WriteBq", new BigQueryWriter(new FormatterFactory(options.getFormat()).getFormatter()));
PCollection<BigQueryRow> rows = p.apply("ReadLines", new PubsubReader(options.getTopic(), options.getSubscription()))
.apply("ExtractMessages", new PubsubMessageProcessor())
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))));

if(options.getDumpGoogleStorage()) {
if(options.getDumpLocation().isEmpty() || !options.getDumpLocation().startsWith("gs://")){
throw new Exception("Invalid dumpLocation parameter. Must be a valid Google Storage URI.");
}
rows.apply("WriteGS", new GoogleStorageWriter(options.getDumpLocation(), currentTimeMillis));
}else {
rows.apply("WriteBq", new BigQueryWriter(options.getProject(),
new FormatterFactory(options.getFormat()).getFormatter()));
}

if(options.getAttached()){
p.run().waitUntilFinish();
@@ -58,3 +69,4 @@ public static void main(String[] args) {
}
}
}

33 changes: 33 additions & 0 deletions src/main/java/org/antvoice/beam/entities/BigQueryRow.java
@@ -0,0 +1,33 @@
package org.antvoice.beam.entities;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class BigQueryRow {
private String dataset;
private String table;
private String row;

public BigQueryRow(){

}

public BigQueryRow(String dataset, String table, String row) {
this.dataset = dataset;
this.table = table;
this.row = row;
}

public String getDataset() {
return dataset;
}

public String getTable() {
return table;
}

public String getRow() {
return row;
}
}
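
Editor's note on the design: the @DefaultCoder(AvroCoder.class) annotation is what lets Beam serialize BigQueryRow between transforms without a hand-written coder, and AvroCoder deserializes via reflection, which is why the class keeps the public no-argument constructor alongside the convenience constructor.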
src/main/java/org/antvoice/beam/formatter/FormatterFactory.java
@@ -2,6 +2,7 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.protobuf.ByteString;
import org.antvoice.beam.entities.BigQueryRow;
import org.apache.beam.sdk.transforms.SerializableFunction;

import java.util.AbstractMap;
@@ -14,7 +15,7 @@ public FormatterFactory(String format) {
_format = format;
}

public SerializableFunction<AbstractMap.SimpleImmutableEntry<String, String>, TableRow> getFormatter() {
public SerializableFunction<String, TableRow> getFormatter() {
// Only known format at the moment
return new JsonRowFormatter();
}
src/main/java/org/antvoice/beam/formatter/JsonRowFormatter.java
@@ -1,6 +1,7 @@
package org.antvoice.beam.formatter;

import com.google.api.services.bigquery.model.TableRow;
import org.antvoice.beam.entities.BigQueryRow;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.json.JSONArray;
import org.json.JSONException;
@@ -13,7 +14,7 @@
import java.util.ArrayList;
import java.util.List;

public class JsonRowFormatter implements SerializableFunction<AbstractMap.SimpleImmutableEntry<String, String>, TableRow> {
public class JsonRowFormatter implements SerializableFunction<String, TableRow> {

private static final Logger LOG = LoggerFactory.getLogger(JsonRowFormatter.class);

@@ -56,9 +57,9 @@ private TableRow convertRow(JSONObject object) {
}

@Override
public TableRow apply(AbstractMap.SimpleImmutableEntry<String, String> input) {
public TableRow apply(String input) {
try {
JSONObject obj = new JSONObject(input.getValue());
JSONObject obj = new JSONObject(input);

TableRow row = convertRow(obj);
