From 0a392bc76422414d1a0ded887ef335d124a8d586 Mon Sep 17 00:00:00 2001
From: Volodymyr Perebykivskyi
Date: Fri, 18 Jul 2025 07:45:56 +0300
Subject: [PATCH] [bq] Parquet Avro

Signed-off-by: Dgray16
---
 spring-batch-bigquery/README.adoc             |   1 +
 spring-batch-bigquery/pom.xml                 |  96 ++++++++++-
 .../BigQueryLoadJobBaseItemWriter.java        |  14 +-
 .../json/BigQueryLoadJobJsonItemWriter.java   |   5 -
 .../bigquery/writer/loadjob/package-info.java |   1 +
 .../BigQueryLoadJobParquetItemWriter.java     |  93 +++++++++
 ...gQueryLoadJobParquetItemWriterBuilder.java | 138 ++++++++++++++++
 .../extensions/bigquery/common/PersonDto.java |  11 ++
 .../bigquery/common/ResultVerifier.java       |  27 +++-
 .../bigquery/common/TestConstants.java        |  33 +++-
 ...latorBigQueryLoadJobCsvItemWriterTest.java |   2 +-
 ...atorBigQueryLoadJobJsonItemWriterTest.java |   2 +-
 ...rBigQueryLoadJobParquetItemWriterTest.java |  58 +++++++
 ...eryWriteApiCommitedJsonItemWriterTest.java |   4 +-
 ...ueryWriteApiPendingJsonItemWriterTest.java |   4 +-
 .../reader/GcloudBigQueryItemReaderTest.java  |   6 +-
 .../GcloudBaseBigQueryItemWriterTest.java     |   2 +-
 ...cloudBigQueryLoadJobCsvItemWriterTest.java |   2 +-
 ...loudBigQueryLoadJobJsonItemWriterTest.java |   2 +-
 .../unit/reader/BigQueryItemReaderTest.java   |   2 +-
 .../unit/reader/builder/RecordMapperTest.java |   2 +-
 .../BigQueryLoadJobBaseItemWriterTest.java    |   8 +-
 .../csv/BigQueryLoadJobCsvItemWriterTest.java |   4 +-
 .../BigQueryLoadJobJsonItemWriterTest.java    |   8 +-
 .../BigQueryLoadJobParquetItemWriterTest.java | 153 ++++++++++++++++++
 ...ryLoadJobParquetItemWriterBuilderTest.java | 103 ++++++++++++
 ...eryWriteApiCommitedJsonItemWriterTest.java |   4 +-
 ...ueryWriteApiPendingJsonItemWriterTest.java |   4 +-
 .../src/test/resources/persons.avsc          |  15 ++
 29 files changed, 753 insertions(+), 51 deletions(-)
 create mode 100644 spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriter.java
 create mode 100644 spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/builder/BigQueryLoadJobParquetItemWriterBuilder.java
 create mode 100644 spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/parquet/EmulatorBigQueryLoadJobParquetItemWriterTest.java
 create mode 100644 spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriterTest.java
 create mode 100644 spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/parquet/builder/BigQueryLoadJobParquetItemWriterBuilderTest.java
 create mode 100644 spring-batch-bigquery/src/test/resources/persons.avsc

diff --git a/spring-batch-bigquery/README.adoc b/spring-batch-bigquery/README.adoc
index aa9d5a4..6524725 100644
--- a/spring-batch-bigquery/README.adoc
+++ b/spring-batch-bigquery/README.adoc
@@ -10,6 +10,7 @@ Spring Batch extension which contains an `ItemWriter` and `ItemReader` implemen
 |https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported | Supported
 |https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported | |
+|https://en.wikipedia.org/wiki/Apache_Parquet[Parquet] |Supported | |
 |===
 
 `ItemReader` support:
diff --git a/spring-batch-bigquery/pom.xml b/spring-batch-bigquery/pom.xml
index 0ab1b36..e9a97df 100644
--- a/spring-batch-bigquery/pom.xml
+++ b/spring-batch-bigquery/pom.xml
@@ -57,20 +57,77 @@
-		<dependency>
-			<groupId>com.fasterxml.jackson.dataformat</groupId>
-			<artifactId>jackson-dataformat-csv</artifactId>
-		</dependency>
 		<dependency>
 			<groupId>com.google.cloud</groupId>
 			<artifactId>google-cloud-bigquery</artifactId>
 			<version>2.51.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>*</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 		<dependency>
 			<groupId>org.springframework.batch</groupId>
 			<artifactId>spring-batch-core</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.dataformat</groupId>
+			<artifactId>jackson-dataformat-csv</artifactId>
+			<optional>true</optional>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.parquet</groupId>
+			<artifactId>parquet-avro</artifactId>
+			<version>1.15.2</version>
+			<optional>true</optional>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>*</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>3.4.1</version>
+			<optional>true</optional>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>*</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.commons</groupId>
+					<artifactId>commons-compress</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.httpcomponents</groupId>
+					<artifactId>httpclient</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
 		<dependency>
 			<groupId>ch.qos.logback</groupId>
@@ -96,6 +153,13 @@
 		<dependency>
 			<groupId>org.testcontainers</groupId>
 			<artifactId>junit-jupiter</artifactId>
 			<scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.commons</groupId>
+					<artifactId>commons-compress</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 		<dependency>
 			<groupId>org.wiremock</groupId>
@@ -124,6 +188,9 @@
 				<configuration>
+					<excludePackageNames>
+						org.springframework.batch.extensions.bigquery.common.generated
+					</excludePackageNames>
 				</configuration>
 			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
@@ -184,6 +251,7 @@
 			<plugin>
 				<groupId>io.spring.javaformat</groupId>
 				<artifactId>spring-javaformat-maven-plugin</artifactId>
+				<version>0.0.47</version>
 				<executions>
 					<execution>
 						<phase>validate</phase>
@@ -194,6 +262,26 @@
 				</configuration>
 			</plugin>
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>1.12.0</version>
+				<executions>
+					<execution>
+						<id>schemas</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.basedir}/src/test/resources/</sourceDirectory>
+							<outputDirectory>${project.basedir}/target/generated-test-sources/avro/</outputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java
index d8b6573..afac7ee 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java
@@ -127,21 +127,18 @@ public void write(final Chunk<? extends T> chunk) throws Exception {
 	}
 
 	private ByteBuffer mapDataToBigQueryFormat(final List<? extends T> items) throws IOException {
-		final ByteBuffer byteBuffer;
-		try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+		try (final var outputStream = new ByteArrayOutputStream()) {
+			final List<byte[]> bytes = convertObjectsToByteArrays(items);
 
-			final List<byte[]> data = convertObjectsToByteArrays(items);
-
-			for (byte[] byteArray : data) {
+			for (final byte[] byteArray : bytes) {
 				outputStream.write(byteArray);
 			}
 
 			// It is extremely important to create larger ByteBuffer.
 			// If you call TableDataWriteChannel too many times, it leads to BigQuery
 			// exceptions.
-			byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+			return ByteBuffer.wrap(outputStream.toByteArray());
 		}
-		return byteBuffer;
 	}
 
 	private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
@@ -276,7 +273,8 @@ protected boolean tableHasDefinedSchema(final Table table) {
 	 * In reality is called once.
 	 * @param items current chunk
 	 */
-	protected abstract void doInitializeProperties(List<? extends T> items);
+	protected void doInitializeProperties(List<? extends T> items) {
+	}
 
 	/**
	 * Converts chunk into a byte array. Each data type should be converted with respect
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java
index e36ea5d..a5a5cee 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java
@@ -42,11 +42,6 @@ public class BigQueryLoadJobJsonItemWriter<T> extends BigQueryLoadJobBaseItemWr
 
 	private JsonObjectMarshaller<T> marshaller;
 
-	@Override
-	protected void doInitializeProperties(List<? extends T> items) {
-		// Unused
-	}
-
 	@Override
 	protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
 		return items.stream()
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java
index 1e110f3..0b74efb 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java
@@ -23,6 +23,7 @@
  * <ul>
  * <li>JSON</li>
  * <li>CSV</li>
+ * <li>Parquet</li>
  * </ul>
  *
  * <p>
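NOTE (review aid): a minimal sketch of how the writer and builder introduced below are wired together, assuming an already-created BigQuery table and a prepared Avro Schema. The dataset/table names, variable names, and the surrounding class are illustrative only, not part of the patch.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.BigQueryLoadJobParquetItemWriter;
import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.builder.BigQueryLoadJobParquetItemWriterBuilder;
import org.springframework.batch.item.Chunk;

import java.util.List;

class ParquetWriterSketch {

	// "my_dataset" / "persons" are placeholders for a real destination table.
	void writeChunk(BigQuery bigQuery, Schema avroSchema, List<GenericRecord> records) throws Exception {
		WriteChannelConfiguration channelConfig = WriteChannelConfiguration
				.newBuilder(TableId.of("my_dataset", "persons"))
				.setFormatOptions(FormatOptions.parquet())
				.build();

		BigQueryLoadJobParquetItemWriter writer = new BigQueryLoadJobParquetItemWriterBuilder()
				.schema(avroSchema)                           // Avro schema of the GenericRecord items
				.codecName(CompressionCodecName.UNCOMPRESSED) // also the builder default
				.writeChannelConfig(channelConfig)
				.bigQuery(bigQuery)                           // omit to fall back to BigQueryOptions.getDefaultInstance().getService()
				.build();

		writer.afterPropertiesSet();
		writer.write(new Chunk<>(records));
	}

}

Note the builder defaults exercised above: an omitted codecName falls back to CompressionCodecName.UNCOMPRESSED, and an omitted bigQuery falls back to the default BigQuery service instance.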
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriter.java
new file mode 100644
index 0000000..0205b04
--- /dev/null
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriter.java
@@ -0,0 +1,93 @@
+package org.springframework.batch.extensions.bigquery.writer.loadjob.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.LocalOutputFile;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;
+import org.springframework.util.Assert;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * Parquet writer for BigQuery using Load Job.
+ *
+ * @author Volodymyr Perebykivskyi
+ * @since 0.2.0
+ * @see <a href="https://parquet.apache.org/">Apache Parquet</a>
+ */
+public class BigQueryLoadJobParquetItemWriter extends BigQueryLoadJobBaseItemWriter<GenericRecord> {
+
+	private Schema schema;
+
+	private CompressionCodecName codecName;
+
+	/**
+	 * A {@link Schema} that is used to identify fields.
+	 * @param schema your schema
+	 */
+	public void setSchema(final Schema schema) {
+		this.schema = schema;
+	}
+
+	/**
+	 * Specifies a codec for a compression algorithm.
+	 * @param codecName your codec
+	 */
+	public void setCodecName(final CompressionCodecName codecName) {
+		this.codecName = codecName;
+	}
+
+	@Override
+	protected List<byte[]> convertObjectsToByteArrays(final List<? extends GenericRecord> items) {
+		if (items.isEmpty()) {
+			return List.of();
+		}
+
+		Path tempFile = null;
+		try {
+			tempFile = Files.createTempFile("parquet-avro-chunk-", null);
+
+			final ParquetWriter<GenericRecord> writer = AvroParquetWriter
+				.<GenericRecord>builder(new LocalOutputFile(tempFile))
+				.withSchema(this.schema)
+				.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+				.withCompressionCodec(this.codecName)
+				.build();
+
+			try (writer) {
+				for (final GenericRecord item : items) {
+					writer.write(item);
+				}
+			}
+			return List.of(Files.readAllBytes(tempFile));
+		}
+		catch (IOException e) {
+			logger.error(e);
+			return List.of();
+		}
+		finally {
+			if (tempFile != null) {
+				try {
+					Files.deleteIfExists(tempFile);
+				}
+				catch (IOException e) {
+					logger.error(e);
+				}
+			}
+		}
+	}
+
+	@Override
+	protected void performFormatSpecificChecks() {
+		Assert.notNull(this.schema, "Schema must be provided");
+		Assert.notNull(this.codecName, "Codec must be provided");
+	}
+
+}
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/builder/BigQueryLoadJobParquetItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/builder/BigQueryLoadJobParquetItemWriterBuilder.java
new file mode 100644
index 0000000..887a562
--- /dev/null
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/parquet/builder/BigQueryLoadJobParquetItemWriterBuilder.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2002-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.builder;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import org.apache.avro.Schema;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.BigQueryLoadJobParquetItemWriter;
+
+import java.util.function.Consumer;
+
+/**
+ * A builder for {@link BigQueryLoadJobParquetItemWriter}.
+ *
+ * @author Volodymyr Perebykivskyi
+ * @since 0.2.0
+ * @see Examples
+ */
+public class BigQueryLoadJobParquetItemWriterBuilder {
+
+	private Schema schema;
+
+	private CompressionCodecName codecName;
+
+	private Consumer<Job> jobConsumer;
+
+	private DatasetInfo datasetInfo;
+
+	private WriteChannelConfiguration writeChannelConfig;
+
+	private BigQuery bigQuery;
+
+	/**
+	 * Specifies which fields are expected.
+	 * @param schema your schema
+	 * @return {@link BigQueryLoadJobParquetItemWriterBuilder}
+	 * @see BigQueryLoadJobParquetItemWriter#setSchema(Schema)
+	 */
+	public BigQueryLoadJobParquetItemWriterBuilder schema(final Schema schema) {
+		this.schema = schema;
+		return this;
+	}
+
+	/**
+	 * Specifies the expected compression algorithm.
+	 * @param codecName your codec
+	 * @return {@link BigQueryLoadJobParquetItemWriterBuilder}
+	 * @see BigQueryLoadJobParquetItemWriter#setCodecName(CompressionCodecName)
+	 */
+	public BigQueryLoadJobParquetItemWriterBuilder codecName(final CompressionCodecName codecName) {
+		this.codecName = codecName;
+		return this;
+	}
+
+	/**
+	 * Provides additional information about the
+	 * {@link com.google.cloud.bigquery.Dataset}.
+	 * @param datasetInfo BigQuery dataset info
+	 * @return {@link BigQueryLoadJobParquetItemWriterBuilder}
+	 * @see BigQueryLoadJobParquetItemWriter#setDatasetInfo(DatasetInfo)
+	 */
+	public BigQueryLoadJobParquetItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) {
+		this.datasetInfo = datasetInfo;
+		return this;
+	}
+
+	/**
+	 * Callback invoked when the {@link Job} is finished.
+	 * @param consumer your consumer
+	 * @return {@link BigQueryLoadJobParquetItemWriterBuilder}
+	 * @see BigQueryLoadJobParquetItemWriter#setJobConsumer(Consumer)
+	 */
+	public BigQueryLoadJobParquetItemWriterBuilder jobConsumer(Consumer<Job> consumer) {
+		this.jobConsumer = consumer;
+		return this;
+	}
+
+	/**
+	 * Describes what should be written (format) and its destination (table).
+	 * @param configuration BigQuery channel configuration
+	 * @return {@link BigQueryLoadJobParquetItemWriterBuilder}
+	 * @see BigQueryLoadJobParquetItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
+	 */
+	public BigQueryLoadJobParquetItemWriterBuilder writeChannelConfig(WriteChannelConfiguration configuration) {
+		this.writeChannelConfig = configuration;
+		return this;
+	}
+
+	/**
+	 * BigQuery service, responsible for API calls.
+	 * @param bigQuery BigQuery service
+	 * @return {@link BigQueryLoadJobParquetItemWriterBuilder}
+	 * @see BigQueryLoadJobParquetItemWriter#setBigQuery(BigQuery)
+	 */
+	public BigQueryLoadJobParquetItemWriterBuilder bigQuery(BigQuery bigQuery) {
+		this.bigQuery = bigQuery;
+		return this;
+	}
+
+	/**
+	 * Please remember to call
+	 * {@link BigQueryLoadJobParquetItemWriter#afterPropertiesSet()}.
+	 * @return {@link BigQueryLoadJobParquetItemWriter}
+	 */
+	public BigQueryLoadJobParquetItemWriter build() {
+		BigQueryLoadJobParquetItemWriter writer = new BigQueryLoadJobParquetItemWriter();
+
+		writer.setCodecName(this.codecName == null ? CompressionCodecName.UNCOMPRESSED : this.codecName);
+		writer.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
+
+		writer.setSchema(this.schema);
+		writer.setWriteChannelConfig(this.writeChannelConfig);
+		writer.setJobConsumer(this.jobConsumer);
+		writer.setDatasetInfo(this.datasetInfo);
+
+		return writer;
+	}
+
+}
\ No newline at end of file
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java
index 15648e9..b7df100 100644
--- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java
@@ -23,6 +23,8 @@
 import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
 import com.google.cloud.bigquery.storage.v1.TableSchema;
 
+import java.util.List;
+
 @JsonPropertyOrder(value = { TestConstants.NAME, TestConstants.AGE })
 public record PersonDto(String name, Integer age) {
 
@@ -54,4 +56,13 @@ public static TableSchema getWriteApiSchema() {
 		return TableSchema.newBuilder().addFields(name).addFields(age).build();
 	}
 
+	public static org.apache.avro.Schema getAvroSchema() {
+		var name = new org.apache.avro.Schema.Field(TestConstants.NAME,
+				org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING));
+		var age = new org.apache.avro.Schema.Field(TestConstants.AGE,
+				org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT));
+		return org.apache.avro.Schema.createRecord("PersonAvroDto", "doc-1",
+				"org.springframework.batch.extensions.bigquery.common.generated", false, List.of(name, age));
+	}
+
 }
\ No newline at end of file
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/ResultVerifier.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/ResultVerifier.java
index 3cd35ef..8a16cf7 100644
--- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/ResultVerifier.java
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/ResultVerifier.java
@@ -18,6 +18,7 @@
 
 import com.google.cloud.bigquery.FieldValueList;
 import com.google.cloud.bigquery.TableResult;
+import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Assertions; import org.springframework.batch.item.Chunk; @@ -28,7 +29,7 @@ public final class ResultVerifier { private ResultVerifier() { } - public static void verifyTableResult(Chunk expected, TableResult actual) { + public static void verifyJavaRecordTableResult(Chunk expected, TableResult actual) { List actualList = actual.streamValues().toList(); Assertions.assertEquals(expected.size(), actual.getTotalRows()); @@ -51,4 +52,28 @@ public static void verifyTableResult(Chunk expected, TableResult actu }); } + public static void verifyAvroTableResult(Chunk expected, TableResult actual) { + List actualList = actual.streamValues().toList(); + + Assertions.assertEquals(expected.size(), actual.getTotalRows()); + Assertions.assertEquals(expected.size(), actualList.size()); + + actualList.forEach(field -> { + boolean containsName = expected.getItems() + .stream() + .map(r -> r.get(TestConstants.NAME)) + .anyMatch(name -> field.get(0).getStringValue().equals(name)); + + boolean containsAge = expected.getItems() + .stream() + .map(r -> r.get(TestConstants.AGE)) + .map(Integer.class::cast) + .map(Integer::longValue) + .anyMatch(age -> age.compareTo(field.get(1).getLongValue()) == 0); + + Assertions.assertTrue(containsName); + Assertions.assertTrue(containsAge); + }); + } + } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java index 8b4a139..725e655 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java @@ -17,6 +17,9 @@ package org.springframework.batch.extensions.bigquery.common; import com.google.cloud.bigquery.FieldValueList; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.springframework.batch.extensions.bigquery.common.generated.PersonAvroDto; import org.springframework.batch.item.Chunk; import org.springframework.core.convert.converter.Converter; @@ -41,15 +44,39 @@ private TestConstants() { public static final String JSON = "json"; + public static final String PARQUET = "parquet"; + + private static final String PERSON_1_NAME = "Volodymyr"; + + private static final int PERSON_1_AGE = 27; + + private static final String PERSON_2_NAME = "Oleksandra"; + + private static final int PERSON_2_AGE = 26; + public static final Converter PERSON_MAPPER = res -> new PersonDto( res.get(NAME).getStringValue(), res.get(AGE).getNumericValue().intValue()); /** Order must be defined so later executed queries results could be predictable */ - private static final List PERSONS = Stream - .of(new PersonDto("Volodymyr", 27), new PersonDto("Oleksandra", 26)) + private static final List JAVA_RECORD_PERSONS = Stream + .of(new PersonDto(PERSON_1_NAME, PERSON_1_AGE), new PersonDto(PERSON_2_NAME, PERSON_2_AGE)) .sorted(Comparator.comparing(PersonDto::name)) .toList(); - public static final Chunk CHUNK = new Chunk<>(PERSONS); + private static final List AVRO_GENERIC_PERSONS = List.of( + new GenericRecordBuilder(PersonDto.getAvroSchema()).set(NAME, PERSON_1_NAME).set(AGE, PERSON_1_AGE).build(), + new GenericRecordBuilder(PersonDto.getAvroSchema()).set(NAME, PERSON_2_NAME) + .set(AGE, PERSON_2_AGE) + .build()); + + private static final List AVRO_GENERATED_PERSONS = List.of( + 
PersonAvroDto.newBuilder().setName(PERSON_1_NAME).setAge(PERSON_1_AGE).build(), + PersonAvroDto.newBuilder().setName(PERSON_2_NAME).setAge(PERSON_2_AGE).build()); + + public static final Chunk JAVA_RECORD_CHUNK = new Chunk<>(JAVA_RECORD_PERSONS); + + public static final Chunk AVRO_GENERIC_CHUNK = new Chunk<>(AVRO_GENERIC_PERSONS); + + public static final Chunk AVRO_GENERATED_CHUNK = new Chunk<>(AVRO_GENERATED_PERSONS); } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/csv/EmulatorBigQueryLoadJobCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/csv/EmulatorBigQueryLoadJobCsvItemWriterTest.java index 5f9eb8e..70cd424 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/csv/EmulatorBigQueryLoadJobCsvItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/csv/EmulatorBigQueryLoadJobCsvItemWriterTest.java @@ -55,7 +55,7 @@ void testWrite() throws Exception { writer.write(expectedChunk); - ResultVerifier.verifyTableResult(expectedChunk, + ResultVerifier.verifyJavaRecordTableResult(expectedChunk, bigQuery.listTableData(tableId, BigQuery.TableDataListOption.pageSize(5L))); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/json/EmulatorBigQueryLoadJobJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/json/EmulatorBigQueryLoadJobJsonItemWriterTest.java index d02ba63..51e33a5 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/json/EmulatorBigQueryLoadJobJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/json/EmulatorBigQueryLoadJobJsonItemWriterTest.java @@ -58,7 +58,7 @@ void testWrite(String table, boolean autodetect) throws Exception { writer.write(expectedChunk); - ResultVerifier.verifyTableResult(expectedChunk, + ResultVerifier.verifyJavaRecordTableResult(expectedChunk, bigQuery.listTableData(tableId, BigQuery.TableDataListOption.pageSize(5L))); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/parquet/EmulatorBigQueryLoadJobParquetItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/parquet/EmulatorBigQueryLoadJobParquetItemWriterTest.java new file mode 100644 index 0000000..d2b8c46 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/parquet/EmulatorBigQueryLoadJobParquetItemWriterTest.java @@ -0,0 +1,58 @@ +package org.springframework.batch.extensions.bigquery.emulator.writer.loadjob.parquet; + +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.junit.jupiter.api.Test; +import org.springframework.batch.extensions.bigquery.common.NameUtils; +import org.springframework.batch.extensions.bigquery.common.generated.PersonAvroDto; +import 
org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.ResultVerifier; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.emulator.writer.base.EmulatorBaseItemWriterTest; +import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.BigQueryLoadJobParquetItemWriter; + +class EmulatorBigQueryLoadJobParquetItemWriterTest extends EmulatorBaseItemWriterTest { + + @Test + void testWrite_GenericRecord() throws Exception { + TableId tableId = TableId.of(TestConstants.DATASET, NameUtils.generateTableName(TestConstants.PARQUET)); + + WriteChannelConfiguration config = WriteChannelConfiguration.newBuilder(tableId) + .setFormatOptions(FormatOptions.parquet()) + .setSchema(PersonDto.getBigQuerySchema()) + .build(); + + BigQueryLoadJobParquetItemWriter writer = new BigQueryLoadJobParquetItemWriter(); + writer.setSchema(PersonDto.getAvroSchema()); + writer.setBigQuery(bigQuery); + writer.setWriteChannelConfig(config); + writer.setCodecName(CompressionCodecName.UNCOMPRESSED); + + writer.write(TestConstants.AVRO_GENERIC_CHUNK); + + ResultVerifier.verifyAvroTableResult(TestConstants.AVRO_GENERIC_CHUNK, bigQuery.listTableData(tableId)); + } + + @Test + void testWrite_GeneratedRecord() throws Exception { + TableId tableId = TableId.of(TestConstants.DATASET, NameUtils.generateTableName(TestConstants.PARQUET)); + + WriteChannelConfiguration config = WriteChannelConfiguration.newBuilder(tableId) + .setFormatOptions(FormatOptions.parquet()) + .setSchema(PersonDto.getBigQuerySchema()) + .build(); + + BigQueryLoadJobParquetItemWriter writer = new BigQueryLoadJobParquetItemWriter(); + writer.setSchema(PersonAvroDto.getClassSchema()); + writer.setBigQuery(bigQuery); + writer.setWriteChannelConfig(config); + writer.setCodecName(CompressionCodecName.UNCOMPRESSED); + + writer.write(TestConstants.AVRO_GENERATED_CHUNK); + + ResultVerifier.verifyAvroTableResult(TestConstants.AVRO_GENERATED_CHUNK, bigQuery.listTableData(tableId)); + } + +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiCommitedJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiCommitedJsonItemWriterTest.java index aa73537..587ec54 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiCommitedJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiCommitedJsonItemWriterTest.java @@ -31,7 +31,7 @@ void testWrite() throws Exception { TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema()); bigQuery.create(TableInfo.of(tableId, tableDefinition)); - Chunk expected = TestConstants.CHUNK; + Chunk expected = TestConstants.JAVA_RECORD_CHUNK; BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); writer.setBigQueryWriteClient(bigQueryWriteClient); @@ -51,7 +51,7 @@ public void onSuccess(AppendRowsResponse result) { writer.write(expected); - ResultVerifier.verifyTableResult(expected, bigQuery.listTableData(tableId)); + ResultVerifier.verifyJavaRecordTableResult(expected, bigQuery.listTableData(tableId)); 
Assertions.assertTrue(consumerCalled.get()); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiPendingJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiPendingJsonItemWriterTest.java index 6effddb..4d0affa 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiPendingJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiPendingJsonItemWriterTest.java @@ -31,7 +31,7 @@ void testWrite() throws Exception { TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema()); bigQuery.create(TableInfo.of(tableId, tableDefinition)); - Chunk expected = TestConstants.CHUNK; + Chunk expected = TestConstants.JAVA_RECORD_CHUNK; BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); writer.setBigQueryWriteClient(bigQueryWriteClient); @@ -51,7 +51,7 @@ public void onSuccess(AppendRowsResponse result) { writer.write(expected); - ResultVerifier.verifyTableResult(expected, bigQuery.listTableData(tableId)); + ResultVerifier.verifyJavaRecordTableResult(expected, bigQuery.listTableData(tableId)); Assertions.assertTrue(consumerCalled.get()); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/GcloudBigQueryItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/GcloudBigQueryItemReaderTest.java index 9cfec45..3320137 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/GcloudBigQueryItemReaderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/GcloudBigQueryItemReaderTest.java @@ -99,10 +99,10 @@ void testInteractiveQuery() throws Exception { private void verifyResult(BigQueryQueryItemReader reader) throws Exception { PersonDto actualFirstPerson = reader.read(); - PersonDto expectedFirstPerson = TestConstants.CHUNK.getItems().get(0); + PersonDto expectedFirstPerson = TestConstants.JAVA_RECORD_CHUNK.getItems().get(0); PersonDto actualSecondPerson = reader.read(); - PersonDto expectedSecondPerson = TestConstants.CHUNK.getItems().get(1); + PersonDto expectedSecondPerson = TestConstants.JAVA_RECORD_CHUNK.getItems().get(1); PersonDto actualThirdPerson = reader.read(); @@ -133,7 +133,7 @@ private static void loadCsvSample() throws Exception { .build(); writer.afterPropertiesSet(); - writer.write(TestConstants.CHUNK); + writer.write(TestConstants.JAVA_RECORD_CHUNK); job.get().waitFor(); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/GcloudBaseBigQueryItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/GcloudBaseBigQueryItemWriterTest.java index 0dbbc41..42a6473 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/GcloudBaseBigQueryItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/GcloudBaseBigQueryItemWriterTest.java @@ -36,7 +36,7 @@ protected void 
verifyResults(String tableName) { Assertions.assertNotNull(dataset.getDatasetId()); Assertions.assertNotNull(tableId); - ResultVerifier.verifyTableResult(TestConstants.CHUNK, tableResult); + ResultVerifier.verifyJavaRecordTableResult(TestConstants.JAVA_RECORD_CHUNK, tableResult); } } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/csv/GcloudBigQueryLoadJobCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/csv/GcloudBigQueryLoadJobCsvItemWriterTest.java index 209a6e8..70c7b04 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/csv/GcloudBigQueryLoadJobCsvItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/csv/GcloudBigQueryLoadJobCsvItemWriterTest.java @@ -79,7 +79,7 @@ void testWriteCsv(String tableName, boolean autodetect) throws Exception { .build(); writer.afterPropertiesSet(); - writer.write(TestConstants.CHUNK); + writer.write(TestConstants.JAVA_RECORD_CHUNK); job.get().waitFor(); verifyResults(tableName); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/json/GcloudBigQueryLoadJobJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/json/GcloudBigQueryLoadJobJsonItemWriterTest.java index a7d1d7c..7aeec73 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/json/GcloudBigQueryLoadJobJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/json/GcloudBigQueryLoadJobJsonItemWriterTest.java @@ -79,7 +79,7 @@ void testWrite(String tableName, boolean autodetect) throws Exception { .build(); writer.afterPropertiesSet(); - writer.write(TestConstants.CHUNK); + writer.write(TestConstants.JAVA_RECORD_CHUNK); job.get().waitFor(); verifyResults(tableName); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/BigQueryItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/BigQueryItemReaderTest.java index f573ea2..7435fa6 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/BigQueryItemReaderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/BigQueryItemReaderTest.java @@ -88,7 +88,7 @@ void testSetJobConfiguration() throws IllegalAccessException, NoSuchFieldExcepti @Test void testRead() throws Exception { BigQuery bigQuery = prepareMockedBigQuery(); - List items = TestConstants.CHUNK.getItems(); + List items = TestConstants.JAVA_RECORD_CHUNK.getItems(); Field name = Field.of(TestConstants.NAME, StandardSQLTypeName.STRING); Field age = Field.of(TestConstants.AGE, StandardSQLTypeName.INT64); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/RecordMapperTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/RecordMapperTest.java index 199845e..ee548a2 100644 --- 
a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/RecordMapperTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/RecordMapperTest.java @@ -34,7 +34,7 @@ class RecordMapperTest { @Test void testGenerateMapper() { RecordMapper mapper = new RecordMapper<>(); - List expected = TestConstants.CHUNK.getItems(); + List expected = TestConstants.JAVA_RECORD_CHUNK.getItems(); Field name = Field.of(TestConstants.NAME, StandardSQLTypeName.STRING); Field age = Field.of(TestConstants.AGE, StandardSQLTypeName.INT64); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/BigQueryLoadJobBaseItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/BigQueryLoadJobBaseItemWriterTest.java index de70e3f..fa6b831 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/BigQueryLoadJobBaseItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/BigQueryLoadJobBaseItemWriterTest.java @@ -136,7 +136,7 @@ void testWrite() throws Exception { writer.setJobConsumer(j -> consumerCalled.set(true)); writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID)); - writer.write(TestConstants.CHUNK); + writer.write(TestConstants.JAVA_RECORD_CHUNK); AtomicLong actual = (AtomicLong) handle .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class) @@ -173,7 +173,7 @@ void testWrite_Exception() throws Exception { writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID)); BigQueryItemWriterException actual = Assertions.assertThrows(BigQueryItemWriterException.class, - () -> writer.write(TestConstants.CHUNK)); + () -> writer.write(TestConstants.JAVA_RECORD_CHUNK)); Assertions.assertEquals("Error on write happened", actual.getMessage()); AtomicLong actualCounter = (AtomicLong) handle @@ -305,10 +305,6 @@ void testTableHasDefinedSchema() { private static final class TestWriter extends BigQueryLoadJobBaseItemWriter { - @Override - protected void doInitializeProperties(List items) { - } - @Override protected List convertObjectsToByteArrays(List items) { return items.stream().map(Objects::toString).map(String::getBytes).toList(); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/BigQueryLoadJobCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/BigQueryLoadJobCsvItemWriterTest.java index 2ac4eb5..40aa176 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/BigQueryLoadJobCsvItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/BigQueryLoadJobCsvItemWriterTest.java @@ -53,7 +53,7 @@ class BigQueryLoadJobCsvItemWriterTest extends AbstractBigQueryTest { @Test void testDoInitializeProperties() throws IllegalAccessException, NoSuchFieldException { TestWriter writer = new TestWriter(); - List items = TestConstants.CHUNK.getItems(); + List items = TestConstants.JAVA_RECORD_CHUNK.getItems(); MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobCsvItemWriter.class, MethodHandles.lookup()); @@ -92,7 +92,7 @@ void 
testSetRowMapper() throws IllegalAccessException, NoSuchFieldException { @Test void testConvertObjectsToByteArrays() { TestWriter writer = new TestWriter(); - List items = TestConstants.CHUNK.getItems(); + List items = TestConstants.JAVA_RECORD_CHUNK.getItems(); // Empty Assertions.assertTrue(writer.testConvert(List.of()).isEmpty()); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/BigQueryLoadJobJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/BigQueryLoadJobJsonItemWriterTest.java index eaf53b2..3b9eca8 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/BigQueryLoadJobJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/BigQueryLoadJobJsonItemWriterTest.java @@ -72,9 +72,9 @@ void testConvertObjectsToByteArrays() { // Not empty writer.setMarshaller(Record::toString); - List actual = writer.testConvert(TestConstants.CHUNK.getItems()); + List actual = writer.testConvert(TestConstants.JAVA_RECORD_CHUNK.getItems()); - List expected = TestConstants.CHUNK.getItems() + List expected = TestConstants.JAVA_RECORD_CHUNK.getItems() .stream() .map(PersonDto::toString) .map(s -> s.concat("\n")) @@ -157,11 +157,11 @@ static Stream invalidFormats() { private static final class TestWriter extends BigQueryLoadJobJsonItemWriter { - public List testConvert(List items) { + List testConvert(List items) { return convertObjectsToByteArrays(items); } - public void testPerformFormatSpecificChecks() { + void testPerformFormatSpecificChecks() { performFormatSpecificChecks(); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriterTest.java new file mode 100644 index 0000000..dc6c4a5 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/parquet/BigQueryLoadJobParquetItemWriterTest.java @@ -0,0 +1,153 @@ +package org.springframework.batch.extensions.bigquery.unit.writer.loadjob.parquet; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.LocalOutputFile; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.batch.extensions.bigquery.common.generated.PersonAvroDto; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.BigQueryLoadJobParquetItemWriter; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +class BigQueryLoadJobParquetItemWriterTest { + + @Test + void testSetSchema() throws IllegalAccessException, NoSuchFieldException { + BigQueryLoadJobParquetItemWriter reader = new 
BigQueryLoadJobParquetItemWriter();
+		MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobParquetItemWriter.class,
+				MethodHandles.lookup());
+		Schema expected = PersonDto.getAvroSchema();
+
+		reader.setSchema(expected);
+
+		Schema actual = (Schema) handle.findVarHandle(BigQueryLoadJobParquetItemWriter.class, "schema", Schema.class)
+			.get(reader);
+
+		Assertions.assertEquals(expected, actual);
+	}
+
+	@Test
+	void testSetCodecName() throws IllegalAccessException, NoSuchFieldException {
+		BigQueryLoadJobParquetItemWriter reader = new BigQueryLoadJobParquetItemWriter();
+		MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobParquetItemWriter.class,
+				MethodHandles.lookup());
+		CompressionCodecName expected = CompressionCodecName.GZIP;
+
+		reader.setCodecName(expected);
+
+		CompressionCodecName actual = (CompressionCodecName) handle
+			.findVarHandle(BigQueryLoadJobParquetItemWriter.class, "codecName", CompressionCodecName.class)
+			.get(reader);
+
+		Assertions.assertEquals(expected, actual);
+	}
+
+	@Test
+	void testConvertObjectsToByteArrays_GenericRecord() {
+		TestWriter writer = new TestWriter();
+		writer.setSchema(PersonDto.getAvroSchema());
+		writer.setCodecName(CompressionCodecName.UNCOMPRESSED);
+
+		// Empty
+		Assertions.assertTrue(writer.testConvertObjectsToByteArrays(List.of()).isEmpty());
+
+		// Not empty
+		List<byte[]> actual = writer.testConvertObjectsToByteArrays(TestConstants.AVRO_GENERIC_CHUNK.getItems());
+		List<byte[]> expected = convert(TestConstants.AVRO_GENERIC_CHUNK.getItems());
+
+		for (int i = 0; i < expected.size(); i++) {
+			Assertions.assertArrayEquals(expected.get(i), actual.get(i));
+		}
+	}
+
+	@Test
+	void testConvertObjectsToByteArrays_GeneratedRecord() {
+		TestWriter writer = new TestWriter();
+		writer.setSchema(PersonAvroDto.getClassSchema());
+		writer.setCodecName(CompressionCodecName.UNCOMPRESSED);
+
+		// Empty
+		Assertions.assertTrue(writer.testConvertObjectsToByteArrays(List.of()).isEmpty());
+
+		// Not empty
+		List<byte[]> actual = writer.testConvertObjectsToByteArrays(TestConstants.AVRO_GENERATED_CHUNK.getItems());
+		List<byte[]> expected = convert(TestConstants.AVRO_GENERATED_CHUNK.getItems());
+
+		for (int i = 0; i < expected.size(); i++) {
+			Assertions.assertArrayEquals(expected.get(i), actual.get(i));
+		}
+	}
+
+	@Test
+	void testPerformFormatSpecificChecks() {
+		TestWriter writer = new TestWriter();
+
+		// Schema
+		IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class,
+				writer::testPerformFormatSpecificChecks);
+		Assertions.assertEquals("Schema must be provided", actual.getMessage());
+
+		// Codec
+		writer.setSchema(PersonDto.getAvroSchema());
+		actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
+		Assertions.assertEquals("Codec must be provided", actual.getMessage());
+	}
+
+	private List<byte[]> convert(List<? extends GenericRecord> items) {
+		Path tempFile = null;
+		try {
+			tempFile = Files.createTempFile("test-", null);
+
+			final ParquetWriter<GenericRecord> writer = AvroParquetWriter
+				.<GenericRecord>builder(new LocalOutputFile(tempFile))
+				.withSchema(items.get(0).getSchema())
+				.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+				.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+				.build();
+
+			try (writer) {
+				for (final GenericRecord item : items) {
+					writer.write(item);
+				}
+			}
+			return List.of(Files.readAllBytes(tempFile));
+		}
+		catch (IOException e) {
+			// Fail loudly: returning an empty list here would let the assertions pass vacuously.
+			return Assertions.fail(e);
+		}
+		finally {
+			if (tempFile != null) {
+				try {
+					Files.deleteIfExists(tempFile);
+				}
+				catch (IOException e) {
+					// Ignored
+				}
+			}
+		}
+	}
+
private static final class TestWriter extends BigQueryLoadJobParquetItemWriter { + + void testPerformFormatSpecificChecks() { + performFormatSpecificChecks(); + } + + List testConvertObjectsToByteArrays(List list) { + return convertObjectsToByteArrays(list); + } + + } + +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/parquet/builder/BigQueryLoadJobParquetItemWriterBuilderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/parquet/builder/BigQueryLoadJobParquetItemWriterBuilderTest.java new file mode 100644 index 0000000..7e4910b --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/parquet/builder/BigQueryLoadJobParquetItemWriterBuilderTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.unit.writer.loadjob.parquet.builder; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.apache.avro.Schema; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; +import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.BigQueryLoadJobParquetItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.parquet.builder.BigQueryLoadJobParquetItemWriterBuilder; + +import java.lang.invoke.MethodHandles; +import java.util.function.Consumer; + +class BigQueryLoadJobParquetItemWriterBuilderTest extends AbstractBigQueryTest { + + @Test + void testBuild() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup parquetWriterHandle = MethodHandles.privateLookupIn(BigQueryLoadJobParquetItemWriter.class, + MethodHandles.lookup()); + MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, + MethodHandles.lookup()); + + DatasetInfo datasetInfo = DatasetInfo.newBuilder(TestConstants.DATASET).setLocation("europe-west-2").build(); + Consumer jobConsumer = job -> { + }; + BigQuery mockedBigQuery = prepareMockedBigQuery(); + Schema schema = PersonDto.getAvroSchema(); + CompressionCodecName codecName = CompressionCodecName.BROTLI; + + WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration + 
+			.newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), TestConstants.PARQUET))
+			.setFormatOptions(FormatOptions.parquet())
+			.build();
+
+		BigQueryLoadJobParquetItemWriter writer = new BigQueryLoadJobParquetItemWriterBuilder().schema(schema)
+			.codecName(codecName)
+			.writeChannelConfig(writeConfiguration)
+			.jobConsumer(jobConsumer)
+			.bigQuery(mockedBigQuery)
+			.datasetInfo(datasetInfo)
+			.build();
+
+		Assertions.assertNotNull(writer);
+
+		Schema actualSchema = (Schema) parquetWriterHandle
+			.findVarHandle(BigQueryLoadJobParquetItemWriter.class, "schema", Schema.class)
+			.get(writer);
+
+		CompressionCodecName actualCodecName = (CompressionCodecName) parquetWriterHandle
+			.findVarHandle(BigQueryLoadJobParquetItemWriter.class, "codecName", CompressionCodecName.class)
+			.get(writer);
+
+		WriteChannelConfiguration actualWriteChannelConfig = (WriteChannelConfiguration) baseWriterHandle
+			.findVarHandle(BigQueryLoadJobBaseItemWriter.class, "writeChannelConfig",
+					WriteChannelConfiguration.class)
+			.get(writer);
+
+		Consumer<Job> actualJobConsumer = (Consumer<Job>) baseWriterHandle
+			.findVarHandle(BigQueryLoadJobBaseItemWriter.class, "jobConsumer", Consumer.class)
+			.get(writer);
+
+		BigQuery actualBigQuery = (BigQuery) baseWriterHandle
+			.findVarHandle(BigQueryLoadJobBaseItemWriter.class, "bigQuery", BigQuery.class)
+			.get(writer);
+
+		DatasetInfo actualDatasetInfo = (DatasetInfo) baseWriterHandle
+			.findVarHandle(BigQueryLoadJobBaseItemWriter.class, "datasetInfo", DatasetInfo.class)
+			.get(writer);
+
+		Assertions.assertEquals(schema, actualSchema);
+		Assertions.assertEquals(codecName, actualCodecName);
+		Assertions.assertEquals(writeConfiguration, actualWriteChannelConfig);
+		Assertions.assertEquals(jobConsumer, actualJobConsumer);
+		Assertions.assertEquals(mockedBigQuery, actualBigQuery);
+		Assertions.assertEquals(datasetInfo, actualDatasetInfo);
+	}
+
+}
\ No newline at end of file
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriterTest.java
index aaf49b0..b770a82 100644
--- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriterTest.java
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriterTest.java
@@ -46,7 +46,7 @@ void testWrite_Empty() throws Exception {
 	@Test
 	void testWrite_Exception() {
 		BigQueryItemWriterException ex = Assertions.assertThrows(BigQueryItemWriterException.class,
-				() -> new BigQueryWriteApiCommitedJsonItemWriter<>().write(TestConstants.CHUNK));
+				() -> new BigQueryWriteApiCommitedJsonItemWriter<>().write(TestConstants.JAVA_RECORD_CHUNK));
 		Assertions.assertEquals("Error on write happened", ex.getMessage());
 	}
 
@@ -80,7 +80,7 @@ void testWrite() throws Exception {
 		writer.setBigQueryWriteClient(writeClient);
 		writer.setMarshaller(new JacksonJsonObjectMarshaller<>());
 
-		writer.write(TestConstants.CHUNK);
+		writer.write(TestConstants.JAVA_RECORD_CHUNK);
 
 		Mockito.verify(writeClient).createWriteStream(streamRequest);
 		Mockito.verify(writeClient).finalizeWriteStream(streamName.toString());
diff --git 
a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriterTest.java index b8baed3..7b36787 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriterTest.java @@ -38,7 +38,7 @@ void testWrite_Empty() throws Exception { @Test void testWrite_Exception() { BigQueryItemWriterException ex = Assertions.assertThrows(BigQueryItemWriterException.class, - () -> new BigQueryWriteApiPendingJsonItemWriter<>().write(TestConstants.CHUNK)); + () -> new BigQueryWriteApiPendingJsonItemWriter<>().write(TestConstants.JAVA_RECORD_CHUNK)); Assertions.assertEquals("Error on write happened", ex.getMessage()); } @@ -78,7 +78,7 @@ void testWrite() throws Exception { writer.setBigQueryWriteClient(writeClient); writer.setMarshaller(new JacksonJsonObjectMarshaller<>()); - writer.write(TestConstants.CHUNK); + writer.write(TestConstants.JAVA_RECORD_CHUNK); Mockito.verify(writeClient).createWriteStream(streamRequest); Mockito.verify(writeClient).finalizeWriteStream(streamName.toString()); diff --git a/spring-batch-bigquery/src/test/resources/persons.avsc b/spring-batch-bigquery/src/test/resources/persons.avsc new file mode 100644 index 0000000..b1faa70 --- /dev/null +++ b/spring-batch-bigquery/src/test/resources/persons.avsc @@ -0,0 +1,15 @@ +{ + "type": "record", + "name": "PersonAvroDto", + "namespace": "org.springframework.batch.extensions.bigquery.common.generated", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "age", + "type": "int" + } + ] +} \ No newline at end of file
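NOTE (review aid): persons.avsc above is consumed two ways by the tests in this patch — parsed at runtime into an org.apache.avro.Schema for hand-built GenericRecords, and compiled by the avro-maven-plugin execution added to the pom into the PersonAvroDto specific record during generate-sources. A minimal sketch of both flavours; the file path and the printing are illustrative assumptions:

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.springframework.batch.extensions.bigquery.common.generated.PersonAvroDto;

class PersonsAvscDemo {

	public static void main(String[] args) throws IOException {
		// Parse the schema file added by this patch (path assumed relative to the module root).
		Schema schema = new Schema.Parser().parse(new File("src/test/resources/persons.avsc"));

		// 1) Generic record, hand-built against the parsed schema (mirrors TestConstants.AVRO_GENERIC_CHUNK).
		GenericRecord generic = new GenericRecordBuilder(schema).set("name", "Volodymyr").set("age", 27).build();

		// 2) Specific record generated by avro-maven-plugin from the same schema.
		PersonAvroDto specific = PersonAvroDto.newBuilder().setName("Volodymyr").setAge(27).build();

		System.out.println(generic + " / " + specific);
	}

}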