Commit 8d0dcdd

[bq] Parquet Avro
1 parent f603b07 commit 8d0dcdd

29 files changed: +753 −51 lines changed

spring-batch-bigquery/README.adoc

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ Spring Batch extension which contains an `ItemWriter` and `ItemReader` implement
 
 |https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported | Supported
 |https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported | |
+|https://en.wikipedia.org/wiki/Apache_Parquet[Parquet] |Supported | |
 |===
 
 `ItemReader` support:

spring-batch-bigquery/pom.xml

Lines changed: 92 additions & 4 deletions
@@ -57,20 +57,77 @@
 
 	<dependencies>
 		<!-- Compile -->
-		<dependency>
-			<groupId>com.fasterxml.jackson.dataformat</groupId>
-			<artifactId>jackson-dataformat-csv</artifactId>
-		</dependency>
 		<dependency>
 			<groupId>com.google.cloud</groupId>
 			<artifactId>google-cloud-bigquery</artifactId>
 			<version>2.51.0</version>
+			<exclusions>
+				<!-- Avoid version conflicts -->
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>*</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 		<dependency>
 			<groupId>org.springframework.batch</groupId>
 			<artifactId>spring-batch-core</artifactId>
 		</dependency>
 
+		<!-- Optional -->
+		<dependency>
+			<groupId>com.fasterxml.jackson.dataformat</groupId>
+			<artifactId>jackson-dataformat-csv</artifactId>
+			<optional>true</optional>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.parquet</groupId>
+			<artifactId>parquet-avro</artifactId>
+			<version>1.15.2</version>
+			<optional>true</optional>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>*</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>3.4.1</version>
+			<optional>true</optional>
+			<exclusions>
+				<!-- End implementation should not be forced by library -->
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>*</artifactId>
+				</exclusion>
+
+				<!-- Avoid version conflicts -->
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.commons</groupId>
+					<artifactId>commons-compress</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.httpcomponents</groupId>
+					<artifactId>httpclient</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
 		<!-- Test -->
 		<dependency>
 			<groupId>ch.qos.logback</groupId>
@@ -96,6 +153,13 @@
 			<groupId>org.testcontainers</groupId>
 			<artifactId>junit-jupiter</artifactId>
 			<scope>test</scope>
+			<exclusions>
+				<!-- Avoid version conflicts -->
+				<exclusion>
+					<groupId>org.apache.commons</groupId>
+					<artifactId>commons-compress</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 		<dependency>
 			<groupId>org.wiremock</groupId>
@@ -124,6 +188,9 @@
 					</goals>
 				</execution>
 			</executions>
+			<configuration>
+				<excludePackageNames>org.springframework.batch.extensions.bigquery.common.generated</excludePackageNames>
+			</configuration>
 		</plugin>
 		<plugin>
 			<groupId>org.apache.maven.plugins</groupId>
@@ -184,6 +251,7 @@
 		<plugin>
 			<groupId>io.spring.javaformat</groupId>
 			<artifactId>spring-javaformat-maven-plugin</artifactId>
+			<version>0.0.47</version>
 			<executions>
 				<execution>
 					<phase>validate</phase>
@@ -194,6 +262,26 @@
 				</execution>
 			</executions>
 		</plugin>
+
+		<!-- Avro code generation for tests -->
+		<plugin>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro-maven-plugin</artifactId>
+			<version>1.12.0</version>
+			<executions>
+				<execution>
+					<id>schemas</id>
+					<phase>generate-sources</phase>
+					<goals>
+						<goal>schema</goal>
+					</goals>
+					<configuration>
+						<sourceDirectory>${project.basedir}/src/test/resources/</sourceDirectory>
+						<outputDirectory>${project.basedir}/target/generated-test-sources/avro/</outputDirectory>
+					</configuration>
+				</execution>
+			</executions>
+		</plugin>
 	</plugins>
 </build>
</project>
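
Note that `parquet-avro` and `hadoop-common` are declared `<optional>true</optional>` above, so they do not arrive transitively. A minimal sketch of what a consuming project would have to add to its own `pom.xml` to enable the new Parquet writer (versions taken from this commit; adjust to your build):

```xml
<dependencies>
	<dependency>
		<groupId>org.apache.parquet</groupId>
		<artifactId>parquet-avro</artifactId>
		<version>1.15.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>3.4.1</version>
	</dependency>
</dependencies>
```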

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java

Lines changed: 6 additions & 8 deletions
@@ -127,21 +127,18 @@ public void write(final Chunk<? extends T> chunk) throws Exception {
 	}
 
 	private ByteBuffer mapDataToBigQueryFormat(final List<? extends T> items) throws IOException {
-		final ByteBuffer byteBuffer;
-		try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+		try (final var outputStream = new ByteArrayOutputStream()) {
+			final List<byte[]> bytes = convertObjectsToByteArrays(items);
 
-			final List<byte[]> data = convertObjectsToByteArrays(items);
-
-			for (byte[] byteArray : data) {
+			for (final byte[] byteArray : bytes) {
 				outputStream.write(byteArray);
 			}
 
 			// It is extremely important to create larger ByteBuffer.
 			// If you call TableDataWriteChannel too many times, it leads to BigQuery
 			// exceptions.
-			byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+			return ByteBuffer.wrap(outputStream.toByteArray());
 		}
-		return byteBuffer;
 	}
 
 	private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
@@ -276,7 +273,8 @@ protected boolean tableHasDefinedSchema(final Table table) {
 	 * In reality is called once.
 	 * @param items current chunk
 	 */
-	protected abstract void doInitializeProperties(List<? extends T> items);
+	protected void doInitializeProperties(List<? extends T> items) {
+	}
 
 	/**
 	 * Converts chunk into a byte array. Each data type should be converted with respect
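
With `doInitializeProperties` relaxed from an abstract method to a no-op hook, a format-specific subclass only has to supply the byte conversion. A minimal sketch of a custom writer under that contract (the `NewlineDelimitedItemWriter` class and its format are invented for illustration; `performFormatSpecificChecks` is assumed to still be a required override, as the new Parquet writer below suggests):

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;

// Hypothetical writer: serializes each item as one UTF-8 text line.
public class NewlineDelimitedItemWriter<T> extends BigQueryLoadJobBaseItemWriter<T> {

	@Override
	protected List<byte[]> convertObjectsToByteArrays(final List<? extends T> items) {
		// One byte array per item; the base writer concatenates them into a single ByteBuffer.
		return items.stream()
			.map(item -> (item.toString() + "\n").getBytes(StandardCharsets.UTF_8))
			.toList();
	}

	@Override
	protected void performFormatSpecificChecks() {
		// Nothing to validate in this sketch.
	}

	// doInitializeProperties no longer has to be overridden: the base class
	// now provides the empty default shown in the diff above.

}
```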

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java

Lines changed: 0 additions & 5 deletions
@@ -42,11 +42,6 @@ public class BigQueryLoadJobJsonItemWriter<T> extends BigQueryLoadJobBaseItemWri
 
 	private JsonObjectMarshaller<T> marshaller;
 
-	@Override
-	protected void doInitializeProperties(List<? extends T> items) {
-		// Unused
-	}
-
 	@Override
 	protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
 		return items.stream()

spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
  * <ul>
  * <li>JSON</li>
  * <li>CSV</li>
+ * <li>Parquet</li>
  * </ul>
  *
  * <p>
spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriter.java

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
+package org.springframework.batch.extensions.bigquery.writer.loadjob.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.LocalOutputFile;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;
+import org.springframework.util.Assert;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * Parquet writer for BigQuery using Load Job.
+ *
+ * @author Volodymyr Perebykivskyi
+ * @since 0.2.0
+ * @see <a href="https://en.wikipedia.org/wiki/Apache_Parquet">Apache Parquet</a>
+ */
+public class BigQueryLoadJobParquetItemWriter extends BigQueryLoadJobBaseItemWriter<GenericRecord> {
+
+	private Schema schema;
+
+	private CompressionCodecName codecName;
+
+	/**
+	 * A {@link Schema} that is used to identify fields.
+	 * @param schema your schema
+	 */
+	public void setSchema(final Schema schema) {
+		this.schema = schema;
+	}
+
+	/**
+	 * Specifies a codec for a compression algorithm.
+	 * @param codecName your codec
+	 */
+	public void setCodecName(final CompressionCodecName codecName) {
+		this.codecName = codecName;
+	}
+
+	@Override
+	protected List<byte[]> convertObjectsToByteArrays(final List<? extends GenericRecord> items) {
+		if (items.isEmpty()) {
+			return List.of();
+		}
+
+		Path tempFile = null;
+		try {
+			tempFile = Files.createTempFile("parquet-avro-chunk-", null);
+
+			final ParquetWriter<GenericRecord> writer = AvroParquetWriter
+				.<GenericRecord>builder(new LocalOutputFile(tempFile))
+				.withSchema(this.schema)
+				.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+				.withCompressionCodec(this.codecName)
+				.build();
+
+			try (writer) {
+				for (final GenericRecord item : items) {
+					writer.write(item);
+				}
+			}
+			return List.of(Files.readAllBytes(tempFile));
+		}
+		catch (IOException e) {
+			logger.error(e);
+			return List.of();
+		}
+		finally {
+			if (tempFile != null) {
+				try {
+					Files.deleteIfExists(tempFile);
+				}
+				catch (IOException e) {
+					logger.error(e);
+				}
+			}
+		}
+	}
+
+	@Override
+	protected void performFormatSpecificChecks() {
+		Assert.notNull(this.schema, "Schema must be provided");
+		Assert.notNull(this.codecName, "Codec must be provided");
+	}
+
+}
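
A sketch of how the new writer might be wired up, assuming the base writer's BigQuery client and load-job settings are configured the same way as for the existing JSON/CSV writers (those setters are not shown in this diff). The `Person` schema and its fields are invented for illustration; `Chunk.of` is Spring Batch 5 API:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.BigQueryLoadJobParquetItemWriter;
import org.springframework.batch.item.Chunk;

public class ParquetWriterUsageSketch {

	public static void main(String[] args) throws Exception {
		// Avro schema describing a row of the target table (invented example).
		Schema schema = SchemaBuilder.record("Person")
			.fields()
			.requiredString("name")
			.requiredInt("age")
			.endRecord();

		BigQueryLoadJobParquetItemWriter writer = new BigQueryLoadJobParquetItemWriter();
		writer.setSchema(schema); // required, enforced by performFormatSpecificChecks()
		writer.setCodecName(CompressionCodecName.SNAPPY); // required as well
		// BigQuery connection / load-job configuration on the base class is elided here.

		GenericRecord row = new GenericData.Record(schema);
		row.put("name", "Ada");
		row.put("age", 36);

		// The base class buffers the Parquet bytes and submits them through a load job.
		writer.write(Chunk.of(row));
	}

}
```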
