[bq] Parquet Avro #183


Merged (1 commit) on Jul 18, 2025
1 change: 1 addition & 0 deletions spring-batch-bigquery/README.adoc
@@ -10,6 +10,7 @@ Spring Batch extension which contains an `ItemWriter` and `ItemReader` implement

|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported | Supported
|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported | |
|https://en.wikipedia.org/wiki/Apache_Parquet[Parquet] |Supported | |
|===

`ItemReader` support:
96 changes: 92 additions & 4 deletions spring-batch-bigquery/pom.xml
@@ -57,20 +57,77 @@

<dependencies>
<!-- Compile -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.51.0</version>
<exclusions>
<!-- Avoid version conflicts -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>

<!-- Optional -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.15.2</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.4.1</version>
<optional>true</optional>
<exclusions>
<!-- The logging implementation should be chosen by the end application, not forced by this library -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>

<!-- Avoid version conflicts -->
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Test -->
<dependency>
<groupId>ch.qos.logback</groupId>
@@ -96,6 +153,13 @@
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
<exclusions>
<!-- Avoid version conflicts -->
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.wiremock</groupId>
@@ -124,6 +188,9 @@
</goals>
</execution>
</executions>
<configuration>
<excludePackageNames>org.springframework.batch.extensions.bigquery.common.generated</excludePackageNames>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -184,6 +251,7 @@
<plugin>
<groupId>io.spring.javaformat</groupId>
<artifactId>spring-javaformat-maven-plugin</artifactId>
<version>0.0.47</version>
<executions>
<execution>
<phase>validate</phase>
@@ -194,6 +262,26 @@
</execution>
</executions>
</plugin>

<!-- Avro code generation for tests -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.12.0</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/test/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/target/generated-test-sources/avro/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -127,21 +127,18 @@ public void write(final Chunk<? extends T> chunk) throws Exception {
}

private ByteBuffer mapDataToBigQueryFormat(final List<? extends T> items) throws IOException {
- final ByteBuffer byteBuffer;
- try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
-
- final List<byte[]> data = convertObjectsToByteArrays(items);
-
- for (byte[] byteArray : data) {
+ try (final var outputStream = new ByteArrayOutputStream()) {
+ final List<byte[]> bytes = convertObjectsToByteArrays(items);
+
+ for (final byte[] byteArray : bytes) {
outputStream.write(byteArray);
}

// It is extremely important to create larger ByteBuffer.
// If you call TableDataWriteChannel too many times, it leads to BigQuery
// exceptions.
- byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+ return ByteBuffer.wrap(outputStream.toByteArray());
}
- return byteBuffer;
}

private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
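
The comment retained above explains why the chunk is collapsed into one large ByteBuffer: every call against the TableDataWriteChannel is costly, so the writer issues as few writes as possible before the truncated doWriteDataToBigQuery method uploads the buffer. For context, here is a minimal, hypothetical sketch of a single-shot load through the google-cloud-bigquery client; it is not code from this PR, and the dataset, table, and format options are invented for illustration.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;

import java.nio.ByteBuffer;

class LoadJobSketch {

	// Pushes a whole chunk to BigQuery with a single write(...) call.
	// Splitting the same data across many small writes is exactly what the
	// comment above warns against, because each call hits the channel again.
	static Job load(final BigQuery bigQuery, final ByteBuffer byteBuffer) throws Exception {
		final WriteChannelConfiguration configuration = WriteChannelConfiguration
			.newBuilder(TableId.of("my_dataset", "my_table")) // hypothetical target table
			.setFormatOptions(FormatOptions.json())
			.build();

		final TableDataWriteChannel channel = bigQuery.writer(configuration);
		try (channel) {
			channel.write(byteBuffer);
		}
		return channel.getJob();
	}
}
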
@@ -276,7 +273,8 @@ protected boolean tableHasDefinedSchema(final Table table) {
* In reality is called once.
* @param items current chunk
*/
- protected abstract void doInitializeProperties(List<? extends T> items);
+ protected void doInitializeProperties(List<? extends T> items) {
+ }

/**
* Converts chunk into a byte array. Each data type should be converted with respect
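
The change above turns doInitializeProperties from a mandatory override into an optional hook with an empty default. A hypothetical subclass, shown below purely for illustration (it is not part of this PR or the project), might still use the hook to capture state from the first chunk before serializing:

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;

// Toy example only: derives a column order once from the first chunk,
// then reuses it when serializing every subsequent chunk.
class HypotheticalMapItemWriter extends BigQueryLoadJobBaseItemWriter<Map<String, Object>> {

	private List<String> columns;

	@Override
	protected void doInitializeProperties(final List<? extends Map<String, Object>> items) {
		// Called with the first non-empty chunk; fix the column order here.
		this.columns = List.copyOf(items.get(0).keySet());
	}

	@Override
	protected List<byte[]> convertObjectsToByteArrays(final List<? extends Map<String, Object>> items) {
		return items.stream()
			.map(item -> this.columns.stream()
				.map(column -> String.valueOf(item.get(column)))
				.collect(Collectors.joining(",", "", "\n")))
			.map(line -> line.getBytes(StandardCharsets.UTF_8))
			.toList();
	}

	@Override
	protected void performFormatSpecificChecks() {
		// Nothing to validate in this toy example.
	}
}
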
@@ -42,11 +42,6 @@ public class BigQueryLoadJobJsonItemWriter<T> extends BigQueryLoadJobBaseItemWri

private JsonObjectMarshaller<T> marshaller;

- @Override
- protected void doInitializeProperties(List<? extends T> items) {
- // Unused
- }

@Override
protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
return items.stream()
@@ -23,6 +23,7 @@
* <ul>
* <li>JSON</li>
* <li>CSV</li>
* <li>Parquet</li>
* </ul>
*
* <p>
@@ -0,0 +1,93 @@
package org.springframework.batch.extensions.bigquery.writer.loadjob.parquet;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.LocalOutputFile;
import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;
import org.springframework.util.Assert;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/**
* Parquet writer for BigQuery using Load Job.
*
* @author Volodymyr Perebykivskyi
* @since 0.2.0
* @see <a href="https://en.wikipedia.org/wiki/Apache_Parquet">Apache Parquet</a>
*/
public class BigQueryLoadJobParquetItemWriter extends BigQueryLoadJobBaseItemWriter<GenericRecord> {

private Schema schema;

private CompressionCodecName codecName;

/**
* A {@link Schema} that is used to identify fields.
* @param schema your schema
*/
public void setSchema(final Schema schema) {
this.schema = schema;
}

/**
* Specifies a codec for a compression algorithm.
* @param codecName your codec
*/
public void setCodecName(final CompressionCodecName codecName) {
this.codecName = codecName;
}

@Override
protected List<byte[]> convertObjectsToByteArrays(final List<? extends GenericRecord> items) {
if (items.isEmpty()) {
return List.of();
}

Path tempFile = null;
try {
tempFile = Files.createTempFile("parquet-avro-chunk-", null);

final ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(new LocalOutputFile(tempFile))
.withSchema(this.schema)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withCompressionCodec(this.codecName)
.build();

try (writer) {
for (final GenericRecord item : items) {
writer.write(item);
}
}
return List.of(Files.readAllBytes(tempFile));
}
catch (IOException e) {
logger.error(e);
return List.of();
}
finally {
if (tempFile != null) {
try {
Files.deleteIfExists(tempFile);
}
catch (IOException e) {
logger.error(e);
}
}
}
}

@Override
protected void performFormatSpecificChecks() {
Assert.notNull(this.schema, "Schema must be provided");
Assert.notNull(this.codecName, "Codec must be provided");
}

}
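
For reference, a minimal usage sketch of the new writer follows; it is not part of this PR. The Avro schema and record below are invented for illustration, and the BigQuery client plus write channel settings required by BigQueryLoadJobBaseItemWriter are omitted here because they are not shown in this diff.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.BigQueryLoadJobParquetItemWriter;
import org.springframework.batch.item.Chunk;

class ParquetWriterUsageSketch {

	static void writePeople() throws Exception {
		// Hypothetical Avro schema; any record schema matching the target table works.
		final Schema schema = new Schema.Parser().parse("""
				{"type": "record", "name": "Person", "fields": [
				  {"name": "name", "type": "string"},
				  {"name": "age", "type": "int"}
				]}""");

		final BigQueryLoadJobParquetItemWriter writer = new BigQueryLoadJobParquetItemWriter();
		writer.setSchema(schema);
		writer.setCodecName(CompressionCodecName.UNCOMPRESSED); // or SNAPPY if the codec is on the classpath
		// The BigQuery client and write channel settings required by the base
		// writer are omitted because they are not part of this diff.

		final GenericRecord person = new GenericData.Record(schema);
		person.put("name", "John Doe");
		person.put("age", 42);

		writer.write(new Chunk<>(person));
	}
}
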