|
29 | 29 | import com.google.common.io.Resources; |
30 | 30 | import com.google.pubsub.v1.SubscriptionName; |
31 | 31 | import java.io.IOException; |
| 32 | +import java.math.BigDecimal; |
32 | 33 | import java.time.Duration; |
33 | 34 | import java.util.ArrayList; |
34 | 35 | import java.util.HashMap; |
|
44 | 45 | import org.apache.beam.it.jdbc.MySQLResourceManager; |
45 | 46 | import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; |
46 | 47 | import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; |
| 48 | +import org.jline.utils.Log; |
47 | 49 | import org.junit.AfterClass; |
48 | 50 | import org.junit.Before; |
49 | 51 | import org.junit.Ignore; |
@@ -78,6 +80,7 @@ public class SpannerToSourceDbIT extends SpannerToSourceDbITBase { |
78 | 80 | private static final String TABLE_WITH_IDENTITY_COL = "TableWithIdentityColumn"; |
79 | 81 | private static final String BOUNDARY_CHECK_TABLE = |
80 | 82 | "testtable_03TpCoVF16ED0KLxM3v808cH3bTGQ0uK_FEXuZHbttvYZPAeGeqiO"; |
| 83 | + private static final String NUMERICAL_BOUNDARY_CHECK_TABLE = "BoundaryConversionTestTable"; |
81 | 84 | private static final HashSet<SpannerToSourceDbIT> testInstances = new HashSet<>(); |
82 | 85 | private static PipelineLauncher.LaunchInfo jobInfo; |
83 | 86 | public static SpannerResourceManager spannerResourceManager; |
@@ -460,4 +463,138 @@ private void assertIdentityColRowsInMySQLAfterInsert() { |
460 | 463 | assertThat(rows.get(1).get("id")).isEqualTo(2); |
461 | 464 | assertThat(rows.get(1).get("column1")).isEqualTo("id2"); |
462 | 465 | } |
| 466 | + |
| 467 | + @Test |
| 468 | + public void spannerToMysqlDbBoundaryValues() throws InterruptedException, IOException { |
| 469 | + assertThatPipeline(jobInfo).isRunning(); |
| 470 | + // Write row in Spanner |
| 471 | + writeNumericalBoundaryRowInSpanner(); |
| 472 | + // Assert events on Mysql |
| 473 | + assertNumericalBoundaryRowInMySQL(); |
| 474 | + } |
| 475 | + |
| 476 | + private void writeNumericalBoundaryRowInSpanner() { |
| 477 | + // Write a single record to Spanner |
| 478 | + Mutation maxValueRow = |
| 479 | + Mutation.newInsertOrUpdateBuilder(NUMERICAL_BOUNDARY_CHECK_TABLE) |
| 480 | + .set("varchar_column") |
| 481 | + .to("MaxBoundaryVarchar") |
| 482 | + .set("tinyint_column") |
| 483 | + .to(Byte.MAX_VALUE) |
| 484 | + .set("smallint_column") |
| 485 | + .to(Short.MAX_VALUE) |
| 486 | + .set("int_column") |
| 487 | + .to(Integer.MAX_VALUE) |
| 488 | + .set("bigint_column") |
| 489 | + .to(Long.MAX_VALUE) |
| 490 | + .set("float_column") |
| 491 | + .to(Float.POSITIVE_INFINITY) |
| 492 | + .set("double_column") |
| 493 | + .to(Double.POSITIVE_INFINITY) |
| 494 | + .set("decimal_column") |
| 495 | + .to(new BigDecimal("99999999999999999999999999999.999999999").toPlainString()) |
| 496 | + .set("bool_column") |
| 497 | + .to(Boolean.TRUE) |
| 498 | + .build(); |
| 499 | + spannerResourceManager.write(maxValueRow); |
| 500 | + |
| 501 | + Mutation minValueRow = |
| 502 | + Mutation.newInsertOrUpdateBuilder(NUMERICAL_BOUNDARY_CHECK_TABLE) |
| 503 | + .set("varchar_column") |
| 504 | + .to("MinBoundaryVarchar") |
| 505 | + .set("tinyint_column") |
| 506 | + .to(Byte.MIN_VALUE) |
| 507 | + .set("smallint_column") |
| 508 | + .to(Short.MIN_VALUE) |
| 509 | + .set("int_column") |
| 510 | + .to(Integer.MIN_VALUE) |
| 511 | + .set("bigint_column") |
| 512 | + .to(Long.MIN_VALUE) |
| 513 | + .set("float_column") |
| 514 | + .to(Float.NEGATIVE_INFINITY) |
| 515 | + .set("double_column") |
| 516 | + .to(Double.NEGATIVE_INFINITY) |
| 517 | + .set("decimal_column") |
| 518 | + .to(new BigDecimal("-99999999999999999999999999999.999999999").toPlainString()) |
| 519 | + .set("bool_column") |
| 520 | + .to(Boolean.FALSE) |
| 521 | + .build(); |
| 522 | + spannerResourceManager.write(minValueRow); |
| 523 | + |
| 524 | + Mutation nanValueRow = |
| 525 | + Mutation.newInsertOrUpdateBuilder(NUMERICAL_BOUNDARY_CHECK_TABLE) |
| 526 | + .set("varchar_column") |
| 527 | + .to("NanBoundaryVarchar") |
| 528 | + .set("tinyint_column") |
| 529 | + .to(Byte.MIN_VALUE) |
| 530 | + .set("smallint_column") |
| 531 | + .to(Short.MIN_VALUE) |
| 532 | + .set("int_column") |
| 533 | + .to(Integer.MIN_VALUE) |
| 534 | + .set("bigint_column") |
| 535 | + .to(Long.MIN_VALUE) |
| 536 | + .set("float_column") |
| 537 | + .to(Float.NaN) |
| 538 | + .set("double_column") |
| 539 | + .to(Double.NaN) |
| 540 | + .set("decimal_column") |
| 541 | + .to(new BigDecimal("-99999999999999999999999999999.999999999").toPlainString()) |
| 542 | + .set("bool_column") |
| 543 | + .to(Boolean.FALSE) |
| 544 | + .build(); |
| 545 | + spannerResourceManager.write(nanValueRow); |
| 546 | + } |
| 547 | + |
| 548 | + private void assertNumericalBoundaryRowInMySQL() throws InterruptedException { |
| 549 | + PipelineOperator.Result result = |
| 550 | + pipelineOperator() |
| 551 | + .waitForCondition( |
| 552 | + createConfig(jobInfo, TEST_TIMEOUT), |
| 553 | + () -> { |
| 554 | + Log.warn("Table row count: {}", jdbcResourceManager.getRowCount(NUMERICAL_BOUNDARY_CHECK_TABLE)); |
| 555 | + return jdbcResourceManager.getRowCount(NUMERICAL_BOUNDARY_CHECK_TABLE) == 3; |
| 556 | + }); |
| 557 | + assertThatResult(result).meetsConditions(); |
| 558 | + List<Map<String, Object>> rows = jdbcResourceManager.readTable(NUMERICAL_BOUNDARY_CHECK_TABLE); |
| 559 | +// assertThat(rows).hasSize(3); |
| 560 | + |
| 561 | + for (Map<String, Object> row : rows) { |
| 562 | + String varcharColumn = (String) row.get("varchar_column"); |
| 563 | + switch (varcharColumn) { |
| 564 | + case "MaxBoundaryVarchar": |
| 565 | + assertThat(row.get("tinyint_column")).isEqualTo(Byte.MAX_VALUE); |
| 566 | + assertThat(row.get("smallint_column")).isEqualTo(Short.MAX_VALUE); |
| 567 | + assertThat(row.get("int_column")).isEqualTo(Integer.MAX_VALUE); |
| 568 | + assertThat(row.get("bigint_column")).isEqualTo(Long.MAX_VALUE); |
| 569 | + assertThat(row.get("float_column")).isEqualTo(Float.POSITIVE_INFINITY); |
| 570 | + assertThat(row.get("double_column")).isEqualTo(Double.POSITIVE_INFINITY); |
| 571 | + assertThat(row.get("decimal_column")) |
| 572 | + .isEqualTo(new BigDecimal("99999999999999999999999999999.999999999")); |
| 573 | + assertThat(row.get("bool_column")).isEqualTo(Boolean.TRUE); |
| 574 | + break; |
| 575 | + case "MinBoundaryVarchar": |
| 576 | + assertThat(row.get("tinyint_column")).isEqualTo(Byte.MIN_VALUE); |
| 577 | + assertThat(row.get("smallint_column")).isEqualTo(Short.MIN_VALUE); |
| 578 | + assertThat(row.get("int_column")).isEqualTo(Integer.MIN_VALUE); |
| 579 | + assertThat(row.get("bigint_column")).isEqualTo(Long.MIN_VALUE); |
| 580 | + assertThat(row.get("float_column")).isEqualTo(Float.NEGATIVE_INFINITY); |
| 581 | + assertThat(row.get("double_column")).isEqualTo(Double.NEGATIVE_INFINITY); |
| 582 | + assertThat(row.get("decimal_column")) |
| 583 | + .isEqualTo(new BigDecimal("-99999999999999999999999999999.999999999")); |
| 584 | + assertThat(row.get("bool_column")).isEqualTo(Boolean.FALSE); |
| 585 | + break; |
| 586 | + case "NanBoundaryVarchar": |
| 587 | + assertThat(row.get("tinyint_column")).isEqualTo(Byte.MIN_VALUE); |
| 588 | + assertThat(row.get("smallint_column")).isEqualTo(Short.MIN_VALUE); |
| 589 | + assertThat(row.get("int_column")).isEqualTo(Integer.MIN_VALUE); |
| 590 | + assertThat(row.get("bigint_column")).isEqualTo(Long.MIN_VALUE); |
| 591 | + assertThat(row.get("float_column")).isEqualTo(Float.NaN); |
| 592 | + assertThat(row.get("double_column")).isEqualTo(Double.NaN); |
| 593 | + assertThat(row.get("decimal_column")) |
| 594 | + .isEqualTo(new BigDecimal("-99999999999999999999999999999.999999999")); |
| 595 | + assertThat(row.get("bool_column")).isEqualTo(Boolean.FALSE); |
| 596 | + break; |
| 597 | + } |
| 598 | + } |
| 599 | + } |
463 | 600 | } |
0 commit comments