* Which means that result is coming asynchronously.
*
* Also, worth mentioning that you should take into account concurrency limits.
@@ -41,71 +43,76 @@
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @since 0.2.0
- * @see Interactive queries
- * @see Batch queries
- * @see Concurrency limits
+ * @see Interactive
+ * queries
+ * @see Batch
+ * queries
+ * @see Concurrency
+ * limits
*/
public class BigQueryQueryItemReader<T> implements ItemReader<T>, InitializingBean {
- private final Log logger = LogFactory.getLog(getClass());
-
- private BigQuery bigQuery;
- private Converter<FieldValueList, T> rowMapper;
- private QueryJobConfiguration jobConfiguration;
- private Iterator<FieldValueList> iterator;
-
- /**
- * BigQuery service, responsible for API calls.
- *
- * @param bigQuery BigQuery service
- */
- public void setBigQuery(BigQuery bigQuery) {
- this.bigQuery = bigQuery;
- }
-
- /**
- * Row mapper which transforms single BigQuery row into a desired type.
- *
- * @param rowMapper your row mapper
- */
- public void setRowMapper(Converter<FieldValueList, T> rowMapper) {
- this.rowMapper = rowMapper;
- }
-
- /**
- * Specifies query to run, destination table, etc.
- *
- * @param jobConfiguration BigQuery job configuration
- */
- public void setJobConfiguration(QueryJobConfiguration jobConfiguration) {
- this.jobConfiguration = jobConfiguration;
- }
-
- @Override
- public T read() throws Exception {
- if (iterator == null) {
- doOpen();
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("Reading next element");
- }
-
- return iterator.hasNext() ? rowMapper.convert(iterator.next()) : null;
- }
-
- private void doOpen() throws Exception {
- if (logger.isDebugEnabled()) {
- logger.debug("Executing query");
- }
- iterator = bigQuery.query(jobConfiguration).getValues().iterator();
- }
-
- @Override
- public void afterPropertiesSet() {
- Assert.notNull(this.bigQuery, "BigQuery service must be provided");
- Assert.notNull(this.rowMapper, "Row mapper must be provided");
- Assert.notNull(this.jobConfiguration, "Job configuration must be provided");
- }
+ private final Log logger = LogFactory.getLog(getClass());
+
+ private BigQuery bigQuery;
+
+ private Converter<FieldValueList, T> rowMapper;
+
+ private QueryJobConfiguration jobConfiguration;
+
+ private Iterator<FieldValueList> iterator;
+
+ /**
+ * BigQuery service, responsible for API calls.
+ * @param bigQuery BigQuery service
+ */
+ public void setBigQuery(final BigQuery bigQuery) {
+ this.bigQuery = bigQuery;
+ }
+
+ /**
+ * Row mapper which transforms single BigQuery row into a desired type.
+ * @param rowMapper your row mapper
+ */
+ public void setRowMapper(final Converter<FieldValueList, T> rowMapper) {
+ this.rowMapper = rowMapper;
+ }
+
+ /**
+ * Specifies query to run, destination table, etc.
+ * @param jobConfiguration BigQuery job configuration
+ */
+ public void setJobConfiguration(final QueryJobConfiguration jobConfiguration) {
+ this.jobConfiguration = jobConfiguration;
+ }
+
+ @Override
+ public T read() throws Exception {
+ if (iterator == null) {
+ doOpen();
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reading next element");
+ }
+
+ return iterator.hasNext() ? rowMapper.convert(iterator.next()) : null;
+ }
+
+ private void doOpen() throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Executing query");
+ }
+ iterator = bigQuery.query(jobConfiguration).getValues().iterator();
+ }
+
+ @Override
+ public void afterPropertiesSet() {
+ Assert.notNull(this.bigQuery, "BigQuery service must be provided");
+ Assert.notNull(this.rowMapper, "Row mapper must be provided");
+ Assert.notNull(this.jobConfiguration, "Job configuration must be provided");
+ }
}
\ No newline at end of file
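For orientation, a minimal wiring sketch of the reader above (not part of this diff); the DTO, the column name and the query text are illustrative assumptions, and imports from com.google.cloud.bigquery are omitted:

    // Hypothetical DTO used only for illustration
    record PersonDto(String name) { }

    BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReader<>();
    reader.setBigQuery(BigQueryOptions.getDefaultInstance().getService());
    // Converter<FieldValueList, PersonDto>: maps one BigQuery row to the DTO
    reader.setRowMapper(row -> new PersonDto(row.get("name").getStringValue()));
    reader.setJobConfiguration(QueryJobConfiguration.newBuilder("SELECT name FROM my_dataset.persons LIMIT 10").build());
    reader.afterPropertiesSet();

    PersonDto first = reader.read(); // returns null once the result set is exhausted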
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java
index 76c7deb..50f4d52 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java
@@ -31,107 +31,109 @@
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @since 0.2.0
- * @see Examples
- * @see Examples
+ * @see Examples
+ * @see Examples
*/
public class BigQueryQueryItemReaderBuilder<T> {
- private BigQuery bigQuery;
- private String query;
- private Converter<FieldValueList, T> rowMapper;
- private QueryJobConfiguration jobConfiguration;
- private Class<T> targetType;
-
- /**
- * BigQuery service, responsible for API calls.
- *
- * @param bigQuery BigQuery service
- * @return {@link BigQueryQueryItemReaderBuilder}
- * @see BigQueryQueryItemReader#setBigQuery(BigQuery)
- */
- public BigQueryQueryItemReaderBuilder bigQuery(final BigQuery bigQuery) {
- this.bigQuery = bigQuery;
- return this;
- }
-
- /**
- * Schema of the query: {@code SELECT <column> FROM <dataset>.<table>}.
- *
- * It is really recommended to use {@code LIMIT n}
- * because BigQuery charges you for the amount of data that is being processed.
- *
- * @param query your query to run
- * @return {@link BigQueryQueryItemReaderBuilder}
- * @see BigQueryQueryItemReader#setJobConfiguration(QueryJobConfiguration)
- */
- public BigQueryQueryItemReaderBuilder query(final String query) {
- this.query = query;
- return this;
- }
-
- /**
- * Row mapper which transforms single BigQuery row into a desired type.
- *
- * @param rowMapper your row mapper
- * @return {@link BigQueryQueryItemReaderBuilder}
- * @see BigQueryQueryItemReader#setRowMapper(Converter)
- */
- public BigQueryQueryItemReaderBuilder<T> rowMapper(final Converter<FieldValueList, T> rowMapper) {
- this.rowMapper = rowMapper;
- return this;
- }
-
- /**
- * Specifies query to run, destination table, etc.
- *
- * @param jobConfiguration BigQuery job configuration
- * @return {@link BigQueryQueryItemReaderBuilder}
- * @see BigQueryQueryItemReader#setJobConfiguration(QueryJobConfiguration)
- */
- public BigQueryQueryItemReaderBuilder jobConfiguration(final QueryJobConfiguration jobConfiguration) {
- this.jobConfiguration = jobConfiguration;
- return this;
- }
-
- /**
- * Specifies a target type which will be used as a result.
- * Only needed when {@link BigQueryQueryItemReaderBuilder#rowMapper} is not provided.
- * Take into account that only {@link Class#isRecord()} supported.
- *
- * @param targetType a {@link Class} that represent desired type
- * @return {@link BigQueryQueryItemReaderBuilder}
- */
- public BigQueryQueryItemReaderBuilder<T> targetType(final Class<T> targetType) {
- this.targetType = targetType;
- return this;
- }
-
- /**
- * Please remember about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
- *
- * @return {@link BigQueryQueryItemReader}
- */
- public BigQueryQueryItemReader<T> build() {
- final BigQueryQueryItemReader<T> reader = new BigQueryQueryItemReader<>();
-
- reader.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
-
- if (this.rowMapper == null) {
- Assert.notNull(this.targetType, "No target type provided");
- Assert.isTrue(this.targetType.isRecord(), "Only Java record supported");
- reader.setRowMapper(new RecordMapper<T>().generateMapper(this.targetType));
- } else {
- reader.setRowMapper(this.rowMapper);
- }
-
- if (this.jobConfiguration == null) {
- Assert.isTrue(StringUtils.hasText(this.query), "No query provided");
- reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build());
- } else {
- reader.setJobConfiguration(this.jobConfiguration);
- }
-
- return reader;
- }
+ private BigQuery bigQuery;
+
+ private String query;
+
+ private Converter<FieldValueList, T> rowMapper;
+
+ private QueryJobConfiguration jobConfiguration;
+
+ private Class<T> targetType;
+
+ /**
+ * BigQuery service, responsible for API calls.
+ * @param bigQuery BigQuery service
+ * @return {@link BigQueryQueryItemReaderBuilder}
+ * @see BigQueryQueryItemReader#setBigQuery(BigQuery)
+ */
+ public BigQueryQueryItemReaderBuilder bigQuery(final BigQuery bigQuery) {
+ this.bigQuery = bigQuery;
+ return this;
+ }
+
+ /**
+ * Schema of the query: {@code SELECT column FROM dataset.table}.
+ *
+ * It is really recommended to use {@code LIMIT n} because BigQuery charges you for
+ * the amount of data that is being processed.
+ * @param query your query to run
+ * @return {@link BigQueryQueryItemReaderBuilder}
+ * @see BigQueryQueryItemReader#setJobConfiguration(QueryJobConfiguration)
+ */
+ public BigQueryQueryItemReaderBuilder query(final String query) {
+ this.query = query;
+ return this;
+ }
+
+ /**
+ * Row mapper which transforms single BigQuery row into a desired type.
+ * @param rowMapper your row mapper
+ * @return {@link BigQueryQueryItemReaderBuilder}
+ * @see BigQueryQueryItemReader#setRowMapper(Converter)
+ */
+ public BigQueryQueryItemReaderBuilder<T> rowMapper(final Converter<FieldValueList, T> rowMapper) {
+ this.rowMapper = rowMapper;
+ return this;
+ }
+
+ /**
+ * Specifies query to run, destination table, etc.
+ * @param jobConfiguration BigQuery job configuration
+ * @return {@link BigQueryQueryItemReaderBuilder}
+ * @see BigQueryQueryItemReader#setJobConfiguration(QueryJobConfiguration)
+ */
+ public BigQueryQueryItemReaderBuilder jobConfiguration(final QueryJobConfiguration jobConfiguration) {
+ this.jobConfiguration = jobConfiguration;
+ return this;
+ }
+
+ /**
+ * Specifies a target type which will be used as a result. Only needed when
+ * {@link BigQueryQueryItemReaderBuilder#rowMapper} is not provided. Take into account
+ * that only {@link Class#isRecord()} supported.
+ * @param targetType a {@link Class} that represent desired type
+ * @return {@link BigQueryQueryItemReaderBuilder}
+ */
+ public BigQueryQueryItemReaderBuilder<T> targetType(final Class<T> targetType) {
+ this.targetType = targetType;
+ return this;
+ }
+
+ /**
+ * Please remember about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
+ * @return {@link BigQueryQueryItemReader}
+ */
+ public BigQueryQueryItemReader<T> build() {
+ final BigQueryQueryItemReader<T> reader = new BigQueryQueryItemReader<>();
+
+ reader.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
+
+ if (this.rowMapper == null) {
+ Assert.notNull(this.targetType, "No target type provided");
+ Assert.isTrue(this.targetType.isRecord(), "Only Java record supported");
+ reader.setRowMapper(new RecordMapper<T>().generateMapper(this.targetType));
+ }
+ else {
+ reader.setRowMapper(this.rowMapper);
+ }
+
+ if (this.jobConfiguration == null) {
+ Assert.isTrue(StringUtils.hasText(this.query), "No query provided");
+ reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build());
+ }
+ else {
+ reader.setJobConfiguration(this.jobConfiguration);
+ }
+
+ return reader;
+ }
}
\ No newline at end of file
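As a hedged usage sketch of this builder (the record, dataset and column names are assumptions), relying on the record-based mapping path shown in build():

    // Hypothetical Java record; targetType() is only honored when no rowMapper is set, and only records are supported
    record PersonRecord(String name, Long age) { }

    BigQueryQueryItemReader<PersonRecord> reader = new BigQueryQueryItemReaderBuilder<PersonRecord>()
            .query("SELECT name, age FROM my_dataset.persons LIMIT 5")
            .targetType(PersonRecord.class)
            .build();
    reader.afterPropertiesSet(); // build() does not call it for you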
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/RecordMapper.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/RecordMapper.java
index eaabfeb..1b3852a 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/RecordMapper.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/RecordMapper.java
@@ -33,32 +33,33 @@
*/
public final class RecordMapper<T> {
- private final SimpleTypeConverter simpleConverter = new SimpleTypeConverter();
+ private final SimpleTypeConverter simpleConverter = new SimpleTypeConverter();
- /**
- * Generates a conversion from BigQuery response to a Java record.
- *
- * @param targetType a Java record {@link Class}
- * @return {@link Converter}
- * @see org.springframework.batch.item.file.mapping.RecordFieldSetMapper
- */
- public Converter<FieldValueList, T> generateMapper(Class<T> targetType) {
- Constructor<T> constructor = BeanUtils.getResolvableConstructor(targetType);
- Assert.isTrue(constructor.getParameterCount() > 0, "Record without fields is redundant");
+ /**
+ * Generates a conversion from BigQuery response to a Java record.
+ * @param targetType a Java record {@link Class}
+ * @return {@link Converter}
+ * @see org.springframework.batch.item.file.mapping.RecordFieldSetMapper
+ */
+ public Converter<FieldValueList, T> generateMapper(Class<T> targetType) {
+ Constructor<T> constructor = BeanUtils.getResolvableConstructor(targetType);
+ Assert.isTrue(constructor.getParameterCount() > 0, "Record without fields is redundant");
- String[] parameterNames = BeanUtils.getParameterNames(constructor);
- Class<?>[] parameterTypes = constructor.getParameterTypes();
+ String[] parameterNames = BeanUtils.getParameterNames(constructor);
+ Class<?>[] parameterTypes = constructor.getParameterTypes();
- Object[] args = new Object[parameterNames.length];
+ Object[] args = new Object[parameterNames.length];
- return source -> {
- if (args[0] == null) {
- for (int i = 0; i < args.length; i++) {
- args[i] = simpleConverter.convertIfNecessary(source.get(parameterNames[i]).getValue(), parameterTypes[i]);
- }
- }
+ return source -> {
+ if (args[0] == null) {
+ for (int i = 0; i < args.length; i++) {
+ args[i] = simpleConverter.convertIfNecessary(source.get(parameterNames[i]).getValue(),
+ parameterTypes[i]);
+ }
+ }
+
+ return BeanUtils.instantiateClass(constructor, args);
+ };
+ }
- return BeanUtils.instantiateClass(constructor, args);
- };
- }
}
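A short, assumption-laden illustration of calling generateMapper() directly; the record and field names are hypothetical, and the FieldValueList would come from a row of an executed query's TableResult:

    record CountRecord(String word, Long total) { }

    Converter<FieldValueList, CountRecord> mapper = new RecordMapper<CountRecord>().generateMapper(CountRecord.class);
    // values are coerced to the constructor parameter types via SimpleTypeConverter
    CountRecord converted = mapper.convert(fieldValueList);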
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java
index baa5429..da6b2bb 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java
@@ -21,26 +21,30 @@
import org.springframework.batch.item.ItemWriterException;
/**
- * Unchecked {@link Exception} indicating that an error has occurred on during {@link ItemWriter#write(Chunk)}.
+ * Unchecked {@link Exception} indicating that an error has occurred on during
+ * {@link ItemWriter#write(Chunk)}.
+ *
* @author Volodymyr Perebykivskyi
* @since 0.2.0
*/
public class BigQueryItemWriterException extends ItemWriterException {
- /**
- * Create a new {@link BigQueryItemWriterException} based on a message and another {@link Exception}.
- * @param message the message for this {@link Exception}
- * @param cause the other {@link Exception}
- */
- public BigQueryItemWriterException(String message, Throwable cause) {
- super(message, cause);
- }
+ /**
+ * Create a new {@link BigQueryItemWriterException} based on a message and another
+ * {@link Exception}.
+ * @param message the message for this {@link Exception}
+ * @param cause the other {@link Exception}
+ */
+ public BigQueryItemWriterException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Create a new {@link BigQueryItemWriterException} based on a message.
+ * @param message the message for this {@link Exception}
+ */
+ public BigQueryItemWriterException(String message) {
+ super(message);
+ }
- /**
- * Create a new {@link BigQueryItemWriterException} based on a message.
- * @param message the message for this {@link Exception}
- */
- public BigQueryItemWriterException(String message) {
- super(message);
- }
}
\ No newline at end of file
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java
index 583ca3b..d8b6573 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java
@@ -43,254 +43,252 @@
*/
public abstract class BigQueryLoadJobBaseItemWriter<T> implements ItemWriter<T>, InitializingBean {
- /** Logger that can be reused */
- protected final Log logger = LogFactory.getLog(getClass());
-
- private final AtomicLong bigQueryWriteCounter = new AtomicLong();
-
- /**
- * Describes what should be written (format) and its destination (table).
- */
- protected WriteChannelConfiguration writeChannelConfig;
-
- /**
- * You can specify here some specific dataset configuration, like location.
- * This dataset will be created.
- */
- private DatasetInfo datasetInfo;
-
- /**
- * Your custom logic with {@link Job}.
- * {@link Job} will be assigned after {@link TableDataWriteChannel#close()}.
- */
- private Consumer<Job> jobConsumer;
-
- private BigQuery bigQuery;
-
- private boolean writeFailed;
-
- /**
- * Fetches table from the provided configuration.
- *
- * @return {@link Table} that is described in {@link BigQueryLoadJobBaseItemWriter#writeChannelConfig}
- */
- protected Table getTable() {
- return this.bigQuery.getTable(this.writeChannelConfig.getDestinationTable());
- }
-
- /**
- * Provides additional information about the {@link com.google.cloud.bigquery.Dataset}.
- *
- * @param datasetInfo BigQuery dataset info
- */
- public void setDatasetInfo(final DatasetInfo datasetInfo) {
- this.datasetInfo = datasetInfo;
- }
-
- /**
- * Callback when {@link Job} will be finished.
- *
- * @param consumer your consumer
- */
- public void setJobConsumer(final Consumer<Job> consumer) {
- this.jobConsumer = consumer;
- }
-
- /**
- * Describes what should be written (format) and its destination (table).
- *
- * @param writeChannelConfig BigQuery channel configuration
- */
- public void setWriteChannelConfig(final WriteChannelConfiguration writeChannelConfig) {
- this.writeChannelConfig = writeChannelConfig;
- }
-
- /**
- * BigQuery service, responsible for API calls.
- *
- * @param bigQuery BigQuery service
- */
- public void setBigQuery(final BigQuery bigQuery) {
- this.bigQuery = bigQuery;
- }
-
- @Override
- public void write(final Chunk<? extends T> chunk) throws Exception {
- if (!chunk.isEmpty()) {
- final List<? extends T> items = chunk.getItems();
- doInitializeProperties(items);
-
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Mapping %d elements", items.size()));
- }
-
- doWriteDataToBigQuery(mapDataToBigQueryFormat(items));
- }
- }
-
- private ByteBuffer mapDataToBigQueryFormat(final List<? extends T> items) throws IOException {
- final ByteBuffer byteBuffer;
- try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
-
- final List<byte[]> data = convertObjectsToByteArrays(items);
-
- for (byte[] byteArray : data) {
- outputStream.write(byteArray);
- }
-
- /*
- * It is extremely important to create larger ByteBuffer.
- * If you call TableDataWriteChannel too many times, it leads to BigQuery exceptions.
- */
- byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
- }
- return byteBuffer;
- }
-
- private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
- if (logger.isDebugEnabled()) {
- logger.debug("Writing data to BigQuery");
- }
-
- TableDataWriteChannel writeChannel = null;
-
- try (final TableDataWriteChannel writer = getWriteChannel()) {
- /* TableDataWriteChannel is not thread safe */
- writer.write(byteBuffer);
- writeChannel = writer;
- }
- catch (Exception e) {
- writeFailed = true;
- logger.error("BigQuery error", e);
- throw new BigQueryItemWriterException("Error on write happened", e);
- }
- finally {
- if (!writeFailed) {
- String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet();
-
- if (writeChannel != null) {
- logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob();
- if (this.jobConsumer != null) {
- this.jobConsumer.accept(writeChannel.getJob());
- }
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug(logMessage);
- }
- }
- }
- }
-
- /**
- * @return {@link TableDataWriteChannel} that should be closed manually.
- * @see Examples
- */
- private TableDataWriteChannel getWriteChannel() {
- return this.bigQuery.writer(this.writeChannelConfig);
- }
-
- /**
- * Performs common validation for CSV and JSON types.
- */
- @Override
- public void afterPropertiesSet() {
- Assert.notNull(this.bigQuery, "BigQuery service must be provided");
- Assert.notNull(this.writeChannelConfig, "Write channel configuration must be provided");
- Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided");
-
- Assert.isTrue(!isBigtable(), "Google BigTable is not supported");
- Assert.isTrue(!isGoogleSheets(), "Google Sheets is not supported");
- Assert.isTrue(!isDatastore(), "Google Datastore is not supported");
- Assert.isTrue(!isParquet(), "Parquet is not supported");
- Assert.isTrue(!isOrc(), "Orc is not supported");
- Assert.isTrue(!isAvro(), "Avro is not supported");
- Assert.isTrue(!isIceberg(), "Iceberg is not supported");
-
- performFormatSpecificChecks();
-
- final String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
- if (this.datasetInfo == null) {
- this.datasetInfo = DatasetInfo.newBuilder(dataset).build();
- } else {
- Assert.isTrue(Objects.equals(this.datasetInfo.getDatasetId().getDataset(), dataset), "Dataset should be configured properly");
- }
-
- createDataset();
- }
-
- private void createDataset() {
- final TableId tableId = this.writeChannelConfig.getDestinationTable();
- final String datasetToCheck = tableId.getDataset();
-
- if (datasetToCheck != null && this.bigQuery.getDataset(datasetToCheck) == null && this.datasetInfo != null) {
- this.bigQuery.create(this.datasetInfo);
- }
- }
-
- private boolean isAvro() {
- return FormatOptions.avro().getType().equals(this.writeChannelConfig.getFormat());
- }
-
- private boolean isParquet() {
- return FormatOptions.parquet().getType().equals(this.writeChannelConfig.getFormat());
- }
-
- private boolean isOrc() {
- return FormatOptions.orc().getType().equals(this.writeChannelConfig.getFormat());
- }
-
- private boolean isBigtable() {
- return FormatOptions.bigtable().getType().equals(this.writeChannelConfig.getFormat());
- }
-
- private boolean isGoogleSheets() {
- return FormatOptions.googleSheets().getType().equals(this.writeChannelConfig.getFormat());
- }
-
- private boolean isDatastore() {
- return FormatOptions.datastoreBackup().getType().equals(this.writeChannelConfig.getFormat());
- }
-
- private boolean isIceberg() {
- return FormatOptions.iceberg().getType().equals(this.writeChannelConfig.getFormat());
- }
-
- /**
- * Schema can be computed on the BigQuery side during upload,
- * so it is good to know when schema is supplied by user manually.
- *
- * @param table BigQuery table
- * @return {@code true} if BigQuery {@link Table} has schema already described
- */
- protected boolean tableHasDefinedSchema(final Table table) {
- return Optional
- .ofNullable(table)
- .map(Table::getDefinition)
- .map(TableDefinition.class::cast)
- .map(TableDefinition::getSchema)
- .isPresent();
- }
-
- /**
- * Method that setting up metadata about chunk that is being processed. In reality is called once.
- *
- * @param items current chunk
- */
- protected abstract void doInitializeProperties(List<? extends T> items);
-
- /**
- * Converts chunk into a byte array.
- * Each data type should be converted with respect to its specification.
- *
- * @param items current chunk
- * @return {@link List} converted list of byte arrays
- */
- protected abstract List<byte[]> convertObjectsToByteArrays(List<? extends T> items);
-
- /**
- * Performs specific checks that are unique to the format.
- */
- protected abstract void performFormatSpecificChecks();
+ /** Logger that can be reused */
+ protected final Log logger = LogFactory.getLog(getClass());
+
+ private final AtomicLong bigQueryWriteCounter = new AtomicLong();
+
+ /**
+ * Describes what should be written (format) and its destination (table).
+ */
+ protected WriteChannelConfiguration writeChannelConfig;
+
+ /**
+ * You can specify here some specific dataset configuration, like location. This
+ * dataset will be created.
+ */
+ private DatasetInfo datasetInfo;
+
+ /**
+ * Your custom logic with {@link Job}.
+ *
+ * {@link Job} will be assigned after {@link TableDataWriteChannel#close()}.
+ */
+ private Consumer<Job> jobConsumer;
+
+ private BigQuery bigQuery;
+
+ private boolean writeFailed;
+
+ /**
+ * Fetches table from the provided configuration.
+ * @return {@link Table} that is described in
+ * {@link BigQueryLoadJobBaseItemWriter#writeChannelConfig}
+ */
+ protected Table getTable() {
+ return this.bigQuery.getTable(this.writeChannelConfig.getDestinationTable());
+ }
+
+ /**
+ * Provides additional information about the
+ * {@link com.google.cloud.bigquery.Dataset}.
+ * @param datasetInfo BigQuery dataset info
+ */
+ public void setDatasetInfo(final DatasetInfo datasetInfo) {
+ this.datasetInfo = datasetInfo;
+ }
+
+ /**
+ * Callback when {@link Job} will be finished.
+ * @param consumer your consumer
+ */
+ public void setJobConsumer(final Consumer<Job> consumer) {
+ this.jobConsumer = consumer;
+ }
+
+ /**
+ * Describes what should be written (format) and its destination (table).
+ * @param writeChannelConfig BigQuery channel configuration
+ */
+ public void setWriteChannelConfig(final WriteChannelConfiguration writeChannelConfig) {
+ this.writeChannelConfig = writeChannelConfig;
+ }
+
+ /**
+ * BigQuery service, responsible for API calls.
+ * @param bigQuery BigQuery service
+ */
+ public void setBigQuery(final BigQuery bigQuery) {
+ this.bigQuery = bigQuery;
+ }
+
+ @Override
+ public void write(final Chunk<? extends T> chunk) throws Exception {
+ if (!chunk.isEmpty()) {
+ final List<? extends T> items = chunk.getItems();
+ doInitializeProperties(items);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("Mapping %d elements", items.size()));
+ }
+
+ doWriteDataToBigQuery(mapDataToBigQueryFormat(items));
+ }
+ }
+
+ private ByteBuffer mapDataToBigQueryFormat(final List<? extends T> items) throws IOException {
+ final ByteBuffer byteBuffer;
+ try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+
+ final List<byte[]> data = convertObjectsToByteArrays(items);
+
+ for (byte[] byteArray : data) {
+ outputStream.write(byteArray);
+ }
+
+ // It is extremely important to create larger ByteBuffer.
+ // If you call TableDataWriteChannel too many times, it leads to BigQuery
+ // exceptions.
+ byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+ }
+ return byteBuffer;
+ }
+
+ private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Writing data to BigQuery");
+ }
+
+ TableDataWriteChannel writeChannel = null;
+
+ try (final TableDataWriteChannel writer = getWriteChannel()) {
+ /* TableDataWriteChannel is not thread safe */
+ writer.write(byteBuffer);
+ writeChannel = writer;
+ }
+ catch (Exception e) {
+ writeFailed = true;
+ logger.error("BigQuery error", e);
+ throw new BigQueryItemWriterException("Error on write happened", e);
+ }
+ finally {
+ if (!writeFailed) {
+ String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet();
+
+ if (writeChannel != null) {
+ logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob();
+ if (this.jobConsumer != null) {
+ this.jobConsumer.accept(writeChannel.getJob());
+ }
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(logMessage);
+ }
+ }
+ }
+ }
+
+ /**
+ * @return {@link TableDataWriteChannel} that should be closed manually.
+ * @see Examples
+ */
+ private TableDataWriteChannel getWriteChannel() {
+ return this.bigQuery.writer(this.writeChannelConfig);
+ }
+
+ /**
+ * Performs common validation for CSV and JSON types.
+ */
+ @Override
+ public void afterPropertiesSet() {
+ Assert.notNull(this.bigQuery, "BigQuery service must be provided");
+ Assert.notNull(this.writeChannelConfig, "Write channel configuration must be provided");
+ Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided");
+
+ Assert.isTrue(!isBigtable(), "Google BigTable is not supported");
+ Assert.isTrue(!isGoogleSheets(), "Google Sheets is not supported");
+ Assert.isTrue(!isDatastore(), "Google Datastore is not supported");
+ Assert.isTrue(!isParquet(), "Parquet is not supported");
+ Assert.isTrue(!isOrc(), "Orc is not supported");
+ Assert.isTrue(!isAvro(), "Avro is not supported");
+ Assert.isTrue(!isIceberg(), "Iceberg is not supported");
+
+ performFormatSpecificChecks();
+
+ final String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
+ if (this.datasetInfo == null) {
+ this.datasetInfo = DatasetInfo.newBuilder(dataset).build();
+ }
+ else {
+ boolean datasetEquals = Objects.equals(this.datasetInfo.getDatasetId().getDataset(), dataset);
+ Assert.isTrue(datasetEquals, "Dataset should be configured properly");
+ }
+
+ createDataset();
+ }
+
+ private void createDataset() {
+ final TableId tableId = this.writeChannelConfig.getDestinationTable();
+ final String datasetToCheck = tableId.getDataset();
+
+ if (datasetToCheck != null && this.bigQuery.getDataset(datasetToCheck) == null && this.datasetInfo != null) {
+ this.bigQuery.create(this.datasetInfo);
+ }
+ }
+
+ private boolean isAvro() {
+ return FormatOptions.avro().getType().equals(this.writeChannelConfig.getFormat());
+ }
+
+ private boolean isParquet() {
+ return FormatOptions.parquet().getType().equals(this.writeChannelConfig.getFormat());
+ }
+
+ private boolean isOrc() {
+ return FormatOptions.orc().getType().equals(this.writeChannelConfig.getFormat());
+ }
+
+ private boolean isBigtable() {
+ return FormatOptions.bigtable().getType().equals(this.writeChannelConfig.getFormat());
+ }
+
+ private boolean isGoogleSheets() {
+ return FormatOptions.googleSheets().getType().equals(this.writeChannelConfig.getFormat());
+ }
+
+ private boolean isDatastore() {
+ return FormatOptions.datastoreBackup().getType().equals(this.writeChannelConfig.getFormat());
+ }
+
+ private boolean isIceberg() {
+ return FormatOptions.iceberg().getType().equals(this.writeChannelConfig.getFormat());
+ }
+
+ /**
+ * Schema can be computed on the BigQuery side during upload, so it is good to know
+ * when schema is supplied by user manually.
+ * @param table BigQuery table
+ * @return {@code true} if BigQuery {@link Table} has schema already described
+ */
+ protected boolean tableHasDefinedSchema(final Table table) {
+ return Optional.ofNullable(table)
+ .map(Table::getDefinition)
+ .map(TableDefinition.class::cast)
+ .map(TableDefinition::getSchema)
+ .isPresent();
+ }
+
+ /**
+ * Method that setting up metadata about chunk that is being processed.
+ *
+ * In reality is called once.
+ * @param items current chunk
+ */
+ protected abstract void doInitializeProperties(List<? extends T> items);
+
+ /**
+ * Converts chunk into a byte array. Each data type should be converted with respect
+ * to its specification.
+ * @param items current chunk
+ * @return {@link List} converted list of byte arrays
+ */
+ protected abstract List<byte[]> convertObjectsToByteArrays(List<? extends T> items);
+
+ /**
+ * Performs specific checks that are unique to the format.
+ */
+ protected abstract void performFormatSpecificChecks();
}
\ No newline at end of file
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java
index aa5eb2d..ea1fe0b 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java
@@ -40,80 +40,78 @@
*/
public class BigQueryLoadJobCsvItemWriter<T> extends BigQueryLoadJobBaseItemWriter<T> {
- private Converter<T, byte[]> rowMapper;
- private ObjectWriter objectWriter;
- private Class<?> itemClass;
-
- /**
- * Actual type of incoming data can be obtained only in runtime
- */
- @Override
- protected synchronized void doInitializeProperties(List<? extends T> items) {
- if (this.itemClass == null) {
- T firstItem = items.stream().findFirst().orElseThrow(() -> {
- logger.warn("Class type was not found");
- return new IllegalStateException("Class type was not found");
- });
- this.itemClass = firstItem.getClass();
-
- if (this.rowMapper == null) {
- this.objectWriter = new CsvMapper().writerWithTypedSchemaFor(this.itemClass);
- }
-
- logger.debug("Writer setup is completed");
- }
- }
-
- @Override
- protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
- return items
- .stream()
- .map(this::mapItemToCsv)
- .filter(Predicate.not(ObjectUtils::isEmpty))
- .toList();
- }
-
- @Override
- protected void performFormatSpecificChecks() {
- Table table = getTable();
-
- if (Boolean.TRUE.equals(super.writeChannelConfig.getAutodetect())) {
- if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) {
- logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
- }
- } else {
- Assert.notNull(super.writeChannelConfig.getSchema(), "Schema must be provided");
-
- if (tableHasDefinedSchema(table)) {
- Assert.isTrue(
- Objects.equals(table.getDefinition().getSchema(), super.writeChannelConfig.getSchema()),
- "Schema must be the same"
- );
- }
- }
-
- String format = FormatOptions.csv().getType();
- Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format));
-
- }
-
- /**
- * Row mapper which transforms single BigQuery row into a desired type.
- *
- * @param rowMapper your row mapper
- */
- public void setRowMapper(Converter<T, byte[]> rowMapper) {
- this.rowMapper = rowMapper;
- }
-
- private byte[] mapItemToCsv(T t) {
- try {
- return rowMapper == null ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t);
- }
- catch (JsonProcessingException e) {
- logger.error("Error during processing of the line: ", e);
- return new byte[]{};
- }
- }
+ private Converter<T, byte[]> rowMapper;
+
+ private ObjectWriter objectWriter;
+
+ private Class<?> itemClass;
+
+ /**
+ * Actual type of incoming data can be obtained only in runtime
+ */
+ @Override
+ protected synchronized void doInitializeProperties(List<? extends T> items) {
+ if (this.itemClass == null) {
+ T firstItem = items.stream().findFirst().orElseThrow(() -> {
+ logger.warn("Class type was not found");
+ return new IllegalStateException("Class type was not found");
+ });
+ this.itemClass = firstItem.getClass();
+
+ if (this.rowMapper == null) {
+ this.objectWriter = new CsvMapper().writerWithTypedSchemaFor(this.itemClass);
+ }
+
+ logger.debug("Writer setup is completed");
+ }
+ }
+
+ @Override
+ protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
+ return items.stream().map(this::mapItemToCsv).filter(Predicate.not(ObjectUtils::isEmpty)).toList();
+ }
+
+ @Override
+ protected void performFormatSpecificChecks() {
+ Table table = getTable();
+
+ if (Boolean.TRUE.equals(super.writeChannelConfig.getAutodetect())) {
+ if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) {
+ logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
+ }
+ }
+ else {
+ Assert.notNull(super.writeChannelConfig.getSchema(), "Schema must be provided");
+
+ if (tableHasDefinedSchema(table)) {
+ boolean schemaEquals = Objects.equals(table.getDefinition().getSchema(),
+ super.writeChannelConfig.getSchema());
+ Assert.isTrue(schemaEquals, "Schema must be the same");
+ }
+ }
+
+ String format = FormatOptions.csv().getType();
+ boolean formatEquals = Objects.equals(format, super.writeChannelConfig.getFormat());
+ Assert.isTrue(formatEquals, "Only %s format is allowed".formatted(format));
+
+ }
+
+ /**
+ * Row mapper which transforms single BigQuery row into a desired type.
+ * @param rowMapper your row mapper
+ */
+ public void setRowMapper(Converter<T, byte[]> rowMapper) {
+ this.rowMapper = rowMapper;
+ }
+
+ private byte[] mapItemToCsv(T t) {
+ try {
+ return rowMapper == null ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t);
+ }
+ catch (JsonProcessingException e) {
+ logger.error("Error during processing of the line: ", e);
+ return new byte[] {};
+ }
+ }
}
\ No newline at end of file
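To make the Converter<T, byte[]> contract above concrete, a hedged sketch of a custom CSV row mapper; the DTO, the column order and the trailing newline (so that concatenated rows stay separated) are assumptions, not something this diff prescribes:

    record PersonDto(String name, Long age) { }

    Converter<PersonDto, byte[]> csvRowMapper =
            person -> (person.name() + "," + person.age() + "\n").getBytes(StandardCharsets.UTF_8);

    BigQueryLoadJobCsvItemWriter<PersonDto> writer = new BigQueryLoadJobCsvItemWriter<>();
    writer.setRowMapper(csvRowMapper); // without it, a Jackson CsvMapper is derived from the first item's class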
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java
index fa13ed8..6e6ea83 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java
@@ -32,93 +32,92 @@
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @since 0.2.0
- * @see Examples
+ * @see Examples
*/
-public class BigQueryCsvItemWriterBuilder<T> {
-
- private Converter<T, byte[]> rowMapper;
-
- private Consumer<Job> jobConsumer;
- private DatasetInfo datasetInfo;
- private WriteChannelConfiguration writeChannelConfig;
- private BigQuery bigQuery;
-
- /**
- * Row mapper which transforms single BigQuery row into desired type.
- *
- * @param rowMapper your row mapper
- * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryLoadJobCsvItemWriter#setRowMapper(Converter)
- */
- public BigQueryCsvItemWriterBuilder<T> rowMapper(Converter<T, byte[]> rowMapper) {
- this.rowMapper = rowMapper;
- return this;
- }
-
- /**
- * Provides additional information about the {@link com.google.cloud.bigquery.Dataset}.
- *
- * @param datasetInfo BigQuery dataset info
- * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryLoadJobCsvItemWriter#setDatasetInfo(DatasetInfo)
- */
- public BigQueryCsvItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) {
- this.datasetInfo = datasetInfo;
- return this;
- }
-
- /**
- * Callback when {@link Job} will be finished.
- *
- * @param consumer your consumer
- * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryLoadJobCsvItemWriter#setJobConsumer(Consumer)
- */
- public BigQueryCsvItemWriterBuilder<T> jobConsumer(Consumer<Job> consumer) {
- this.jobConsumer = consumer;
- return this;
- }
-
- /**
- * Describes what should be written (format) and its destination (table).
- *
- * @param configuration BigQuery channel configuration
- * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryLoadJobCsvItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
- */
- public BigQueryCsvItemWriterBuilder writeChannelConfig(WriteChannelConfiguration configuration) {
- this.writeChannelConfig = configuration;
- return this;
- }
-
- /**
- * BigQuery service, responsible for API calls.
- *
- * @param bigQuery BigQuery service
- * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryLoadJobCsvItemWriter#setBigQuery(BigQuery)
- */
- public BigQueryCsvItemWriterBuilder bigQuery(BigQuery bigQuery) {
- this.bigQuery = bigQuery;
- return this;
- }
-
- /**
- * Please remember about {@link BigQueryLoadJobCsvItemWriter#afterPropertiesSet()}.
- *
- * @return {@link BigQueryLoadJobCsvItemWriter}
- */
- public BigQueryLoadJobCsvItemWriter<T> build() {
- BigQueryLoadJobCsvItemWriter<T> writer = new BigQueryLoadJobCsvItemWriter<>();
-
- writer.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
-
- writer.setRowMapper(this.rowMapper);
- writer.setWriteChannelConfig(this.writeChannelConfig);
- writer.setJobConsumer(this.jobConsumer);
- writer.setDatasetInfo(this.datasetInfo);
-
- return writer;
- }
+public class BigQueryCsvItemWriterBuilder<T> {
+
+ private Converter<T, byte[]> rowMapper;
+
+ private Consumer<Job> jobConsumer;
+
+ private DatasetInfo datasetInfo;
+
+ private WriteChannelConfiguration writeChannelConfig;
+
+ private BigQuery bigQuery;
+
+ /**
+ * Row mapper which transforms single BigQuery row into desired type.
+ * @param rowMapper your row mapper
+ * @return {@link BigQueryCsvItemWriterBuilder}
+ * @see BigQueryLoadJobCsvItemWriter#setRowMapper(Converter)
+ */
+ public BigQueryCsvItemWriterBuilder<T> rowMapper(Converter<T, byte[]> rowMapper) {
+ this.rowMapper = rowMapper;
+ return this;
+ }
+
+ /**
+ * Provides additional information about the
+ * {@link com.google.cloud.bigquery.Dataset}.
+ * @param datasetInfo BigQuery dataset info
+ * @return {@link BigQueryCsvItemWriterBuilder}
+ * @see BigQueryLoadJobCsvItemWriter#setDatasetInfo(DatasetInfo)
+ */
+ public BigQueryCsvItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) {
+ this.datasetInfo = datasetInfo;
+ return this;
+ }
+
+ /**
+ * Callback when {@link Job} will be finished.
+ * @param consumer your consumer
+ * @return {@link BigQueryCsvItemWriterBuilder}
+ * @see BigQueryLoadJobCsvItemWriter#setJobConsumer(Consumer)
+ */
+ public BigQueryCsvItemWriterBuilder<T> jobConsumer(Consumer<Job> consumer) {
+ this.jobConsumer = consumer;
+ return this;
+ }
+
+ /**
+ * Describes what should be written (format) and its destination (table).
+ * @param configuration BigQuery channel configuration
+ * @return {@link BigQueryCsvItemWriterBuilder}
+ * @see BigQueryLoadJobCsvItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
+ */
+ public BigQueryCsvItemWriterBuilder writeChannelConfig(WriteChannelConfiguration configuration) {
+ this.writeChannelConfig = configuration;
+ return this;
+ }
+
+ /**
+ * BigQuery service, responsible for API calls.
+ * @param bigQuery BigQuery service
+ * @return {@link BigQueryCsvItemWriterBuilder}
+ * @see BigQueryLoadJobCsvItemWriter#setBigQuery(BigQuery)
+ */
+ public BigQueryCsvItemWriterBuilder bigQuery(BigQuery bigQuery) {
+ this.bigQuery = bigQuery;
+ return this;
+ }
+
+ /**
+ * Please remember about {@link BigQueryLoadJobCsvItemWriter#afterPropertiesSet()}.
+ * @return {@link BigQueryLoadJobCsvItemWriter}
+ */
+ public BigQueryLoadJobCsvItemWriter<T> build() {
+ BigQueryLoadJobCsvItemWriter<T> writer = new BigQueryLoadJobCsvItemWriter<>();
+
+ writer.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
+
+ writer.setRowMapper(this.rowMapper);
+ writer.setWriteChannelConfig(this.writeChannelConfig);
+ writer.setJobConsumer(this.jobConsumer);
+ writer.setDatasetInfo(this.datasetInfo);
+
+ return writer;
+ }
}
\ No newline at end of file
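A possible end-to-end assembly through this builder; the dataset, table and autodetect choice are assumptions, PersonDto is the same hypothetical record as in the earlier sketches, and imports are omitted:

    WriteChannelConfiguration channelConfig = WriteChannelConfiguration
            .newBuilder(TableId.of("my_dataset", "persons"))
            .setFormatOptions(FormatOptions.csv())
            .setAutodetect(true)
            .build();

    BigQueryLoadJobCsvItemWriter<PersonDto> csvWriter = new BigQueryCsvItemWriterBuilder<PersonDto>()
            .writeChannelConfig(channelConfig)
            .build();
    csvWriter.afterPropertiesSet(); // validates the format and creates the dataset if it is missing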
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java
index a557110..e36ea5d 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java
@@ -38,67 +38,66 @@
*/
public class BigQueryLoadJobJsonItemWriter<T> extends BigQueryLoadJobBaseItemWriter<T> {
- private static final String LF = "\n";
-
- private JsonObjectMarshaller<T> marshaller;
-
- @Override
- protected void doInitializeProperties(List<? extends T> items) {
- // Unused
- }
-
- @Override
- protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
- return items
- .stream()
- .map(marshaller::marshal)
- .filter(Predicate.not(ObjectUtils::isEmpty))
- .map(this::convertToNdJson)
- .map(row -> row.getBytes(StandardCharsets.UTF_8))
- .toList();
- }
-
- @Override
- protected void performFormatSpecificChecks() {
- Assert.notNull(this.marshaller, "Marshaller must be provided");
-
- Table table = getTable();
-
- if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) {
- if (tableHasDefinedSchema(table) && logger.isWarnEnabled()) {
- logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
- }
- } else {
- Assert.notNull(writeChannelConfig.getSchema(), "Schema must be provided");
-
- if (tableHasDefinedSchema(table)) {
- Assert.isTrue(
- Objects.equals(table.getDefinition().getSchema(), writeChannelConfig.getSchema()),
- "Schema must be the same"
- );
- }
- }
-
- String format = FormatOptions.json().getType();
- Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format));
- }
-
- /**
- * Converter that transforms a single row into a {@link String}.
- *
- * @param marshaller your JSON mapper
- */
- public void setMarshaller(JsonObjectMarshaller<T> marshaller) {
- this.marshaller = marshaller;
- }
-
- /**
- * BigQuery uses ndjson.
- * It is expected that to pass here JSON line generated by
- * {@link com.fasterxml.jackson.databind.ObjectMapper} or any other JSON parser.
- */
- private String convertToNdJson(String json) {
- return json.concat(LF);
- }
+ private static final String LF = "\n";
+
+ private JsonObjectMarshaller<T> marshaller;
+
+ @Override
+ protected void doInitializeProperties(List<? extends T> items) {
+ // Unused
+ }
+
+ @Override
+ protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
+ return items.stream()
+ .map(marshaller::marshal)
+ .filter(Predicate.not(ObjectUtils::isEmpty))
+ .map(this::convertToNdJson)
+ .map(row -> row.getBytes(StandardCharsets.UTF_8))
+ .toList();
+ }
+
+ @Override
+ protected void performFormatSpecificChecks() {
+ Assert.notNull(this.marshaller, "Marshaller must be provided");
+
+ Table table = getTable();
+
+ if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) {
+ if (tableHasDefinedSchema(table) && logger.isWarnEnabled()) {
+ logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
+ }
+ }
+ else {
+ Assert.notNull(writeChannelConfig.getSchema(), "Schema must be provided");
+
+ if (tableHasDefinedSchema(table)) {
+ boolean schemaEquals = Objects.equals(table.getDefinition().getSchema(),
+ writeChannelConfig.getSchema());
+ Assert.isTrue(schemaEquals, "Schema must be the same");
+ }
+ }
+
+ String format = FormatOptions.json().getType();
+ boolean formatEquals = Objects.equals(format, super.writeChannelConfig.getFormat());
+ Assert.isTrue(formatEquals, "Only %s format is allowed".formatted(format));
+ }
+
+ /**
+ * Converter that transforms a single row into a {@link String}.
+ * @param marshaller your JSON mapper
+ */
+ public void setMarshaller(JsonObjectMarshaller<T> marshaller) {
+ this.marshaller = marshaller;
+ }
+
+ /**
+ * BigQuery uses ndjson. It is
+ * expected that to pass here JSON line generated by
+ * {@link com.fasterxml.jackson.databind.ObjectMapper} or any other JSON parser.
+ */
+ private String convertToNdJson(String json) {
+ return json.concat(LF);
+ }
}
\ No newline at end of file
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilder.java
index 7f8c612..59255a4 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilder.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilder.java
@@ -33,92 +33,92 @@
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @since 0.2.0
- * @see Examples
+ * @see Examples
*/
-public class BigQueryLoadJobJsonItemWriterBuilder<T> {
-
- private JsonObjectMarshaller<T> marshaller;
- private Consumer<Job> jobConsumer;
- private DatasetInfo datasetInfo;
- private WriteChannelConfiguration writeChannelConfig;
- private BigQuery bigQuery;
-
- /**
- * Converts your DTO into a {@link String}.
- *
- * @param marshaller your mapper
- * @return {@link BigQueryLoadJobJsonItemWriterBuilder}
- * @see BigQueryLoadJobJsonItemWriter#setMarshaller(JsonObjectMarshaller)
- */
- public BigQueryLoadJobJsonItemWriterBuilder<T> marshaller(JsonObjectMarshaller<T> marshaller) {
- this.marshaller = marshaller;
- return this;
- }
-
- /**
- * Provides additional information about the {@link com.google.cloud.bigquery.Dataset}.
- *
- * @param datasetInfo BigQuery dataset info
- * @return {@link BigQueryLoadJobJsonItemWriterBuilder}
- * @see BigQueryLoadJobJsonItemWriter#setDatasetInfo(DatasetInfo)
- */
- public BigQueryLoadJobJsonItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) {
- this.datasetInfo = datasetInfo;
- return this;
- }
-
- /**
- * Callback when {@link Job} will be finished.
- *
- * @param consumer your consumer
- * @return {@link BigQueryLoadJobJsonItemWriterBuilder}
- * @see BigQueryLoadJobJsonItemWriter#setJobConsumer(Consumer)
- */
- public BigQueryLoadJobJsonItemWriterBuilder<T> jobConsumer(Consumer<Job> consumer) {
- this.jobConsumer = consumer;
- return this;
- }
-
- /**
- * Describes what should be written (format) and its destination (table).
- *
- * @param configuration BigQuery channel configuration
- * @return {@link BigQueryLoadJobJsonItemWriterBuilder}
- * @see BigQueryLoadJobJsonItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
- */
- public BigQueryLoadJobJsonItemWriterBuilder writeChannelConfig(WriteChannelConfiguration configuration) {
- this.writeChannelConfig = configuration;
- return this;
- }
-
- /**
- * BigQuery service, responsible for API calls.
- *
- * @param bigQuery BigQuery service
- * @return {@link BigQueryLoadJobJsonItemWriter}
- * @see BigQueryLoadJobJsonItemWriter#setBigQuery(BigQuery)
- */
- public BigQueryLoadJobJsonItemWriterBuilder bigQuery(BigQuery bigQuery) {
- this.bigQuery = bigQuery;
- return this;
- }
-
- /**
- * Please remember about {@link BigQueryLoadJobJsonItemWriter#afterPropertiesSet()}.
- *
- * @return {@link BigQueryLoadJobJsonItemWriter}
- */
- public BigQueryLoadJobJsonItemWriter<T> build() {
- BigQueryLoadJobJsonItemWriter<T> writer = new BigQueryLoadJobJsonItemWriter<>();
-
- writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller);
- writer.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
-
- writer.setWriteChannelConfig(this.writeChannelConfig);
- writer.setJobConsumer(this.jobConsumer);
- writer.setDatasetInfo(this.datasetInfo);
-
- return writer;
- }
+public class BigQueryLoadJobJsonItemWriterBuilder<T> {
+
+ private JsonObjectMarshaller<T> marshaller;
+
+ private Consumer<Job> jobConsumer;
+
+ private DatasetInfo datasetInfo;
+
+ private WriteChannelConfiguration writeChannelConfig;
+
+ private BigQuery bigQuery;
+
+ /**
+ * Converts your DTO into a {@link String}.
+ * @param marshaller your mapper
+ * @return {@link BigQueryLoadJobJsonItemWriterBuilder}
+ * @see BigQueryLoadJobJsonItemWriter#setMarshaller(JsonObjectMarshaller)
+ */
+ public BigQueryLoadJobJsonItemWriterBuilder<T> marshaller(JsonObjectMarshaller<T> marshaller) {
+ this.marshaller = marshaller;
+ return this;
+ }
+
+ /**
+ * Provides additional information about the
+ * {@link com.google.cloud.bigquery.Dataset}.
+ * @param datasetInfo BigQuery dataset info
+ * @return {@link BigQueryLoadJobJsonItemWriterBuilder}
+ * @see BigQueryLoadJobJsonItemWriter#setDatasetInfo(DatasetInfo)
+ */
+ public BigQueryLoadJobJsonItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) {
+ this.datasetInfo = datasetInfo;
+ return this;
+ }
+
+ /**
+ * Callback when {@link Job} will be finished.
+ * @param consumer your consumer
+ * @return {@link BigQueryLoadJobJsonItemWriterBuilder}
+ * @see BigQueryLoadJobJsonItemWriter#setJobConsumer(Consumer)
+ */
+ public BigQueryLoadJobJsonItemWriterBuilder<T> jobConsumer(Consumer<Job> consumer) {
+ this.jobConsumer = consumer;
+ return this;
+ }
+
+ /**
+ * Describes what should be written (format) and its destination (table).
+ * @param configuration BigQuery channel configuration
+ * @return {@link BigQueryLoadJobJsonItemWriterBuilder}
+ * @see BigQueryLoadJobJsonItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
+ */
+ public BigQueryLoadJobJsonItemWriterBuilder writeChannelConfig(WriteChannelConfiguration configuration) {
+ this.writeChannelConfig = configuration;
+ return this;
+ }
+
+ /**
+ * BigQuery service, responsible for API calls.
+ * @param bigQuery BigQuery service
+ * @return {@link BigQueryLoadJobJsonItemWriter}
+ * @see BigQueryLoadJobJsonItemWriter#setBigQuery(BigQuery)
+ */
+ public BigQueryLoadJobJsonItemWriterBuilder bigQuery(BigQuery bigQuery) {
+ this.bigQuery = bigQuery;
+ return this;
+ }
+
+ /**
+ * Please remember about {@link BigQueryLoadJobJsonItemWriter#afterPropertiesSet()}.
+ * @return {@link BigQueryLoadJobJsonItemWriter}
+ */
+ public BigQueryLoadJobJsonItemWriter<T> build() {
+ BigQueryLoadJobJsonItemWriter<T> writer = new BigQueryLoadJobJsonItemWriter<>();
+
+ writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller);
+ writer.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
+
+ writer.setWriteChannelConfig(this.writeChannelConfig);
+ writer.setJobConsumer(this.jobConsumer);
+ writer.setDatasetInfo(this.datasetInfo);
+
+ return writer;
+ }
}
\ No newline at end of file
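And the analogous hedged sketch for the JSON writer builder; it leans on the default JacksonJsonObjectMarshaller that build() falls back to, the table names are assumptions, and PersonDto is the same hypothetical record as before:

    WriteChannelConfiguration jsonConfig = WriteChannelConfiguration
            .newBuilder(TableId.of("my_dataset", "persons_json"))
            .setFormatOptions(FormatOptions.json())
            .setAutodetect(true)
            .build();

    BigQueryLoadJobJsonItemWriter<PersonDto> jsonWriter = new BigQueryLoadJobJsonItemWriterBuilder<PersonDto>()
            .writeChannelConfig(jsonConfig)
            .build();
    jsonWriter.afterPropertiesSet();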
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java
index 77e882a..1e110f3 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java
@@ -15,16 +15,19 @@
*/
/**
- * {@link com.google.cloud.bigquery.JobConfiguration.Type#LOAD} {@link com.google.cloud.bigquery.Job}
+ * {@link com.google.cloud.bigquery.JobConfiguration.Type#LOAD}
+ * {@link com.google.cloud.bigquery.Job}
*
- * <p>Supported formats:
+ * <p>
+ * Supported formats:
* <ul>
- * <li>JSON</li>
- * <li>CSV</li>
+ * <li>JSON</li>
+ * <li>CSV</li>
* </ul>
*
- * <p>If you generate {@link com.google.cloud.bigquery.TableDataWriteChannel}
- * and you {@link com.google.cloud.bigquery.TableDataWriteChannel#close()} it,
- * there is no guarantee that single {@link com.google.cloud.bigquery.Job} will be created.
+ * <p>
+ * If you generate {@link com.google.cloud.bigquery.TableDataWriteChannel} and you
+ * {@link com.google.cloud.bigquery.TableDataWriteChannel#close()} it, there is no
+ * guarantee that single {@link com.google.cloud.bigquery.Job} will be created.
*/
package org.springframework.batch.extensions.bigquery.writer.loadjob;
\ No newline at end of file
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java
index 48c3746..3d30a24 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java
@@ -20,16 +20,20 @@
* These writers use a Java client from Google, so we cannot control this flow fully.
*
*
- * Take into account that BigQuery has rate limits, and it is very easy to exceed those in concurrent environment.
+ * Take into account that BigQuery has rate limits, and it is very easy to exceed those in
+ * concurrent environment.
*
- * Also, worth mentioning that you should ensure ordering of the fields in DTO that you are going to send to the BigQuery.
- * In case of CSV/JSON and Jackson consider using {@link com.fasterxml.jackson.annotation.JsonPropertyOrder}.
+ * Also, worth mentioning that you should ensure ordering of the fields in DTO that you
+ * are going to send to the BigQuery. In case of CSV/JSON and Jackson consider using
+ * {@link com.fasterxml.jackson.annotation.JsonPropertyOrder}.
*
* @author Volodymyr Perebykivskyi
* @since 0.2.0
* @see Google BigQuery
- * @see BigQuery Java Client on GitHub
- * @see BigQuery Quotas & Limits
+ * @see BigQuery Java Client on
+ * GitHub
+ * @see BigQuery Quotas &
+ * Limits
*/
@NonNullApi
package org.springframework.batch.extensions.bigquery.writer;
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriter.java
index 7c41069..fa93814 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriter.java
@@ -47,139 +47,146 @@
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @see JSON
- * @see Commited type storage write API
+ * @see Commited
+ * type storage write API
* @since 0.2.0
*/
public class BigQueryWriteApiCommitedJsonItemWriter<T> implements ItemWriter<T>, InitializingBean {
- /**
- * Logger that can be reused
- */
- private final Log logger = LogFactory.getLog(getClass());
-
- private final AtomicLong bigQueryWriteCounter = new AtomicLong();
-
- private BigQueryWriteClient bigQueryWriteClient;
- private TableName tableName;
- private JsonObjectMarshaller marshaller;
- private ApiFutureCallback apiFutureCallback;
- private Executor executor;
-
- private boolean writeFailed;
-
- @Override
- public void write(final Chunk extends T> chunk) throws Exception {
- if (!chunk.isEmpty()) {
- final List extends T> items = chunk.getItems();
- String streamName = null;
-
- try {
- WriteStream writeStreamToCreate = WriteStream.newBuilder()
- .setType(WriteStream.Type.COMMITTED)
- .build();
-
- CreateWriteStreamRequest createStreamRequest = CreateWriteStreamRequest.newBuilder()
- .setParent(tableName.toString())
- .setWriteStream(writeStreamToCreate)
- .build();
-
- WriteStream writeStream = bigQueryWriteClient.createWriteStream(createStreamRequest);
- streamName = writeStream.getName();
-
- if (logger.isDebugEnabled()) {
- logger.debug("Created a stream=" + streamName);
- }
-
- try (final JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), bigQueryWriteClient).build()) {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Mapping %d elements", items.size()));
- }
- final JSONArray array = new JSONArray();
- items.stream().map(marshaller::marshal).map(JSONObject::new).forEach(array::put);
-
- if (logger.isDebugEnabled()) {
- logger.debug("Writing data to BigQuery");
- }
- final ApiFuture future = writer.append(array);
-
- if (apiFutureCallback != null) {
- ApiFutures.addCallback(future, apiFutureCallback, executor);
- }
- }
- } catch (Exception e) {
- writeFailed = true;
- logger.error("BigQuery error", e);
- throw new BigQueryItemWriterException("Error on write happened", e);
- } finally {
- if (StringUtils.hasText(streamName)) {
- long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount();
- if (chunk.size() != rowCount) {
- logger.warn("Finalized response row count=%d is not the same as chunk size=%d".formatted(rowCount, chunk.size()));
- }
- }
-
- if (!writeFailed && logger.isDebugEnabled()) {
- logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet());
- }
- }
- }
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- Assert.notNull(this.bigQueryWriteClient, "BigQuery write client must be provided");
- Assert.notNull(this.tableName, "Table name must be provided");
- Assert.notNull(this.marshaller, "Marshaller must be provided");
-
- if (this.apiFutureCallback != null) {
- Assert.notNull(this.executor, "Executor must be provided");
- }
- }
-
- /**
- * GRPC client that wraps communication with BigQuery.
- *
- * @param bigQueryWriteClient a client
- */
- public void setBigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) {
- this.bigQueryWriteClient = bigQueryWriteClient;
- }
-
- /**
- * A full path to the BigQuery table.
- *
- * @param tableName a name
- */
- public void setTableName(final TableName tableName) {
- this.tableName = tableName;
- }
-
- /**
- * Converter that transforms a single row into a {@link String}.
- *
- * @param marshaller your JSON mapper
- */
- public void setMarshaller(final JsonObjectMarshaller marshaller) {
- this.marshaller = marshaller;
- }
-
- /**
- * {@link ApiFutureCallback} that will be called in case of successful of failed response.
- *
- * @param apiFutureCallback a callback
- * @see BigQueryWriteApiCommitedJsonItemWriter#setExecutor(Executor)
- */
- public void setApiFutureCallback(final ApiFutureCallback apiFutureCallback) {
- this.apiFutureCallback = apiFutureCallback;
- }
-
- /**
- * An {@link Executor} that will be calling a {@link ApiFutureCallback}.
- *
- * @param executor an executor
- * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
- */
- public void setExecutor(final Executor executor) {
- this.executor = executor;
- }
+ /**
+ * Logger that can be reused
+ */
+ private final Log logger = LogFactory.getLog(getClass());
+
+ private final AtomicLong bigQueryWriteCounter = new AtomicLong();
+
+ private BigQueryWriteClient bigQueryWriteClient;
+
+ private TableName tableName;
+
+ private JsonObjectMarshaller<T> marshaller;
+
+ private ApiFutureCallback<AppendRowsResponse> apiFutureCallback;
+
+ private Executor executor;
+
+ private boolean writeFailed;
+
+ @Override
+ public void write(final Chunk<? extends T> chunk) throws Exception {
+ if (!chunk.isEmpty()) {
+ final List<? extends T> items = chunk.getItems();
+ String streamName = null;
+
+ try {
+ WriteStream writeStreamToCreate = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
+
+ CreateWriteStreamRequest createStreamRequest = CreateWriteStreamRequest.newBuilder()
+ .setParent(tableName.toString())
+ .setWriteStream(writeStreamToCreate)
+ .build();
+
+ WriteStream writeStream = bigQueryWriteClient.createWriteStream(createStreamRequest);
+ streamName = writeStream.getName();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Created a stream=" + streamName);
+ }
+
+ final JsonStreamWriter jsonWriter = JsonStreamWriter
+ .newBuilder(writeStream.getName(), bigQueryWriteClient)
+ .build();
+
+ try (jsonWriter) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("Mapping %d elements", items.size()));
+ }
+ final JSONArray array = new JSONArray();
+ items.stream().map(marshaller::marshal).map(JSONObject::new).forEach(array::put);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Writing data to BigQuery");
+ }
+ final ApiFuture<AppendRowsResponse> future = jsonWriter.append(array);
+
+ if (apiFutureCallback != null) {
+ ApiFutures.addCallback(future, apiFutureCallback, executor);
+ }
+ }
+ }
+ catch (Exception e) {
+ writeFailed = true;
+ logger.error("BigQuery error", e);
+ throw new BigQueryItemWriterException("Error on write happened", e);
+ }
+ finally {
+ if (StringUtils.hasText(streamName)) {
+ final long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount();
+ if (chunk.size() != rowCount) {
+ logger.warn("Finalized response row count=%d is not the same as chunk size=%d"
+ .formatted(rowCount, chunk.size()));
+ }
+ }
+
+ if (!writeFailed && logger.isDebugEnabled()) {
+ logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ Assert.notNull(this.bigQueryWriteClient, "BigQuery write client must be provided");
+ Assert.notNull(this.tableName, "Table name must be provided");
+ Assert.notNull(this.marshaller, "Marshaller must be provided");
+
+ if (this.apiFutureCallback != null) {
+ Assert.notNull(this.executor, "Executor must be provided");
+ }
+ }
+
+ /**
+ * GRPC client that wraps communication with BigQuery.
+ * @param bigQueryWriteClient a client
+ */
+ public void setBigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) {
+ this.bigQueryWriteClient = bigQueryWriteClient;
+ }
+
+ /**
+ * A full path to the BigQuery table.
+ * @param tableName a name
+ */
+ public void setTableName(final TableName tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * Converter that transforms a single row into a {@link String}.
+ * @param marshaller your JSON mapper
+ */
+ public void setMarshaller(final JsonObjectMarshaller<T> marshaller) {
+ this.marshaller = marshaller;
+ }
+
+ /**
+ * {@link ApiFutureCallback} that will be called in case of a successful or failed
+ * response.
+ * @param apiFutureCallback a callback
+ * @see BigQueryWriteApiCommitedJsonItemWriter#setExecutor(Executor)
+ */
+ public void setApiFutureCallback(final ApiFutureCallback<AppendRowsResponse> apiFutureCallback) {
+ this.apiFutureCallback = apiFutureCallback;
+ }
+
+ /**
+ * An {@link Executor} that will be calling a {@link ApiFutureCallback}.
+ * @param executor an executor
+ * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
+ */
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
}
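
For orientation, the reworked committed-type writer could be wired roughly as below. The project, dataset, table and PersonDto names are placeholders; the sketch only uses the setters shown in this diff plus BigQueryWriteClient.create(), TableName.of() and Spring Batch's JacksonJsonObjectMarshaller, and afterPropertiesSet() may throw, so real code would handle that.

BigQueryWriteClient writeClient = BigQueryWriteClient.create();

BigQueryWriteApiCommitedJsonItemWriter<PersonDto> writer = new BigQueryWriteApiCommitedJsonItemWriter<>();
writer.setBigQueryWriteClient(writeClient);
writer.setTableName(TableName.of("my-project", "my_dataset", "persons"));
writer.setMarshaller(new JacksonJsonObjectMarshaller<>());
writer.afterPropertiesSet(); // validates the mandatory properties set above
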
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriter.java
index b64fc20..01025f9 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriter.java
@@ -49,154 +49,163 @@
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @see JSON
- * @see Pending type storage write API
+ * @see Pending
+ * type storage write API
* @since 0.2.0
*/
public class BigQueryWriteApiPendingJsonItemWriter<T> implements ItemWriter<T>, InitializingBean {
- /**
- * Logger that can be reused
- */
- private final Log logger = LogFactory.getLog(getClass());
-
- private final AtomicLong bigQueryWriteCounter = new AtomicLong();
-
- private BigQueryWriteClient bigQueryWriteClient;
- private TableName tableName;
- private JsonObjectMarshaller marshaller;
- private ApiFutureCallback apiFutureCallback;
- private Executor executor;
-
- private boolean writeFailed;
-
- @Override
- public void write(final Chunk extends T> chunk) throws Exception {
- if (!chunk.isEmpty()) {
- final List extends T> items = chunk.getItems();
- String streamName = null;
-
- try {
- WriteStream writeStreamToCreate = WriteStream.newBuilder()
- .setType(WriteStream.Type.PENDING)
- .build();
-
- CreateWriteStreamRequest createStreamRequest = CreateWriteStreamRequest.newBuilder()
- .setParent(tableName.toString())
- .setWriteStream(writeStreamToCreate)
- .build();
-
- WriteStream writeStream = bigQueryWriteClient.createWriteStream(createStreamRequest);
- streamName = writeStream.getName();
-
- if (logger.isDebugEnabled()) {
- logger.debug("Created a stream=" + streamName);
- }
-
- try (final JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), bigQueryWriteClient).build()) {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Mapping %d elements", items.size()));
- }
- final JSONArray array = new JSONArray();
- items.stream().map(marshaller::marshal).map(JSONObject::new).forEach(array::put);
-
- if (logger.isDebugEnabled()) {
- logger.debug("Writing data to BigQuery");
- }
- final ApiFuture future = writer.append(array);
-
- if (apiFutureCallback != null) {
- ApiFutures.addCallback(future, apiFutureCallback, executor);
- }
- }
- } catch (Exception e) {
- writeFailed = true;
- logger.error("BigQuery error", e);
- throw new BigQueryItemWriterException("Error on write happened", e);
- } finally {
- if (StringUtils.hasText(streamName)) {
- long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount();
- if (chunk.size() != rowCount) {
- logger.warn("Finalized response row count=%d is not the same as chunk size=%d".formatted(rowCount, chunk.size()));
- }
-
- BatchCommitWriteStreamsRequest batchRequest = BatchCommitWriteStreamsRequest.newBuilder()
- .setParent(tableName.toString())
- .addWriteStreams(streamName)
- .build();
- BatchCommitWriteStreamsResponse batchResponse = bigQueryWriteClient.batchCommitWriteStreams(batchRequest);
-
- if (!batchResponse.hasCommitTime()) {
- writeFailed = true;
- logger.error("BigQuery error=" + batchResponse.getStreamErrorsList());
- }
- }
-
- if (!writeFailed && logger.isDebugEnabled()) {
- logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet());
- }
- }
-
- if (writeFailed) {
- throw new BigQueryItemWriterException("Error on write happened");
- }
- }
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- Assert.notNull(this.bigQueryWriteClient, "BigQuery write client must be provided");
- Assert.notNull(this.tableName, "Table name must be provided");
- Assert.notNull(this.marshaller, "Marshaller must be provided");
-
- if (this.apiFutureCallback != null) {
- Assert.notNull(this.executor, "Executor must be provided");
- }
- }
-
- /**
- * GRPC client that wraps communication with BigQuery.
- *
- * @param bigQueryWriteClient a client
- */
- public void setBigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) {
- this.bigQueryWriteClient = bigQueryWriteClient;
- }
-
- /**
- * A full path to the BigQuery table.
- *
- * @param tableName a name
- */
- public void setTableName(final TableName tableName) {
- this.tableName = tableName;
- }
-
- /**
- * Converter that transforms a single row into a {@link String}.
- *
- * @param marshaller your JSON mapper
- */
- public void setMarshaller(final JsonObjectMarshaller marshaller) {
- this.marshaller = marshaller;
- }
-
- /**
- * {@link ApiFutureCallback} that will be called in case of successful of failed response.
- *
- * @param apiFutureCallback a callback
- * @see BigQueryWriteApiPendingJsonItemWriter#setExecutor(Executor)
- */
- public void setApiFutureCallback(final ApiFutureCallback apiFutureCallback) {
- this.apiFutureCallback = apiFutureCallback;
- }
-
- /**
- * An {@link Executor} that will be calling a {@link ApiFutureCallback}.
- *
- * @param executor an executor
- * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
- */
- public void setExecutor(final Executor executor) {
- this.executor = executor;
- }
+ /**
+ * Logger that can be reused
+ */
+ private final Log logger = LogFactory.getLog(getClass());
+
+ private final AtomicLong bigQueryWriteCounter = new AtomicLong();
+
+ private BigQueryWriteClient bigQueryWriteClient;
+
+ private TableName tableName;
+
+ private JsonObjectMarshaller<T> marshaller;
+
+ private ApiFutureCallback<AppendRowsResponse> apiFutureCallback;
+
+ private Executor executor;
+
+ private boolean writeFailed;
+
+ @Override
+ public void write(final Chunk<? extends T> chunk) throws Exception {
+ if (!chunk.isEmpty()) {
+ final List<? extends T> items = chunk.getItems();
+ String streamName = null;
+
+ try {
+ WriteStream writeStreamToCreate = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();
+
+ CreateWriteStreamRequest createStreamRequest = CreateWriteStreamRequest.newBuilder()
+ .setParent(tableName.toString())
+ .setWriteStream(writeStreamToCreate)
+ .build();
+
+ WriteStream writeStream = bigQueryWriteClient.createWriteStream(createStreamRequest);
+ streamName = writeStream.getName();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Created a stream=" + streamName);
+ }
+
+ final JsonStreamWriter jsonWriter = JsonStreamWriter
+ .newBuilder(writeStream.getName(), bigQueryWriteClient)
+ .build();
+
+ try (jsonWriter) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("Mapping %d elements", items.size()));
+ }
+ final JSONArray array = new JSONArray();
+ items.stream().map(marshaller::marshal).map(JSONObject::new).forEach(array::put);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Writing data to BigQuery");
+ }
+ final ApiFuture<AppendRowsResponse> future = jsonWriter.append(array);
+
+ if (apiFutureCallback != null) {
+ ApiFutures.addCallback(future, apiFutureCallback, executor);
+ }
+ }
+ }
+ catch (Exception e) {
+ writeFailed = true;
+ logger.error("BigQuery error", e);
+ throw new BigQueryItemWriterException("Error on write happened", e);
+ }
+ finally {
+ if (StringUtils.hasText(streamName)) {
+ final long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount();
+ if (chunk.size() != rowCount) {
+ logger.warn("Finalized response row count=%d is not the same as chunk size=%d"
+ .formatted(rowCount, chunk.size()));
+ }
+
+ final BatchCommitWriteStreamsRequest batchRequest = BatchCommitWriteStreamsRequest.newBuilder()
+ .setParent(tableName.toString())
+ .addWriteStreams(streamName)
+ .build();
+
+ final BatchCommitWriteStreamsResponse batchResponse = bigQueryWriteClient
+ .batchCommitWriteStreams(batchRequest);
+
+ if (!batchResponse.hasCommitTime()) {
+ writeFailed = true;
+ logger.error("BigQuery error=" + batchResponse.getStreamErrorsList());
+ }
+ }
+
+ if (!writeFailed && logger.isDebugEnabled()) {
+ logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet());
+ }
+ }
+
+ if (writeFailed) {
+ throw new BigQueryItemWriterException("Error on write happened");
+ }
+ }
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ Assert.notNull(this.bigQueryWriteClient, "BigQuery write client must be provided");
+ Assert.notNull(this.tableName, "Table name must be provided");
+ Assert.notNull(this.marshaller, "Marshaller must be provided");
+
+ if (this.apiFutureCallback != null) {
+ Assert.notNull(this.executor, "Executor must be provided");
+ }
+ }
+
+ /**
+ * GRPC client that wraps communication with BigQuery.
+ * @param bigQueryWriteClient a client
+ */
+ public void setBigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) {
+ this.bigQueryWriteClient = bigQueryWriteClient;
+ }
+
+ /**
+ * A full path to the BigQuery table.
+ * @param tableName a name
+ */
+ public void setTableName(final TableName tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * Converter that transforms a single row into a {@link String}.
+ * @param marshaller your JSON mapper
+ */
+ public void setMarshaller(final JsonObjectMarshaller<T> marshaller) {
+ this.marshaller = marshaller;
+ }
+
+ /**
+ * {@link ApiFutureCallback} that will be called in case of a successful or failed
+ * response.
+ * @param apiFutureCallback a callback
+ * @see BigQueryWriteApiPendingJsonItemWriter#setExecutor(Executor)
+ */
+ public void setApiFutureCallback(final ApiFutureCallback<AppendRowsResponse> apiFutureCallback) {
+ this.apiFutureCallback = apiFutureCallback;
+ }
+
+ /**
+ * An {@link Executor} that will be calling a {@link ApiFutureCallback}.
+ * @param executor an executor
+ * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
+ */
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
}
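
If append results need to be observed, a callback/executor pair can be attached to either writer, since both write() implementations register it through ApiFutures.addCallback. A minimal sketch, assuming writer was created as in the earlier snippet; the callback bodies are placeholders.

ApiFutureCallback<AppendRowsResponse> callback = new ApiFutureCallback<AppendRowsResponse>() {
    @Override
    public void onSuccess(AppendRowsResponse response) {
        // Invoked once BigQuery acknowledges the appended rows.
    }

    @Override
    public void onFailure(Throwable t) {
        // Invoked when the append future completes exceptionally.
    }
};

writer.setApiFutureCallback(callback);
writer.setExecutor(MoreExecutors.directExecutor()); // an executor is required whenever a callback is set
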
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilder.java
index fa8bd88..2639800 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilder.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilder.java
@@ -34,97 +34,102 @@
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @since 0.2.0
- * @see Examples
+ * @see Examples
*/
public class BigQueryWriteApiCommitedJsonItemWriterBuilder<T> {
- private BigQueryWriteClient bigQueryWriteClient;
- private TableName tableName;
- private JsonObjectMarshaller marshaller;
- private ApiFutureCallback apiFutureCallback;
- private Executor executor;
-
- /**
- * GRPC client that will be responsible for communication with BigQuery.
- *
- * @param bigQueryWriteClient a client
- * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
- * @see BigQueryWriteApiCommitedJsonItemWriter#setBigQueryWriteClient(BigQueryWriteClient)
- */
- public BigQueryWriteApiCommitedJsonItemWriterBuilder bigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) {
- this.bigQueryWriteClient = bigQueryWriteClient;
- return this;
- }
-
- /**
- * A table name along with a full path.
- *
- * @param tableName a name
- * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
- * @see BigQueryWriteApiCommitedJsonItemWriter#setTableName(TableName)
- */
- public BigQueryWriteApiCommitedJsonItemWriterBuilder tableName(final TableName tableName) {
- this.tableName = tableName;
- return this;
- }
-
- /**
- * Converts your DTO into a {@link String}.
- *
- * @param marshaller your mapper
- * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
- * @see BigQueryWriteApiCommitedJsonItemWriter#setMarshaller(JsonObjectMarshaller)
- */
- public BigQueryWriteApiCommitedJsonItemWriterBuilder marshaller(final JsonObjectMarshaller marshaller) {
- this.marshaller = marshaller;
- return this;
- }
-
- /**
- * A {@link ApiFutureCallback} that will be called on successful or failed event.
- *
- * @param apiFutureCallback a callback
- * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
- * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
- */
- public BigQueryWriteApiCommitedJsonItemWriterBuilder apiFutureCallback(final ApiFutureCallback apiFutureCallback) {
- this.apiFutureCallback = apiFutureCallback;
- return this;
- }
-
- /**
- * {@link Executor} that will be used for {@link ApiFutureCallback}.
- *
- * @param executor an executor
- * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
- * @see BigQueryWriteApiCommitedJsonItemWriter#setExecutor(Executor)
- * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
- */
- public BigQueryWriteApiCommitedJsonItemWriterBuilder executor(final Executor executor) {
- this.executor = executor;
- return this;
- }
-
- /**
- * Please remember about {@link BigQueryWriteApiCommitedJsonItemWriter#afterPropertiesSet()}.
- *
- * @return {@link BigQueryWriteApiCommitedJsonItemWriter}
- * @throws IOException in case when {@link BigQueryWriteClient} failed to be created automatically
- */
- public BigQueryWriteApiCommitedJsonItemWriter build() throws IOException {
- BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>();
-
- writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller);
- writer.setBigQueryWriteClient(this.bigQueryWriteClient == null ? BigQueryWriteClient.create() : this.bigQueryWriteClient);
-
- if (apiFutureCallback != null) {
- writer.setApiFutureCallback(apiFutureCallback);
- writer.setExecutor(this.executor == null ? MoreExecutors.directExecutor() : this.executor);
- }
-
- writer.setTableName(tableName);
-
- return writer;
- }
+ private BigQueryWriteClient bigQueryWriteClient;
+
+ private TableName tableName;
+
+ private JsonObjectMarshaller<T> marshaller;
+
+ private ApiFutureCallback<AppendRowsResponse> apiFutureCallback;
+
+ private Executor executor;
+
+ /**
+ * GRPC client that will be responsible for communication with BigQuery.
+ * @param bigQueryWriteClient a client
+ * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
+ * @see BigQueryWriteApiCommitedJsonItemWriter#setBigQueryWriteClient(BigQueryWriteClient)
+ */
+ public BigQueryWriteApiCommitedJsonItemWriterBuilder<T> bigQueryWriteClient(
+ final BigQueryWriteClient bigQueryWriteClient) {
+ this.bigQueryWriteClient = bigQueryWriteClient;
+ return this;
+ }
+
+ /**
+ * A table name along with a full path.
+ * @param tableName a name
+ * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
+ * @see BigQueryWriteApiCommitedJsonItemWriter#setTableName(TableName)
+ */
+ public BigQueryWriteApiCommitedJsonItemWriterBuilder<T> tableName(final TableName tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ /**
+ * Converts your DTO into a {@link String}.
+ * @param marshaller your mapper
+ * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
+ * @see BigQueryWriteApiCommitedJsonItemWriter#setMarshaller(JsonObjectMarshaller)
+ */
+ public BigQueryWriteApiCommitedJsonItemWriterBuilder<T> marshaller(final JsonObjectMarshaller<T> marshaller) {
+ this.marshaller = marshaller;
+ return this;
+ }
+
+ /**
+ * A {@link ApiFutureCallback} that will be called on successful or failed event.
+ * @param apiFutureCallback a callback
+ * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
+ * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
+ */
+ public BigQueryWriteApiCommitedJsonItemWriterBuilder<T> apiFutureCallback(
+ final ApiFutureCallback<AppendRowsResponse> apiFutureCallback) {
+ this.apiFutureCallback = apiFutureCallback;
+ return this;
+ }
+
+ /**
+ * {@link Executor} that will be used for {@link ApiFutureCallback}.
+ * @param executor an executor
+ * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder}
+ * @see BigQueryWriteApiCommitedJsonItemWriter#setExecutor(Executor)
+ * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
+ */
+ public BigQueryWriteApiCommitedJsonItemWriterBuilder<T> executor(final Executor executor) {
+ this.executor = executor;
+ return this;
+ }
+
+ /**
+ * Please remember to call
+ * {@link BigQueryWriteApiCommitedJsonItemWriter#afterPropertiesSet()}.
+ * @return {@link BigQueryWriteApiCommitedJsonItemWriter}
+ * @throws IOException in case the {@link BigQueryWriteClient} fails to be created
+ * automatically
+ */
+ public BigQueryWriteApiCommitedJsonItemWriter<T> build() throws IOException {
+ BigQueryWriteApiCommitedJsonItemWriter<T> writer = new BigQueryWriteApiCommitedJsonItemWriter<>();
+
+ writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller);
+
+ writer.setBigQueryWriteClient(
+ this.bigQueryWriteClient == null ? BigQueryWriteClient.create() : this.bigQueryWriteClient);
+
+ if (apiFutureCallback != null) {
+ writer.setApiFutureCallback(apiFutureCallback);
+ writer.setExecutor(this.executor == null ? MoreExecutors.directExecutor() : this.executor);
+ }
+
+ writer.setTableName(tableName);
+
+ return writer;
+ }
}
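
The same committed-type writer obtained through the builder above; the table coordinates are placeholders, and when marshaller/client are omitted the build() method shown in this diff falls back to JacksonJsonObjectMarshaller and BigQueryWriteClient.create(). Both build() and afterPropertiesSet() declare checked exceptions, so the sketch assumes the caller handles them.

BigQueryWriteApiCommitedJsonItemWriter<PersonDto> writer = new BigQueryWriteApiCommitedJsonItemWriterBuilder<PersonDto>()
    .tableName(TableName.of("my-project", "my_dataset", "persons"))
    .build();

writer.afterPropertiesSet();
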
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilder.java
index 51986b4..000368a 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilder.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilder.java
@@ -34,97 +34,102 @@
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @since 0.2.0
- * @see Examples
+ * @see Examples
*/
public class BigQueryWriteApiPendingJsonItemWriterBuilder<T> {
- private BigQueryWriteClient bigQueryWriteClient;
- private TableName tableName;
- private JsonObjectMarshaller marshaller;
- private ApiFutureCallback apiFutureCallback;
- private Executor executor;
-
- /**
- * GRPC client that will be responsible for communication with BigQuery.
- *
- * @param bigQueryWriteClient a client
- * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
- * @see BigQueryWriteApiPendingJsonItemWriter#setBigQueryWriteClient(BigQueryWriteClient)
- */
- public BigQueryWriteApiPendingJsonItemWriterBuilder bigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) {
- this.bigQueryWriteClient = bigQueryWriteClient;
- return this;
- }
-
- /**
- * A table name along with a full path.
- *
- * @param tableName a name
- * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
- * @see BigQueryWriteApiPendingJsonItemWriter#setTableName(TableName)
- */
- public BigQueryWriteApiPendingJsonItemWriterBuilder tableName(final TableName tableName) {
- this.tableName = tableName;
- return this;
- }
-
- /**
- * Converts your DTO into a {@link String}.
- *
- * @param marshaller your mapper
- * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
- * @see BigQueryWriteApiPendingJsonItemWriter#setMarshaller(JsonObjectMarshaller)
- */
- public BigQueryWriteApiPendingJsonItemWriterBuilder marshaller(final JsonObjectMarshaller marshaller) {
- this.marshaller = marshaller;
- return this;
- }
-
- /**
- * A {@link ApiFutureCallback} that will be called on successful or failed event.
- *
- * @param apiFutureCallback a callback
- * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
- * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
- */
- public BigQueryWriteApiPendingJsonItemWriterBuilder apiFutureCallback(final ApiFutureCallback apiFutureCallback) {
- this.apiFutureCallback = apiFutureCallback;
- return this;
- }
-
- /**
- * {@link Executor} that will be used for {@link ApiFutureCallback}.
- *
- * @param executor an executor
- * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
- * @see BigQueryWriteApiPendingJsonItemWriter#setExecutor(Executor)
- * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
- */
- public BigQueryWriteApiPendingJsonItemWriterBuilder executor(final Executor executor) {
- this.executor = executor;
- return this;
- }
-
- /**
- * Please remember about {@link BigQueryWriteApiPendingJsonItemWriter#afterPropertiesSet()}.
- *
- * @return {@link BigQueryWriteApiPendingJsonItemWriter}
- * @throws IOException in case when {@link BigQueryWriteClient} failed to be created automatically
- */
- public BigQueryWriteApiPendingJsonItemWriter build() throws IOException {
- BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>();
-
- writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller);
- writer.setBigQueryWriteClient(this.bigQueryWriteClient == null ? BigQueryWriteClient.create() : this.bigQueryWriteClient);
-
- if (apiFutureCallback != null) {
- writer.setApiFutureCallback(apiFutureCallback);
- writer.setExecutor(this.executor == null ? MoreExecutors.directExecutor() : this.executor);
- }
-
- writer.setTableName(tableName);
-
- return writer;
- }
+ private BigQueryWriteClient bigQueryWriteClient;
+
+ private TableName tableName;
+
+ private JsonObjectMarshaller<T> marshaller;
+
+ private ApiFutureCallback<AppendRowsResponse> apiFutureCallback;
+
+ private Executor executor;
+
+ /**
+ * GRPC client that will be responsible for communication with BigQuery.
+ * @param bigQueryWriteClient a client
+ * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
+ * @see BigQueryWriteApiPendingJsonItemWriter#setBigQueryWriteClient(BigQueryWriteClient)
+ */
+ public BigQueryWriteApiPendingJsonItemWriterBuilder<T> bigQueryWriteClient(
+ final BigQueryWriteClient bigQueryWriteClient) {
+ this.bigQueryWriteClient = bigQueryWriteClient;
+ return this;
+ }
+
+ /**
+ * A table name along with a full path.
+ * @param tableName a name
+ * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
+ * @see BigQueryWriteApiPendingJsonItemWriter#setTableName(TableName)
+ */
+ public BigQueryWriteApiPendingJsonItemWriterBuilder<T> tableName(final TableName tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ /**
+ * Converts your DTO into a {@link String}.
+ * @param marshaller your mapper
+ * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
+ * @see BigQueryWriteApiPendingJsonItemWriter#setMarshaller(JsonObjectMarshaller)
+ */
+ public BigQueryWriteApiPendingJsonItemWriterBuilder<T> marshaller(final JsonObjectMarshaller<T> marshaller) {
+ this.marshaller = marshaller;
+ return this;
+ }
+
+ /**
+ * A {@link ApiFutureCallback} that will be called on successful or failed event.
+ * @param apiFutureCallback a callback
+ * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
+ * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
+ */
+ public BigQueryWriteApiPendingJsonItemWriterBuilder<T> apiFutureCallback(
+ final ApiFutureCallback<AppendRowsResponse> apiFutureCallback) {
+ this.apiFutureCallback = apiFutureCallback;
+ return this;
+ }
+
+ /**
+ * {@link Executor} that will be used for {@link ApiFutureCallback}.
+ * @param executor an executor
+ * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder}
+ * @see BigQueryWriteApiPendingJsonItemWriter#setExecutor(Executor)
+ * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
+ */
+ public BigQueryWriteApiPendingJsonItemWriterBuilder<T> executor(final Executor executor) {
+ this.executor = executor;
+ return this;
+ }
+
+ /**
+ * Please remember to call
+ * {@link BigQueryWriteApiPendingJsonItemWriter#afterPropertiesSet()}.
+ * @return {@link BigQueryWriteApiPendingJsonItemWriter}
+ * @throws IOException in case the {@link BigQueryWriteClient} fails to be created
+ * automatically
+ */
+ public BigQueryWriteApiPendingJsonItemWriter<T> build() throws IOException {
+ BigQueryWriteApiPendingJsonItemWriter<T> writer = new BigQueryWriteApiPendingJsonItemWriter<>();
+
+ writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller);
+
+ writer.setBigQueryWriteClient(
+ this.bigQueryWriteClient == null ? BigQueryWriteClient.create() : this.bigQueryWriteClient);
+
+ if (apiFutureCallback != null) {
+ writer.setApiFutureCallback(apiFutureCallback);
+ writer.setExecutor(this.executor == null ? MoreExecutors.directExecutor() : this.executor);
+ }
+
+ writer.setTableName(tableName);
+
+ return writer;
+ }
}
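
And a pending-type writer plugged into a chunk-oriented step. StepBuilder and chunk(int, PlatformTransactionManager) are standard Spring Batch 5 APIs; jobRepository, transactionManager, reader and the callback are assumed to exist in the surrounding configuration, and the step/table names are placeholders.

BigQueryWriteApiPendingJsonItemWriter<PersonDto> writer = new BigQueryWriteApiPendingJsonItemWriterBuilder<PersonDto>()
    .tableName(TableName.of("my-project", "my_dataset", "persons"))
    .apiFutureCallback(callback) // optional; the executor then defaults to MoreExecutors.directExecutor()
    .build();

Step step = new StepBuilder("bigQueryLoadStep", jobRepository)
    .<PersonDto, PersonDto>chunk(100, transactionManager)
    .reader(reader)
    .writer(writer)
    .build();
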
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java
index d81b6e9..a539c9c 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java
@@ -19,13 +19,13 @@
*
* Supported types:
*