Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
try {
sourceTableName = schemaMapper.getSourceTableName("", spannerTableName);
} catch (NoSuchElementException e) {
LOG.warn(
"The spanner table {} was not found in source schema mapping. Source tables: {}",
spannerTableName, String.join(", ", sourceSchema.tables().keySet()));
return new DMLGeneratorResponse("");
}
com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable sourceTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static boolean processRecord(
.build();

DMLGeneratorResponse dmlGeneratorResponse = dmlGenerator.getDMLStatement(dmlGeneratorRequest);
LOG.warn("Generated DML statements: " + dmlGeneratorResponse.getDmlStatement());
if (dmlGeneratorResponse.getDmlStatement().isEmpty()) {
LOG.warn("DML statement is empty for table: " + tableName);
return false;
Expand Down Expand Up @@ -146,6 +147,7 @@ public static boolean processRecord(
lagMetric.update(replicationLag); // update the lag metric
return false;
} catch (Exception e) {
LOG.warn("Exception in DML: " + e.getMessage());
// Not logging the error here since the error can be retryable error and high number of them
// could have side effects on the pipeline execution.
throw e; // throw the original exception since it needs to go to DLQ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.io.Resources;
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -44,6 +45,7 @@
import org.apache.beam.it.jdbc.MySQLResourceManager;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.jline.utils.Log;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
Expand Down Expand Up @@ -78,6 +80,7 @@ public class SpannerToSourceDbIT extends SpannerToSourceDbITBase {
private static final String TABLE_WITH_IDENTITY_COL = "TableWithIdentityColumn";
private static final String BOUNDARY_CHECK_TABLE =
"testtable_03TpCoVF16ED0KLxM3v808cH3bTGQ0uK_FEXuZHbttvYZPAeGeqiO";
private static final String NUMERICAL_BOUNDARY_CHECK_TABLE = "BoundaryConversionTestTable";
private static final HashSet<SpannerToSourceDbIT> testInstances = new HashSet<>();
private static PipelineLauncher.LaunchInfo jobInfo;
public static SpannerResourceManager spannerResourceManager;
Expand Down Expand Up @@ -460,4 +463,138 @@ private void assertIdentityColRowsInMySQLAfterInsert() {
assertThat(rows.get(1).get("id")).isEqualTo(2);
assertThat(rows.get(1).get("column1")).isEqualTo("id2");
}

@Test
public void spannerToMysqlDbBoundaryValues() throws InterruptedException, IOException {
assertThatPipeline(jobInfo).isRunning();
// Write row in Spanner
writeNumericalBoundaryRowInSpanner();
// Assert events on Mysql
assertNumericalBoundaryRowInMySQL();
}

private void writeNumericalBoundaryRowInSpanner() {
// Write a single record to Spanner
Mutation maxValueRow =
Mutation.newInsertOrUpdateBuilder(NUMERICAL_BOUNDARY_CHECK_TABLE)
.set("varchar_column")
.to("MaxBoundaryVarchar")
.set("tinyint_column")
.to(Byte.MAX_VALUE)
.set("smallint_column")
.to(Short.MAX_VALUE)
.set("int_column")
.to(Integer.MAX_VALUE)
.set("bigint_column")
.to(Long.MAX_VALUE)
.set("float_column")
.to(Float.POSITIVE_INFINITY)
.set("double_column")
.to(Double.POSITIVE_INFINITY)
.set("decimal_column")
.to(new BigDecimal("99999999999999999999999999999.999999999").toPlainString())
.set("bool_column")
.to(Boolean.TRUE)
.build();
spannerResourceManager.write(maxValueRow);

Mutation minValueRow =
Mutation.newInsertOrUpdateBuilder(NUMERICAL_BOUNDARY_CHECK_TABLE)
.set("varchar_column")
.to("MinBoundaryVarchar")
.set("tinyint_column")
.to(Byte.MIN_VALUE)
.set("smallint_column")
.to(Short.MIN_VALUE)
.set("int_column")
.to(Integer.MIN_VALUE)
.set("bigint_column")
.to(Long.MIN_VALUE)
.set("float_column")
.to(Float.NEGATIVE_INFINITY)
.set("double_column")
.to(Double.NEGATIVE_INFINITY)
.set("decimal_column")
.to(new BigDecimal("-99999999999999999999999999999.999999999").toPlainString())
.set("bool_column")
.to(Boolean.FALSE)
.build();
spannerResourceManager.write(minValueRow);

Mutation nanValueRow =
Mutation.newInsertOrUpdateBuilder(NUMERICAL_BOUNDARY_CHECK_TABLE)
.set("varchar_column")
.to("NanBoundaryVarchar")
.set("tinyint_column")
.to(Byte.MIN_VALUE)
.set("smallint_column")
.to(Short.MIN_VALUE)
.set("int_column")
.to(Integer.MIN_VALUE)
.set("bigint_column")
.to(Long.MIN_VALUE)
.set("float_column")
.to(Float.NaN)
.set("double_column")
.to(Double.NaN)
.set("decimal_column")
.to(new BigDecimal("-99999999999999999999999999999.999999999").toPlainString())
.set("bool_column")
.to(Boolean.FALSE)
.build();
spannerResourceManager.write(nanValueRow);
}

private void assertNumericalBoundaryRowInMySQL() throws InterruptedException {
PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(
createConfig(jobInfo, TEST_TIMEOUT),
() -> {
LOG.warn("Table row count: {}", jdbcResourceManager.getRowCount(NUMERICAL_BOUNDARY_CHECK_TABLE));
return jdbcResourceManager.getRowCount(NUMERICAL_BOUNDARY_CHECK_TABLE) == 3;
});
assertThatResult(result).meetsConditions();
List<Map<String, Object>> rows = jdbcResourceManager.readTable(NUMERICAL_BOUNDARY_CHECK_TABLE);
// assertThat(rows).hasSize(3);

for (Map<String, Object> row : rows) {
String varcharColumn = (String) row.get("varchar_column");
switch (varcharColumn) {
case "MaxBoundaryVarchar":
assertThat(row.get("tinyint_column")).isEqualTo(Byte.MAX_VALUE);
assertThat(row.get("smallint_column")).isEqualTo(Short.MAX_VALUE);
assertThat(row.get("int_column")).isEqualTo(Integer.MAX_VALUE);
assertThat(row.get("bigint_column")).isEqualTo(Long.MAX_VALUE);
assertThat(row.get("float_column")).isEqualTo(Float.POSITIVE_INFINITY);
assertThat(row.get("double_column")).isEqualTo(Double.POSITIVE_INFINITY);
assertThat(row.get("decimal_column"))
.isEqualTo(new BigDecimal("99999999999999999999999999999.999999999"));
assertThat(row.get("bool_column")).isEqualTo(Boolean.TRUE);
break;
case "MinBoundaryVarchar":
assertThat(row.get("tinyint_column")).isEqualTo(Byte.MIN_VALUE);
assertThat(row.get("smallint_column")).isEqualTo(Short.MIN_VALUE);
assertThat(row.get("int_column")).isEqualTo(Integer.MIN_VALUE);
assertThat(row.get("bigint_column")).isEqualTo(Long.MIN_VALUE);
assertThat(row.get("float_column")).isEqualTo(Float.NEGATIVE_INFINITY);
assertThat(row.get("double_column")).isEqualTo(Double.NEGATIVE_INFINITY);
assertThat(row.get("decimal_column"))
.isEqualTo(new BigDecimal("-99999999999999999999999999999.999999999"));
assertThat(row.get("bool_column")).isEqualTo(Boolean.FALSE);
break;
case "NanBoundaryVarchar":
assertThat(row.get("tinyint_column")).isEqualTo(Byte.MIN_VALUE);
assertThat(row.get("smallint_column")).isEqualTo(Short.MIN_VALUE);
assertThat(row.get("int_column")).isEqualTo(Integer.MIN_VALUE);
assertThat(row.get("bigint_column")).isEqualTo(Long.MIN_VALUE);
assertThat(row.get("float_column")).isEqualTo(Float.NaN);
assertThat(row.get("double_column")).isEqualTo(Double.NaN);
assertThat(row.get("decimal_column"))
.isEqualTo(new BigDecimal("-99999999999999999999999999999.999999999"));
assertThat(row.get("bool_column")).isEqualTo(Boolean.FALSE);
break;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,15 @@ CREATE TABLE Users (
column1 VARCHAR(25),
PRIMARY KEY(id)
);

CREATE TABLE BoundaryConversionTestTable (
varchar_column VARCHAR(100) PRIMARY KEY,
tinyint_column tinyint,
smallint_column smallint,
int_column int,
bigint_column bigint,
float_column float,
double_column double,
decimal_column decimal,
bool_column boolean
);
Loading
Loading